In [None]:
from google.colab import drive

# Mount Google Drive at /content/gdrive; force_remount=True re-mounts even if it is already mounted.
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [None]:
import os

# WARNING: point `path` at your own "Data-Competition" folder before running.
# The working directory is switched to it, so every relative path used later
# (./dataset_origin/..., ./dataset_aug/...) is resolved from this folder.
path = '.../Data-Competition'
os.chdir(path)

In [None]:
# NOTE(review): this upgrades to the newest albumentations without a version pin, while the import
# cell below still imports `ToTensor` from albumentations.pytorch.transforms. Newer releases may
# only provide `ToTensorV2` - confirm and pin a version that matches the imports.
!pip install -U albumentations



# Library

In [None]:
import glob
import math
import os
import random
import re

import cv2
import numpy as np
import pandas as pd

from PIL import Image

import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2, ToTensor

import torch
import torchvision

from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator

from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import SequentialSampler

from matplotlib import pyplot as plt

# DIR_TRAIN_VAL = './dataset_origin/images/train_val'
# Image folders for each split, given relative to the working directory set with os.chdir above.
DIR_TRAIN = './dataset_origin/images/train'
DIR_VAL = './dataset_origin/images/val'
DIR_TEST = './dataset_origin/images/public_test'


SEED = 42
def seed_everything(seed=SEED):
    """Seed every random number generator this notebook draws from.

    The mosaic augmentation samples with Python's ``random`` module, NumPy's
    global generator is used for array work, and the ``DataLoader`` shuffles
    through PyTorch, so all three are seeded to make runs repeatable.

    Args:
        seed: integer seed applied to all generators (defaults to ``SEED``).
    """
    # Only affects Python processes started after this point; kept for completeness.
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

seed_everything(SEED)

# Load train_csv data

In [None]:
# Read the annotation tables of the three splits in one pass (train, val, test - in that order).
train_df, val_df, test_df = (
    pd.read_csv(csv_path, index_col=False)
    for csv_path in (
        './dataset_origin/train_csv.csv',
        './dataset_origin/val_csv.csv',
        './dataset_origin/test_csv.csv',
    )
)

In [None]:
# Report how many distinct image ids appear in the train_csv and val_csv annotation tables.
# NOTE(review): the first message says "total dataset" although it counts train_df - confirm the wording.
print(f'There are totally {len(train_df["image_id"].unique())} images in total dataset')
print(f'There are totally {len(val_df["image_id"].unique())} images in val dataset')

There are totally 937 images in total dataset
There are totally 152 images in val dataset


In [None]:
# Preview the training annotations: one row per bounding box (image_id, image size, label, x, y, w, h).
train_df

Unnamed: 0,image_id,width,height,label,x,y,w,h
0,1035,960.0,960.0,1.0,0.704688,0.522917,0.034375,0.098611
1,1035,960.0,960.0,1.0,0.551562,0.201389,0.045312,0.077778
2,289,960.0,960.0,1.0,0.241858,0.605691,0.059107,0.162602
3,504,960.0,960.0,1.0,0.296875,0.501389,0.064062,0.077778
4,504,960.0,960.0,2.0,0.791016,0.022917,0.021094,0.031944
...,...,...,...,...,...,...,...,...
1565,1047,960.0,960.0,1.0,0.454688,0.227083,0.040625,0.073611
1566,1047,960.0,960.0,0.0,0.734375,0.472222,0.053125,0.077778
1567,1047,960.0,960.0,1.0,0.402734,0.059028,0.032031,0.054167
1568,1047,960.0,960.0,1.0,0.848437,0.021528,0.020313,0.031944


## Mosaic

In [None]:
from sklearn.utils import shuffle
import random

class FPTDatasetMosaic(Dataset):
  """Dataset that returns a 4-image mosaic and its boxes for every image id.

  Each item is built by `load_mosaic`, which combines the indexed image with
  three randomly chosen ones, and is returned as an RGB array together with
  the labels produced by the mosaic step.
  """

  def __init__(self, dataframe, image_dir, transforms=None):
    """
    Args:
      dataframe: annotation table with the columns 'image_id', 'label', 'x', 'y', 'w', 'h'
        (one row per bounding box).
      image_dir: folder holding the '<image_id>.jpg' files.
      transforms: stored on the instance; not applied anywhere in this class.
    """
    super().__init__()

    self.df = dataframe  # Annotation & Image's ID dataframe
    self.transforms = transforms  # Albumentation's augmentation
    self.image_ids = shuffle(dataframe['image_id'].unique())  # Image's ID (shuffled once)
    self.class_labels = [np.zeros((0, 1), dtype=np.float32)] * len(self.image_ids)  # Image's label
    self.img_size = 960
    self.image_dir = image_dir
    self.mosaic = True

    # One array per image, rows are [label, x, y, w, h]
    # (an image may contain several bboxes & labels).
    label_columns = ['label', 'x', 'y', 'w', 'h']
    self.labels = [
        np.array(self.df[self.df['image_id'] == img_id][label_columns].values)
        for img_id in self.image_ids
    ]

  def __getitem__(self, index: int):
    """Return `(img, labels)` for the mosaic built around `index`.

    Raises:
      NotImplementedError: if `self.mosaic` has been switched off; only mosaic
        loading is implemented (previously this case silently returned None).
    """
    if not self.mosaic:
      raise NotImplementedError('FPTDatasetMosaic only supports mosaic loading (self.mosaic must be True)')

    # Load mosaic
    img, labels = load_mosaic(self, index)
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)  # BGR to RGB
    return img, labels

  def __len__(self) -> int:
    """Number of distinct image ids in the dataset."""
    return len(self.image_ids)


In [None]:
def random_affine(img, targets=(), degrees=10, translate=.1, scale=.1, shear=10, border=0):
    """Apply one random rotation/scale/translation/shear to an image and its boxes.

    Equivalent in spirit to
    torchvision.transforms.RandomAffine(degrees=(-10, 10), translate=(.1, .1), scale=(.9, 1.1), shear=(-10, 10))
    https://medium.com/uruvideo/dataset-augmentation-with-random-homographies-a8f4b44830d4

    Args:
        img: image array indexed as (row, column, channel).
        targets: array with one row per box, laid out as [cls, x1, y1, x2, y2] in pixels.
            `None` or an empty sequence means "no boxes".
        degrees: rotation angle is drawn from [-degrees, degrees].
        translate: shift is drawn from [-translate, translate] as a fraction of the
            image width (x) and height (y).
        scale: zoom factor is drawn from [1 - scale, 1 + scale].
        shear: x and y shear angles are drawn from [-shear, shear] degrees.
        border: pixels added to every side of the output canvas (negative values crop).

    Returns:
        (img, targets): the warped image and the boxes that survived the warp, with
        columns 1..4 replaced by the axis-aligned box around the warped corners.
    """
    if targets is None:  # targets = [cls, xyxy]
        targets = []
    height = img.shape[0] + border * 2
    width = img.shape[1] + border * 2

    # Rotation and Scale
    R = np.eye(3)
    a = random.uniform(-degrees, degrees)
    s = random.uniform(1 - scale, 1 + scale)
    R[:2] = cv2.getRotationMatrix2D(angle=a, center=(img.shape[1] / 2, img.shape[0] / 2), scale=s)

    # Translation: x is a fraction of the width (shape[1]), y of the height (shape[0]).
    # The two axes used to be swapped, which only went unnoticed on square images.
    T = np.eye(3)
    T[0, 2] = random.uniform(-translate, translate) * img.shape[1] + border  # x translation (pixels)
    T[1, 2] = random.uniform(-translate, translate) * img.shape[0] + border  # y translation (pixels)

    # Shear
    S = np.eye(3)
    S[0, 1] = math.tan(random.uniform(-shear, shear) * math.pi / 180)  # x shear (deg)
    S[1, 0] = math.tan(random.uniform(-shear, shear) * math.pi / 180)  # y shear (deg)

    # Combined matrix: rotate/scale first, then translate, then shear
    M = S @ T @ R  # ORDER IS IMPORTANT HERE!!
    if (border != 0) or (M != np.eye(3)).any():  # image changed
        img = cv2.warpAffine(img, M[:2], dsize=(width, height), flags=cv2.INTER_LINEAR, borderValue=(114, 114, 114))

    # Transform label coordinates (Important!!!)
    n = len(targets)
    if n:
        # warp the four corners of every box
        xy = np.ones((n * 4, 3))
        xy[:, :2] = targets[:, [1, 2, 3, 4, 1, 4, 3, 2]].reshape(n * 4, 2)  # x1y1, x2y2, x1y2, x2y1
        xy = (xy @ M.T)[:, :2].reshape(n, 8)

        # create new axis-aligned boxes around the warped corners
        x = xy[:, [0, 2, 4, 6]]
        y = xy[:, [1, 3, 5, 7]]
        xy = np.concatenate((x.min(1), y.min(1), x.max(1), y.max(1))).reshape(4, n).T

        # Explanation of this part of code: https://github.com/ultralytics/yolov5/issues/448
        # clip warped boxes to the output canvas, then reject the ones that became
        # too small, lost too much area, or turned into extreme slivers
        xy[:, [0, 2]] = xy[:, [0, 2]].clip(0, width)
        xy[:, [1, 3]] = xy[:, [1, 3]].clip(0, height)
        w = xy[:, 2] - xy[:, 0]
        h = xy[:, 3] - xy[:, 1]
        area = w * h
        area0 = (targets[:, 3] - targets[:, 1]) * (targets[:, 4] - targets[:, 2])
        ar = np.maximum(w / (h + 1e-16), h / (w + 1e-16))  # aspect ratio
        i = (w > 4) & (h > 4) & (area / (area0 * s + 1e-16) > 0.2) & (ar < 10)

        targets = targets[i]  # boolean indexing copies, so the caller's array is untouched
        targets[:, 1:5] = xy[i]

    return img, targets

In [None]:
def load_mosaic(self, index):
    """Load an image in mosaic form: combine 4 training images into one in random ratios (instead of only two as in CutMix).

    Input:
      self: dataset object providing `img_size`, `labels` and whatever `load_image` reads
      index: position of the first image; the other three are drawn at random
    Output:
      img4: mosaic image after `random_affine`
      labels4: boxes of all four tiles after `random_affine`, rows of [label, x1, y1, x2, y2] in pixels
    """

    labels4 = []
    s = self.img_size  # Size of image
    # The four tiles meet at (xc, yc), drawn from the central region of the 2s x 2s canvas
    xc, yc = [int(random.uniform(s * 0.5, s * 1.5)) for _ in range(2)]  # mosaic center x, y
    indices = [index] + [random.randint(0, len(self.labels) - 1) for _ in range(3)]  # 3 additional image indices
    # NOTE: the loop variable below shadows the `index` parameter
    for i, index in enumerate(indices):
        # Load image
        img, (h, w) = load_image(self, index)

        # place img in img4
        # "a" coordinates are the destination window on the canvas,
        # "b" coordinates are the matching source window inside the loaded image
        if i == 0:  # top left
            img4 = np.full(shape=(s * 2, s * 2, img.shape[2]), fill_value=114, dtype=np.uint8)  # base image with 4 tiles
            x1a, y1a, x2a, y2a = max(xc - w, 0), max(yc - h, 0), xc, yc  # xmin, ymin, xmax, ymax (large image)
            x1b, y1b, x2b, y2b = w - (x2a - x1a), h - (y2a - y1a), w, h  # xmin, ymin, xmax, ymax (small image)
        elif i == 1:  # top right
            x1a, y1a, x2a, y2a = xc, max(yc - h, 0), min(xc + w, s * 2), yc
            x1b, y1b, x2b, y2b = 0, h - (y2a - y1a), min(w, x2a - x1a), h
        elif i == 2:  # bottom left
            x1a, y1a, x2a, y2a = max(xc - w, 0), yc, xc, min(s * 2, yc + h)
            # NOTE(review): max(xc, w) can exceed the image width; the slice below then simply stops at w
            x1b, y1b, x2b, y2b = w - (x2a - x1a), 0, max(xc, w), min(y2a - y1a, h)
        elif i == 3:  # bottom right
            x1a, y1a, x2a, y2a = xc, yc, min(xc + w, s * 2), min(s * 2, yc + h)
            x1b, y1b, x2b, y2b = 0, 0, min(w, x2a - x1a), min(y2a - y1a, h)

        img4[y1a:y2a, x1a:x2a] = img[y1b:y2b, x1b:x2b]  # img4[ymin:ymax, xmin:xmax]
        # Offset of this image's origin on the canvas; used to shift its boxes
        padw = x1a - x1b
        padh = y1a - y1b

        # Labels
        x = self.labels[index]
        labels = x.copy()
        if x.size > 0:  # Normalized xywh to pixel xyxy format
            labels[:, 1] = w * (x[:, 1] - x[:, 3] / 2) + padw
            labels[:, 2] = h * (x[:, 2] - x[:, 4] / 2) + padh
            labels[:, 3] = w * (x[:, 1] + x[:, 3] / 2) + padw
            labels[:, 4] = h * (x[:, 2] + x[:, 4] / 2) + padh
        labels4.append(labels)

    # Concat/clip labels
    if len(labels4):
        labels4 = np.concatenate(labels4, 0)
        # np.clip(labels4[:, 1:] - s / 2, 0, s, out=labels4[:, 1:])  # use with center crop
        # Clip the box coordinates (in place) to the 2s x 2s canvas
        np.clip(labels4[:, 1:], 0, 2 * s, out=labels4[:, 1:])  # use with random_affine

    # Reason should add "random_affine()" in mosaic https://github.com/ultralytics/yolov5/issues/448
    # The negative border shrinks the 2s x 2s canvas by s // 2 pixels per side inside random_affine
    img4, labels4 = random_affine(img4, labels4,
                                  degrees=1.98 * 2,
                                  translate=0.05 * 2,
                                  scale=0.05 * 2,
                                  shear=0.641 * 2,
                                  border=-s // 2)  # border to remove

    return img4, labels4


In [None]:
def arr_to_str(arr, count_f):
  """Format one label row as a text line: integer class id, then the remaining values, space separated.

  `count_f` is accepted for compatibility with existing callers and is not used.
  """
  class_id = int(arr[0])
  remaining = ' '.join(str(value) for value in arr[1:])
  return f'{class_id} {remaining}\n'

def pascal_to_yolo(xmin, ymin, xmax, ymax, image_width=640, image_height=640):
  """Turn a corner box (xmin, ymin, xmax, ymax) into (x_center, y_center, width, height) divided by the image size."""
  center_x = (xmin + xmax) / 2
  center_y = (ymin + ymax) / 2
  box_w = xmax - xmin
  box_h = ymax - ymin
  return (
      center_x / image_width,
      center_y / image_height,
      box_w / image_width,
      box_h / image_height,
  )

def collate_fn(batch):
    """Regroup a list of samples into one tuple per field, e.g. [(img, tgt), ...] -> ((img, ...), (tgt, ...))."""
    per_field = zip(*batch)
    return tuple(per_field)

import random, math
import tensorflow as tf

def load_image(self, index):
  """Load 1 image from the dataset without resizing it.

  Input:
    self: dataset object providing `image_ids` and `image_dir`
    index: position in `self.image_ids` of the image to read
  Output:
    img: image array as returned by cv2.imread
    (h0, w0): height and width of the loaded image
  Raises:
    FileNotFoundError: when the image file cannot be read
  """
  image_id = self.image_ids[index]
  img_path = f'{self.image_dir}/{image_id}.jpg'
  img = cv2.imread(img_path, cv2.IMREAD_COLOR)

  # cv2.imread reports a missing/unreadable file by returning None instead of raising
  if img is None:
    raise FileNotFoundError(f'Image Not Found {img_path}')
  h0, w0 = img.shape[:2]  # orig hw
  return img, (h0, w0)  # img, hw_original

In [None]:
def mosaic_filter(num_img, train_df, DIR_TRAIN):
  """
    Create mosaic images and keep only those that carry more than 1 label
    Input:
      num_img: (int) # of mosaic images wanted to create
      train_df: (df) .csv metadata file of train dataset wanted to do augmentation
      DIR_TRAIN: (str) path direct to train's folder
    Output:
      image_lst: list of mosaic images
      target_lst: list of appropriate mosaic labels
    Raises:
      RuntimeError: when a complete pass over the data yields no mosaic with more
        than 1 label (the request could otherwise never be satisfied)
  """
  image_lst = list()
  target_lst = list()

  train_dataset = FPTDatasetMosaic(train_df, DIR_TRAIN)
  train_data_loader = DataLoader(
      train_dataset,
      batch_size=15,
      shuffle=True,
      num_workers=4,
      collate_fn=collate_fn
  )

  # Walk through the loader batch by batch and start another pass only when it is
  # exhausted. Calling next(iter(loader)) per batch would rebuild the iterator (and
  # its worker processes) every time and only ever look at the first batch.
  while len(image_lst) < num_img:
    kept_before_pass = len(image_lst)
    for images, targets in train_data_loader:
      for image, target in zip(images, targets):
        if len(target) > 1:
          image_lst.append(image)
          target_lst.append(target)
          if len(image_lst) == num_img:
            return image_lst, target_lst

    if len(image_lst) == kept_before_pass:
      raise RuntimeError('No mosaic with more than 1 label was produced in a full pass over the dataset')

  return image_lst, target_lst


# Create 50 mosaic images (each with more than 1 label) from the images in the "train" dataset folder
images, targets = mosaic_filter(50, train_df, DIR_TRAIN)

In [None]:
def save_mosaic(images, targets, save_labels_path, save_images_path):
  """Save the mosaic images & labels into image's folder & label's folder
  Files are numbered from 1: img_mosaic_<k>.txt and img_mosaic_<k>.jpg.
  Both folders are created when they do not exist yet.
  Input:
    images: (list) of mosaic images (np)
    targets: (list) of associated mosaic labels (np), rows of [label, xmin, ymin, xmax, ymax] in pixels
    save_labels_path: (str) path folder used to save .txt labels
    save_images_path: (str) path folder used to save .jpg images
  """
  os.makedirs(save_labels_path, exist_ok=True)
  os.makedirs(save_images_path, exist_ok=True)

  for file_idx, (img, label) in enumerate(zip(images, targets), start=1):
    height, width = img.shape[:2]

    # `with` closes the file even when writing one of the boxes fails
    with open(f'{save_labels_path}/img_mosaic_{file_idx}.txt', 'w') as txt_file:
      # Through each bbox of an image
      for box in label:
        # Normalize the box's annotation after augmentation (AS requirement from competition)
        a, b, c, d = pascal_to_yolo(box[1], box[2], box[3], box[4], width, height)
        label_yolo = np.array([box[0], a, b, c, d])

        # Save the string for txt file
        txt_file.write(arr_to_str(label_yolo, file_idx))
  print(f'FINISH SAVING MOSAIC LABELS TO FOLDER: {save_labels_path}')

  # Save images into folder "images/train"
  for file_idx, img in enumerate(images, start=1):
    im = Image.fromarray(img, "RGB")
    im.save(f'{save_images_path}/img_mosaic_{file_idx}.jpg')
  print(f'FINISH SAVING MOSAIC IMAGES TO FOLDER: {save_images_path}')

In [None]:
# Write the generated mosaic images and their label files into the augmented dataset folders
save_mosaic(images, targets, "./dataset_aug/labels/train", "./dataset_aug/images/train")

FINISH SAVING MOSAIC LABELS TO FOLDER: ./dataset_aug/labels/train
FINISH SAVING MOSAIC IMAGES TO FOLDER: ./dataset_aug/images/train
