<h3>COCO Converter</h3>

<code>Language  : Python</code><br>
<code>Owned By  : Labelbees.Inc</code><br>
<code>Version   : 1.1</code><br>
<code>Author    : Ramasamy Rajendran</code><br>

__Description__
<br><br><code>This converter converts JSON exports from different annotation platforms into the COCO format.<br>
It automatically detects the format of each file and converts it; the individual converter functions can also be called directly.</code><br>
<br><code>__Json Types Supported__</code><br>
<li>Label-Studio</li>
<li>Coco_Annotator</li>
<li>Labelme</li>

<br><code>__Supported Geometries__</code><br>
<li>BBox</li>
<li>Polygon</li>
<li>Mask(Pixel)</li>

In [144]:
# Importing necessary libraries
import re
import json
import pandas as pd
from tqdm import tqdm
from glob import glob, os
from itertools import chain
import shapely.geometry as sg
from os.path import basename, dirname

In [145]:
def read_json(file):
    """Load annotation records from one JSON file or from every JSON file in a directory.

    Parameters
    ----------
    file : str
        Path to a ``.json`` file, or to a directory whose ``*.json`` files
        (non-recursive) are merged.

    Returns
    -------
    list
        All loaded records. A file whose top-level value is a list contributes
        each element as its own record; any other top-level value is one record.

    Raises
    ------
    ValueError
        If ``file`` is an existing file that does not end in ``.json``, or if
        no record could be loaded at all (missing path, no JSON files, or only
        empty lists).

    Side effects
    ------------
    Prints a short summary and writes the merged records through
    ``save_json`` under the name ``read_json``.
    """
    if os.path.isfile(file):
        if not file.endswith('.json'):
            raise ValueError(f"{basename(file)} file is not a json file !!!")
        json_paths = [file]
    elif os.path.exists(file):
        # Sort so the merge order (and every id derived from it later) does not
        # depend on the arbitrary order the OS lists directory entries in.
        json_paths = tqdm(sorted(glob(os.path.join(file, '*.json'))), desc="Meging JSON files:")
    else:
        json_paths = []

    json_read_data = []
    loaded_files = []
    for jfile in json_paths:
        loaded_files.append(basename(jfile))
        json_read_data.extend(_load_json_records(jfile))

    if len(json_read_data) == 0:
        raise ValueError("\n This error occured because of below reasons: \n 1.File or directory is not exists \n 2.File is not readable \n 3.Directory not contains any json files")

    print(f"Loaded Json Files: {len(loaded_files)}")
    # Only list the individual files when the list is short enough to read.
    if len(loaded_files) < 10:
        print(*loaded_files, sep='\n')
    save_json(json_read_data, read_json.__name__, file)
    return json_read_data


def _load_json_records(path):
    """Return the records stored in one JSON file as a list (private helper of read_json)."""
    # JSON text is UTF-8 by specification; being explicit avoids decoding with
    # a platform-specific default code page.
    with open(path, 'r', encoding='utf-8') as js_file:
        content = json.load(js_file)
    return list(content) if isinstance(content, list) else [content]

def save_json(data, file_name, loc):
    """Write ``data`` as ``<file_name>.json`` inside a ``Merged_Json`` folder.

    Parameters
    ----------
    data : object
        Any value ``json.dump`` can serialise.
    file_name : str
        Output file name without the ``.json`` extension.
    loc : str
        A file or directory path. When it is an existing file, the output
        folder is created next to that file; otherwise ``loc`` itself is used
        as the parent directory.

    Side effects
    ------------
    Creates ``<parent>/Merged_Json`` if needed, writes the file and prints
    where it was saved.
    """
    parent_dir = dirname(loc) if os.path.isfile(loc) else loc
    dir_name = os.path.join(parent_dir, 'Merged_Json')
    os.makedirs(dir_name, exist_ok=True)
    # The ``with`` block closes the file; json.dump returns nothing worth keeping.
    with open(os.path.join(dir_name, f'{file_name}.json'), 'w') as wj_file:
        json.dump(data, wj_file)
    print(f'\nFile Saved at {dir_name} as {file_name}.json \n')

def guess_format(req_keys):
    """Pick the converter function that matches a record's top-level keys.

    The format is recognised by the presence of the keys each converter
    actually reads, so key order and additional keys (for example other
    export versions, or optional sections such as ``info``) do not matter.

    Parameters
    ----------
    req_keys : iterable of str
        Top-level keys of one loaded JSON record.

    Returns
    -------
    callable
        ``coco_to_df``, ``labelme_to_df`` or ``ls_json_to_df``.

    Raises
    ------
    KeyError
        If the keys match none of the supported formats.
    """
    keys = set(req_keys)
    # COCO is tested first because it shares the 'annotations' key with the
    # Label-Studio layout.
    if {'images', 'categories', 'annotations'} <= keys:
        return coco_to_df
    if {'shapes', 'imagePath', 'imageHeight', 'imageWidth'} <= keys:
        return labelme_to_df
    if {'annotations', 'data'} <= keys:
        return ls_json_to_df
    raise KeyError(f"Unsupported JSON format, top-level keys: {sorted(map(str, keys))}")

def get_poly(segment):
    """Turn a list of ``[x, y]`` points into segmentation, bbox and area values.

    Parameters
    ----------
    segment : sequence of [x, y] pairs
        Polygon vertices; each point may be a list or a tuple.

    Returns
    -------
    tuple
        ``(segmentation, bbox, area)`` where ``segmentation`` is a
        one-element list holding the flattened, 2-decimal-rounded
        coordinates, ``bbox`` is ``[x_min, y_min, width, height]`` rounded
        to 2 decimals, and ``area`` is the polygon area rounded to 2 decimals.

    Raises
    ------
    ValueError
        If ``segment`` is empty or its first element is not a point pair.
    """
    # An empty sequence used to fail with IndexError on segment[0]; report it
    # with the same error as any other malformed input.
    if len(segment) == 0 or not isinstance(segment[0], (list, tuple)):
        raise ValueError('Incorrect coords!, Polygon needs to be in the format [[x1,y1], [x2,y2]]')
    poly = sg.Polygon(segment)
    min_x, min_y, max_x, max_y = poly.bounds
    area = round(poly.area, 2)
    # Width/height are computed from the unrounded bounds, then rounded.
    bbox = [round(value, 2) for value in (min_x, min_y, max_x - min_x, max_y - min_y)]
    flattened = [[round(coord, 2) for coord in chain.from_iterable(segment)]]
    return (flattened, bbox, area)

def ls_coords_to_bbox(coords, original_width, original_height):
    """Scale a percentage box ``[x, y, width, height]`` to absolute units.

    The first and third entries are scaled by ``original_width``, the second
    and fourth by ``original_height``; every result is rounded to 2 decimals.
    Returns the scaled ``[x, y, width, height]`` list.
    """
    # Horizontal values pair with the width, vertical values with the height.
    scales = (original_width, original_height, original_width, original_height)
    return [round(coords[idx] / 100.0 * scale, 2) for idx, scale in enumerate(scales)]

def ls_coords_to_polygon(coords, original_width, original_height):
    """Scale percentage ``[x, y]`` points to absolute units.

    For every point the first value is scaled by ``original_width`` and the
    second by ``original_height``; both are rounded to 2 decimals. Returns a
    new list of ``[x, y]`` pairs in the same order as ``coords``.
    """
    return [
        [
            round(point[0] / 100.0 * original_width, 2),
            round(point[1] / 100.0 * original_height, 2),
        ]
        for point in coords
    ]
    
def get_ls_annotations(anno_dict, width, height):
    """Extract geometry from one annotation ``value`` dictionary.

    Three cases are handled, checked in this order:

    * a truthy ``'points'`` entry: the points are scaled with
      ``ls_coords_to_polygon`` and passed to ``get_poly``;
    * a truthy ``'rle'`` entry: the ``rle`` value is returned unchanged as the
      segmentation, with an empty bbox and an area of 0;
    * otherwise ``'x'``, ``'y'``, ``'width'`` and ``'height'`` are read as a
      percentage box and scaled with ``ls_coords_to_bbox``.

    Returns ``(segmentation, bbox, area, rotation)`` where ``rotation`` is the
    ``'rotation'`` entry (default 0) rounded to 2 decimals.
    """
    polygon_points = anno_dict.get('points')
    rle_mask = anno_dict.get('rle')

    if polygon_points:
        scaled_points = ls_coords_to_polygon(polygon_points, width, height)
        segment, bbox, area = get_poly(scaled_points)
    elif rle_mask:
        # TODO: find the bbox of the pixel segment
        # TODO: write code for keypoints
        segment, bbox, area = rle_mask, [], 0
    else:
        percent_box = [anno_dict.get(key) for key in ('x', 'y', 'width', 'height')]
        bbox = ls_coords_to_bbox(percent_box, width, height)
        left, top, box_width, box_height = bbox
        # NOTE(review): the segmentation here is a flat [x1, y1, x2, y2] list,
        # unlike the nested list produced for polygons - confirm consumers expect this.
        segment = [left, top, round(left + box_width, 2), round(top + box_height, 2)]
        area = round(sg.box(*segment).area, 2)

    return (segment, bbox, area, round(anno_dict.get('rotation', 0), 2))

def ls_json_to_df(json_data):
    """Convert one Label-Studio task record into the common annotation table.

    Parameters
    ----------
    json_data : dict
        A task with a ``'data'`` dict (image path under ``'image'`` or
        ``'img'``) and an ``'annotations'`` list. Only the ``'result'`` list
        of the first annotation is converted.

    Returns
    -------
    pandas.DataFrame
        One row per result with the columns ``file_name, width, height, name,
        segmentation, bbox, type, area, rotation``. The frame is empty (but
        keeps these columns) when the task has no annotations or no results.

    Notes
    -----
    The label and geometry are resolved per result, so a task may mix
    annotation types (for example rectangles and polygons).
    """
    columns = ['file_name', 'width', 'height', 'name', 'segmentation', 'bbox', 'type', 'area', 'rotation']

    path = json_data['data'].get('image', json_data['data'].get('img'))
    # Keep the text after the last URL-encoded backslash ('%5C') or hyphen.
    # NOTE(review): this also truncates file names that contain a hyphen - confirm intended.
    file_name = re.split('%5C|-', path)[-1]

    annotations = json_data['annotations']
    results = annotations[0]['result'] if annotations else []

    records = []
    for result in results:
        anno_type = result['type']
        value = result['value']
        width = result['original_width']
        height = result['original_height']
        segmentation, bbox, area, rotation = get_ls_annotations(value, width, height)
        records.append({
            'file_name': file_name,
            'width': width,
            'height': height,
            # The labels are stored in the value dict under the result's own type name.
            'name': value[anno_type][0].lower(),
            'segmentation': segmentation,
            'bbox': bbox,
            # e.g. 'rectanglelabels' -> 'rectangle'
            'type': anno_type.replace('labels', ''),
            'area': area,
            'rotation': rotation,
        })
    return pd.DataFrame(records, columns=columns)

def coco_to_df(json_element):
    """Convert a COCO-style record into the common annotation table.

    Parameters
    ----------
    json_element : dict
        Must contain ``'images'``, ``'categories'`` and ``'annotations'``
        lists. Annotations are joined to categories on ``category_id`` and to
        images on ``image_id`` (inner joins).

    Returns
    -------
    pandas.DataFrame
        Columns ``file_name, width, height, name, segmentation, bbox, type,
        area, rotation``. ``name`` is lower-cased, ``rotation`` is always 0 and
        ``type`` is ``"rectangle"`` where the annotation's ``isbbox`` flag is
        true, otherwise ``"polygon"``. Empty (with these columns) when any of
        the three sections is empty.

    Notes
    -----
    When annotations carry their own ``width``/``height`` those are used;
    otherwise the image's values are used. When annotations carry no
    ``isbbox`` flag every row is typed ``"polygon"``.
    """
    columns = ['file_name', 'width', 'height', 'name', 'segmentation', 'bbox', 'type', 'area', 'rotation']

    img_df = pd.DataFrame(json_element['images'])
    cat_df = pd.DataFrame(json_element['categories'])
    anno_df = pd.DataFrame(json_element['annotations'])
    if img_df.empty or cat_df.empty or anno_df.empty:
        return pd.DataFrame(columns=columns)

    merged = pd.merge(anno_df, cat_df, left_on='category_id', right_on='id')
    merged = pd.merge(merged, img_df, left_on='image_id', right_on='id')

    # The merge suffixes a column with '_x' only when both sides define it, so
    # 'width_x' exists only if the annotations had their own width.
    width_col = 'width_x' if 'width_x' in merged.columns else 'width'
    height_col = 'height_x' if 'height_x' in merged.columns else 'height'

    if 'isbbox' in merged.columns:
        anno_type = merged['isbbox'].eq(True).map({True: 'rectangle', False: 'polygon'})
    else:
        # NOTE(review): without an 'isbbox' flag the geometry type is unknown;
        # 'polygon' is assumed - confirm this default suits plain COCO files.
        anno_type = pd.Series('polygon', index=merged.index)

    # Build a fresh frame instead of assigning into a column slice of `merged`.
    return pd.DataFrame({
        'file_name': merged['file_name'],
        'width': merged[width_col],
        'height': merged[height_col],
        'name': merged['name'].str.lower(),
        'segmentation': merged['segmentation'],
        'bbox': merged['bbox'],
        'type': anno_type,
        'area': merged['area'],
        'rotation': 0,
    })

def labelme_to_df(lbme_file):
    """Convert one Labelme record into the common annotation table.

    Parameters
    ----------
    lbme_file : dict
        Must contain ``'imagePath'``, ``'imageWidth'``, ``'imageHeight'`` and
        a ``'shapes'`` list. Each shape needs ``'shape_type'``, ``'points'``
        and ``'label'``; ``'rotation'`` is optional (default 0).

    Returns
    -------
    pandas.DataFrame
        One row per shape with the columns ``file_name, width, height, name,
        segmentation, bbox, type, area, rotation``; empty (with these
        columns) when there are no shapes.

    Raises
    ------
    NameError
        For any ``shape_type`` other than ``'polygon'`` or ``'rectangle'``.

    Notes
    -----
    Rectangle corners are normalised with min/max, so the two points may be
    given in any order without producing a negative width or height.
    """
    columns = ['file_name', 'width', 'height', 'name', 'segmentation', 'bbox', 'type', 'area', 'rotation']
    file_name = lbme_file['imagePath']
    width = lbme_file['imageWidth']
    height = lbme_file['imageHeight']

    # Collect plain records and build the frame once instead of concatenating
    # a one-row frame per shape.
    records = []
    for shape in lbme_file['shapes']:
        anno_type = shape['shape_type']
        rotation = round(shape.get('rotation', 0), 2)
        if anno_type == 'polygon':
            segment, bbox, area = get_poly(shape['points'])
        elif anno_type == 'rectangle':
            (first_x, first_y), (second_x, second_y) = shape['points']
            x_min, x_max = min(first_x, second_x), max(first_x, second_x)
            y_min, y_max = min(first_y, second_y), max(first_y, second_y)
            box_width = x_max - x_min
            box_height = y_max - y_min
            area = round(box_width * box_height, 2)
            # NOTE(review): the rectangle segmentation keeps the two-corner
            # [[x1, y1, x2, y2]] layout - confirm consumers expect this.
            segment = [[round(value, 2) for value in (x_min, y_min, x_max, y_max)]]
            bbox = [round(value, 2) for value in (x_min, y_min, box_width, box_height)]
        else:
            raise NameError(f'{anno_type} geometry conversion is not written yet!!')
        records.append({
            'file_name': file_name,
            'width': width,
            'height': height,
            'name': shape['label'],
            'segmentation': segment,
            'bbox': bbox,
            'type': anno_type,
            'area': area,
            'rotation': rotation,
        })
    return pd.DataFrame(records, columns=columns)

def df_to_coco(df):
    """Build a COCO-style dictionary from the common annotation table.

    Parameters
    ----------
    df : pandas.DataFrame
        Needs the columns ``file_name, width, height, name, segmentation,
        bbox, area``. The frame is not modified.

    Returns
    -------
    dict
        ``{'images': [...], 'categories': [...], 'annotations': [...]}``.
        Image ids and category ids start at 1 and follow the sorted order of
        the distinct file names / category names; annotation ids number the
        rows 1..N in their existing order. ``supercategory`` is always ``""``
        and ``iscrowd`` is always 0.

    Side effects
    ------------
    Prints the number of images, categories and annotations.
    """
    # Work on a re-indexed copy: the caller's frame keeps its columns, and the
    # annotation ids do not depend on whatever index the caller's frame has.
    work = df.reset_index(drop=True)
    work['image_id'] = work['file_name'].astype('category').cat.codes.astype('int64') + 1
    work['categoryid'] = pd.Categorical(work['name'], ordered=True).codes.astype('int64') + 1
    work['anno_id'] = range(1, len(work) + 1)

    img_df = work.drop_duplicates(subset=['image_id']).sort_values(by='image_id', ignore_index=True)
    print(f'Images Present: {len(img_df.image_id)}')
    images = [
        {
            "file_name": row.file_name,
            "id": row.image_id,
            "width": row.width,
            "height": row.height,
        }
        for row in img_df.itertuples(index=False)
    ]

    cat_df = work.drop_duplicates(subset=['categoryid']).sort_values(by='categoryid', ignore_index=True)
    print(f'Categories Present: {len(cat_df.categoryid)}')
    categories = [
        {
            "id": row.categoryid,
            "name": row.name,
            "supercategory": "",
        }
        for row in cat_df.itertuples(index=False)
    ]

    print(f'Annotations Present: {len(work.anno_id)}')
    annotations = [
        {
            "id": row.anno_id,
            "image_id": row.image_id,
            "category_id": row.categoryid,
            "segmentation": row.segmentation,
            "bbox": row.bbox,
            "iscrowd": 0,
            "area": row.area,
        }
        for row in work.itertuples(index=False)
    ]

    return {
        'images': images,
        'categories': categories,
        'annotations': annotations,
    }

def convert_coco(json_dir=None):
    """Convert a JSON file, or a directory of JSON files, into one COCO file.

    Parameters
    ----------
    json_dir : str or path-like, optional
        Path to a JSON file or to a directory of JSON files. When omitted the
        path is asked for interactively, as before. Passing it explicitly
        lets the notebook run without a prompt.

    Raises
    ------
    ValueError
        If the path is neither an existing file nor an existing directory, or
        if the loaded records contain no annotations.

    Side effects
    ------------
    Reads the records with ``read_json``, converts each one with the
    converter chosen by ``guess_format``, and writes the result as
    ``coco_file.json`` through ``save_json``. Returns nothing.
    """
    if json_dir is None:
        json_dir = input('Give the path to a JSON file or directory !!')
    # Paths pasted from a file manager often arrive wrapped in quotes or with
    # stray whitespace; strip both before checking the path.
    json_dir = str(json_dir).strip().strip('"\'')
    if not (os.path.isfile(json_dir) or os.path.isdir(json_dir)):
        raise ValueError('Incorrect file or directory !!')

    json_data = read_json(json_dir)
    # Convert every record first and concatenate once, rather than growing
    # the frame inside the loop.
    frames = [guess_format(list(ann_data.keys()))(ann_data) for ann_data in json_data]
    coco_conv_df = pd.concat(frames, ignore_index=True)
    if coco_conv_df.empty:
        raise ValueError('No annotations found in the given JSON file(s) !!')

    coco_conv_df['name'] = coco_conv_df['name'].str.lower()
    conv_file = df_to_coco(coco_conv_df)
    save_json(conv_file, 'coco_file', json_dir)

# Run the conversion; with no argument the path to a JSON file or directory is requested via a prompt.
convert_coco()

Meging JSON files:: 100%|██████████| 71/71 [00:00<00:00, 237.47it/s]


Loaded Json Files: 71

File Saved at D:\Tasks\prototypes\lb_tod_jsons\lvl_2\Merged_Json as read_json.json 

Images Present: 1049
Categories Present: 58
Annotations Present: 3757

File Saved at D:\Tasks\prototypes\lb_tod_jsons\lvl_2\Merged_Json as coco_file.json 

