## Testing work

In [None]:
# Mount Google Drive at /content/drive; every dataset path used later in this
# notebook lives under /content/drive/MyDrive (Colab-only cell).
from google.colab import drive
drive.mount('/content/drive')

## Preprocess FoodSeg103

In [None]:
import os
import json
import base64
import zlib
import cv2

from PIL import Image
from io import BytesIO
import numpy as np
from tqdm import tqdm

def get_class_and_tag_id_mapping(meta_dir: str):
    """Build lookups from the meta file's original ids to sequential ids.

    Classes are numbered first, starting at 1; tags continue the same
    counter, so tag ids begin right after the last class id.

    Args:
        meta_dir: Path to the meta JSON file (a file path, despite the name).
            The file must contain a ``classes`` list (entries with ``id`` and
            ``title``) and a ``tags`` list (entries with ``id`` and ``name``).

    Returns:
        A ``(class_mapping, tag_mapping)`` tuple. Each maps an original id to
        ``{"id": <sequential id>, "name": <title or name>}``.
    """
    with open(meta_dir, 'r') as handle:
        meta = json.load(handle)

    class_entries = meta['classes']
    class_mapping = {
        entry["id"]: {"id": new_id, "name": entry["title"]}
        for new_id, entry in enumerate(class_entries, start=1)
    }

    # Tags share the running counter with classes, so offset by the class count.
    first_tag_id = len(class_entries) + 1
    tag_mapping = {
        entry["id"]: {"id": new_id, "name": entry["name"]}
        for new_id, entry in enumerate(meta['tags'], start=first_tag_id)
    }

    return class_mapping, tag_mapping

In [None]:
def decode_bitmap(bitmap_data, origin, size):
    """Decode an encoded bitmap patch and place it on a full-size blank mask.

    Args:
        bitmap_data: Base64 text wrapping zlib-compressed image bytes.
        origin: Position at which the decoded patch is pasted, as passed to
            ``PIL.Image.paste`` (converted to a tuple first).
        size: Size of the output canvas as passed to ``PIL.Image.new``; the
            caller in this file supplies ``(width, height)``.

    Returns:
        A ``uint8`` NumPy array of the single-channel canvas, zero everywhere
        except where the patch was pasted.
    """
    image_bytes = zlib.decompress(base64.b64decode(bitmap_data))
    patch = Image.open(BytesIO(image_bytes)).convert("L")

    canvas = Image.new("L", size, 0)
    canvas.paste(patch, tuple(origin))

    return np.array(canvas, dtype=np.uint8)

def extract_bbox(mask):
    """Return the COCO-style box ``[x, y, width, height]`` enclosing a mask.

    Args:
        mask: 2-D array; every non-zero element counts as foreground.

    Returns:
        ``[xmin, ymin, width, height]`` as Python ints, or ``None`` (after
        printing ``'No bbox'``) when the mask has no foreground pixel.
    """
    pos = np.nonzero(mask)
    rows, cols = pos[0], pos[1]
    if rows.size == 0 or cols.size == 0:
        print('No bbox')
        return None

    xmin, xmax = int(cols.min()), int(cols.max())
    ymin, ymax = int(rows.min()), int(rows.max())

    # Pixel indices are inclusive, so the extent is max - min + 1. Without the
    # +1 a one-pixel-wide mask yields a zero-size box, which detection models
    # reject as degenerate.
    return [xmin, ymin, xmax - xmin + 1, ymax - ymin + 1]

def mask_to_coco_polygons(mask):
    """Trace the outer contours of a mask and return them as flat polygons.

    Args:
        mask: Single-channel image array accepted by ``cv2.findContours``.

    Returns:
        A list of polygons, each a flat ``[x1, y1, x2, y2, ...]`` list.
        Contours with fewer than three points are dropped (their flattened
        length is printed). The list may be empty, in which case
        ``'No polygons'`` is printed.
    """
    found, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    polygons = []
    for points in found:
        flat = points.flatten().tolist()
        if len(flat) < 6:  # fewer than 3 (x, y) pairs cannot form a polygon
            print("Contour length: ", len(flat))
            continue
        polygons.append(flat)

    if len(polygons) == 0:
        print('No polygons')
    return polygons


def convert_annotations(input_dir, class_id_mapping, tag_id_mapping):
    """Convert one dataset split into a COCO-style annotation dictionary.

    The split directory must contain ``ann/<image file name>.json`` annotation
    files and the matching images under ``img/<image file name>``. The image
    file stem must be an integer, because it becomes the COCO image id.

    Args:
        input_dir: Path of the split directory (holding ``ann`` and ``img``).
        class_id_mapping: Original class id -> ``{"id", "name"}``, as returned
            by ``get_class_and_tag_id_mapping``.
        tag_id_mapping: Original tag id -> ``{"id", "name"}``, from the same
            helper. The first tag of an object provides its supercategory.

    Returns:
        A dict with ``images``, ``annotations`` and ``categories`` lists. Only
        ``bitmap`` objects that produce at least one polygon become
        annotations, and only categories seen on such objects are listed.
    """
    coco = {
        "images": [],
        "annotations": [],
        "categories": []
    }

    # Category ids already written; COCO category ids must be unique, and a
    # set lookup avoids rescanning the whole category list for every object.
    seen_category_ids = set()
    annotation_id = 1

    ann_dir = os.path.join(input_dir, 'ann')
    img_dir = os.path.join(input_dir, 'img')

    for filename in tqdm(os.listdir(ann_dir)):
        if not filename.endswith(".json"):
            continue

        # "00001080.jpg.json" -> "00001080.jpg" -> 1080
        image_name = filename.replace(".json", "")
        if not os.path.exists(os.path.join(img_dir, image_name)):
            continue
        image_id = int(image_name.replace(".jpg", ""))

        with open(os.path.join(ann_dir, filename)) as f:
            ann_data = json.load(f)

        width = ann_data["size"]["width"]
        height = ann_data["size"]["height"]

        coco["images"].append({
            "id": image_id,
            "file_name": image_name,
            "width": width,
            "height": height
        })

        for obj in ann_data.get("objects", []):
            if obj["geometryType"] != "bitmap":
                continue

            class_info = class_id_mapping[obj["classId"]]
            category_id = class_info["id"]

            bitmap = obj["bitmap"]
            mask = decode_bitmap(bitmap["data"], bitmap["origin"], (width, height))
            bbox = extract_bbox(mask)
            polygons = mask_to_coco_polygons(mask)

            # Without a polygon there is no usable segmentation (an empty mask
            # also has no bbox), so the object is dropped entirely.
            if not polygons:
                continue

            coco["annotations"].append({
                "id": annotation_id,
                "image_id": image_id,
                "category_id": category_id,
                "bbox": bbox,
                "area": int(np.sum(mask > 0)),
                "iscrowd": 0,
                "segmentation": polygons
            })
            annotation_id += 1

            # Register the category the first time it is seen.
            if category_id in seen_category_ids:
                continue
            seen_category_ids.add(category_id)

            tags = obj.get("tags") or []
            if tags:
                tag_info = tag_id_mapping[tags[0]["tagId"]]
                supercategory_id = tag_info["id"]
                supercategory = tag_info["name"]
            else:
                # An untagged object should not abort the whole conversion.
                supercategory_id = None
                supercategory = None

            coco["categories"].append({
                "id": category_id,
                "name": class_info["name"],
                "supercategory_id": supercategory_id,
                "supercategory": supercategory
            })

    return coco


# Main
if __name__ == "__main__":
    # Convert the train and test splits to COCO-style JSON files next to the
    # split directories. The names assigned here are module-level globals.

    meta_dir = "/content/drive/MyDrive/foodseg103/meta.json"

    class_id_mapping, tag_id_mapping = get_class_and_tag_id_mapping(meta_dir)

    train_input_dir = "/content/drive/MyDrive/foodseg103/train"
    test_input_dir = "/content/drive/MyDrive/foodseg103/test"

    train_output_path = "/content/drive/MyDrive/foodseg103/train.json"
    test_output_path = "/content/drive/MyDrive/foodseg103/test.json"

    # Both splits are converted before anything is written to disk.
    train_formatted_ann_data = convert_annotations(train_input_dir, class_id_mapping, tag_id_mapping)
    test_formatted_ann_data = convert_annotations(test_input_dir, class_id_mapping, tag_id_mapping)

    with open(train_output_path, "w") as f:
        json.dump(train_formatted_ann_data, f)

    with open(test_output_path, "w") as f:
        json.dump(test_formatted_ann_data, f)

    print(f"✅ COCO annotations saved to: {train_output_path} and {test_output_path}")

In [None]:
# Rebuild the id mappings from the meta file without re-running the conversion.
meta_dir = "/content/drive/MyDrive/foodseg103/meta.json"
class_id_mapping, tag_id_mapping = get_class_and_tag_id_mapping(meta_dir)

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from matplotlib.patches import Polygon
from PIL import Image

def draw_image_with_boxes(image_path, annotations, categories):
    """
    Draws bounding boxes, category labels and polygon masks from COCO-style
    annotations on top of an image.

    Only annotations whose ``image_id`` equals the integer value of the image
    file stem (e.g. ``00001080.jpg`` -> ``1080``) are drawn.

    Args:
        image_path (str): Path to the image; its file stem must be an integer.
        annotations (list): List of annotations with:
            - bbox: [x, y, width, height]
            - category_id: int
            - segmentation: list of polygons (each polygon is a list of x,y coords)
            - image_id: int
        categories (list): List of dicts with 'id' and 'name' fields.
                           e.g., [{"id": 1, "name": "rice"}, ...]

    Raises:
        ValueError: If the image file stem is not an integer.
    """
    # Build a mapping from category_id to name
    category_id_to_name = {cat["id"]: cat["name"] for cat in categories}

    # Parse the image id once, before any figure is created.
    image_id = int(os.path.splitext(os.path.basename(image_path))[0])

    # Load image
    image = Image.open(image_path).convert("RGB")
    plt.figure(figsize=(8, 8))
    plt.imshow(image)
    ax = plt.gca()

    for ann in annotations:
        if ann["image_id"] != image_id:
            continue

        bbox = ann["bbox"]
        category_id = ann["category_id"]
        label = category_id_to_name.get(category_id, f"ID {category_id}")

        # Draw rectangle
        rect = patches.Rectangle(
            (bbox[0], bbox[1]), bbox[2], bbox[3],
            linewidth=2, edgecolor='red', facecolor='none'
        )
        ax.add_patch(rect)

        # Draw label
        ax.text(bbox[0], bbox[1] - 5, label,
                color='white', backgroundcolor='red', fontsize=10)

        # Draw polygon masks. Missing, empty or non-list segmentations have
        # nothing to draw as polygons, so they are skipped.
        segmentations = ann.get("segmentation")
        if not isinstance(segmentations, list) or not segmentations:
            continue

        # Accept either a list of polygons or one bare flat polygon.
        if isinstance(segmentations[0], list):
            polygons = segmentations
        else:
            polygons = [segmentations]

        for poly in polygons:
            # poly is a flat list: [x1, y1, x2, y2, ..., xn, yn]
            poly_points = list(zip(poly[0::2], poly[1::2]))
            polygon_patch = Polygon(poly_points, closed=True, linewidth=1,
                                    edgecolor='yellow', facecolor='yellow', alpha=0.4)
            ax.add_patch(polygon_patch)

    plt.axis("off")
    plt.tight_layout()
    plt.show()


In [None]:
# Load the converted COCO-style train annotations back from Drive.
with open("/content/drive/MyDrive/foodseg103/train.json", 'r') as f:
    train_formatted_ann_data = json.load(f)

In [None]:
# Visual sanity check of the converted annotations on one training image.
draw_image_with_boxes(
    "/content/drive/MyDrive/foodseg103/train/img/00001080.jpg",
    train_formatted_ann_data["annotations"],
    train_formatted_ann_data["categories"])

# Official code part

#### Utils

In [None]:
import random
import torchvision.transforms.functional as F

class Compose:
    """Apply a sequence of paired ``(image, target)`` transforms in order."""

    def __init__(self, transforms):
        # Each entry is a callable taking and returning an (image, target) pair.
        self.transforms = transforms

    def __call__(self, image, target):
        """Feed the pair through every transform and return the final pair."""
        for step in self.transforms:
            image, target = step(image, target)
        return image, target

class RandomHorizontalFlip:
    """Mirror an image and its target left-to-right with a given probability."""

    def __init__(self, prob=0.5):
        self.prob = prob

    def __call__(self, image, target):
        """Return the pair, flipped when one random draw falls below ``prob``.

        ``target["boxes"]`` (if present) is updated in place and
        ``target["masks"]`` (if present) is replaced by its mirrored copy.
        """
        # One random draw per call; a miss leaves the sample untouched.
        if not random.random() < self.prob:
            return image, target

        # The image must still be a PIL image here: its ``size`` is read below.
        flipped = F.hflip(image)
        width, _height = flipped.size

        if "boxes" in target:
            boxes = target["boxes"]
            # Mirroring swaps the x edges: new x1 = W - old x2, new x2 = W - old x1.
            boxes[:, [0, 2]] = width - boxes[:, [2, 0]]
            target["boxes"] = boxes

        if "masks" in target:
            # Masks are tensors; reverse them along the last axis.
            target["masks"] = target["masks"].flip(-1)

        return flipped, target

class ToTensor:
    """Convert the image of an ``(image, target)`` pair to a tensor."""

    def __call__(self, image, target):
        """Return ``(F.to_tensor(image), target)``; the target is untouched."""
        return F.to_tensor(image), target

def get_transform(train=True):
    """Build the transform pipeline for a dataset split.

    Args:
        train: When truthy, a 50% random horizontal flip runs before the
            tensor conversion; otherwise only the tensor conversion runs.

    Returns:
        A ``Compose`` wrapping the selected transforms.
    """
    steps = [RandomHorizontalFlip(0.5)] if train else []
    steps.append(ToTensor())
    return Compose(steps)


#### Dataset

In [None]:
import os
import json
import numpy as np
from PIL import Image, ImageDraw
import torch
from torch.utils.data import Dataset
import torchvision.transforms.functional as F
from pycocotools.coco import COCO

class FoodSegJSONDataset(Dataset):
    """Detection/segmentation dataset backed by a COCO-style JSON file.

    Each item is an ``(image, target)`` pair where ``target`` holds ``boxes``
    (``[x1, y1, x2, y2]``, float32), ``labels`` (int64), ``masks`` (uint8),
    ``image_id``, ``area`` (float32) and ``iscrowd`` (int64). All per-object
    fields have one entry per kept annotation.
    """

    def __init__(self, img_dir, ann_path, transforms=None):
        """
        Args:
            img_dir: Split directory; images are read from its ``img`` subfolder.
            ann_path: Path to the COCO-style annotation JSON for that split.
            transforms: Optional callable applied as ``transforms(img, target)``.
        """
        self.image_dir = os.path.join(img_dir, "img")
        self.coco = COCO(ann_path)
        self.image_ids = list(self.coco.imgs.keys())
        self.transforms = transforms

    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        """Return the sample at ``idx``, or the next usable one after it.

        Images without any usable annotation are skipped by moving on to the
        following index (wrapping around). The search is bounded by the
        dataset size, so a dataset with no usable image raises instead of
        recursing forever.

        Raises:
            IndexError: If ``idx`` is out of range.
            RuntimeError: If no image in the dataset has a usable annotation.
        """
        total = len(self)
        candidate = idx
        for _ in range(max(total, 1)):
            sample = self._load_sample(candidate)
            if sample is not None:
                return sample
            candidate = (candidate + 1) % total
        raise RuntimeError("No image in the dataset has a usable annotation")

    def _load_sample(self, idx):
        """Build the ``(image, target)`` pair for one index, or ``None`` if unusable."""
        image_id = self.image_ids[idx]
        img_info = self.coco.loadImgs(image_id)[0]
        anns = self.coco.loadAnns(self.coco.getAnnIds(imgIds=image_id))

        boxes = []
        labels = []
        masks = []
        areas = []
        crowd_flags = []

        for ann in anns:
            bbox = ann.get('bbox')
            if ('segmentation' not in ann) or not bbox:
                continue

            # COCO bbox is [x, y, width, height]. Boxes with a non-positive
            # side are degenerate and rejected by torchvision detection models.
            x, y, w, h = bbox
            if w <= 0 or h <= 0:
                continue

            masks.append(self.coco.annToMask(ann))
            boxes.append([x, y, x + w, y + h])  # -> [x1, y1, x2, y2]
            labels.append(ann['category_id'])
            # Collected in the same pass so every per-object field stays
            # aligned with the annotations that were actually kept.
            areas.append(ann["area"])
            crowd_flags.append(ann.get("iscrowd", 0))

        if not masks:
            return None

        img_path = os.path.join(self.image_dir, img_info['file_name'])
        img = Image.open(img_path).convert("RGB")

        target = {
            "boxes": torch.as_tensor(boxes, dtype=torch.float32),
            "labels": torch.as_tensor(labels, dtype=torch.int64),
            "masks": torch.as_tensor(np.array(masks), dtype=torch.uint8),
            "image_id": torch.tensor([image_id]),
            "area": torch.tensor(areas, dtype=torch.float32),
            "iscrowd": torch.tensor(crowd_flags, dtype=torch.int64)
        }

        if self.transforms:
            img, target = self.transforms(img, target)

        return img, target




#### Engine

In [None]:
from tqdm import tqdm


def train_one_epoch(model, optimiser, data_loader, device, epoch,
                    checkpoint_every=10,
                    checkpoint_dir='/content/drive/MyDrive/foodseg103/outputs/models'):
    """Run one training epoch, saving periodic checkpoints of the model.

    Args:
        model: Detection model that returns a dict of losses when called as
            ``model(images, targets)`` in training mode.
        optimiser: Optimiser updating the model parameters.
        data_loader: Iterable of ``(images, targets)`` batches, where
            ``targets`` is a sequence of dicts of tensors.
        device: Device the batch tensors are moved to.
        epoch: Epoch number, used in the progress bar and checkpoint names.
        checkpoint_every: Save the state dict every this many batches,
            starting with the first one. A falsy value disables checkpointing.
        checkpoint_dir: Directory receiving
            ``model_epoch_<epoch>_checkpoint_<batch>.pth`` files.
    """
    model.train()

    if checkpoint_every:
        # Create the output directory once instead of on every checkpoint.
        os.makedirs(checkpoint_dir, exist_ok=True)

    losses = None
    for step, (images, targets) in enumerate(tqdm(data_loader, desc=f"Epoch {epoch}")):
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        losses = sum(loss_dict.values())

        optimiser.zero_grad()
        losses.backward()
        optimiser.step()

        if checkpoint_every and step % checkpoint_every == 0:
            torch.save(model.state_dict(),
                       f"{checkpoint_dir}/model_epoch_{epoch}_checkpoint_{step}.pth")

    # An empty loader leaves no loss to report; avoid failing on an unbound name.
    if losses is None:
        print("Loss: n/a (data loader yielded no batches)")
    else:
        # Loss of the final batch only, not an epoch average.
        print(f"Loss: {losses.item():.4f}")

#### Models

In [None]:
import torchvision

from torchvision.models.detection import (
    maskrcnn_resnet50_fpn_v2,
    MaskRCNN_ResNet50_FPN_V2_Weights,
    faster_rcnn,
    mask_rcnn)


def get_model(num_classes):
    """Create a Mask R-CNN (ResNet-50 FPN v2) with heads sized for ``num_classes``.

    The model starts from the ``COCO_V1`` weights; its box predictor and mask
    predictor are then replaced by fresh ones with ``num_classes`` outputs.

    Args:
        num_classes: Number of output classes for both replacement heads.

    Returns:
        The model with both ROI heads replaced.
    """
    model = maskrcnn_resnet50_fpn_v2(weights=MaskRCNN_ResNet50_FPN_V2_Weights.COCO_V1)
    heads = model.roi_heads

    # Box head: keep the input width, change the number of classes.
    box_in_features = heads.box_predictor.cls_score.in_features
    heads.box_predictor = faster_rcnn.FastRCNNPredictor(box_in_features, num_classes)

    # Mask head: same idea, with a fixed hidden width.
    mask_in_channels = heads.mask_predictor.conv5_mask.in_channels
    mask_hidden_channels = 256
    heads.mask_predictor = mask_rcnn.MaskRCNNPredictor(
        mask_in_channels, mask_hidden_channels, num_classes)

    return model

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import random

def show_image_with_masks(img, pred, categories=None, score_thresh=0.5):
    """Plot an image with predicted mask outlines, boxes and labels.

    Args:
        img: Image tensor laid out as (channels, height, width).
        pred: Prediction dict with ``masks`` (indexed as ``masks[i, 0]``),
            ``boxes`` (``[x1, y1, x2, y2]``), ``labels`` and ``scores``.
        categories: Optional mapping from label id to a display name, or to a
            dict that has a ``"name"`` key. Unknown ids are shown as the
            numeric id.
        score_thresh: Predictions scoring below this value are not drawn.
    """
    # Move to CPU first so images that live on the GPU can be plotted too.
    img = img.detach().cpu().permute(1, 2, 0).numpy()

    plt.figure(figsize=(10, 10))
    plt.imshow(img)

    ax = plt.gca()

    masks = pred["masks"]
    boxes = pred["boxes"]
    labels = pred["labels"]
    scores = pred["scores"]

    for i in range(len(masks)):
        score = scores[i].item()
        if score < score_thresh:
            continue

        # Keep the soft mask in its original 0-1 range so that contour level
        # 0.5 really is the 50% boundary. Scaling to 0-255 first would outline
        # almost every pixel with a non-zero score.
        mask = masks[i, 0].detach().float().cpu().numpy()
        color = np.random.rand(3,)

        ax.contour(mask, levels=[0.5], colors=[color])

        x1, y1, x2, y2 = boxes[i].detach().cpu().numpy()
        ax.add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1,
                                   fill=False, color=color, linewidth=2))

        label_id = labels[i].item()
        if categories and label_id in categories:
            label_name = categories[label_id]
            if isinstance(label_name, dict):
                label_name = label_name.get("name", label_id)
        else:
            label_name = label_id

        # Show the resolved name rather than always the raw id.
        ax.text(x1, y1, f"{label_name}:{score:.2f}", color=color, fontsize=12,
                bbox=dict(facecolor='white', edgecolor=color, boxstyle='round,pad=0.2'))

    plt.axis("off")
    plt.tight_layout()
    plt.show()




#### Main.py

In [None]:
# need class_id_mapping mapping

In [None]:
import torch
import json
import os

from torch.utils.data import DataLoader

# from dataset.foodseg_json_dataset import FoodSegJSONDataset
# from models.mask_rcnn import get_model
# from utils.transforms import get_transform
# from engine.train import train_one_epoch
# from utils.visualise import show_image_with_masks


def collate_fn(batch):
    """Transpose a batch of samples into per-field tuples.

    A list of ``(image, target)`` pairs becomes ``(images, targets)``, so
    samples of differing sizes are kept side by side instead of being stacked.
    """
    per_field = zip(*batch)
    return tuple(per_field)


def load_categories(meta_path):
    """Read the class list of a meta JSON file as an ``id -> title`` dict.

    Args:
        meta_path: Path to a JSON file whose ``classes`` entries carry ``id``
            and ``title`` fields.

    Returns:
        Dict mapping each class ``id`` to its ``title``.
    """
    with open(meta_path) as meta_file:
        class_entries = json.load(meta_file)['classes']

    titles_by_id = {}
    for entry in class_entries:
        titles_by_id[entry['id']] = entry['title']
    return titles_by_id


def main():
    """Train Mask R-CNN on the train split for one epoch and preview a test prediction.

    Reads the converted COCO-style files ``train.json`` / ``test.json`` and
    ``meta.json`` from the Google Drive dataset folder, trains for one epoch
    and plots the prediction for the first test sample.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"device: {device}")

    google_drive_path = '/content/drive/MyDrive/foodseg103'

    train_data_path = os.path.join(google_drive_path, 'train')
    test_data_path = os.path.join(google_drive_path, 'test')
    meta_path = os.path.join(google_drive_path, 'meta.json')

    train_ann_path = os.path.join(google_drive_path, 'train.json')
    test_ann_path = os.path.join(google_drive_path, 'test.json')

    # Dataset and Dataloader
    dataset = FoodSegJSONDataset(train_data_path, train_ann_path, transforms=get_transform(train=True))
    data_loader = DataLoader(dataset, batch_size=3, shuffle=True, collate_fn=collate_fn)

    print("len", len(dataset))

    test_dataset = FoodSegJSONDataset(test_data_path, test_ann_path, transforms=get_transform(train=False))

    # Model. The class count is read from the meta file, so this function does
    # not depend on a global left behind by an earlier notebook cell.
    num_classes = len(load_categories(meta_path)) + 1  # background + category count
    print()
    print(f"num classes: {num_classes}")
    model = get_model(num_classes).to(device)

    # Optimizer
    params = [p for p in model.parameters() if p.requires_grad]
    optimiser = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

    # Training Loop
    for epoch in range(1):
        train_one_epoch(model, optimiser, data_loader, device, epoch)

    # Visualize predictions on test set
    model.eval()
    with torch.no_grad():
        img, _ = test_dataset[0]
        pred = model([img.to(device)])[0]

    # Predicted labels are COCO category ids, so the display names must be
    # keyed by those ids, not by the original ids of the meta file.
    category_names = {cat_id: cat["name"] for cat_id, cat in test_dataset.coco.cats.items()}
    show_image_with_masks(img, pred, category_names)

In [None]:
# Run the train-then-visualise pipeline defined in the previous cell.
main()