# Mask R-CNN

The goal of this notebook is to run a Mask R-CNN model on images and save its state,
then use it to make real-world predictions.


In [1]:
import os.path

import cv2
import matplotlib.pyplot as plt
import numpy as np
import requests
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor


### Load the Dataset

In [2]:
# Data pipeline: CIFAR-10 images are resized to 224 and normalized per channel.
batch_size = 50

# training data: random horizontal/vertical flips are applied before normalization
train_data_transform = transforms.Compose([
    transforms.Resize(size=224),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.4914, 0.4821, 0.4465),
                         std=(0.2470, 0.2435, 0.2616)),
])

# validation data: same resize/normalization, no random flips
val_data_transform = transforms.Compose([
    transforms.Resize(size=224),
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.4914, 0.4821, 0.4465),
                         std=(0.2470, 0.2435, 0.2616)),
])

train_set = torchvision.datasets.CIFAR10(
    root='./data', train=True, download=True, transform=train_data_transform)
val_set = torchvision.datasets.CIFAR10(
    root='./data', train=False, download=True, transform=val_data_transform)

# Only the training batches are shuffled; validation keeps the dataset order.
train_loader = torch.utils.data.DataLoader(
    train_set, batch_size=batch_size, shuffle=True, num_workers=2)
val_order = torch.utils.data.DataLoader(
    val_set, batch_size=batch_size, shuffle=False, num_workers=2)

Files already downloaded and verified
Files already downloaded and verified


In [8]:
# Use the first CUDA GPU when one is available; otherwise fall back to the CPU.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [9]:
# Pretrained Mask R-CNN with a ResNet-50 FPN backbone. This notebook-level `model`
# is the object the inference cell further down calls.
model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)

In [10]:
# Switch the model to evaluation mode; as the cell's last expression this also
# displays the module tree.
model.eval()

MaskRCNN(
  (transform): GeneralizedRCNNTransform()
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d()
      (relu): ReLU(inplace)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d()
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d()
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d()
          (relu): ReLU(inplace)
          (downsample): Sequential(
            (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
            (1): FrozenBatchNorm2d()
          )
        )
    

## ResNet
Let's first try a simple ResNet, and then move on to retraining Mask R-CNN.

In [11]:
def plot_accuracy(accuracy: list):
    """Plot per-epoch accuracy as a line chart.

    Args:
        accuracy: One accuracy value per epoch, in epoch order. Entries may be
            plain numbers or anything ``float()`` accepts, such as the 0-dim
            tensors returned by ``test_model``.
    """
    # Convert up front: matplotlib cannot turn CUDA tensors into NumPy arrays,
    # whereas float() works for scalar tensors regardless of their device.
    values = [float(value) for value in accuracy]
    positions = list(range(len(values)))

    plt.figure()
    plt.plot(values)
    # Tick positions are 0-based; label them with 1-based epoch numbers.
    plt.xticks(positions, [pos + 1 for pos in positions])
    plt.ylabel('Accuracy')
    plt.xlabel('Epoch')
    plt.show()


In [12]:
# define a train function

def train_model(model, loss_function, optimizer, data_loader):
    """Run one training epoch over ``data_loader`` and print the epoch metrics.

    Args:
        model: Network to optimize; it is switched to training mode.
        loss_function: Callable invoked as ``loss_function(outputs, labels)``.
        optimizer: Optimizer that is stepped once per batch.
        data_loader: Iterable of ``(inputs, labels)`` batches exposing a
            ``dataset`` attribute with a length.

    Batches are moved to the notebook-level ``device``. The function prints the
    batch-size-weighted average loss and the accuracy; it returns nothing.
    """
    model.train()

    running_loss = 0.0
    running_correct = 0

    for batch_inputs, batch_labels in data_loader:
        # move the batch to the same device as the model
        batch_inputs = batch_inputs.to(device)
        batch_labels = batch_labels.to(device)

        # clear the gradients left over from the previous batch
        optimizer.zero_grad()

        with torch.enable_grad():
            # forward pass
            logits = model(batch_inputs)
            predicted = torch.max(logits, 1)[1]
            batch_loss = loss_function(logits, batch_labels)

            # backward pass, then update the weights from the gradients
            batch_loss.backward()
            optimizer.step()

        # weight the batch loss by the batch size so the epoch value is per sample
        running_loss += batch_loss.item() * batch_inputs.size(0)
        running_correct += (predicted == batch_labels.data).sum()

    sample_count = len(data_loader.dataset)
    epoch_loss = running_loss / sample_count
    epoch_acc = running_correct.double() / sample_count

    print('Total loss: {:.4f}; Accuracy: {:0.4f}'.format(epoch_loss, epoch_acc))

In [13]:
def test_model(model, loss_function, data_loader):
    # set model in eval mode
    model.eval()
    
    current_loss = 0.0
    current_acc = 0
    
    # iterate over the val_data:
    for i, (inputs, labels) in enumerate(data_loader):
        # send the input/labels to the GPU
        inputs = inputs.to(device)
        labels = labels.to(device)
        
        # forward 
        with torch.set_grad_enabled(False):
            outputs = model(inputs)
            _, predictions = torch.max(outputs, 1)
            loss = loss_function(outputs, labels)
            
        # stats
        current_loss += loss.item() * inputs.size(0)
        current_acc += torch.sum(predictions == labels.data)
        
    total_loss = current_loss / len(data_loader.dataset)
    total_acc = current_acc.double() / len(data_loader.dataset)
    
    print('Test Loss: {:.4f}; Accuracy: {:.4f}'.format(total_loss, total_acc))
    
    return total_loss, total_acc

In [14]:
# Now, we are going to do transfer learning where we only change the last layer of the model and don't 
# retrain the whole model, so:

def extract_features(epochs=5):
    """Train only a new 10-output head on top of a frozen, pretrained ResNet-18.

    All pretrained parameters are frozen, ``fc`` is replaced by a fresh
    ``nn.Linear`` layer, and only that layer's parameters are given to Adam.
    After every epoch the model is evaluated on ``val_order``; the collected
    accuracies are plotted once training ends.

    Args:
        epochs: Number of passes over ``train_loader``.
    """
    net = torchvision.models.resnet18(pretrained=True)

    # Freeze the pretrained weights so the backward pass skips them.
    for weight in net.parameters():
        weight.requires_grad = False

    # The replacement layer is created after the freeze, so it keeps the
    # default requires_grad=True.
    net.fc = nn.Linear(net.fc.in_features, 10)

    # transfer to the GPU (if available)
    net = net.to(device)

    criterion = nn.CrossEntropyLoss()

    # only the parameters of the final layer are optimized
    head_optimizer = optim.Adam(net.fc.parameters())

    accuracy_history = []
    for epoch_idx in range(epochs):
        print('Epoch {}/{}'.format(epoch_idx + 1, epochs))

        train_model(net, criterion, head_optimizer, train_loader)
        accuracy_history.append(test_model(net, criterion, val_order)[1])

    plot_accuracy(accuracy_history)

In [None]:
# Run the ResNet-18 feature-extraction experiment with the default 5 epochs.
extract_features()

Epoch 1/5
Total loss: 1.0504; Accuracy: 0.6424
Test Loss: 0.7219; Accuracy: 0.7538
Epoch 2/5


In [None]:
def train_model(model, loss_function, optimizer, data_loader):
    """Train ``model`` for one epoch and print the resulting loss and accuracy.

    NOTE: this redefines the ``train_model`` declared earlier in the notebook
    and shadows it from here on; the only difference is the printed label
    ('Train Loss' instead of 'Total loss').

    Args:
        model: Network to optimize; put into training mode.
        loss_function: Callable invoked as ``loss_function(outputs, labels)``.
        optimizer: Optimizer stepped after every batch.
        data_loader: Iterable of ``(inputs, labels)`` batches exposing a
            ``dataset`` attribute with a length.

    Nothing is returned; batches are moved to the notebook-level ``device``.
    """
    model.train()

    # running totals for the whole epoch
    loss_total, correct_total = 0.0, 0

    for x_batch, y_batch in data_loader:
        x_batch, y_batch = x_batch.to(device), y_batch.to(device)

        # reset gradients from the previous step
        optimizer.zero_grad()

        with torch.enable_grad():
            # forward
            scores = model(x_batch)
            y_pred = scores.max(1)[1]
            step_loss = loss_function(scores, y_batch)

            # backward + weight update
            step_loss.backward()
            optimizer.step()

        # statistics: loss weighted by batch size, count of correct predictions
        loss_total += step_loss.item() * x_batch.size(0)
        correct_total += torch.sum(y_pred == y_batch.data)

    num_samples = len(data_loader.dataset)
    mean_loss = loss_total / num_samples
    accuracy = correct_total.double() / num_samples

    print('Train Loss: {:.4f}; Accuracy: {:.4f}'.format(mean_loss, accuracy))

### Define test_model function

In [None]:
def test_model(model, loss_function, data_loader):
    # set model in evaluation mode
    model.eval()

    current_loss = 0.0
    current_acc = 0

    # iterate over  the validation data
    for i, (inputs, labels) in enumerate(data_loader):
        # send the input/labels to the GPU
        inputs = inputs.to(device)
        labels = labels.to(device)

        # forward
        with torch.set_grad_enabled(False):
            outputs = model(inputs)
            _, predictions = torch.max(outputs, 1)
            loss = loss_function(outputs, labels)

        # statistics
        current_loss += loss.item() * inputs.size(0)
        current_acc += torch.sum(predictions == labels.data)

    total_loss = current_loss / len(data_loader.dataset)
    total_acc = current_acc.double() / len(data_loader.dataset)

    print('Test Loss: {:.4f}; Accuracy: {:.4f}'.format(total_loss, 
    total_acc))

    return total_loss, total_acc

### Let's change the model architecture

- We are going to use MaskRCNN and download the pretrained weights
- Replace the last network layer with a new layer with 10 outputs
    - Since Mask R-CNN adds a mask head on top of Faster R-CNN, we have to replace both the Faster R-CNN box predictor and the Mask R-CNN mask predictor
- Exclude the existing layers from the backward pass and only pass the newly added FC layer to the Adam optimizer
- Run the training for several epochs and evaluate the network accuracy after each epoch.
- Plot the test accuracy

In [None]:
def model_predict(epochs=5):
    """Build a pretrained Mask R-CNN whose box and mask heads are replaced.

    All pretrained parameters are frozen first. The two replacement heads are
    constructed afterwards, so they keep the default ``requires_grad=True``.
    No training happens here.

    Args:
        epochs: Currently unused; kept so existing calls keep working.

    Returns:
        The modified Mask R-CNN model.
    """
    # Load the model
    model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)

    # Exclude the existing parameters from the backward pass: we do not want
    # to retrain the entire network.
    for param in model.parameters():
        param.requires_grad = False

    # NOTE(review): torchvision detection heads count the background as a
    # class - confirm that 10 (rather than 11) is intended here.
    num_classes = 10

    # Box head: reuse the input width of the pretrained classifier.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    # Mask head: it has to be stored on `mask_predictor`. Assigning it to
    # `box_predictor` would discard the box head created just above.
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    model.roi_heads.mask_predictor = MaskRCNNPredictor(in_features_mask,
                                                       hidden_layer,
                                                       num_classes)

    return model

In [None]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

In [None]:
def tl_feature_extractor(epochs=5):
    """Attempt head-only transfer learning on a pretrained Faster R-CNN.

    All pretrained parameters are frozen, the box predictor is replaced by a
    new ``FastRCNNPredictor`` with 10 classes, and only that predictor's
    parameters are handed to Adam. The model then goes through the same
    ``train_model`` / ``test_model`` loop as the ResNet experiment.

    NOTE(review): the recorded output of this notebook shows the call stopping
    in epoch 1 with ``ValueError: In training mode, targets should be passed``.
    ``train_model`` calls the model with inputs only, so as written this
    function does not complete.

    Args:
        epochs: Number of passes over ``train_loader``.
    """
    detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

    # Freeze the pretrained weights so the backward pass skips them.
    for weight in detector.parameters():
        weight.requires_grad = False

    # The new predictor is built after the freeze, so it keeps requires_grad=True.
    num_classes = 10
    head_inputs = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_inputs, num_classes)

    # transfer to the GPU if available
    detector = detector.to(device)

    criterion = nn.CrossEntropyLoss()

    # only the parameters of the replaced predictor are optimized
    head_optimizer = optim.Adam(detector.roi_heads.box_predictor.parameters())

    accuracy_history = []  # one validation accuracy per epoch, for plotting
    for epoch_idx in range(epochs):
        print('Epoch {}/{}'.format(epoch_idx + 1, epochs))

        train_model(detector, criterion, head_optimizer, train_loader)
        accuracy_history.append(test_model(detector, criterion, val_order)[1])

    plot_accuracy(accuracy_history)


In [None]:
# Run the Faster R-CNN transfer-learning experiment with the default number of epochs.
tl_feature_extractor()

In [None]:

    # NOTE(review): this cell repeats the closing statements of
    # tl_feature_extractor outside of any function. It reads `epochs`, which no
    # visible notebook-level cell defines, and `model`, which at notebook level
    # is the Mask R-CNN loaded near the top. Confirm whether this cell is a
    # leftover that can be removed.
    model = model.to(device)

    loss_function = nn.CrossEntropyLoss()
    
    # only the box predictor's parameters are passed to the optimizer
    optimizer = torch.optim.Adam(model.roi_heads.box_predictor.parameters())
    
    # train
    test_acc = list() # collect accuracy for plotting
    
    for epoch in range(epochs):
        print('Epoch {}/{}'.format(epoch + 1, epochs))
        
        train_model(model, loss_function, optimizer, train_loader)
        _, acc = test_model(model, loss_function, val_order)
        test_acc.append(acc)
        
    plot_accuracy(test_acc)
    

In [78]:
# Second run of the same experiment. The output recorded below shows it stopping in
# epoch 1 with "ValueError: In training mode, targets should be passed".
tl_feature_extractor()

Epoch 1/5


ValueError: In training mode, targets should be passed

In [7]:
# Image to run inference on. Set the MASKRCNN_IMAGE_FILE environment variable to
# use a different file; the previously hard-coded local path stays as the fallback
# so existing setups keep working.
image_file = os.environ.get(
    'MASKRCNN_IMAGE_FILE',
    'C:/Users/Billi/repos/Computer_Vision/OpenCV/Scripts/camera_frame_6.png')

In [8]:
# Class names indexed by the label id the detector predicts (the drawing cell
# looks names up as classes[label]). Presumably the COCO category list - confirm.
classes = [
    'background',
    # 1-10
    'person', 'bicycle', 'car', 'motorcycle', 'airplane',
    'bus', 'train', 'truck', 'boat', 'traffic light',
    # 11-20
    'fire hydrant', 'street sign', 'stop sign', 'parking meter', 'bench',
    'bird', 'cat', 'dog', 'horse', 'sheep',
    # 21-30
    'cow', 'elephant', 'bear', 'zebra', 'giraffe',
    'hat', 'backpack', 'umbrella', 'shoe', 'eye glasses',
    # 31-40
    'handbag', 'tie', 'suitcase', 'frisbee', 'skis',
    'snowboard', 'sports ball', 'kite', 'baseball bat', 'baseball glove',
    # 41-50
    'skateboard', 'surfboard', 'tennis racket', 'bottle', 'plate',
    'wine glass', 'cup', 'fork', 'knife', 'spoon',
    # 51-60
    'bowl', 'banana', 'apple', 'sandwich', 'orange',
    'broccoli', 'carrot', 'hot dog', 'pizza', 'donut',
    # 61-70
    'cake', 'chair', 'couch', 'potted plant', 'bed',
    'mirror', 'dining table', 'window', 'desk', 'toilet',
    # 71-80
    'door', 'tv', 'laptop', 'mouse', 'remote',
    'keyboard', 'cell phone', 'microwave', 'oven', 'toaster',
    # 81-91
    'sink', 'refrigerator', 'blender', 'book', 'clock',
    'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush',
    'hair brushes',
]

In [9]:
img = cv2.imread(image_file)
# cv2.imread reports a missing or unreadable file by returning None instead of
# raising, so fail here with a clear message rather than later in the pipeline.
if img is None:
    raise FileNotFoundError('Could not read image: {}'.format(image_file))

In [10]:
# transform the loaded image

# cv2.imread yields BGR channel order, while the pretrained torchvision model
# expects RGB. Convert a copy for the network and leave `img` itself in BGR
# for the OpenCV drawing/display calls.
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

transform = transforms.Compose([transforms.ToPILImage(), transforms.ToTensor()])
nn_input = transform(img_rgb)

# Detection models only return predictions in eval mode (in training mode they
# require targets), and inference does not need an autograd graph.
model.eval()
with torch.no_grad():
    output = model([nn_input])

In [11]:
# one random color (three channel values drawn from [0, 255)) per entry in `classes`
colors = np.random.uniform(low=0, high=255, size=(len(classes), 3))

In [12]:
# iterate over the network output for all boxes

SCORE_THRESHOLD = 0.5  # only detections scoring above this are drawn

# .cpu() keeps this working when the model ran on a GPU; for CPU tensors it
# changes nothing.
for box, box_class, score in zip(output[0]['boxes'].detach().cpu().numpy(),
                                 output[0]['labels'].detach().cpu().numpy(),
                                 output[0]['scores'].detach().cpu().numpy()):

    # filter the boxes by score
    if score > SCORE_THRESHOLD:
        # OpenCV drawing functions need integer pixel coordinates, but the
        # model returns floating-point box corners.
        x1, y1, x2, y2 = (int(round(float(coord))) for coord in box)
        box = [(x1, y1), (x2, y2)]

        label = int(box_class)

        # select class color (as plain Python numbers for OpenCV)
        color = tuple(float(channel) for channel in colors[label])

        # extract class name
        class_name = classes[label]

        # draw the bounding box
        cv2.rectangle(img=img,
                      pt1=box[0],
                      pt2=box[1],
                      color=color,
                      thickness=2)

        # display the box class label
        cv2.putText(img=img,
                    text=class_name,
                    org=box[0],
                    fontFace=cv2.FONT_HERSHEY_SIMPLEX,
                    fontScale=1,
                    color=color,
                    thickness=2)
        

In [13]:
import matplotlib.pyplot as plt

# `img` comes from cv2.imread and is therefore BGR; matplotlib interprets arrays
# as RGB, so convert for display to avoid swapped red/blue channels.
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

<matplotlib.image.AxesImage at 0x1ec8e52e358>

In [None]:
# Show the annotated image in a native OpenCV window and wait for a key press.
cv2.imshow('img', img)
cv2.waitKey(0)
# Close the window explicitly; otherwise it stays open after the key press.
cv2.destroyAllWindows()