In [1]:
import os
import shutil
import torch
import torch.utils.data
import torchvision
from PIL import Image, ImageDraw
from pycocotools.coco import COCO
from pycocotools import mask as cocomask
import cv2
import copy
import matplotlib.pyplot as plt
import pandas as pd

import random
import json

In [2]:
# from https://stackoverflow.com/questions/75326066/coco-annotations-convert-rle-to-polygon-segmentation
def rle_to_coco(annotation: dict) -> list[dict]:
    """Transform the rle coco annotation (a single one) into coco style.
    In this case, one mask can contain several polygons, later leading to several `Annotation` objects.
    In case of not having a valid polygon (the mask is a single pixel) it will be an empty list.
    Parameters
    ----------
    annotation : dict
        rle coco style annotation
    Returns
    -------
    list[dict]
        list of coco style annotations (in dict format)
    """

    annotation["segmentation"] = cocomask.frPyObjects(
        annotation["segmentation"],
        annotation["segmentation"]["size"][0],
        annotation["segmentation"]["size"][1],
    )

    maskedArr = cocomask.decode(annotation["segmentation"])
    contours, _ = cv2.findContours(maskedArr, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE)
    segmentation = []

    for contour in contours:
        if contour.size >= 6:
            segmentation.append(contour)

    if len(segmentation) == 0:
        annotation["segmentation"] = []

    else:
        # annotation["bbox"] = annotation["segmentation"]["bbox"]
        # annotation["area"] = annotation["segmentation"]["area"]
        annotation["segmentation"] = []
        for i, seg in enumerate(segmentation):
            annotation["segmentation"].append(
                seg.astype(float).flatten().tolist()
            )
            # annotation["bbox"] = list(cv2.boundingRect(seg))
            # annotation["area"] = cv2.contourArea(seg)
            # annotation["instance_id"] = annotation["id"]
            # annotation["id"] = f"{annotation['id']}_{i}"
            # single_annotation.pop("segmentation")


    return annotation

In [3]:
class TreeLabel(torch.utils.data.Dataset):
    def __init__(self, root, annotation, transforms=None):
        self.root = root
        self.transforms = transforms
        self.coco = COCO(annotation)
        self.ids = list(sorted(self.coco.imgs.keys()))
        self.ann_count = {}

    def __getitem__(self, index):
        # Own coco file
        coco = self.coco
        # Image ID
        img_id = self.ids[index]
        # List: get annotation id from coco
        ann_ids = coco.getAnnIds(imgIds=img_id)
        # Dictionary: target coco_annotation file for an image
        coco_annotation = coco.loadAnns(ann_ids)
        # path for input image
        path = coco.loadImgs(img_id)[0]['file_name']
        # open the input image
        img = Image.open(os.path.join(self.root, path))

        # number of objects in the image
        num_objs = len(coco_annotation)

        # Bounding boxes for objects
        # In coco format, bbox = [xmin, ymin, width, height]
        # In pytorch, the input should be [xmin, ymin, xmax, ymax]
        boxes = []
        for i in range(num_objs):
            xmin = coco_annotation[i]['bbox'][0]
            ymin = coco_annotation[i]['bbox'][1]
            xmax = xmin + coco_annotation[i]['bbox'][2]
            ymax = ymin + coco_annotation[i]['bbox'][3]
            boxes.append([xmin, ymin, xmax, ymax])
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        # Labels (In my case, I only one class: target class or background)
        labels = torch.ones((num_objs,), dtype=torch.int64)
        # Tensorise img_id
        img_id = torch.tensor([img_id])
        # Size of bbox (Rectangular)
        areas = []
        for i in range(num_objs):
            areas.append(coco_annotation[i]['area'])
        areas = torch.as_tensor(areas, dtype=torch.float32)
        # Iscrowd
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

        # Annotation is in dictionary format
        my_annotation = {}
        my_annotation["boxes"] = boxes
        my_annotation["labels"] = labels
        my_annotation["image_id"] = img_id
        my_annotation["area"] = areas
        my_annotation["iscrowd"] = iscrowd

        if self.transforms is not None:
            img = self.transforms(img)

        return img, my_annotation

    def __len__(self):
        return len(self.ids)
    
    def get_ann_counts(self):
        coco = self.coco
        for i in range(len(self.ids)):
            self.ann_count[self.ids[i]] = 0

        for i in range(len(self.ids)):
            ann_ids = coco.getAnnIds(imgIds=self.ids[i])
            # Dictionary: target coco_annotation file for an image
            coco_annotation = coco.loadAnns(ann_ids)
            for ann in coco_annotation:
                if (ann['iscrowd'] == 1):
                    self.ann_count[int(ann['image_id'])] += 1

    
    def export_images(self, offset: int, select, state: str = None):
        exp = []
        coco = self.coco
        for i in select:
            self.ann_count[self.ids[i]] = 0
            img = coco.loadImgs(self.ids[i])[0]
            img['id'] = int(img['id']) + offset
            img.pop('flickr_url')
            img.pop('coco_url')
            img.pop('date_captured')
            if state is not None:
                img['state'] = state

            ann_ids = coco.getAnnIds(imgIds=self.ids[i])
            coco_annotation = coco.loadAnns(ann_ids)
            if len(coco_annotation) > 0:
                exp.append(img)
        
        return exp
    
    def export_annotations(self, offset: int, select, img_offset: int):
        exp = []
        coco = self.coco
        for i in select:
            ann_ids = coco.getAnnIds(imgIds=self.ids[i])
            # Dictionary: target coco_annotation file for an image
            coco_annotation = coco.loadAnns(ann_ids)
            for ann in coco_annotation:
                self.ann_count[int(ann['image_id'])] += 1
                ann['id'] = int(ann['id']) + offset
                ann['image_id'] = int(ann['image_id']) + img_offset
                
                if (ann['iscrowd'] == 1):
                    ann['iscrowd'] = 0 # so we can train on it
                    ann = rle_to_coco(ann)
                    if len(ann["segmentation"]) > 0:
                        exp.append(ann)
                    
                else:
                    print(ann['segmentation'])
        
        return exp
    
    def export_mask(self, output_dir: str):
        for id in range(len(self.ids)):
            coco = self.coco
            ann_ids = coco.getAnnIds(imgIds=self.ids[id])
            path = coco.loadImgs(self.ids[id])[0]['file_name']
            # open the input image
            img = Image.open(os.path.join(self.root, path))

            mask_img = Image.new('L', (img.width, img.height), 0)
            
            # Dictionary: target coco_annotation file for an image
            coco_annotation = coco.loadAnns(ann_ids)
            for ann in coco_annotation:
                for poly in ann['segmentation']:
                    ImageDraw.Draw(mask_img, 'L').polygon(poly, fill=(255))
            
            mask_img.save(os.path.join(output_dir, path))

    def prep_zone_buckets(self, zones, csv):
        self.by_zone = {}
        for zone in zones:
            self.by_zone[zone] = []

        csv = pd.read_csv(csv)
        img_map = {}
        for _, row in csv.iterrows():
            img_map[row['filename']] = row['zone']

        for id in range(len(self.ids)):
            coco = self.coco
            ann_ids = coco.getAnnIds(imgIds=self.ids[id])
            path = coco.loadImgs(self.ids[id])[0]['file_name']

            if len(ann_ids) > 0 and path in img_map and img_map[path] in zones:
                self.by_zone[img_map[path]].append(id)
        
def get_transform():
    custom_transforms = []
    custom_transforms.append(torchvision.transforms.ToTensor())
    return torchvision.transforms.Compose(custom_transforms)

In [12]:
# get all directories to copy over
dirs = []

for file in os.listdir('../Treework/filtered_imported_data'):
    if file == '.DS_Store':
        continue

    if file[:file.find("Trees")] == 'California':
        dirs.append((f'../Treework/filtered_imported_data/{file}', file[:file.find("Trees")]))

print(dirs)

[('../Treework/filtered_imported_data/CaliforniaTrees7', 'California'), ('../Treework/filtered_imported_data/CaliforniaTrees0', 'California'), ('../Treework/filtered_imported_data/CaliforniaTrees1', 'California'), ('../Treework/filtered_imported_data/CaliforniaTrees6', 'California'), ('../Treework/filtered_imported_data/CaliforniaTrees8', 'California'), ('../Treework/filtered_imported_data/CaliforniaTrees3', 'California'), ('../Treework/filtered_imported_data/CaliforniaTrees4', 'California'), ('../Treework/filtered_imported_data/CaliforniaTrees5', 'California'), ('../Treework/filtered_imported_data/CaliforniaTrees2', 'California')]


In [13]:
zones = ['4a', '4b', '5a', '5b', '6a', '6b', '7a', '7b', '8a', '8b', '9a', '9b', '10a', '10b', '11a', '11b']
zone_counts = {}
for zone in zones:
    zone_counts[zone] = 0

for i, tup in enumerate(dirs):
    dir, state = tup
    train_coco = f'{dir}/annotations/instances_default.json'
    train_data_dir = f'{dir}/images/default'

    my_dataset = TreeLabel(root=train_data_dir,
                            annotation=train_coco,
                            transforms=get_transform()
                            )
    
    my_dataset.prep_zone_buckets(zones, 'california_image_zones.csv')
    for zone in my_dataset.by_zone:
        zone_counts[zone] += len(my_dataset.by_zone[zone])

for zone, count in zone_counts.items():
    print(zone, count)

loading annotations into memory...
Done (t=0.18s)
creating index...
index created!
loading annotations into memory...
Done (t=0.41s)
creating index...
index created!
loading annotations into memory...
Done (t=0.18s)
creating index...
index created!
loading annotations into memory...
Done (t=0.16s)
creating index...
index created!
loading annotations into memory...
Done (t=0.17s)
creating index...
index created!
loading annotations into memory...
Done (t=0.15s)
creating index...
index created!
loading annotations into memory...
Done (t=0.14s)
creating index...
index created!
loading annotations into memory...
Done (t=0.22s)
creating index...
index created!
loading annotations into memory...
Done (t=0.17s)
creating index...
index created!
4a 0
4b 0
5a 0
5b 23
6a 1048
6b 999
7a 1123
7b 949
8a 1047
8b 943
9a 847
9b 1027
10a 1051
10b 803
11a 20
11b 0


In [9]:
# get all directories to copy over
dirs = []

for file in os.listdir('../Treework/filtered_imported_data'):
    if file == '.DS_Store':
        continue

    if file[:file.find("Trees")] == 'Karnataka':
        dirs.append((f'../Treework/filtered_imported_data/{file}', file[:file.find("Trees")]))

print(dirs)

[('../Treework/filtered_imported_data/KarnatakaTrees4', 'Karnataka'), ('../Treework/filtered_imported_data/KarnatakaTrees3', 'Karnataka'), ('../Treework/filtered_imported_data/KarnatakaTrees2', 'Karnataka'), ('../Treework/filtered_imported_data/KarnatakaTrees5', 'Karnataka'), ('../Treework/filtered_imported_data/KarnatakaTrees', 'Karnataka'), ('../Treework/filtered_imported_data/KarnatakaTrees0', 'Karnataka'), ('../Treework/filtered_imported_data/KarnatakaTrees6', 'Karnataka'), ('../Treework/filtered_imported_data/KarnatakaTrees1', 'Karnataka'), ('../Treework/filtered_imported_data/KarnatakaTrees7Partial', 'Karnataka')]


In [11]:
zones = ['NORTH EAST TRANSITION', 'NORTH EAST DRY', 'NORTHERN DRY', 'CENTRAL DRY', 'EASTERN DRY', 
         'SOUTHERN DRY', 'SOUTHERN TRANSITION', 'WESTERN TRANSITION', 'HILL', 'COASTAL']
zone_counts = {}
for zone in zones:
    zone_counts[zone] = 0

for i, tup in enumerate(dirs):
    dir, state = tup
    train_coco = f'{dir}/annotations/instances_default.json'
    train_data_dir = f'{dir}/images/default'

    my_dataset = TreeLabel(root=train_data_dir,
                            annotation=train_coco,
                            transforms=get_transform()
                            )
    
    my_dataset.prep_zone_buckets(zones, 'karnataka_image_zones.csv')
    for zone in my_dataset.by_zone:
        zone_counts[zone] += len(my_dataset.by_zone[zone])

for zone, count in zone_counts.items():
    print(zone, count)

loading annotations into memory...
Done (t=0.04s)
creating index...
index created!
loading annotations into memory...
Done (t=0.04s)
creating index...
index created!
loading annotations into memory...
Done (t=0.03s)
creating index...
index created!
loading annotations into memory...
Done (t=0.03s)
creating index...
index created!
loading annotations into memory...
Done (t=0.02s)
creating index...
index created!
loading annotations into memory...
Done (t=0.03s)
creating index...
index created!
loading annotations into memory...
Done (t=0.03s)
creating index...
index created!
loading annotations into memory...
Done (t=0.03s)
creating index...
index created!
loading annotations into memory...
Done (t=0.02s)
creating index...
index created!
NORTH EAST TRANSITION 0
NORTH EAST DRY 142
NORTHERN DRY 304
CENTRAL DRY 621
EASTERN DRY 787
SOUTHERN DRY 175
SOUTHERN TRANSITION 498
WESTERN TRANSITION 146
HILL 20
COASTAL 131


In [18]:
# create the correct sections for COCO format
img_written = 0
ann_written = 0

test_dir = 'test_karnataka_only_ann'
train_dir = 'train_karnataka_only_ann'
os.makedirs(test_dir, exist_ok=True)
os.makedirs(train_dir, exist_ok=True)
shutil.rmtree(test_dir)
shutil.rmtree(train_dir)
os.mkdir(test_dir)
os.mkdir(f'{test_dir}/images')
os.mkdir(train_dir)
os.mkdir(f'{train_dir}/images')

for i, tup in enumerate(dirs):
    dir, state = tup
    train_coco = f'{dir}/annotations/instances_default.json'
    train_data_dir = f'{dir}/images/default'

    my_dataset = TreeLabel(root=train_data_dir,
                            annotation=train_coco,
                            transforms=get_transform()
                            )
    
    test = []
    train = []
    # split data 10%/90%
    for j in range(len(my_dataset.ids)):
        if random.random() < 0.1:
            test.append(j)
        else:
            train.append(j)

    imgs_test = my_dataset.export_images(img_written, test, state=state)
    imgs_train = my_dataset.export_images(img_written, train, state=state)

    anns_test = my_dataset.export_annotations(ann_written, test, img_written)
    anns_train = my_dataset.export_annotations(ann_written, train, img_written)

    img_written += len(imgs_test) + len(imgs_train) + 10000
    ann_written += len(anns_test) + len(anns_train) + 10000

    with open(f"{test_dir}/annotations_img.txt", 'a') as f:
        for j, img in enumerate(imgs_test):
            if i != 0 or j != 0:
                f.write(',')
            f.write(f"{json.dumps(img)}")

    with open(f"{test_dir}/annotations_ann.txt", 'a') as f:
        for j, ann in enumerate(anns_test):
            if i != 0 or j != 0:
                f.write(',')
            f.write(f"{json.dumps(ann)}")

    with open(f"{train_dir}/annotations_img.txt", 'a') as f:
        for j, img in enumerate(imgs_train):
            if i != 0 or j != 0:
                f.write(',')
            f.write(f"{json.dumps(img)}")

    with open(f"{train_dir}/annotations_ann.txt", 'a') as f:
        for j, ann in enumerate(anns_train):
            if i != 0 or j != 0:
                f.write(',')
            f.write(f"{json.dumps(ann)}")

    # copy images to their respective directories
    for j in test:
        file = my_dataset.coco.loadImgs(my_dataset.ids[j])[0]['file_name']
        shutil.copy(f'{dir}/images/default/{file}', f'{test_dir}/images/{file}')

    for j in train:
        file = my_dataset.coco.loadImgs(my_dataset.ids[j])[0]['file_name']
        shutil.copy(f'{dir}/images/default/{file}', f'{train_dir}/images/{file}')

loading annotations into memory...
Done (t=0.05s)
creating index...
index created!
loading annotations into memory...
Done (t=0.04s)
creating index...
index created!
loading annotations into memory...
Done (t=0.04s)
creating index...
index created!
[]
loading annotations into memory...
Done (t=0.04s)
creating index...
index created!
[]
[]
loading annotations into memory...
Done (t=0.03s)
creating index...
index created!
loading annotations into memory...
Done (t=0.04s)
creating index...
index created!
[]
loading annotations into memory...
Done (t=0.04s)
creating index...
index created!
loading annotations into memory...
Done (t=0.04s)
creating index...
index created!
loading annotations into memory...
Done (t=0.03s)
creating index...
index created!
[]
[]


In [19]:
print(img_written - 10000 * len(dirs))

3080


In [20]:
# merge COCO formatted data
with open(f'{test_dir}/annotations.json', 'w') as f1:
    f1.write('{"licenses":[{"name":"","id":0,"url":""}],"info":{"contributor":"","date_created":"","description":"","url":"","version":"","year":""},"categories":[{"id":1,"name":"Kejri Tree","supercategory":""},{"id":2,"name":"Low Quality","supercategory":""}],"images":[')
    with open(f'{test_dir}/annotations_img.txt', 'r') as f2:
        f1.write(f2.read())
    f1.write('],"annotations":[')
    with open(f'{test_dir}/annotations_ann.txt', 'r') as f2:
        f1.write(f2.read())
    f1.write(']}')

with open(f'{train_dir}/annotations.json', 'w') as f1:
    f1.write('{"licenses":[{"name":"","id":0,"url":""}],"info":{"contributor":"","date_created":"","description":"","url":"","version":"","year":""},"categories":[{"id":1,"name":"Kejri Tree","supercategory":""},{"id":2,"name":"Low Quality","supercategory":""}],"images":[')
    with open(f'{train_dir}/annotations_img.txt', 'r') as f2:
        f1.write(f2.read())
    f1.write('],"annotations":[')
    with open(f'{train_dir}/annotations_ann.txt', 'r') as f2:
        f1.write(f2.read())
    f1.write(']}')

In [48]:
# export masks

data_dir = f'{train_dir}/images'
annotations = f'{train_dir}/annotations.json'
my_dataset = TreeLabel(root=data_dir,
                        annotation=annotations,
                        transforms=get_transform()
                        )
my_dataset.export_mask(f'{train_dir}/masks')

data_dir = f'{test_dir}/images'
annotations = f'{test_dir}/annotations.json'
my_dataset = TreeLabel(root=data_dir,
                        annotation=annotations,
                        transforms=get_transform()
                        )
my_dataset.export_mask(f'{test_dir}/masks')

loading annotations into memory...
Done (t=0.54s)
creating index...
index created!
loading annotations into memory...
Done (t=0.07s)
creating index...
index created!
