# In [None]:
# STEP 1: Install only required packages (no roboflow)
# pip install --user pycocotools torchvision matplotlib

import os
import numpy as np
import torch
import matplotlib.pyplot as plt
from PIL import Image
from pycocotools.coco import COCO
from torchvision.transforms import functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision.models.detection import maskrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

# STEP 2: COCO Dataset Class
class COCODataset(Dataset):
    """Single-class instance-segmentation dataset read from a COCO JSON file.

    Every usable annotation is exposed with label ``1``; the category stored
    in the annotation file is ignored.

    Args:
        root: Directory containing the image files named in the annotations.
        ann_file: Path to the COCO-format annotation JSON.
    """

    def __init__(self, root, ann_file):
        self.root = root
        self.coco = COCO(ann_file)
        self.ids = list(sorted(self.coco.imgs.keys()))

    def _usable_annotations(self, img_id):
        """Return the annotations of ``img_id`` that can serve as targets."""
        anns = self.coco.loadAnns(self.coco.getAnnIds(imgIds=img_id))
        usable = []
        for ann in anns:
            # A missing 'iscrowd' key is treated as "not a crowd".
            if ann.get('iscrowd', 0):
                continue
            _, _, w, h = ann['bbox']
            # Zero-size boxes are dropped (torchvision detection models
            # reject boxes without positive width and height).
            if w <= 0 or h <= 0:
                continue
            usable.append(ann)
        return usable

    def __getitem__(self, index):
        """Return ``(image, target)`` for the sample at ``index``.

        If the image at ``index`` has no usable annotation, the next image
        that has one is returned instead (wrapping around), so the same image
        can be produced for more than one index.

        Returns:
            A tuple of the image converted by ``F.to_tensor`` and a dict with
            the keys 'boxes' (x1, y1, x2, y2), 'labels', 'masks', 'image_id',
            'area' and 'iscrowd'. All per-object entries describe the same
            annotations in the same order.

        Raises:
            IndexError: If the dataset is empty or ``index`` is out of range.
            RuntimeError: If no image in the dataset has a usable annotation.
        """
        num_images = len(self.ids)
        if num_images == 0:
            raise IndexError("dataset is empty")

        # Bounded search: at most one full pass over the images, so a dataset
        # without any usable annotation fails loudly instead of spinning.
        for _ in range(num_images):
            img_id = self.ids[index]
            anns = self._usable_annotations(img_id)
            if anns:
                break
            index = (index + 1) % num_images
        else:
            raise RuntimeError(
                "no image in the dataset has a usable "
                "(non-crowd, non-degenerate) annotation"
            )

        # The image is decoded only once a usable sample is found.
        path = self.coco.loadImgs(img_id)[0]['file_name']
        with Image.open(os.path.join(self.root, path)) as raw:
            img = raw.convert("RGB")

        boxes = []
        areas = []
        masks = []
        for ann in anns:
            x, y, w, h = ann['bbox']
            boxes.append([x, y, x + w, y + h])
            areas.append(ann.get('area', w * h))
            masks.append(self.coco.annToMask(ann))

        # Every per-object tensor is built from the same filtered list, so
        # their first dimensions always agree.
        num_objects = len(anns)
        target = {
            'boxes': torch.tensor(boxes, dtype=torch.float32),
            'labels': torch.ones((num_objects,), dtype=torch.int64),
            'masks': torch.as_tensor(np.stack(masks), dtype=torch.uint8),
            'image_id': torch.tensor([img_id]),
            'area': torch.tensor(areas, dtype=torch.float32),
            'iscrowd': torch.zeros((num_objects,), dtype=torch.int64),
        }
        return F.to_tensor(img), target

    def __len__(self):
        """Return the number of images listed in the annotation file."""
        return len(self.ids)

def collate_fn(batch):
    """Regroup a list of samples into one tuple per sample field.

    A batch such as ``[(img_a, tgt_a), (img_b, tgt_b)]`` becomes
    ``((img_a, img_b), (tgt_a, tgt_b))``; nothing is stacked or copied.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# STEP 3: Set paths to your local dataset
dataset_path = r"E:\ACS motion Controller\python code\code\Microfocus-1"  # <-- Change this path

# Each split sits in its own sub-folder of the dataset directory.
train_dir, val_dir, test_dir = (
    os.path.join(dataset_path, split_name)
    for split_name in ("train", "valid", "test")
)

# The annotation file is read from inside the split folder it describes.
train_data, val_data, test_data = (
    COCODataset(split_dir, os.path.join(split_dir, "_annotations.coco.json"))
    for split_dir in (train_dir, val_dir, test_dir)
)

# Only the training loader shuffles; the test loader yields one image at a time.
train_loader = DataLoader(
    train_data,
    batch_size=2,
    shuffle=True,
    collate_fn=collate_fn,
)
val_loader = DataLoader(
    val_data,
    batch_size=2,
    shuffle=False,
    collate_fn=collate_fn,
)
test_loader = DataLoader(
    test_data,
    batch_size=1,
    shuffle=False,
    collate_fn=collate_fn,
)

# STEP 4: Load and customize the model
model = maskrcnn_resnet50_fpn(pretrained=True)

# Replace both prediction heads with two-output versions, reusing the input
# width of the head being replaced. Presumably the two outputs are background
# plus the single label (1) that COCODataset emits - confirm.
box_in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(box_in_features, 2)

mask_in_channels = model.roi_heads.mask_predictor.conv5_mask.in_channels
model.roi_heads.mask_predictor = MaskRCNNPredictor(mask_in_channels, 256, 2)

# Use the GPU when one is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
model.to(device)

# STEP 5: Train the model
num_epochs = 100
optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)

# Per-epoch history; the training loop appends one value to each list.
train_losses = []
val_losses = []
maps = []
ap50s = []
recalls = []

from pycocotools.cocoeval import COCOeval

def coco_eval(model, dataset, loader):
    """Run COCO bounding-box evaluation of ``model`` on the batches of ``loader``.

    The model is switched to eval mode and left there. Images are moved to
    the module-level ``device`` before inference.

    Args:
        model: Detection model that, in eval mode, maps a list of image
            tensors to one dict per image holding "boxes" (x1, y1, x2, y2)
            and "scores".
        dataset: Object exposing the ground-truth COCO index as ``.coco``.
        loader: Iterable of ``(images, targets)`` batches in which every
            target carries an "image_id" tensor.

    Returns:
        The ``stats`` array produced by ``COCOeval.summarize()``, or an array
        of 12 zeros when the model produced no detection at all.
    """
    model.eval()
    coco_gt = dataset.coco
    detections = []
    img_ids = []
    seen_ids = set()

    with torch.no_grad():
        for imgs, targets in loader:
            outputs = model([img.to(device) for img in imgs])
            for target, output in zip(targets, outputs):
                image_id = int(target["image_id"].item())
                # COCODataset can hand out the same image for several indices
                # (it falls forward past images without annotations); scoring
                # it again would submit every detection twice.
                if image_id in seen_ids:
                    continue
                seen_ids.add(image_id)
                img_ids.append(image_id)

                # tolist() yields plain Python floats for the result records.
                boxes = output["boxes"].cpu().tolist()
                scores = output["scores"].cpu().tolist()
                for (x1, y1, x2, y2), score in zip(boxes, scores):
                    detections.append({
                        "image_id": image_id,
                        # NOTE(review): predictions are always reported as
                        # category 1 - confirm this matches the category id
                        # used in the ground-truth file.
                        "category_id": 1,
                        # COCO boxes are [x, y, width, height].
                        "bbox": [x1, y1, x2 - x1, y2 - y1],
                        "score": score,
                    })

    # loadRes cannot handle an empty result list, which is what an untrained
    # model typically produces during the first epochs.
    if not detections:
        print("coco_eval: no detections produced; reporting zero metrics")
        return np.zeros(12)

    dt_coco = coco_gt.loadRes(detections)
    evaluator = COCOeval(coco_gt, dt_coco, "bbox")
    # Restrict scoring to the images that were actually run through the model.
    evaluator.params.imgIds = img_ids
    evaluator.evaluate()
    evaluator.accumulate()
    evaluator.summarize()
    return evaluator.stats

for epoch in range(num_epochs):
    # ---- Training pass: one optimizer step per batch ----
    model.train()
    train_loss = 0.0
    train_batches = 0
    for imgs, targets in train_loader:
        imgs = [img.to(device) for img in imgs]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        loss_dict = model(imgs, targets)
        losses = sum(loss_dict.values())
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
        train_loss += losses.item()
        train_batches += 1
    # Report the mean per batch so the value does not scale with the number
    # of batches and can be compared with the validation loss.
    train_loss /= max(train_batches, 1)

    # ---- Validation loss ----
    # Train mode is kept on purpose: the model is called with targets here to
    # obtain the loss dict, whereas in eval mode (see coco_eval) it is used to
    # produce detections. Gradients are disabled and no optimizer step is taken.
    # NOTE(review): layers that track running statistics would still update
    # in train mode - confirm the backbone's normalization layers are frozen.
    model.train()
    val_loss = 0.0
    val_batches = 0
    with torch.no_grad():
        for imgs, targets in val_loader:
            imgs = [img.to(device) for img in imgs]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            loss_dict = model(imgs, targets)
            losses = sum(loss_dict.values())
            val_loss += losses.item()
            val_batches += 1
    val_loss /= max(val_batches, 1)

    # ---- COCO metrics on the validation split (switches the model to eval) ----
    metrics = coco_eval(model, val_data, val_loader)
    train_losses.append(train_loss)
    val_losses.append(val_loss)
    # Entries 0, 1 and 8 of the returned stats are recorded as mAP, AP50 and recall.
    maps.append(metrics[0])
    ap50s.append(metrics[1])
    recalls.append(metrics[8])

    print(f"[Epoch {epoch+1}/{num_epochs}] Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | mAP: {metrics[0]:.4f} | AP50: {metrics[1]:.4f} | Recall: {metrics[8]:.4f}")

# STEP 6: Save the model
# Only the weights (state dict) are written, relative to the working directory.
checkpoint_path = "mask_rcnn_microfocus_local.pth"
torch.save(model.state_dict(), checkpoint_path)

# STEP 7: Prediction viewer
def show_predictions(model, dataset, num=5, threshold=0.3, mask_threshold=0.5):
    """Plot predicted instance masks over the first samples of ``dataset``.

    The model is switched to eval mode and left there. Images are moved to
    the module-level ``device`` before inference.

    Args:
        model: Detection model that, in eval mode, returns per image a dict
            with "scores" and "masks" (each mask indexed as ``mask[0]``).
        dataset: Indexable collection of ``(image_tensor, target)`` samples
            that supports ``len()``.
        num: Maximum number of samples to show; capped at ``len(dataset)``.
        threshold: Minimum detection score for a mask to be drawn.
        mask_threshold: Minimum per-pixel mask value for a pixel to be drawn.
    """
    model.eval()
    # Never index past the end of the dataset.
    num_shown = min(num, len(dataset))
    with torch.no_grad():
        for i in range(num_shown):
            img, _ = dataset[i]
            output = model([img.to(device)])[0]
            # Channels-first tensor -> height x width x channels for imshow.
            img_np = img.permute(1, 2, 0).cpu().numpy()

            plt.figure(figsize=(8, 8))
            plt.imshow(img_np)
            plt.axis('off')
            plt.title(f"Sample {i} | Masks (score ≥ {threshold})")

            scores = output['scores'].cpu().numpy()
            masks = output['masks'].cpu().numpy()

            for score, mask in zip(scores, masks):
                if score >= threshold:
                    soft_mask = mask[0]
                    foreground = soft_mask >= mask_threshold
                    if not foreground.any():
                        continue
                    # Draw only the object's pixels; overlaying the whole soft
                    # mask would tint the background once per detection.
                    overlay = np.ma.masked_where(~foreground, soft_mask)
                    plt.imshow(overlay, alpha=0.3, cmap='jet', vmin=0.0, vmax=1.0)

            plt.show()
