In [None]:
import os
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup
import numpy as np

In [None]:
import cv2
from PIL import Image

def get_bbox(obj):
    xmin = int(obj.find('xmin').text)
    ymin = int(obj.find('ymin').text)
    xmax = int(obj.find('xmax').text)
    ymax = int(obj.find('ymax').text)
    return [xmin, ymin, xmax, ymax]

def get_label(obj):
    if obj.find('name').text == "with_mask":ssh
        return 2
    elif obj.find('name').text == "mask_weared_incorrect":
        return 3
    return 1

def get_sample(path, image_id): 
    with open(f'{path}/annotations/maksssksksss{image_id}.xml') as f:
        data = f.read()
        soup = BeautifulSoup(data, 'xml')
        objects = soup.find_all('object')

        num_objs = len(objects)
        boxes = []
        labels = []
        for i in objects:
            boxes.append(get_bbox(i))
            labels.append(get_label(i))

        # Annotation is in dictionary format
        sample = {}
        sample["boxes"] = boxes
        sample["labels"] = labels
        sample['image'] = np.array(Image.open(f'images/maksssksksss{image_id}.png').convert('RGB')) 
        sample['image_id'] = image_id
        
        return sample

In [None]:
def draw_rects(sample, save_path = None):
    result = sample['image'].copy()
    for box, label in zip(sample['boxes'], sample['labels']):
        color = (0, 0, 255)
        if label == 2:
            color = (0, 255, 0)
        elif label == 3:
            color = (255, 0, 0)
        cv2.rectangle(result, (box[0],box[1]), (box[2], box[3]), color, 2)
    
    if save_path:
        im = Image.fromarray(result)
        im.save(save_path)

    plt.imshow(result)

In [None]:
def normalize_dataset(dataset):
    for sample in dataset:
        sample['image'] = sample['image'] / 255.

def standarize_dataset(dataset):
    mean = np.mean([sample['image'].mean() for sample in dataset])
    std_dev = np.std([sample['image'].std() for sample in dataset])
    for sample in dataset:
        sample['image'] = (sample['image'] - mean) / std_dev

In [None]:
import imgaug as ia
import imgaug.augmenters as iaa
from imgaug.augmentables.bbs import BoundingBox, BoundingBoxesOnImage
import random
import copy

In [None]:
def random_augumentation(sample):
    augumented = copy.deepcopy(sample)
    type = random.randint(0,31)
    
    bbs = []
    for bb in augumented['boxes']:
        bbs.append(BoundingBox(x1=bb[0], x2=bb[2], y1=bb[1], y2=bb[3]))
    bbs = BoundingBoxesOnImage(bbs, shape=augumented['image'].shape)
    
    params = {'fit_output': True}
    if type & 0x1:
        params['rotate'] = (-random.randint(1,15), random.randint(1,15))
    if type & 0x2:
        params['translate_percent'] ={"x": random.uniform(0,0.2), "y": random.uniform(0,0.2)}
    if type & 0x4:
        params['shear'] =(0,random.randint(1,10))
    if type & 0x8:
        flip_hr=iaa.Fliplr(p=1.0)
        augumented['image'], bbs = flip_hr(image = augumented['image'], bounding_boxes=bbs)
    if type & 0x10:
        params['scale'] = random.uniform(1.1,1.3)
    
    aug = iaa.Affine(**params) 
    augumented['image'], bbs = aug(image = augumented['image'], bounding_boxes=bbs)
    for i, bb in enumerate(bbs):
        augumented['boxes'][i] = [bb.x1, bb.y1, bb.x2, bb.y2]
    
    #remove bbs outside the image
    to_delete = list()
    for i, bb in enumerate(augumented['boxes']):
        if (not(0.< bb[0] <= augumented['image'].shape[1])) or \
        (not(0.< bb[2] <= augumented['image'].shape[1])) or \
        (not(0.< bb[1] <= augumented['image'].shape[0])) or \
        (not(0.< bb[3] <= augumented['image'].shape[0])):
            to_delete.append(i)
        elif bb[0] >= bb[2] or bb[1] >= bb[3]:
            to_delete.append(i)
            
    for index in sorted(to_delete, reverse=True):
        del augumented['boxes'][index]
        del augumented['labels'][index]
        
    return augumented

In [None]:
from sklearn.model_selection import train_test_split

In [None]:
def load_dataset_by_id(ids, path = '.'):
    n = len(os.listdir(f'{path}/annotations/'))
    dataset = []
    for i in ids:
        dataset.append(get_sample(path, i))
    return dataset

In [None]:
#SPLIT1
def get_split_1(train_ids, test_ids, val_ids):
    return load_dataset_by_id(train_ids), load_dataset_by_id(test_ids), load_dataset_by_id(val_ids)

In [None]:
# SPLIT2
def get_agumenteted_data(samples):
    new_samples = []
    for sample in samples:
        new_samples.append(sample)
        if (3 in sample["labels"] or 1 in sample["labels"]) and 2 in sample["labels"]:
            for _ in range(1):
                new_samples.append(random_augumentation(sample)) 
        elif 3 in sample["labels"]:
            for _ in range(12):
                new_samples.append(random_augumentation(sample)) 
        elif 1 in sample["labels"]:
            for _ in range(4):
                new_samples.append(random_augumentation(sample)) 
    return new_samples
    
def get_split_2(train_ids, test_ids, val_ids):
    train_samples = get_agumenteted_data(load_dataset_by_id(train_ids))
    test_samples = get_agumenteted_data(load_dataset_by_id(test_ids))
    val_samples = get_agumenteted_data(load_dataset_by_id(val_ids))

    normalize_dataset(train_samples)
    standarize_dataset(train_samples)
    
    normalize_dataset(test_samples)
    standarize_dataset(test_samples)
    
    normalize_dataset(val_samples)
    standarize_dataset(val_samples)
    
    return train_samples, test_samples, val_samples
    

In [None]:
#SPLIT3
def get_split_3(train_samples, test_samples, val_samples):
    return train_samples + val_samples, test_samples, val_samples

In [None]:
from sklearn.model_selection import train_test_split
annotations_id_train, annotations_id_test = train_test_split(range(853), test_size=0.2)
annotations_id_train, annotations_id_val = train_test_split(annotations_id_train, test_size=0.25)

In [None]:
import numpy as np 
import pandas as pd
import torchvision
from torchvision import transforms, datasets, models
import torch
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import matplotlib.pyplot as plt
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
import matplotlib.patches as patches
print(torch.cuda.is_available())
print(torch.__version__)
import os


In [None]:
def get_model_instance_segmentation(num_classes):
    # load an instance segmentation model pre-trained pre-trained on COCO
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
    # get number of input features for the classifier
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # replace the pre-trained head with a new one
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes+1)

    return model

In [None]:
def grouped(iterable, n):
    "s -> (s0,s1,s2,...sn-1), (sn,sn+1,sn+2,...s2n-1), (s2n,s2n+1,s2n+2,...s3n-1), ..."
    return zip(*[iter(iterable)]*n)

# Training


In [None]:
def image_to_tensor(img):
    return torch.tensor(img.transpose((2, 0, 1)), dtype=torch.float32)

In [None]:
from datetime import datetime
def train_epoch(model, data, optimizer, logger, accumulation_steps):
    model.train()
    optimizer.zero_grad()
    i = 0    
    total_loss = 0
    accu_step_loss = 0.
    for batch in grouped(data, BATCH_SIZE):
        i += 1
        imgs = list(image_to_tensor(s['image']).to(device) for s in batch)
        annotations = [{k: torch.tensor(s[k]).to(device) for k in s.keys() - {'image'}} for s in batch]
        
        for a in annotations:
            if a['boxes'].shape[-1] != 4:
                 a['boxes'].resize_((1,4))
                 a['labels'].type(dtype=torch.int64)
        
        loss_dict = model(imgs, annotations)
        losses = sum(loss for loss in loss_dict.values())
        losses.backward()
        total_loss += losses.item()
        accu_step_loss += losses.item()
        
        if (i + 1) % accumulation_steps == 0:
            optimizer.step()
            optimizer.zero_grad()
            training_stats = accu_step_loss / accumulation_steps
            accu_step_loss = 0.
            logger.log_metrics({"accu_step_loss": training_stats})
        
    optimizer.step()
    optimizer.zero_grad()
    avg_loss = total_loss / (len(train)//BATCH_SIZE)
    return avg_loss


def evaluate(model, data):
    #model.eval()  # Turn   on the evaluation mode
    model.train() # loss calculation is a bit complicated, 
                  # for validation purposes loss can be obtained from training mode.
                  # https://stackoverflow.com/questions/60339336/validation-loss-for-pytorch-faster-rcnn/65347721#65347721
    total_loss = 0.

    with torch.no_grad():
        for sample in data:
            img = image_to_tensor(sample['image']).to(device)
            annotation = {k: torch.tensor(sample[k]).to(device) for k in sample.keys() - {'image'}}
            loss_dict = model([img], [annotation])
            total_loss += sum(loss for loss in loss_dict.values()).item()
    
    return total_loss / len(data)


def save_model(model, name = 'model-'):
    path = "./models/"
    dateTimeObj = datetime.now()
    timestamp = dateTimeObj.strftime("%d-%b-%Y_%H:%M")
    filename = path + name + timestamp + '.pt'
    torch.save(model.state_dict(), filename)

# SPLIT1

In [None]:
from src.neptune_logging import NeptuneLogger, DummyLogger

train, test, val = get_split_1(annotations_id_train, annotations_id_test, annotations_id_val)

num_epochs = 25
BATCH_SIZE = 1 # no gpu space for more, we need to accumulate gradient, and update it every ACCUMULATE steps
ACCUMULATE = 4
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = get_model_instance_segmentation(3).to(device)

# parameters
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.000001,
                                momentum=0.9, weight_decay=0.0005)
best_loss = 10000

logger = NeptuneLogger("Face mask - split 1", None)
best_loss = 10e5
for i in range(num_epochs):
    trainig_stats = train_epoch(model, train, optimizer, logger, ACCUMULATE)
    val_stats = evaluate(model, val)
    logger.log_metrics({"training_loss": trainig_stats,
                        "val_loss": val_stats})
    print('Epoch:', i + 1, "Train loss: ", trainig_stats, "Validation loss:", val_stats)
    
    if val_stats < best_loss:
        best_loss = val_stats
        save_model(model, 'model_1_split')

del model

# SPLIT2

In [None]:
from src.neptune_logging import NeptuneLogger, DummyLogger

train, test, val = get_split_2(annotations_id_train, annotations_id_test, annotations_id_val)

num_epochs = 25
BATCH_SIZE = 1 # no gpu space for more, we need to accumulate gradient, and update it every ACCUMULATE steps
ACCUMULATE = 4
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = get_model_instance_segmentation(3).to(device)

# parameters
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.0001,
                                momentum=0.9, weight_decay=0.0005)
best_loss = 10000

logger = NeptuneLogger("Face mask - split 1", None)
best_loss = 10e5
for i in range(num_epochs):
    trainig_stats = train_epoch(model, train, optimizer, logger, ACCUMULATE)
    val_stats = evaluate(model, val)
    logger.log_metrics({"training_loss": trainig_stats,
                        "val_loss": val_stats})
    print('Epoch:', i + 1, "Train loss: ", trainig_stats, "Validation loss:", val_stats)
    
    if val_stats < best_loss:
        best_loss = val_stats
        save_model(model, 'model_2_split')

del model

# SPLIT 3

In [None]:
from src.neptune_logging import NeptuneLogger, DummyLogger

train, test, val = get_split_2(annotations_id_train, annotations_id_test, annotations_id_val)
train, test, val = get_split_3(train, test, val)

num_epochs = 25
BATCH_SIZE = 1 # no gpu space for more, we need to accumulate gradient, and update it every ACCUMULATE steps
ACCUMULATE = 4
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = get_model_instance_segmentation(3).to(device)

# parameters
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.0001,
                                momentum=0.9, weight_decay=0.0005)
best_loss = 10000

logger = NeptuneLogger("Face mask - split 1", None)
best_loss = 10e5
for i in range(num_epochs):
    trainig_stats = train_epoch(model, train, optimizer, logger, ACCUMULATE)
    val_stats = evaluate(model, val)
    logger.log_metrics({"training_loss": trainig_stats,
                        "val_loss": val_stats})
    print('Epoch:', i + 1, "Train loss: ", trainig_stats, "Validation loss:", val_stats)
    
    if val_stats < best_loss:
        best_loss = val_stats
        save_model(model, 'model_3_split')

del model

# Loading model example

In [None]:
model = get_model_instance_segmentation(3).to(device)
model.load_state_dict(torch.load("models/MODEL2.pt"))
model.eval()
