In [None]:
import os
import albumentations as alb
import shutil
import cv2
import json
import numpy as np

In [None]:
# Names of the dataset partitions and of the sub-folders each partition contains.
splited_folders = ['train', 'test', 'val']
inside_folders = ['images', 'labels']

# Every dataset location is a relative path below the 'data' directory.
raw_data_path, splited_data_path, aug_data_path = (
    os.path.join('data', sub_dir)
    for sub_dir in ('raw_images', 'splited_data', 'augmented_data')
)

In [None]:
def make_dictionary(path):
    """Create the directory ``path`` (including missing parents) if needed.

    Prints whether the directory was created or was already present.

    Args:
        path: Directory path to create.

    Raises:
        FileExistsError: If ``path`` exists but is not a directory. Reporting
            such a path as "already exists" would only postpone the failure
            to the first attempt to write a file into it.
    """
    if os.path.isdir(path):
        print(f"Directory '{path}' already exists.")
        return

    # exist_ok=True tolerates the directory appearing between the check above
    # and this call; it still raises when a non-directory occupies the path.
    os.makedirs(path, exist_ok=True)
    print(f"Directory '{path}' created.")

# Build the folder tree <root>/<partition>/<sub-folder> for both the split
# data root and the augmented data root.
for partition_name in splited_folders:
    partition_dirs = [
        os.path.join(root, partition_name)
        for root in (splited_data_path, aug_data_path)
    ]
    # Partition directories first (split root, then augmented root) ...
    for partition_dir in partition_dirs:
        make_dictionary(partition_dir)
    # ... then each sub-folder, again split root before augmented root.
    for sub_name in inside_folders:
        for partition_dir in partition_dirs:
            make_dictionary(os.path.join(partition_dir, sub_name))

In [None]:
# Fraction of the raw data assigned to the train, test and validation partitions.
train_dataset_size, test_dataset_size, validation_dataset_size = 0.7, 0.15, 0.15

# Same fractions as a list, in train / test / validation order.
percentages = [
    train_dataset_size,
    test_dataset_size,
    validation_dataset_size,
]

def count_files_in_folder(dir_path):
    """Return how many regular files sit directly inside ``dir_path``.

    Entries that are not files (e.g. sub-directories) are ignored, and
    sub-directories are not descended into.
    """
    file_count = 0
    for entry in os.scandir(dir_path):
        if entry.is_file():
            file_count += 1
    return file_count

# Split the raw data into train / test / val partitions and copy the files
# into <splited_data_path>/<partition>/{images,labels}.
#
# The split is made per *sample* (all raw files sharing one file stem, i.e.
# an image together with its JSON label) rather than per file, so an image
# and its label always end up in the same partition.
files_in_raw_folder = count_files_in_folder(raw_data_path)
if files_in_raw_folder % 2 != 0:
    print("Uneven number of files in raw data!")
else:
    # Regular files only (a stray sub-directory must not be treated as data),
    # sorted because os.listdir/os.scandir return entries in arbitrary order
    # and the split should be reproducible between runs.
    files_names = sorted(
        entry.name for entry in os.scandir(raw_data_path) if entry.is_file()
    )

    # stem -> every raw file carrying that stem. splitext copes with names
    # that contain no dot or several dots.
    samples = {}
    for filename in files_names:
        samples.setdefault(os.path.splitext(filename)[0], []).append(filename)
    sample_names = list(samples)
    samples_count = len(sample_names)

    group_sizes = [int(samples_count * percentage) for percentage in percentages]
    if sum(group_sizes) < samples_count:
        # Samples lost to rounding down are given to the training partition.
        group_sizes[0] += (samples_count - sum(group_sizes))
    if sum(group_sizes) == samples_count:
        print(f"all data: {samples_count} samples ({files_in_raw_folder} files)")
        print(f"train_dataset_size {train_dataset_size*100}% = {group_sizes[0]}")
        print(f"test_dataset_size {test_dataset_size*100}% = {group_sizes[1]}")
        print(f"validation_dataset_size {validation_dataset_size*100}% = {group_sizes[2]}")

    # Consecutive, non-overlapping slices of the sample list.
    train_end = group_sizes[0]
    test_end = train_end + group_sizes[1]
    val_end = test_end + group_sizes[2]
    train_dataset = sample_names[:train_end]
    test_dataset = sample_names[train_end:test_end]
    val_dataset = sample_names[test_end:val_end]

    datasets = [['train', train_dataset], ['test', test_dataset], ['val', val_dataset]]

    # File extension -> sub-folder of the partition; other extensions are skipped.
    target_folder_by_extension = {'.json': 'labels', '.jpg': 'images'}

    for partition_name, partition_samples in datasets:
        print(f"copy files from raw_data folder to {partition_name} folder ({len(partition_samples)})")
        for sample_name in partition_samples:
            for filename in samples[sample_name]:
                target_folder = target_folder_by_extension.get(os.path.splitext(filename)[1])
                if target_folder is None:
                    continue
                existing_filepath = os.path.join(raw_data_path, filename)
                new_filepath = os.path.join(splited_data_path, partition_name, target_folder, filename)
                shutil.copy(existing_filepath, new_filepath)


In [None]:
# Number of augmented variants produced for every source image.
NEW_IMAGES_AMOUNT = 10

# Transforms handed to alb.Compose, in this order: a 450x450 random crop
# followed by randomly applied flips and colour adjustments.
_augmentation_steps = [
    alb.RandomCrop(width=450, height=450),
    alb.HorizontalFlip(p=0.5),
    alb.RandomBrightnessContrast(p=0.2),
    alb.RandomGamma(p=0.2),
    alb.RGBShift(p=0.2),
    alb.VerticalFlip(p=0.5),
]

# Boxes are passed in the 'albumentations' format; their class names travel
# in the separate 'class_labels' argument of the augmentor call.
_bbox_settings = alb.BboxParams(
    format='albumentations',
    label_fields=['class_labels'],
)

augmentor = alb.Compose(_augmentation_steps, bbox_params=_bbox_settings)

# For every image of every partition, write NEW_IMAGES_AMOUNT augmented
# copies to <aug_data_path>/<partition>/images and one JSON annotation per
# copy ({'image', 'bbox', 'class'}) to <aug_data_path>/<partition>/labels.
for partition in splited_folders:
    images_dir = os.path.join(splited_data_path, partition, 'images')
    for image in os.listdir(images_dir):
        # splitext keeps dotted names intact ("a.b.jpg" -> "a.b").
        image_stem = os.path.splitext(image)[0]

        img = cv2.imread(os.path.join(images_dir, image))
        if img is None:
            # cv2.imread signals an unreadable / non-image file by returning None.
            print(f"Skipping '{image}' in {partition}: file could not be read as an image.")
            continue
        img_height, img_width = img.shape[:2]

        # Placeholder box used when the image has no usable label; such
        # images are always written out with class 0 below.
        coords = [0, 0, 0.00001, 0.00001]
        has_box = False
        label_path = os.path.join(splited_data_path, partition, 'labels', f'{image_stem}.json')
        if os.path.exists(label_path):
            with open(label_path, 'r') as f:
                label = json.load(f)

            # A label file without any shape is treated like a missing label
            # instead of aborting the whole cell with an IndexError/KeyError.
            shapes = label.get('shapes') or []
            if shapes:
                first_point = shapes[0]['points'][0]
                second_point = shapes[0]['points'][1]
                # The two corners may be stored in either order; order them
                # as min/max so a reversed rectangle is not passed on.
                x_min, x_max = sorted((first_point[0], second_point[0]))
                y_min, y_max = sorted((first_point[1], second_point[1]))
                # Normalise by the actual image size (not a fixed 640x480)
                # and clamp to [0, 1] in case a point lies just outside.
                coords = np.clip(
                    np.divide(
                        [x_min, y_min, x_max, y_max],
                        [img_width, img_height, img_width, img_height],
                    ),
                    0.0,
                    1.0,
                ).tolist()
                has_box = True

        try:
            for x in range(NEW_IMAGES_AMOUNT):
                augmented = augmentor(image=img, bboxes=[coords], class_labels=['face'])
                cv2.imwrite(os.path.join(aug_data_path, partition, 'images', f'{image_stem}.{x}.jpg'), augmented['image'])

                annotation = {}
                annotation['image'] = image

                if has_box and len(augmented['bboxes']) > 0:
                    # Plain floats keep json.dump working even when the box
                    # comes back as numpy values.
                    annotation['bbox'] = [float(value) for value in augmented['bboxes'][0][:4]]
                    annotation['class'] = 1
                else:
                    # No label, or the box did not survive the augmentation.
                    annotation['bbox'] = [0, 0, 0, 0]
                    annotation['class'] = 0

                with open(os.path.join(aug_data_path, partition, 'labels', f'{image_stem}.{x}.json'), 'w') as f:
                    json.dump(annotation, f)

        except Exception as e:
            # Best effort: report which image failed and continue with the next one.
            print("Exception:", e, f"(image '{image}' in {partition})")
