In [1]:
!pip uninstall opencv-python-headless -y
!pip install opencv-python


Found existing installation: opencv-python-headless 4.12.0.88
Uninstalling opencv-python-headless-4.12.0.88:
  Successfully uninstalled opencv-python-headless-4.12.0.88


In [2]:
from ultralytics import YOLO

# Load a COCO-pretrained YOLOv8n model
model = YOLO("yolov8n.pt")
# Train the model on the COCO8 example dataset for 100 epochs
results = model.train(data="dataset/data.yaml", epochs=100, imgsz=640)


[KDownloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8n.pt to 'yolov8n.pt': 100% ━━━━━━━━━━━━ 6.2/6.2MB 1.9MB/s 3.4s0.0s8s
New https://pypi.org/project/ultralytics/8.3.197 available  Update with 'pip install -U ultralytics'
Ultralytics 8.3.191  Python-3.10.18 torch-2.8.0+cu126 CUDA:0 (NVIDIA GeForce RTX 4050 Laptop GPU, 6140MiB)
[34m[1mengine\trainer: [0magnostic_nms=False, amp=True, augment=False, auto_augment=randaugment, batch=16, bgr=0.0, box=7.5, cache=False, cfg=None, classes=None, close_mosaic=10, cls=0.5, conf=None, copy_paste=0.0, copy_paste_mode=flip, cos_lr=False, cutmix=0.0, data=dataset/data.yaml, degrees=0.0, deterministic=True, device=None, dfl=1.5, dnn=False, dropout=0.0, dynamic=False, embed=None, epochs=100, erasing=0.4, exist_ok=False, fliplr=0.5, flipud=0.0, format=torchscript, fraction=1.0, freeze=None, half=False, hsv_h=0.015, hsv_s=0.7, hsv_v=0.4, imgsz=640, int8=False, iou=0.7, keras=False, kobj=1.0, line_width=None, lr0=0.01, lr

In [6]:
!pip install pytesseract




In [4]:
import os
import sqlite3
from datetime import datetime
import cv2
import torch
from ultralytics import YOLO
import easyocr
import pytesseract
import numpy as np

# ---------------- CONFIG ----------------
CAM_ID = 0
DATA_DIR = "data"
PLATE_DIR = os.path.join(DATA_DIR, "plates")
DB_FILE = os.path.join(DATA_DIR, "anpr_logs.db")
MODEL_PATH = "best.pt"   # <-- your trained YOLOv8 model
MIN_PLATE_CONF = 0.50
DUPLICATE_SECONDS = 30
FRAME_SKIP = 2
SHOW_PREVIEW = True
ZONE = None

os.makedirs(PLATE_DIR, exist_ok=True)

# ---------- DATABASE ----------
def init_db():
    conn = sqlite3.connect(DB_FILE)
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS logs
                 (id INTEGER PRIMARY KEY,
                  plate TEXT,
                  vehicle_type TEXT,
                  confidence REAL,
                  timestamp TEXT,
                  camera_id TEXT,
                  image_path TEXT)''')
    conn.commit()
    return conn

def insert_log(conn, plate, vehicle_type, conf, ts, cam_id, img_path):
    c = conn.cursor()
    c.execute('INSERT INTO logs (plate, vehicle_type, confidence, timestamp, camera_id, image_path) VALUES (?, ?, ?, ?, ?, ?)',
              (plate, vehicle_type, conf, ts, cam_id, img_path))
    conn.commit()

# ---------- MODELS ----------
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print("Using device:", device)
det_model = YOLO(MODEL_PATH).to(device)

ocr = easyocr.Reader(['en'], gpu=torch.cuda.is_available())
recent_map = {}

# ---------- HELPERS ----------
def prep_plate_crop(img):
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    gray = cv2.resize(gray, (300, 100), interpolation=cv2.INTER_CUBIC)
    gray = cv2.bilateralFilter(gray, 11, 17, 17)
    clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8,8))
    gray = clahe.apply(gray)
    return cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)

def run_ocr(img):
    results = ocr.readtext(img)
    if results:
        texts = [res[1] for res in results if res[2] > 0.6]
        if texts:
            clean_text = ''.join(ch for ch in texts[0] if ch.isalnum())
            return clean_text.upper()

    # fallback to Tesseract
    text = pytesseract.image_to_string(img, config="--psm 7")
    text = ''.join(ch for ch in text if ch.isalnum())
    return text.upper()

def stack_with_zoom(frame, plate_crop, plate_text):
    """Make side-by-side preview (frame + zoomed plate)."""
    zoomed = cv2.resize(plate_crop, (250, 100))
    cv2.putText(zoomed, plate_text, (10, 90),
                cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0,255,0), 2)
    h, w, _ = frame.shape
    canvas = np.zeros((max(h, zoomed.shape[0]), w + zoomed.shape[1], 3), dtype=np.uint8)
    canvas[:h, :w] = frame
    canvas[:zoomed.shape[0], w:w+zoomed.shape[1]] = zoomed
    return canvas

def process_frame(frame, conn, cam_id="VideoFile"):
    global recent_map
    img_roi = frame if ZONE is None else frame[ZONE[1]:ZONE[3], ZONE[0]:ZONE[2]]
    results = det_model.predict(source=img_roi, conf=MIN_PLATE_CONF, iou=0.3, verbose=False)

    display_frame = frame.copy()
    for r in results:
        for box, conf in zip(r.boxes.xyxy.cpu().numpy(), r.boxes.conf.cpu().numpy()):
            x1, y1, x2, y2 = map(int, box)
            plate_crop = frame[y1:y2, x1:x2]
            if plate_crop.size == 0: continue

            proc = prep_plate_crop(plate_crop)
            plate_text = run_ocr(proc)
            if not plate_text or len(plate_text) < 4:
                continue

            now = datetime.now()
            duplicate = False
            if plate_text in recent_map and (now - recent_map[plate_text]).total_seconds() < DUPLICATE_SECONDS:
                duplicate = True
            recent_map[plate_text] = now

            # Save image + log
            ts = now.strftime("%Y%m%d_%H%M%S")
            fname = f"{plate_text}_{ts}.jpg"
            fpath = os.path.join(PLATE_DIR, fname)
            cv2.imwrite(fpath, plate_crop)
            insert_log(conn, plate_text, "unknown", float(conf), now.isoformat(), str(cam_id), fpath)

            # Draw detection
            color = (0,255,0) if not duplicate else (0,255,255)
            cv2.rectangle(display_frame, (x1, y1), (x2, y2), color, 3)
            cv2.putText(display_frame, f"{plate_text} ({conf:.2f})", (x1, y1-10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

            # Combine with zoomed-in plate
            display_frame = stack_with_zoom(display_frame, plate_crop, plate_text)

            print(f"[{'DUP' if duplicate else 'NEW'}] Plate: {plate_text} | Conf: {conf:.2f}")

    return display_frame

# ---------- VIDEO FUNCTIONS ----------
def run_on_video(video_path, save_output=False):
    conn = init_db()
    cap = cv2.VideoCapture(video_path)
    frame_idx = 0
    fps = cap.get(cv2.CAP_PROP_FPS) or 25

    if save_output:
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter("output.mp4", fourcc, fps,
                              (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))*2,  # width doubled (side-by-side)
                               int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))))

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret: break
        frame_idx += 1
        if frame_idx % FRAME_SKIP != 0: continue

        frame = process_frame(frame, conn, cam_id="VideoFile")

        # FPS counter
        cv2.putText(frame, f"FPS: {fps:.1f}", (20, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.0, (255, 0, 0), 2)

        if SHOW_PREVIEW: cv2.imshow("ANPR System", frame)
        if save_output: out.write(frame)
        if cv2.waitKey(1) & 0xFF == ord('q'): break

    cap.release(); conn.close()
    if save_output: out.release()
    cv2.destroyAllWindows()

if __name__ == "__main__":
    run_on_video("C:/Users/VICTUS/Downloads/text_v.mp4", save_output=True)


Using device: cuda
[NEW] Plate: GUNES | Conf: 0.65
[NEW] Plate: BNAI3NRU | Conf: 0.74
[NEW] Plate: OENRU | Conf: 0.74
[NEW] Plate: PANIANRUW | Conf: 0.69
[NEW] Plate: IBNAIANRU | Conf: 0.59
[NEW] Plate: TENAHRU | Conf: 0.64
[NEW] Plate: MJSSU | Conf: 0.53
[NEW] Plate: TIINA | Conf: 0.63
[NEW] Plate: UNERE | Conf: 0.71
[NEW] Plate: DMBSIVSU | Conf: 0.51
[NEW] Plate: EANAIZNRU | Conf: 0.63
[NEW] Plate: LANAIRU | Conf: 0.68
[NEW] Plate: MWSSU | Conf: 0.67
[NEW] Plate: BANADNRU | Conf: 0.67
[NEW] Plate: MNSVSU | Conf: 0.52
[NEW] Plate: BANAINRU | Conf: 0.60
[NEW] Plate: MWSISU | Conf: 0.82
[NEW] Plate: 08NAIDNRU | Conf: 0.64
[NEW] Plate: MWSIYSU | Conf: 0.74
[NEW] Plate: BMWSVSU | Conf: 0.75
[NEW] Plate: MWSVSU | Conf: 0.76
[NEW] Plate: MWSIVSU | Conf: 0.81
[NEW] Plate: TEKE | Conf: 0.78
[NEW] Plate: SMWSIVSU | Conf: 0.71
[DUP] Plate: MWSIVSU | Conf: 0.75
[DUP] Plate: SMWSIVSU | Conf: 0.75
[NEW] Plate: SMWSIWSU | Conf: 0.79
[DUP] Plate: SMWSIVSU | Conf: 0.77
[DUP] Plate: MWSSU | Conf: 0.78

In [43]:
"""
Optimized single-file ANPR pipeline (non-modular)
- YOLOv8 detection
- EasyOCR primary, Tesseract fallback
- Tracking + fast cleanup (bounding boxes removed quickly)
- Saves crops + logs to SQLite, writes output video
"""

import os, sqlite3, time, logging
from collections import deque, defaultdict
from datetime import datetime
import cv2, torch, numpy as np
from ultralytics import YOLO
import easyocr, pytesseract

# ================== CONFIG ==================
MODEL_PATH = "best.pt"
VIDEO_PATH = "C:/Users/VICTUS/Downloads/test2.mp4"  # or 0 for webcam
DATA_DIR = "data"
PLATE_DIR = os.path.join(DATA_DIR, "plates")
DB_FILE = os.path.join(DATA_DIR, "anpr_logs.db")

MIN_PLATE_CONF = 0.55
IMG_SIZE = 640
FRAME_SKIP = 3
DUP_PERIOD = 2
TRACK_IOU_THRESH = 0.35
VOTE_WINDOW = 6
VOTE_REQUIRED = 3
SAVE_CONF_THRESHOLD = 0.5
SHOW_PREVIEW = True
SAVE_PLATE_CROPS = True
LOG_TO_DB = True
USE_HALF = True
UPSCALE_THRESHOLD = 75

# cleanup times (short!)
FINALIZED_LINGER = 0.3      # seconds to keep green box after disappear
UNFINISHED_LINGER = 0.1     # seconds to keep yellow box after disappear

# ============================================
os.makedirs(PLATE_DIR, exist_ok=True)
logging.basicConfig(format="[%(levelname)s] %(asctime)s - %(message)s", level=logging.INFO)

# ---------- DB ----------
def init_db():
    if not LOG_TO_DB:
        return None
    conn = sqlite3.connect(DB_FILE)
    c = conn.cursor()
    c.execute("""CREATE TABLE IF NOT EXISTS logs (
        id INTEGER PRIMARY KEY,
        plate TEXT,
        confidence REAL,
        timestamp TEXT,
        camera_id TEXT,
        image_path TEXT
    )""")
    conn.commit()
    return conn

def insert_log(conn, plate, conf, ts, cam_id, img_path):
    if conn is None:
        return
    try:
        conn.execute('INSERT INTO logs (plate, confidence, timestamp, camera_id, image_path) VALUES (?,?,?,?,?)',
                     (plate, conf, ts, cam_id, img_path))
        conn.commit()
    except Exception as e:
        logging.error("DB ERROR: %s", e)

# ---------- models ----------
USE_CUDA = torch.cuda.is_available()
device = 'cuda' if USE_CUDA else 'cpu'
logging.info(f"Device: {device}")
det_model = YOLO(MODEL_PATH)
ocr_reader = easyocr.Reader(['en'], gpu=USE_CUDA)

# ---------- utilities ----------
COMMON_CONFUSIONS = {"O":"0","0":"O","I":"1","1":"I","B":"8","8":"B","S":"5","5":"S"}
def correct_text(s): return "".join(COMMON_CONFUSIONS.get(c,c) for c in s)
def tidy_text(s): return ''.join(ch for ch in s if ch.isalnum()).upper()

def iou_xyxy(a,b):
    ax1,ay1,ax2,ay2 = a; bx1,by1,bx2,by2 = b
    xi1, yi1 = max(ax1,bx1), max(ay1,by1)
    xi2, yi2 = min(ax2,bx2), min(ay2,by2)
    inter = max(0, xi2-xi1) * max(0, yi2-yi1)
    areaA = max(0,(ax2-ax1)) * max(0,(ay2-ay1))
    areaB = max(0,(bx2-bx1)) * max(0,(by2-by1))
    den = areaA + areaB - inter + 1e-6
    return inter/den

def normalize_bbox(b,w,h):
    x1,y1,x2,y2 = b
    return [max(0,int(x1)), max(0,int(y1)), min(w-1,int(x2)), min(h-1,int(y2))]

def ocr_candidates(img):
    out = []
    try:
        rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    except:
        rgb = img
    try:
        results = ocr_reader.readtext(rgb, detail=1)
        for bbox, text, conf in results:
            t = tidy_text(text)
            if t: out.append((correct_text(t), float(conf)))
    except: pass
    if not out:
        try:
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            _, th = cv2.threshold(gray,0,255,cv2.THRESH_BINARY+cv2.THRESH_OTSU)
            txt = tidy_text(pytesseract.image_to_string(th, config="--psm 7"))
            if txt: out.append((correct_text(txt), 0.6))
        except: pass
    agg = defaultdict(float)
    for t,c in out: agg[t]+=float(c)
    items = [(t,agg[t]) for t in agg]
    items.sort(key=lambda x:x[1], reverse=True)
    return items

# ---------------- tracking ----------------
NEXT_TRACK_ID = 0
tracks = []
last_saved = {}

def create_track(bbox, conf, text, text_conf, crop, now_time):
    global NEXT_TRACK_ID
    t = {
        'id': NEXT_TRACK_ID,
        'bbox': bbox,
        'first_seen': now_time,
        'last_seen': now_time,
        'confs': deque([conf], maxlen=30),
        'texts': deque([(text, text_conf)], maxlen=20),
        'crops': deque([crop], maxlen=20),
        'frames': 1,
        'finalized': False
    }
    NEXT_TRACK_ID += 1
    return t

def match_track(bbox):
    best, best_iou = None, 0.0
    for t in tracks:
        i = iou_xyxy(bbox, t['bbox'])
        if i > best_iou: best_iou, best = i, t
    return best if best_iou >= TRACK_IOU_THRESH else None

def weighted_vote(pairs):
    weight = defaultdict(float)
    for t,c in pairs:
        if not t: continue
        weight[t]+=float(c)
    if not weight: return "",0.0
    return max(weight.items(), key=lambda x:x[1])

# --------------- main frame processing ---------------
def process_frame(frame, conn, cam_id="video"):
    global tracks, last_saved
    fh, fw = frame.shape[:2]
    results = det_model.predict(frame, conf=MIN_PLATE_CONF, imgsz=IMG_SIZE, device=device, verbose=False)
    dets=[]
    for r in results:
        xy=r.boxes.xyxy.cpu().numpy(); confs=r.boxes.conf.cpu().numpy()
        for (x1,y1,x2,y2), conf in zip(xy, confs):
            bb=normalize_bbox([x1,y1,x2,y2],fw,fh)
            dets.append((bb,float(conf)))
    now_time=time.time()
    for bb,conf in dets:
        x1,y1,x2,y2=bb
        if x2-x1<=8 or y2-y1<=8: continue
        crop=frame[y1:y2,x1:x2].copy()
        candidates=ocr_candidates(crop)
        text,tc=(candidates[0] if candidates else ("",0.0))
        matched=match_track(bb)
        if matched is None:
            t=create_track(bb,conf,text,tc,crop,now_time); tracks.append(t)
        else:
            matched['bbox']=[int(0.6*a+0.4*b) for a,b in zip(matched['bbox'],bb)]
            matched['last_seen']=now_time; matched['frames']+=1
            matched['confs'].append(conf); matched['crops'].append(crop)
            if candidates:
                for txt,cc in candidates: matched['texts'].append((txt,cc))
    disp=frame.copy()
    for t in list(tracks):
        age=time.time()-t['last_seen']
        # cleanup: remove quickly when disappeared
        if t['finalized'] and age>FINALIZED_LINGER:
            tracks.remove(t); continue
        if not t['finalized'] and age>UNFINISHED_LINGER:
            tracks.remove(t); continue
        if not t['finalized']:
            best_text,total=weighted_vote(list(t['texts']))
            if best_text and total>=VOTE_REQUIRED:
                ts=datetime.now().isoformat()
                fname=f"{best_text}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg"
                path=os.path.join(PLATE_DIR,fname) if SAVE_PLATE_CROPS else ""
                if SAVE_PLATE_CROPS: cv2.imwrite(path, t['crops'][-1])
                insert_log(conn,best_text,total,ts,cam_id,path)
                last_saved[best_text]=time.time()
                logging.info(f"[SAVED] {best_text} conf={total:.2f}")
                t['finalized']=True
        x1,y1,x2,y2=t['bbox']
        color=(0,255,0) if t['finalized'] else (0,165,255)
        cv2.rectangle(disp,(x1,y1),(x2,y2),color,2)
        top,_=weighted_vote(list(t['texts']))
        cv2.putText(disp, top if top else f"id:{t['id']}", (x1,max(12,y1-6)),
                    cv2.FONT_HERSHEY_SIMPLEX,0.7,color,2)
    return disp

# --------------- runner ----------------
def run_video(path=VIDEO_PATH, save_output=True, out_file="output_anpr.mp4"):
    conn=init_db()
    cap=cv2.VideoCapture(path if path is not None else 0)
    writer=None
    if save_output:
        fourcc=cv2.VideoWriter_fourcc(*'mp4v')
        fps=cap.get(cv2.CAP_PROP_FPS) or 25
        w,h=int(cap.get(3)),int(cap.get(4))
        writer=cv2.VideoWriter(out_file,fourcc,fps,(w,h))
    idx,start=0,time.time()
    while True:
        ret,frame=cap.read()
        if not ret: break
        idx+=1
        if idx%FRAME_SKIP: 
            if save_output and writer: writer.write(frame)
            continue
        out_frame=process_frame(frame,conn,os.path.basename(path))
        fps_est=idx/max(1e-3,(time.time()-start))
        cv2.putText(out_frame,f"FPS:{fps_est:.1f}",(8,30),
                    cv2.FONT_HERSHEY_SIMPLEX,1.0,(0,255,255),2)
        if SHOW_PREVIEW:
            cv2.imshow("ANPR (q to quit)",out_frame)
            if cv2.waitKey(1)&0xFF==ord('q'): break
        if save_output and writer:
            out_write=cv2.resize(out_frame,(w,h))
            writer.write(out_write)
    cap.release()
    if writer: writer.release()
    if conn: conn.close()
    cv2.destroyAllWindows()
    logging.info("Done.")

if __name__=="__main__":
    run_video()


[INFO] 2025-09-10 12:25:55,170 - Device: cuda
[INFO] 2025-09-10 12:26:01,757 - [SAVED] RAV4 conf=3.98
[INFO] 2025-09-10 12:26:05,634 - [SAVED] RAV4 conf=3.80
[INFO] 2025-09-10 12:26:10,249 - [SAVED] T0Y0TA conf=3.79
[INFO] 2025-09-10 12:26:13,942 - [SAVED] RAV4 conf=3.21
[INFO] 2025-09-10 12:26:31,295 - [SAVED] T0Y0TA conf=3.90
[INFO] 2025-09-10 12:26:33,006 - [SAVED] RAV4 conf=3.96
[INFO] 2025-09-10 12:26:33,670 - [SAVED] T0Y0TA conf=3.54
[INFO] 2025-09-10 12:26:38,192 - Done.
