# Full Pipeline (Module 1 -> 2 -> 3) on Colab

This notebook runs the complete system:
1. Module 1 Extraction
2. Module 2 Perception (ASR + captions)
3. Module 3 Reasoning NLP (G1->G8)


In [None]:
# Mount Google Drive at /content/drive so files under it can be read and written
# like ordinary paths for the rest of the session.
from google.colab import drive

drive.mount('/content/drive')



In [None]:
import importlib.util
import subprocess
import sys
from pathlib import Path

def _ensure_ffmpeg():
    """Install ffmpeg with apt-get unless an ``ffmpeg`` executable is already on PATH.

    The lookup goes through ``shutil.which`` instead of testing the single
    hard-coded location ``/usr/bin/ffmpeg``, so an ffmpeg binary living in any
    PATH directory is recognised and the apt-get install is skipped.

    Raises:
        subprocess.CalledProcessError: if ``apt-get update`` or
            ``apt-get install`` exits with a non-zero status.
    """
    import shutil  # local import: this cell's import block does not bring in shutil

    if shutil.which('ffmpeg') is not None:
        print('ffmpeg already installed')
        return
    subprocess.check_call(['apt-get', 'update', '-y'])
    subprocess.check_call(['apt-get', 'install', '-y', 'ffmpeg'])

def _ensure_packages():
    """Pip-install every required package whose import module cannot be found.

    ``required`` maps an importable module name to the pip distribution that
    provides it. Only distributions whose module has no import spec are
    installed, using the interpreter that runs this kernel
    (``sys.executable -m pip``).

    After a successful install the import system's finder caches are
    invalidated, so modules installed while the kernel is running can be
    imported by the following cells without a restart.

    Raises:
        subprocess.CalledProcessError: if ``pip install`` exits with a
            non-zero status.
    """
    required = {
        'scenedetect': 'scenedetect',
        'cv2': 'opencv-python-headless',
        'faster_whisper': 'faster-whisper',
        'transformers': 'transformers',
        'PIL': 'pillow',
        'tqdm': 'tqdm',
        'jsonschema': 'jsonschema',
    }
    missing = [
        distribution
        for module_name, distribution in required.items()
        if importlib.util.find_spec(module_name) is None
    ]
    if not missing:
        print('python packages already satisfied')
        return
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-q', *missing])
    # Directory listings cached by the path finders predate the install.
    importlib.invalidate_caches()

# Run both installers now; each one prints a message and returns early when
# there is nothing to install.
_ensure_ffmpeg()
_ensure_packages()



In [None]:
import os
import subprocess
import time
import shutil
import torch
from pathlib import Path

# Checkout location of the project repository and the remote it is cloned from.
REPO_DIR = Path('/content/video-summary')
REPO_URL = 'https://github.com/TCTri205/video-summary.git'

# Clone only when the checkout is absent, then make it the working directory.
repo_already_cloned = REPO_DIR.exists()
if not repo_already_cloned:
    clone_cmd = ['git', 'clone', REPO_URL, str(REPO_DIR)]
    subprocess.check_call(clone_cmd)
os.chdir(REPO_DIR)

# Probe the accelerator once and report what was found.
cuda_available = torch.cuda.is_available()
print('CUDA available:', cuda_available)
gpu_name = 'CPU'
if cuda_available:
    gpu_name = torch.cuda.get_device_name(0)
print('GPU:', gpu_name)

# No CUDA -> 'CPU'; a device name containing "L4" (case-insensitive) -> 'L4';
# every other CUDA device falls back to the 'T4' profile.
if cuda_available:
    RUNTIME_PROFILE = 'L4' if 'L4' in gpu_name.upper() else 'T4'
else:
    RUNTIME_PROFILE = 'CPU'

# Per-profile tuning tables, keyed by RUNTIME_PROFILE.
CAPTION_BATCH_BY_PROFILE = dict(CPU=1, T4=4, L4=8)
SUMMARIZE_TOKENS_BY_PROFILE = dict(CPU=256, T4=384, L4=512)

CAPTION_BATCH_SIZE = CAPTION_BATCH_BY_PROFILE[RUNTIME_PROFILE]
SUMMARIZE_MAX_NEW_TOKENS = SUMMARIZE_TOKENS_BY_PROFILE[RUNTIME_PROFILE]

# Locations under the mounted Drive folder: the input video plus two output trees.
DRIVE_ROOT = Path('/content/drive/MyDrive/video-summary')
INPUT_VIDEO_DRIVE = DRIVE_ROOT.joinpath('input', 'raw_video.mp4')
PROCESSED_DRIVE = DRIVE_ROOT.joinpath('processed')
ARTIFACTS_DRIVE = DRIVE_ROOT.joinpath('artifacts')

# Matching working locations outside the Drive mount.
LOCAL_ROOT = Path('/content/video-summary-work')
LOCAL_INPUT_DIR = LOCAL_ROOT.joinpath('input')
LOCAL_PROCESSED = LOCAL_ROOT.joinpath('processed')
LOCAL_ARTIFACTS = LOCAL_ROOT.joinpath('artifacts')

# Create every output/work directory up front (the Drive input folder is not created).
required_dirs = (PROCESSED_DRIVE, ARTIFACTS_DRIVE, LOCAL_INPUT_DIR, LOCAL_PROCESSED, LOCAL_ARTIFACTS)
for directory in required_dirs:
    directory.mkdir(parents=True, exist_ok=True)

if not INPUT_VIDEO_DRIVE.exists():
    raise FileNotFoundError(f'Missing input video: {INPUT_VIDEO_DRIVE}')

# Stage the video into the work dir; an existing copy is reused only when its
# byte size matches the Drive file.
LOCAL_VIDEO = LOCAL_INPUT_DIR / INPUT_VIDEO_DRIVE.name
local_copy_is_current = (
    LOCAL_VIDEO.exists()
    and LOCAL_VIDEO.stat().st_size == INPUT_VIDEO_DRIVE.stat().st_size
)
if not local_copy_is_current:
    shutil.copy2(INPUT_VIDEO_DRIVE, LOCAL_VIDEO)

RAW_VIDEO_LOCAL, RAW_VIDEO_DRIVE = str(LOCAL_VIDEO), str(INPUT_VIDEO_DRIVE)
VIDEO_NAME = LOCAL_VIDEO.stem

# A non-blank VIDEO_SUMMARY_RUN_ID environment variable (whitespace-trimmed) is
# used as the run id; otherwise one is built from the video name and the
# current Unix time in whole seconds.
run_id_override = os.environ.get('VIDEO_SUMMARY_RUN_ID', '').strip()
if run_id_override:
    RUN_ID = run_id_override
else:
    RUN_ID = f'colab_full_{VIDEO_NAME}_{int(time.time())}'

# Run switches read by the later cells.
REPLAY_MODE = False
KEEP_LAST_RUNS = 2
CLEAN_OLD_RUNS = True
CLEAN_HEAVY_EXTRACTION = True

# Echo the effective configuration.
for setting_label, setting_value in (
    ('RUNTIME_PROFILE =', RUNTIME_PROFILE),
    ('CAPTION_BATCH_SIZE =', CAPTION_BATCH_SIZE),
    ('SUMMARIZE_MAX_NEW_TOKENS =', SUMMARIZE_MAX_NEW_TOKENS),
    ('VIDEO_NAME =', VIDEO_NAME),
    ('RUN_ID =', RUN_ID),
    ('REPLAY_MODE =', REPLAY_MODE),
):
    print(setting_label, setting_value)


In [None]:
from extraction_perception.extraction.extraction import VideoPreprocessor

# Module 1: scene detection, audio extraction, then keyframes + metadata for
# the locally staged video. Skipped entirely in replay mode.
if not REPLAY_MODE:
    processor = VideoPreprocessor(
        video_path=RAW_VIDEO_LOCAL,
        output_root=str(LOCAL_PROCESSED),
    )
    timestamps = processor.detect_scenes()
    audio_path = processor.extract_audio()
    metadata = processor.extract_keyframes_and_metadata(timestamps)

    scene_count = len(timestamps)
    print('Scenes:', scene_count)
    print('Audio:', audio_path)
    # A missing 'total_keyframes' entry is reported as 0.
    print('Keyframes:', metadata.get('total_keyframes', 0))
else:
    print('Replay mode enabled: skip Module 1 extraction')



## Step 1 - Module 1 Extraction

In [None]:
import gc
import torch
from extraction_perception.extraction.whisper_module import WhisperExtractor
from extraction_perception.perception.caption import VisualCaptioner

# Paths inside this video's extraction folder under the local work directory.
LOCAL_EXTRACTION_DIR = LOCAL_PROCESSED / VIDEO_NAME / 'extraction'
AUDIO_PATH = LOCAL_EXTRACTION_DIR / 'audio' / 'audio_16k.wav'
METADATA_PATH = LOCAL_EXTRACTION_DIR / 'scene_metadata.json'
CAPTIONS_PATH = LOCAL_EXTRACTION_DIR / 'visual_captions.json'


def _release_model_memory():
    """Run a garbage-collection pass and, when CUDA is available, empty PyTorch's CUDA cache."""
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()


if REPLAY_MODE:
    print('Replay mode enabled: skip Module 2 perception')
else:
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    compute_type = 'float16' if device == 'cuda' else 'int8'

    # Speech recognition first. The extractor is dropped as soon as this step
    # ends (also when it raises) so it is no longer referenced while the
    # captioning model is being loaded.
    asr = WhisperExtractor(model_size='base', device=device, compute_type=compute_type)
    try:
        asr.transcribe(
            input_path=str(AUDIO_PATH),
            language='vi',
            output_root=str(LOCAL_PROCESSED),
            output_name=VIDEO_NAME,
        )
    finally:
        del asr
        _release_model_memory()

    # Keyframe captioning, with the same release-on-exit guarantee.
    captioner = VisualCaptioner()
    try:
        captioner.caption_from_metadata(
            metadata_path=str(METADATA_PATH),
            output_path=str(CAPTIONS_PATH),
            batch_size=CAPTION_BATCH_SIZE,
        )
    finally:
        del captioner
        _release_model_memory()


## Step 2 - Module 2 Perception

In [None]:
import shutil

# Drive-side extraction folder for this video (created if missing).
DRIVE_EXTRACTION_DIR = PROCESSED_DRIVE / VIDEO_NAME / 'extraction'
DRIVE_EXTRACTION_DIR.mkdir(parents=True, exist_ok=True)

if not REPLAY_MODE:
    # (local relative path, Drive relative path) pairs; files that do not
    # exist locally are skipped without error.
    sync_to_drive = [
        (json_name, json_name)
        for json_name in ('scene_metadata.json', 'audio_transcripts.json', 'visual_captions.json')
    ]
    for local_rel, drive_rel in sync_to_drive:
        local_file = LOCAL_EXTRACTION_DIR / local_rel
        if not local_file.exists():
            continue
        drive_file = DRIVE_EXTRACTION_DIR / drive_rel
        drive_file.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(local_file, drive_file)

AUDIO_TRANSCRIPTS = DRIVE_EXTRACTION_DIR / 'audio_transcripts.json'
VISUAL_CAPTIONS = DRIVE_EXTRACTION_DIR / 'visual_captions.json'

# Both files must be present on Drive before the reasoning stage can start.
perception_outputs_ready = AUDIO_TRANSCRIPTS.exists() and VISUAL_CAPTIONS.exists()
if not perception_outputs_ready:
    raise FileNotFoundError('Missing perception outputs on Drive for reasoning stage')

print('Drive transcripts/captions ready')



## Step 3 - Module 3 Reasoning (G1->G8)

In [None]:
import subprocess
import sys

# Replay points the runner at the Drive artifacts root; a fresh run uses the local one.
if REPLAY_MODE:
    artifacts_root = ARTIFACTS_DRIVE
else:
    artifacts_root = LOCAL_ARTIFACTS

# Flag/value pairs, in the order they are placed on the command line.
runner_options = (
    ('--audio-transcripts', str(AUDIO_TRANSCRIPTS)),
    ('--visual-captions', str(VISUAL_CAPTIONS)),
    ('--raw-video', RAW_VIDEO_DRIVE),
    ('--stage', 'g8'),
    ('--run-id', RUN_ID),
    ('--artifacts-root', str(artifacts_root)),
    ('--summarize-backend', 'local'),
    ('--summarize-fallback-backend', 'local'),
    ('--summarize-max-new-tokens', str(SUMMARIZE_MAX_NEW_TOKENS)),
)

# Run the pipeline module with the kernel's own interpreter.
cmd = [sys.executable, '-m', 'reasoning_nlp.pipeline_runner']
for option_flag, option_value in runner_options:
    cmd.extend((option_flag, option_value))
if REPLAY_MODE:
    cmd.append('--replay')

# check_call raises CalledProcessError on a non-zero exit status.
subprocess.check_call(cmd)
print('Reasoning pipeline completed')


In [None]:
import shutil
from pathlib import Path

# Where this run's artifacts currently live (Drive in replay mode, local work
# dir otherwise) and where they are kept on Drive.
if REPLAY_MODE:
    source_run_dir = ARTIFACTS_DRIVE / RUN_ID
else:
    source_run_dir = LOCAL_ARTIFACTS / RUN_ID
drive_run_dir = ARTIFACTS_DRIVE / RUN_ID

def _copy_item(src: Path, dst: Path) -> None:
    """Copy a file or directory tree from ``src`` to ``dst``.

    Args:
        src: File or directory to copy. A missing ``src`` is a silent no-op.
        dst: Destination path. Missing parent directories are created. When
            ``src`` is a directory, an existing ``dst`` is removed first so the
            result mirrors ``src`` exactly; a file is copied with
            ``shutil.copy2``.

    When ``src`` and ``dst`` refer to the same file or directory nothing is
    done. Without that guard ``shutil.copy2`` raises ``SameFileError`` for a
    file, and for a directory the source would be deleted by ``rmtree`` before
    it could be copied.
    """
    if not src.exists():
        return
    # Source and destination are the same item: it is already in place.
    if dst.exists() and src.samefile(dst):
        return
    dst.parent.mkdir(parents=True, exist_ok=True)
    if src.is_dir():
        if dst.exists():
            shutil.rmtree(dst)
        shutil.copytree(src, dst)
    else:
        shutil.copy2(src, dst)

# Artifacts (paths relative to the run directory) that are kept on Drive.
keep_rel_paths = [
    'run_meta.json',
    'g1_validate/normalized_input.json',
    'g2_align/alignment_result.json',
    'g3_context/context_blocks.json',
    'g4_summarize/parse_meta.json',
    'g4_summarize/summary_script.internal.json',
    'g5_segment/summary_script.json',
    'g5_segment/summary_video_manifest.json',
    'g6_manifest/manifest_validation.json',
    'g7_assemble/render_meta.json',
    'g7_assemble/summary_video.mp4',
    'g8_qc/quality_report.json',
]
for kept_rel in keep_rel_paths:
    _copy_item(source_run_dir / kept_rel, drive_run_dir / kept_rel)

# Optionally remove the keyframes folder and the extracted WAV from the Drive
# extraction folder.
if CLEAN_HEAVY_EXTRACTION and DRIVE_EXTRACTION_DIR.exists():
    heavy_items = [
        DRIVE_EXTRACTION_DIR / 'keyframes',
        DRIVE_EXTRACTION_DIR / 'audio' / 'audio_16k.wav',
    ]
    for heavy in heavy_items:
        if heavy.is_dir():
            shutil.rmtree(heavy, ignore_errors=True)
            continue
        if heavy.exists():
            heavy.unlink()

# Optionally prune older 'colab_full_*' run folders on Drive: sort newest first
# by modification time, keep the first KEEP_LAST_RUNS, and never delete the
# current run.
if CLEAN_OLD_RUNS and KEEP_LAST_RUNS > 0:
    all_runs = sorted(
        (
            entry
            for entry in ARTIFACTS_DRIVE.iterdir()
            if entry.is_dir() and entry.name.startswith('colab_full_')
        ),
        key=lambda run_dir: run_dir.stat().st_mtime,
        reverse=True,
    )
    for stale_run in all_runs[KEEP_LAST_RUNS:]:
        if stale_run.name == RUN_ID:
            continue
        shutil.rmtree(stale_run, ignore_errors=True)

print('Synced balanced artifacts to Drive:', drive_run_dir)



In [None]:
from IPython.display import Video
import subprocess
import sys

# Artifact files of this run on Drive that are passed to the validator.
RUN_DIR = ARTIFACTS_DRIVE / RUN_ID
ALIGNMENT = RUN_DIR.joinpath('g2_align', 'alignment_result.json')
SCRIPT = RUN_DIR.joinpath('g5_segment', 'summary_script.json')
MANIFEST = RUN_DIR.joinpath('g5_segment', 'summary_video_manifest.json')
REPORT = RUN_DIR.joinpath('g8_qc', 'quality_report.json')

# Build the validator command; the script and contracts paths are relative to
# the current working directory.
validate_cmd = [sys.executable, 'docs/Reasoning-NLP/schema/validate_artifacts.py']
for artifact_flag, artifact_path in (
    ('--alignment', ALIGNMENT),
    ('--script', SCRIPT),
    ('--manifest', MANIFEST),
    ('--report', REPORT),
):
    validate_cmd += [artifact_flag, str(artifact_path)]
validate_cmd += ['--contracts-dir', 'contracts/v1/template']

# check_call raises CalledProcessError when the validator exits non-zero.
subprocess.check_call(validate_cmd)

OUTPUT_VIDEO = RUN_DIR.joinpath('g7_assemble', 'summary_video.mp4')
print('Output video:', OUTPUT_VIDEO)
# Last expression of the cell: renders the embedded summary video as the cell output.
Video(str(OUTPUT_VIDEO), embed=True)

