In [None]:
import os
import random
import math
from datetime import datetime
from collections import Counter
import pandas as pd
import numpy as np

import re
import cv2
from PIL import Image
from pathlib import Path
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from sklearn.model_selection import train_test_split
import xml.etree.ElementTree as ET

import torch
from torch.utils.data import Dataset, DataLoader
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models

# Training images and their bounding-box annotation file share one directory.
images_path = Path('./data/train')
anno_path = images_path / 'train_bbox.txt'

In [None]:
def draw(imagePath, coordinates):
    """
    Show an image with a rectangle drawn between two corner points.

    Parameters
    ----------
    imagePath : path of the image file to read.
    coordinates : sequence whose first four entries are x1, y1, x2, y2
        (numbers or numeric strings); each is truncated to an int.

    Raises
    ------
    FileNotFoundError : if OpenCV cannot read the image.
    """
    image = cv2.imread(str(imagePath))
    if image is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise FileNotFoundError(f"Could not read image: {imagePath}")

    # OpenCV loads BGR while matplotlib displays RGB; convert first so the
    # picture is shown with its real colours.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Coordinates may arrive as floats or strings; OpenCV needs int points.
    x1, y1, x2, y2 = (int(float(value)) for value in coordinates[:4])

    # (255, 0, 0) is red now that the image is RGB.
    image = cv2.rectangle(image, (x1, y1), (x2, y2), (255, 0, 0), 2)
    plt.imshow(image)
    plt.show()

In [None]:
# Build one annotation record per training image and collect them in df_train.
anno_list = []

# Every entry of the image directory except the annotation file itself.
imageList = os.listdir(images_path)
imageList.remove('train_bbox.txt')
sorted_imageList = sorted(imageList)

# Read all annotation lines; the context manager closes the file handle.
with open(anno_path, "r") as labelList:
    labels = labelList.readlines()

# print(len(sorted_imageList), len(labels))
# label = labels[idx].split(" ")
# draw("./data/train/"+sorted_imageList[idx], label[:4])

# NOTE(review): images and annotation lines are paired purely by position
# (sorted file names vs. line order) and zip() stops at the shorter list --
# confirm the annotation file is written in the same order.
for image, label in zip(sorted_imageList, labels):
    # findall yields (prefix, digits) tuples; keep the digits of the first match.
    image_label_idx = re.findall(r'(\w+?)(\d+)', image)[0][1]
    # Drops the last 9 characters of the name and re-appends digits + ".png"
    # (presumably a 5-digit index plus extension -- TODO confirm naming scheme).
    filename = "./data/train/"+image[:-9]+image_label_idx+".png"
    img = cv2.imread(filename)
    if img is None:
        # cv2.imread returns None on failure; fail with a readable message
        # instead of an AttributeError on `.shape`.
        raise FileNotFoundError(f"Could not read image: {filename}")
    height, width = img.shape[:2]

    lab = label.split(" ")

    # Fifth field is the class name; a leading "t" marks "tamper" (1),
    # anything else is "authentic" (0).
    class_label = 1 if lab[4][0] == "t" else 0

    xmin, ymin, xmax, ymax = (int(float(value)) for value in lab[:4])
    anno_list.append({
        'filename': filename,
        'width': width,
        'height': height,
        'label': class_label,
        'xmin': xmin,
        'ymin': ymin,
        'xmax': xmax,
        'ymax': ymax,
        # Row-major order used by the mask helpers: [ymin, xmin, ymax, xmax].
        'bbox': [ymin, xmin, ymax, xmax],
    })

df_train = pd.DataFrame(anno_list)

# class_dict = {'authentic': 0, 'tamper': 1}
# df_train['label'] = df_train['label'].apply(lambda x:  class_dict[x])
print(df_train.shape)
df_train.head(10)

In [None]:
def read_image(path):
    """
    Read an image file and return it as an RGB array.

    Raises FileNotFoundError when OpenCV cannot read `path`; cv2.imread
    returns None in that case rather than raising.
    """
    bgr = cv2.imread(str(path))
    if bgr is None:
        raise FileNotFoundError(f"Could not read image: {path}")
    return cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

def create_mask(bb, x):
    """
    Creates a mask for the bounding box of same shape as image

    Parameters
    ----------
    bb : sequence of four numbers [row_start, col_start, row_end, col_end];
        the end indices are exclusive (plain slice semantics).
    x : array whose first two dimensions give the mask's rows and columns.

    Returns
    -------
    2-D float array of zeros with ones inside the box.
    """
    rows, cols, *_ = x.shape
    Y = np.zeros((rows, cols))
    # Boxes produced by mask_to_bb are float32 and floats cannot be used as
    # slice indices, so truncate to integers first.
    row_start, col_start, row_end, col_end = np.asarray(bb).astype(int)[:4]
    Y[row_start:row_end, col_start:col_end] = 1.
    return Y

def mask_to_bb(Y):
    """
    Bounding box of the nonzero region of mask Y.

    Returns a float32 array [min_row, min_col, max_row, max_col] (max
    indices are inclusive), or four zeros when the mask has no nonzero entry.
    """
    # np.nonzero returns the index arrays in axis order: rows, then columns.
    row_idx, col_idx = np.nonzero(Y)
    if row_idx.size == 0:
        return np.zeros(4, dtype=np.float32)
    extent = [row_idx.min(), col_idx.min(), row_idx.max(), col_idx.max()]
    return np.array(extent, dtype=np.float32)

def create_bb_array(x):
    """
    Build a bounding box array from the values of a df_train row.

    Picks positions 5, 4, 7, 6 of `x`, in that order; for df_train as first
    built in this notebook these are ymin, xmin, ymax, xmax.
    """
    picked_positions = (5, 4, 7, 6)
    return np.array([x[pos] for pos in picked_positions])

def resize_image_bb(read_path, write_path, bb, sz):
    """
    Resize an image and its bounding box and write image to new path

    Parameters
    ----------
    read_path : path of the source image.
    write_path : directory that receives the resized image (created if missing).
    bb : bounding box [row_start, col_start, row_end, col_end] on the source.
    sz : output height in pixels; output width is int(1.49*sz).

    Returns
    -------
    (new_path, new_bb) : path of the written file as str, and the box
        recovered from the resized mask via mask_to_bb.

    Raises
    ------
    IOError : if OpenCV fails to write the resized image.
    """
    im = read_image(read_path)
    # cv2.resize takes (width, height). The 1.49 width/height ratio is fixed
    # here -- presumably the dataset's aspect ratio; TODO confirm.
    target_size = (int(1.49*sz), sz)
    im_resized = cv2.resize(im, target_size)
    Y_resized = cv2.resize(create_mask(bb, im), target_size)

    # Keep only the file name: joining the whole read_path onto write_path
    # would nest the source directory tree inside the output directory.
    out_dir = Path(write_path)
    out_dir.mkdir(parents=True, exist_ok=True)
    new_path = str(out_dir / Path(read_path).name)

    # cv2.imwrite reports failure through its return value, not an exception.
    if not cv2.imwrite(new_path, cv2.cvtColor(im_resized, cv2.COLOR_RGB2BGR)):
        raise IOError(f"Could not write resized image to {new_path}")
    return new_path, mask_to_bb(Y_resized)

# modified from fast.ai
def crop(im, r, c, target_r, target_c):
    """Return the target_r x target_c window of `im` whose top-left corner is (r, c)."""
    row_window = slice(r, r + target_r)
    col_window = slice(c, c + target_c)
    return im[row_window, col_window]

# random crop to the original size
def random_crop(x, r_pix=8):
    """
    Crop `x` at a random offset.

    The result is 2*r_pix rows and 2*c_pix columns smaller than `x`, where
    c_pix is r_pix scaled by the column/row ratio of `x`.
    """
    n_rows, n_cols, *_ = x.shape
    c_pix = round(r_pix*n_cols/n_rows)
    # Row fraction is drawn before the column fraction.
    frac_r = random.uniform(0, 1)
    frac_c = random.uniform(0, 1)
    row_offset = np.floor(2*frac_r*r_pix).astype(int)
    col_offset = np.floor(2*frac_c*c_pix).astype(int)
    out_rows = n_rows - 2*r_pix
    out_cols = n_cols - 2*c_pix
    return crop(x, row_offset, col_offset, out_rows, out_cols)

def center_crop(x, r_pix=8):
    """
    Trim r_pix rows from the top and bottom of `x`, and a proportionally
    scaled number of columns (c_pix) from the left and right.
    """
    n_rows, n_cols, *_ = x.shape
    c_pix = round(r_pix*n_cols/n_rows)
    out_rows = n_rows - 2*r_pix
    out_cols = n_cols - 2*c_pix
    return crop(x, r_pix, c_pix, out_rows, out_cols)

def rotate_cv(im, deg, y=False, mode=cv2.BORDER_REFLECT, interpolation=cv2.INTER_AREA):
    """
    Rotate `im` by `deg` degrees about its centre, keeping the original size.

    When `y` is truthy the warp uses a constant border and OpenCV's default
    flags (transformsXY uses this for the mask); otherwise it uses `mode`
    as border mode and WARP_FILL_OUTLIERS combined with `interpolation`.
    """
    height, width, *_ = im.shape
    centre = (width/2, height/2)
    rotation = cv2.getRotationMatrix2D(centre, deg, 1)
    out_size = (width, height)
    if not y:
        warp_flags = cv2.WARP_FILL_OUTLIERS+interpolation
        return cv2.warpAffine(im, rotation, out_size, borderMode=mode, flags=warp_flags)
    return cv2.warpAffine(im, rotation, out_size, borderMode=cv2.BORDER_CONSTANT)

def random_cropXY(x, Y, r_pix=8):
    """
    Apply one and the same random crop window to image `x` and mask `Y`.

    Returns the pair (cropped_x, cropped_Y).
    """
    n_rows, n_cols, *_ = x.shape
    c_pix = round(r_pix*n_cols/n_rows)
    # Row fraction is drawn before the column fraction.
    frac_r = random.uniform(0, 1)
    frac_c = random.uniform(0, 1)
    row_offset = np.floor(2*frac_r*r_pix).astype(int)
    col_offset = np.floor(2*frac_c*c_pix).astype(int)
    out_rows = n_rows - 2*r_pix
    out_cols = n_cols - 2*c_pix
    cropped_x = crop(x, row_offset, col_offset, out_rows, out_cols)
    cropped_Y = crop(Y, row_offset, col_offset, out_rows, out_cols)
    return cropped_x, cropped_Y

def transformsXY(path, bb, transforms):
    """
    Load an image, scale it to [0, 1] RGB, and transform it together with
    its bounding box.

    With `transforms` truthy: rotate by a random angle in [-10, 10) degrees,
    flip left-right with probability 0.5, then take a random crop. Otherwise
    only a centre crop is applied. The box follows the image by being drawn
    as a mask, transformed identically, and read back with mask_to_bb.

    Returns (image, bounding_box).
    """
    image = cv2.cvtColor(cv2.imread(str(path)), cv2.COLOR_BGR2RGB)/255
    mask = create_mask(bb, image)

    if not transforms:
        return center_crop(image), mask_to_bb(center_crop(mask))

    angle = (np.random.random()-.50)*20
    image = rotate_cv(image, angle)
    mask = rotate_cv(mask, angle, y=True)
    if np.random.random() > 0.5:
        # .copy() turns the flipped views into standalone arrays.
        image, mask = np.fliplr(image).copy(), np.fliplr(mask).copy()
    image, mask = random_cropXY(image, mask)
    return image, mask_to_bb(mask)

def create_corner_rect(bb, color='red'):
    """
    Build an unfilled matplotlib rectangle for a box given as
    [top, left, bottom, right].
    """
    top, left, bottom, right = np.array(bb, dtype=np.float32)[:4]
    # Rectangle takes the (x, y) anchor first, then width and height.
    anchor = (left, top)
    return plt.Rectangle(anchor, right-left, bottom-top, color=color, fill=False, lw=3)

def show_corner_bb(im, bb):
    """Display `im` and overlay the rectangle built from `bb` on the current axes."""
    plt.imshow(im)
    axes = plt.gca()
    axes.add_patch(create_corner_rect(bb))

def normalize(im):
    """
    Normalizes images with Imagenet stats: subtracts the per-channel mean
    and divides by the per-channel standard deviation (last axis = channels).
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])
    centered = im - channel_mean
    return centered/channel_std

In [None]:
# x = random.randint(1, len(sorted_imageList))
# im = cv2.imread(str(df_train.values[x][0]))
# bb = create_bb_array(df_train.values[x])
# Y = create_mask(bb, im)
# mask_to_bb(Y)
# plt.imshow(im)

In [None]:
# plt.imshow(Y, cmap='gray')

In [None]:
# Populating Training DF with new paths and bounding boxes
train_path_resized = Path('./data/resized')
# cv2.imwrite does not create directories and only reports failure through
# its return value, so make sure the output directory exists up front.
train_path_resized.mkdir(parents=True, exist_ok=True)

new_paths, new_bbs = [], []
for index, row in df_train.iterrows():
    # Select coordinates by column name instead of by position in
    # row.values: positions shift as soon as a column is inserted in front
    # (e.g. by reset_index()), which would silently yield wrong boxes when
    # this cell is re-run.
    bb = np.array([row['ymin'], row['xmin'], row['ymax'], row['xmax']])
    new_path, new_bb = resize_image_bb(row['filename'], train_path_resized, bb, 416)
    new_paths.append(new_path)
    new_bbs.append(new_bb)
df_train['filename_'] = new_paths
df_train['bbox_'] = new_bbs

In [None]:
# idx = 9430
# im = cv2.imread(df_train.values[idx][0])
# # show_corner_bb(im, [df_train.values[idx][5], df_train.values[idx][4], df_train.values[idx][7], df_train.values[idx][6]])
# show_corner_bb(im, [df_train.values[idx][5], df_train.values[idx][4], df_train.values[idx][7], df_train.values[idx][6]])

In [None]:
# drop=True renumbers the rows without inserting the old index as a new
# column; without it every run of this cell adds another column
# ('index', then 'level_0', ...) and shifts all column positions.
df_train = df_train.reset_index(drop=True)
# Model inputs: resized image path and resized bounding box.
X = df_train[['filename_', 'bbox_']]
# Classification target (0 = authentic, 1 = tamper).
Y = df_train['label']

In [None]:
# Hold out 20% of the rows for validation; the fixed seed keeps the split
# reproducible across runs.
X_train, X_val, y_train, y_val = train_test_split(
    X,
    Y,
    test_size=0.2,
    random_state=42,
)

In [None]:
class COCODataset(Dataset):
    """
    Dataset yielding (image, class label, bounding box) triples.

    `paths`, `bb` and `y` must expose `.values` (e.g. pandas Series) and are
    indexed positionally. `transforms` is forwarded to transformsXY.
    """

    def __init__(self, paths, bb, y, transforms=False):
        self.paths, self.bb, self.y = paths.values, bb.values, y.values
        self.transforms = transforms

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, idx):
        image, y_bb = transformsXY(self.paths[idx], self.bb[idx], self.transforms)
        # Normalise, then move the channel axis (2) to the front.
        channels_first = np.rollaxis(normalize(image), 2)
        return channels_first, self.y[idx], y_bb

# Augmentation is enabled for the training set only.
train_ds = COCODataset(
    paths=X_train['filename_'],
    bb=X_train['bbox_'],
    y=y_train,
    transforms=True,
)
valid_ds = COCODataset(
    paths=X_val['filename_'],
    bb=X_val['bbox_'],
    y=y_val,
)

batch_size = 64
# Only the training loader reshuffles its samples.
train_dl = DataLoader(dataset=train_ds, batch_size=batch_size, shuffle=True)
valid_dl = DataLoader(dataset=valid_ds, batch_size=batch_size)



In [None]:
class Resnet50(nn.Module):
    """
    Pretrained ResNet-50 backbone with two linear heads: one producing class
    logits and one producing four bounding-box values.

    Parameters
    ----------
    num_classes : number of class logits. The default of 4 is kept for
        backward compatibility; the labels built in this notebook are 0/1,
        so 2 is sufficient.
    """

    def __init__(self, num_classes=4):
        super(Resnet50, self).__init__()
        resnet = models.resnet50(pretrained=True)
        # Size the heads from the backbone itself: the final ResNet-50 stage
        # emits 2048 channels (512 is the ResNet-18/34 width), so hard-coding
        # 512 makes the forward pass fail with a shape mismatch.
        n_features = resnet.fc.in_features
        # Keep the first 8 children: everything before the original average
        # pool and fully connected layer.
        layers = list(resnet.children())[:8]
        self.features1 = nn.Sequential(*layers[:6])
        self.features2 = nn.Sequential(*layers[6:])
        self.classifier = nn.Sequential(nn.BatchNorm1d(n_features), nn.Linear(n_features, num_classes))
        self.bb = nn.Sequential(nn.BatchNorm1d(n_features), nn.Linear(n_features, 4))

    def forward(self, x):
        """Return (class_logits, bbox_values) for an image batch `x`."""
        x = self.features1(x)
        x = self.features2(x)
        x = F.relu(x)
        # Functional pooling avoids building a new module on every call.
        x = F.adaptive_avg_pool2d(x, (1, 1))
        x = x.view(x.shape[0], -1)
        return self.classifier(x), self.bb(x)

In [None]:
def update_optimizer(optimizer, lr):
    """Set the learning rate of every parameter group of `optimizer` to `lr`."""
    for group in optimizer.param_groups:
        group["lr"] = lr

In [None]:
def _model_device(model):
    """Return the device of the model's first parameter (CPU if it has none)."""
    first_param = next(model.parameters(), None)
    return first_param.device if first_param is not None else torch.device("cpu")


def _batch_loss(model, x, y_class, y_bb, C, device):
    """Run one batch through the model; return (loss, class_logits, labels_on_device)."""
    x = x.to(device).float()
    y_class = y_class.to(device)
    y_bb = y_bb.to(device).float()
    out_class, out_bb = model(x)
    loss_class = F.cross_entropy(out_class, y_class, reduction="sum")
    loss_bb = F.l1_loss(out_bb, y_bb, reduction="none").sum(1).sum()
    # C scales the box loss down relative to the classification loss.
    return loss_class + loss_bb/C, out_class, y_class


def train_epocs(model, optimizer, train_dl, val_dl, epochs=10,C=1000):
    """
    Train `model` for `epochs` epochs, printing train/validation metrics
    after each one.

    Parameters
    ----------
    model : module returning (class_logits, bbox_values) for an image batch.
    optimizer : optimizer over the model's parameters.
    train_dl, val_dl : iterables of (x, y_class, y_bb) batches.
    epochs : number of passes over train_dl.
    C : divisor applied to the summed L1 box loss before it is added to the
        summed cross-entropy loss.

    Returns
    -------
    Mean per-sample training loss of the last epoch (NaN if epochs is 0).
    """
    # Batches follow the model's device instead of being forced onto CUDA,
    # so the loop works wherever the model lives.
    device = _model_device(model)
    train_loss = float("nan")
    for _ in range(epochs):
        model.train()
        total = 0
        sum_loss = 0
        for x, y_class, y_bb in train_dl:
            loss, _, y_class = _batch_loss(model, x, y_class, y_bb, C, device)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total += y_class.shape[0]
            sum_loss += loss.item()
        train_loss = sum_loss/max(total, 1)
        # Validate on the loader passed in, not on a notebook-level global.
        val_loss, val_acc = val_metrics(model, val_dl, C)
        print("train_loss %.3f val_loss %.3f val_acc %.3f" % (train_loss, val_loss, val_acc))
    return train_loss

def val_metrics(model, valid_dl, C=1000):
    """
    Evaluate `model` on `valid_dl`.

    Returns (mean per-sample loss, classification accuracy); both are 0.0
    when the loader yields no samples.
    """
    model.eval()
    device = _model_device(model)
    total = 0
    sum_loss = 0
    correct = 0
    # No gradients are needed for evaluation.
    with torch.no_grad():
        for x, y_class, y_bb in valid_dl:
            loss, out_class, y_class = _batch_loss(model, x, y_class, y_bb, C, device)
            _, pred = torch.max(out_class, 1)
            correct += pred.eq(y_class).sum().item()
            sum_loss += loss.item()
            total += y_class.shape[0]
    denom = max(total, 1)
    return sum_loss/denom, correct/denom

# The model must live on the same device as its input batches; use the GPU
# when one is available and fall back to the CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = Resnet50().to(device)
# Optimise only parameters that require gradients.
parameters = filter(lambda p: p.requires_grad, model.parameters())
optimizer = torch.optim.Adam(parameters, lr=0.006)

In [None]:
# Train for five epochs; the cell displays the final mean training loss.
train_epocs(
    model=model,
    optimizer=optimizer,
    train_dl=train_dl,
    val_dl=valid_dl,
    epochs=5,
)