###                  ADAS VIDEO PERCEPTION → RAW INSTRUCTION DATASET PIPELINE
###     Qwen2.5-VL | Automatic Scene Parsing | JSON Validation

This script processes a directory of driving videos and automatically generates a raw instruction dataset for ADAS/Autonomous Driving applications using a custom vision-language model.

For each video, the system produces structured scene understanding, driving parameters, and risk assessments in natural language and JSON format. The generated outputs serve as initial (raw) instruction data that can later be refined, cleaned, or validated using domain-specific rules, human annotation, or additional QA tools. This pipeline enables scalable, automated creation of training data for perception, reasoning, and safety-related ADAS models.

Author: Sarvesh Telang

The default configuration uses:

**Qwen2.5-VL-3B-Instruct** — a compact vision–language model suitable for image/video understanding and able to run on the Colab free tier.

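The `MODEL_NAME` field can be replaced with other supported Hugging Face models depending on the application. Note that the loading and preprocessing code below is written for Qwen2.5-VL, so switching to a different model family also requires changing the model class and the input pipeline. Examples include: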

• Video Understanding Models such as **Video-LLaMA** for multi-frame and temporal reasoning `"DAMO-NLP-SG/Video-LLaMA-2-7B"`

• Object Detection Models such as the **YOLO series** (YOLOv8, YOLO-NAS) for detection tasks (using `ultralytics`)

• General-Purpose VLM Models such as **LLaVA** or **LLaVA-NeXT** for image–text reasoning `"llava-hf/llava-v1.6-vicuna-13b-hf"`

• Grounding and Captioning Models such as **InternVL2** or **Florence-2** `"OpenGVLab/InternVL2-8B"`

• Captioning / VQA Models such as **BLIP** / **BLIP-2** `"Salesforce/blip2-opt-2.7b"`

In [None]:
# ---------------------------------------------------------------
# 1. MOUNT GOOGLE DRIVE
# ---------------------------------------------------------------
from google.colab import drive
# Mount Google Drive at /content/drive so the paths below resolve.
drive.mount('/content/drive')

# Folder that is listed for input videos.
VIDEO_DIR = "/content/drive/MyDrive/SFT_VLA_dataset" # video folder path
# Folder in which the generated dataset file is placed.
SAVE_PATH = "/content/drive/MyDrive"
# Full path of the generated instruction dataset (JSON).
OUTPUT_JSON = f"{SAVE_PATH}/BDD_instruct_train.json"

In [None]:
# ---------------------------------------------------------------
# 2. INSTALL DEPENDENCIES
# ---------------------------------------------------------------
# NOTE(review): transformers is installed from the GitHub repository without a
# pinned tag or commit, so the installed version can change between runs —
# consider pinning a release once a known-good version is identified.
!pip install -q git+https://github.com/huggingface/transformers accelerate
!pip install -q qwen-vl-utils[decord]==0.0.8   # video support

In [None]:
# ---------------------------------------------------------------
# 3. IMPORTS & MODEL LOADING
# ---------------------------------------------------------------
import os
import json
import time
from tqdm import tqdm
# import torch

from transformers import AutoProcessor, Qwen2_5_VLForConditionalGeneration
from qwen_vl_utils import process_vision_info

# Hugging Face model ID used for both the processor and the model weights.
MODEL_NAME = "Qwen/Qwen2.5-VL-3B-Instruct" # Select custom model for raw instruction generation

# Processor for the selected checkpoint (chat template, tokenization, vision inputs).
processor = AutoProcessor.from_pretrained(MODEL_NAME)
# NOTE(review): the model class is specific to Qwen2.5-VL, so pointing MODEL_NAME
# at a different model family presumably requires changing this class too — confirm.
# "auto" leaves the dtype and device placement to the library.
model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    MODEL_NAME,
    torch_dtype="auto",
    device_map="auto"
)

In [None]:
# ---------------------------------------------------------------
# 4. PROMPT TEMPLATE
# ---------------------------------------------------------------
# Instruction text stored as the question ("q") of every dataset entry.
# The three section titles and the twelve JSON keys listed here are the ones the
# validation step looks for in each answer.
# NOTE(review): the validation step only recognises a JSON object wrapped in a
# ```json fenced block, which this prompt does not explicitly request — confirm
# the model reliably emits the fence, or ask for it here.
PROMPT = """
You are an ADAS driving assistant. Analyze the scene from a driving and safety perspective, and produce Scene Description, Driving Parameters and Risk Assessment. Follow below instructions:

1. Scene Description:
   - Focus only on elements relevant to driving behavior and safety.
   - Describe the positions, movements, and actions of other vehicles, pedestrians, and obstacles.
   - Mention traffic signs, lights, road markings, and lane information if relevant.
   - Highlight any potential hazards or situations that require attention from the ego vehicle.

2. Driving Parameters (JSON):
{
 "road_type": "...",
 "lane_count": "...",
 "ego_lane_position": "...",
 "traffic_light_state": "...",
 "pedestrian_on_road": "...",
 "closest_vehicle_distance": "...",
 "ego_vehicle_speed": "...",
 "road_curvature": "...",
 "weather": "...",
 "visibility": "...",
 "traffic_density": "...",
 "risk_factor": "..."
}

3. Risk Assessment:
   - Consider nearby vehicles, pedestrians, road conditions, traffic rules, visibility and environmental conditions.

Do not mention that it is a video or footage. Provide precise, actionable observations for an ADAS system.
"""

In [None]:
# ---------------------------------------------------------------
# 5. PROCESS ALL VIDEOS AND GENERATE RESPONSES
# ---------------------------------------------------------------
# Collect the video files to process, in a stable (sorted) order.
video_files = sorted(
    f for f in os.listdir(VIDEO_DIR)
    if f.lower().endswith((".mp4", ".mov", ".avi"))
)

results = []
failed = []  # (video_name, error text) for videos that could not be processed
start_total = time.time()

for video_name in tqdm(video_files, desc="Processing videos", unit="video"):

    video_path = os.path.join(VIDEO_DIR, video_name)
    print(f"\n▶ Processing {video_name}")

    # Single-turn chat message: the video followed by the instruction text.
    # The text must be the same PROMPT that is stored as "q" below.
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "video",
                    "video": video_path,
                    "max_pixels": 360 * 420,
                    "fps": 1.0
                },
                {"type": "text", "text": PROMPT},
            ]
        }
    ]

    try:
        # Encode text + video
        text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
        image_inputs, video_inputs = process_vision_info(messages)

        # Move the inputs to wherever the model was placed (device_map="auto")
        # instead of assuming a CUDA device is present.
        inputs = processor(
            text=[text],
            images=image_inputs,
            videos=video_inputs,
            padding=True,
            return_tensors="pt"
        ).to(model.device)

        # Generate output
        generated_ids = model.generate(**inputs, max_new_tokens=512)
        # Drop the prompt tokens so only the newly generated answer is decoded.
        trimmed_ids = [
            out_ids[len(in_ids):] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
        ]
        output = processor.batch_decode(
            trimmed_ids,
            skip_special_tokens=True,
            clean_up_tokenization_spaces=False
        )[0]
    except Exception as exc:
        # Batch boundary: one unreadable or oversized video must not discard the
        # answers already generated for the other videos. Record it and move on.
        failed.append((video_name, str(exc)))
        print(f"⚠ Skipping {video_name}: {exc}")
        continue

    # Save entry
    results.append({
        "video_id": video_name,
        "QA": {"q": PROMPT, "a": output}
    })

# Save all responses
with open(OUTPUT_JSON, "w") as f:
    json.dump(results, f, indent=4)

print("\n✅ Saved output to:", OUTPUT_JSON)
print(f"⏱ Total processing time: {time.time()-start_total:.2f} seconds")

# Make skipped videos visible so an incomplete dataset is never silent.
if failed:
    print(f"⚠ {len(failed)} of {len(video_files)} videos failed and were skipped:")
    for failed_name, reason in failed:
        print(f"   {failed_name}: {reason}")

In [None]:
# ===============================================================
# 6. VALIDATION (OPTIONAL): TO ENSURE CONSISTENT COMPLETION FORMAT
# ===============================================================
import re

# Captures the text between a ```json opening fence and the next closing ```.
# DOTALL lets the (non-greedy) capture span multiple lines.
RE_JSON_BLOCK = re.compile(r"```json(.*?)```", re.DOTALL)

# Keys the extracted JSON object is checked against: any key missing from this
# set, or present but not listed here, is reported by the entry validation.
REQUIRED_FIELDS = {
    "road_type", "lane_count", "ego_lane_position", "traffic_light_state",
    "pedestrian_on_road", "closest_vehicle_distance", "ego_vehicle_speed",
    "road_curvature", "weather", "visibility", "traffic_density", "risk_factor"
}

def find_line(all_lines, text):
    """Return the 1-based number of the first line containing *text*.

    Args:
        all_lines: Iterable of strings to search, in order.
        text: Substring to look for.

    Returns:
        The 1-based position of the first matching line, or None when no
        line contains *text*.
    """
    matching_numbers = (
        number
        for number, content in enumerate(all_lines, start=1)
        if text in content
    )
    return next(matching_numbers, None)


# ----------------------- JSON Error Context --------------------
def json_error_context(json_text):
    """Describe where *json_text* fails to parse as JSON.

    Args:
        json_text: Candidate JSON document as a string.

    Returns:
        None when the text parses. Otherwise a tuple
        (message, line number, column number, offending line, caret line),
        where the caret line is a "^" placed under the failing column.
    """
    try:
        json.loads(json_text)
    except json.JSONDecodeError as err:
        offending_line = json_text.split("\n")[err.lineno - 1]
        # Right-justifying to the column width puts "^" exactly at colno.
        pointer = "^".rjust(err.colno)
        return (err.msg, err.lineno, err.colno, offending_line, pointer)
    return None


# ----------------------- Entry Validation ----------------------
def validate_entry(entry, all_lines):
    """Check one dataset entry's answer for the expected sections and JSON block.

    Args:
        entry: Dataset item; the answer text is read from entry["QA"]["a"].
        all_lines: Lines of the dataset file, used to look up a line number
            for the JSON block.

    Returns:
        A list of error tuples, empty when the entry passes. Each tuple is
        either (message, line) or, for JSON syntax errors,
        (message, line, faulty_line, caret). ``line`` is None when no line
        number could be determined.
    """
    errors = []
    answer = entry["QA"]["a"]

    # Check sections
    for section in ["Scene Description", "Driving Parameters", "Risk Assessment"]:
        if section not in answer:
            errors.append((f"Missing section: {section}", None))

    # Extract JSON block
    match = RE_JSON_BLOCK.search(answer)
    if not match:
        errors.append(("Missing JSON block", None))
        return errors

    json_text = match.group(1).strip()
    # find_line returns None when the block's first line does not occur in the
    # file, so every use of json_line below must tolerate None.
    # NOTE(review): the lookup matches the first file line containing this text,
    # which is not necessarily inside this entry — treat the number as a hint.
    json_line = find_line(all_lines, json_text.split("\n")[0])

    # JSON validity check
    context = json_error_context(json_text)
    if context:
        msg, line, _col, faulty, caret = context
        error_line = json_line + line - 1 if json_line is not None else None
        errors.append((f"Invalid JSON: {msg}", error_line, faulty, caret))
        return errors

    # Structure validation
    parsed = json.loads(json_text)
    if not isinstance(parsed, dict):
        # Valid JSON that is a list/string/number has no keys to compare.
        errors.append(
            (f"JSON block is not an object (got {type(parsed).__name__})", json_line)
        )
        return errors

    present = set(parsed)
    missing = REQUIRED_FIELDS - present
    extra = present - REQUIRED_FIELDS

    if missing:
        errors.append((f"Missing JSON fields: {missing}", json_line))
    if extra:
        errors.append((f"Extra JSON fields: {extra}", json_line))

    return errors


# ----------------------- Full File Validation ------------------
def validate_file(path):
    """Validate every entry of a generated dataset file and print a report.

    Args:
        path: Path of the JSON dataset file. The file must contain a list of
            entries, each with a "video_id" key and the fields that
            validate_entry reads.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If the file as a whole is not valid JSON.
    """
    with open(path, "r") as f:
        all_lines = f.readlines()

    # The raw lines are kept for line-number lookup; the joined text is parsed.
    dataset = json.loads("".join(all_lines))
    print("\n==================== VALIDATION REPORT ====================\n")

    for item in dataset:
        video_id = item["video_id"]
        issues = validate_entry(item, all_lines)

        if not issues:
            print(f"[✔ OK] {video_id}")
        else:
            print(f"[❌ Issues in] {video_id}:")
            for e in issues:
                if len(e) == 4:
                    # JSON syntax error: message plus the faulty line and a caret.
                    msg, line, faulty, caret = e
                    if line is not None:
                        print(f"   Line {line}: {msg}")
                    else:
                        # No usable line number; avoid printing "Line None".
                        print(f"   {msg}")
                    print(f"      {faulty}")
                    print(f"      {caret}")
                else:
                    print(f"   {e[0]}")

    print("\n========================= DONE =============================\n")


# ----------------------- Run Validation ------------------------
# Validate the dataset file at OUTPUT_JSON and print the report.
validate_file(OUTPUT_JSON)