# D&D Session IC/OOC Classification Worker

This notebook runs on Google Colab to provide GPU-accelerated classification for the VideoChunking pipeline.

## Setup Instructions

1. **Open in Colab**: `File -> Open in Colab` or upload to Google Drive
2. **Enable GPU**: `Runtime -> Change runtime type -> Hardware accelerator -> GPU (T4)`
3. **Locate Google Drive**: Run Cell 1
4. **Install Dependencies**: Run Cell 2
5. **Load Model**: Run Cell 3 (this may take a few minutes)
6. **Define Functions**: Run Cell 4 (classification helpers)
7. **Start Worker**: Run Cell 5 - this will continuously process jobs

## How It Works

- Your local pipeline uploads classification jobs to `VideoChunking/classification_pending/` in Google Drive
- This notebook watches that folder and classifies each job with the LLM loaded in Cell 3
- Results are written to `VideoChunking/classification_complete/`
- Your local pipeline polls for results and continues

Keep this notebook running while processing sessions!


In [None]:
# Cell 1: Locate Google Drive (Works locally AND in Colab)
import os
import sys

# Decide whether this is a genuine Colab VM. google.colab can also be importable
# through the VS Code Colab integration (e.g. on Windows), so a successful import
# is not enough on its own: real Colab runs on Linux and has a /content directory.
IN_COLAB = False

try:
    from google.colab import drive
except ImportError:
    drive = None
else:
    if sys.platform.startswith("linux") and os.path.exists("/content"):
        IN_COLAB = True
        print("[INFO] Detected real Google Colab environment")
    else:
        # Importable but not real Colab: behave exactly like a local run.
        drive = None
        print(f"[INFO] VS Code Colab integration detected (platform: {sys.platform})")

drive_root = None

if IN_COLAB:
    # Real Colab: mount the user's Drive into the VM filesystem.
    print("[INFO] Mounting Google Drive in Colab...")
    drive.mount("/content/drive")
    drive_root = "/content/drive/MyDrive"
else:
    # Local run: look for the folder synced by Google Drive Desktop.
    print("[INFO] Running locally - looking for Google Drive Desktop sync folder")

    home = os.path.expanduser("~")
    # Plain strings (not pathlib objects) so later code can rely on str paths.
    drive_root_candidates = [
        r"G:\My Drive",
        os.path.join(home, "My Drive"),
        os.path.join(home, "Google Drive"),
    ]

    # First candidate that exists wins.
    drive_root = next(
        (candidate for candidate in drive_root_candidates if os.path.exists(candidate)),
        None,
    )

    if drive_root is None:
        raise RuntimeError(
            "Could not find Google Drive sync folder. "
            "Make sure Google Drive Desktop is installed and syncing. "
            f"Checked: {drive_root_candidates}"
        )
    print(f"[OK] Found Google Drive at: {drive_root}")

# Job exchange folders shared with the local pipeline (kept as plain str paths).
videochunking_root = os.path.join(drive_root, "VideoChunking")
pending_dir = os.path.join(videochunking_root, "classification_pending")
complete_dir = os.path.join(videochunking_root, "classification_complete")

for folder in (pending_dir, complete_dir):
    os.makedirs(folder, exist_ok=True)

print(f"[OK] Pending jobs: {pending_dir}")
print(f"[OK] Completed jobs: {complete_dir}")

# Sanity check: both paths must be str, not PosixPath/WindowsPath.
for path_name, path_value in (("pending_dir", pending_dir), ("complete_dir", complete_dir)):
    assert isinstance(path_value, str), f"{path_name} should be str, got {type(path_value)}"
print("[OK] Path types verified: str")

In [None]:
# Cell 2: Install Dependencies
# Installs the Hugging Face stack that Cell 3 uses: transformers + torch for the
# model, and accelerate, which transformers needs for device_map="auto".
# NOTE(review): bitsandbytes is installed, but Cell 3 loads the model without
# quantization, so it appears unused here. Confirm before removing it.
# `!pip` is IPython/Colab shell syntax; this cell only runs inside a notebook kernel.
!pip install -q transformers torch accelerate bitsandbytes

print("[OK] Dependencies installed")

In [None]:
# Cell 3: Load LLM Model (Local version - no quantization needed)
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

# Checkpoint to run. The 1.5B instruct model is small enough to load locally on
# CPU or GPU without quantization; Qwen/Qwen2.5-3B-Instruct is a larger option.
MODEL_NAME = "Qwen/Qwen2.5-1.5B-Instruct"

print(f"Loading model: {MODEL_NAME}")
print("This may take 2-5 minutes on first run...")

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, trust_remote_code=True)
# Use the end-of-sequence token for padding as well; the generation code in
# Cell 4 passes eos_token_id as pad_token_id too.
tokenizer.pad_token = tokenizer.eos_token

# Pick the device once: CUDA when available, otherwise CPU.
use_cuda = torch.cuda.is_available()
device = "cuda" if use_cuda else "cpu"
print(f"Using device: {device}")

# Half precision + automatic placement on GPU; full precision on CPU.
load_options = {
    "torch_dtype": torch.float16 if use_cuda else torch.float32,
    "device_map": "auto" if use_cuda else None,
    "trust_remote_code": True,
    "low_cpu_mem_usage": True,
}
model = AutoModelForCausalLM.from_pretrained(MODEL_NAME, **load_options)

if not use_cuda:
    model = model.to(device)

# Report where the model ended up (and GPU memory use when on CUDA).
if use_cuda:
    print("[OK] Model loaded on GPU")
    print(f"[INFO] GPU memory: {torch.cuda.memory_allocated() / 1024**3:.2f} GB")
else:
    print("[OK] Model loaded on CPU")
    print("[WARN] Running on CPU will be slower but will work fine for classification")

In [None]:
# Cell 4: Classification Functions
import json
import re
from pathlib import Path
from typing import List, Dict

def classify_segment(
    prompt: str,
    max_length: int = 512,
    *,
    max_new_tokens: int = 150,
    temperature: float = 0.7,
    top_p: float = 0.9,
    use_chat_template: bool = True,
) -> str:
    """
    Classify a single segment using the loaded model.

    Uses the module-level ``tokenizer`` and ``model`` created in Cell 3.

    Args:
        prompt: Classification prompt (e.g. the output of ``build_prompt``).
        max_length: Maximum number of *input* tokens fed to the model. Longer
            inputs are truncated from the LEFT, so the end of the prompt (and the
            assistant-turn marker added by the chat template) is always kept.
            ``None`` or ``0`` disables truncation.
        max_new_tokens: Maximum number of tokens to generate.
        temperature: Sampling temperature. A value <= 0 (or ``None``) switches to
            greedy decoding, which makes classifications reproducible.
        top_p: Nucleus-sampling threshold; only used when sampling.
        use_chat_template: Wrap the prompt as a single user message in the
            tokenizer's chat template, when the tokenizer defines one.

    Returns:
        Generated text (prompt tokens excluded), stripped of surrounding whitespace.
    """
    text = prompt
    if use_chat_template and getattr(tokenizer, "chat_template", None):
        # Instruct checkpoints expect chat-formatted input. With a raw prompt they
        # tend to continue the text rather than answer it.
        text = tokenizer.apply_chat_template(
            [{"role": "user", "content": prompt}],
            tokenize=False,
            add_generation_prompt=True,
        )

    encoded = tokenizer(text, return_tensors="pt")
    if max_length and encoded["input_ids"].shape[1] > max_length:
        # Keep the tail. Default (right-side) truncation would drop the end of the
        # prompt, including the generation prompt the model must answer after.
        encoded = {key: value[:, -max_length:] for key, value in encoded.items()}
    inputs = {key: value.to(model.device) for key, value in encoded.items()}
    prompt_length = inputs["input_ids"].shape[1]

    generation_kwargs = {
        "max_new_tokens": max_new_tokens,
        "pad_token_id": tokenizer.eos_token_id,
    }
    if temperature is not None and temperature > 0:
        generation_kwargs.update(do_sample=True, temperature=temperature, top_p=top_p)
    else:
        generation_kwargs["do_sample"] = False

    with torch.no_grad():
        outputs = model.generate(**inputs, **generation_kwargs)

    # Decode only the newly generated tokens, not the echoed prompt.
    response = tokenizer.decode(outputs[0][prompt_length:], skip_special_tokens=True)
    return response.strip()


def build_prompt(segment_data: Dict, job_data: Dict) -> str:
    """
    Render the job's prompt template for one segment.

    The template is filled with the comma-joined character and player names
    ("Unknown" when the corresponding list is empty) and with the text of the
    previous, current and next segments. A neighbour that does not exist at the
    start or end of the segment list becomes "".

    Args:
        segment_data: Segment being classified; needs 'text' and its position
            in the job's segment list under 'index'.
        job_data: Job dict with 'segments', 'character_names', 'player_names'
            and 'prompt_template'.

    Returns:
        The formatted prompt string.
    """
    neighbours = job_data['segments']
    pos = segment_data['index']

    before = "" if pos <= 0 else neighbours[pos - 1]['text']
    here = segment_data['text']
    after = "" if pos >= len(neighbours) - 1 else neighbours[pos + 1]['text']

    def joined(key: str) -> str:
        # Comma-join a name list, falling back to "Unknown" for an empty one.
        names = job_data[key]
        return ", ".join(names) if names else "Unknown"

    characters = joined('character_names')
    players = joined('player_names')

    return job_data['prompt_template'].format(
        char_list=characters,
        player_list=players,
        prev_text=before,
        current_text=here,
        next_text=after,
    )


# The prompt asks for Dutch labels (Classificatie / Reden / Vertrouwen / Personage).
# Instruct models often decorate these with Markdown bold ("**Reden:**",
# "Classificatie: **OOC**"), so each label pattern tolerates asterisks around the
# label and after the colon.
_VALID_CLASSIFICATIONS = frozenset({"IC", "OOC", "MIXED"})
# Answers meaning "no character" (English and Dutch spellings, compared upper-cased).
_NO_CHARACTER_VALUES = frozenset({"", "-", "N/A", "NA", "NONE", "GEEN", "N.V.T.", "NVT"})


def _label_pattern(label: str, space: str = r"\s") -> str:
    """Return a regex fragment matching ``label:`` with optional bold markers and padding.

    ``space`` is the padding character class; pass a horizontal-whitespace class
    to stop the match from running onto the next line.
    """
    return rf"{label}{space}*\**{space}*:{space}*\**{space}*"


_CLASSIFICATION_RE = re.compile(
    _label_pattern("Classificatie") + r"([A-Za-z]+)", re.IGNORECASE
)
_REASON_RE = re.compile(
    _label_pattern("Reden")
    + r"(.*?)(?=\s*\**\s*(?:Vertrouwen|Personage)\s*\**\s*:|\Z)",
    re.DOTALL | re.IGNORECASE,
)
# Accepts "0.85", "0,85" (Dutch decimal comma), ".85" and "85%".
_CONFIDENCE_RE = re.compile(
    _label_pattern("Vertrouwen") + r"(\d*[.,]?\d+)\s*(%)?", re.IGNORECASE
)
# Single-line match, so an empty "Personage:" does not swallow the next line.
_CHARACTER_RE = re.compile(
    _label_pattern("Personage", r"[ \t]") + r"([^\n]*)", re.IGNORECASE
)


def parse_classification_response(response: str, index: int) -> Dict:
    """
    Parse model response into classification result.

    Expected format (labels may be Markdown-bold, in any case):
    Classificatie: IC|OOC|MIXED      (Classification)
    Reden: <reasoning>               (Reason)
    Vertrouwen: <0.0-1.0>            (Confidence; "0,85" and "85%" also accepted)
    Personage: <name or N/A>         (Character)

    Any field that cannot be parsed keeps its default:
    classification "IC", confidence 0.7, reasoning "Could not parse response",
    character None. A classification outside IC/OOC/MIXED is treated as unparsed.

    Args:
        response: Model response text
        index: Segment index

    Returns:
        Dict with keys segment_index, classification, confidence (clamped to
        [0, 1]), reasoning and character (None when absent or "N/A"-like).
    """
    classification = "IC"
    confidence = 0.7
    reasoning = "Could not parse response"
    character = None

    match = _CLASSIFICATION_RE.search(response)
    if match and match.group(1).upper() in _VALID_CLASSIFICATIONS:
        classification = match.group(1).upper()

    match = _REASON_RE.search(response)
    if match and match.group(1).strip():
        reasoning = match.group(1).strip()

    match = _CONFIDENCE_RE.search(response)
    if match:
        value = float(match.group(1).replace(",", "."))
        if match.group(2):  # percentage, e.g. "85%"
            value /= 100.0
        confidence = max(0.0, min(1.0, value))  # Clamp to [0, 1]

    match = _CHARACTER_RE.search(response)
    if match:
        name = match.group(1).strip().strip("*").strip()
        if name.upper() not in _NO_CHARACTER_VALUES:
            character = name

    return {
        "segment_index": index,
        "classification": classification,
        "confidence": confidence,
        "reasoning": reasoning,
        "character": character
    }


def process_job(job_file: Path) -> None:
    """
    Process a single classification job.

    Reads the job JSON and classifies every segment with ``classify_segment``,
    parsing each answer with ``parse_classification_response``. It then writes
    ``{job_id}_result.json`` into ``complete_dir``. The job file itself is not
    removed here.

    The result is first written to a hidden temporary file in the same directory
    and then renamed over the final name. A pipeline polling ``complete_dir``
    therefore never sees a half-written result, on filesystems where rename is
    atomic.

    Args:
        job_file: Path (or path string) to the job JSON file

    Raises:
        json.JSONDecodeError: if the job file is not valid JSON.
        KeyError: if the job lacks ``job_id``/``segments`` or fields used by
            ``build_prompt``.
    """
    job_file = Path(job_file)

    print(f"\n{'='*60}")
    print(f"Processing: {job_file.name}")

    with open(job_file, 'r', encoding='utf-8') as f:
        job_data = json.load(f)

    job_id = job_data['job_id']
    segments = job_data['segments']

    print(f"Job ID: {job_id}")
    print(f"Segments to classify: {len(segments)}")

    classifications = []
    for i, segment in enumerate(segments):
        # build_prompt needs the segment's position to look up its neighbours.
        segment_with_index = {**segment, 'index': i}
        prompt = build_prompt(segment_with_index, job_data)

        response = classify_segment(prompt)
        classifications.append(parse_classification_response(response, i))

        if (i + 1) % 10 == 0 or (i + 1) == len(segments):
            print(f"  Progress: {i+1}/{len(segments)} segments classified")

    result_file = Path(complete_dir) / f"{job_id}_result.json"
    result_data = {
        "job_id": job_id,
        "classifications": classifications
    }

    # Write-then-rename so readers only ever see a complete file. The temp name
    # does not end in "_result.json", so it won't be mistaken for a result.
    tmp_file = result_file.with_name(f".{result_file.name}.tmp")
    try:
        with open(tmp_file, 'w', encoding='utf-8') as f:
            json.dump(result_data, f, indent=2, ensure_ascii=False)
        tmp_file.replace(result_file)
    except BaseException:
        tmp_file.unlink(missing_ok=True)
        raise

    print(f"[OK] Results written: {result_file.name}")
    print(f"{'='*60}\n")


print("[OK] Classification functions ready")

In [None]:
# Cell 5: Start Classification Worker (Fixed for Google Drive)
import time
import os
from datetime import datetime
from pathlib import Path
import signal
import sys

print("[START] Classification worker")
print("[INFO] Watching:", pending_dir)
print("[INFO] Results to:", complete_dir)

# Explicit debug output. It shows exactly which folder the worker will poll and
# what os.listdir() sees there, so a wrong path or an unsynced Drive folder is
# obvious before the loop starts.
print("\n[DEBUG] Testing file detection:")
for debug_label, debug_value in (
    ("pending_dir type", type(pending_dir)),
    ("pending_dir value", pending_dir),
    ("str(pending_dir)", str(pending_dir)),
):
    print(f"  {debug_label}: {debug_value}")

try:
    pending_listing = os.listdir(str(pending_dir))
    print(f"  os.listdir() returned: {pending_listing}")
    pending_job_names = [
        entry for entry in pending_listing
        if entry.startswith('job_') and entry.endswith('.json')
    ]
    print(f"  Filtered job files: {pending_job_names}")
except Exception as listing_error:
    print(f"  os.listdir() ERROR: {listing_error}")

print("\nPress Ctrl+C (or interrupt kernel) to stop\n")
print("=" * 60)

# Start every run with fresh state, so job files already waiting in the pending
# folder are picked up again.
processed_jobs = set()
stop_requested = False

def find_job_files(directory):
    """
    List pending job files (entries named ``job_*.json``) in *directory*.

    os.listdir is used instead of globbing because it has proved more reliable
    on Google Drive.

    Returns:
        List of ``Path`` objects in os.listdir order. If listing fails, the error
        and traceback are printed and an empty list is returned.
    """
    try:
        entries = os.listdir(str(directory))
        base = Path(directory)
        return [
            base / entry
            for entry in entries
            if entry.startswith('job_') and entry.endswith('.json')
        ]
    except Exception as err:
        import traceback

        print(f"[ERROR] Could not list directory {directory}: {err}")
        traceback.print_exc()
        return []

import traceback

# Jobs whose last attempt failed, keyed by file name, mapped to the (size, mtime)
# the file had when that attempt started. A failed job is retried only once its
# file changes (e.g. Drive finished syncing a partial upload, or the pipeline
# re-uploaded it). Without this, the same failing job would re-run every ~5 s.
failed_jobs = {}

# Seconds between "No pending jobs" messages while idle.
WAIT_MESSAGE_INTERVAL = 30


def _job_signature(path):
    """Return ``(size, mtime)`` of *path*, or ``None`` if it cannot be stat'ed."""
    try:
        stat_result = path.stat()
    except OSError:
        return None
    return (stat_result.st_size, stat_result.st_mtime)


def _is_ready(job_file):
    """Return True if *job_file* should be (re)processed on this pass."""
    if job_file.name in processed_jobs:
        return False
    if job_file.name in failed_jobs:
        return _job_signature(job_file) != failed_jobs[job_file.name]
    return True


initial_files = find_job_files(pending_dir)
if initial_files:
    print(f"[INFO] Found {len(initial_files)} existing job(s) to process")
    for f in initial_files:
        signature = _job_signature(f)
        size_text = f"{signature[0] / 1024 / 1024:.1f} MB" if signature else "size unknown"
        print(f"  - {f.name} ({size_text})")
else:
    print("[INFO] No existing jobs found, will wait for new ones")

print()


def signal_handler(sig, frame):
    """SIGINT handler: ask the loop to stop after the job currently in progress."""
    global stop_requested
    stop_requested = True
    print("\n[STOP] Stop requested, finishing current job...")


# Install the handler only while the loop runs. signal.signal() returns the
# previous handler (in a notebook kernel, the one raising KeyboardInterrupt),
# which is restored in `finally` so later cells can still be interrupted.
previous_sigint_handler = signal.signal(signal.SIGINT, signal_handler)
last_wait_message = None

try:
    while not stop_requested:
        new_jobs = [f for f in find_job_files(pending_dir) if _is_ready(f)]

        if new_jobs:
            print(f"[{datetime.now():%H:%M:%S}] Processing {len(new_jobs)} job(s)")
            for job_file in new_jobs:
                if stop_requested:
                    print("[INFO] Skipping remaining jobs due to stop request")
                    break
                # Snapshot before processing: if the file changes mid-run (still
                # syncing), the new signature differs and the job is retried.
                signature = _job_signature(job_file)
                try:
                    process_job(job_file)
                except Exception as exc:
                    failed_jobs[job_file.name] = signature
                    print(f"[ERROR] Failed on {job_file.name}: {exc}")
                    traceback.print_exc()
                    print(f"[INFO] {job_file.name} will be retried only after the file changes")
                    continue
                processed_jobs.add(job_file.name)
                failed_jobs.pop(job_file.name, None)
                try:
                    job_file.unlink()
                except OSError as exc:
                    print(f"[WARN] Result written but could not delete {job_file.name}: {exc}")
            # Announce the next idle period immediately.
            last_wait_message = None
        else:
            now = time.monotonic()
            if last_wait_message is None or now - last_wait_message >= WAIT_MESSAGE_INTERVAL:
                print(f"[{datetime.now():%H:%M:%S}] No pending jobs, waiting...")
                last_wait_message = now

        # Sleep in short slices so a stop request is noticed quickly.
        for _ in range(10):
            if stop_requested:
                break
            time.sleep(0.5)

except Exception as e:
    print(f"\n[ERROR] Worker crashed: {e}")
    traceback.print_exc()
finally:
    # signal.signal() returns None when the previous handler was not installed
    # from Python; fall back to the standard KeyboardInterrupt handler then.
    signal.signal(
        signal.SIGINT,
        previous_sigint_handler if previous_sigint_handler is not None else signal.default_int_handler,
    )
    print("\n[STOP] Worker stopped")
    print(f"[INFO] Processed {len(processed_jobs)} jobs total")
    if failed_jobs:
        print(f"[WARN] {len(failed_jobs)} job(s) failed: {', '.join(sorted(failed_jobs))}")