In [2]:
# ==========================================================
# PIPELINE LENGKAP (India Plate):
# DCP (opsional haze) -> YOLOv11 detect plate -> Crop + margin
# -> CLAHE + blur ringan -> PaddleOCR
# -> jika invalid/low_conf: Real-ESRGAN x2 -> OCR ulang -> voting
#
# Catatan:
# - BUTUH: ultralytics (YOLOv11), paddleocr, opencv-python
# - Real-ESRGAN: script/CLI eksternal (kita panggil via subprocess).
#   Pastikan Anda punya executable/entrypoint Real-ESRGAN.
# ==========================================================

import os

# -------------------------
# STABILIZER (Windows/Jupyter)
# -------------------------
# These variables are consulted by the OpenMP/BLAS runtimes when the native
# libraries are first loaded, so they have to be in the environment BEFORE
# numpy, OpenCV, torch (pulled in by ultralytics) and paddle are imported.
# Setting them after those imports is too late to influence library start-up.
# NOTE: in a Jupyter kernel where these libraries were already imported by an
# earlier cell, restart the kernel for the settings to take effect.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"
os.environ["NUMEXPR_NUM_THREADS"] = "1"
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

import re  # noqa: E402
import cv2  # noqa: E402
import csv  # noqa: E402
import math  # noqa: E402
import shutil  # noqa: E402
import subprocess  # noqa: E402
import numpy as np  # noqa: E402
from glob import glob  # noqa: E402
from typing import Tuple, Optional, List, Dict  # noqa: E402

# NOTE(review): ultralytics (torch) is imported before paddleocr on purpose.
# On Windows, loading paddle first is a commonly reported trigger for
# "OSError: [WinError 127] ... Error loading torch\lib\shm.dll"; importing
# torch first is the usual workaround. If the error persists, the torch
# installation itself is likely broken/mismatched and needs reinstalling.
from ultralytics import YOLO  # noqa: E402
from paddleocr import PaddleOCR  # noqa: E402

# ==========================================================
# CONFIG (CHANGE THESE TO MATCH YOUR PATHS)
# ==========================================================
INPUT_DIR   = r"D:\Uniska\~Disertasi\~Projek Jurnal\input"     # folder of full vehicle images
OUT_DIR     = r"D:\Uniska\~Disertasi\~Projek Jurnal\out_pipeline"      # output folder
YOLO_WEIGHTS= r"D:\Uniska\~Disertasi\~Projek Jurnal\best.pt"      # YOLOv11 plate detector weights

OUT_CSV     = os.path.join(OUT_DIR, "results_pipeline.csv")

# Real-ESRGAN (x2) config
USE_REALESRGAN_FALLBACK = True
REALESRGAN_EXE = r"D:\Uniska\~Disertasi\~Projek Jurnal\realesrgan-ncnn-vulkan-20220424-windows\realesrgan-ncnn-vulkan.exe"  # example (Windows); adjust to your install
REALESRGAN_MODEL = "realesrgan-x2plus"                         # model name depends on your build
REALESRGAN_SCALE = "2"

# Thresholds that trigger the super-resolution (SR) fallback
MIN_OCR_CONF = 0.85     # OCR confidence below this -> fallback
REQUIRE_VALID = True    # plate fails the format check -> fallback

# Crop margin (5-10%)
CROP_MARGIN = 0.08      # 0.05 to 0.10

# CLAHE
CLAHE_CLIP = 2.0        # 1.5 - 2.5
CLAHE_TILE = (8, 8)
BLUR_KSIZE = (3, 3)

# Haze detection & DCP
ENABLE_DCP = True
HAZE_THRESHOLD = 30.0   # DCP is applied when the haze score is below this value (tune as needed)

# India plate format: 2 letters, 1-2 digits, 1-3 letters, 1-4 digits
IND_LOOSE = re.compile(r"^[A-Z]{2}\d{1,2}[A-Z]{1,3}\d{1,4}$")
IND_SPLIT = re.compile(r"^([A-Z]{2})(\d{1,2})([A-Z]{1,3})(\d{1,4})$")

# ==========================================================
# UTILS: dirs
# ==========================================================
# Output folders are created at import time: plate crops and x2 SR results.
os.makedirs(OUT_DIR, exist_ok=True)
CROP_DIR = os.path.join(OUT_DIR, "crops")
SR_DIR   = os.path.join(OUT_DIR, "sr_x2")
os.makedirs(CROP_DIR, exist_ok=True)
os.makedirs(SR_DIR, exist_ok=True)

# ==========================================================
# NORMALIZE / OCR JOIN
# ==========================================================
def norm_plate(s: str) -> str:
    """Return *s* upper-cased with every character outside A-Z / 0-9 removed.

    Falsy input (``None`` or ``""``) yields an empty string.
    """
    text = s if s else ""
    upper = text.upper()
    return re.sub(r"[^A-Z0-9]", "", upper)

def join_sorted_by_x_with_conf(res):
    """Merge OCR fragments left-to-right into one plate string.

    Each usable entry is expected to look like ``[box, (text, score)]``,
    optionally wrapped in one extra single-element list (per-image nesting).
    ``None`` entries, entries that cannot be unpacked, and entries with an
    empty box or text are skipped.

    Returns:
        ``(plate, avg_conf)`` where *plate* is the normalized concatenation of
        the texts ordered by the left-most x of their boxes, and *avg_conf* is
        the plain mean of the fragment scores. ``("", 0.0)`` when nothing
        usable is found.
    """
    if res is None:
        return "", 0.0

    # Peel off the outer per-image list when it is the only element.
    wrapped = isinstance(res, list) and len(res) == 1 and isinstance(res[0], list)
    entries = res[0] if wrapped else res
    if not entries:
        return "", 0.0

    fragments = []  # (x_left, text, score) in the order they were reported
    for entry in entries:
        if entry is None:
            continue
        try:
            box = entry[0]
            text = entry[1][0]
            score = float(entry[1][1])
        except Exception:
            continue
        if not box or not text:
            continue
        fragments.append((min(pt[0] for pt in box), text, score))

    if not fragments:
        return "", 0.0

    ordered = sorted(fragments, key=lambda frag: frag[0])
    plate = norm_plate("".join(frag[1] for frag in ordered))
    avg_conf = sum(frag[2] for frag in fragments) / len(fragments)
    return plate, avg_conf

def is_valid_india_plate(p: str) -> bool:
    """Return True when the normalized form of *p* matches ``IND_LOOSE``."""
    cleaned = norm_plate(p)
    return IND_LOOSE.match(cleaned) is not None

# ==========================================================
# HAZE DETECTION (simple, fast)
# ==========================================================
def haze_score_simple(img_bgr) -> float:
    """Return a quick sharpness score used as a haze indicator.

    The score is the standard deviation of the Laplacian of the grayscale
    image: haze lowers edge contrast, so a smaller value means a more
    blurred / hazy picture.
    """
    laplacian = cv2.Laplacian(cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY), cv2.CV_64F)
    return float(np.std(laplacian))

# ==========================================================
# DCP (Dark Channel Prior) - ringkas
# ==========================================================
def dark_channel(img, size=15):
    """Return the dark channel of *img*.

    Takes the per-pixel minimum over the channel axis (axis 2), then erodes
    it with a ``size`` x ``size`` rectangular window. The caller passes an
    image scaled to [0, 1].
    """
    window = cv2.getStructuringElement(cv2.MORPH_RECT, (size, size))
    channel_min = np.min(img, axis=2)
    return cv2.erode(channel_min, window)

def estimate_atmospheric_light(img, dark):
    """Estimate the atmospheric light as a 3-component colour.

    Averages the colours of the pixels whose dark-channel value is in the
    top 0.1% (at least one pixel).

    Args:
        img: image array reshaped here to ``(-1, 3)``.
        dark: 2-D dark channel with the same pixel count as *img*.
    """
    rows, cols = dark.shape
    top_k = max(int(rows * cols * 0.001), 1)
    brightest = np.argsort(dark.reshape(-1))[-top_k:]
    pixels = img.reshape(-1, 3)
    return np.mean(pixels[brightest], axis=0)

def estimate_transmission(img, A, omega=0.95, size=15):
    """Return the transmission map ``1 - omega * dark_channel(img / A)``.

    *A* is reshaped to ``(1, 1, 3)`` and a small epsilon is added before the
    division so a zero component cannot divide by zero.
    """
    scaled = img / (A.reshape(1, 1, 3) + 1e-6)
    haze_density = dark_channel(scaled, size)
    return 1 - omega * haze_density

def guided_filter(I, p, r=40, eps=1e-3):
    """Smooth *p* using *I* as the guide (box-filter guided filter).

    Args:
        I: guide image; the caller passes grayscale float32 in [0, 1].
        p: image to be filtered, same shape as *I*.
        r: side length handed to ``cv2.boxFilter`` as an ``(r, r)`` kernel.
        eps: regularizer added to the guide variance.

    Returns:
        ``mean(a) * I + mean(b)`` with ``a = cov(I, p) / (var(I) + eps)`` and
        ``b = mean(p) - a * mean(I)``.
    """
    def box_mean(x):
        return cv2.boxFilter(x, cv2.CV_32F, (r, r))

    mean_I = box_mean(I)
    mean_p = box_mean(p)
    cov_Ip = box_mean(I * p) - mean_I * mean_p
    var_I = box_mean(I * I) - mean_I * mean_I

    a = cov_Ip / (var_I + eps)
    b = mean_p - a * mean_I

    return box_mean(a) * I + box_mean(b)

def dehaze_dcp(img_bgr, t0=0.1, omega=0.95, win=15):
    """Dehaze a BGR uint8 image with the Dark Channel Prior.

    Steps: estimate the dark channel and atmospheric light, derive the
    transmission map, refine it with a guided filter driven by the grayscale
    image, clamp it to ``[t0, 1]`` and recover the scene radiance.

    Returns:
        The dehazed image as uint8, values clipped to the valid range.
    """
    guide = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY).astype(np.float32)/255.0
    img = img_bgr.astype(np.float32) / 255.0

    atmosphere = estimate_atmospheric_light(img, dark_channel(img, win))
    raw_t = estimate_transmission(img, atmosphere, omega=omega, size=win)

    refined_t = guided_filter(guide, raw_t.astype(np.float32), r=40, eps=1e-3)
    # A lower bound on the transmission keeps the division below from blowing up.
    refined_t = np.clip(refined_t, t0, 1.0)

    radiance = (img - atmosphere) / refined_t[...,None] + atmosphere
    radiance = np.clip(radiance, 0, 1)
    return (radiance*255).astype(np.uint8)

# ==========================================================
# YOLOv11 DETECT + CROP
# ==========================================================
def expand_box(x1, y1, x2, y2, margin, w, h):
    """Grow a box by *margin* (fraction of its size) per side, clamped to the image.

    The result is meant for NumPy slicing, ``img[ny1:ny2, nx1:nx2]``, where
    the end index is exclusive. The right/bottom edges are therefore clamped
    to ``w`` / ``h`` (not ``w - 1`` / ``h - 1``), so a plate touching the
    image border keeps its last column/row.

    Args:
        x1, y1, x2, y2: box corners in pixels.
        margin: fraction of the box width/height added on each side.
        w, h: image width and height.

    Returns:
        ``(nx1, ny1, nx2, ny2)`` as ints.
    """
    pad_x = (x2 - x1) * margin
    pad_y = (y2 - y1) * margin
    nx1 = max(0, int(x1 - pad_x))
    ny1 = max(0, int(y1 - pad_y))
    nx2 = min(w, int(x2 + pad_x))
    ny2 = min(h, int(y2 + pad_y))
    return nx1, ny1, nx2, ny2

def detect_plate_yolo(model: YOLO, img_bgr, conf=0.25) -> Optional[Tuple[int,int,int,int,float]]:
    """Run the detector on one image and return its most confident box.

    Returns:
        ``(x1, y1, x2, y2, score)`` with integer corners, or ``None`` when the
        model yields no result or no boxes. On equal scores the first box wins.
    """
    results = model.predict(img_bgr, conf=conf, verbose=False)
    if not results:
        return None

    boxes = results[0].boxes
    if boxes is None or len(boxes) == 0:
        return None

    top_score = None
    top_xyxy = None
    for box in boxes:
        score = float(box.conf[0].cpu().numpy())
        if top_score is None or score > top_score:
            top_score = score
            top_xyxy = box.xyxy[0].cpu().numpy().tolist()

    x1, y1, x2, y2 = top_xyxy
    return (int(x1), int(y1), int(x2), int(y2), top_score)

# ==========================================================
# CLAHE + blur
# ==========================================================
def preprocess_for_ocr(img_bgr, clip=2.0):
    """Return a grayscale, lightly blurred, CLAHE-equalized copy of *img_bgr*.

    The Gaussian blur (kernel ``BLUR_KSIZE``) is applied before CLAHE, which
    uses *clip* as its clip limit and ``CLAHE_TILE`` as its tile grid.
    """
    smoothed = cv2.GaussianBlur(
        cv2.cvtColor(img_bgr, cv2.COLOR_BGR2GRAY), BLUR_KSIZE, 0
    )
    equalizer = cv2.createCLAHE(clipLimit=float(clip), tileGridSize=CLAHE_TILE)
    return equalizer.apply(smoothed)

# ==========================================================
# Real-ESRGAN x2 via CLI (fallback)
# ==========================================================
def run_realesrgan_x2(in_path: str, out_path: str) -> bool:
    """Upscale *in_path* with the external Real-ESRGAN CLI and write *out_path*.

    The command line is built for ``realesrgan-ncnn-vulkan``:
        realesrgan-ncnn-vulkan.exe -i input.png -o output.png -n realesrgan-x2plus -s 2
    Adjust the arguments if your Real-ESRGAN build expects different ones.

    This is a best-effort step: launch or run failures are reported with a
    one-line message and turned into ``False`` instead of being raised.

    Returns:
        True only when the executable exists, the process exits with code 0
        and *out_path* exists afterwards.
    """
    if not os.path.exists(REALESRGAN_EXE):
        return False

    # Remove a leftover file from a previous run so that the existence check
    # at the end can only succeed for output produced by THIS invocation.
    try:
        os.remove(out_path)
    except FileNotFoundError:
        pass
    except OSError as err:
        print(f"[Real-ESRGAN] cannot replace {out_path}: {err}")
        return False

    cmd = [
        REALESRGAN_EXE,
        "-i", in_path,
        "-o", out_path,
        "-n", REALESRGAN_MODEL,
        "-s", REALESRGAN_SCALE,
    ]

    try:
        # stderr is captured (not discarded) so a failure can say why.
        subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
    except subprocess.CalledProcessError as err:
        detail = (err.stderr or b"").decode("utf-8", errors="replace").strip()
        last_line = detail.splitlines()[-1] if detail else ""
        print(f"[Real-ESRGAN] exit code {err.returncode} for {in_path}: {last_line}")
        return False
    except (OSError, ValueError) as err:
        print(f"[Real-ESRGAN] could not launch {REALESRGAN_EXE}: {err}")
        return False

    return os.path.exists(out_path)

# ==========================================================
# OCR + voting (original vs SR)
# ==========================================================
# Two-letter prefixes of Indian registration plates (states and union
# territories, including superseded codes that are still seen on vehicles).
# Defined here only when nothing else (e.g. an earlier notebook cell) has
# provided INDIA_STATES, so score_candidate() cannot fail with NameError.
if "INDIA_STATES" not in globals():
    INDIA_STATES = frozenset({
        "AN", "AP", "AR", "AS", "BR", "CG", "CH", "DD", "DL", "DN",
        "GA", "GJ", "HP", "HR", "JH", "JK", "KA", "KL", "LA", "LD",
        "MH", "ML", "MN", "MP", "MZ", "NL", "OD", "OR", "PB", "PY",
        "RJ", "SK", "TG", "TN", "TR", "TS", "UA", "UK", "UP", "WB",
    })


def score_candidate(plate: str, conf: float) -> float:
    """Score an OCR candidate for voting; higher is better.

    Scoring, applied to the normalized plate:
      * +100 when it matches the India plate format (``IND_LOOSE``)
      * +50  when its two-letter prefix is in ``INDIA_STATES``
      * +10  when the trailing number has 4 digits
      * +5   when that 4-digit number starts with "0"
      * +30 * conf
      * +min(len(plate), 10)

    Args:
        plate: raw or normalized plate text.
        conf: OCR confidence, converted with ``float``.
    """
    p = norm_plate(plate)
    sc = 0.0
    if IND_LOOSE.match(p):
        sc += 100.0
        m = IND_SPLIT.match(p)
        if m:
            st, _, _, nu = m.groups()
            # Bonus for a recognized state / union-territory prefix.
            if st in INDIA_STATES:
                sc += 50.0
            if len(nu) == 4:
                sc += 10.0
                if nu[0] == "0":
                    sc += 5.0
    sc += 30.0 * float(conf)
    sc += min(len(p), 10)
    return sc

def ocr_one(ocr: PaddleOCR, img_gray_or_bgr) -> Tuple[str, float]:
    """Run OCR (angle classifier off) on one image; return ``(plate, confidence)``."""
    raw = ocr.ocr(img_gray_or_bgr, cls=False)
    return join_sorted_by_x_with_conf(raw)

def ocr_with_preprocess(ocr: PaddleOCR, plate_bgr, clip=2.0) -> Tuple[str, float]:
    """Preprocess a BGR plate crop with ``preprocess_for_ocr``, then OCR it.

    Returns ``(plate, confidence)`` as produced by ``ocr_one``.
    """
    return ocr_one(ocr, preprocess_for_ocr(plate_bgr, clip=clip))

def decide_need_sr(valid: bool, conf: float) -> bool:
    """Tell whether the super-resolution fallback should run.

    True when the plate is invalid and ``REQUIRE_VALID`` is set, or when
    *conf* is below ``MIN_OCR_CONF``.
    """
    invalid_and_required = REQUIRE_VALID and not valid
    return bool(invalid_and_required or conf < MIN_OCR_CONF)

# ==========================================================
# MAIN
# ==========================================================
def _list_images(folder):
    """Return the sorted paths of the supported image files in *folder*."""
    patterns = ("*.png", "*.jpg", "*.jpeg", "*.bmp", "*.webp")
    found = []
    for pattern in patterns:
        found.extend(glob(os.path.join(folder, pattern)))
    return sorted(found)


def _process_image(yolo, ocr, path):
    """Run the whole pipeline on one image and return its CSV row.

    Returns ``None`` when the image cannot be read. When no plate is detected,
    or the expanded box yields an empty crop, a row with empty OCR fields is
    returned instead of letting OpenCV raise on the empty array.
    """
    fname = os.path.basename(path)
    img = cv2.imread(path)
    if img is None:
        print("Skip (unreadable image):", path)
        return None

    hs = haze_score_simple(img)

    # DCP only if the image looks hazy (low edge contrast).
    used_dcp = False
    if ENABLE_DCP and hs < HAZE_THRESHOLD:
        img = dehaze_dcp(img)
        used_dcp = True

    det = detect_plate_yolo(yolo, img, conf=0.25)
    plate_bgr = None
    yolo_conf = ""
    if det is not None:
        x1, y1, x2, y2, yc = det
        yolo_conf = round(yc, 4)
        h, w = img.shape[:2]
        x1, y1, x2, y2 = expand_box(x1, y1, x2, y2, CROP_MARGIN, w, h)
        plate_bgr = img[y1:y2, x1:x2]

    # No detection, or a degenerate box that slices to nothing: there is
    # nothing to OCR, and cv2.imwrite / cvtColor would raise on an empty crop.
    if plate_bgr is None or plate_bgr.size == 0:
        return [fname, round(hs, 3), used_dcp, yolo_conf, "",
                "", 0, False, False, "", "", 0, False, "", 0, ""]

    crop_path = os.path.join(CROP_DIR, fname)
    cv2.imwrite(crop_path, plate_bgr)

    # OCR on the original crop (blur + CLAHE).
    orig_plate, orig_conf = ocr_with_preprocess(ocr, plate_bgr, clip=CLAHE_CLIP)
    orig_plate = norm_plate(orig_plate)
    orig_valid = is_valid_india_plate(orig_plate)

    # Super-resolution fallback.
    sr_used = False
    sr_path = ""
    sr_plate = ""
    sr_conf = 0.0
    sr_valid = False

    if USE_REALESRGAN_FALLBACK and decide_need_sr(orig_valid, orig_conf):
        target = os.path.join(SR_DIR, fname)
        if run_realesrgan_x2(crop_path, target):
            sr_used = True
            # Record the path only when the SR image was actually produced.
            sr_path = target
            sr_img = cv2.imread(sr_path)
            if sr_img is not None:
                sr_plate, sr_conf = ocr_with_preprocess(ocr, sr_img, clip=CLAHE_CLIP)
                sr_plate = norm_plate(sr_plate)
                sr_valid = is_valid_india_plate(sr_plate)

    # Voting between the original and the SR reading; on a tie the original
    # wins because it is listed first.
    cand = [("orig", orig_plate, orig_conf, score_candidate(orig_plate, orig_conf))]
    if sr_used:
        cand.append(("sr_x2", sr_plate, sr_conf, score_candidate(sr_plate, sr_conf)))
    best_source, best_plate, best_conf, _ = max(cand, key=lambda c: c[3])

    return [
        fname,
        round(hs, 3), used_dcp,
        yolo_conf, crop_path,
        orig_plate, round(orig_conf, 4), orig_valid,
        sr_used, sr_path,
        sr_plate, round(sr_conf, 4), sr_valid,
        best_plate, round(best_conf, 4), best_source,
    ]


def main():
    """Process every image in ``INPUT_DIR`` and write the results to ``OUT_CSV``."""
    # init models
    yolo = YOLO(YOLO_WEIGHTS)
    # NOTE(review): the result parser expects [box, (text, score)] entries,
    # i.e. detection + recognition output — confirm `det=False` is intended.
    ocr = PaddleOCR(lang="en", det=False, use_angle_cls=False, use_gpu=False, show_log=False)

    files = _list_images(INPUT_DIR)
    print("Total images:", len(files))

    with open(OUT_CSV, "w", newline="", encoding="utf-8-sig") as f:
        writer = csv.writer(f)
        writer.writerow([
            "file",
            "haze_score","used_dcp",
            "yolo_conf","crop_path",
            "orig_plate","orig_conf","orig_valid",
            "sr_used","sr_path",
            "sr_plate","sr_conf","sr_valid",
            "best_plate","best_conf","best_source"
        ])

        for idx, path in enumerate(files, 1):
            row = _process_image(yolo, ocr, path)
            if row is not None:
                writer.writerow(row)

            # Reported for every 25th file, including skipped ones.
            if idx % 25 == 0:
                print(f"Processed {idx}/{len(files)}")

    print("Done:", OUT_CSV)

if __name__ == "__main__":
    main()


OSError: [WinError 127] The specified procedure could not be found. Error loading "c:\Users\asus\anaconda3\envs\ocr-plate\lib\site-packages\torch\lib\shm.dll" or one of its dependencies.