In [1]:
import os
import json
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms

class CustomDataset(Dataset):
    def __init__(self, annotations_dir, img_dir, transform=None):
        self.img_labels = []
        self.img_paths = []
        self.transform = transform  # Assign it once
        self.class_to_idx = {'rect': 1, 'circle': 2, 'error_rect': 3, 'error_circle': 4}  # Define your mapping here
        
        # Load annotations and image paths
        annotation_files = sorted(os.listdir(annotations_dir))
        for annotation_file in annotation_files:
            annotation_path = os.path.join(annotations_dir, annotation_file)
            with open(annotation_path, 'r') as file:
                annotation = json.load(file)
                self.img_labels.append(annotation['objects'])
            
            # Assuming image files are named with the same prefix as annotation files
            img_file = annotation_file.replace('.json', '.png')
            img_path = os.path.join(img_dir, img_file)
            self.img_paths.append(img_path)
    
    def __len__(self):
        return len(self.img_labels)
    
    def __getitem__(self, idx):
        # Load image
        img_path = self.img_paths[idx]
        img = Image.open(img_path).convert("RGB")
    
        if self.transform:
            img = self.transform(img)
        
        # Load annotations
        annotations = self.img_labels[idx]
        boxes = []
        labels = []
        for annotation in annotations:
            bbox = annotation['bbox']
            xmin = bbox[0]
            ymin = bbox[1]
            xmax = xmin + bbox[2]  # Since width = size
            ymax = ymin + bbox[3]  # Since height = size
            boxes.append([xmin, ymin, xmax, ymax])
            label = annotation['label']  # Keeping label as string
            labels.append(label)
        
        if not boxes:  # Check if boxes is empty
            # Handle the case where there are no boxes
            # For example, you might return a default value, or skip this frame
            boxes = torch.zeros((0, 4), dtype=torch.float32)
            labels = torch.zeros(0, dtype=torch.int64)
        else:
            # Convert boxes to tensors, but keep labels as a list of strings
            boxes = torch.as_tensor(boxes, dtype=torch.float32)
            labels = torch.as_tensor([self.class_to_idx[label] for label in labels], dtype=torch.int64)
        
        # Create a dictionary for target
        target = {"boxes": boxes, "labels": labels,}
        
        return img, target


In [2]:
import torch
from torch.utils.data import DataLoader
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

from torch.optim import lr_scheduler


# Load your custom dataset
transform = transforms.Compose([transforms.ToTensor()])
dataset = CustomDataset(annotations_dir='metadata', img_dir='images', transform=transform)

# Split dataset into train and validation sets
n_train = int(len(dataset) * 0.8)
n_val = len(dataset) - n_train
train_dataset, val_dataset = torch.utils.data.random_split(dataset, [n_train, n_val])

# Define data loaders
def collate_fn(batch):
    images = [item[0] for item in batch]
    targets = [item[1] for item in batch]
    return images, targets



data_loader = DataLoader(train_dataset, batch_size=2, shuffle=True,collate_fn=collate_fn)
data_loader_val = DataLoader(val_dataset, batch_size=1, shuffle=False,collate_fn=collate_fn)

# Load a pre-trained Faster R-CNN model
model = fasterrcnn_resnet50_fpn(pretrained=True)

# Number of classes is 2 (background and your object class)
num_classes = 5

# Get the number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features

# Replace the pre-trained head with a new one
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Move model to the right device
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

# Construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)


# Train for 10 epochs (you might need to increase this)
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    i = 0
    epoch_loss = 0
    for imgs, annotations in data_loader:
        imgs = list(img.to(device) for img in imgs)
        annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
        
        loss_dict = model(imgs, annotations)
        losses = sum(loss for loss in loss_dict.values())
        
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
        
        epoch_loss += losses.item()
        i += 1
        
        if i % 10 == 0:
            print(f"Iteration #{i} loss: {losses.item()}")
    
    # Print the total loss for the epoch
    print(f"Epoch #{epoch} loss: {epoch_loss}")
    lr_scheduler.step()

# You might want to save the model when training is finished
torch.save(model.state_dict(), 'model_weights.pth')




Iteration #10 loss: 1.163272738456726
Iteration #20 loss: 0.5335590839385986
Iteration #30 loss: 0.44286489486694336


KeyboardInterrupt: 

In [15]:
torch.save(model.state_dict(), 'model_weights2.pth')

In [3]:
import os
import json
from PIL import Image
import torch
from torch.utils.data import Dataset
from torchvision import transforms
import torch
from torch.utils.data import DataLoader
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Load a pre-trained Faster R-CNN model
model = fasterrcnn_resnet50_fpn(pretrained=False)

# Number of classes is 5 (background + your 4 classes)
num_classes = 5

# Get the number of input features for the classifier
in_features = model.roi_heads.box_predictor.cls_score.in_features

# Replace the pre-trained head with a new one
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

# Load the trained weights
model.load_state_dict(torch.load('model_weights2.pth'))

# Set the model to evaluation mode
model.eval()



FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64, eps=1e-05)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64, eps=1e-05)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64, eps=1e-05)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256, eps=1e-05)
          (relu

In [4]:
# Load an image from file
image = Image.open("images/frame_0122.png").convert("RGB")

# Define the transformation
transform = transforms.Compose([transforms.ToTensor()])

# Apply the transformation to the image
image = transform(image).unsqueeze(0)  # Add batch dimension

# Perform inference
with torch.no_grad():
    prediction = model(image)

# Print the prediction
print(prediction)

[{'boxes': tensor([[112.7234, 139.6666, 163.3908, 189.4779],
        [133.2630,  99.7954, 183.4214, 149.9316],
        [ 93.2143, 132.2568, 143.2755, 181.8841],
        [ 13.4508, 256.2191,  63.0322, 305.5014],
        [ 72.9193, 168.2786, 123.2876, 219.0601],
        [ 32.3059, 208.4922, 107.5272, 284.5913]]), 'labels': tensor([1, 1, 2, 1, 1, 4]), 'scores': tensor([0.9998, 0.9994, 0.9993, 0.9993, 0.9988, 0.9965])}]


In [5]:
import os
import json
from PIL import Image
import torch
from torchvision import transforms

import torch
from torch.utils.data import DataLoader
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor


torch.cuda.empty_cache()
# Set the path to your images folder
images_folder = 'images'

# Create a predictions folder if it doesn't exist
if not os.path.exists('predictions'):
    os.makedirs('predictions')

# Set the batch size
batch_size = 4  # Adjust this based on your available memory

# Initialize variables to hold batch data
image_batch = []
file_batch = []

# Define the transform
transform = transforms.Compose([transforms.ToTensor()])  # Add any other required transforms

# Set the device (CPU or CUDA)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Ensure your model is on the correct device
model.to(device)
model.eval()  # Set the model to evaluation mode

# Iterate over all images in the images folder
for image_file in os.listdir(images_folder):
    # Check if the file is an image
    if image_file.lower().endswith(('.png', '.jpg', '.jpeg')):
        # Load and transform the image
        image_path = os.path.join(images_folder, image_file)
        image = Image.open(image_path).convert("RGB")
        image = transform(image).to(device)
        
        # Add the image and filename to the batch
        image_batch.append(image)
        file_batch.append(image_file)
        
        # Check if the batch is full
        if len(image_batch) == batch_size:
            # Convert image batch to tensor
            image_tensor = torch.stack(image_batch).to(device)
            
            # Perform inference on the batch
            with torch.no_grad():
                predictions = model(image_tensor)
            
            # Save predictions
            for i, prediction in enumerate(predictions):
                prediction_dict = {key: prediction[key].cpu().tolist() for key in prediction}
                prediction_file = file_batch[i].rsplit('.', 1)[0] + '_prediction.json'
                prediction_path = os.path.join('predictions', prediction_file)
                with open(prediction_path, 'w') as file:
                    json.dump(prediction_dict, file)
            
            # Clear the batch
            image_batch = []
            file_batch = []

# Handle any remaining images in the batch
if image_batch:
    image_tensor = torch.stack(image_batch).to(device)
    with torch.no_grad():
        predictions = model(image_tensor)
    for i, prediction in enumerate(predictions):
        prediction_dict = {key: prediction[key].cpu().tolist() for key in prediction}
        prediction_file = file_batch[i].rsplit('.', 1)[0] + '_prediction.json'
        prediction_path = os.path.join('predictions', prediction_file)
        with open(prediction_path, 'w') as file:
            json.dump(prediction_dict, file)
