In [1]:
# run once in a notebook cell from repo root
from pathlib import Path
import shutil, os

# Source dataset root and the standardized mirror that is built beneath it.
ROOT = Path("../datasets/ICPR")
STD = ROOT.joinpath("icdar15_std")

# Create the four ch4_* target folders; existing folders are left untouched.
for _subdir in (
    "ch4_training_images",
    "ch4_training_localization_transcription_gt",
    "ch4_test_images",
    "ch4_test_localization_transcription_gt",
):
    STD.joinpath(_subdir).mkdir(parents=True, exist_ok=True)

def link_or_copy(src, dst):
    """Expose ``src`` at ``dst`` as a hard link, falling back to a copy.

    Safe to re-run: when ``dst`` already refers to the same file as ``src``
    (for example a hard link made by an earlier run) nothing is done, instead
    of failing while trying to copy a file onto itself.

    Args:
        src: Path of the existing source file.
        dst: Path to create. An existing, different file at ``dst`` is
            overwritten with a copy of ``src``.
    """
    if os.path.exists(dst):
        if os.path.samefile(src, dst):
            return  # already linked by a previous run
        shutil.copy2(src, dst)
        return
    try:
        os.link(src, dst)  # hardlink (fast, no extra space if same drive)
    except (OSError, NotImplementedError):
        # Cross-device link, unsupported filesystem, missing privilege, ...
        shutil.copy2(src, dst)

# Each row: (source folder under ROOT, glob pattern, target folder under STD).
# Order is train images, train ground truth, test images, test ground truth.
_COPY_PLAN = (
    ("train", "*.jpg", "ch4_training_images"),
    ("train_truth", "gt_*.txt", "ch4_training_localization_transcription_gt"),
    ("test", "*.jpg", "ch4_test_images"),
    ("test_truth", "gt_*.txt", "ch4_test_localization_transcription_gt"),
)
for _src_dir, _pattern, _dst_dir in _COPY_PLAN:
    for p in (ROOT / _src_dir).glob(_pattern):
        link_or_copy(p, STD / _dst_dir / p.name)

print("ICDAR15 standardized at:", STD.resolve())


PermissionError: [WinError 32] The process cannot access the file because it is being used by another process

In [None]:
from pathlib import Path
STD = Path("../datasets/ICPR/icdar15_std")

# Sanity check: report how many files landed in each standardized folder.
for _label, _folder, _pattern in (
    ("train imgs:", "ch4_training_images", "*.jpg"),
    ("train gts :", "ch4_training_localization_transcription_gt", "gt_*.txt"),
    ("test  imgs:", "ch4_test_images", "*.jpg"),
    ("test  gts :", "ch4_test_localization_transcription_gt", "gt_*.txt"),
):
    print(_label, len(list((STD / _folder).glob(_pattern))))


train imgs: 1000
train gts : 1000
test  imgs: 500
test  gts : 500


# Optional baseline

In [29]:
import os, re, time, json
import numpy as np
import cv2
import torch

# Report the runtime once and choose the device string used by later cells.
_has_cuda = torch.cuda.is_available()
print("Torch:", torch.__version__)
print("CUDA available:", _has_cuda)
if _has_cuda:
    print("GPU:", torch.cuda.get_device_name(0))
device = "cuda" if _has_cuda else "cpu"


Torch: 2.8.0+cu128
CUDA available: True
GPU: NVIDIA GeForce RTX 5060 Laptop GPU


In [30]:
from typing import List, Dict, Tuple

class OCRPipeline:
    """OCR front-end that prefers docTR and falls back to EasyOCR.

    ``kind`` records which backend was set up ("doctr" or "easyocr").
    ``infer`` returns a docTR-export-shaped dict for either backend so the
    callers only deal with one structure.
    """

    def __init__(self, det_arch="db_resnet50", reco_arch="parseq"):
        """Build a pretrained docTR predictor; on any failure build an EasyOCR reader.

        Args:
            det_arch: docTR detection architecture name.
            reco_arch: docTR recognition architecture name.
        """
        try:
            from doctr.models import ocr_predictor
            self.kind = "doctr"
            self.model = ocr_predictor(det_arch=det_arch, reco_arch=reco_arch, pretrained=True)
            # Move to the module-level `device` and switch to inference mode.
            self.model = self.model.to(device).eval()
        except Exception as err:
            print("docTR unavailable → falling back to EasyOCR:", err)
            import easyocr
            self.kind = "easyocr"
            self.reader = easyocr.Reader(["en"], gpu=torch.cuda.is_available())

    def _infer_easyocr(self, img_bgr) -> Dict:
        """Run EasyOCR and wrap its detections in a minimal docTR-like export."""
        detections = self.reader.readtext(img_bgr, detail=1, paragraph=False)
        height, width = img_bgr.shape[:2]
        words = []
        for quad, text, score in detections:
            # Normalize the quad corners to [0, 1] and keep their bounding box.
            xs = [pt[0] / width for pt in quad]
            ys = [pt[1] / height for pt in quad]
            words.append({
                "value": text,
                "confidence": float(score),
                "geometry": ((min(xs), min(ys)), (max(xs), max(ys))),
            })
        # All words are placed on one synthetic line spanning the whole page.
        single_line = {"words": words, "geometry": ((0, 0), (1, 1))}
        return {"pages": [{"blocks": [{"lines": [single_line]}]}]}

    def infer(self, img_bgr) -> Dict:
        """Run OCR on one BGR image.

        Returns:
            A dict shaped like docTR's ``.export()``:
            ``{pages:[{blocks:[{lines:[{words:[{value, confidence, geometry}]}]}]}]}``.
        """
        if self.kind != "doctr":
            return self._infer_easyocr(img_bgr)
        rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)
        return self.model([rgb]).export()
            
# Build the OCR backend once; process_frame and process_frame_alltext reach it
# through the module-level name `ocr`.
ocr = OCRPipeline(det_arch="db_resnet50", reco_arch="parseq")
print("OCR backend ready.")


OCR backend ready.


In [31]:
# Base tokens (extend per locale)
STREET_TOKENS = r"(Street|St|Road|Rd|Avenue|Ave|Drive|Dr|Lane|Ln|Boulevard|Blvd|Way|Terrace|Ter|Court|Ct|Crescent|Cres|Place|Pl|Highway|Hwy|Expressway|Expwy|Jalan|Jln|Lorong|Lor)"
UNIT          = r"#\s?\d{1,3}-\d{1,4}"
POSTAL_SG     = r"\b(?:S\s*)?\d{6}\b"
HOUSE_NO      = r"\b(?:Blk|Block)?\s?\d{1,5}[A-Z]?\b"
COMPOSED      = rf"{HOUSE_NO}.*\b{STREET_TOKENS}\b"

# The street-token rule must match whole words only. Searched bare, the short
# abbreviations hit inside ordinary words ("St" in "first", "Pl" in "apple"),
# which flags almost any text as an address.
PATTERNS = [rf"\b{STREET_TOKENS}\b", UNIT, POSTAL_SG, COMPOSED]
REGEXES  = [re.compile(p, re.IGNORECASE) for p in PATTERNS]

# Optional: extend the rules with literal street names read from a UTF-8 text
# file, one name per line. Blank lines are ignored and names are regex-escaped.
EXTRA_STREETS_FILE = None  # e.g., "./runs_sg/street_tokens.txt"
if EXTRA_STREETS_FILE and os.path.exists(EXTRA_STREETS_FILE):
    with open(EXTRA_STREETS_FILE, "r", encoding="utf-8") as f:
        names = []
        for _raw_line in f:
            _entry = _raw_line.strip()
            if _entry:
                names.append(re.escape(_entry))
    if names:
        _alternation = "|".join(names)
        REGEXES.append(re.compile(r"(" + _alternation + r")", re.IGNORECASE))

def is_pii_text(text: str, conf: float, conf_thresh: float = 0.35) -> bool:
    """Rule-based PII check (no ML): does any compiled rule in REGEXES match?

    Args:
        text: OCR text to test.
        conf: OCR confidence for ``text``.
        conf_thresh: Texts with confidence below this are never flagged.

    Returns:
        True when ``text`` is non-empty, confident enough, and at least one
        regex matches the whitespace-stripped text.
    """
    if not text:
        return False
    if conf < conf_thresh:
        return False
    candidate = text.strip()
    for rx in REGEXES:
        if rx.search(candidate):
            return True
    return False


In [32]:
def poly_from_box_norm(box, W, H):
    """Turn a normalized ``((x0, y0), (x1, y1))`` box into a 4-point pixel polygon.

    Coordinates are scaled by ``W``/``H`` and truncated with ``int``. The
    corners come back clockwise from top-left as an ``int32`` array of shape (4, 2).
    """
    (left, top), (right, bottom) = box
    px_left, px_right = int(left * W), int(right * W)
    px_top, px_bottom = int(top * H), int(bottom * H)
    corners = [
        [px_left, px_top],
        [px_right, px_top],
        [px_right, px_bottom],
        [px_left, px_bottom],
    ]
    return np.array(corners, dtype=np.int32)

def iou(a, b):
    """Intersection-over-union of two ``[x0, y0, x1, y1]`` boxes.

    Returns 0.0 when the boxes do not overlap. A small epsilon in the
    denominator guards against division by zero.
    """
    overlap_w = max(0, min(a[2], b[2]) - max(a[0], b[0]))
    overlap_h = max(0, min(a[3], b[3]) - max(a[1], b[1]))
    inter = overlap_w * overlap_h
    if inter <= 0:
        return 0.0
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - inter + 1e-6
    return inter / union

def aabb_from_poly(poly):
    """Axis-aligned bounding box ``[x_min, y_min, x_max, y_max]`` of a point array.

    ``poly`` is indexed as ``poly[:, 0]`` (x) and ``poly[:, 1]`` (y); the
    results are plain Python ints.
    """
    x_col = poly[:, 0]
    y_col = poly[:, 1]
    extremes = (x_col.min(), y_col.min(), x_col.max(), y_col.max())
    return [int(v) for v in extremes]

def blur_polygon(img, poly, ksize=41, dilate_px=3):
    """Gaussian-blur the padded bounding box of ``poly`` inside ``img``, in place.

    Args:
        img: Image array indexed ``[row, col, ...]``; it is modified in place.
        poly: Point array accepted by ``aabb_from_poly``.
        ksize: Gaussian kernel size. Even values are bumped to the next odd
            number because ``cv2.GaussianBlur`` only accepts odd sizes.
        dilate_px: Padding added on every side of the bounding box.

    Returns:
        ``img`` (the same object), unchanged when the clipped box is empty.
    """
    height, width = img.shape[:2]
    bx0, by0, bx1, by1 = aabb_from_poly(poly)
    # expand bbox slightly to be safe
    x0, y0 = max(0, bx0 - dilate_px), max(0, by0 - dilate_px)
    # Slice ends are exclusive, so clamp to the image size (not size - 1);
    # otherwise the last row/column of the frame can never be blurred.
    x1, y1 = min(width, bx1 + dilate_px), min(height, by1 + dilate_px)
    if x1 <= x0 or y1 <= y0:
        return img
    kernel = max(1, int(ksize)) | 1  # force an odd, positive kernel size
    roi = img[y0:y1, x0:x1].copy()
    img[y0:y1, x0:x1] = cv2.GaussianBlur(roi, (kernel, kernel), 0)
    return img

class Hysteresis:
    """Temporal smoothing of per-frame detections via IoU-matched tracks.

    A track becomes active once it has collected ``k_confirm`` positive hits,
    stays active while it was matched within the last ``k_hold`` frames, and is
    discarded after going unmatched for more than ``3 * k_hold`` frames.
    """

    def __init__(self, iou_thresh=0.3, k_confirm=2, k_hold=8):
        self.iou_thresh = iou_thresh   # minimum IoU to attach a detection to a track
        self.k_confirm = k_confirm     # positive hits needed before a track activates
        self.k_hold = k_hold           # frames an active track survives without a match
        self.tracks = {}               # id -> {"poly", "hits", "active", "last"}
        self.next_id = 1
        self.frame_id = 0

    def _best_unclaimed(self, track_box, polys_active, claimed):
        """Return ``(index, iou)`` of the unclaimed detection overlapping ``track_box`` most.

        The index is -1 when nothing overlaps; ties keep the earliest detection.
        """
        winner, winner_iou = -1, 0.0
        for idx, (poly, _positive) in enumerate(polys_active):
            if claimed[idx]:
                continue
            overlap = iou(track_box, aabb_from_poly(poly))
            if overlap > winner_iou:
                winner, winner_iou = idx, overlap
        return winner, winner_iou

    def update(self, polys_active: List[Tuple[np.ndarray, bool]]):
        """Advance one frame.

        Args:
            polys_active: list of ``(poly, is_positive_now)`` detections.

        Returns:
            ``[(poly, active), ...]`` for every track still kept.
        """
        self.frame_id += 1
        now = self.frame_id
        claimed = [False] * len(polys_active)

        # Greedy matching: tracks pick their best remaining detection in
        # insertion order.
        for track in list(self.tracks.values()):
            idx, overlap = self._best_unclaimed(
                aabb_from_poly(track["poly"]), polys_active, claimed
            )
            if overlap >= self.iou_thresh and idx >= 0:
                poly, positive = polys_active[idx]
                claimed[idx] = True
                track["poly"] = poly
                track["last"] = now
                if positive:
                    track["hits"] += 1
                    if track["hits"] >= self.k_confirm and not track["active"]:
                        track["active"] = True
            # Deactivate once the hold window since the last match has passed.
            if track["active"] and (now - track["last"]) > self.k_hold:
                track["active"] = False

        # Every detection nobody claimed starts a new track.
        for idx, (poly, positive) in enumerate(polys_active):
            if claimed[idx]:
                continue
            self.tracks[self.next_id] = {
                "poly": poly,
                "hits": 1 if positive else 0,
                "active": bool(positive) and (1 >= self.k_confirm),
                "last": now,
            }
            self.next_id += 1

        # Garbage-collect tracks that have been unmatched for too long.
        stale = [tid for tid, track in self.tracks.items()
                 if (now - track["last"]) > (3 * self.k_hold)]
        for tid in stale:
            self.tracks.pop(tid, None)

        return [(track["poly"], track["active"]) for track in self.tracks.values()]


In [33]:
def process_frame(frame_bgr,
                  conf_thresh=0.35,
                  line_first=True,
                  ksize=41,
                  min_area=80):
    """Detect rule-based PII in one frame and render a blurred preview.

    Args:
        frame_bgr: BGR image; it is not modified.
        conf_thresh: OCR confidence gate passed to ``is_pii_text``.
        line_first: When True, a line whose joined text looks like PII has all
            of its (large enough) words blurred; otherwise, or when the line
            does not match, every word is tested on its own.
        ksize: Gaussian kernel size used for the preview.
        min_area: Word polygons with a smaller pixel area are ignored.

    Returns:
        ``(rendered_frame, blur_polys)``: a blurred copy with green outlines
        and the list of pixel-coordinate polygons that were blurred. When OCR
        returns no pages the input frame itself and an empty list are returned.
    """
    H, W = frame_bgr.shape[:2]
    data = ocr.infer(frame_bgr)  # docTR-like export
    pages = data.get("pages", [])
    if not pages:
        return frame_bgr, []

    blur_polys = []

    for blk in pages[0].get("blocks", []):
        for line in blk.get("lines", []):
            words = line.get("words", [])
            if not words:
                continue
            line_text = " ".join(w.get("value", "") for w in words).strip()
            line_conf = np.mean([float(w.get("confidence", 1.0)) for w in words])

            # Keep every surviving word together with its own polygon. Filtering
            # the polygons separately and zipping them back against `words`
            # pairs texts with the wrong boxes as soon as one word is dropped.
            kept = []
            for w in words:
                poly = poly_from_box_norm(w["geometry"], W, H)
                if cv2.contourArea(poly) < min_area:
                    continue
                kept.append((w, poly))

            # 1) Line-level PII: the whole line looks like an address.
            if line_first and is_pii_text(line_text, line_conf, conf_thresh):
                blur_polys.extend(poly for _, poly in kept)
            else:
                # 2) Otherwise test each word on its own text and confidence.
                for w, poly in kept:
                    if is_pii_text(w.get("value", ""), float(w.get("confidence", 1.0)), conf_thresh):
                        blur_polys.append(poly)

    # Render (polys only; no stabilization here)
    out = frame_bgr.copy()
    for poly in blur_polys:
        out = blur_polygon(out, poly, ksize=ksize, dilate_px=3)
        cv2.polylines(out, [poly], True, (0, 255, 0), 2)
    return out, blur_polys


In [34]:
def run_live(source=0, width=1280, height=720, fps_target=30,
             conf_thresh=0.35, k_confirm=2, k_hold=8, ksize=41):
    """Blur rule-detected PII on a live source and show it in a window (ESC quits).

    Args:
        source: Value handed to ``cv2.VideoCapture``.
        width, height, fps_target: Requested capture properties; a falsy value
            skips the corresponding request.
        conf_thresh: OCR confidence gate forwarded to ``process_frame``.
        k_confirm, k_hold: Hysteresis confirm/hold settings.
        ksize: Gaussian kernel size for the blur.

    Raises:
        AssertionError: if the source cannot be opened.

    The capture and the display window are released even when a frame fails,
    so an error in the loop does not leave the camera held by the kernel.
    """
    cap = cv2.VideoCapture(source)
    try:
        if width:
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        if height:
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        if fps_target:
            cap.set(cv2.CAP_PROP_FPS, fps_target)
        assert cap.isOpened(), "Cannot open video source."

        stab = Hysteresis(iou_thresh=0.3, k_confirm=k_confirm, k_hold=k_hold)

        while True:
            t0 = time.time()
            ok, frame = cap.read()
            if not ok:
                break

            # Only the polygons are used here; the preview frame is discarded.
            _, polys = process_frame(frame, conf_thresh=conf_thresh, line_first=True, ksize=ksize)
            # convert to (poly, pos) for stabilization
            st = stab.update([(p, True) for p in polys])

            out = frame.copy()
            for poly, active in st:
                if active:
                    out = blur_polygon(out, poly, ksize=ksize, dilate_px=3)
                    cv2.polylines(out, [poly], True, (0, 255, 0), 2)
            fps = 1.0 / max(time.time() - t0, 1e-3)
            cv2.putText(out, f"DBNet+PARSeq | FPS {fps:.1f}", (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
            cv2.imshow("Live PII Blur (ESC to quit)", out)
            if cv2.waitKey(1) & 0xFF == 27:
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()

# To run:
# run_live(source=0)   # ESC to exit


In [35]:
def run_video_file(path, out_path="output_blurred.mp4",
                   conf_thresh=0.35, k_confirm=2, k_hold=8, ksize=41):
    """Blur rule-detected PII in a video file and write the result as mp4v.

    Args:
        path: Input video handed to ``cv2.VideoCapture``.
        out_path: Output file; written at the input's frame size and FPS
            (25.0 when the source reports no FPS).
        conf_thresh: OCR confidence gate forwarded to ``process_frame``.
        k_confirm, k_hold: Hysteresis confirm/hold settings.
        ksize: Gaussian kernel size for the blur.

    Raises:
        AssertionError: if ``path`` cannot be opened.

    The writer and capture are released even when a frame fails, so the output
    file is not left locked.
    """
    cap = cv2.VideoCapture(path)
    writer = None
    try:
        assert cap.isOpened(), f"Cannot open {path}"
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0
        writer = cv2.VideoWriter(out_path, fourcc, FPS, (W, H))
        stab = Hysteresis(iou_thresh=0.3, k_confirm=k_confirm, k_hold=k_hold)

        while True:
            ok, frame = cap.read()
            if not ok:
                break
            _, polys = process_frame(frame, conf_thresh=conf_thresh, line_first=True, ksize=ksize)
            st = stab.update([(p, True) for p in polys])

            out = frame.copy()
            for poly, active in st:
                if active:
                    out = blur_polygon(out, poly, ksize=ksize, dilate_px=3)
            writer.write(out)
    finally:
        if writer is not None:
            writer.release()
        cap.release()
    print("Saved:", out_path)

# Example:
# run_video_file("sample_street.mp4", out_path="sample_street_blurred.mp4")


In [36]:
# Blur *all* detected text (regardless of PII) for a quick demo:
BLUR_ALL = False

def process_frame_alltext(frame_bgr, ksize=41, min_area=80):
    """Blur every detected word in a frame, without any PII filtering.

    Args:
        frame_bgr: BGR image; it is not modified.
        ksize: Gaussian kernel size for the blur.
        min_area: Word polygons with a smaller pixel area are ignored.

    Returns:
        ``(rendered_frame, blur_polys)``. When OCR returns no pages the input
        frame itself and an empty list are returned, matching ``process_frame``.
    """
    H, W = frame_bgr.shape[:2]
    data = ocr.infer(frame_bgr)
    pages = data.get("pages", [])
    if not pages:
        # Nothing recognized: indexing pages[0] would raise IndexError.
        return frame_bgr, []

    blur_polys = []
    for blk in pages[0].get("blocks", []):
        for line in blk.get("lines", []):
            for w in line.get("words", []):
                poly = poly_from_box_norm(w["geometry"], W, H)
                if cv2.contourArea(poly) >= min_area:
                    blur_polys.append(poly)

    out = frame_bgr.copy()
    for p in blur_polys:
        out = blur_polygon(out, p, ksize=ksize, dilate_px=3)
    return out, blur_polys

# In run_* loops, swap process_frame → process_frame_alltext when BLUR_ALL=True


# PII Detection

In [38]:
import json, re, random
from pathlib import Path
import numpy as np
import pandas as pd

# Token-level BIO-labelled dataset; read by load_bio_json, which expects each
# document to provide "tokens", "labels" and optionally "trailing_whitespace".
DATA_FILE = Path("../datasets/mixtral_pii/mixtral-8x7b-v1.json")  # path to the JSON holding 'root': [{...}]

def join_tokens(tokens, trailing_ws):
    """Rebuild contiguous text from tokens.

    A single space follows token ``i`` only when ``trailing_ws[i]`` is truthy;
    trailing whitespace of the final string is stripped. Extra items in the
    longer of the two sequences are ignored.
    """
    pieces = (
        tok + " " if has_space else tok
        for tok, has_space in zip(tokens, trailing_ws)
    )
    return "".join(pieces).rstrip()

def extract_entities(tokens, labels, trailing_ws):
    """Group BIO labels into entities and runs of outside ("O") tokens.

    Returns:
        ``(entities, o_spans)`` where ``entities`` is a list of
        ``{'text': str, 'type': str, 'span': (start, end)}`` and ``o_spans`` is
        a list of ``(start, end)`` runs of consecutive "O" labels. All spans are
        half-open token ranges. Entities whose rebuilt text is empty are dropped.
    """
    entities, outside_runs = [], []
    total = len(tokens)
    pos = 0
    while pos < total:
        label = labels[pos]
        if label == "O":
            # Swallow the whole run of O labels (kept for negative sampling).
            stop = pos
            while stop < total and labels[stop] == "O":
                stop += 1
            outside_runs.append((pos, stop))   # [pos, stop)
        else:
            # A well-formed entity opens with B-XXX; anything else (broken BIO)
            # is treated as an opening tag with the part after the first "-".
            if label.startswith("B-"):
                kind = label[2:]
            elif "-" in label:
                kind = label.split("-", 1)[-1]
            else:
                kind = label
            stop = pos + 1
            while stop < total and labels[stop].startswith("I-") and labels[stop][2:] == kind:
                stop += 1
            # Rebuild the surface string with the dataset's own spacing.
            surface = "".join(
                tokens[k] + (" " if trailing_ws[k] else "")
                for k in range(pos, stop)
            ).strip()
            if surface:
                entities.append({"text": surface, "type": kind, "span": (pos, stop)})
        pos = stop
    return entities, outside_runs

def sample_negatives(tokens, trailing_ws, o_spans, pos_len_hist, max_neg=20000, rng=None):
    """Sample negative strings from O regions with the positives' length mix.

    Args:
        tokens: Document tokens.
        trailing_ws: Per-token flags; a space follows token ``i`` when truthy.
        o_spans: Half-open ``(start, end)`` runs of O-labelled tokens.
        pos_len_hist: ``{length_in_tokens: count}`` of the positive entities;
            one window of each length is attempted per counted positive.
        max_neg: Upper bound on returned negatives; values <= 0 yield ``[]``.
        rng: Optional ``random.Random``-like object (``shuffle``, ``choice``,
            ``randint``) for reproducible sampling. Defaults to the global
            ``random`` module, which keeps the previous behaviour.

    Returns:
        List of ``{"text": str, "type": "O"}``. Windows shorter than two
        characters or made only of punctuation are skipped.
    """
    if max_neg <= 0:
        return []
    if rng is None:
        rng = random

    punctuation = ",.;:!?-–—()[]{}'\"/\\|"
    # One requested window length per positive entity, in random order.
    lens = [length for length, cnt in pos_len_hist.items() for _ in range(cnt)]
    rng.shuffle(lens)

    fitting_spans = {}  # window length -> O-spans long enough to hold it
    negatives = []
    for L in lens:
        candidates = fitting_spans.get(L)
        if candidates is None:
            candidates = [(s, e) for (s, e) in o_spans if (e - s) >= L]
            fitting_spans[L] = candidates
        if not candidates:
            continue
        s, e = rng.choice(candidates)
        start = rng.randint(s, e - L)
        neg = "".join(
            tokens[k] + (" " if trailing_ws[k] else "")
            for k in range(start, start + L)
        ).strip()
        if len(neg) >= 2 and not all(ch in punctuation for ch in neg):
            negatives.append({"text": neg, "type": "O"})
        if len(negatives) >= max_neg:
            break
    return negatives

def load_bio_json(path: Path):
    """Load a BIO-labelled JSON dataset into positive and negative text frames.

    The file holds either a list of documents or ``{"root": [documents]}``.
    Each document needs ``tokens`` and ``labels``; ``trailing_whitespace``
    defaults to a space after every token.

    Returns:
        ``(pos_df, neg_df)`` with columns ``text``, ``label`` (1 / 0) and
        ``type``, de-duplicated on ``text``. Both frames always carry these
        columns, even when no entities or negatives were found.
    """
    with open(path, "r", encoding="utf-8") as f:
        raw = json.load(f)
    docs = raw["root"] if "root" in raw else raw

    all_pos = []
    all_neg = []
    for d in docs:
        tokens = d["tokens"]
        labels = d["labels"]
        trailing = d.get("trailing_whitespace", [True] * len(tokens))
        ents, o_spans = extract_entities(tokens, labels, trailing)
        # Histogram of positive lengths (in tokens) so the negatives sampled
        # from this document follow a similar length distribution.
        len_hist = {}
        for ent in ents:
            L = ent["span"][1] - ent["span"][0]
            len_hist[L] = len_hist.get(L, 0) + 1
        negs = sample_negatives(tokens, trailing, o_spans, len_hist, max_neg=5_000)  # cap optional
        all_pos.extend(ents)
        all_neg.extend(negs)

    # Explicit columns keep empty results usable: without them an empty list
    # builds a column-less frame and drop_duplicates("text") raises KeyError.
    columns = ["text", "label", "type"]
    pos_rows = [{"text": e["text"], "label": 1, "type": e["type"]} for e in all_pos]
    neg_rows = [{"text": e["text"], "label": 0, "type": e["type"]} for e in all_neg]
    # Deduplicate texts (optional)
    pos_df = pd.DataFrame(pos_rows, columns=columns).drop_duplicates("text")
    neg_df = pd.DataFrame(neg_rows, columns=columns).drop_duplicates("text")
    return pos_df, neg_df

# Load
# Negatives are drawn with the unseeded global `random` module inside
# sample_negatives, so the negative count can differ between runs.
pos_df, neg_df = load_bio_json(DATA_FILE)
print("Positives:", len(pos_df), "Negatives:", len(neg_df))
# Final expression: displays the first rows of both frames as a tuple.
pos_df.head(), neg_df.head()


Positives: 10699 Negatives: 8905


(                                      text  label          type
 0                            Tiburce Evans      1  NAME_STUDENT
 1  https://www.instagram.com/tiburce-evans      1  URL_PERSONAL
 2                                 bLBeoRIe      1        ID_NUM
 3                    001-691-518-9820x5621      1     PHONE_NUM
 5                       Rose-Mai Rodriguez      1  NAME_STUDENT,
                                                 text  label type
 0                                    applications of      0    O
 1                               inclusivity criteria      0    O
 2  from this experience can inform future applica...      0    O
 3                                             Design      0    O
 4                                                 of      0    O)

In [39]:
from sklearn.model_selection import train_test_split

# Balance classes (optional): keep at most NEG_FACTOR × as many negatives as
# there are positives, drawn with a fixed seed.
NEG_FACTOR = 1.5
_neg_budget = min(len(neg_df), int(NEG_FACTOR * len(pos_df)))
neg_sample = neg_df.sample(n=_neg_budget, random_state=42)

# Stack positives and sampled negatives, then shuffle the rows.
_keep_cols = ["text", "label"]
df = (
    pd.concat([pos_df[_keep_cols], neg_sample[_keep_cols]], ignore_index=True)
    .sample(frac=1.0, random_state=42)
    .reset_index(drop=True)
)

# 80/20 split, stratified on the label so both parts keep the class ratio.
X_train, X_val, y_train, y_val = train_test_split(
    df["text"], df["label"], test_size=0.2, random_state=42, stratify=df["label"]
)
len(X_train), len(X_val), y_train.mean(), y_val.mean()


(15683, 3921, np.float64(0.5457501753491041), np.float64(0.5457791379750063))

In [40]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import precision_recall_fscore_support
from sklearn.utils.class_weight import compute_class_weight
import numpy as np, joblib

# Character 3-5-gram TF-IDF features; the vocabulary is fitted on the training
# split only and reused for validation.
vec = TfidfVectorizer(analyzer="char", ngram_range=(3,5), min_df=2)
Xtr = vec.fit_transform(X_train)
Xva = vec.transform(X_val)

# "balanced" per-class weights computed from the training labels.
classes = np.array([0,1])
cw = compute_class_weight("balanced", classes=classes, y=y_train)
_class_weights = dict(zip((0, 1), cw))
clf = LogisticRegression(solver="liblinear", max_iter=1000, class_weight=_class_weights)
clf.fit(Xtr, y_train)

# Second predict_proba column = probability of label 1 on the validation split.
proba_val = clf.predict_proba(Xva)[:, 1]

def sweep_threshold(y_true, proba, lo=0.2, hi=0.9, steps=36, prefer="f1"):
    """Pick the decision threshold that maximizes one binary metric.

    Args:
        y_true: Ground-truth 0/1 labels.
        proba: NumPy array of positive-class probabilities.
        lo, hi, steps: The thresholds tried are ``np.linspace(lo, hi, steps)``.
        prefer: Metric to maximize: "f1", "precision" (or "p"), "recall" (or "r").

    Returns:
        ``{"thr", "p", "r", "f1"}`` for the first threshold reaching the best
        score. If no threshold scores above 0, the default
        ``{"thr": 0.5, "p": 0, "r": 0, "f1": 0}`` is returned.

    Raises:
        ValueError: if ``prefer`` is not a known metric name.
    """
    # The result dict uses short keys, so map the long metric names onto them;
    # indexing the result with "precision"/"recall" directly raises KeyError.
    metric_key = {"precision": "p", "p": "p", "recall": "r", "r": "r", "f1": "f1"}
    if prefer not in metric_key:
        raise ValueError(f"prefer must be one of {sorted(metric_key)}, got {prefer!r}")
    key = metric_key[prefer]

    best = {"thr": 0.5, "p": 0, "r": 0, "f1": 0}
    for thr in np.linspace(lo, hi, steps):
        yhat = (proba >= thr).astype(int)
        p, r, f1, _ = precision_recall_fscore_support(y_true, yhat, average="binary", zero_division=0)
        candidate = {"thr": float(thr), "p": float(p), "r": float(r), "f1": float(f1)}
        if candidate[key] > best[key]:
            best = candidate
    return best

# Choose the validation threshold with the highest F1.
best = sweep_threshold(y_val.values, proba_val, prefer="f1")
print("Best threshold:", best)

# Persist vectorizer, classifier and threshold together in one file in the
# current working directory; later cells reload it by the same name.
joblib.dump({"vec":vec,"clf":clf,"thr":best["thr"]}, "pii_clf.joblib")
print("Saved → pii_clf.joblib")


Best threshold: {'thr': 0.6000000000000001, 'p': 0.9712696941612604, 'r': 0.9794392523364486, 'f1': 0.9753373662168451}
Saved → pii_clf.joblib


In [41]:
# Minimal rules — keep your stronger rules if you already have them
STREET_TOKENS = r"(Street|St|Road|Rd|Avenue|Ave|Drive|Dr|Lane|Ln|Boulevard|Blvd|Way|Terrace|Ter|Court|Ct|Crescent|Cres|Place|Pl|Highway|Hwy|Expressway|Expwy|Jalan|Jln|Lorong|Lor)"
UNIT          = r"#\s?\d{1,3}-\d{1,4}"
POSTAL_SG     = r"\b(?:S\s*)?\d{6}\b"
HOUSE_NO      = r"\b(?:Blk|Block)?\s?\d{1,5}[A-Z]?\b"
COMPOSED      = rf"{HOUSE_NO}.*\b{STREET_TOKENS}\b"
# The standalone street-token rule is wrapped in \b so abbreviations such as
# "St", "Pl" or "Dr" only match as whole words. Without the boundaries they
# fire inside ordinary words and the rule short-circuit flags nearly everything.
REGEXES  = [re.compile(p, re.IGNORECASE) for p in [rf"\b{STREET_TOKENS}\b", UNIT, POSTAL_SG, COMPOSED]]

def rule_is_pii(text: str) -> int:
    """Return 1 when any rule in REGEXES matches the stripped text, else 0.

    Empty or missing text is never PII.
    """
    if not text:
        return 0
    stripped = text.strip()
    for rx in REGEXES:
        if rx.search(stripped):
            return 1
    return 0

# Load classifier once at startup
# NOTE(review): joblib.load unpickles the file and can therefore run arbitrary
# code. Only load a pii_clf.joblib written by the training cell in this notebook.
_bundle = joblib.load("pii_clf.joblib")
# Unpack the TF-IDF vectorizer, the classifier and the tuned decision threshold.
_VEC, _CLF, _THR = _bundle["vec"], _bundle["clf"], float(_bundle["thr"])

def pii_prob(text: str) -> float:
    """Classifier probability that ``text`` is PII (0.0 for empty text).

    The text is vectorized with ``_VEC`` and scored by ``_CLF``; the value is
    the second ``predict_proba`` column for that single row.
    """
    if not text:
        return 0.0
    features = _VEC.transform([text])
    scores = _CLF.predict_proba(features)
    return float(scores[0, 1])

def is_pii_hybrid(text: str, conf: float, conf_thresh=0.35) -> bool:
    """Hybrid PII decision: rules first, classifier second.

    Returns False for empty text or OCR confidence below ``conf_thresh``.
    Otherwise a rule hit is enough; without one the classifier probability
    must reach the saved threshold ``_THR``.
    """
    if not text:
        return False
    if conf < conf_thresh:
        return False
    if rule_is_pii(text):             # strong rule short-circuit
        return True
    probability = pii_prob(text)
    return probability >= _THR


# Fine-tuned version

In [42]:
from doctr.models import ocr_predictor
# Rebinds the module-level `ocr` to a raw docTR predictor. collect_pii_polys
# calls it directly and exports the result. This replaces the OCRPipeline
# instance built in the baseline section, so baseline functions that call
# `ocr.infer(...)` need that earlier cell re-run before they are used again.
ocr = ocr_predictor(det_arch="db_resnet50", reco_arch="parseq", pretrained=True).to(device).eval()
print("OCR predictor ready.")


OCR predictor ready.


In [43]:
# Load classifier bundle saved earlier
# NOTE(review): joblib.load unpickles the file and can therefore run arbitrary
# code. Only load a pii_clf.joblib written by this notebook's training cell.
bundle = joblib.load("pii_clf.joblib")
# Vectorizer, classifier and tuned threshold used by pii_prob / is_pii_hybrid.
VEC, CLF, THR = bundle["vec"], bundle["clf"], float(bundle["thr"])
print("Classifier loaded with threshold =", THR)

# Rules (keep/extend as needed for your locale)
STREET_TOKENS = r"(Street|St|Road|Rd|Avenue|Ave|Drive|Dr|Lane|Ln|Boulevard|Blvd|Way|Terrace|Ter|Court|Ct|Crescent|Cres|Place|Pl|Highway|Hwy|Expressway|Expwy|Jalan|Jln|Lorong|Lor)"
UNIT          = r"#\s?\d{1,3}-\d{1,4}"
POSTAL_SG     = r"\b(?:S\s*)?\d{6}\b"
HOUSE_NO      = r"\b(?:Blk|Block)?\s?\d{1,5}[A-Z]?\b"
COMPOSED      = rf"{HOUSE_NO}.*\b{STREET_TOKENS}\b"
# Street tokens count only as whole words. Searched bare, "St"/"Pl"/"Dr"/"Way"
# match inside ordinary words ("first", "apple", "always"), and since a rule
# hit short-circuits is_pii_hybrid the classifier would never get a say.
REGEXES = [re.compile(p, re.I) for p in [rf"\b{STREET_TOKENS}\b", UNIT, POSTAL_SG, COMPOSED]]

def rule_is_pii(text: str) -> int:
    """Rule check: 1 if any pattern in REGEXES matches the stripped text, 0 otherwise.

    Falsy input (empty string or None) yields 0 without consulting the rules.
    """
    if not text:
        return 0
    candidate = text.strip()
    return next((1 for rx in REGEXES if rx.search(candidate)), 0)

def pii_prob(text: str) -> float:
    """Score ``text`` with the loaded classifier bundle.

    Returns 0.0 for empty text; otherwise the second ``predict_proba`` column
    of ``CLF`` for the single row produced by ``VEC.transform``.
    """
    if not text:
        return 0.0
    row = VEC.transform([text])
    positive_score = CLF.predict_proba(row)[0, 1]
    return float(positive_score)

def is_pii_hybrid(text: str, conf: float, conf_thresh: float = 0.35) -> bool:
    """Rules ∨ ML with a confidence gate from OCR.

    Empty text or confidence below ``conf_thresh`` is never PII. Otherwise the
    text is PII when a rule matches or the classifier probability reaches ``THR``.
    """
    if not text:
        return False
    if conf < conf_thresh:
        return False
    return True if rule_is_pii(text) else pii_prob(text) >= THR


Classifier loaded with threshold = 0.6000000000000001


In [44]:
def poly_from_box_norm(box, W, H):
    """Convert a normalized ``((x0, y0), (x1, y1))`` box to pixel corners.

    Each coordinate is scaled by the image width/height and truncated to int.
    Returns an ``int32`` array of the four corners: top-left, top-right,
    bottom-right, bottom-left.
    """
    top_left, bottom_right = box
    xa, ya = top_left
    xb, yb = bottom_right
    xa, xb = int(xa * W), int(xb * W)
    ya, yb = int(ya * H), int(yb * H)
    return np.array([[xa, ya], [xb, ya], [xb, yb], [xa, yb]], dtype=np.int32)

def aabb(poly):
    """Bounding box ``[x_min, y_min, x_max, y_max]`` (Python ints) of a point array."""
    x_min, x_max = poly[:, 0].min(), poly[:, 0].max()
    y_min, y_max = poly[:, 1].min(), poly[:, 1].max()
    return [int(x_min), int(y_min), int(x_max), int(y_max)]

def iou(a, b):
    """Intersection-over-union for boxes given as ``[x0, y0, x1, y1]``.

    Non-overlapping (or edge-touching) boxes give 0.0; the denominator carries
    a 1e-6 epsilon.
    """
    left, top = max(a[0], b[0]), max(a[1], b[1])
    right, bottom = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0, right - left) * max(0, bottom - top)
    if inter <= 0:
        return 0.0

    def _area(box):
        return (box[2] - box[0]) * (box[3] - box[1])

    return inter / (_area(a) + _area(b) - inter + 1e-6)

def blur_polygon(img, poly, ksize=41, pad=3):
    """Gaussian-blur the padded bounding box of ``poly`` inside ``img``, in place.

    Args:
        img: Image array indexed ``[row, col, ...]``; it is modified in place.
        poly: Point array accepted by ``aabb``.
        ksize: Gaussian kernel size. Even values are bumped to the next odd
            number because ``cv2.GaussianBlur`` only accepts odd sizes.
        pad: Padding added on every side of the bounding box.

    Returns:
        ``img`` (the same object), unchanged when the clipped box is empty.
    """
    height, width = img.shape[:2]
    x0, y0, x1, y1 = aabb(poly)
    x0, y0 = max(0, x0 - pad), max(0, y0 - pad)
    # Slice ends are exclusive: clamp to the image size, not size - 1, or the
    # last row/column of the frame is never blurred.
    x1, y1 = min(width, x1 + pad), min(height, y1 + pad)
    if x1 <= x0 or y1 <= y0:
        return img
    kernel = max(1, int(ksize)) | 1  # force an odd, positive kernel size
    roi = img[y0:y1, x0:x1].copy()
    img[y0:y1, x0:x1] = cv2.GaussianBlur(roi, (kernel, kernel), 0)
    return img

class Hysteresis:
    """Confirm after K_confirm hits; hold for K_hold frames.

    Detections are plain polygons, and every matched detection counts as a hit.
    This definition replaces the earlier baseline ``Hysteresis``, which took
    ``(poly, flag)`` pairs and lower-case keyword names.
    """

    def __init__(self, iou_thresh=0.3, K_confirm=2, K_hold=8):
        self.iou_thresh = iou_thresh   # minimum IoU to attach a detection to a track
        self.K_confirm = K_confirm     # hits needed before a track activates
        self.K_hold = K_hold           # frames an active track survives unmatched
        self.tracks = {}               # id -> {"poly", "hits", "active", "last"}
        self.next_id = 1
        self.frame = 0

    def update(self, polys):
        """Advance one frame with this frame's polygons.

        Returns:
            ``[(poly, active), ...]`` for every track still kept.
        """
        self.frame += 1
        now = self.frame
        taken = [False] * len(polys)

        # Greedy matching: each track, in insertion order, grabs the free
        # detection it overlaps most (ties keep the earliest detection).
        for track in list(self.tracks.values()):
            track_box = aabb(track["poly"])
            top_iou, top_idx = 0.0, -1
            for idx, candidate in enumerate(polys):
                if taken[idx]:
                    continue
                overlap = iou(track_box, aabb(candidate))
                if overlap > top_iou:
                    top_iou, top_idx = overlap, idx

            if top_idx >= 0 and top_iou >= self.iou_thresh:
                track["poly"] = polys[top_idx]
                track["hits"] += 1
                track["last"] = now
                if track["hits"] >= self.K_confirm and not track["active"]:
                    track["active"] = True
                taken[top_idx] = True

            # Deactivate once the hold window since the last match has passed.
            if track["active"] and (now - track["last"]) > self.K_hold:
                track["active"] = False

        # Unmatched detections start new tracks with one hit.
        for idx, candidate in enumerate(polys):
            if taken[idx]:
                continue
            self.tracks[self.next_id] = {
                "poly": candidate,
                "hits": 1,
                "active": (1 >= self.K_confirm),
                "last": now,
            }
            self.next_id += 1

        # Drop tracks unmatched for more than three hold windows.
        expired = [tid for tid, track in self.tracks.items()
                   if (now - track["last"]) > (3 * self.K_hold)]
        for tid in expired:
            self.tracks.pop(tid, None)

        return [(track["poly"], track["active"]) for track in self.tracks.values()]


In [45]:
def collect_pii_polys(frame_bgr, conf_thresh=0.35, min_area=80):
    """Run OCR on one BGR frame and return the polygons of words judged PII.

    Every word on the first exported page is tested on its own with
    ``is_pii_hybrid``; words whose polygon area is below ``min_area`` are
    skipped before the PII check.

    Returns:
        List of pixel-coordinate polygons, in OCR reading order.
    """
    H, W = frame_bgr.shape[:2]
    exported = ocr([cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)]).export()
    first_page = exported["pages"][0]

    selected = []
    for block in first_page.get("blocks", []):
        for line in block.get("lines", []):
            # word-level decisions
            for word in line.get("words", []):
                value = word.get("value", "")
                score = float(word.get("confidence", 1.0))
                outline = poly_from_box_norm(word["geometry"], W, H)
                big_enough = not (cv2.contourArea(outline) < min_area)
                if big_enough and is_pii_hybrid(value, score, conf_thresh):
                    selected.append(outline)
    return selected


In [46]:
def run_video_with_classifier(
    src_path,
    out_path="output_blurred.mp4",
    conf_thresh=0.35,
    K_confirm=2, K_hold=8,
    ksize=41
):
    """Blur hybrid-detected (rules ∨ classifier) PII in a video file.

    Args:
        src_path: Input video; converted with ``str`` for OpenCV.
        out_path: Output file, written as mp4v at the input's frame size and
            FPS (25.0 when the source reports no FPS).
        conf_thresh: OCR confidence gate forwarded to ``collect_pii_polys``.
        K_confirm, K_hold: Hysteresis confirm/hold settings.
        ksize: Gaussian kernel size for the blur.

    Raises:
        AssertionError: if ``src_path`` cannot be opened.

    The writer and capture are released even when a frame fails, so the output
    file is not left locked. Progress is printed every 30 frames.
    """
    cap = cv2.VideoCapture(str(src_path))
    writer = None
    try:
        assert cap.isOpened(), f"Cannot open {src_path}"
        W = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        H = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        FPS = cap.get(cv2.CAP_PROP_FPS) or 25.0
        fourcc = cv2.VideoWriter_fourcc(*"mp4v")
        writer = cv2.VideoWriter(str(out_path), fourcc, FPS, (W, H))
        stab = Hysteresis(iou_thresh=0.3, K_confirm=K_confirm, K_hold=K_hold)

        frame_idx = 0
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            frame_idx += 1

            # 1) Collect PII polygons from this frame
            polys = collect_pii_polys(frame, conf_thresh=conf_thresh)
            # 2) Stabilize (confirm/hold)
            stabilized = stab.update(polys)

            # 3) Blur active polygons
            out = frame.copy()
            for poly, active in stabilized:
                if active:
                    out = blur_polygon(out, poly, ksize=ksize, pad=3)

            writer.write(out)
            if frame_idx % 30 == 0:
                print(f"Processed frame {frame_idx}")
    finally:
        if writer is not None:
            writer.release()
        cap.release()
    print("Saved →", out_path)


## Run the live demo

In [47]:
def run_live(source=0,
             width=1280, height=720,  # set None to keep native
             conf_thresh=0.35,
             K_confirm=2, K_hold=8,
             ksize=41,
             show_boxes=False,        # draw green boxes where we blur
             target_fps_txt=True):    # draw FPS overlay
    """
    Live PII blur from webcam/RTSP/USB camera.
    - Press ESC to quit.
    - source can be int (0,1,...) or a string (RTSP/URL).

    Args:
        source: Value handed to ``cv2.VideoCapture``.
        width, height: Requested capture size; a falsy value keeps the native size.
        conf_thresh: OCR confidence gate forwarded to ``collect_pii_polys``.
        K_confirm, K_hold: Hysteresis confirm/hold settings.
        ksize: Gaussian kernel size for the blur.
        show_boxes: Outline every blurred region in green.
        target_fps_txt: Draw the FPS / blur-count overlay.

    Raises:
        AssertionError: if the source cannot be opened.

    The capture and the display window are released even when a frame fails,
    so an error in the loop does not leave the camera held by the kernel.
    """
    cap = cv2.VideoCapture(source)
    try:
        if width:
            cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
        if height:
            cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 2)

        assert cap.isOpened(), f"Cannot open video source: {source}"
        stab = Hysteresis(iou_thresh=0.3, K_confirm=K_confirm, K_hold=K_hold)

        print("Live stream started. Press ESC to exit.")
        while True:
            t0 = time.time()
            ok, frame = cap.read()
            if not ok:
                break

            # Collect PII polygons for this frame (hybrid rules ∨ classifier)
            polys = collect_pii_polys(frame, conf_thresh=conf_thresh)

            # Stabilize detections (confirm/hold)
            tracks = stab.update(polys)

            # Blur active tracks
            out = frame.copy()
            active_cnt = 0
            for poly, active in tracks:
                if active:
                    active_cnt += 1
                    out = blur_polygon(out, poly, ksize=ksize, pad=3)
                    if show_boxes:
                        cv2.polylines(out, [poly], True, (0, 255, 0), 2)

            # HUD
            if target_fps_txt:
                fps = 1.0 / max(time.time() - t0, 1e-3)
                cv2.putText(out, f"DBNet+PARSeq | Hybrid PII | FPS {fps:.1f} | Blurs {active_cnt}",
                            (10, 28), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 255), 2)
                cv2.putText(out, "Privacy Filter ON", (10, 55),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (30, 220, 30), 2)

            cv2.imshow("Live PII Blur (ESC to quit)", out)
            if cv2.waitKey(1) & 0xFF == 27:  # ESC
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()
    print("Live stream ended.")


In [48]:
# Default webcam
# Starts the live demo on camera index 0 with green outlines around blurred
# regions. The call blocks until ESC is pressed or the source stops delivering
# frames.
run_live(source=0, conf_thresh=0.35, K_confirm=2, K_hold=8, ksize=41, show_boxes=True)

# If you have multiple cameras, try 1, 2, ...; for RTSP/USB string sources, pass the URL instead.
# run_live(source="rtsp://user:pass@ip:554/stream1")


Live stream started. Press ESC to exit.
Live stream ended.
