In [1]:
import torch
import albumentations as A
from albumentations.pytorch import ToTensorV2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from PIL import Image
import os
import json
from sklearn.model_selection import train_test_split
import shutil
import cv2
import math

In [2]:
# File names belonging to each dataset split, taken from the 'File' column of
# the per-split summary CSVs. Despite the `_df` suffix these are plain lists.
train_df = pd.read_csv('/home/Special_Problem/train_df_summary.csv')['File'].tolist()
val_df = pd.read_csv('/home/Special_Problem/val_df_summary.csv')['File'].tolist()
test_df = pd.read_csv('/home/Special_Problem/test_df_summary.csv')['File'].tolist()

In [3]:
def populate_annotations(json_data, df, file):
    """Collect every bounding box annotated for one image.

    Boxes come from two sources and are returned in this order:

    1. Each row of ``df`` (columns ``label_name``, ``bbox_x``, ``bbox_y``,
       ``bbox_width``, ``bbox_height``) is taken as an axis-aligned box.
    2. Each polygon region found under ``json_data[file]`` is reduced to the
       axis-aligned box that encloses its points.

    Args:
        json_data: Mapping that contains an entry keyed by ``file``
            (presumably a VGG Image Annotator export -- TODO confirm).
        df: DataFrame of box annotations with the columns listed above.
        file: Key of the image entry inside ``json_data``.

    Returns:
        A list of dicts with keys ``class_id``, ``x_min``, ``y_min``,
        ``bbox_width`` and ``bbox_height``.

    Raises:
        KeyError: If a label is not one of 'Cluster(s)' / 'Thyrocyte(s)',
            if ``file`` is missing from ``json_data``, or if a region lacks
            the expected attribute keys.
    """
    # Singular and plural spellings map to the same integer class.
    label_map = {'Cluster' : 0, 'Clusters': 0, 'Thyrocyte': 1, 'Thyrocytes': 1}

    def make_box(class_id, left, top, box_w, box_h):
        # Single place that defines the annotation record layout.
        return {
            "class_id" : class_id,
            "x_min" : left,
            "y_min" : top,
            "bbox_width" : box_w,
            "bbox_height" : box_h
        }

    # Source 1: rectangular boxes listed in the CSV.
    annotations = [
        make_box(
            label_map[record['label_name']],
            record['bbox_x'],
            record['bbox_y'],
            record['bbox_width'],
            record['bbox_height'],
        )
        for _, record in df.iterrows()
    ]

    # Source 2: polygon regions stored in the JSON entry for this image.
    # NOTE(review): every non-empty dict value of the entry is treated as a
    # mapping of regions; list-valued or empty values are skipped -- confirm
    # this matches the annotation export format in use.
    for entry_value in json_data[file].values():
        if not (isinstance(entry_value, dict) and entry_value):
            continue
        for region in entry_value.values():
            class_id = label_map[region['region_attributes']["label"]]

            shape = region['shape_attributes']
            xs = shape['all_points_x']
            ys = shape['all_points_y']

            # Enclosing box of the polygon.
            left, right = min(xs), max(xs)
            top, bottom = min(ys), max(ys)
            annotations.append(make_box(class_id, left, top, right - left, bottom - top))

    return annotations

In [4]:
def get_coordinates_intersections(x_min, t_x_min, y_min, t_y_min, x_max, t_x_max, y_max, t_y_max):
    """Return the overlap rectangle of a bounding box and a tile.

    Arguments alternate between the box and the tile (``t_`` prefix) for each
    edge: left, top, right, bottom.

    Returns:
        ``(left, top, right, bottom)`` of the overlap. No overlap check is
        done here: when the rectangles do not intersect, ``left >= right``
        or ``top >= bottom`` and the caller must discard the result.
    """
    overlap_left = max(x_min, t_x_min)
    overlap_top = max(y_min, t_y_min)
    overlap_right = min(x_max, t_x_max)
    overlap_bottom = min(y_max, t_y_max)
    return overlap_left, overlap_top, overlap_right, overlap_bottom

In [5]:
def save_adjust_bboxes_for_tile(annotations, tile_x, tile_y, tile_size, rows, tile_id, file, tile, format, min_pixel_size = 8):
    """Clip annotations to one tile, save the tile image and record its boxes.

    Every annotation box is intersected with the square tile whose top-left
    corner is ``(tile_x, tile_y)`` and whose side is ``tile_size``. An
    intersection is kept only if it is at least ``min_pixel_size`` pixels in
    both width and height; kept boxes are expressed in tile-local
    coordinates. When at least one box is kept, the tile is written to
    ``/home/Special_Problem/yolo_dataset_version_2/tiles/{file}/{tile_id}.{format}``
    and the boxes are appended to ``rows``.

    Args:
        annotations: Iterable of dicts with ``class_id``, ``x_min``,
            ``y_min``, ``bbox_width`` and ``bbox_height`` in full-image
            coordinates.
        tile_x, tile_y: Top-left corner of the tile in the full image.
        tile_size: Nominal side length of the tile.
        rows: List that receives one dict per kept box (mutated in place).
        tile_id: Identifier used in the output file name and in each row.
        file: Source image file name; used as the output sub-directory.
        tile: Image array handed to ``cv2.imwrite`` unchanged. ``cv2.imwrite``
            interprets 3-channel arrays as BGR, so callers should pass BGR.
        format: Output file extension without the leading dot.
        min_pixel_size: Minimum width and height of a clipped box.

    Returns:
        None.
    """
    # The tile corners do not depend on the annotation, so compute them once.
    t_x_min, t_y_min = tile_x, tile_y
    t_x_max, t_y_max = t_x_min + tile_size, t_y_min + tile_size

    # Buffer this tile's rows so they reach `rows` only if the image is
    # actually written; otherwise the CSV would reference a missing tile.
    tile_rows = []
    for annotation in annotations:
        # Original bounding box in full-image coordinates.
        x_min, y_min = annotation['x_min'], annotation['y_min']
        x_max, y_max = x_min + annotation['bbox_width'], y_min + annotation['bbox_height']

        ix1, iy1, ix2, iy2 = get_coordinates_intersections(
            x_min, t_x_min, y_min, t_y_min, x_max, t_x_max, y_max, t_y_max
        )
        if not (ix1 < ix2 and iy1 < iy2):
            continue  # box does not overlap this tile

        new_width = ix2 - ix1
        new_height = iy2 - iy1
        if not (new_width >= min_pixel_size and new_height >= min_pixel_size):
            continue  # clipped remainder is too small to be useful

        tile_rows.append({
            'file': file,
            'tile_id' : tile_id,
            "class_id" : annotation['class_id'],
            # Shift into tile-local coordinates.
            "x_min" : ix1 - tile_x,
            "y_min" : iy1 - tile_y,
            "bbox_width" : new_width,
            "bbox_height" : new_height
        })

    # The per-image directory is created even when this tile has no boxes
    # (kept as in the original so the on-disk layout does not change).
    save_dir = f'/home/Special_Problem/yolo_dataset_version_2/tiles/{file}'
    os.makedirs(save_dir, exist_ok=True)

    # Save the image only if at least one annotation survived clipping.
    if not tile_rows:
        return

    tile_path = f'{save_dir}/{tile_id}.{format}'
    # cv2.imwrite reports failure through its return value, not an exception.
    if not cv2.imwrite(tile_path, tile):
        print(f"Failed to write tile: {tile_path}")
        return

    rows.extend(tile_rows)

In [6]:
def get_directory(file):
    """Map an image file name to the output directory of its dataset split.

    Membership is checked against the module-level ``train_df``, ``val_df``
    and ``test_df`` lists, in that order.

    Returns:
        ``"train/original"``, ``"val"`` or ``"test"``; ``None`` when the file
        is not listed in any split.
    """
    if file in train_df:
        directory = "train/original"
    elif file in val_df:
        directory = "val"
    elif file in test_df:
        directory = "test"
    else:
        directory = None
    return directory

In [7]:
# Cut every annotated image under the data directory into overlapping square
# tiles, save the tiles that contain annotations, and write all tile-local
# bounding boxes to a single CSV.
TILE_SIZE = 512                                       # tile side length in pixels
TILE_OVERLAP = 0.25                                   # fraction shared by neighbouring tiles
TILE_STRIDE = int(TILE_SIZE * (1 - TILE_OVERLAP))     # step between tile origins
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')
PREPROCESSED_CSV = '/home/Special_Problem/yolo_dataset_version_2/preprocessed/preprocessed.csv'

dataset = os.walk('/home/Special_Problem/Data')
tile_rows = []
for root, _dirs, files in dataset:
    # Annotations live in a sibling directory named after the image directory.
    annotation_dir = root + " - ANNOTATED FILES"
    for file in files:
        if not file.endswith(IMAGE_EXTENSIONS):
            continue

        # `image_ext` rather than `format`, so the builtin is not shadowed
        # for the rest of the notebook.
        stem, dotted_ext = os.path.splitext(file)
        image_ext = dotted_ext[1:]
        csv_path = os.path.join(annotation_dir, stem + "A.csv")
        json_path = os.path.join(annotation_dir, stem + "B.json")

        try:
            with open(json_path, "r") as json_file:
                json_data = json.load(json_file)
            df = pd.read_csv(csv_path)
        except FileNotFoundError as exc:
            # Images without both annotation files are skipped, not fatal.
            print(f"File not found for {file}: {exc}")
            continue

        annotations = populate_annotations(json_data, df, file)

        image_path = os.path.join(root, file)
        # cv2.imread returns None (no exception) when the file cannot be decoded.
        bgr_image = cv2.imread(image_path)
        if bgr_image is None:
            print(f"Could not read image: {image_path}")
            continue

        height, width = bgr_image.shape[:2]

        for row_idx, y0 in enumerate(range(0, height, TILE_STRIDE)):
            for col_idx, x0 in enumerate(range(0, width, TILE_STRIDE)):
                tile_id = f"{row_idx}_{col_idx}"

                # Tiles on the right/bottom edge are clipped to the image and
                # may therefore be smaller than TILE_SIZE.
                x1 = min(x0 + TILE_SIZE, width)
                y1 = min(y0 + TILE_SIZE, height)

                # Keep the tile in BGR: it is written with cv2.imwrite, which
                # expects BGR. Passing an RGB array would store the image
                # with red and blue swapped.
                tile = bgr_image[y0:y1, x0:x1]

                save_adjust_bboxes_for_tile(
                    annotations, x0, y0, TILE_SIZE, tile_rows, tile_id, file, tile, image_ext
                )

# `rows` is kept as the name of the final DataFrame for any later cell using it.
rows = pd.DataFrame(tile_rows, columns=['file', 'tile_id', 'class_id', 'x_min', 'y_min', 'bbox_width', 'bbox_height'])
# Create the output directory so the long tiling run cannot fail at the very end.
os.makedirs(os.path.dirname(PREPROCESSED_CSV), exist_ok=True)
rows.to_csv(PREPROCESSED_CSV, index=False)
print(PREPROCESSED_CSV)

/home/Special_Problem/yolo_dataset_version_2/preprocessed/preprocessed.csv


In [8]:
def get_normalize_bounding_box(x_min, y_min, bbox_width, bbox_height, img_width, img_height):
    """Convert a top-left/size box into a normalized center/size box.

    Args:
        x_min, y_min: Top-left corner of the box.
        bbox_width, bbox_height: Size of the box.
        img_width, img_height: Size of the image the box belongs to; each
            quantity is divided by the matching image dimension.

    Returns:
        ``(x_center, y_center, width, height)``, each divided by the
        corresponding image dimension.
    """
    half_width = bbox_width / 2
    half_height = bbox_height / 2
    return (
        (x_min + half_width) / img_width,
        (y_min + half_height) / img_height,
        bbox_width / img_width,
        bbox_height / img_height,
    )