# Mask R-CNN — Word Segmentation for Answer Sheets (PyTorch/Torchvision)

Trains **Mask R-CNN (ResNet-50-FPN v2)** to segment **handwritten words** on scanned answer sheets. At inference, each detected word is classified as lying **left or right of the page's vertical border**, which supports grouping the words by question.

**Stable on Windows / Python 3.11**: uses the Mask R-CNN implementation that ships with torchvision, so Detectron2 is not required.


## 0) Install (run in your venv PowerShell)
```powershell
pip uninstall -y numpy opencv-python
pip cache purge
pip install "numpy==1.26.4" "opencv-python==4.8.1.78" pillow matplotlib

# PyTorch (choose ONE)
# CPU-only:
pip install "torch>=2.2" "torchvision>=0.17" --index-url https://download.pytorch.org/whl/cpu
# OR CUDA 12.1 (if supported):
# pip install torch torchvision --index-url https://download.pytorch.org/whl/cu121

# COCO API (Windows wheel)
pip install pycocotools-windows
# (Linux/Mac: pip install pycocotools)
```


## 1) Dataset (COCO format)
Expected structure:
```
DATA_ROOT/
  train/
    images/*.jpg|png
    annotations/instances_train.json
  val/
    images/*.jpg|png
    annotations/instances_val.json
```
- The dataset has a single category, **`word`**, with `category_id=1` in the COCO JSON.
- Export the annotations from Label Studio or VIA (VGG Image Annotator) in COCO format.


In [None]:
from __future__ import annotations

import json
import os
import random
import time
from pathlib import Path
from typing import Any, Dict, List, Tuple

import numpy as np
import torch
import torch.utils.data
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import cv2
import torchvision
from torchvision.transforms.functional import to_tensor

# COCO API
from pycocotools.coco import COCO
from pycocotools import mask as maskUtils

from torchvision.models.detection import maskrcnn_resnet50_fpn_v2
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.mask_rcnn import MaskRCNNPredictor

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print("Torch:", torch.__version__, "| Torchvision:", torchvision.__version__, "| Device:", DEVICE)

# ---- Paths (EDIT THESE) ----
DATA_ROOT    = Path(r"E:\EvaluationAI\datasets\answer_words_coco")
TRAIN_JSON   = DATA_ROOT / "train" / "annotations" / "instances_train.json"
VAL_JSON     = DATA_ROOT / "val"   / "annotations" / "instances_val.json"
TRAIN_IMGDIR = DATA_ROOT / "train" / "images"
VAL_IMGDIR   = DATA_ROOT / "val"   / "images"

# Checkpoints and inference results are written here (relative to the CWD).
OUT_DIR = Path("maskrcnn_wordseg_outputs")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# ---- Hyperparams ----
NUM_CLASSES = 2          # background + word
EPOCHS      = 30
BATCH_SIZE  = 2
LR          = 0.005
# Number of DataLoader worker processes. Windows starts workers with "spawn",
# and the spawned process must re-import the Dataset / collate_fn; that fails
# when they are defined in a notebook cell or in a script without an
# `if __name__ == "__main__":` guard. Load in the main process there instead.
WORKERS     = 0 if os.name == "nt" else 2
SCORE_THR   = 0.50

# Quick sanity check that the edited paths actually exist on disk.
print("Train JSON:", TRAIN_JSON.exists(), "| Val JSON:", VAL_JSON.exists())
print("Train imgs:", TRAIN_IMGDIR.exists(), "| Val imgs:", VAL_IMGDIR.exists())


In [None]:
def coco_poly_to_mask(segmentation, height, width):
    """Decode one COCO ``segmentation`` entry into a binary mask.

    Args:
        segmentation: Either a list (polygons given as flat
            ``[x0, y0, x1, y1, ...]`` coordinate lists, or RLE dicts), a dict
            whose ``"counts"`` is a list (uncompressed RLE), or any other dict,
            which is passed to the decoder unchanged (compressed RLE).
        height: Image height in pixels.
        width: Image width in pixels.

    Returns:
        ``np.uint8`` array of shape ``(height, width)``. An all-zero mask is
        returned when the list holds no usable polygon.
    """
    if isinstance(segmentation, list):
        # pycocotools looks at the first element to choose between bbox and
        # polygon decoding: an empty list raises IndexError, and a flat list of
        # exactly 4 numbers would be read as a bbox. A polygon needs at least
        # 3 points (6 numbers), so drop anything shorter; dict entries (RLE)
        # are passed through untouched.
        parts = [p for p in segmentation if isinstance(p, dict) or len(p) >= 6]
        if not parts:
            return np.zeros((height, width), dtype=np.uint8)
        rle = maskUtils.merge(maskUtils.frPyObjects(parts, height, width))
    elif isinstance(segmentation['counts'], list):
        # Uncompressed RLE: convert to the compressed form decode() expects.
        rle = maskUtils.frPyObjects(segmentation, height, width)
    else:
        rle = segmentation

    mask = maskUtils.decode(rle)
    if mask.ndim == 3:
        # One channel per object: collapse to a single union mask.
        mask = np.any(mask, axis=2)
    return mask.astype(np.uint8)


In [None]:
class CocoWordDataset(Dataset):
    """COCO-format instance-segmentation dataset yielding ``(image, target)``.

    ``image`` is a float tensor produced by ``to_tensor`` (values in [0, 1]).
    ``target`` is a dict with the keys ``boxes``, ``labels``, ``masks``,
    ``image_id``, ``area`` and ``iscrowd``. Boxes are derived from the decoded
    masks (not read from the annotation's ``bbox`` field), and ``area`` is the
    area of that box.
    """

    def __init__(self, img_dir: Path, ann_json: Path):
        """Index the annotation file.

        Args:
            img_dir: Directory that the ``file_name`` entries are relative to.
            ann_json: Path to the COCO annotation JSON.
        """
        self.img_dir = Path(img_dir)
        self.coco = COCO(str(ann_json))
        self.img_ids = list(self.coco.imgs.keys())
        # Map category ids to contiguous [1..N]
        self.cat_id_to_contig = {
            cat_id: i + 1 for i, cat_id in enumerate(sorted(self.coco.getCatIds()))
        }

    def __len__(self):
        """Return the number of images listed in the annotation file."""
        return len(self.img_ids)

    def __getitem__(self, idx: int):
        """Load image ``idx`` and build its target dict.

        Annotations are skipped when they have no (or an empty) segmentation,
        when the decoded mask is empty, or when the mask's bounding box has
        zero width or height. An image with no usable annotation yields
        zero-length target tensors.
        """
        img_id = self.img_ids[idx]
        info = self.coco.loadImgs([img_id])[0]
        img_path = self.img_dir / info["file_name"]
        # Image.open is lazy and keeps the file open; convert() reads the
        # pixels into a new image, and the with-block then closes the file
        # deterministically instead of leaving it to garbage collection.
        with Image.open(img_path) as src:
            image = src.convert("RGB")
        W, H = image.size

        ann_ids = self.coco.getAnnIds(imgIds=[img_id], iscrowd=None)
        anns = self.coco.loadAnns(ann_ids)

        masks, boxes, labels, areas, iscrowd = [], [], [], [], []
        for ann in anns:
            segmentation = ann.get("segmentation")
            # Missing key, [] or {}: nothing to decode (an empty polygon list
            # would otherwise crash inside pycocotools).
            if not segmentation:
                continue
            m = coco_poly_to_mask(segmentation, H, W)
            if m.sum() <= 0:
                continue
            ys, xs = np.where(m > 0)
            x0, y0, x1, y1 = xs.min(), ys.min(), xs.max(), ys.max()
            # Degenerate (zero-width or zero-height) boxes are dropped.
            if (x1 - x0) <= 0 or (y1 - y0) <= 0:
                continue
            masks.append(m)
            boxes.append([x0, y0, x1, y1])
            # Unknown category ids fall back to label 1.
            labels.append(self.cat_id_to_contig.get(ann["category_id"], 1))
            areas.append(float((x1 - x0) * (y1 - y0)))
            iscrowd.append(ann.get("iscrowd", 0))

        if len(masks) == 0:
            # Keep the expected ranks even when the image has no instances.
            masks = np.zeros((0, H, W), dtype=np.uint8)
            boxes = np.zeros((0, 4), dtype=np.float32)
            labels = np.zeros((0,), dtype=np.int64)
            areas = np.zeros((0,), dtype=np.float32)
            iscrowd = np.zeros((0,), dtype=np.int64)
        else:
            masks = np.stack(masks, axis=0).astype(np.uint8)
            boxes = np.array(boxes, dtype=np.float32)
            labels = np.array(labels, dtype=np.int64)
            areas = np.array(areas, dtype=np.float32)
            iscrowd = np.array(iscrowd, dtype=np.int64)

        image_tensor = to_tensor(image)  # [0,1]
        target = {
            "boxes": torch.as_tensor(boxes, dtype=torch.float32),
            "labels": torch.as_tensor(labels, dtype=torch.int64),
            "masks": torch.as_tensor(masks, dtype=torch.uint8),
            "image_id": torch.as_tensor([img_id]),
            "area": torch.as_tensor(areas, dtype=torch.float32),
            "iscrowd": torch.as_tensor(iscrowd, dtype=torch.int64),
        }
        return image_tensor, target

def collate_fn(batch):
    """Transpose a batch of per-sample tuples into one tuple per field.

    ``[(img_a, tgt_a), (img_b, tgt_b)]`` becomes
    ``((img_a, img_b), (tgt_a, tgt_b))``; an empty batch becomes ``()``.
    No stacking is performed.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Datasets for the two splits.
train_ds = CocoWordDataset(TRAIN_IMGDIR, TRAIN_JSON)
val_ds = CocoWordDataset(VAL_IMGDIR, VAL_JSON)

# Training batches are shuffled; validation is read one image at a time in a
# fixed order. Both use collate_fn so samples are grouped without stacking.
train_loader = DataLoader(
    train_ds,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=WORKERS,
    collate_fn=collate_fn,
)
val_loader = DataLoader(
    val_ds,
    batch_size=1,
    shuffle=False,
    num_workers=WORKERS,
    collate_fn=collate_fn,
)

print("Train images:", len(train_ds), "| Val images:", len(val_ds))


In [None]:
def get_model(num_classes: int):
    """Build a Mask R-CNN (ResNet-50-FPN v2) with heads sized for ``num_classes``.

    The network is created with torchvision's ``"DEFAULT"`` weights, then its
    box predictor and mask predictor are replaced by fresh ones that output
    ``num_classes`` classes (the count passed in by the caller includes the
    background class).

    Args:
        num_classes: Number of output classes for both replaced heads.

    Returns:
        The modified model (not yet moved to any device).
    """
    net = maskrcnn_resnet50_fpn_v2(weights="DEFAULT")
    heads = net.roi_heads

    # Box branch: keep the input width of the existing classifier layer.
    box_in_features = heads.box_predictor.cls_score.in_features
    heads.box_predictor = FastRCNNPredictor(box_in_features, num_classes)

    # Mask branch: same idea, with a 256-channel hidden layer.
    mask_in_channels = heads.mask_predictor.conv5_mask.in_channels
    heads.mask_predictor = MaskRCNNPredictor(mask_in_channels, 256, num_classes)

    return net

# Instantiate the network on the selected device.
model = get_model(NUM_CLASSES).to(DEVICE)

# Only parameters that still require gradients are handed to the optimizer.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = torch.optim.SGD(
    params,
    lr=LR,
    momentum=0.9,
    weight_decay=1e-4,
)

# Multiply the learning rate by 0.1 at 60% and again at 85% of the epochs.
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(
    optimizer,
    milestones=[int(EPOCHS*0.6), int(EPOCHS*0.85)],
    gamma=0.1,
)

print("Model ready. Trainable params:", sum(p.numel() for p in params))


In [None]:
def train_one_epoch(model, optimizer, loader, device, epoch):
    """Run one optimisation pass over ``loader`` and return the mean batch loss.

    Args:
        model: Module that, in training mode, is called as
            ``model(images, targets)`` and returns a dict of loss tensors.
            The progress line reads the keys ``loss_classifier``,
            ``loss_box_reg`` and ``loss_mask``.
        optimizer: Optimizer stepping the model's parameters.
        loader: Sized iterable of ``(images, targets)`` batches, where
            ``images`` is a sequence of tensors and ``targets`` a sequence of
            dicts of tensors.
        device: Device every image and target tensor is moved to.
        epoch: Epoch number, used only in the progress message.

    Returns:
        Sum of the per-batch total losses divided by ``len(loader)``
        (``0.0`` for an empty loader).
    """
    model.train()
    num_batches = len(loader)
    running_loss = 0.0

    for step, (images, targets) in enumerate(loader, start=1):
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        loss_dict = model(images, targets)
        # Total loss is the plain sum of every component the model returns.
        loss = sum(loss_dict.values())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Read the scalar once; it is reused for the running sum and the log.
        loss_value = float(loss.item())
        running_loss += loss_value

        if step % 10 == 0:
            print(f"[Epoch {epoch}] {step}/{num_batches} loss={loss_value:.4f} "
                  f"cls={loss_dict['loss_classifier']:.3f} box={loss_dict['loss_box_reg']:.3f} "
                  f"mask={loss_dict['loss_mask']:.3f}")

    # max(1, ...) avoids a division by zero when the loader is empty.
    return running_loss / max(1, num_batches)

# Main training loop. A checkpoint is written after every epoch, and a separate
# "best" checkpoint is refreshed whenever the average *training* loss improves.
best = 1e9
for epoch in range(1, EPOCHS + 1):
    avg = train_one_epoch(model, optimizer, train_loader, DEVICE, epoch)
    lr_scheduler.step()
    print(f"Epoch {epoch} done | avg loss {avg:.4f}")

    # Per-epoch snapshot.
    torch.save(model.state_dict(), OUT_DIR / f"maskrcnn_words_e{epoch}.pth")

    # No improvement over the best loss so far: nothing more to do this epoch.
    if not (avg < best):
        continue

    best = avg
    torch.save(model.state_dict(), OUT_DIR / "maskrcnn_words_best.pth")
    print("  ↳ saved best checkpoint")


In [None]:
def detect_vertical_border(img_bgr: np.ndarray) -> int | None:
    """Estimate the x coordinate of a long vertical line in the left part of a page.

    Pipeline: inverted adaptive threshold -> morphological opening with a tall
    one-pixel-wide kernel -> Canny edges -> probabilistic Hough transform.
    Only near-vertical segments (end points at most 6 px apart in x) whose
    mid-x lies within the left 60% of the image are considered. Among the
    segments at least 80% as tall as the tallest one, the leftmost wins.

    Args:
        img_bgr: Image passed to ``cv2.cvtColor(..., COLOR_BGR2GRAY)``, i.e. a
            3-channel BGR array.

    Returns:
        The x coordinate of the chosen segment, or ``None`` when no suitable
        segment is found.
    """
    H, W = img_bgr.shape[:2]
    gray = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY)
    # THRESH_BINARY_INV: dark strokes become the white foreground.
    bw = cv2.adaptiveThreshold(
        gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY_INV, 35, 10
    )
    # Tall, 1-px-wide kernel: opening removes everything that is not at least
    # this high along a single column.
    kernel = cv2.getStructuringElement(cv2.MORPH_RECT, (1, max(25, H // 60)))
    # iterations must be given by keyword: the 4th positional parameter of
    # morphologyEx is `dst`, not `iterations`.
    vert = cv2.morphologyEx(bw, cv2.MORPH_OPEN, kernel, iterations=1)
    edges = cv2.Canny(vert, 50, 150)
    # Only segments spanning at least half the image height are returned.
    lines = cv2.HoughLinesP(
        edges, 1, np.pi / 180, 120, minLineLength=int(0.5 * H), maxLineGap=10
    )
    if lines is None:
        return None

    max_x = int(0.6 * W)
    candidates = []  # (vertical extent, mid x)
    for x1, y1, x2, y2 in lines[:, 0]:
        if abs(x1 - x2) > 6:
            continue  # not vertical enough
        x = int((x1 + x2) // 2)
        if 0 <= x <= max_x:
            candidates.append((abs(y2 - y1), x))
    if not candidates:
        return None

    tallest = max(length for length, _ in candidates)
    # Prefer the leftmost of the (nearly) tallest segments.
    leftmost_x = min(x for length, x in candidates if length >= 0.8 * tallest)
    return int(leftmost_x)

def run_inference(model, img_path: str|Path, out_dir: Path, score_thr=SCORE_THR, border_x: int|None=None):
    """Detect words in one image, split them left/right of a border, save results.

    Writes ``overlay.png`` (boxes, scores, side labels and the border line)
    into ``out_dir`` and one padded box crop per kept detection into
    ``out_dir/words_left`` or ``out_dir/words_right``. A word counts as
    "left" when its box centre x is <= ``border_x``. Predicted masks are not
    used: crops are axis-aligned cut-outs of the original image.

    Args:
        model: Detection model; called as ``model([image_tensor])`` in eval
            mode and expected to return dicts with ``scores`` and ``boxes``.
        img_path: Image file to process.
        out_dir: Output directory (created if needed).
        score_thr: Detections scoring below this are discarded.
        border_x: x coordinate of the vertical border. When ``None`` it is
            estimated with ``detect_vertical_border``; if that finds nothing,
            18% of the image width is used.

    Raises:
        FileNotFoundError: ``img_path`` does not point to a file.
        ValueError: the file exists but OpenCV could not decode it.
    """
    img_path = Path(img_path)
    out_dir = Path(out_dir)  # also accept a plain string

    # cv2.imread signals failure by returning None instead of raising.
    bgr = cv2.imread(str(img_path))
    if bgr is None:
        if not img_path.is_file():
            raise FileNotFoundError(f"Image not found: {img_path}")
        raise ValueError(f"OpenCV could not decode image: {img_path}")
    H, W = bgr.shape[:2]
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

    # Feed the input on the device the model actually lives on; fall back to
    # the global DEVICE for a parameter-less model.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device(DEVICE)

    model.eval()
    with torch.no_grad():
        x = to_tensor(Image.fromarray(rgb)).to(device)
        pred = model([x])[0]

    scores = pred["scores"].detach().cpu().numpy()
    boxes = pred["boxes"].detach().cpu().numpy().astype(np.int32)

    keep = np.where(scores >= score_thr)[0]
    boxes, scores = boxes[keep], scores[keep]

    if border_x is None:
        bx = detect_vertical_border(bgr)
        border_x = bx if bx is not None else int(0.18 * W)
    border_x = int(border_x)

    vis = bgr.copy()
    cv2.line(vis, (border_x, 0), (border_x, H - 1), (0, 165, 255), 2, cv2.LINE_AA)

    left_dir = out_dir / "words_left"
    right_dir = out_dir / "words_right"
    left_dir.mkdir(parents=True, exist_ok=True)
    right_dir.mkdir(parents=True, exist_ok=True)

    pad = 2  # pixels of context kept around every crop
    for i, (box, score) in enumerate(zip(boxes, scores), start=1):
        x0, y0, x1, y1 = box.tolist()
        cx = int((x0 + x1) / 2)
        side = "left" if cx <= border_x else "right"
        color = (0, 200, 0) if side == "right" else (255, 128, 0)

        cv2.rectangle(vis, (x0, y0), (x1, y1), color, 2, cv2.LINE_AA)
        cv2.putText(vis, f"{score:.2f}/{side}", (x0, max(0, y0 - 6)),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1, cv2.LINE_AA)

        # Slice ends are exclusive, so clip to W / H (not W-1 / H-1) to keep
        # the last column and row of the image reachable.
        xs0, ys0 = max(0, x0 - pad), max(0, y0 - pad)
        xs1, ys1 = min(W, x1 + pad), min(H, y1 + pad)
        crop = bgr[ys0:ys1, xs0:xs1]
        if crop.size == 0:
            continue  # cv2.imwrite rejects empty images
        target_dir = left_dir if side == "left" else right_dir
        cv2.imwrite(str(target_dir / f"word_{i:04d}.png"), crop)

    cv2.imwrite(str(out_dir / "overlay.png"), vis)
    print(f"Saved overlay + crops to: {out_dir.resolve()} | kept {len(keep)} words (thr={score_thr})")

# Example (uncomment after training):
# test_img = r"E:\EvaluationAI\Dataset\42.jpg"
# run_inference(model, test_img, OUT_DIR, score_thr=0.5)
