<a href="https://colab.research.google.com/github/chauminhnguyen/augumentation_coco/blob/main/augmentation_coco.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Augmentation

In [None]:
# Colab-only: mount Google Drive at /content/drive so files stored there can be
# read through ordinary filesystem paths.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install --upgrade albumentations

Collecting albumentations
[?25l  Downloading https://files.pythonhosted.org/packages/95/b2/9492c74a5d260bc39f0cba9fcdc6652d0f87d342aaeb32197c62029f82df/albumentations-0.5.1-py3-none-any.whl (71kB)
[K     |████▋                           | 10kB 14.8MB/s eta 0:00:01[K     |█████████▏                      | 20kB 12.8MB/s eta 0:00:01[K     |█████████████▊                  | 30kB 9.9MB/s eta 0:00:01[K     |██████████████████▎             | 40kB 8.6MB/s eta 0:00:01[K     |██████████████████████▉         | 51kB 4.3MB/s eta 0:00:01[K     |███████████████████████████▍    | 61kB 4.5MB/s eta 0:00:01[K     |████████████████████████████████| 71kB 4.9MB/s eta 0:00:01[K     |████████████████████████████████| 81kB 3.5MB/s 
Collecting opencv-python-headless>=4.1.1
[?25l  Downloading https://files.pythonhosted.org/packages/08/e9/57d869561389884136be65a2d1bc038fe50171e2ba348fda269a4aab8032/opencv_python_headless-4.4.0.46-cp36-cp36m-manylinux2014_x86_64.whl (36.7MB)
[K     |████████████

In [None]:
import imageio
import albumentations as A
import os
import json
import numpy as np
from math import trunc
from PIL import Image as PILImage
import cv2


## Load Dataset

In [None]:
# Load the dataset json
class CocoDataset():
    '''
    In-memory index over a COCO-style annotation file.

    annotation_path: path of the annotation JSON file
    image_dir: folder containing the images named by each record's 'file_name'

    Attributes populated by the constructor:
        coco             -- the parsed JSON document
        categories       -- category id -> category record
        super_categories -- supercategory name -> set of category ids
        images           -- image id -> image record
        segmentations    -- image id -> list of annotation records
    '''
    def __init__(self, annotation_path, image_dir):
        self.annotation_path = annotation_path
        self.image_dir = image_dir

        # Context manager: the handle is released even if parsing fails.
        with open(self.annotation_path) as json_file:
            self.coco = json.load(json_file)

        # self.process_info()
        # self.process_licenses()
        self.process_categories()
        self.process_images()
        self.process_segmentations()

    def display_image(self, image_id, show_polys=True, show_labels=True, show_bbox=True, show_crowds=True, use_url=False):
        '''
        Look up one image and its annotations.

        Despite the name, nothing is rendered: the scaled polygon / RLE / bbox
        geometry is computed and then discarded. The show_* and use_url flags
        are accepted for compatibility but are not read.

        Returns a tuple (image_path, image_width, image_height, annotations)
        where annotations is the list stored in self.segmentations[image_id].

        Raises KeyError when image_id is not a known image or has no
        annotation record.
        '''
        image_info = self.images[image_id]
        image_path = os.path.join(self.image_dir, image_info['file_name'])
        # Only the size is needed; close the file handle right away.
        with PILImage.open(image_path) as pil_image:
            image_width, image_height = pil_image.size

        # Scale factor that fits the image into a display at most 900 px wide.
        max_width = 900
        adjusted_width = min(image_width, max_width)
        adjusted_ratio = adjusted_width / image_width

        # Scaled geometry, keyed by annotation id. None of it is returned.
        polygons = {}
        rle_regions = {}
        polys = []
        for segm in self.segmentations[image_id]:
            polygons_list = []
            if segm['iscrowd'] != 0:
                # Crowd annotations store an RLE mask instead of polygons.
                rle_list = self._rle_to_rects(segm['segmentation']['counts'], image_height, adjusted_ratio)
                if len(rle_list) > 0:
                    rle_regions[segm['id']] = rle_list
            else:
                # Add the polygon segmentation
                for segmentation_points in segm['segmentation']:
                    segmentation_points = np.multiply(segmentation_points, adjusted_ratio).astype(int)
                    polygons_list.append(str(segmentation_points).lstrip('[').rstrip(']'))

            polygons[segm['id']] = polygons_list

            # Closed outline of the [x, y, w, h] box; the repeated closing
            # point is dropped again before storing.
            bbox = segm['bbox']
            bbox_points = [bbox[0], bbox[1], bbox[0] + bbox[2], bbox[1],
                           bbox[0] + bbox[2], bbox[1] + bbox[3], bbox[0], bbox[1] + bbox[3],
                           bbox[0], bbox[1]]
            bbox_points = np.multiply(bbox_points, adjusted_ratio).astype(int)
            polys.append(bbox_points[:-2])
        return image_path, image_width, image_height, self.segmentations[image_id]

    @staticmethod
    def _rle_to_rects(rle_counts, image_height, adjusted_ratio):
        '''
        Turn RLE run lengths into a list of {'x', 'y', 'width', 'height'} rectangles.

        Runs alternate between empty pixels (even positions) and filled pixels
        (odd positions); the running pixel offset is treated as column-major
        (offset // image_height is the column, offset % image_height the row).
        '''
        px = 0
        rle_list = []
        for j, counts in enumerate(rle_counts):
            if j % 2 == 0:
                # Empty pixels
                px += counts
            else:
                # Filled run: since we describe it in vector form, it becomes
                # one or more column strips.
                x_start = trunc(trunc(px / image_height) * adjusted_ratio)
                y_start = trunc(px % image_height * adjusted_ratio)
                px += counts
                x_end = trunc(trunc(px / image_height) * adjusted_ratio)
                y_end = trunc(px % image_height * adjusted_ratio)
                if x_end == x_start:
                    # This is only on one line
                    rle_list.append({'x': x_start, 'y': y_start, 'width': 1 , 'height': (y_end - y_start)})
                if x_end > x_start:
                    # This spans more than one line
                    # Insert top line first
                    rle_list.append({'x': x_start, 'y': y_start, 'width': 1, 'height': (image_height - y_start)})

                    # Insert middle lines if needed
                    lines_spanned = x_end - x_start + 1 # total number of lines spanned
                    full_lines_to_insert = lines_spanned - 2
                    if full_lines_to_insert > 0:
                        # NOTE(review): x_start/x_end are already scaled, so this
                        # applies adjusted_ratio a second time -- confirm intent.
                        full_lines_to_insert = trunc(full_lines_to_insert * adjusted_ratio)
                        rle_list.append({'x': (x_start + 1), 'y': 0, 'width': full_lines_to_insert, 'height': image_height})

                    # Insert bottom line
                    rle_list.append({'x': x_end, 'y': 0, 'width': 1, 'height': y_end})
        return rle_list

    # def process_info(self):
    #     self.info = self.coco.get('info')

    # def process_licenses(self):
    #     self.licenses = self.coco.get('licenses')

    def get_max_image_id(self):
        '''
        Return the largest image id in the dataset.

        Uses max() rather than the last-inserted key, so the result is correct
        even when the JSON lists images out of id order. Raises ValueError if
        the dataset has no images.
        '''
        return max(self.images)

    def process_categories(self):
        '''Build self.categories and self.super_categories from coco['categories'].'''
        self.categories = {}
        self.super_categories = {}
        for category in self.coco['categories']:
            cat_id = category['id']
            super_category = category['supercategory']

            # Add category to the categories dict; the first record for an id wins.
            if cat_id not in self.categories:
                self.categories[cat_id] = category
            else:
                print("ERROR: Skipping duplicate category id: {}".format(category))

            # Add category id to the set of its supercategory.
            self.super_categories.setdefault(super_category, set()).add(cat_id)

    def process_images(self):
        '''Build self.images (image id -> record); the first record for an id wins.'''
        self.images = {}
        for image in self.coco['images']:
            image_id = image['id']
            if image_id in self.images:
                print("ERROR: Skipping duplicate image id: {}".format(image))
            else:
                self.images[image_id] = image

    def process_segmentations(self):
        '''Group coco['annotations'] by image id into self.segmentations.'''
        self.segmentations = {}
        for segmentation in self.coco['annotations']:
            self.segmentations.setdefault(segmentation['image_id'], []).append(segmentation)

## Export to JSON

In [None]:
def img_convert2json(width, height, image_id, images):
    '''
    Append a COCO image record to `images` (a list, modified in place).

    The record's file name is the image id followed by ".png", and its
    "street_id" is always 0. Returns None.
    '''
    file_name = str(image_id) + ".png"
    record = {}
    record["file_name"] = file_name
    record["height"] = height
    record["width"] = width
    record["id"] = image_id
    record["street_id"] = 0
    images.append(record)


def ann_convert2json(area, image_id, bbox, category_id, id, annotations):
    '''
    Append a COCO annotation record to `annotations` (a list, modified in place).

    All numeric fields are converted with int(); the first four entries of
    `bbox` are stored. "segmentation" is always an empty list and "iscrowd"
    is always 0. Returns None.
    '''
    int_bbox = [int(bbox[k]) for k in range(4)]
    record = {}
    record["segmentation"] = []
    record["area"] = int(area)
    record["iscrowd"] = 0
    record["image_id"] = int(image_id)
    record["bbox"] = int_bbox
    record["category_id"] = int(category_id)
    record["id"] = int(id)
    annotations.append(record)

In [None]:
def export2json(annotations, images, annotation_path, out_file='/content/final_augmentation_dataset.json'):
    '''
    Merge new records into an existing COCO annotation file and write the result.

    annotations: list of annotation records to append to the file's 'annotations'
    images: list of image records to append to the file's 'images'
    annotation_path: path of the source annotation JSON (read only, never modified)
    out_file: path of the merged JSON to write; defaults to the location the
        notebook has always used

    Raises KeyError if the source JSON lacks an 'images' or 'annotations' key.
    '''
    # Context managers close both handles even when reading or writing fails.
    with open(annotation_path, 'r') as src:
        dataset = json.load(src)

    dataset['images'].extend(images)
    dataset['annotations'].extend(annotations)

    with open(out_file, 'w') as dst:
        json.dump(dataset, dst)

## Augment Bounding Boxes

### Utils

In [None]:
def save_image(img, img_name):
    '''
    Write an image array to `img_name`, creating the target folder if needed.

    img: image array; assumed to be in RGB channel order, as produced by the
        imageio reader used in this notebook -- TODO confirm for other callers
    img_name: destination file path; the extension selects the format

    Raises IOError if OpenCV reports that the file could not be written.
    '''
    # cv2.imwrite does not create missing folders; it just returns False.
    out_dir = os.path.dirname(img_name)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    # OpenCV writes BGR data, so swap the channels first. (RGB2BGR and BGR2RGB
    # are the same swap; this name matches the direction actually intended.)
    im_bgr = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
    if not cv2.imwrite(img_name, im_bgr):
        raise IOError("Could not write image: {}".format(img_name))

### Run Code

In [None]:
# Inputs on the mounted Drive: the COCO annotation file and the folder of source images.
annotation_path = '/content/drive/MyDrive/Zalo AI/za_traffic_2020/traffic_train/train_traffic_sign_dataset_merging_NMS_rmBbox.json'
image_dir = '/content/drive/MyDrive/Zalo AI/za_traffic_2020/traffic_train/images'
# Prefix for augmented image files; used by plain string concatenation, so the
# trailing slash is required.
augmentation_img_path = "/content/img1/"

# Parse the annotation file and index it by category, image and annotation.
coco_dataset = CocoDataset(annotation_path, image_dir)
# Base value for the ids of augmented images (new id = max_index + running counter).
max_index = coco_dataset.get_max_image_id()

In [None]:
# Albumentations pipelines with bounding boxes

# Tunables that used to be inlined as magic numbers.
CROP_OVERLAP = 40      # pixels each quadrant crop extends past the image centre
MIN_VISIBILITY = 0.2   # min_visibility handed to BboxParams for crop / resize
MIN_BBOX_AREA = 25     # boxes whose width * height is not above this are not exported


def _bbox_pipeline(transform, **bbox_kwargs):
    """Wrap one transform in a Compose that carries COCO-format boxes and their labels."""
    return A.Compose(
        [transform],
        bbox_params=A.BboxParams(format='coco', label_fields=['category_ids'], **bbox_kwargs),
    )


RGB = _bbox_pipeline(A.RGBShift(r_shift_limit=30, g_shift_limit=30, b_shift_limit=30, p=1))

brightness = _bbox_pipeline(A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.4, p=1))

# p=0.3: most passes through this pipeline return the image unrotated.
rotate = _bbox_pipeline(A.Rotate(limit=15, p=0.3))

posterize = _bbox_pipeline(A.Posterize(p=1))

# IAAPerspective is only available in older albumentations releases.
perspective = _bbox_pipeline(A.IAAPerspective(scale=(0.01, 0.15), p=1))

# Applied to every source image, in this order.
base_pipelines = [RGB, brightness, rotate, posterize, perspective]

# Continue annotation ids after those already in the source dataset. Starting
# at 0 would duplicate existing ids once both lists are merged on export.
it_ann = max((ann['id'] for ann in coco_dataset.coco['annotations']), default=-1) + 1
it_img = 0
annotations = []
images = []
skipped = []   # (file name, error) for every source file that could not be processed

image_indexes = os.listdir(image_dir)

for image_file in image_indexes:
    try:
        # File names are expected to be "<image id>.<ext>"; anything else is
        # recorded as skipped instead of aborting the whole run.
        image_index = int(image_file.split('.')[0])
        image_path, image_width, image_height, segm = coco_dataset.display_image(image_index, use_url=False)

        bboxes = [ann['bbox'] for ann in segm]
        labels = [ann['category_id'] for ann in segm]
        image = imageio.imread(image_path)
        img_h, img_w = image.shape[0], image.shape[1]

        transformer = [
            pipeline(image=image, bboxes=bboxes, category_ids=labels)
            for pipeline in base_pipelines
        ]

        # Crop each augmented image into 4 overlapping quadrant patches:
        # top-left, top-right, bottom-left, bottom-right.
        half_w, half_h = int(img_w / 2), int(img_h / 2)
        quadrant_windows = [
            (0, 0, half_w + CROP_OVERLAP, half_h + CROP_OVERLAP),
            (half_w - CROP_OVERLAP, 0, img_w, half_h + CROP_OVERLAP),
            (0, half_h - CROP_OVERLAP, half_w + CROP_OVERLAP, img_h),
            (half_w - CROP_OVERLAP, half_h - CROP_OVERLAP, img_w, img_h),
        ]
        quadrant_crops = [
            _bbox_pipeline(
                A.Crop(x_min=x_min, y_min=y_min, x_max=x_max, y_max=y_max),
                min_visibility=MIN_VISIBILITY,
            )
            for (x_min, y_min, x_max, y_max) in quadrant_windows
        ]

        crop_image = []
        for transformed in transformer:
            for crop in quadrant_crops:
                crop_image.append(crop(image=transformed['image'],
                                       bboxes=transformed['bboxes'],
                                       category_ids=transformed['category_ids']))

        # Drop patches that kept no bounding box and scale the rest back to
        # the source image size.
        fullsize = _bbox_pipeline(A.Resize(height=img_h, width=img_w), min_visibility=MIN_VISIBILITY)

        crop_image_out = []
        for transformed in crop_image:
            # len() instead of `!= []` so any sequence type of boxes works.
            if len(transformed['bboxes']) > 0:
                crop_image_out.append(fullsize(image=transformed['image'],
                                               bboxes=transformed['bboxes'],
                                               category_ids=transformed['category_ids']))

        for transformed in crop_image_out:
            it_img += 1
            new_image_id = max_index + it_img

            # Collect this patch's annotations separately and commit them only
            # after the image file was written, so a failed save cannot leave
            # annotations that point at a missing image.
            pending = []
            for bbox, category_id in zip(transformed['bboxes'], transformed['category_ids']):
                area = bbox[2] * bbox[3]
                if area > MIN_BBOX_AREA:
                    ann_convert2json(area, new_image_id, list(bbox), category_id,
                                     it_ann + len(pending), pending)
            if pending:
                save_image(transformed['image'], augmentation_img_path + str(new_image_id) + ".png")
                img_convert2json(image_width, image_height, new_image_id, images)
                annotations.extend(pending)
                it_ann += len(pending)
    except Exception as exc:
        # Best effort: one bad file must not stop the run, but keep a record of
        # it. KeyboardInterrupt / SystemExit are deliberately not caught.
        skipped.append((image_file, repr(exc)))
        continue

if skipped:
    print("Skipped {} of {} files; first few: {}".format(len(skipped), len(image_indexes), skipped[:5]))

export2json(annotations, images, annotation_path)
print("Done")

Done
