# Imports

In [None]:
import os
if os.path.isdir('pytorchObjectDetection'):
    os.chdir('pytorchObjectDetection')
else:
    os.makedirs('pytorchObjectDetection')
    os.chdir('pytorchObjectDetection')
os.getcwd()


In [None]:
%%bash
git clone https://github.com/pytorch/vision.git
cd vision
git checkout v0.3.0
cp references/detection/utils.py ../
cp references/detection/transforms.py ../
cp references/detection/coco_eval.py ../
cp references/detection/engine.py ../
cp references/detection/coco_utils.py ../

In [None]:
import numpy as np
import pickle
import pycocotools
import torch
import torch.utils.data
from PIL import Image
from PIL import ImageDraw
import pandas as pd
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from engine import train_one_epoch, evaluate
import utils
import transforms as T
import torchvision
import xml.etree.ElementTree as ET
import glob
os.chdir('..')


# Data class

In [None]:
class WeaponDataset(torch.utils.data.Dataset):
    
    def __init__(self, dicPics,categories, path, transforms=None): 
        self.path = path
        self.dicPics = dicPics
        self.transforms = transforms
        self.categories = categories
        self.imgs = [o for o in dicPics]

    def __getitem__(self, idx):
        img = Image.open(self.path +"/"+ self.imgs[idx]).convert("RGB")        
        box_list = self.dicPics[self.imgs[idx]][0]
        
        boxes = torch.as_tensor(box_list, dtype=torch.float32)
        num_objs = len(box_list)
        labels_list =  self.dicPics[self.imgs[idx]][1]

        # multible classes
        labels = torch.zeros((num_objs,), dtype=torch.int64)

        for i in range(num_objs):
          labels[i] = self.categories[labels_list[i]]
        #print(labels)
        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:,0])

        # suppose all instances are not crowd
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)
        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["image_id"] = image_id
        target["area"] = area
        target["iscrowd"] = iscrowd
        
        if self.transforms is not None:
            img, target = self.transforms(img, target)
        
        return img, target
    
    def __len__(self):
        return len(self.imgs)

### Loading train and test dictionaries stored in drive

In [None]:
def load_dic(path):
    if os.path.isfile(path):
        return pickle.load( open( path, "rb" ) )
    else:
        print("no such file")
        return 0

In [None]:
pathDataset = "drive/MyDrive/BaggagesData"
pathTestDictionary = pathDataset + "/TestDicData.pkl"
imgbbox_test = load_dic(pathTestDictionary)
len(imgbbox_test)

In [None]:
pathTrainDictionary = pathDataset + "/trainingdata_final.pkl"
imgbbox_train = load_dic(pathTrainDictionary)
len(imgbbox_train.keys())

In [None]:
# Counts how many boxes of each label

cat = {'handgun': 1,'knife': 2, 'razorblade': 3, 'shuriken': 4}
nr = np.zeros(4)
for _, labels in imgbbox_train.values():
    for l in labels:
        nr[cat[l]-1] += 1
nr

#### Checks if the data was properly created

In [None]:
dataset = WeaponDataset(dicPics = imgbbox_train, categories = cat, path = pathDataset + "/Train", transforms = None)
dataset.__getitem__(1)

#### Function to import pretrained Faster RCNN model

In [None]:
def get_model(num_classes):
  # load an object detection model pre-trained on COCO
  model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
  # get the number of input features for the classifier
  in_features = model.roi_heads.box_predictor.cls_score.in_features
  # replace the pre-trained head with a new on
  model.roi_heads.box_predictor = FastRCNNPredictor(in_features,num_classes)
   
  return model

Converts image to tensor and defines transformations that will happen to the training data during learning

In [None]:
def get_transform(train):
    transforms = []
   # converts the image, a PIL image, into a PyTorch Tensor
    transforms.append(T.ToTensor())
    if train:
      # during training, randomly flip the training images
      # and ground-truth for data augmentation
        transforms.append(T.RandomHorizontalFlip(0.5))
    return T.Compose(transforms)

# Preperations of data and model before training takes place

#### Defines the test and train datasets and splits them 

In [None]:
pathDataset_train_imgs = pathDataset + "/Train"
pathDataset_test_imgs = pathDataset + "/Test"


# use our dataset and defined transformations
dataset = WeaponDataset(dicPics=imgbbox_train, categories = cat, path = pathDataset_train_imgs, transforms = get_transform(train=True))         # Training
dataset_test = WeaponDataset(dicPics = imgbbox_test, categories = cat, path = pathDataset_test_imgs, transforms = get_transform(train=False))   # Testing

# split the dataset in train and test set
torch.manual_seed(1)

# define training and validation data loaders
data_loader = torch.utils.data.DataLoader(
              dataset, batch_size=6, shuffle=True, num_workers=4,
              collate_fn=utils.collate_fn)

data_loader_test = torch.utils.data.DataLoader(
         dataset_test, batch_size=1, shuffle=False, num_workers=1,
         collate_fn=utils.collate_fn)

print("Training set: {} examples, Test set: {} examples".format(len(dataset), len(dataset_test)))

####Defines learning rate and tries to use to GPU for training

In [None]:
print("Will use GPU for training:", torch.cuda.is_available())
import gc 
# Your code with pytorch using GPU
gc.collect() 
numClasses = len(cat) +1 


In [None]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# our dataset has 5 classes - weapon and background
numClasses = len(cat) +1 
# get the model using our helper function
model = get_model(numClasses)
# move model to the right device
model.to(device)
# construct an optimizer
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

# and a learning rate scheduler which decreases the learning rate by # 10x every 3 epochs

lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# TRAIN NEW MODEL

In [None]:
pred = []

In [None]:
num_epochs = 5
for epoch in range(num_epochs):
   # train for one epoch, printing every 10 iterations
    train_one_epoch(model, optimizer, data_loader, device, epoch+1, print_freq=10)
   # update the learning rate
    lr_scheduler.step()
   # evaluate on the test dataset
    pred.append(evaluate(model, data_loader_test, device=device))

Save model for further training or save it for good


In [None]:
import sys
import io

old_stdout = sys.stdout # Memorize the default stdout stream
sys.stdout = buffer = io.StringIO()

print(pred[5].summarize())
# Call your algorithm function.
# etc...

sys.stdout = old_stdout # Put the old stream back in place

whatWasPrinted = buffer.getvalue() # Return a str containing the entire contents of the buffer.
print(whatWasPrinted) # Why not to print it?


In [None]:
loss_map = []

In [None]:
loss = []

In [None]:
loss.append(float(whatWasPrinted[91:96]))

In [None]:
import matplotlib.pyplot as plt

plt.plot(np.arange(5), np.array(loss))
plt.ylabel("Accuracy")
plt.xlabel("Epochs")
plt.title("mAP[0.5:0.95]")
plt.figure()


plt.plot(np.arange(5), np.array(loss_map))
plt.ylabel("Accuracy")
plt.xlabel("Epochs")
plt.title("mAP[0.5]")


In [None]:
saveForGood = True

In [None]:
 # Saves model to folder trainedModels/XRAY
path_trnd_model = "drive/MyDrive/trainedModels/XRAY"
model_name = "model_" +str(len(imgbbox_train))+ "images_" +str(numClasses)+"Classes_"+str(num_epochs)+"epochs"

if os.path.isdir(path_trnd_model) is False:
    os.mkdir(path_trnd_model)

if saveForGood:
    # Saves the final model, not trainable anymore
    torch.save(model.state_dict(), path_trnd_model+"/"+model_name + "_finished")
else:
    state = {'epoch': num_epochs +1, 'state_dict': model.state_dict(), 'optimizer': optimizer.state_dict() }
    torch.save(state, path_trnd_model+"/"+model_name)

#TRAIN OLD MODEL

In [None]:
def load_checkpoint(model, optimizer=None, filename=None):
    # Note: Input model & optimizer should be pre-defined.  This routine only updates their states.
    start_epoch = 0
    if os.path.isfile(filename):
        print("=> loading checkpoint '{}'".format(filename))
        checkpoint = torch.load(filename)
        start_epoch = checkpoint['epoch']
        model.load_state_dict(checkpoint['state_dict'])
        if optimizer is not None:
          optimizer.load_state_dict(checkpoint['optimizer'])
        print("=> loaded checkpoint '{}' (epoch {})".format(filename, checkpoint['epoch']))
    else:
        print("=> no checkpoint found at '{}'".format(filename))

    return model, optimizer, start_epoch

Name and path of old model

In [None]:
print(model_name)

In [None]:
path_trnd_model = "drive/MyDrive/trainedModels/XRAY"
model_name = "model_1465images_11Classes_10epochs"

In [None]:
model, optimizer, start_epoch = load_checkpoint(get_model(num_classes = numClasses), optimizer, filename=path_trnd_model+"/"+model_name)
model = model.to(device)

gc.collect() 
# individually transfer the optimizer parts...
for state in optimizer.state.values():
    for k, v in state.items():
        if isinstance(v, torch.Tensor):
            state[k] = v.to(device)

In [None]:

num_epochs = 5
for epoch in range(num_epochs):
    # train for one epoch, printing every 10 iterations
    train_one_epoch(model, optimizer, data_loader, device, epoch+1, print_freq=10)

    # update the learning rate
    lr_scheduler.step()
   # evaluate on the test dataset
    evaluate(model, data_loader_test, device=device)

Save model for further training or save it for good


In [None]:
saveForGood = True

In [None]:
 # Saves model to folder trainedModels/XRAY
path_trnd_model = "drive/MyDrive/trainedModels/XRAY"
model_name = "model_" +str(len(imgbbox_train))+ "images_" +str(numClasses)+"Classes_"+str(start_epoch+num_epochs-1 )+"epochs"

if os.path.isdir(path_trnd_model) is False:
    os.mkdir(path_trnd_model)

if saveForGood:
    # Saves the final model, not trainable anymore
    torch.save(model.state_dict(), path_trnd_model+"/"+model_name + "_finished")
else:
    state = {'epoch': num_epochs +1, 'state_dict': model.state_dict(), 'optimizer': optimizer.state_dict() }
    torch.save(state, path_trnd_model+"/"+model_name)

#PREDICT

In [None]:
# To load trained model
path_trnd_model = "drive/MyDrive/trainedModels/XRAY"
model_name ="model_1465images_5Classes_5epochs_finished"

loaded_model = get_model(num_classes = len(cat) +1 )

if os.path.isfile(path_trnd_model +"/"+ model_name):
    loaded_model.load_state_dict(torch.load( path_trnd_model +"/"+ model_name))
else:
    print("Wrong path or filename")

In [None]:
evaluator = evaluate(model, data_loader_test, device=device)

In [None]:
gc.collect() 

In [None]:
torch.set_num_threads(1)
#device = torch.device('cpu')
device = torch.device('cuda')


loaded_model.to(device)
loaded_model.eval()

predictions = []
#image, _ = next(iter(data_loader_test))

for image, targets in data_loader_test:
    image = list(img.to(device) for img in image)

    #torch.cuda.synchronize()
    predictions.append(loaded_model(image) )



In [None]:
predictions

In [None]:
idx = 20
#for idx in range(1):

cat_color = {1:"red", 2:"yellow", 3: "blue", 4:"orange"}

img, _ = dataset_test[idx]

label_boxes = np.array(dataset_test[idx][1]["boxes"])
#put the model in evaluation mode
loaded_model.eval()
with torch.no_grad():
    prediction = loaded_model([img])
image = Image.fromarray(img.mul(255).permute(1, 2,0).byte().numpy())
draw = ImageDraw.Draw(image)

cat_rev = {cat[o]: o for o in cat}

# draw groundtruth
for elem in range(len(label_boxes)):
    draw.rectangle([(label_boxes[elem][0], label_boxes[elem][1]),
    (label_boxes[elem][2], label_boxes[elem][3])], 
    outline ="green", width =3)
    
for element in range(len(prediction[0]["boxes"])):

    boxes = prediction[0]["boxes"][element].cpu().numpy()
    score = np.round(prediction[0]["scores"][element].cpu().numpy(),
                    decimals= 4)
    if score > 0.8:

        draw.rectangle([(boxes[0], boxes[1]), (boxes[2], boxes[3])], outline =cat_color[np.int(prediction[0]["labels"][element])], width =3)
        draw.text((boxes[0], boxes[1]), text = str(score)+ " " + cat_rev[np.int(prediction[0]["labels"][element])] , fill="#000")
display(image)


In [None]:


for idx in range(len(dataset_test)):

    cat_color = {1:"red", 2:"yellow", 3: "blue", 4:"orange"}

    img, _ = dataset_test[idx]

    label_boxes = np.array(dataset_test[idx][1]["boxes"])
    #put the model in evaluation mode
    loaded_model.eval()
    with torch.no_grad():
        prediction = loaded_model([img])
    image = Image.fromarray(img.mul(255).permute(1, 2,0).byte().numpy())
    draw = ImageDraw.Draw(image)

    cat_rev = {cat[o]: o for o in cat}

    # draw groundtruth
    #for elem in range(len(label_boxes)):
    #    draw.rectangle([(label_boxes[elem][0], label_boxes[elem][1]),
    #    (label_boxes[elem][2], label_boxes[elem][3])], 
    #    outline ="green", width =2)
        
    for element in range(len(prediction[0]["boxes"])):

        boxes = prediction[0]["boxes"][element].cpu().numpy()
        score = np.round(prediction[0]["scores"][element].cpu().numpy(),
                        decimals= 4)
        if score > 0.7:

            draw.rectangle([(boxes[0], boxes[1]), (boxes[2], boxes[3])], outline =cat_color[np.int(prediction[0]["labels"][element])], width =3)
            draw.text((boxes[0], boxes[1]), text = str(score)+ " " + cat_rev[np.int(prediction[0]["labels"][element])] , fill="#000")
    display(image)