# Data Preparation

In [3]:
# Packages
import json
from pathlib import Path

In [4]:
# Parameters
# NB_DIR: the current working directory at the time this cell runs.
NB_DIR = Path.cwd()
# REPO_ROOT: the parent of NB_DIR; later cells build their data paths from it.
# NOTE(review): this assumes the kernel's working directory is a first-level
# subfolder of the repository — confirm before running from elsewhere.
REPO_ROOT = NB_DIR.parent


## JSON Conversion

### Solafune JSON format to COCO format

In [5]:
# input and output paths
input_path = REPO_ROOT / 'data/raw/JSONs/train_annotations.json'
output_path = REPO_ROOT / 'data/processed/JSONs/train_annotations_coco.json'


def _polygon_area(xs, ys):
    """Return the absolute area enclosed by a polygon (shoelace formula).

    xs and ys are equal-length sequences holding the vertex coordinates in
    order; the last vertex is implicitly joined back to the first.
    """
    n = len(xs)
    twice_signed_area = sum(
        xs[i] * ys[(i + 1) % n] - xs[(i + 1) % n] * ys[i] for i in range(n)
    )
    return abs(twice_signed_area) / 2


# read train annotations data (explicit encoding so the result does not
# depend on the platform default)
with open(input_path, encoding="utf-8") as f:
    train_annotations = json.load(f)

# initialize COCO data structure
coco_data = {
    "images": [],
    "annotations": [],
    "categories": [
        {"id": 1, "name": "individual_tree", "supercategory": "tree"},
        {"id": 2, "name": "group_of_trees", "supercategory": "tree"},
    ]
}

# category mapping (an unknown class name raises KeyError on purpose)
category_map = {
    "individual_tree": 1,
    "group_of_trees": 2
}

# initialize annotation and image ID counters
annotation_id = 1
image_id = 1

# number of annotations dropped because their polygon is unusable
skipped_annotations = 0

# for each image...
for image in train_annotations["images"]:

    # add image metadata
    coco_data["images"].append(
        {
            "id": image_id,
            "file_name": image["file_name"],
            "width": image["width"],
            "height": image["height"]
        }
    )

    # for each annotation in this image
    for ann in image.get("annotations", []):
        # extract segmentation polygon as a flat [x1, y1, x2, y2, ...] list
        segmentation = ann["segmentation"]

        # skip polygons with fewer than 3 points, and coordinate lists of odd
        # length (an x without a matching y cannot be reshaped into points)
        if len(segmentation) < 6 or len(segmentation) % 2 != 0:
            skipped_annotations += 1
            continue

        # split coordinates once and reuse them for bbox and area
        xs = segmentation[::2]
        ys = segmentation[1::2]
        x_min, y_min = min(xs), min(ys)
        x_max, y_max = max(xs), max(ys)

        # append annotation
        coco_data["annotations"].append(
            {
                "id": annotation_id,                            # annotation ID
                "image_id": image_id,                           # image ID
                "category_id": category_map[ann["class"]],      # category ID
                "segmentation": [segmentation],                 # segmentation polygon
                "area": _polygon_area(xs, ys),                  # polygon area
                "bbox": [                                       # bounding box
                    x_min,                                      # ... x
                    y_min,                                      # ... y
                    x_max - x_min,                              # ... w
                    y_max - y_min                               # ... h
                ],
                "iscrowd": 0,                                   # is-crowded (not used but setting anyway)
                "score": ann.get("confidence_score", 1.0)       # confidence score (nonsense for ground-truth but setting anyway)
            }
        )

        # increment annotation ID counter
        annotation_id += 1

    # increment image ID counter
    image_id += 1

# ensure parent directory exists
output_path.parent.mkdir(parents=True, exist_ok=True)

# save output
with open(output_path, "w", encoding="utf-8") as f:
    json.dump(coco_data, f, indent=2)

# report dropped annotations instead of discarding them silently
if skipped_annotations:
    print(f"Skipped {skipped_annotations} annotations with invalid polygons")

print(f"✅ Saved COCO annotations to: {output_path}")

✅ Saved COCO annotations to: /Users/mitchellpalmer/Projects/solafune-canopy-capstone-clean/data/processed/JSONs/train_annotations_coco.json


### COCO Format to YOLO Machine Learning Format

In [None]:
import os
import shutil

from ultralytics.data.converter import convert_coco

# Convert COCO annotations for instance segmentation
coco_json_dir = REPO_ROOT / 'data/processed/JSONs'  # folder holding the COCO-format JSON file(s)
yolo_labels_dir = REPO_ROOT / 'data/temp_jsons'     # destination for the converted YOLO labels

# The destination must not exist before conversion: when it does, convert_coco
# writes to an incremented sibling folder instead of reusing it, and cells that
# read from 'data/temp_jsons' would then pick up stale labels. Clearing the
# previous output makes this cell safe to re-run.
if yolo_labels_dir.exists():
    shutil.rmtree(yolo_labels_dir)

convert_coco(
    labels_dir=coco_json_dir,
    save_dir=yolo_labels_dir,
    use_keypoints=False,
    use_segments=True,   # keep polygon outlines, not just bounding boxes
)

Annotations /Users/mitchellpalmer/Projects/solafune-canopy-capstone-clean/data/processed/JSONs/test_annotations_coco.json: 100%|██████████| 23/23 [00:00<00:00, 224.47it/s]
Annotations /Users/mitchellpalmer/Projects/solafune-canopy-capstone-clean/data/processed/JSONs/train_annotations_coco.json: 100%|██████████| 150/150 [00:00<00:00, 269.36it/s]

COCO data converted successfully.
Results saved to /Users/mitchellpalmer/Projects/solafune-canopy-capstone-clean/data/temp_labels





## Extract Images

In [7]:
import zipfile
import os
from pathlib import Path
import shutil

def unzip_to_folder(zip_path, extract_to):
    """
    Extract every member of a ZIP archive into a target directory.

    The target directory (including missing parents) is created first.
    Once extraction finishes, a '__MACOSX' entry directly inside the target
    is deleted if one is present, and a confirmation line is printed.
    """
    target_dir = Path(extract_to)
    target_dir.mkdir(parents=True, exist_ok=True)

    with zipfile.ZipFile(zip_path, mode='r') as archive:
        archive.extractall(path=target_dir)

    # Clean up the '__MACOSX' folder so it does not sit next to the real data
    macos_metadata_dir = target_dir / '__MACOSX'
    if macos_metadata_dir.exists():
        shutil.rmtree(macos_metadata_dir)

    print(f"✅ Unzipped: {zip_path} → {target_dir}")

def move_images(source_dir, dest_dir, image_extensions={'.jpg', '.jpeg', '.png', '.tif', '.tiff'}):
    """
    Moves image files from source_dir to dest_dir (non-recursive).

    Args:
        source_dir: Directory whose direct children are scanned.
        dest_dir: Directory that receives the images; created if missing.
        image_extensions: Lower-case suffixes (with leading dot) that count
            as images. Matching is case-insensitive on the file's suffix.

    Returns:
        None. The number of moved files is printed.
    """
    source_dir = Path(source_dir)
    dest_dir = Path(dest_dir)
    dest_dir.mkdir(parents=True, exist_ok=True)

    moved = 0
    for file in source_dir.iterdir():
        # Only regular files qualify: a sub-directory whose name happens to
        # end in an image suffix must stay where it is.
        if file.is_file() and file.suffix.lower() in image_extensions:
            shutil.move(str(file), str(dest_dir / file.name))
            moved += 1
    print(f"✅ Moved {moved} images → {dest_dir}")


In [8]:
# Archive locations and the folders they are unpacked into

# Ground-truth (gt) images
gt_zip = REPO_ROOT / 'data/raw/zips/train_images.zip'
gt_folder_path = REPO_ROOT / 'data/temp_images'

# Unlabeled images reserved for prediction
unlabeled_pred_zip = REPO_ROOT / 'data/raw/zips/evaluation_images.zip'
unlabeled_pred_path = REPO_ROOT / 'data/processed/images/predict'

# Unpack both archives: ground truth first, then the unlabeled prediction set
for archive, target in (
    (gt_zip, gt_folder_path),
    (unlabeled_pred_zip, unlabeled_pred_path),
):
    unzip_to_folder(archive, target)


✅ Unzipped: /Users/mitchellpalmer/Projects/solafune-canopy-capstone-clean/data/raw/zips/train_images.zip → /Users/mitchellpalmer/Projects/solafune-canopy-capstone-clean/data/temp_images
✅ Unzipped: /Users/mitchellpalmer/Projects/solafune-canopy-capstone-clean/data/raw/zips/evaluation_images.zip → /Users/mitchellpalmer/Projects/solafune-canopy-capstone-clean/data/processed/images/predict


## Data Split | Train / Evaluation

- The dataset is split automatically in Python. All directory paths are built from the repository root, so the code can be reused on another machine without editing paths.

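- The training data is split 70:30 between **Training** and **Evaluation**; the evaluation portion is divided evenly into 15% validation (`val`) and 15% test (`test`).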

In [None]:
import os, random, shutil
from pathlib import Path

print("Begin Data Split")
# Paths
IMG_DIR = REPO_ROOT / "data/temp_images"
# NOTE(review): this must match the folder the COCO -> YOLO conversion wrote to.
# If that conversion's save directory already existed, the labels may have gone
# to a differently named folder — verify before running.
LBL_DIR = REPO_ROOT / "data/temp_jsons/labels/train_annotations_coco"

OUT_DIR = REPO_ROOT / "data/processed"
splits = {"train": 0.7, "val": 0.15, "test": 0.15}  # 70/15/15 split

# Fixed seed: re-running the cell reproduces the same split, so an image never
# ends up in two different split folders across runs.
SPLIT_SEED = 42

# Collect all images. Sorting first removes any dependence on the order in
# which the filesystem lists files, so the seeded shuffle is repeatable.
images = sorted(
    list(IMG_DIR.glob("*.jpg")) + list(IMG_DIR.glob("*.png")) + list(IMG_DIR.glob("*.tif"))
)
# A private Random instance leaves the global random state untouched
random.Random(SPLIT_SEED).shuffle(images)

# Split indices (the test split receives whatever remains after train and val)
n = len(images)
train_end = int(splits["train"] * n)
val_end = train_end + int(splits["val"] * n)

datasets = {
    "train": images[:train_end],
    "val": images[train_end:val_end],
    "test": images[val_end:],
}

# Copy files into YOLO structure: <OUT_DIR>/images/<split> and <OUT_DIR>/labels/<split>
for split, files in datasets.items():
    (OUT_DIR / "images" / split).mkdir(parents=True, exist_ok=True)
    (OUT_DIR / "labels" / split).mkdir(parents=True, exist_ok=True)

    missing_labels = 0
    for img in files:
        # the label file shares the image's stem
        label = LBL_DIR / (img.stem + ".txt")
        shutil.copy(img, OUT_DIR / "images" / split / img.name)
        if label.exists():
            shutil.copy(label, OUT_DIR / "labels" / split / label.name)
        else:
            missing_labels += 1

    # surface images that were copied without a label instead of hiding them
    if missing_labels:
        print(f"Warning: {missing_labels} of {len(files)} images in '{split}' have no label file")


# Verify the result in the directory that was actually written to
# (under OUT_DIR, not a path relative to the current working directory).
test_label_path = OUT_DIR / "labels" / "test"

# Check if any .txt files exist in the directory
if any(test_label_path.glob('*.txt')):
    print('Successful Data Split:\n  Train - 70%\n  Val - 15%\n  Test - 15%')
else:
    print('Data Split Failed - No .txt label files found in test directory.')

Begin Data Split
Completed Data Split       Train - 70%       Val - 15%      Test - 15%
