In [1]:
import numpy as np
import pandas as pd
import cv2
from PIL import Image, ImageOps
import os
import sys
import shutil
import random
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt

## Creating folders

In [None]:
# Start from a clean output tree: drop any previous `images` folder (which
# also removes `images\cropped`), then recreate the two class folders.
# Removal stays best-effort (a missing folder is fine), but a failure to
# create the folders is no longer silently swallowed, because every later
# cell writes into them.
images_root = os.getcwd()+'\\images'
shutil.rmtree(images_root, ignore_errors=True)

for class_dir in ('class_1', 'class_2'):
    os.makedirs(images_root + '\\cropped\\' + class_dir, exist_ok=True)

In [2]:
# Input videos and extracted frames both live under the current working directory.
valid_video_path = ''.join([os.getcwd(), '\\videos_validation\\'])
img_path = ''.join([os.getcwd(), '\\images\\'])

## Capture frames

In [None]:
# vidcap = cv2.VideoCapture('video.mp4')

# def getFrame(sec):
#     vidcap.set(cv2.CAP_PROP_POS_MSEC,sec*1000)
#     hasFrames,image = vidcap.read()
#     if hasFrames:
#         cv2.imwrite(img_path + str(count-1)+".jpg", image)     # save frame as JPG file
#     return hasFrames
# sec = 0
# frameRate = 10 #//it will capture image in each 0.5 second
# count=1
# success = getFrame(sec)
# while success:
#     count = count + 1
#     sec = sec + frameRate
#     sec = round(sec, 2)
#     success = getFrame(sec)

In [None]:
def vid_to_frames(path = None, vid_name = None, frate_sec = 30):
    """Save one frame every `frate_sec` seconds of a video as JPG files.

    Frames are written to ``os.getcwd() + '\\images\\'`` and named
    ``<frame index>_<video name without extension>.jpg``, with the frame
    index starting at 0.

    Args:
        path: Path of the video file to read.
        vid_name: File name used to build the output names. When omitted,
            the base name of `path` is used.
        frate_sec: Sampling step in seconds; must be positive.

    Raises:
        ValueError: If `frate_sec` is not positive (the sampling position
            would never advance) or if `path` is missing.
    """
    # Validate before touching the video so bad arguments fail fast.
    if frate_sec is None or frate_sec <= 0:
        raise ValueError('frate_sec must be a positive number of seconds')
    if path is None:
        raise ValueError('path to the video file is required')
    if vid_name is None:
        vid_name = os.path.basename(path)

    # splitext handles extensions of any length ('.mp4', '.mpeg', none).
    stem = os.path.splitext(vid_name)[0]
    img_path = os.getcwd()+'\\images\\'

    vidcap = cv2.VideoCapture(path)
    try:
        sec = 0
        frame_idx = 0
        while True:
            # Seek to the requested timestamp (milliseconds) and grab a frame.
            vidcap.set(cv2.CAP_PROP_POS_MSEC, sec*1000)
            has_frame, image = vidcap.read()
            if not has_frame:
                break
            cv2.imwrite(img_path + str(frame_idx) + '_' + stem + ".jpg", image)
            frame_idx += 1
            sec = round(sec + frate_sec, 2)
    finally:
        # Always free the decoder / file handle, even if writing fails.
        vidcap.release()

In [None]:
# Sample one frame per 60 seconds from every entry of the validation folder.
video_names = os.listdir(valid_video_path)
for video_name in tqdm(video_names):
    vid_to_frames(path=valid_video_path + video_name, vid_name=video_name, frate_sec=60)

## Load R-CNN model (COCO, resnet50)

In [3]:
# model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
# model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)

# model.eval()

# Class names indexed by the integer label ids returned by the detection
# model: get_prediction() looks a label up with
# COCO_INSTANCE_CATEGORY_NAMES[label], so the ORDER of this list matters and
# entries must not be inserted, removed or re-sorted.
# NOTE(review): the 'N/A' entries look like placeholders for label ids that
# have no class; confirm against the model's label map.
COCO_INSTANCE_CATEGORY_NAMES = [
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A', 'N/A',
    'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
    'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
    'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table',
    'N/A', 'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
    'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A', 'book',
    'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]

### Getting prediction bbox, classes, masks

In [4]:
def get_prediction(img_path, threshold, device):
    """Run the global detection `model` on one image and keep confident hits.

    Args:
        img_path: Path of the image file to analyse.
        threshold: Score cut-off; detections are kept up to and including
            the last one whose score is strictly greater than this value.
        device: torch device the image tensor is moved to (should match the
            device the model lives on).

    Returns:
        Tuple ``(pred_boxes, pred_class, pred_score, pred_mask)`` of lists
        with one entry per kept detection: box coordinates as a 4-item
        list, class name from COCO_INSTANCE_CATEGORY_NAMES, score, and the
        mask array. All four lists are empty when no score exceeds
        `threshold` (previously this case raised IndexError).
    """
    # Close the file handle promptly; convert so grey-scale / RGBA files
    # still yield the 3-channel tensor the detector is fed.
    with Image.open(img_path) as raw_img:
        img = raw_img.convert('RGB')
    transform = transforms.Compose([transforms.ToTensor()])
    img = transform(img).to(device)

    # Inference only: do not build an autograd graph.
    with torch.no_grad():
        pred = model([img])[0]

    pred_class = [COCO_INSTANCE_CATEGORY_NAMES[i] for i in pred['labels'].cpu().numpy()]  # class names
    pred_boxes = [[i[0], i[1], i[2], i[3]] for i in pred['boxes'].detach().cpu().numpy()]  # bounding boxes
    pred_score = list(pred['scores'].detach().cpu().numpy())  # confidence scores
    pred_mask = list(pred['masks'].detach().cpu().numpy())  # Mask R-CNN masks

    # Number of leading detections to keep = position of the last score above
    # the threshold, plus one. Assumes the model returns detections ordered
    # by decreasing score, as the original slicing did -- TODO confirm.
    keep = 0
    for pos, score in enumerate(pred_score):
        if score > threshold:
            keep = pos + 1

    return pred_boxes[:keep], pred_class[:keep], pred_score[:keep], pred_mask[:keep]

In [4]:
# Use the first CUDA device when one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

# The checkpoint is a fully pickled model object; torch.load unpickles it, so
# only load files from a trusted source. It is materialised on the CPU first
# and moved to the selected device afterwards.
model = torch.load(os.getcwd() + '/models/' + 'MRCNN.pt', map_location=torch.device('cpu'))
model.eval()
model = model.to(device)

### Results

In [8]:
def mask(a,b,c,d, mask_param = 50, img_name = None, img_dir = None):
    """Save a background-whitened crop for every detected person in an image.

    For each detection labelled 'person', the mask is binarised, the
    pixels outside it are painted white, the result is cropped to the
    bounding box and saved under ``<img_dir>cropped\\class_1\\``.

    Args:
        a: Bounding boxes, one 4-item box per detection.
        b: Class names, parallel to `a`.
        c: Scores, parallel to `a` (not used here; kept for the call shape).
        d: Masks, parallel to `a`; each is squeezed and scaled by 255.
        mask_param: Mask values (0-255 scale) below this are treated as
            background, which controls how tight the cut-out is.
        img_name: File name of the source image inside `img_dir`. Defaults
            to the notebook-level variable `x` for backward compatibility.
        img_dir: Folder prefix holding the image. Defaults to the
            notebook-level `img_path`.

    Returns:
        None.
    """
    # Legacy fallback: earlier callers relied on the loop variable `x` and the
    # global `img_path` instead of passing them in.
    if img_name is None:
        img_name = x
    if img_dir is None:
        img_dir = img_path

    # The output folder shows up in the listing of img_dir; it is not an image.
    if img_name == 'cropped':
        return

    people = [(box, msk) for per, box, msk in zip(b, a, d) if per == 'person']
    if not people:
        # Nothing to save, so do not even open the image.
        return

    with Image.open(img_dir + img_name) as img:
        for idx, (box, msk) in enumerate(people):
            mm = (msk.squeeze() * 255).astype(np.uint8)
            mm[mm<mask_param] = 0  # drop low-confidence mask pixels
            mm[mm>0] = 255  # full white for all remaining mask pixels
            mm = Image.fromarray(mm)
            mm = ImageOps.invert(mm)  # background becomes 255, person becomes 0

            # Where the inverted mask is white, take the (white) mask itself;
            # elsewhere keep the original image pixels.
            to_save = Image.composite(mm, img, mm)
            to_save = to_save.crop(box=box)

            # The index appears twice to keep the historical file-name pattern.
            to_save.save(img_dir + 'cropped\\class_1\\' + img_name + '_' + str(idx) + '_' + str(idx) + '.jpg')

In [9]:
results = []
excl_list = []
l = 3  # NOTE(review): not used by any visible cell


# Detect people in the first 20 entries of the frame folder and save their
# masked crops. The loop variable must stay named `x`: mask() reads it as a
# global when no image name is passed.
for x in tqdm(os.listdir(img_path)[:20]):
    # Placeholders so a failed entry still has a well-formed result if the
    # results.append line below is re-enabled.
    a, b, c, d = [0], [0], [0], [0]
    try:
        a, b, c, d = get_prediction(img_path+x, 0.90, device)
        mask(a,b,c,d, mask_param = 50)
    except Exception as err:
        # Keep going on a bad entry, but say why it failed; catching Exception
        # (not everything) leaves Ctrl-C / kernel interrupt working.
        print(x, ' _'*20)
        print('    skipped:', type(err).__name__, err)
        excl_list.append(x)
#     results.append([a,b,c,d])

	nonzero(Tensor input, *, Tensor out)
Consider using one of the following signatures instead:
	nonzero(Tensor input, *, bool as_tuple)
100%|██████████████████████████████████████████| 20/20 [02:16<00:00,  6.83s/it]


In [None]:
# Save an un-masked crop of every detected person into cropped\class_2.
# `results` must hold one [boxes, classes, scores, masks] entry per directory
# entry, in os.listdir(img_path) order. Pairing the two with zip() means a
# short or empty `results` skips the uncovered entries instead of raising
# IndexError on the first missing index.
frame_names = os.listdir(img_path)
if len(results) < len(frame_names):
    print('results covers {} of {} entries in img_path; the rest are skipped'.format(
        len(results), len(frame_names)))

for idx, (x, res) in enumerate(zip(frame_names, results)):
    if x != 'cropped':
        # Context manager closes the file handle once the crops are written.
        with Image.open(img_path + x) as img:
            suf = 0
            for per, box, msk in zip(res[1], res[0], res[3]):
                if per == 'person':

                    to_save = img.crop(box=box)
                    to_save.save(img_path + 'cropped\\class_2\\' + str(idx) + '_' + str(suf) + '.jpg')
                    suf += 1

### Check results

In [None]:
# rand_img = random.choice(os.listdir(img_path))
# sers = pd.Series(os.listdir(img_path))
# idx = sers[sers == rand_img].index.values[0]

# img = np.array(Image.open(img_path+rand_img))

# for per, box in zip(results[idx][1], results[idx][0]):
#     if per == 'person':
#         img = cv2.rectangle(img, (box[0], box[1]), ((box[2], box[3])), (0,0,255), 10)

# plt.figure(figsize=(7,7))
# plt.imshow(img)

In [None]:
# torch.save(model, 'C:\\Users\\Gleb\\Documents\\Data science\\PROJECTS\\P1_object_detection\\Video_learning\\box_model.pt')

In [None]:
# os.listdir(img_path).intersection(excl_list)

## Create images for classifier

In [None]:
# img=Image.open(img_path+rand_img)

In [None]:
# Rebuild images\cropped from scratch with a single class_1 folder, then fill
# it with un-masked person crops. Note this deletes any crops written earlier.
# Removal is best-effort; folder creation errors are no longer hidden because
# the save calls below need the folder to exist.
cropped_root = os.getcwd()+'\\images\\cropped'
shutil.rmtree(cropped_root, ignore_errors=True)
os.makedirs(cropped_root + '\\class_1', exist_ok=True)

# `results` must hold one [boxes, classes, scores, masks] entry per directory
# entry, in os.listdir(img_path) order. zip() skips entries that `results`
# does not cover instead of raising IndexError.
frame_names = os.listdir(img_path)
if len(results) < len(frame_names):
    print('results covers {} of {} entries in img_path; the rest are skipped'.format(
        len(results), len(frame_names)))

for idx, (x, res) in enumerate(zip(frame_names, results)):
    if x != 'cropped':
        # Context manager closes the file handle once the crops are written.
        with Image.open(img_path + x) as img:
            suf = 0
            for per, box in zip(res[1], res[0]):
                if per == 'person':
                    to_save = img.crop(box=box)
                    to_save.save(img_path + 'cropped\\class_1\\' + str(idx) + '_' + str(suf) + '.jpg')
                    suf += 1

## Classification

### Dataloader

In [None]:
# Passed to the DataLoader as `shuffle=`; False keeps the images in dataset order.
shuffle = False

In [None]:
# Preprocessing for the cropped person images: resize, convert to a tensor and
# normalise with the mean / std values below. No augmentation is applied.
# (A CenterCrop(256) step was tried at some point and is currently disabled.)
data_dir = os.getcwd()+'\\images\\'

data_transforms = {
    'cropped': transforms.Compose([
        transforms.Resize(256),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
}

# One ImageFolder dataset / DataLoader for the single 'cropped' split; the
# sub-folders of images\cropped become the class labels.
image_datasets = {
    'cropped': datasets.ImageFolder(os.path.join(data_dir, 'cropped'),
                                    data_transforms['cropped']),
}
dataloaders = {
    'cropped': torch.utils.data.DataLoader(image_datasets['cropped'],
                                           shuffle=shuffle, num_workers=4),
}
dataset_sizes = {split: len(dataset) for split, dataset in image_datasets.items()}
class_names = image_datasets['cropped'].classes


### Random image show (from dataloader)

In [None]:
def imshow(inp, title=None):
    """Imshow for Tensor.

    Opens a new 7x7 figure, undoes the mean/std normalisation applied by the
    data transforms, clips to [0, 1] and displays the image.

    Args:
        inp: Image tensor laid out channel-first (it is transposed with
            axes (1, 2, 0) before display).
        title: Optional figure title.
    """
    plt.figure(figsize=(7,7))
    # detach()/cpu() make this safe for tensors on a GPU or attached to a graph.
    inp = inp.detach().cpu().numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean  # reverse transforms.Normalize
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)  # pause a bit so that plots are updated


# Pull the first batch from the 'cropped' loader, tile it into one grid image
# and display it.
first_batch = next(iter(dataloaders['cropped']))
inputs, classes = first_batch

out = torchvision.utils.make_grid(inputs)
imshow(out)

### Loading classification model weights (ResNet 50)

In [None]:
# loaded_model = models.resnet50(pretrained=True)
# num_ftrs = loaded_model.fc.in_features
# # Here the size of each output sample is set to 2.
# # Alternatively, it can be generalized to nn.Linear(num_ftrs, len(class_names)).
# loaded_model.fc = nn.Linear(num_ftrs, 2)

In [None]:
# Classifier weights file. In the visible cells it is only referenced by a
# commented-out load_state_dict(...) call.
# NOTE(review): hard-coded absolute user path; will not resolve on another machine.
Path_model_wts = 'C:/Users/gleb/Documents/Data science/PROJECTS/P1_object_detection/model1.pt'

In [None]:
# Echo the configured weights path as the cell output.
Path_model_wts

In [None]:
# Load the pickled classifier (torch.load unpickles: trusted files only).
# map_location keeps the load working on a CPU-only machine, matching how the
# detection model is loaded; the model is moved to `device` when it is used.
loaded_model = torch.load('C:/Users/gleb/Documents/Data science/PROJECTS/P1_object_detection/Video_learning/predict_model.pt',
                          map_location=torch.device('cpu'))
# Put the classifier (not the detection `model`) into inference mode.
loaded_model.eval()

In [None]:
# loaded_model.load_state_dict(torch.load(Path_model_wts))
# Replace `loaded_model` with the fully pickled classifier (torch.load
# unpickles: trusted files only). map_location keeps the load working on a
# CPU-only machine, matching how the detection model is loaded.
loaded_model = torch.load('C:/Users/gleb/Documents/Data science/PROJECTS/P1_object_detection/model_full.pt',
                          map_location=torch.device('cpu'))
loaded_model.eval()

In [None]:
# torch.save(loaded_model, 'C:\\Users\\Gleb\\Documents\\Data science\\PROJECTS\\P1_object_detection\\Video_learning\\predict_model.pt')

### Visualization of results (probs + image)

In [None]:
all_preds = []

def visualize_model_cpu(model, data_set = 'test', num_images=6):
    """Show classifier predictions for the first `num_images` images of a loader.

    Each image is displayed through imshow() with its two class
    probabilities as the title, and the predicted class index is recorded
    in the notebook-level list `all_preds`.

    Args:
        model: Classifier to evaluate; it is switched to eval mode and moved
            to the notebook-level `device`.
        data_set: Key into the notebook-level `dataloaders` dict.
        num_images: Maximum number of images to display.

    Returns:
        `all_preds`, holding one predicted class index per displayed image.
        The list is emptied at the start of every call, so re-running the
        cell does not duplicate entries.
    """
    model.eval()
    model = model.to(device)
    all_preds.clear()
    if num_images <= 0:
        return all_preds
    images_so_far = 0

    with torch.no_grad():
        for inputs, _labels in dataloaders[data_set]:
            inputs = inputs.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            # Per-class probabilities, used for the figure titles.
            probs = torch.nn.functional.softmax(outputs, dim=1).cpu().numpy()
            preds = preds.cpu().numpy()
            images = inputs.cpu()

            for j in range(images.size(0)):
                images_so_far += 1
                # Index with j so batches larger than one are reported correctly.
                all_preds.append(preds[j])
                caption = 'Gleb: {:.2f}, Katya: {:.2f}'.format(probs[j][0], probs[j][1])
                # imshow() opens its own figure, so the title is passed to it
                # rather than set on a subplot of a different figure.
                imshow(images[j], title=caption)

                if images_so_far == num_images:
                    return all_preds
    return all_preds

In [None]:
# Preview classifier output on up to 20 images of the 'cropped' loader.
visualize_model_cpu(loaded_model, data_set='cropped', num_images=20)

In [None]:
# Echo the predicted class indices collected by visualize_model_cpu().
all_preds

## Saving model

In [None]:
# Pickle the whole detection model object (`model`) to disk.
# NOTE(review): the model is loaded earlier from os.getcwd() + '/models/MRCNN.pt'
# but saved here to a hard-coded absolute user path; confirm both are meant to
# be the same file.
torch.save(model, 'C:\\Users\\Gleb\\Documents\\Data science\\PROJECTS\\P1_object_detection\\Video_learning\\MRCNN.pt')