# Data Processing Notebook

## Setup

### Imports


In [None]:
import os
import random
import shutil

import cv2
import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import numpy as np
import xml_to_dict as xtd

# Define the paths
test_dir = './datasets/test'
output_test_dir = './datasets/test_cropped'

# Create the output directory if it doesn't exist
if not os.path.exists(output_test_dir):
    os.makedirs(output_test_dir)

ori_rendered_path = os.path.abspath('../Backup/1 - rendered_images')
ori_aspl_path = os.path.abspath('../Backup/1 - aspl_images')

rendered_path = os.path.abspath('./datasets/rendered')
aspl_path = os.path.abspath('./datasets/aspl')


### Process CVAT labelled data for YOLOv8 training

The provided data by ASPL was labelled through rotated bounding box. However, rotated/oriented bounding box remains to be a frontier of research and there is not a lot of format supporting it.

Thus, this is an attempt to convert from LabelMe format to YOLO OBB/Segmentation format.


In [None]:
def process_cvat(search_dir):
    obj_idx_dict = {
            "Main Pad": 0,
            "LED Pads": 1,
            "Main Chip": 2,
            "Left LED Chip": 3,
            "Right LED Chip": 3,
            # "Main PCB": 5
        }

    parser = xtd.XMLtoDict()

    for root, _, files in os.walk(search_dir):

        for file in files:

            if file.endswith(".xml"):

                file_path = os.path.join(root, file)
                file_name = file.split(".")[0]

                with open(file_path, 'r') as f:
                    content = f.read()
                
                xml_dict = parser.parse(content)['annotation']

                img_x = int(xml_dict['imagesize']['ncols'])
                img_y = int(xml_dict['imagesize']['nrows'])

                with open(os.path.join(root, file_name + '.txt'), 'w') as f:

                    for obj in xml_dict['object']:
                        
                        if obj['name'] not in obj_idx_dict:
                            continue

                        obj_name = obj['name']
                        points = []

                        for pt in obj['polygon']['pt']:
                            points.append((float(pt['x']), float(pt['y'])))

                        angle = obj['attributes'].split(',')[0].split('=')[1]
                        rect = cv2.RotatedRect(points[0], points[1], points[2])
                        rect.angle = float(angle)
                        verts = [f"{vert:.8f}" for vert in np.multiply(cv2.boxPoints(rect), [1/img_x, 1/img_y]).flatten().tolist()]
                    
                        f.write(f'{str(obj_idx_dict[obj_name])} {" ".join(verts)}\n')
                


### Process Rendered data for YOLOv8 training

Rendered data is already compatible with YOLO OBB/Segmentation format.

However, mistakes were made during rendering and broke the labels, this was subsequently fixed with the code below.


In [None]:
def process_rendered():
    img_dir = os.path.abspath('./datasets/rendered')
    lbl_dir = os.path.abspath('./datasets/rendered/labels')

    val_dir = os.path.abspath('./datasets/rendered/val')
    train_dir = os.path.abspath('./datasets/rendered/train')

    if not os.path.exists(val_dir):
        os.makedirs(val_dir)

    if not os.path.exists(train_dir):
        os.makedirs(train_dir)

    file_list = [file for file in os.listdir(img_dir) if file.endswith('.png')]
    random.shuffle(file_list)
    n_files = len(file_list)

    for i, item in enumerate(file_list):
        
        item_name = item.split('.')[0]
        img_path = os.path.join(img_dir, item)
        lbl_path = os.path.join(lbl_dir, item_name + '.txt')

        if i < n_files * 0.2:
            shutil.move(img_path, os.path.join(val_dir, item))
            shutil.move(lbl_path, os.path.join(val_dir, item_name + '.txt'))
        else:
            shutil.move(img_path, os.path.join(train_dir, item))
            shutil.move(lbl_path, os.path.join(train_dir, item_name + '.txt'))

        if (i - 1) % 100 == 0 or i == n_files - 1:
            print(f"{i+1} / {n_files} files processed")


In [None]:
def fix_labels(search_dir):
    for root, _, files in os.walk(search_dir):
        for file in files:
            if file.endswith(".txt"):
                file_path = os.path.join(root, file)
                labels = []

                with open(file_path, 'r') as f:
                    labels = f.read().splitlines()

                new_labels = []
                for label in labels:
                    label = label.split(' ')
                    label = [ float(x) for x in label ]
                    label[0] = int(label[0])

                    ori_label = label.copy()

                    if label[0] == 2:
                        label[2] = 1-ori_label[2]
                        label[4] = 1-ori_label[4]
                        label[6] = 1-ori_label[6]
                        label[8] = 1-ori_label[8]
                    else:
                        label[1] = ori_label[1]
                        label[2] = 1-ori_label[2]
                        label[3] = ori_label[5]
                        label[4] = 1-ori_label[6]
                        label[5] = ori_label[7]
                        label[6] = 1-ori_label[8]
                        label[7] = ori_label[3]
                        label[8] = 1-ori_label[4]

                    new_labels.append(' '.join([str(x) for x in label]))

                    print(label)
                
                with open(file_path, 'w') as f:
                    for label in new_labels:
                        f.write(label + '\n')


### Process All Labels

Code below is used to process all labels, like removing PCB and unifying different classes.


In [None]:
def remove_pcb(files, root):
    for file in files:
        if file.endswith('.txt'):
            file_path = os.path.join(root, file)
            with open(file_path, 'r') as f:
                lines = f.readlines()
            with open(file_path, 'w') as f:
                for line in lines:
                    if line.split()[0] != '5':
                        f.write(line) 

def rename_4_to_3(files, root):
    for file in files:
        if file.endswith('.txt'):
            file_path = os.path.join(root, file)
            with open(file_path, 'r') as f:
                lines = f.readlines()
            with open(file_path, 'w') as f:
                for line in lines:
                    if line.split()[0] == '4':
                        line = '3 ' + ' '.join(line.split()[1:]) + '\n'
                    f.write(line)


### Splitting Data into Train, Test, and Validation

Functions to allow splitting of data into train, test, and validation sets.


In [None]:
## copy files from a list to a specified directory
def copy_files(files, directory):
    x = 0
   
    if not os.path.exists(directory):
        os.makedirs(directory)

    for file in files:
        shutil.copy(file, directory)
        x += 1
        if x % 25 == 0 or x == len(files):
            print("Copied {} files. Last file copied was {}".format(x, file))



## Run the functions

### Copying images

Copying images from its original backup to the dataset location.


In [None]:
# First, process the ASPL XML labels to YOLO format
# Then, find all txt labels from ASPL and move them to the same folder with the images correspondingly

process_cvat(ori_aspl_path)

all_txt_labels = {}

for root, _, files in os.walk(ori_aspl_path):
    for file in files:
        if file.endswith('.txt'):
            all_txt_labels[file.split('.')[0]] = os.path.join(root, file)

for root, _, files in os.walk(ori_aspl_path):
    for file in files:
        if file.endswith('.png') or file.endswith('.jpg'):
            shutil.copy(all_txt_labels[file.split('.')[0]], root)


In [None]:
# No. of files to copy:
# Rendered - Get 600 good, 400 bad, and 200 mirage
# ASPL - Use all

# Split ratio:
# Rendered - 85% train, 15% val
# ASPL - 70% train, 20% val, 10% test

aspl = []

for root, _, files in os.walk(ori_aspl_path):
    for file in files:
        if file.endswith('.png') or file.endswith('.jpg'):
            file = file.split('.')[0]
            aspl.append((file, root))

random.shuffle(aspl)

aspl_train = aspl[:int(0.7*len(aspl))]
aspl_val = aspl[int(0.7*len(aspl)):int(0.9*len(aspl))]
aspl_test = aspl[int(0.9*len(aspl)):]

aspl_files = {
    'train': aspl_train,
    'val': aspl_val,
    'test': aspl_test
}

for type in aspl_files:
    if not os.path.exists(os.path.join(aspl_path, type)):
        os.makedirs(os.path.join(aspl_path, type))

    for file in aspl_files[type]:
        shutil.copy(os.path.join(file[1], file[0] + '.jpg'), os.path.join(aspl_path, type))
        shutil.copy(os.path.join(file[1], file[0] + '.txt'), os.path.join(aspl_path, type))


In [None]:
good_rendered = []
bad_rendered = []
mirage_rendered = []

for root, _, files in os.walk(ori_rendered_path):
    for file in files:
        if file.endswith('.png') or file.endswith('.jpg'):
            file = file.split('.')[0]
            if 'good' in root:
                good_rendered.append((file, root))
            elif 'bad' in root:
                bad_rendered.append((file, root))
            elif 'mirage' in root:
                mirage_rendered.append((file, root))


random.shuffle(good_rendered)
random.shuffle(bad_rendered)
random.shuffle(mirage_rendered)

rendered_train = good_rendered[:510] + bad_rendered[:340] + mirage_rendered[:170]
rendered_val = good_rendered[510:600] + bad_rendered[340:400] + mirage_rendered[170:200]

rendered_files = {
    'train': rendered_train,
    'val': rendered_val
}

for type in rendered_files:
    if not os.path.exists(os.path.join(rendered_path, type)):
        os.makedirs(os.path.join(rendered_path, type))

    for file in rendered_files[type]:
        shutil.copy(os.path.join(file[1], file[0] + '.png'), os.path.join(rendered_path, type))
        shutil.copy(os.path.join(file[1], file[0] + '.txt'), os.path.join(rendered_path, type))


In [None]:
ori_render_tests_path = os.path.abspath('../Backup/!rendered_original/tests')

control = []
nocomp = []
somecomp = []

for root, _, files in os.walk(ori_render_tests_path):
    for file in files:
        if file.endswith('.png') or file.endswith('.jpg'):
            file = file.split('.')[0]
            if 'control' in root:
                control.append((file, root))
            elif 'nocomp' in root:
                nocomp.append((file, root))
            elif 'somecomp' in root:
                somecomp.append((file, root))

random.shuffle(control)
random.shuffle(nocomp)
random.shuffle(somecomp)

dest_render_tests_path = os.path.abspath('./datasets/render_tests')

# copy 170 of each to their respective folders train
# copy 30 of each to their respective folders val

render_tests_files = {
    'control': control,
    'nocomp': nocomp,
    'somecomp': somecomp
}

for type in render_tests_files:
    if not os.path.exists(os.path.join(dest_render_tests_path, type)):
        os.makedirs(os.path.join(dest_render_tests_path, type))

    for x, file in enumerate(render_tests_files[type]):
        if x < 170:
            if not os.path.exists(os.path.join(dest_render_tests_path, type, 'train')):
                os.makedirs(os.path.join(dest_render_tests_path, type, 'train'))
            shutil.copy(os.path.join(file[1], file[0] + '.png'), os.path.join(dest_render_tests_path, type, 'train'))
            shutil.copy(os.path.join(file[1], file[0] + '.txt'), os.path.join(dest_render_tests_path, type, 'train'))
        else:
            if not os.path.exists(os.path.join(dest_render_tests_path, type, 'val')):
                os.makedirs(os.path.join(dest_render_tests_path, type, 'val'))
            shutil.copy(os.path.join(file[1], file[0] + '.png'), os.path.join(dest_render_tests_path, type, 'val'))
            shutil.copy(os.path.join(file[1], file[0] + '.txt'), os.path.join(dest_render_tests_path, type, 'val'))



### Rename class 4 to 3


In [None]:

for root, _, files in os.walk(aspl_path):
    rename_4_to_3(files, root)

for root, _, files in os.walk(rendered_path):
    rename_4_to_3(files, root)


In [None]:
directory = os.path.abspath('./datasets/render_tests')

for root, _, files in os.walk(directory):
    rename_4_to_3(files, root)
fix_labels(directory)


## OpenCV

Crop some of the validation images with rotation to check whether the model is robust.


In [None]:
# crop_x = 100
# crop_y = 775
# crop_width = 520
# crop_height = 320

def augment_test(output):
    # Iterate over the validation images
    for file in os.listdir(test_dir):
        if file.endswith('.jpg'):

            # Read the image
            image_path = os.path.join(test_dir, file)
            image = cv2.imread(image_path)


            # Read the corresponding label file
            with open(os.path.join(test_dir, file.split('.')[0] + '.txt'), 'r') as f:
                lines = f.readlines()
            
            shapes = []
            for i, line in enumerate(lines):
                if line[0] == '0' or line[0] == '2':
                    continue

                line = line.split()

                points = []
                for j in range(1, len(line), 2):
                    line[j] = float(line[j])*image.shape[1]
                    line[j+1] = float(line[j+1])*image.shape[0]
                    points.append([line[j], line[j+1]])
                shapes.append((line[0],points))



            # Define the cropping parameters
            crop_x = int(random.gauss(70, 8))  # Crop starting x-coordinate
            crop_y = int(random.gauss(750, 15))  # Crop starting y-coordinate
            crop_width = int(random.gauss(550, 15))  # Crop width
            crop_height = int(random.gauss(350, 10))  # Crop height
            rotate_angle = random.gauss(90, 50)-90


            # Define the rotation matrix
            M = cv2.getRotationMatrix2D((crop_width/2, crop_height/2), rotate_angle, 1)

            r = np.deg2rad(rotate_angle)
            newX,newY = (abs(np.sin(r)*crop_height) + abs(np.cos(r)*crop_width),abs(np.sin(r)*crop_width) + abs(np.cos(r)*crop_height))
            

            (tx,ty) = ((newX-crop_width)/2,(newY-crop_height)/2)
            M[0,2] += tx #third column of matrix holds translation, which takes effect after rotation.
            M[1,2] += ty



            # Cropping of image and points
            cropped_image = image[crop_y:crop_y+crop_height, crop_x:crop_x+crop_width]

            for i, (_, shape) in enumerate(shapes):
                for j, point in enumerate(shape):
                    x = point[0] - crop_x
                    y = point[1] - crop_y

                    shapes[i][1][j] = [x, y]

            # plt.imshow(cropped_image)
            # for _, shape in shapes:
            #     plt.plot(*zip(*shape), marker='o', color='r')
            # plt.show()



            # Rotation of image and points
            rotated_image = cv2.warpAffine(cropped_image, M, dsize=(int(newX), int(newY)))

            for i, (_, shape) in enumerate(shapes):
                shape = np.array([shape])
                shape = cv2.transform(shape, M)
                shapes[i] = (shapes[i][0], shape[0])

            # plt.imshow(rotated_image)
            # for _, shape in shapes:
            #     plt.plot(*zip(*shape), marker='o', color='r')
            # plt.show()


            # Save the data
            img_output_path = os.path.join(output, file)
            cv2.imwrite(img_output_path, rotated_image)

            lbl_output_path = os.path.join(output, file.split('.')[0] + '.txt')
            with open(lbl_output_path, 'w') as f:
                for id, shape in shapes:
                    f.write(f'{id} {shape[0][0]/newX} {shape[0][1]/newY} {shape[1][0]/newX} {shape[1][1]/newY} {shape[2][0]/newX} {shape[2][1]/newY} {shape[3][0]/newX} {shape[3][1]/newY}\n')

augment_test(output_test_dir)
