In [1]:
import os
import json
from PIL import Image
import numpy as np
import torch
import torch.utils.data
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor
from torchvision.models.detection import fasterrcnn_resnet50_fpn, maskrcnn_resnet50_fpn
from torchvision import transforms as T
from tqdm import tqdm
import math
import datetime
import time
import collections
import sys
import random
import tempfile

from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import pycocotools.mask as mask_util

In [2]:
# Root folder of the dataset; every path below is built from it.
BASE_DIR = "D:/DS_repo/Sweet-Chili/dataset"

# Image folders and annotation JSON files for the train / validation splits.
# NOTE(review): the train file is "instance-train.json" while the val file is
# "instances-val.json" -- confirm both names against what is on disk.
IMG_DIR_TRAIN = f"{BASE_DIR}/images/train"
ANNOTATION_FILE_TRAIN = f"{BASE_DIR}/annotations/instance-train.json"
IMG_DIR_VAL = f"{BASE_DIR}/images/val"
ANNOTATION_FILE_VAL = f"{BASE_DIR}/annotations/instances-val.json"

# Selects which detector the notebook builds ('mask_rcnn' or 'faster_rcnn').
MODEL_TYPE = 'mask_rcnn'

In [3]:
# --- Training hyper-parameters -------------------------------------------
BATCH_SIZE = 2
NUM_EPOCHS = 10
PRINT_FREQ = 50          # log every PRINT_FREQ iterations during training

# SGD optimizer settings
LEARNING_RATE = 0.005
MOMENTUM = 0.9
WEIGHT_DECAY = 0.0005

# StepLR schedule: multiply the LR by LR_GAMMA every LR_STEP_SIZE epochs
LR_STEP_SIZE = 3
LR_GAMMA = 0.1

# --- Runtime / output ----------------------------------------------------
# Prefer the GPU when one is visible, otherwise fall back to the CPU.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
SAVE_MODEL_PATH = f"{MODEL_TYPE}_chili_model.pth"

print(f"Using device: {DEVICE}")
if DEVICE.type == 'cuda':
    print(f"CUDA version: {torch.version.cuda}")
    print(f"GPU Name: {torch.cuda.get_device_name(0)}")

Using device: cuda
CUDA version: 12.6
GPU Name: NVIDIA GeForce GTX 1650


In [4]:
class ChiliDataset(torch.utils.data.Dataset):
    """Detection / instance-segmentation dataset backed by a COCO-style JSON file.

    The annotation file is read once in ``__init__`` and must contain the
    ``images``, ``annotations`` and ``categories`` keys.  Original category ids
    are remapped to contiguous labels ``1..K`` (ordered by original id), with
    label ``0`` reserved for ``"background"``.  Only images that have at least
    one annotation are indexed.

    ``dataset[i]`` returns ``(image_tensor, target)`` where ``target`` is a dict
    with ``boxes`` (``[x_min, y_min, x_max, y_max]``, shape ``(N, 4)``),
    ``labels``, ``image_id``, ``area``, ``iscrowd`` and ``masks``
    (``(N, H, W)`` uint8 when ``MODEL_TYPE == 'mask_rcnn'``, otherwise an empty
    ``(0, H, W)`` tensor).  Boxes and masks are always kept index-aligned.

    Args:
        root: Directory containing the image files named in the JSON.
        annotation_file: Path to the annotation JSON.
        is_train: When true, each sample is horizontally flipped with
            probability 0.5.
    """

    def __init__(self, root, annotation_file, is_train=False):
        self.root = root
        self.is_train = is_train
        self.transforms = get_transform()
        with open(annotation_file, 'r') as f:
            self.coco = json.load(f)

        self.imgs = self.coco['images']
        self.annotations = self.coco['annotations']
        self.categories_info = self.coco['categories']

        self.category_id_to_name = {cat['id']: cat['name'] for cat in self.categories_info}
        self.category_name_to_new_label = {"background": 0}
        self.new_label_to_category_name = {0: "background"}

        # Contiguous labels are assigned in order of the original category id.
        sorted_categories = sorted(self.categories_info, key=lambda x: x['id'])
        for new_idx, cat in enumerate(sorted_categories):
            self.category_name_to_new_label[cat['name']] = new_idx + 1
            self.new_label_to_category_name[new_idx + 1] = cat['name']

        self.num_classes = len(self.new_label_to_category_name)

        self.img_id_to_info = {img['id']: img for img in self.imgs}

        # Group annotations by image so __getitem__ is a dictionary lookup.
        self.img_id_to_annotations = {}
        for ann in self.annotations:
            self.img_id_to_annotations.setdefault(ann['image_id'], []).append(ann)

        # Index only the images that actually carry annotations.
        self.ids = [img['id'] for img in self.imgs if img['id'] in self.img_id_to_annotations]

    def _decode_mask(self, ann, height, width, img_id):
        """Return the annotation's segmentation as one 2-D uint8 array of shape (height, width).

        Polygon lists and RLE dicts are supported; any other format (or a
        polygon list without a usable polygon) produces an all-zero mask and a
        printed warning.
        """
        if mask_util is None:
            if not hasattr(self, '_mask_warning_printed'):
                print("WARNING: pycocotools not found. Mask generation for training/inference will use dummy masks.")
                self._mask_warning_printed = True
            return np.zeros((height, width), dtype=np.uint8)

        segmentation = ann.get('segmentation')
        if isinstance(segmentation, list):
            polygons = [np.array(p, dtype=np.float32).flatten().tolist() for p in segmentation]
            # A polygon needs at least 3 points; pycocotools also interprets a
            # 4-number list as a bounding box, which is not what we want here.
            polygons = [p for p in polygons if len(p) >= 6]
            if not polygons:
                print(f"Warning: No valid polygon in segmentation for image {img_id}. Using empty mask.")
                return np.zeros((height, width), dtype=np.uint8)
            mask = mask_util.decode(mask_util.frPyObjects(polygons, height, width))
        elif isinstance(segmentation, dict):
            rle = segmentation
            if isinstance(rle.get('counts'), list):
                # Uncompressed RLE has to be converted before it can be decoded.
                rle = mask_util.frPyObjects(rle, height, width)
            mask = mask_util.decode(rle)
        else:
            print(f"Warning: Unknown segmentation format for image {img_id}. Using empty mask.")
            return np.zeros((height, width), dtype=np.uint8)

        if mask.ndim == 3:
            # One channel per polygon: an object made of several parts is the
            # union of those parts, not a stack of separate masks.
            mask = mask.any(axis=2)
        return mask.astype(np.uint8)

    def __getitem__(self, idx):
        img_id = self.ids[idx]
        img_info = self.img_id_to_info[img_id]
        path = os.path.join(self.root, img_info['file_name'])

        img = Image.open(path).convert("RGB")
        original_width, original_height = img.size
        want_masks = MODEL_TYPE == 'mask_rcnn'

        boxes, labels, masks, area, iscrowd = [], [], [], [], []

        for ann in self.img_id_to_annotations.get(img_id, []):
            xmin, ymin, width_bbox, height_bbox = ann['bbox']
            if width_bbox <= 0 or height_bbox <= 0:
                continue

            # Decode the mask BEFORE recording the box so that skipping a bad
            # mask never leaves boxes/labels without a matching mask.
            if want_masks:
                mask = self._decode_mask(ann, original_height, original_width, img_id)
                if mask.shape[0] == 0 or mask.shape[1] == 0:
                    print(f"WARNING: Degenerate mask shape detected for image {img_id}: {mask.shape}. Skipping mask.")
                    continue
                if mask.shape[0] == 1 or mask.shape[1] == 1:
                    print(f"WARNING: 1-pixel dimension mask detected for image {img_id}: {mask.shape}. This might cause issues downstream.")
                masks.append(mask)

            boxes.append([xmin, ymin, xmin + width_bbox, ymin + height_bbox])
            original_category_name = self.category_id_to_name[ann['category_id']]
            labels.append(self.category_name_to_new_label[original_category_name])
            area.append(ann.get('area', width_bbox * height_bbox))
            iscrowd.append(ann.get('iscrowd', 0))

        img_tensor = self.transforms(img)

        target = {}
        # reshape keeps the (N, 4) layout even when N == 0, so the column
        # indexing in the flip below is always valid.
        target["boxes"] = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        target["labels"] = torch.as_tensor(labels, dtype=torch.int64)
        target["image_id"] = torch.tensor([img_id])
        target["area"] = torch.as_tensor(area, dtype=torch.float32)
        target["iscrowd"] = torch.as_tensor(iscrowd, dtype=torch.int64)

        if masks:
            target["masks"] = torch.as_tensor(np.stack(masks), dtype=torch.uint8)
        else:
            # Covers both "no masks requested" and "no usable annotation".
            target["masks"] = torch.empty((0, original_height, original_width), dtype=torch.uint8)

        if self.is_train and random.random() < 0.5:
            img_tensor = T.functional.hflip(img_tensor)

            # Mirror the x coordinates; x_min and x_max swap roles.
            boxes_flipped = target["boxes"].clone()
            boxes_flipped[:, 0] = original_width - target["boxes"][:, 2]
            boxes_flipped[:, 2] = original_width - target["boxes"][:, 0]
            target["boxes"] = boxes_flipped

            if target["masks"].numel() > 0:
                target["masks"] = T.functional.hflip(target["masks"])

        if target["boxes"].numel() > 0:
            widths = target["boxes"][:, 2] - target["boxes"][:, 0]
            heights = target["boxes"][:, 3] - target["boxes"][:, 1]

            keep_indices = (widths > 0) & (heights > 0)

            if not torch.all(keep_indices):
                print(f"WARNING: Degenerate bounding box(es) detected after transformations for image {img_id}.")
                print(f"Original boxes count: {target['boxes'].shape[0]}, Keeping: {torch.sum(keep_indices).item()}")

                masks_aligned = target["masks"].shape[0] == keep_indices.shape[0]
                target["boxes"] = target["boxes"][keep_indices]
                target["labels"] = target["labels"][keep_indices]
                target["area"] = target["area"][keep_indices]
                target["iscrowd"] = target["iscrowd"][keep_indices]
                if masks_aligned:
                    target["masks"] = target["masks"][keep_indices]

        if target["boxes"].numel() == 0:
            print(f"Image {img_id} has no valid objects after filtering. Providing an empty target.")
            target["boxes"] = torch.empty((0, 4), dtype=torch.float32)
            target["labels"] = torch.empty((0,), dtype=torch.int64)
            target["area"] = torch.empty((0,), dtype=torch.float32)
            target["iscrowd"] = torch.empty((0,), dtype=torch.int64)
            if want_masks:
                target["masks"] = torch.empty((0, img_tensor.shape[1], img_tensor.shape[2]), dtype=torch.uint8)

        return img_tensor, target

    def __len__(self):
        """Number of images that have at least one annotation."""
        return len(self.ids)

def get_transform():
    """Build the image preprocessing pipeline used by the dataset.

    Returns:
        A ``T.Compose`` that turns a PIL image into a tensor
        (``T.PILToTensor``) and then converts it to ``torch.float``
        (``T.ConvertImageDtype``).
    """
    return T.Compose([
        T.PILToTensor(),
        T.ConvertImageDtype(torch.float),
    ])

In [5]:
def get_model(num_classes, model_type='mask_rcnn'):
    """Create a COCO-pretrained ResNet-50 FPN detector with heads sized for ``num_classes``.

    Args:
        num_classes: Number of output classes for the replaced predictor
            heads (the caller passes the count including background).
        model_type: ``'mask_rcnn'`` or ``'faster_rcnn'``.

    Returns:
        The torchvision model with a fresh ``FastRCNNPredictor`` box head and,
        for Mask R-CNN, a fresh ``MaskRCNNPredictor`` mask head.

    Raises:
        ValueError: If ``model_type`` is not one of the two supported names.
    """
    from torchvision.models.detection import MaskRCNN_ResNet50_FPN_Weights, FasterRCNN_ResNet50_FPN_Weights

    # Resize bounds forwarded to the model constructor.
    resize_kwargs = {"min_size": 400, "max_size": 1000}

    if model_type == 'mask_rcnn':
        model = maskrcnn_resnet50_fpn(weights=MaskRCNN_ResNet50_FPN_Weights.COCO_V1, **resize_kwargs)
    elif model_type == 'faster_rcnn':
        model = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.COCO_V1, **resize_kwargs)
    else:
        raise ValueError(f"Unknown model type: {model_type}. Choose 'mask_rcnn' or 'faster_rcnn'.")

    # Both architectures get a new box classifier/regressor head.
    box_in_features = model.roi_heads.box_predictor.cls_score.in_features
    model.roi_heads.box_predictor = FastRCNNPredictor(box_in_features, num_classes)

    if model_type == 'mask_rcnn':
        # Mask R-CNN additionally gets a new mask head (256 hidden channels).
        mask_in_channels = model.roi_heads.mask_predictor.conv5_mask.in_channels
        model.roi_heads.mask_predictor = MaskRCNNPredictor(mask_in_channels, 256, num_classes)

    return model

def collate_fn(batch):
    """Transpose a list of samples into per-field tuples.

    ``[(img0, tgt0), (img1, tgt1)]`` becomes ``((img0, img1), (tgt0, tgt1))``;
    the items are grouped but not stacked into a single tensor.
    """
    per_field = zip(*batch)
    return tuple(per_field)

In [None]:
class SmoothedValue(object):
    """Track a series of values and expose windowed and global statistics.

    The last ``window_size`` values are kept in a deque for ``median``, ``avg``,
    ``max`` and ``value``; ``total`` / ``count`` cover every update and feed
    ``global_avg``.

    Args:
        window_size: Number of most recent values kept for the windowed stats.
        fmt: Format string used by ``__str__``; it may reference ``median``,
            ``avg``, ``global_avg``, ``max`` and ``value``.
    """

    def __init__(self, window_size=20, fmt=None):
        if fmt is None:
            fmt = "{avg:.4f} ({global_avg:.4f})"
        self.deque = collections.deque(maxlen=window_size)
        self.total = 0.0
        self.count = 0
        self.fmt = fmt

    def update(self, value, n=1):
        """Record ``value`` as the average of ``n`` samples."""
        self.deque.append(value)
        self.count += n
        # Weight by n so that total / count stays a true mean when n != 1.
        self.total += value * n

    def synchronize_between_processes(self):
        """
        Warning: does not synchronize smoothed values across processes.
        """
        return

    @property
    def median(self):
        """Median of the values currently in the window."""
        return np.median(list(self.deque))

    @property
    def avg(self):
        """Mean of the values currently in the window."""
        return np.mean(list(self.deque))

    @property
    def global_avg(self):
        """Mean over every sample recorded since construction."""
        return self.total / self.count

    @property
    def max(self):
        """Largest value currently in the window."""
        return max(self.deque)

    @property
    def value(self):
        """Most recently recorded value."""
        return self.deque[-1]

    def __str__(self):
        return self.fmt.format(
            median=self.median,
            avg=self.avg,
            global_avg=self.global_avg,
            max=self.max,
            value=self.value)

class MetricLogger(object):
    """Collect named ``SmoothedValue`` meters and print them periodically.

    Meters are created on first use by ``update`` or registered explicitly
    with ``add_meter``; each meter is also reachable as an attribute
    (``logger.loss``).

    Args:
        delimiter: String placed between the fields of a printed log line.
    """

    def __init__(self, delimiter="\t"):
        self.meters = collections.defaultdict(SmoothedValue)
        self.delimiter = delimiter

    def update(self, **kwargs):
        """Record one value per keyword; tensors are converted with ``.item()``."""
        for k, v in kwargs.items():
            if isinstance(v, torch.Tensor):
                v = v.item()
            assert isinstance(v, (float, int))
            self.meters[k].update(v)

    def __getattr__(self, attr):
        # Only called when normal attribute lookup fails.  Read `meters`
        # through __dict__ so a missing `meters` cannot recurse back in here.
        meters = self.__dict__.get("meters", {})
        if attr in meters:
            return meters[attr]
        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{attr}'")

    def __str__(self):
        return self.delimiter.join(
            f"{name}: {str(meter)}" for name, meter in self.meters.items()
        )

    def synchronize_between_processes(self):
        for meter in self.meters.values():
            meter.synchronize_between_processes()

    def add_meter(self, name, meter):
        """Register ``meter`` under ``name``, replacing any existing meter."""
        self.meters[name] = meter

    def log_every(self, iterable, print_freq, header=None):
        """Yield the items of ``iterable`` and print a progress line every ``print_freq`` items.

        The ETA uses the ``batch_time`` meter's global average when the caller
        maintains one; otherwise it falls back to the mean iteration time
        measured by this loop, so the logger also works without that meter.

        Args:
            iterable: Sized iterable to walk (``len()`` is required).
            print_freq: Print on every ``print_freq``-th item and on the last.
            header: Optional prefix for every printed line.
        """
        if not header:
            header = ''
        total_iters = len(iterable)
        start_time = time.time()
        end = time.time()
        for i, obj in enumerate(iterable):
            data_time = time.time() - end
            yield obj
            batch_time = time.time() - end
            end = time.time()
            if i % print_freq == 0 or i == total_iters - 1:
                timing = self.meters.get('batch_time')
                if timing is not None and timing.count > 0:
                    avg_iter_time = timing.global_avg
                else:
                    avg_iter_time = (end - start_time) / (i + 1)
                eta_seconds = avg_iter_time * (total_iters - i)
                eta_string = str(datetime.timedelta(seconds=int(eta_seconds)))
                print(self.delimiter.join([
                    f"{header}",
                    f"eta: {eta_string}",
                    f"{str(self)}",
                    f"data: {data_time:.4f}",
                    f"batch: {batch_time:.4f}",
                    f"time: {end - start_time:.4f}"
                ]))
        total_time = time.time() - start_time
        total_time_str = str(datetime.timedelta(seconds=int(total_time)))
        print(f"{header} Total time: {total_time_str} ({total_time:.4f}s over {total_iters} iterations)")


def reduce_dict(input_dict):
    """Average a dict of tensors across processes (single-process stub).

    ``world_size`` is fixed at 1 here, so the input dict is handed back
    unchanged; the averaging path is only reached for a world size of 2+.

    Args:
        input_dict (dict): all the values in the dict are torch.Tensor
    Returns:
        the same dict when running in one process, otherwise a new dict with
        every value divided by the world size
    """
    world_size = 1
    if not world_size >= 2:
        return input_dict

    with torch.no_grad():
        # Sort the keys so every process stacks the values in the same order.
        ordered_keys = sorted(input_dict.keys())
        stacked = torch.stack([input_dict[key] for key in ordered_keys], dim=0)
        stacked = stacked / world_size
        averaged = dict(zip(ordered_keys, stacked))
    return averaged

def prepare_for_coco_eval(predictions, targets, original_sizes, new_label_to_category_name, dataset_ref):
    """Convert model outputs into COCO result dicts.

    Args:
        predictions: One dict per image with ``boxes`` (``[x1, y1, x2, y2]``),
            ``scores``, ``labels`` and optionally ``masks`` tensors.
        targets: One dict per image; only ``target["image_id"]`` is read.
        original_sizes: One ``(width, height)`` pair per image, used to size
            the encoded masks.
        new_label_to_category_name: Mapping from model label to category name.
        dataset_ref: Object exposing ``category_id_to_name`` (original COCO
            category id -> name), used to translate labels back to the ids
            found in the annotation file.

    Returns:
        A list of dicts with ``image_id``, ``category_id``, ``bbox``
        (COCO layout ``[x, y, width, height]``), ``score`` and, when masks are
        available for a Mask R-CNN run, an RLE ``segmentation``.
    """
    # Invert id -> name once instead of scanning it for every detection.
    # setdefault keeps the first id when two categories share a name.
    name_to_category_id = {}
    for cat_id, cat_name in dataset_ref.category_id_to_name.items():
        name_to_category_id.setdefault(cat_name, cat_id)

    coco_results = []
    for img_idx, (output, target) in enumerate(zip(predictions, targets)):
        image_id = target["image_id"].item()
        original_width, original_height = original_sizes[img_idx]

        boxes = output["boxes"].tolist()
        scores = output["scores"].tolist()
        labels = output["labels"].tolist()

        has_masks = "masks" in output and MODEL_TYPE == 'mask_rcnn'
        masks = output["masks"].cpu().numpy() if has_masks else None

        for det_idx, (box, score, label) in enumerate(zip(boxes, scores, labels)):
            category_name = new_label_to_category_name.get(label, "unknown")
            # Fall back to the raw label when the name has no original id.
            category_id = name_to_category_id.get(category_name, label)

            # COCO results use [x, y, width, height]; the model emits corners.
            x_min, y_min, x_max, y_max = box
            result = {
                "image_id": image_id,
                "category_id": category_id,
                "bbox": [x_min, y_min, x_max - x_min, y_max - y_min],
                "score": score,
            }

            if has_masks:
                # masks[det_idx] is indexed with [0] to get a 2-D map.
                mask = masks[det_idx][0]
                mask = Image.fromarray((mask * 255).astype(np.uint8))
                mask = mask.resize((original_width, original_height), Image.NEAREST)
                # encode() needs a Fortran-ordered uint8 array, not bool.
                mask = (np.array(mask) > 128).astype(np.uint8)

                rle = mask_util.encode(np.asfortranarray(mask))
                rle['counts'] = rle['counts'].decode('utf-8')  # JSON-serialisable
                result["segmentation"] = rle

            coco_results.append(result)
    return coco_results

class CocoEvaluator:
    """Accumulate detections over a whole pass and score them with ``COCOeval``.

    ``update`` only stores results; the actual evaluation runs once in
    ``accumulate`` over everything collected.  Evaluating batch by batch would
    keep only the last batch's scores and fails outright on a batch without
    detections.

    Args:
        coco_gt: ``pycocotools.coco.COCO`` object holding the ground truth.
        iou_types: List/tuple of IoU types to evaluate, e.g.
            ``["bbox", "segm"]``.
    """

    def __init__(self, coco_gt, iou_types):
        assert isinstance(iou_types, (list, tuple))
        self.coco_gt = coco_gt
        self.iou_types = iou_types
        self.coco_eval = {}
        for iou_type in iou_types:
            self.coco_eval[iou_type] = COCOeval(coco_gt, iouType=iou_type)
        self.img_ids = []
        self.eval_imgs = {k: [] for k in iou_types}  # kept for compatibility; unused
        self.results = []  # every result dict received so far

    def update(self, coco_results, img_ids=None):
        """Store one batch of COCO result dicts.

        Args:
            coco_results: List of result dicts (may be empty).
            img_ids: Optional ids of all images in the batch.  Pass it to have
                images without any detection included in the evaluation;
                by default only images that produced results are evaluated.
        """
        self.results.extend(coco_results)
        self.img_ids.extend(res["image_id"] for res in coco_results)
        if img_ids is not None:
            self.img_ids.extend(img_ids)

    def synchronize_between_processes(self):
        pass

    def _results_for(self, iou_type):
        """Return per-IoU-type copies of the stored results.

        ``loadRes`` writes extra keys into the dicts it receives, so each IoU
        type gets its own copies.  For ``segm`` the box is dropped when a
        segmentation is present so that ``loadRes`` derives the area from the
        mask instead of the box.
        """
        prepared = []
        for res in self.results:
            entry = dict(res)
            if iou_type == "segm" and "segmentation" in entry:
                entry.pop("bbox", None)
            prepared.append(entry)
        return prepared

    def accumulate(self):
        """Run ``evaluate()`` and ``accumulate()`` once over all stored results."""
        if not self.results:
            # loadRes cannot handle an empty list, and there is nothing to score.
            print("No detections were collected; skipping COCO evaluation.")
            return

        img_ids = sorted(set(self.img_ids))
        for iou_type in self.iou_types:
            coco_eval = self.coco_eval[iou_type]
            # loadRes accepts an in-memory list, so no temporary file is needed.
            coco_eval.cocoDt = self.coco_gt.loadRes(self._results_for(iou_type))
            coco_eval.params.imgIds = img_ids
            coco_eval.evaluate()
            coco_eval.accumulate()

    def summarize(self):
        """Print the COCO summary for every IoU type that has been evaluated."""
        for iou_type, coco_eval in self.coco_eval.items():
            print(f"IoU metric: {iou_type}")
            if coco_eval.eval:
                coco_eval.summarize()
            else:
                print("No evaluation results to summarize.")
            print("-" * 20)

def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq):
    """Run one training epoch and return the epoch's mean total loss.

    Args:
        model: Detection model that returns a dict of losses in train mode.
        optimizer: Optimizer stepped once per batch.
        data_loader: Yields ``(images, targets)`` batches.
        device: Device the images and target tensors are moved to.
        epoch: Epoch index; a linear LR warm-up is applied only when it is 0.
        print_freq: Log every ``print_freq`` iterations.

    Returns:
        Global average of the summed loss over the epoch.

    Note:
        Calls ``sys.exit(1)`` when the loss becomes non-finite.
    """
    model.train()
    metric_logger = MetricLogger(delimiter="  ")
    for meter_name, meter in (
        ('lr', SmoothedValue(window_size=1, fmt='{value:.6f}')),
        ('batch_time', SmoothedValue(window_size=20, fmt='{avg:.4f}')),
        ('data_time', SmoothedValue(window_size=20, fmt='{avg:.4f}')),
    ):
        metric_logger.add_meter(meter_name, meter)
    header = f'Epoch: [{epoch}]'

    # Warm the learning rate up linearly during the very first epoch only.
    warmup_scheduler = None
    if epoch == 0:
        warmup_scheduler = torch.optim.lr_scheduler.LinearLR(
            optimizer,
            start_factor=1.0 / 1000,
            total_iters=min(1000, len(data_loader) - 1),
        )

    for images, targets in metric_logger.log_every(data_loader, print_freq, header):
        step_start = time.time()
        images = [image.to(device) for image in images]
        targets = [{key: value.to(device) for key, value in tgt.items()} for tgt in targets]

        # Sanity checks on the targets; they only print, nothing is modified.
        for img_pos, tgt in enumerate(targets):
            batch_no = metric_logger.meters['batch_time'].count
            if tgt["boxes"].numel() == 0:
                print(f"DEBUG: Batch {batch_no}, Image {img_pos}: No boxes found in target.")
                continue

            box_widths = tgt["boxes"][:, 2] - tgt["boxes"][:, 0]
            box_heights = tgt["boxes"][:, 3] - tgt["boxes"][:, 1]
            if not torch.all((box_widths > 0) & (box_heights > 0)):
                print(f"DEBUG: Batch {batch_no}, Image {img_pos}: Degenerate boxes found (width/height <= 0).")
                print(f"Boxes: {tgt['boxes']}")

            if MODEL_TYPE == 'mask_rcnn' and "masks" in tgt and tgt["masks"].numel() > 0:
                mask_shape = tgt["masks"].shape
                if tgt["masks"].ndim != 3 or mask_shape[0] != tgt["boxes"].shape[0]:
                    print(f"DEBUG: Batch {batch_no}, Image {img_pos}: Mask shape mismatch with boxes or incorrect dimensions: {mask_shape}")
                if mask_shape[1] == 0 or mask_shape[2] == 0:
                    print(f"DEBUG: Batch {batch_no}, Image {img_pos}: Degenerate mask dimensions (H or W is 0): {mask_shape}")

        metric_logger.update(data_time=time.time() - step_start)

        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        # Reduced copies are used for logging and the finiteness check only.
        loss_dict_reduced = reduce_dict(loss_dict)
        losses_reduced = sum(loss for loss in loss_dict_reduced.values())
        loss_value = losses_reduced.item()

        if not math.isfinite(loss_value):
            print(f"Loss is {loss_value}, stopping training")
            print(loss_dict_reduced)
            sys.exit(1)

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if warmup_scheduler is not None:
            warmup_scheduler.step()

        metric_logger.update(loss=losses_reduced, **loss_dict_reduced)
        metric_logger.update(lr=optimizer.param_groups[0]["lr"])
        metric_logger.update(batch_time=time.time() - step_start)

    return metric_logger.meters["loss"].global_avg

@torch.no_grad()
def evaluate(model, data_loader, device, coco_api=None):
    """Run the model over ``data_loader`` and report COCO metrics.

    Args:
        model: Detection model; it is switched to eval mode.
        data_loader: Yields ``(images, targets)`` batches.  Its ``dataset``
            must expose ``img_id_to_info``, ``new_label_to_category_name`` and
            ``category_id_to_name``.
        device: Device the images are moved to (CPU or CUDA).
        coco_api: ``pycocotools`` ground-truth object for the same images.

    Returns:
        The ``CocoEvaluator`` after ``accumulate()`` and ``summarize()``.
    """
    model.eval()
    metric_logger = MetricLogger(delimiter="  ")
    metric_logger.add_meter('batch_time', SmoothedValue(window_size=20, fmt='{avg:.4f}'))
    metric_logger.add_meter('data_time', SmoothedValue(window_size=20, fmt='{avg:.4f}'))
    metric_logger.add_meter('model_time', SmoothedValue(window_size=20, fmt='{avg:.4f}'))
    header = 'Test:'

    coco_gt = coco_api
    iou_types = ["bbox"]
    if MODEL_TYPE == 'mask_rcnn':
        iou_types.append("segm")

    coco_evaluator = CocoEvaluator(coco_gt, iou_types)
    # Only synchronise when actually running on a GPU; the call is invalid
    # for a CPU device and the notebook explicitly supports a CPU fallback.
    on_cuda = torch.device(device).type == 'cuda'
    cpu_device = torch.device('cpu')
    dataset = data_loader.dataset

    for images, targets in metric_logger.log_every(data_loader, 100, header):
        data_time_start = time.time()
        images = list(img.to(device) for img in images)

        # (width, height) of each source image, as recorded in the annotations.
        original_sizes = []
        for t in targets:
            img_info = dataset.img_id_to_info[t["image_id"].item()]
            original_sizes.append((img_info['width'], img_info['height']))

        metric_logger.update(data_time=time.time() - data_time_start)

        if on_cuda:
            torch.cuda.synchronize(device)
        model_time = time.time()
        outputs = model(images)

        outputs = [{k: v.to(cpu_device) for k, v in t.items()} for t in outputs]
        model_time = time.time() - model_time

        res = prepare_for_coco_eval(outputs, targets, original_sizes,
                                    dataset.new_label_to_category_name,
                                    dataset)
        # A batch without any detection contributes no results; handing an
        # empty list to the evaluator gives it nothing to score.
        if res:
            coco_evaluator.update(res)
        metric_logger.update(model_time=model_time)
        metric_logger.update(batch_time=time.time() - data_time_start)

    metric_logger.synchronize_between_processes()
    coco_evaluator.synchronize_between_processes()

    coco_evaluator.accumulate()
    coco_evaluator.summarize()

    return coco_evaluator

In [7]:
def train_and_evaluate_model():
    """Train the configured detector and validate it after every epoch.

    Reads all settings from the notebook-level constants.  The model weights
    are written to ``SAVE_MODEL_PATH`` after each epoch (before validation),
    so a failure during evaluation does not discard the training done so far.
    Validation is skipped when the validation annotation file is missing.

    Returns:
        None.  Returns early when the training annotation file is missing.
    """
    if not os.path.exists(ANNOTATION_FILE_TRAIN):
        print(f"Error: Training annotation file not found at {ANNOTATION_FILE_TRAIN}")
        return

    dataset_train = ChiliDataset(IMG_DIR_TRAIN, ANNOTATION_FILE_TRAIN, is_train=True)

    # Take the class list from the dataset itself so the printed order matches
    # the label indices the model is actually trained on.
    label_names = dataset_train.new_label_to_category_name
    all_categories = [label_names[label] for label in sorted(label_names)]
    num_classes = dataset_train.num_classes

    print(f"Detected categories: {all_categories}")
    print(f"Number of classes (including background): {num_classes}")

    data_loader_train = torch.utils.data.DataLoader(
        dataset_train, batch_size=BATCH_SIZE, shuffle=True, num_workers=0,
        collate_fn=collate_fn
    )

    # Build the validation pieces only if the annotation file exists, and
    # parse the ground truth once instead of once per epoch.
    can_validate = COCO is not None and os.path.exists(ANNOTATION_FILE_VAL)
    data_loader_val = None
    coco_val_gt = None
    if can_validate:
        dataset_val = ChiliDataset(IMG_DIR_VAL, ANNOTATION_FILE_VAL, is_train=False)
        data_loader_val = torch.utils.data.DataLoader(
            dataset_val, batch_size=BATCH_SIZE, shuffle=False, num_workers=0,
            collate_fn=collate_fn
        )
        coco_val_gt = COCO(ANNOTATION_FILE_VAL)

    model = get_model(num_classes, MODEL_TYPE)
    model.to(DEVICE)

    params = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(params, lr=LEARNING_RATE,
                                momentum=MOMENTUM, weight_decay=WEIGHT_DECAY)

    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                   step_size=LR_STEP_SIZE,
                                                   gamma=LR_GAMMA)

    print(f"Starting training for {MODEL_TYPE} model...")
    for epoch in range(NUM_EPOCHS):
        train_loss = train_one_epoch(model, optimizer, data_loader_train, DEVICE, epoch, print_freq=PRINT_FREQ)
        lr_scheduler.step()

        print(f"Epoch {epoch+1} training loss: {train_loss:.4f}")

        # Checkpoint before evaluating so an evaluation error cannot lose
        # the weights of this epoch.
        torch.save(model.state_dict(), SAVE_MODEL_PATH)

        if can_validate:
            print(f"Evaluating on validation set for Epoch {epoch+1}...")
            evaluate(model, data_loader_val, DEVICE, coco_val_gt)
        else:
            print("Skipping validation evaluation: pycocotools not available or validation annotation file missing.")

    print("Training complete.")
    torch.save(model.state_dict(), SAVE_MODEL_PATH)
    print(f"Model saved to {SAVE_MODEL_PATH}")

In [8]:
# Run the whole pipeline: train for NUM_EPOCHS epochs, evaluate on the
# validation set after each epoch, then save the weights to SAVE_MODEL_PATH.
train_and_evaluate_model()

Detected categories: ['background', 'Bacterial Spot', 'Cercospora Spot', 'Curl Virus', 'Dry chili', 'Flower', 'Green chili', 'Healthy Leaf', 'Nutrition Deficiency', 'Red chili', 'Rotten chili', 'White Spot']
Number of classes (including background): 12
Starting training for mask_rcnn model...
Epoch: [0]  eta: 0:14:17  lr: 0.000014  batch_time: 1.5587  data_time: 0.0776  loss: 4.3229 (4.3229)  loss_classifier: 2.8170 (2.8170)  loss_box_reg: 0.1221 (0.1221)  loss_mask: 1.3569 (1.3569)  loss_objectness: 0.0247 (0.0247)  loss_rpn_box_reg: 0.0022 (0.0022)  data: 0.1915  batch: 1.7502  time: 1.7502
Epoch: [0]  eta: 0:03:23  lr: 0.000469  batch_time: 0.3778  data_time: 0.0093  loss: 1.1143 (2.2725)  loss_classifier: 0.1630 (0.7407)  loss_box_reg: 0.0848 (0.0981)  loss_mask: 0.6918 (1.0263)  loss_objectness: 0.1497 (0.3635)  loss_rpn_box_reg: 0.0249 (0.0439)  data: 0.0666  batch: 0.4334  time: 26.5934
Epoch: [0]  eta: 0:02:57  lr: 0.000924  batch_time: 0.3764  data_time: 0.0071  loss: 0.8624 (

KeyError: 'annotations'

In [1]:
def predict_and_visualize(image_path, model_path, dataset_for_labels, output_dir="output", score_threshold=0.7):
    """Run the saved model on one image and write an annotated copy to disk.

    Detections scoring above ``score_threshold`` are drawn as red boxes with a
    ``"<class>: <score>"`` caption; for Mask R-CNN each kept mask is overlaid
    in a random semi-transparent colour.

    Args:
        image_path: Image to run inference on.
        model_path: File holding the ``state_dict`` saved after training.
        dataset_for_labels: Object exposing ``num_classes`` and
            ``new_label_to_category_name``.
        output_dir: Folder for the result (created if missing).
        score_threshold: Minimum score a detection needs to be drawn.

    Returns:
        None.  The result is saved as ``<name>_segmented<ext>`` in
        ``output_dir`` and its path is printed.
    """
    import matplotlib.pyplot as plt
    import matplotlib.patches as patches

    num_classes = dataset_for_labels.num_classes
    model = get_model(num_classes, MODEL_TYPE)
    # NOTE(review): torch.load unpickles the file -- only load checkpoints you
    # trust (consider weights_only=True if the installed torch supports it).
    model.load_state_dict(torch.load(model_path, map_location=DEVICE))
    model.to(DEVICE)
    model.eval()

    img = Image.open(image_path).convert("RGB")
    img_tensor = T.Compose([T.PILToTensor(), T.ConvertImageDtype(torch.float)])(img)
    img_tensor = img_tensor.to(DEVICE)

    with torch.no_grad():
        prediction = model([img_tensor])[0]

    scores = prediction['scores'].cpu().numpy()
    boxes = prediction['boxes'].cpu().numpy()
    labels = prediction['labels'].cpu().numpy()

    keep = scores > score_threshold
    boxes = boxes[keep]
    labels = labels[keep]
    scores = scores[keep]

    # Move the masks off the device once, not once per drawn box.
    binary_masks = None
    if MODEL_TYPE == 'mask_rcnn' and "masks" in prediction:
        binary_masks = prediction['masks'].cpu().numpy()[keep][:, 0] > 0.5

    os.makedirs(output_dir, exist_ok=True)
    # splitext keeps the suffix logic working for any extension or casing
    # (".jpg", ".png", ...), not only for an upper-case ".JPG".
    stem, ext = os.path.splitext(os.path.basename(image_path))
    output_image_path = os.path.join(output_dir, f"{stem}_segmented{ext or '.png'}")

    fig, ax = plt.subplots(1, figsize=(12, 12))
    try:
        ax.imshow(img)

        for i in range(len(boxes)):
            box = boxes[i]
            label_idx = int(labels[i])
            score = scores[i]

            class_name = dataset_for_labels.new_label_to_category_name.get(label_idx, f"Unknown_{label_idx}")

            rect = patches.Rectangle((box[0], box[1]), box[2] - box[0], box[3] - box[1],
                                     linewidth=2, edgecolor='r', facecolor='none')
            ax.add_patch(rect)

            ax.text(box[0], box[1] - 10, f"{class_name}: {score:.2f}",
                    bbox=dict(facecolor='white', alpha=0.7), fontsize=8, color='black')

            if binary_masks is not None:
                mask = binary_masks[i]
                # RGBA overlay: transparent everywhere except on the mask.
                colored_mask = np.zeros((mask.shape[0], mask.shape[1], 4))
                colored_mask[mask] = (random.random(), random.random(), random.random(), 0.6)
                ax.imshow(colored_mask)

        ax.axis('off')
        fig.savefig(output_image_path, bbox_inches='tight', pad_inches=0)
    finally:
        # Always release the figure, even if drawing or saving failed.
        plt.close(fig)
    print(f"Segmented image saved to {output_image_path}")


# Inference demo: needs the training annotations (for the label names), the
# weights saved at SAVE_MODEL_PATH and one example image from the val folder.
if not os.path.exists(ANNOTATION_FILE_TRAIN):
    print(f"Cannot perform inference: Training annotation file not found at {ANNOTATION_FILE_TRAIN}")
else:
    with open(ANNOTATION_FILE_TRAIN, 'r') as f:
        train_coco_data_inference = json.load(f)

    all_category_names_for_inference = [cat['name'] for cat in train_coco_data_inference['categories']]
    ALL_CATEGORIES_FOR_INFERENCE = ["background", *sorted(all_category_names_for_inference)]

    # Built only to supply num_classes and the label -> name mapping.
    dummy_dataset_for_labels = ChiliDataset(IMG_DIR_TRAIN, ANNOTATION_FILE_TRAIN)

    example_image_filename = "Dry chili00002.JPG"  # change this to the image you want to test
    example_image_for_inference = os.path.join(IMG_DIR_VAL, example_image_filename)

    if not os.path.exists(example_image_for_inference):
        print(f"\nSkipping inference: {example_image_for_inference} not found.")
        print("Please ensure the example image path is correct and the file exists in your validation directory.")
    else:
        print(f"\nPerforming inference on {example_image_for_inference}...")
        predict_and_visualize(example_image_for_inference, SAVE_MODEL_PATH, dummy_dataset_for_labels)

NameError: name 'os' is not defined