In [1]:
import os
import torch
import torchvision
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.models.detection import maskrcnn_resnet50_fpn
from torchvision.transforms.functional import to_tensor
from torchvision.io import read_image
from torchvision.ops import masks_to_boxes
from torchvision.utils import draw_bounding_boxes, draw_segmentation_masks
from engine import train_one_epoch, evaluate
import matplotlib.pyplot as plt
import cv2
import numpy as np
import datetime


In [2]:
# os.environ["CUDA_LAUNCH_BLOCKING"] = "1"


In [3]:
class ImageDataset(Dataset):
    def __init__(self, image_folder='rgbd-dataset\\food_can\\'):
        self.image_folder = image_folder
        self.image_info = []
        
        for filename in os.listdir(image_folder):
            if filename.endswith('.png') and not filename.endswith('mask.png') and not filename.endswith('depth.png'):
                image_path = os.path.join(image_folder, filename)
                mask_path = image_path.replace('.png', '_mask.png')
                # print(f"Image_path: {image_path}\nMask_path: {mask_path}")
                self.image_info.append({'image': image_path, 'mask': mask_path})

    def __len__(self):
        return len(self.image_info)
    
    def __getitem__(self, idx):
        image_path = self.image_info[idx]['image']
        mask_path = self.image_info[idx]['mask']
        
        image = read_image(image_path).to(torch.float32)  # Convert image to PyTorch tensor
        mask = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)  # Load mask using OpenCV
        mask_tensor = torch.tensor(mask > 127, dtype=torch.uint8)  # Convert mask to PyTorch tensor

        # Ensure we have a non-singleton dimension before squeezing
        if mask_tensor.dim() > 0 and mask_tensor.size(0) == 1:
            mask_tensor = mask_tensor.squeeze(0)

        # Use cv2.findContours to get bounding box from the mask
        contours, _ = cv2.findContours(mask_tensor.numpy(), cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        if contours:
            # Get the bounding box for the largest contour
            x, y, w, h = cv2.boundingRect(contours[0])
            boxes = torch.tensor([[x, y, x+w, y+h]], dtype=torch.float32)
        else:
            # If no contours are found, provide a dummy box
            boxes = torch.tensor([[0, 0, 1, 1]], dtype=torch.float32)
        
        target = {
        'boxes': boxes,
        'labels': torch.tensor([1], dtype=torch.int64), # For example, '2' for cans
        'masks': mask_tensor.unsqueeze(0)
                }
        return image, target



In [4]:
def custom_collate_fn(batch):
    images = [item[0] for item in batch]
    targets = [item[1] for item in batch]
    return images, targets


def create_model(num_classes):
    model = maskrcnn_resnet50_fpn(pretrained=True)
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    
    model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    
    hidden_layer = 256
    
    model.roi_heads.mask_predictor = torchvision.models.detection.mask_rcnn.MaskRCNNPredictor(in_features_mask, hidden_layer, num_classes)
    return model

def train_model():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using {torch.cuda.get_device_name(device)}")

    dataset = ImageDataset()
    model = create_model(2)  # For background, can, others
    model.to(device)

    train_size = int(0.8 * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

    train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=custom_collate_fn)
    test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False)

    optimizer = torch.optim.SGD(model.parameters(), lr=0.001, momentum=0.9, weight_decay=0.0005)
    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=5, gamma=0.5)

    num_epochs = 10
    for epoch in range(num_epochs):
        train_one_epoch(model, optimizer, train_loader, device, epoch, print_freq=10)
        lr_scheduler.step()
        torch.save(model.state_dict(), f'model_epoch{epoch}.pth')

    torch.save(model.state_dict(), 'final_model.pth')
    evaluate(model, test_loader, device=device)

def load_and_test_model(model_path):
    model = create_model(2)
    model.load_state_dict(torch.load(model_path))
    model.eval()
    dataset = ImageDataset()
    sample_image, target = dataset[0]
    prediction = model([sample_image.to(model.device)])[0]
    print(prediction)


In [5]:
train_model()
load_and_test_model('final_model.pth')

Using NVIDIA GeForce RTX 4070




Epoch: [0]  [  0/478]  eta: 0:13:08  lr: 0.000003  loss: 205.8944 (205.8944)  loss_classifier: 3.2452 (3.2452)  loss_box_reg: 0.5106 (0.5106)  loss_mask: 190.0376 (190.0376)  loss_objectness: 10.9530 (10.9530)  loss_rpn_box_reg: 1.1480 (1.1480)  time: 1.6503  data: 0.0926  max mem: 6740
Epoch: [0]  [ 10/478]  eta: 0:06:14  lr: 0.000024  loss: 32.6096 (51.6741)  loss_classifier: 0.2835 (0.8179)  loss_box_reg: 0.4287 (0.4483)  loss_mask: 24.4254 (42.2699)  loss_objectness: 5.7762 (7.4137)  loss_rpn_box_reg: 0.6743 (0.7244)  time: 0.7995  data: 0.0867  max mem: 6906
Epoch: [0]  [ 20/478]  eta: 0:05:45  lr: 0.000045  loss: 11.8524 (30.8504)  loss_classifier: 0.2778 (0.5632)  loss_box_reg: 0.3815 (0.3560)  loss_mask: 6.5951 (23.8839)  loss_objectness: 5.1585 (5.5099)  loss_rpn_box_reg: 0.4349 (0.5374)  time: 0.7097  data: 0.0843  max mem: 6906
Epoch: [0]  [ 30/478]  eta: 0:05:32  lr: 0.000066  loss: 4.7056 (22.1612)  loss_classifier: 0.2722 (0.5077)  loss_box_reg: 0.2877 (0.3452)  loss_mask

AttributeError: 'MaskRCNN' object has no attribute 'device'

In [None]:
# import os
# import numpy as np
# from PIL import Image

# from tqdm import tqdm

# import torch
# from torch.utils.data import Dataset
# from torch.utils.data import DataLoader
# import torchvision.transforms as transforms
# from torchvision.transforms import functional as F

# from torchvision.models.detection import maskrcnn_resnet50_fpn
# from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
# from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor


In [None]:
# class FoodCanDataset(Dataset):
#     def __init__(self, root, transforms=None):
#         self.root = root
#         self.transforms = transforms
#         self.imgs = []
#         self.masks = []

#         # Load all files, filtering out non-image and mask files
#         all_files = os.listdir(root)
#         self.imgs = [os.path.join(root, f) for f in all_files if f.endswith('.png') and not f.endswith('_mask.png') and not f.endswith('_depth.png')]
#         self.masks = [f"{img[:-4]}_mask.png" for img in self.imgs]  # Corresponding mask for each image

#         print(f"Found {len(self.imgs)} images and {len(self.masks)} masks.")

#     def __getitem__(self, idx):
#         img_path = self.imgs[idx]
#         mask_path = self.masks[idx]

#         img = Image.open(img_path).convert("RGB")
#         mask = Image.open(mask_path)
#         mask = np.array(mask)  # Convert mask image to numpy array

#         # Process mask data
#         obj_ids = np.unique(mask)
#         obj_ids = obj_ids[1:]  # Remove the background

#         masks = mask == obj_ids[:, None, None]
#         num_objs = len(obj_ids)
#         boxes = []
#         for i in range(num_objs):
#             pos = np.where(masks[i])
#             xmin = np.min(pos[1])
#             xmax = np.max(pos[1])
#             ymin = np.min(pos[0])
#             ymax = np.max(pos[0])
#             boxes.append([xmin, ymin, xmax, ymax])

#         boxes = torch.as_tensor(boxes, dtype=torch.float32)
#         labels = torch.ones((num_objs,), dtype=torch.int64)
#         masks = torch.as_tensor(masks, dtype=torch.uint8)

#         image_id = torch.tensor([idx])
#         area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
#         iscrowd = torch.zeros((num_objs,), dtype=torch.int32)

#         target = {
#             "boxes": boxes,
#             "labels": labels,
#             "masks": masks,
#             "image_id": image_id,
#             "area": area,
#             "iscrowd": iscrowd
#         }

#         if self.transforms:
#             img, target = self.transforms(img, target)

#         return img, target

#     def __len__(self):
#         return len(self.imgs)

# def get_transform():
#     def transform(img, target):
#         img = F.to_tensor(img)
#         return img, target
#     return transform

In [None]:

# def get_model_instance_segmentation(num_classes):
#     # Load an instance segmentation model pre-trained on COCO
#     model = maskrcnn_resnet50_fpn(pretrained=True)

#     # Freeze the backbone layers
#     for param in model.backbone.parameters():
#         param.requires_grad = False

#     # Replace the head of the model (which typically includes the classifier and box predictor)
#     in_features = model.roi_heads.box_predictor.cls_score.in_features
#     model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

#     in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
#     hidden_layer = 256
#     model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,
#                                                        hidden_layer,
#                                                        num_classes)

#     return model


In [None]:


# # Load the training and validation sets
# def get_transform():
#     transform = transforms.Compose([
#         transforms.ToTensor(),
#     ])
#     return transform

# train_dataset = FoodCanDataset('.\\rgbd-dataset\\food_can\\', get_transform())
# val_dataset = FoodCanDataset('.\\rgbd-dataset\\food_can_val\\', get_transform())

# # Define training and validation data loaders
# train_loader = DataLoader(train_dataset, batch_size=2, shuffle=True, num_workers=28)
# val_loader = DataLoader(val_dataset, batch_size=2, shuffle=False, num_workers=28)

# device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# print(f"Using {torch.cuda.get_device_name(device)}")
# num_classes = 2  # 1 class (food can) + background

# model = get_model_instance_segmentation(num_classes)
# model.to(device)

# # Parameters and optimizer
# params = [p for p in model.parameters() if p.requires_grad]
# optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)
# lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# # Number of epochs to train for
# num_epochs = 8

# for epoch in range(num_epochs):
#     model.train()
#     train_loop = tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} (Train)")
#     i = 0
#     for images, targets in train_loader:
#         images = list(image.to(device) for image in images)
#         targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

#         loss_dict = model(images, targets)
#         losses = sum(loss for loss in loss_dict.values())

#         optimizer.zero_grad()
#         losses.backward()
#         optimizer.step()

#         if i % 10 == 0:
#             print(f"Iteration {i} of epoch {epoch} completed, Loss: {losses.item()}")
#         i += 1

#     # Update the learning rate
#     lr_scheduler.step()

#     # Evaluate the model on the validation set
#     model.eval()
#     val_loop = tqdm(val_loader, desc=f"Epoch {epoch+1}/{num_epochs} (Validate)")
#     with torch.no_grad():
#         for images, targets in val_loader:
#             images = list(image.to(device) for image in images)
#             targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
#             prediction = model(images, targets)  # Use model to predict
#             # Here, implement your method to evaluate the prediction

# model_path = 'ft_model.pth'
# torch.save(model, model_path)
# print("Model_saved")


# print("Training complete")
