# Road Buddy - Traffic Video Question Answering

This notebook runs the inference pipeline for traffic video question answering using the R-4B model.

## Setup Instructions:
1. Make sure a GPU is enabled (Runtime → Change runtime type → GPU)
2. Run all cells in order
3. Upload your test data when prompted
4. Download the results at the end

## 1. Check GPU Availability

In [None]:
import torch
print(f"CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"GPU: {torch.cuda.get_device_name(0)}")
    print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.2f} GB")
else:
    print("⚠️ GPU not available. Please enable GPU: Runtime → Change runtime type → GPU")

## 2. Install Dependencies

In [None]:
# Quote each specifier so the shell does not treat ">" as output redirection
!pip install -q "torch>=2.0.0" "transformers>=4.40.0" "accelerate>=0.20.0" \
    "pillow>=10.0.0" "opencv-python>=4.8.0" "pandas>=2.0.0" "tqdm>=4.65.0" "numpy>=1.24.0"

print("✓ Dependencies installed successfully!")
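Because the install runs quietly and the success message prints unconditionally, it can help to confirm what is actually present. The following is a minimal sketch using the standard library's `importlib.metadata`; the helper name `installed_versions` is hypothetical, and the package list simply mirrors the install command above.

```python
from importlib import metadata


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


packages = ["torch", "transformers", "accelerate", "pillow",
            "opencv-python", "pandas", "tqdm", "numpy"]
for name, version in installed_versions(packages).items():
    print(f"{name}: {version or 'NOT INSTALLED'}")
```

A `NOT INSTALLED` entry does not always mean the import will fail (for example, OpenCV may be present under a different distribution name), so treat the output as a hint rather than a verdict.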

## 3. Upload Project Files

**Option A: Upload from GitHub (if you have a repo)**

In [None]:
# Uncomment and modify if you want to clone from GitHub
# !git clone https://github.com/YOUR_USERNAME/road_buddy.git
# %cd road_buddy

**Option B: Create project structure and upload files manually**

In [None]:
import os
from pathlib import Path

# Create project structure
!mkdir -p src
!mkdir -p data/traffic_buddy_train+public_test/public_test/videos
!mkdir -p output

print("✓ Project structure created!")

## 4. Create Source Files

In [None]:
%%writefile src/__init__.py
"""Road Buddy package."""
__version__ = "0.1.0"

In [None]:
%%writefile src/config.py
"""Configuration settings for the inference pipeline."""

import os
from pathlib import Path

# Project paths
PROJECT_ROOT = Path("/content")
DATA_DIR = PROJECT_ROOT / "data" / "traffic_buddy_train+public_test"
PUBLIC_TEST_DIR = DATA_DIR / "public_test"
PUBLIC_TEST_JSON = PUBLIC_TEST_DIR / "public_test.json"
VIDEOS_DIR = PUBLIC_TEST_DIR / "videos"
OUTPUT_DIR = PROJECT_ROOT / "output"
SUBMISSION_FILE = OUTPUT_DIR / "submission.csv"

# Model configuration
MODEL_NAME = "YannQi/R-4B"
DEVICE = "cuda"  # or "cpu"
TRUST_REMOTE_CODE = True

# Video processing configuration
FRAME_SAMPLE_RATE = 1.0  # Extract 1 frame per second
MAX_FRAMES_PER_VIDEO = 20  # Maximum number of frames to extract
USE_MID_FRAME_ONLY = False  # If True, only extract the middle frame

# Inference configuration
THINKING_MODE = "auto"  # "auto", "explicit", or "non-thinking"
MAX_NEW_TOKENS = 512
TEMPERATURE = 0.1  # Only used when DO_SAMPLE is True; ignored under greedy decoding
DO_SAMPLE = False  # Use greedy decoding for consistent answers

# Batch processing
BATCH_SIZE = 1  # Process one question at a time (model handles one image-question pair)
CACHE_FRAMES = True  # Cache extracted frames when multiple questions use same video

# Prompt template
PROMPT_TEMPLATE = """Dựa vào video này, hãy trả lời câu hỏi sau:

Câu hỏi: {question}

Các lựa chọn:
{choices}

Hãy chọn đáp án đúng (chỉ trả lời A, B, C, hoặc D)."""

# Answer parsing
VALID_ANSWERS = ["A", "B", "C", "D"]
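To see what the model actually receives, the template can be filled in by hand. This is a small sketch that copies the template text from `src/config.py`; the helper `build_prompt` and the sample question are made up for illustration and are not taken from the dataset.

```python
# Same template text as PROMPT_TEMPLATE in src/config.py.
PROMPT_TEMPLATE = """Dựa vào video này, hãy trả lời câu hỏi sau:

Câu hỏi: {question}

Các lựa chọn:
{choices}

Hãy chọn đáp án đúng (chỉ trả lời A, B, C, hoặc D)."""


def build_prompt(question, choices):
    """Join the choices one per line and substitute both placeholders."""
    return PROMPT_TEMPLATE.format(question=question, choices="\n".join(choices))


# Made-up sample question ("May the motorbike turn right?"), for illustration only.
prompt = build_prompt(
    "Xe máy có được rẽ phải không?",
    ["A. Có", "B. Không"],
)
print(prompt)
```

Note that the filled prompt itself contains the letters A to D, which matters when parsing the model's reply: only the generated continuation should be searched for an answer.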

In [None]:
%%writefile src/video_processor.py
"""Video processing utilities for extracting frames."""

import cv2
import numpy as np
from pathlib import Path
from typing import List, Optional
from PIL import Image

from src import config


class VideoProcessor:
    """Handles video frame extraction and preprocessing."""

    def __init__(
        self,
        frame_sample_rate: float = config.FRAME_SAMPLE_RATE,
        max_frames: int = config.MAX_FRAMES_PER_VIDEO,
        use_mid_frame_only: bool = config.USE_MID_FRAME_ONLY,
    ):
        """
        Initialize video processor.

        Args:
            frame_sample_rate: Frames to extract per second
            max_frames: Maximum number of frames to extract
            use_mid_frame_only: If True, only extract the middle frame
        """
        self.frame_sample_rate = frame_sample_rate
        self.max_frames = max_frames
        self.use_mid_frame_only = use_mid_frame_only
        self._frame_cache = {}

    def extract_frames(self, video_path: Path) -> List[Image.Image]:
        """
        Extract frames from a video file.

        Args:
            video_path: Path to the video file

        Returns:
            List of PIL Images extracted from the video
        """
        # Check cache if enabled
        if config.CACHE_FRAMES and str(video_path) in self._frame_cache:
            return self._frame_cache[str(video_path)]

        if not video_path.exists():
            raise FileNotFoundError(f"Video file not found: {video_path}")

        # Open video
        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            raise ValueError(f"Failed to open video: {video_path}")

        try:
            # Get video properties
            fps = cap.get(cv2.CAP_PROP_FPS)
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            duration = total_frames / fps if fps > 0 else 0

            frames = []

            if self.use_mid_frame_only:
                # Extract only the middle frame
                mid_frame_idx = total_frames // 2
                cap.set(cv2.CAP_PROP_POS_FRAMES, mid_frame_idx)
                ret, frame = cap.read()
                if ret:
                    frames.append(self._convert_frame(frame))
            else:
                # Calculate frame sampling interval
                frame_interval = int(fps / self.frame_sample_rate) if self.frame_sample_rate > 0 else 1
                frame_interval = max(1, frame_interval)

                frame_idx = 0
                while len(frames) < self.max_frames:
                    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)
                    ret, frame = cap.read()

                    if not ret:
                        break

                    frames.append(self._convert_frame(frame))
                    frame_idx += frame_interval

                    if frame_idx >= total_frames:
                        break

            # Cache frames if enabled
            if config.CACHE_FRAMES:
                self._frame_cache[str(video_path)] = frames

            return frames

        finally:
            cap.release()

    def _convert_frame(self, frame: np.ndarray) -> Image.Image:
        """
        Convert OpenCV frame (BGR) to PIL Image (RGB).

        Args:
            frame: OpenCV frame in BGR format

        Returns:
            PIL Image in RGB format
        """
        # Convert BGR to RGB
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        return Image.fromarray(frame_rgb)

    def clear_cache(self):
        """Clear the frame cache."""
        self._frame_cache.clear()

    def get_cache_size(self) -> int:
        """Get the number of cached videos."""
        return len(self._frame_cache)
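The sampling loop in `extract_frames` can be reasoned about without opening a video. The sketch below reproduces only the index arithmetic as a pure function (the name `sample_frame_indices` is hypothetical) and assumes every seek and read succeeds.

```python
def sample_frame_indices(fps, total_frames, sample_rate=1.0, max_frames=20):
    """Return the frame indices the sampling loop would seek to."""
    # Same interval rule as extract_frames: at least one frame apart.
    interval = int(fps / sample_rate) if sample_rate > 0 else 1
    interval = max(1, interval)
    indices = list(range(0, total_frames, interval))
    # The loop stops once max_frames frames have been collected.
    return indices[:max_frames]


print(sample_frame_indices(fps=30.0, total_frames=300))   # 10-second clip
print(sample_frame_indices(fps=30.0, total_frames=3000))  # 100-second clip
```

With the default settings the cap truncates rather than spreads the samples: a 100-second clip at 30 fps yields indices 0 through 570, which covers only its first 20 seconds.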

In [None]:
%%writefile src/inference.py
"""Main inference pipeline for traffic video question answering."""

import json
import re
from pathlib import Path
from typing import Dict, List, Optional
import torch
from tqdm import tqdm
import pandas as pd
from transformers import AutoModel, AutoProcessor

from src import config
from src.video_processor import VideoProcessor


class R4BInferencePipeline:
    """Inference pipeline using R-4B model for video question answering."""

    def __init__(
        self,
        model_name: str = config.MODEL_NAME,
        device: str = config.DEVICE,
    ):
        """
        Initialize the inference pipeline.

        Args:
            model_name: HuggingFace model name
            device: Device to run inference on ('cuda' or 'cpu')
        """
        self.device = device
        self.model_name = model_name

        print(f"Loading model: {model_name}")
        print(f"Device: {device}")

        # Load model and processor
        self.model = AutoModel.from_pretrained(
            model_name,
            trust_remote_code=config.TRUST_REMOTE_CODE,
            torch_dtype=torch.float16 if device == "cuda" else torch.float32,
        ).to(device)
        self.model.eval()

        self.processor = AutoProcessor.from_pretrained(
            model_name,
            trust_remote_code=config.TRUST_REMOTE_CODE,
        )

        # Initialize video processor
        self.video_processor = VideoProcessor()

        print("Model and processor loaded successfully!")

    def format_prompt(self, question: str, choices: List[str]) -> str:
        """
        Format the prompt for the model.

        Args:
            question: The question to ask
            choices: List of answer choices

        Returns:
            Formatted prompt string
        """
        choices_str = "\n".join(choices)
        return config.PROMPT_TEMPLATE.format(
            question=question,
            choices=choices_str,
        )

    def parse_answer(self, response: str) -> str:
        """
        Parse the answer from model response.

        Args:
            response: Raw model output

        Returns:
            Parsed answer (A, B, C, or D)
        """
        # Try explicit answer phrases first, then fall back to any standalone letter.
        # The trailing \b keeps "chọn các ..." from matching the "c" in "các".
        patterns = [
            r'(?:đáp án|answer|chọn)[\s:]*([ABCD])\b',  # "đáp án A" or "answer: B"
            r'^\s*([ABCD])[.):\s]',  # "A. " at start
            r'\b([ABCD])\b',  # Single standalone letter
        ]

        for pattern in patterns:
            matches = re.findall(pattern, response, re.IGNORECASE)
            if matches:
                answer = matches[0].upper()
                if answer in config.VALID_ANSWERS:
                    return answer

        # If no clear answer found, try to extract the first valid letter
        for char in response.upper():
            if char in config.VALID_ANSWERS:
                return char

        # Default to A if no answer found
        print(f"Warning: Could not parse answer from response: {response[:100]}")
        return "A"

    def run_inference(self, question_data: Dict) -> str:
        """
        Run inference for a single question.

        Args:
            question_data: Dictionary containing question information

        Returns:
            Predicted answer (A, B, C, or D)
        """
        # Get video path
        video_path = config.DATA_DIR / question_data["video_path"]

        # Extract frames from video
        try:
            frames = self.video_processor.extract_frames(video_path)
        except Exception as e:
            print(f"Error extracting frames from {video_path}: {e}")
            return "A"  # Default answer on error

        if not frames:
            print(f"Warning: No frames extracted from {video_path}")
            return "A"

        # The model call below takes a single image, so use the middle extracted
        # frame (index 0 when only one frame exists)
        frame = frames[len(frames) // 2]

        # Format prompt
        prompt = self.format_prompt(
            question=question_data["question"],
            choices=question_data["choices"],
        )

        # Prepare inputs
        messages = [
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": frame},
                    {"type": "text", "text": prompt},
                ],
            }
        ]

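        # Process inputs. In the R-4B model card example, thinking_mode is an
        # option of the chat template call, not an argument to generate().
        text = self.processor.apply_chat_template(
            messages,
            tokenize=False,
            add_generation_prompt=True,
            thinking_mode=config.THINKING_MODE,
        )
        inputs = self.processor(
            text=[text],
            images=[frame],
            return_tensors="pt",
            padding=True,
        )

        # Move inputs to device
        inputs = {k: v.to(self.device) for k, v in inputs.items()}

        # Generate response
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs,
                max_new_tokens=config.MAX_NEW_TOKENS,
                do_sample=config.DO_SAMPLE,
                temperature=config.TEMPERATURE if config.DO_SAMPLE else None,
            )

        # Keep only the newly generated tokens. For decoder-only models,
        # generate() returns the prompt followed by the continuation, and the
        # prompt already contains "A", "B", "C", "D", which would confuse parsing.
        prompt_length = inputs["input_ids"].shape[1]
        generated = outputs[:, prompt_length:]

        # Decode response
        response = self.processor.batch_decode(
            generated,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=True,
        )[0]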

        # Parse answer
        answer = self.parse_answer(response)

        return answer

    def run_pipeline(self, test_json_path: Path, output_csv_path: Path):
        """
        Run the full inference pipeline on test data.

        Args:
            test_json_path: Path to test JSON file
            output_csv_path: Path to save submission CSV
        """
        # Load test data
        print(f"Loading test data from: {test_json_path}")
        with open(test_json_path, "r", encoding="utf-8") as f:
            test_data = json.load(f)

        questions = test_data["data"]
        print(f"Total questions: {len(questions)}")

        # Run inference
        results = []
        for question_data in tqdm(questions, desc="Processing questions"):
            question_id = question_data["id"]
            answer = self.run_inference(question_data)
            results.append({"id": question_id, "answer": answer})

        # Save results
        df = pd.DataFrame(results)
        output_csv_path.parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(output_csv_path, index=False)
        print(f"\nResults saved to: {output_csv_path}")
        print(f"Total predictions: {len(results)}")

        # Clear cache
        self.video_processor.clear_cache()


def main():
    """Main entry point for the inference pipeline."""
    # Initialize pipeline
    pipeline = R4BInferencePipeline()

    # Run inference on public test data
    pipeline.run_pipeline(
        test_json_path=config.PUBLIC_TEST_JSON,
        output_csv_path=config.SUBMISSION_FILE,
    )


if __name__ == "__main__":
    main()
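Answer parsing is the most fragile step of the pipeline, so it is worth trying in isolation. The following standalone sketch shows one way to pull a choice letter out of a free-form reply by checking the most specific pattern first. The name `extract_choice` and the pattern list are illustrative assumptions, not part of `src/inference.py`.

```python
import re

# Ordered from most to least specific; the first pattern that matches wins.
PATTERNS = [
    r"(?:đáp án|answer|chọn)[\s:]*([ABCD])\b",  # "Đáp án: B", "answer C"
    r"^\s*([ABCD])[.):\s]",                       # "A. ..." at the very start
    r"\b([ABCD])\b",                              # any standalone letter
]


def extract_choice(response, default=None):
    """Return the first choice letter found in the reply, or default if none."""
    for pattern in PATTERNS:
        match = re.search(pattern, response, re.IGNORECASE)
        if match:
            return match.group(1).upper()
    return default


for reply in ["Đáp án: B", "C. Xe máy", "Tôi nghĩ là D", "Không rõ"]:
    print(repr(reply), "->", extract_choice(reply))
```

Returning `None` instead of silently defaulting to "A" makes unparseable replies easy to count. One limitation of the last pattern is that case-insensitive matching also accepts a lone lowercase letter, such as the English article "a".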

## 5. Upload Test Data

You need to upload:
1. `public_test.json` - The test questions file
2. Video files - All the video files referenced in the JSON
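The layout the pipeline expects from `public_test.json` can be inferred from how the code reads it: a top-level `data` list whose items carry `id`, `question`, `choices`, and `video_path`. Here is a minimal sketch of a structural check under that assumption; the helper `find_schema_problems` is hypothetical and the sample values are placeholders, not real dataset entries.

```python
import json

# Keys the pipeline code reads from each question.
REQUIRED_KEYS = {"id", "question", "choices", "video_path"}


def find_schema_problems(test_data):
    """Return a list of human-readable problems; empty means the layout looks right."""
    if not isinstance(test_data, dict) or not isinstance(test_data.get("data"), list):
        return ["top level must be an object with a 'data' list"]
    problems = []
    for i, item in enumerate(test_data["data"]):
        if not isinstance(item, dict):
            problems.append(f"item {i}: not an object")
            continue
        missing = REQUIRED_KEYS - item.keys()
        if missing:
            problems.append(f"item {i}: missing {sorted(missing)}")
    return problems


# Placeholder sample; video_path is resolved relative to DATA_DIR in config.py.
sample = json.loads("""
{"data": [{"id": "q_0001",
           "question": "...",
           "choices": ["A. ...", "B. ..."],
           "video_path": "public_test/videos/example.mp4"}]}
""")
print(find_schema_problems(sample))  # prints []
```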

**Option A: Upload from Google Drive**

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Modify this path to point to your data in Google Drive
# (keep the * outside the quotes so the shell expands it)
# !cp -r "/content/drive/MyDrive/your_data_folder/"* /content/data/traffic_buddy_train+public_test/public_test/

**Option B: Upload directly (for small datasets)**

In [None]:
from google.colab import files
import shutil

# Upload public_test.json
print("Please upload public_test.json")
uploaded = files.upload()
for filename in uploaded.keys():
    shutil.move(filename, "/content/data/traffic_buddy_train+public_test/public_test/public_test.json")
    print(f"✓ {filename} uploaded successfully")

# Upload videos
print("\nPlease upload video files (you may need to zip them first)")
uploaded = files.upload()
for filename in uploaded.keys():
    if filename.endswith('.zip'):
        !unzip -q "{filename}" -d /content/data/traffic_buddy_train+public_test/public_test/videos/
        print(f"✓ {filename} extracted")
    else:
        shutil.move(filename, f"/content/data/traffic_buddy_train+public_test/public_test/videos/{filename}")
        print(f"✓ {filename} uploaded")

**Option C: Download from URL**

In [None]:
# Uncomment and modify if you have a download link
# !wget -O /content/data.zip "YOUR_DOWNLOAD_LINK"
# !unzip -q /content/data.zip -d /content/data/traffic_buddy_train+public_test/public_test/

## 6. Verify Data Upload

In [None]:
import json
from pathlib import Path

# Check if JSON file exists
json_path = Path("/content/data/traffic_buddy_train+public_test/public_test/public_test.json")
if json_path.exists():
    with open(json_path, encoding="utf-8") as f:
        data = json.load(f)
    print(f"✓ JSON file found: {len(data['data'])} questions")
else:
    print("✗ JSON file not found!")

# Check video files
videos_dir = Path("/content/data/traffic_buddy_train+public_test/public_test/videos")
if videos_dir.exists():
    video_files = list(videos_dir.glob("*.mp4")) + list(videos_dir.glob("*.avi"))
    print(f"✓ Found {len(video_files)} video files")
else:
    print("✗ Videos directory not found!")

# List directory structure
!ls -lh /content/data/traffic_buddy_train+public_test/public_test/

## 7. Run Inference Pipeline

In [None]:
import sys
sys.path.insert(0, '/content')

from src.inference import main

# Run the pipeline
print("Starting inference pipeline...\n")
main()
print("\n✓ Inference completed!")

## 8. View Results

In [None]:
import pandas as pd

# Load and display results
results_df = pd.read_csv("/content/output/submission.csv")
print(f"Total predictions: {len(results_df)}")
print(f"\nFirst 10 predictions:")
print(results_df.head(10))

# Show answer distribution
print(f"\nAnswer distribution:")
print(results_df['answer'].value_counts().sort_index())
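Before downloading, a quick sanity check of the submission can catch duplicate ids or stray answer values. This sketch uses the standard `csv` module so it runs on its own; the helper `check_submission` is hypothetical, and the column names `id` and `answer` match what `run_pipeline` writes.

```python
import csv
import io

VALID_ANSWERS = {"A", "B", "C", "D"}


def check_submission(csv_file):
    """Return (row_count, problems) for a submission with 'id' and 'answer' columns."""
    reader = csv.DictReader(csv_file)
    seen_ids = set()
    problems = []
    count = 0
    for row in reader:
        count += 1
        if row["id"] in seen_ids:
            problems.append(f"duplicate id: {row['id']}")
        seen_ids.add(row["id"])
        if row["answer"] not in VALID_ANSWERS:
            problems.append(f"invalid answer for {row['id']}: {row['answer']!r}")
    return count, problems


# In-memory demo with deliberately bad rows.
demo = io.StringIO("id,answer\nq1,A\nq2,E\nq1,B\n")
print(check_submission(demo))
```

For the real file, pass `open("/content/output/submission.csv", newline="")` instead of the in-memory demo. A heavily skewed answer distribution toward "A" is also worth a look, since "A" is the pipeline's fallback on errors.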

## 9. Download Results

In [None]:
from google.colab import files

# Download the submission file
files.download("/content/output/submission.csv")
print("✓ Submission file downloaded!")

## 10. (Optional) Test on a Single Question

In [None]:
# Test on a single question to debug
from src.inference import R4BInferencePipeline
import json

# Load test data
with open("/content/data/traffic_buddy_train+public_test/public_test/public_test.json", encoding="utf-8") as f:
    test_data = json.load(f)

# Initialize pipeline
pipeline = R4BInferencePipeline()

# Test first question
first_question = test_data["data"][0]
print(f"Question: {first_question['question']}")
print(f"Choices: {first_question['choices']}")

answer = pipeline.run_inference(first_question)
print(f"\nPredicted answer: {answer}")

## Troubleshooting

### Out of Memory Error
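If you get OOM errors, try:
- Reducing `MAX_NEW_TOKENS` in config to shorten generation
- Setting `USE_MID_FRAME_ONLY = True` so only one frame per video is decoded and cached
- Reducing `MAX_FRAMES_PER_VIDEO` or setting `CACHE_FRAMES = False` if host RAM (rather than GPU memory) runs out, since only one frame per question is passed to the model
- Note that `BATCH_SIZE` is already 1, so it cannot be reduced further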

### Video Loading Errors
- Make sure video paths in the JSON match the actual file names
- Check that all videos are in the correct directory
- Verify video files are not corrupted
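The first two checks can be automated by comparing the paths referenced in the JSON against the files on disk. This is a minimal sketch, assuming each question has a `video_path` relative to the data directory (as `run_inference` does); the helper name and the demo file names are made up.

```python
from pathlib import Path
import tempfile


def find_missing_videos(questions, data_dir):
    """Return the sorted, de-duplicated video_path values that do not exist under data_dir."""
    data_dir = Path(data_dir)
    referenced = {q["video_path"] for q in questions}
    return sorted(p for p in referenced if not (data_dir / p).is_file())


# Self-contained demo in a temporary directory with made-up file names.
with tempfile.TemporaryDirectory() as tmp:
    videos = Path(tmp) / "public_test" / "videos"
    videos.mkdir(parents=True)
    (videos / "a.mp4").touch()
    questions = [
        {"video_path": "public_test/videos/a.mp4"},
        {"video_path": "public_test/videos/b.mp4"},
    ]
    missing = find_missing_videos(questions, tmp)
    print(missing)
```

In the notebook, the same function could be called with `test_data["data"]` and `/content/data/traffic_buddy_train+public_test`. A non-empty result often means the zip was extracted into a nested folder.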

### Model Loading Errors
- Ensure you have a stable internet connection
- The model will be downloaded from HuggingFace on first run
- Private or gated models may require Hugging Face authentication