In [None]:
#| default_exp data_preparation

In [None]:
#| export
from pathlib import Path
from fastcore.all import *
import cv2
import numpy as np
from typing import List, Tuple, Union, Callable, Optional, Dict
from tqdm.auto import tqdm
import matplotlib.pyplot as plt 
import json
import yaml

# First, locate the data

Code largely adapted from [here](https://github.com/bnsreenu/python_for_microscopists/blob/master/332%20-%20All%20about%20image%20annotations%E2%80%8B/binary_to_coco_V3.0.py)

In [None]:
# Root directory of the microscopy dataset (absolute, machine-specific path).
path = Path(r'/home/hasan/workspace/data/microscopy_data/')
# Show the directory entries; `ls` is not a stdlib Path method — presumably the
# fastcore patch made available by `from fastcore.all import *`.
path.ls()

(#13) [Path('/home/hasan/workspace/data/microscopy_data/training_groundtruth.tif'),Path('/home/hasan/workspace/data/microscopy_data/training.tif'),Path('/home/hasan/workspace/data/microscopy_data/mask_name.tif'),Path('/home/hasan/workspace/data/microscopy_data/testing.tif'),Path('/home/hasan/workspace/data/microscopy_data/testing_groundtruth.tif'),Path('/home/hasan/workspace/data/microscopy_data/masks'),Path('/home/hasan/workspace/data/microscopy_data/test_patch_masks'),Path('/home/hasan/workspace/data/microscopy_data/patch_images'),Path('/home/hasan/workspace/data/microscopy_data/test_patch_images'),Path('/home/hasan/workspace/data/microscopy_data/patch_masks')...]

In [None]:
# Training split: `*_img_path` holds the raw image patches, `*_msk_path` holds the
# binary masks that get_mask_info / process_masks turn into contours.
# (The image and mask folders were previously assigned to the opposite variables.)
trn_img_path = Path(r'/home/hasan/workspace/data/microscopy_data/patch_images')
trn_msk_path = Path(r'/home/hasan/workspace/data/microscopy_data/patch_masks/')
trn_output_path = Path(r'/home/hasan/workspace/data/microscopy_data/yolo_dataset_train')
Path(trn_output_path).mkdir(parents=True, exist_ok=True)

# Validation split, same layout as the training split.
val_img_path = Path(r'/home/hasan/workspace/data/microscopy_data/test_patch_images/')
val_msk_path = Path(r'/home/hasan/workspace/data/microscopy_data/test_patch_masks/')
val_output_path = Path(r'/home/hasan/workspace/data/microscopy_data/yolo_dataset_test')
Path(val_output_path).mkdir(parents=True, exist_ok=True)

# Destination files for the intermediate COCO-format annotations.
trn_json_path = Path(r'/home/hasan/workspace/data/microscopy_data/patch_mask_train_coco_format.json')
val_json_path = Path(r'/home/hasan/workspace/data/microscopy_data/patch_mask_val_coco_format.json')
trn_msk_path.ls(), trn_img_path.ls()


((#1642) [Path('/home/hasan/workspace/data/microscopy_data/patch_images/img_147_p_7.png'),Path('/home/hasan/workspace/data/microscopy_data/patch_images/img_51_p_10.png'),Path('/home/hasan/workspace/data/microscopy_data/patch_images/img_164_p_1.png'),Path('/home/hasan/workspace/data/microscopy_data/patch_images/img_122_p_4.png'),Path('/home/hasan/workspace/data/microscopy_data/patch_images/img_134_p_1.png'),Path('/home/hasan/workspace/data/microscopy_data/patch_images/img_80_p_2.png'),Path('/home/hasan/workspace/data/microscopy_data/patch_images/img_77_p_8.png'),Path('/home/hasan/workspace/data/microscopy_data/patch_images/img_47_p_7.png'),Path('/home/hasan/workspace/data/microscopy_data/patch_images/img_78_p_10.png'),Path('/home/hasan/workspace/data/microscopy_data/patch_images/img_83_p_2.png')...],
 (#1642) [Path('/home/hasan/workspace/data/microscopy_data/patch_masks/img_147_p_7.png'),Path('/home/hasan/workspace/data/microscopy_data/patch_masks/img_51_p_10.png'),Path('/home/hasan/wor

# We need to convert the data into a format that YOLOv8 can work with

### 1. First, convert the masks to COCO format

In [None]:
#| export
# Element-wise helper: maps every path-like entry of a sequence/array to its final path component.
get_name = np.vectorize(lambda entry: Path(entry).name)

In [None]:
#| export
def get_contours(img:np.ndarray):
    'Binarise `img` with Otsu thresholding and return element 0 of the `cv2.findContours` result'

    # THRESH_OTSU picks the threshold automatically; the explicit 0 passed below is only a placeholder
    otsu_mode = cv2.THRESH_BINARY + cv2.THRESH_OTSU
    binary = cv2.threshold(img, 0, 255, otsu_mode)[1]
    found = cv2.findContours(binary, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    # NOTE(review): index 0 is the contour list under OpenCV 4.x's (contours, hierarchy) return convention
    return found[0]


In [None]:
#| export
def from_contr_to_annotation(
                            sn_cntr:list, # single contour
                            consider_min_area:bool=True,# whether to use min_area parameter
                            min_area:int=0,
                            )->Tuple:
    'Return `(bbox, area, segmentation)` for one contour, or `(None, None, None)` when it is filtered out by area'
    box = cv2.boundingRect(sn_cntr)
    contour_area = cv2.contourArea(sn_cntr)
    flat_points = sn_cntr.flatten().tolist()
    # Only reject when filtering is enabled and the area does not strictly exceed the threshold
    if consider_min_area and not contour_area > min_area:
        return None, None, None
    return box, contour_area, flat_points



In [None]:
#| export
def get_mask_info(
        msk_path, 
        min_area=0,
        )->Tuple:
    '''Collect COCO-style image records and polygon annotations for every mask in a folder.

    msk_path: directory (str or Path) whose entries are read as grayscale masks.
    min_area: forwarded to `from_contr_to_annotation`; contours whose area does not
              exceed it are dropped.

    Returns `(image_infos, annotations, annotation_id)` where `annotation_id` is the
    number of annotations created (annotation ids start at 0, image ids at 1).
    Entries that OpenCV cannot decode (sub-directories, non-image files) are skipped.
    '''
    # Sorted so that image ids are reproducible regardless of directory listing order
    all_masks = sorted(Path(msk_path).iterdir())

    image_infos = []
    annotations = []
    info_by_name = {}  # file_name -> image record, for O(1) duplicate detection
    annotation_id = 0
    for idx, msk_fn in tqdm(enumerate(all_masks), total=len(all_masks)):
        mask = cv2.imread(str(msk_fn), cv2.IMREAD_GRAYSCALE)
        if mask is None:
            # cv2.imread signals an unreadable entry by returning None instead of raising
            continue
        height, width = mask.shape
        file_name = msk_fn.name

        image_info = info_by_name.get(file_name)
        if image_info is None:
            image_info = {
                'id': idx + 1,
                'width': width,
                'height': height,
                'file_name': file_name}
            info_by_name[file_name] = image_info
            image_infos.append(image_info)
        # Always tie annotations to the id of the registered image record
        image_id = image_info['id']

        for cntr in get_contours(mask):
            bbox, area, segmentation = from_contr_to_annotation(cntr, min_area=min_area)
            if bbox is None:
                continue
            annotations.append({
                'image_id': image_id,
                'id': annotation_id,
                'category_id': 1,
                'iscrowd': 0,
                'area': area,
                'bbox': bbox,
                'segmentation': [segmentation]
            })
            annotation_id += 1
    return image_infos, annotations, annotation_id


        

        



In [None]:
# Trial run on the training mask folder; min_area=0 equals get_mask_info's default threshold.
image_infos, annotations, annotation_id=get_mask_info(trn_msk_path, min_area=0)

  0%|          | 0/1642 [00:00<?, ?it/s]

In [None]:
# Category name -> COCO category id; a single "object" class is declared.
category_ids = {
    "object": 1,
}

In [None]:
#| export
def process_masks(
        mask_path:Union[str, Path],
        json_path:Union[str, Path],
        category_ids:Dict,
        min_area:int=0,
        ):
    '''Build a COCO-format dict from a folder of masks and write it to `json_path`.

    mask_path:    directory of masks, passed to `get_mask_info`.
    json_path:    file the json is written to (overwritten if it exists).
    category_ids: mapping of category name -> category id; each entry becomes a
                  COCO category whose supercategory equals its name.
    min_area:     forwarded to `get_mask_info`; the default 0 keeps the previous behaviour.
    '''
    # The third return value (annotation count) is not needed here
    images, annotations, _ = get_mask_info(mask_path, min_area=min_area)
    coco_format = {
        "info": {},
        "licenses": [],
        "images": images,
        "categories": [{"id":v, "name":k, "supercategory":k } for k, v in category_ids.items()],
        "annotations": annotations,
    }
    with open(json_path, 'w') as f:
        json.dump(coco_format, f, sort_keys=True, indent=4)

#### Creating the COCO-format JSON file for the training set

In [None]:
# Write the COCO-format json for the training masks to `trn_json_path`.
process_masks(
    mask_path=trn_msk_path, 
    json_path=trn_json_path, 
    category_ids=category_ids)

  0%|          | 0/1642 [00:00<?, ?it/s]

#### Creating the COCO-format JSON file for the validation set

In [None]:
# Write the COCO-format json for the validation masks to `val_json_path`.
process_masks(
    mask_path=val_msk_path, 
    json_path=val_json_path, 
    category_ids=category_ids)

  0%|          | 0/1725 [00:00<?, ?it/s]

### 2. Now convert to YOLO format

In [None]:
#| export
def read_json(file_path):
    'Open `file_path` for reading and return the decoded JSON content'
    with open(file_path, 'r') as fh:
        return json.load(fh)


In [None]:
# Directory entries of the training image folder.
trn_images = trn_img_path.ls()

In [None]:
# Load the training COCO json written earlier; later cells read this global.
json_data = read_json(trn_json_path)

In [None]:
# Inspect the top-level keys of the loaded dict.
json_data.keys()

dict_keys(['annotations', 'categories', 'images', 'info', 'licenses'])

In [None]:
#| export
def get_file_info(json_data:dict, file_name:str):
    'Return the first record in `json_data["images"]` whose `file_name` matches; IndexError when none does'
    matches = [img for img in json_data['images'] if img['file_name'] == file_name]
    return matches[0]

In [None]:
#| export
def get_annotations(json_data:dict, file_name:str):
    'Return the list of annotations whose `image_id` equals the id of the image named `file_name`'
    wanted_id = get_file_info(json_data, file_name)['id']
    return [ann for ann in json_data['annotations'] if ann['image_id'] == wanted_id]

In [None]:
#| export
def normalized_polygon(polygon:List, width:int, height:int):
    '''Normalize flat polygon coordinates by the image size.

    polygon: flat list `[x0, y0, x1, y1, ...]` (must have an even length).
    width:   image width; every x is divided by it.
    height:  image height; every y is divided by it.

    Returns a flat list of floats in the same order as `polygon`.
    Raises ValueError when `width` or `height` is not positive.
    '''
    if width <= 0 or height <= 0:
        raise ValueError(f'width and height must be positive, got {width}x{height}')

    # Use the function's own arguments (not notebook-level globals) as the divisors
    n_p = np.array(polygon, dtype=float).reshape(-1, 2) / np.array([width, height], dtype=float)
    return n_p.flatten().tolist()



In [None]:
# File names (final path components) of the entries in the training image folder.
trn_image_names = get_name(trn_img_path.ls())

In [None]:
# Show how a file name splits into (stem, extension), using the first training image name
# so the cell does not depend on a loop variable defined in a later cell.
os.path.splitext(trn_image_names[0])

('img_147_p_7', '.png')

In [None]:
# Inline version of the COCO -> YOLO conversion for the training set: one `.txt` label
# file per image that has annotations, one line per polygon.
# Note: `i`, `file_info`, `image_height` and `image_width` are bound at notebook (global) scope.
for i in tqdm(trn_image_names,total=len(trn_image_names)):
    file_info = get_file_info(json_data, i)
    image_height = file_info['height']
    image_width = file_info['width']
    image_annotation = get_annotations(json_data, i)
    # Images without annotations get no label file
    if image_annotation:
        with open(trn_output_path/f'{Path(i).stem}.txt', 'w') as f_o:
            for ann in image_annotation:
                # COCO category ids here start at 1; the written class index is shifted to start at 0
                current_cat = ann['category_id'] -1
                # Only the first polygon of each annotation is used
                polygon = ann['segmentation'][0]
                norm_poly = normalized_polygon(
                                            polygon, 
                                            width=image_width, 
                                            height=image_height)

                # Line layout: "<class> x0 y0 x1 y1 ..."
                f_o.write(f'{current_cat} {" ".join(map(str, norm_poly))}\n')

    

  0%|          | 0/1642 [00:00<?, ?it/s]

In [None]:
#| export
def create_yolo_dataset(
    img_path:Union[str, Path],
    output_path:Union[str, Path],
    json_path:Union[str, Path],
    )->None:
    '''Create YOLO polygon label files from a COCO-format json.

    img_path:    directory whose entries name the images to export labels for.
    output_path: directory the `<image stem>.txt` files are written to (created if missing).
    json_path:   COCO-format json read with `read_json`.

    For every image that has annotations, one line per annotation is written:
    `<category_id - 1> x0 y0 x1 y1 ...` with coordinates produced by `normalized_polygon`.
    Images without annotations, and directory entries that are not listed in the json,
    produce no file.
    '''
    # Accept plain strings as the annotations promise
    img_path, output_path = Path(img_path), Path(output_path)
    output_path.mkdir(parents=True, exist_ok=True)

    json_data = read_json(json_path)

    # Index the json once instead of rescanning it for every image
    info_by_name = {}
    for info in json_data['images']:
        info_by_name.setdefault(info['file_name'], info)  # first record wins on duplicates
    anns_by_image = {}
    for ann in json_data['annotations']:
        anns_by_image.setdefault(ann['image_id'], []).append(ann)

    # getting the names of the images
    image_names = [p.name for p in sorted(img_path.iterdir())]

    for name in tqdm(image_names, total=len(image_names)):
        file_info = info_by_name.get(name)
        if file_info is None:
            # Directory entry unknown to the json: nothing to export
            continue
        image_annotation = anns_by_image.get(file_info['id'])
        if not image_annotation:
            continue

        image_height = file_info['height']
        image_width = file_info['width']
        # Creating txt file for each image
        with open(output_path/f'{Path(name).stem}.txt', 'w') as f_o:
            for ann in image_annotation:
                # Category ids in the json start at 1; YOLO class indices start at 0
                current_cat = ann['category_id'] - 1
                polygon = ann['segmentation'][0]
                norm_poly = normalized_polygon(
                    polygon,
                    width=image_width,
                    height=image_height)

                f_o.write(f'{current_cat} {" ".join(map(str, norm_poly))}\n')


In [None]:
# Export YOLO label files for the validation split.
create_yolo_dataset(
    img_path=val_img_path,
    output_path=val_output_path,
    json_path=val_json_path,
)

  0%|          | 0/1725 [00:00<?, ?it/s]

In [None]:
# (Stray expression `j` removed: the name is never defined, so this cell raised
# NameError on Restart & Run All.)

In [None]:
# Category names from the loaded json and the resulting class count.
names = [cat['name']for cat in json_data['categories']]
nc = len(names)
nc

1

In [None]:
#| export
def create_yaml(
    json_path:Union[str, Path], # json path with its name
    yaml_path:Union[str, Path], # output path with yaml name
    train_path:Union[str, Path],# train images path
    val_path:Union[str, Path], # validation images path
    test_path:Union[str, Path, None]=None,
    )->None:
    '''Create a dataset yaml with the training and validation image paths.

    The class names are read from the `categories` of the COCO json at `json_path`;
    `nc` is their count. `test` is written as an empty string when `test_path` is
    not given. The file at `yaml_path` is overwritten.
    '''
    json_data = read_json(json_path)
    names = [cat['name'] for cat in json_data['categories']]

    yaml_data = {
        'names': names,
        # Number of classes
        'nc': len(names),
        # Paths are stringified: yaml.dump serialises pathlib.Path objects with
        # python-specific object tags that safe YAML loaders refuse to read.
        'test': str(test_path) if test_path else '',
        'train': str(train_path),
        'val': str(val_path),
    }
    with open(yaml_path, 'w') as f:
        yaml.dump(
                yaml_data,
                f,
                default_flow_style=False
                )


In [None]:
# Write `data.yaml` into the dataset root, taking class names from the training json.
create_yaml(
    json_path=trn_json_path, 
    yaml_path=f'{path}/data.yaml', 
    train_path=trn_img_path, 
    val_path=val_img_path
    )

In [None]:
#| hide
# Trigger nbdev's notebook export (presumably writes the `#| export` cells to the
# module named by `default_exp` — confirm against the project's nbdev settings).
import nbdev; nbdev.nbdev_export()