In [1]:
# ── CELL 1: Install Dependencies ──────────────────────────────────────────────

!apt-get update -qq
!apt-get install -y -qq tesseract-ocr tesseract-ocr-eng \
    libgl1-mesa-glx libglib2.0-0 libsm6 libxrender1 libxext6

!pip install -q pytesseract
!pip install -q easyocr
!pip install -q transformers
!pip install -q torch torchvision
!pip install -q opencv-python-headless scikit-image Pillow imageio
!pip install -q fastapi "uvicorn[standard]" python-multipart nest-asyncio
!pip install -q pyngrok

print("✅ All dependencies installed")


W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.9/2.9 MB[0m [31m50.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m180.7/180.7 kB[0m [31m18.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m978.2/978.2 kB[0m [31m63.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m300.6/300.6 kB[0m [31m29.8 MB/s[0m eta [36m0:00:00[0m
[?25h✅ All dependencies installed


In [2]:
# ── CELL 2: Imports & GPU Check ────────────────────────────────────────────────

import cv2
import numpy as np
import time, os, json, threading, warnings
from pathlib import Path
from typing import List, Dict

from PIL import Image
import torch
import nest_asyncio
nest_asyncio.apply()
warnings.filterwarnings('ignore')

gpu_available = torch.cuda.is_available()
DEVICE        = "cuda" if gpu_available else "cpu"

print(f"✅ Imports done | device={DEVICE}")
if gpu_available:
    print(f"   GPU: {torch.cuda.get_device_name(0)}")
else:
    print("   ⚠️  No GPU — running on CPU")


✅ Imports done | device=cuda
   GPU: Tesla T4


In [3]:
# ── CELL 3: Image Preprocessing ────────────────────────────────────────────────

def detect_content_type(frame: np.ndarray) -> str:
    """Classify frame as 'board', 'slide', or 'standard' by brightness."""
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if len(frame.shape) == 3 else frame
    brightness = np.mean(gray)
    if brightness < 80:
        return "board"
    elif brightness > 190:
        return "slide"
    return "standard"


def preprocess_image(image: np.ndarray, mode: str = "standard") -> np.ndarray:
    """Enhance image for better OCR accuracy."""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) if len(image.shape) == 3 else image.copy()

    if mode == "board":
        clahe = cv2.createCLAHE(clipLimit=3.0, tileGridSize=(8, 8))
        gray  = clahe.apply(gray)
        gray  = cv2.GaussianBlur(gray, (3, 3), 0)
        _, processed = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    elif mode == "slide":
        kernel = np.array([[0,-1,0],[-1,5,-1],[0,-1,0]])
        gray   = cv2.filter2D(gray, -1, kernel)
        _, processed = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)

    else:
        gray = cv2.GaussianBlur(gray, (3, 3), 0)
        processed = cv2.adaptiveThreshold(
            gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
            cv2.THRESH_BINARY, 11, 2)

    h, w = processed.shape
    if w < 640:
        scale     = 640 / w
        processed = cv2.resize(processed, None, fx=scale, fy=scale,
                               interpolation=cv2.INTER_CUBIC)
    return processed


print("✅ Preprocessing utilities ready")


✅ Preprocessing utilities ready


In [4]:
# ── CELL 4: Frame Extraction ───────────────────────────────────────────────────

def _phash(frame: np.ndarray) -> str:
    small = cv2.resize(frame, (16, 16))
    gray  = cv2.cvtColor(small, cv2.COLOR_BGR2GRAY) if len(small.shape) == 3 else small
    mean  = gray.mean()
    return ''.join('1' if p > mean else '0' for p in gray.flatten())


def _hamming(a: str, b: str) -> int:
    return sum(x != y for x, y in zip(a, b))


def extract_frames(video_path: str,
                   sample_interval: float = 2.0,
                   similarity_threshold: int = 8) -> List[Dict]:
    """
    Extract visually unique frames from a video.
    Returns list of dicts: {frame, frame_number, timestamp}
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        raise ValueError(f"Cannot open video: {video_path}")

    fps        = cap.get(cv2.CAP_PROP_FPS) or 25
    total      = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    duration   = total / fps
    frame_step = max(1, int(fps * sample_interval))

    print(f"📹 FPS:{fps:.1f} | Frames:{total} | Duration:{duration:.1f}s")
    print(f"   Sampling every {sample_interval}s (~{total//frame_step} candidates)")

    results, seen_hashes, idx = [], [], 0

    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if idx % frame_step == 0:
            ph     = _phash(frame)
            is_dup = any(_hamming(ph, h) < similarity_threshold for h in seen_hashes)
            if not is_dup:
                seen_hashes.append(ph)
                results.append({
                    'frame':        frame,
                    'frame_number': idx,
                    'timestamp':    round(idx / fps, 2)
                })
        idx += 1

    cap.release()
    print(f"   ✅ Unique frames extracted: {len(results)}")
    return results


print("✅ Frame extractor ready")


✅ Frame extractor ready


In [5]:
# ── CELL 5: Tesseract OCR ──────────────────────────────────────────────────────

import pytesseract

def ocr_tesseract(image: np.ndarray) -> Dict:
    mode      = detect_content_type(image)
    processed = preprocess_image(image, mode=mode)
    pil_img   = Image.fromarray(processed)

    data  = pytesseract.image_to_data(
                pil_img, config="--oem 3 --psm 6",
                output_type=pytesseract.Output.DICT)

    words, confs = [], []
    for i, conf in enumerate(data['conf']):
        c = int(conf)
        w = data['text'][i].strip()
        if c > 30 and w:
            words.append(w)
            confs.append(c)

    text = ' '.join(words).strip()
    return {
        'text':       text,
        'confidence': round(np.mean(confs) / 100.0, 3) if confs else 0.0,
        'word_count': len(words)
    }


print("✅ Tesseract ready")


✅ Tesseract ready


In [6]:
# ── CELL 6: EasyOCR ────────────────────────────────────────────────────────────

import easyocr

print("⏳ Loading EasyOCR (~100MB on first run) ...")
_easy_reader = easyocr.Reader(['en'], gpu=gpu_available, verbose=False)
print(f"✅ EasyOCR ready | GPU={gpu_available}")


def ocr_easyocr(image: np.ndarray) -> Dict:
    mode = detect_content_type(image)
    img  = preprocess_image(image, mode="board") if mode == "board" else image

    results      = _easy_reader.readtext(img, detail=1, paragraph=False)
    texts, confs = [], []
    for (_, text, conf) in results:
        if conf > 0.3 and text.strip():
            texts.append(text.strip())
            confs.append(conf)

    return {
        'text':       ' '.join(texts).strip(),
        'confidence': round(float(np.mean(confs)), 3) if confs else 0.0,
        'word_count': len(texts)
    }




⏳ Loading EasyOCR (~100MB on first run) ...




✅ EasyOCR ready | GPU=True


In [7]:
# ── CELL 7: TrOCR (Microsoft Transformer OCR) ─────────────────────────────────

from transformers import TrOCRProcessor, VisionEncoderDecoderModel

print("⏳ Loading TrOCR (~400MB on first run) ...")
_trocr_processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-printed")
_trocr_model     = VisionEncoderDecoderModel.from_pretrained(
                       "microsoft/trocr-base-printed").to(DEVICE)
_trocr_model.eval()
print(f"✅ TrOCR ready | device={DEVICE}")


def ocr_trocr(image: np.ndarray) -> Dict:
    if len(image.shape) == 3:
        pil_img = Image.fromarray(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    else:
        pil_img = Image.fromarray(image).convert("RGB")

    w, h         = pil_img.size
    strip_height = max(60, h // max(1, h // 80))
    all_texts    = []

    with torch.no_grad():
        for y in range(0, h, strip_height):
            strip = pil_img.crop((0, y, w, min(y + strip_height, h)))
            if strip.size[1] < 10:
                continue
            try:
                pixel_values  = _trocr_processor(
                                    images=strip,
                                    return_tensors="pt"
                                ).pixel_values.to(DEVICE)
                generated_ids = _trocr_model.generate(pixel_values, max_new_tokens=64)
                text          = _trocr_processor.batch_decode(
                                    generated_ids, skip_special_tokens=True
                                )[0].strip()
                if text:
                    all_texts.append(text)
            except Exception:
                continue

    combined   = ' '.join(all_texts).strip()
    confidence = min(0.95, 0.6 + len(combined) * 0.002) if combined else 0.0

    return {
        'text':       combined,
        'confidence': round(confidence, 3),
        'word_count': len(combined.split()) if combined else 0
    }


⏳ Loading TrOCR (~400MB on first run) ...


preprocessor_config.json:   0%|          | 0.00/224 [00:00<?, ?B/s]

The image processor of type `ViTImageProcessor` is now loaded as a fast processor by default, even if the model checkpoint was saved with a slow processor. This is a breaking change and may produce slightly different outputs. To continue using the slow processor, instantiate this class with `use_fast=False`. 


config.json: 0.00B [00:00, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/772 [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

model.safetensors:   0%|          | 0.00/1.33G [00:00<?, ?B/s]

Loading weights:   0%|          | 0/478 [00:00<?, ?it/s]

VisionEncoderDecoderModel LOAD REPORT from: microsoft/trocr-base-printed
Key                         | Status  | 
----------------------------+---------+-
encoder.pooler.dense.weight | MISSING | 
encoder.pooler.dense.bias   | MISSING | 

Notes:
- MISSING	:those params were newly initialized because missing from the checkpoint. Consider training on your downstream task.


generation_config.json:   0%|          | 0.00/190 [00:00<?, ?B/s]

✅ TrOCR ready | device=cuda


In [8]:
# ── CELL 8: Full Video Pipeline ────────────────────────────────────────────────

# ▶ Change to whichever model you decide is best after testing
BEST_MODEL = "easyocr"   # options: "tesseract" | "easyocr" | "trocr"

_MODEL_MAP = {
    'tesseract': ocr_tesseract,
    'easyocr':   ocr_easyocr,
    'trocr':     ocr_trocr,
}


def deduplicate_text(texts: List[str]) -> str:
    """Combine text from all frames, removing duplicate lines."""
    seen, lines = set(), []
    for block in texts:
        for line in block.splitlines():
            norm = ' '.join(line.lower().split())
            if norm and norm not in seen:
                seen.add(norm)
                lines.append(line.strip())
    return '\n'.join(lines)


def process_video_full(video_path: str,
                       ocr_model: str = "easyocr",
                       sample_interval: float = 2.0) -> Dict:
    """
    Full end-to-end pipeline.
    Returns API-compliant JSON dict per OCR_REQUIREMENTS.md:
    {
        text                   : str   — all deduplicated text combined
        frames                 : list  — per-frame results with timestamps
        summary                : str   — brief processing summary
        total_frames_processed : int
    }
    """
    print(f"\n🚀 START | model={ocr_model.upper()} | interval={sample_interval}s")
    t0     = time.time()
    ocr_fn = _MODEL_MAP.get(ocr_model.lower(), ocr_easyocr)
    frames = extract_frames(video_path, sample_interval=sample_interval)

    frame_results, all_texts = [], []

    print(f"\n🔤 Running OCR on {len(frames)} frames ...")
    for i, fd in enumerate(frames):
        try:
            res  = ocr_fn(fd['frame'])
            text = res['text']
            conf = res['confidence']
            flag = f"✓ {len(text)} chars | conf={conf:.2f}" if text.strip() else "— empty"
        except Exception as e:
            text, conf, flag = '', 0.0, f"❌ {e}"

        print(f"  [{i+1:>3}/{len(frames)}] t={fd['timestamp']:>6.1f}s  {flag}")

        if text.strip():
            all_texts.append(text)
            frame_results.append({
                'frame_number': fd['frame_number'],
                'timestamp':    fd['timestamp'],
                'text':         text,
                'confidence':   conf,
            })

    combined = deduplicate_text(all_texts)
    elapsed  = round(time.time() - t0, 2)

    print(f"\n✅ Done in {elapsed}s | frames with text: {len(frame_results)}/{len(frames)}")
    print(f"   Total characters: {len(combined)}")

    return {
        "text":                   combined,
        "frames":                 frame_results,
        "summary":                (f"Processed {len(frames)} frames using {ocr_model}. "
                                   f"Text found in {len(frame_results)} frames in {elapsed}s."),
        "total_frames_processed": len(frames),
    }


print("✅ Pipeline ready | BEST_MODEL =", BEST_MODEL)


✅ Pipeline ready | BEST_MODEL = easyocr


In [9]:
# ── CELL 9: FastAPI Server + ngrok ─────────────────────────────────────────────

# ▶▶▶ PASTE YOUR NGROK TOKEN HERE
# Get it free: https://dashboard.ngrok.com/get-started/your-authtoken
NGROK_TOKEN = "39fOQyszqBRHKsKFpJNeutSzP3W_31MjQVCW8xqnyitaQNJmZ"

# ──────────────────────────────────────────────────────────────────────────────

from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import JSONResponse
import uvicorn
from pyngrok import ngrok

ALLOWED_EXT = {'.mp4', '.avi', '.mov', '.mkv', '.webm'}

app = FastAPI(title="Lecture OCR Service", version="1.0.0")


@app.get("/")
async def root():
    return {"status": "running", "active_model": BEST_MODEL}


@app.get("/health")
async def health():
    return {"status": "healthy", "model": BEST_MODEL, "gpu": gpu_available}


@app.post("/process")
async def process_video_endpoint(
    video:  UploadFile = File(...),
    job_id: str        = Form(...),
):
    ext = Path(video.filename).suffix.lower()
    if ext not in ALLOWED_EXT:
        raise HTTPException(
            status_code=400,
            detail=f"Unsupported format '{ext}'. Allowed: {sorted(ALLOWED_EXT)}"
        )

    tmp_path = f"/tmp/ocr_{job_id}{ext}"
    try:
        content = await video.read()
        with open(tmp_path, 'wb') as f:
            f.write(content)
        print(f"📥 job_id={job_id} | {len(content)/1024/1024:.1f} MB → {tmp_path}")

        result = process_video_full(
            video_path      = tmp_path,
            ocr_model       = BEST_MODEL,
            sample_interval = 2.0
        )
        return JSONResponse(content=result, status_code=200)

    except Exception as e:
        import traceback
        traceback.print_exc()
        return JSONResponse(content={"error": str(e)}, status_code=500)

    finally:
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
            print(f"🗑  Cleaned up {tmp_path}")


# ── Launch server ──────────────────────────────────────────────────────────────
def _run_server():
    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="warning")

ngrok.set_auth_token(NGROK_TOKEN)
ngrok.kill()  # close any stale tunnels

server_thread = threading.Thread(target=_run_server, daemon=True)
server_thread.start()
time.sleep(3)  # wait for uvicorn to start

tunnel     = ngrok.connect(8000, proto="http")
PUBLIC_URL = tunnel.public_url

print("=" * 60)
print("🌍  OCR SERVICE IS LIVE")
print(f"   Health   : {PUBLIC_URL}/health")
print(f"   Endpoint : {PUBLIC_URL}/process")
print("=" * 60)
print(f"\n📋  Add to your backend .env:")
print(f"    OCR_SERVICE_URL={PUBLIC_URL}")




Exception in thread Thread-5 (_run_server):
Traceback (most recent call last):
  File "/usr/lib/python3.12/threading.py", line 1075, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.12/threading.py", line 1012, in run
    self._target(*self._args, **self._kwargs)
  File "/tmp/ipython-input-2837272286.py", line 68, in _run_server
  File "/usr/local/lib/python3.12/dist-packages/uvicorn/main.py", line 606, in run
    server.run()
  File "/usr/local/lib/python3.12/dist-packages/uvicorn/server.py", line 75, in run
    return asyncio_run(self.serve(sockets=sockets), loop_factory=self.config.get_loop_factory())
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: _patch_asyncio.<locals>.run() got an unexpected keyword argument 'loop_factory'


🌍  OCR SERVICE IS LIVE
   Health   : https://theosophic-francie-fellowly.ngrok-free.dev/health
   Endpoint : https://theosophic-francie-fellowly.ngrok-free.dev/process

📋  Add to your backend .env:
    OCR_SERVICE_URL=https://theosophic-francie-fellowly.ngrok-free.dev
