# FastAPI upload server (payload_video.ipynb)

Notebook ini menyediakan server FastAPI yang menerima upload video (multipart) di `/upload` dan menerima JSON payload di `/upload`.

Langkah eksekusi:
1. Jalankan cell instalasi dependensi
2. Jalankan cell setup direktori
3. Jalankan cell definisi server
4. Jalankan cell start server (ngrok akan dicoba jika tersedia)

Hasil: file yang diupload akan disimpan di folder `uploads/` dan payload JSON yang dikirim ke `/upload` akan disimpan di `received_payloads/`. Video akan diproses dengan Whisper untuk speech-to-text.

In [61]:
#kalo pake colab jangan lupa install dulu di terminal
!pip install --quiet numpy==1.26.4
!pip install --quiet --upgrade torch torchaudio faster-whisper


#kalo lokal download ffmpeg nya
#https://github.com/GyanD/codexffmpeg/releases/download/2025-11-27-git-61b034a47c/ffmpeg-2025-11-27-git-61b034a47c-full_build.zip
#simpen di c

In [62]:
# import os
# os.environ["PATH"] += os.pathsep + r"C:\ffmpeg\bin"

# ============================================================================
# üîß CELL 1: INSTALL SAFE DEPENDENCIES (FIXED - NO CONFLICTS!)
# ============================================================================

# ‚úÖ TIER 0: JUPYTER WIDGETS (fixes tqdm warning)
!pip install --quiet ipywidgets jupyter
# ‚úÖ TIER 1: AMAN (Tidak touch numpy)
!pip install --quiet fastapi uvicorn nest-asyncio pyngrok python-multipart
!pip install --quiet tqdm
!pip install --quiet imageio-ffmpeg
!pip install --quiet deepl

# ‚úÖ TIER 2: AMAN (Pure torch-based, no numpy dependency)
#!pip install --quiet torch torchaudio
!pip install --quiet silero-vad

# ‚úÖ TIER 3: AMAN (Minimal numpy, tidak upgrade)
!pip install --quiet pydub
!pip install --quiet soundfile
!pip install --quiet scipy
!pip install --quiet scikit-learn

# ‚úÖ TIER 4: AMAN (Cloud-based, no local deps)
#!pip install --quiet faster-whisper
!pip install --quiet huggingface-hub

# ‚úÖ TIER 5: MEDIAPIPE (sudah include opencv internally!)
!pip install --quiet mediapipe
# ‚úÖ TIER 6: TORCHCODEC (video codec support)
!pip install --quiet torchcodec
!pip install --quiet gdown requests
!pip install --quiet resemblyzer moviepy

print('\n‚úÖ All safe packages installed')
print('   ‚úÖ No numpy version conflicts')
print('   ‚úÖ Jupyter widgets installed (fixes tqdm warning)')
print('   ‚úÖ FFmpeg required for audio - verify with next cell')


‚úÖ All safe packages installed
   ‚úÖ No numpy version conflicts
   ‚úÖ FFmpeg required for audio - verify with next cell


<b><h2> Import Library

In [63]:
# ==========================
# Standard Library
# ==========================
import asyncio
import gc
import getpass
import hashlib
import json
import json as json_module
import os
import random
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import threading as th
import time
import math
import traceback
import uuid
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from typing import List
from urllib.parse import urlparse
import urllib.request
import torch
import torchaudio
from silero_vad import load_silero_vad
import numpy as np
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
import gdown
import requests
from urllib.parse import urlparse, parse_qs
import librosa
from resemblyzer import VoiceEncoder, preprocess_wav
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import silhouette_score
from moviepy.editor import VideoFileClip

# ==========================
# Third-Party Libraries
# ==========================
import deepl
import nest_asyncio
import torch
import uvicorn
from faster_whisper import WhisperModel
from huggingface_hub import InferenceClient
from pyngrok import conf, ngrok
from tqdm import tqdm
import cv2
import mediapipe as mp

# ==========================
# FastAPI & Middleware
# ==========================
from fastapi import (
    BackgroundTasks,
    FastAPI,
    File,
    Form,
    HTTPException,
    Request,
    UploadFile
)
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles

<b><h2> Siapkan direktori untuk upload dan transcription

In [64]:
# Siapkan direktori untuk upload dan transcription
ROOT_DIR = os.getcwd()
UPLOAD_DIR = os.path.join(ROOT_DIR, 'uploads')
TRANSCRIPTION_DIR = os.path.join(ROOT_DIR, 'transcriptions')
AUDIO_DIR = os.path.join(ROOT_DIR, 'audio')
RESULTS_DIR = os.path.join(ROOT_DIR, 'results')  # NEW: hasil assessment
os.makedirs(UPLOAD_DIR, exist_ok=True)
os.makedirs(TRANSCRIPTION_DIR, exist_ok=True)
os.makedirs(AUDIO_DIR, exist_ok=True)
os.makedirs(RESULTS_DIR, exist_ok=True)

print('üìÅ Directories:')
print(f'   Upload: {UPLOAD_DIR}')
print(f'   Transcription: {TRANSCRIPTION_DIR}')
print(f'   AUDIO: {AUDIO_DIR}')
print(f'   Results: {RESULTS_DIR}')

# Check for GPU
device = "cuda" if torch.cuda.is_available() else "cpu"
compute_type = "float16" if device == "cuda" else "int8"

print(f'\nüéØ Device Configuration:')
print(f'   Device: {device.upper()}')
print(f'   Compute Type: {compute_type}')
if device == "cuda":
    print(f'   GPU: {torch.cuda.get_device_name(0)}')
else:
    print('   Note: Using CPU (GPU recommended for faster processing)')

# DeepL Configuration
DEEPL_API_KEY = "02a88edf-4fcb-4786-ba3d-a137fb143760:fx"

print('\nüåê Translation Configuration:')
print(f'   DeepL API: {"Configured" if DEEPL_API_KEY != "YOUR_DEEPL_API_KEY_HERE" else "‚ö†Ô∏è  NOT CONFIGURED - Set DEEPL_API_KEY"}')

üìÅ Directories:
   Upload: /content/uploads
   Transcription: /content/transcriptions
   AUDIO: /content/audio
   Results: /content/results

üéØ Device Configuration:
   Device: CUDA
   Compute Type: float16
   GPU: Tesla T4

üåê Translation Configuration:
   DeepL API: Configured


In [65]:
app = FastAPI(title='AI Interview Assessment System')

app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
    expose_headers=['*'],
    max_age=3600,
)

# Mount static folders
app.mount('/uploads', StaticFiles(directory=UPLOAD_DIR), name='uploads')
app.mount('/transcriptions', StaticFiles(directory=TRANSCRIPTION_DIR), name='transcriptions')
app.mount('/results', StaticFiles(directory=RESULTS_DIR), name='results')

In [66]:
# Background processing
executor = ThreadPoolExecutor(max_workers=2)
processing_status = {}
processing_lock = th.Lock()

# HELPER FUNCTIONS - ONLY ONE INSTANCE EACH

def get_local_file_path(url):
    """Extract local file path from URL if it's a local upload"""
    try:
        parsed = urlparse(url)
        if '/uploads/' in parsed.path:
            filename = parsed.path.split('/uploads/')[-1]
            local_path = os.path.join(UPLOAD_DIR, filename)
            if os.path.exists(local_path):
                return local_path
    except Exception as e:
        print(f'Error parsing URL: {e}')
    return None

In [130]:
# ============================================================================
# üîß FIX: JSON SERIALIZATION FOR NUMPY TYPES
# ============================================================================

import json
import numpy as np

# Monkey patch JSON encoder to handle NumPy types automatically
_original_default = json.JSONEncoder.default

def _numpy_default(self, obj):
    """Custom JSON encoder that converts NumPy types to Python native types"""
    if isinstance(obj, np.integer):
        return int(obj)
    elif isinstance(obj, np.floating):
        return float(obj)
    elif isinstance(obj, np.ndarray):
        return obj.tolist()
    elif isinstance(obj, np.bool_):
        return bool(obj)
    return _original_default(self, obj)

# Apply the patch
json.JSONEncoder.default = _numpy_default

print('‚úÖ JSON encoder patched for NumPy compatibility')
print('   All json.dump() calls will now handle NumPy types automatically\n')

‚úÖ JSON encoder patched for NumPy compatibility
   All json.dump() calls will now handle NumPy types automatically



<b><h2> **Initialize** Whisper Model

In [67]:
# Load faster-whisper model with BEST ACCURACY settings
print('\nüì• Loading Whisper model...')
print('‚ÑπÔ∏è  Using faster-whisper "large-v3" model')
print('   This is the MOST ACCURATE model available')
print('   Speed: 4-5x faster than openai-whisper')
print('   Accuracy: ~98% for clear English speech')
print('   First run will download ~3GB model...\n')

# Detect device
device = "cuda" if torch.cuda.is_available() else "cpu"
compute_type = "float16" if device == "cuda" else "int8"

print(f'üéØ Configuration:')
print(f'   Device: {device.upper()}')
print(f'   Compute Type: {compute_type}')

# Load model with best accuracy settings
whisper_model = WhisperModel(
    "large-v3",
    device=device,
    compute_type=compute_type,
    cpu_threads=4,
    num_workers=1
)

print('‚úÖ Whisper model loaded successfully\n')

# ============================================================================
# üîä INITIALIZE VOICE ENCODER FOR SPEAKER DIARIZATION
# ============================================================================
print('\nüì• Loading Voice Encoder for Speaker Diarization...')
try:
    import torch
    from resemblyzer import VoiceEncoder

    # ‚úÖ FIX: Force CPU mode to avoid cuDNN version mismatch
    print('   Configuring for CPU mode (avoiding cuDNN errors)...')

    # Set PyTorch to CPU only
    if torch.cuda.is_available():
        print('   ‚ÑπÔ∏è  GPU available but using CPU to avoid cuDNN conflicts')

    # Force device to CPU
    device = torch.device('cpu')

    # Load VoiceEncoder with CPU
    voice_encoder = VoiceEncoder(device='cpu')

    print('‚úÖ Voice Encoder loaded successfully (~50MB)')
    print('   Device: CPU (cuDNN conflict avoided)')
    print('   Model: Resemblyzer GE2E (Google Embeddings)')
    print('   Purpose: Detect multiple speakers in audio')
    print('   Note: CPU mode is slower but more stable\n')

except Exception as e:
    print(f'‚ö†Ô∏è  Voice Encoder failed to load: {e}')
    print('   Speaker diarization will return default values\n')
    voice_encoder = None


üì• Loading Whisper model...
‚ÑπÔ∏è  Using faster-whisper "large-v3" model
   This is the MOST ACCURATE model available
   Speed: 4-5x faster than openai-whisper
   Accuracy: ~98% for clear English speech
   First run will download ~3GB model...

üéØ Configuration:
   Device: CUDA
   Compute Type: float16
‚úÖ Whisper model loaded successfully


üì• Loading Voice Encoder for Speaker Diarization...
   Configuring for CPU mode (avoiding cuDNN errors)...
   ‚ÑπÔ∏è  GPU available but using CPU to avoid cuDNN conflicts
Loaded the voice encoder model on cpu in 0.04 seconds.
‚úÖ Voice Encoder loaded successfully (~50MB)
   Device: CPU (cuDNN conflict avoided)
   Model: Resemblyzer GE2E (Google Embeddings)
   Purpose: Detect multiple speakers in audio
   Note: CPU mode is slower but more stable



<b><h2> Initialize DeepL translator

In [68]:
# Initialize DeepL translator
translator = None
if DEEPL_API_KEY and DEEPL_API_KEY != "YOUR_DEEPL_API_KEY_HERE":
    try:
        translator = deepl.Translator(DEEPL_API_KEY)
        print('‚úÖ DeepL translator initialized successfully\n')
    except Exception as e:
        print(f'‚ö†Ô∏è  DeepL initialization failed: {e}')
        print('   Translation to Indonesian will be skipped\n')
else:
    print('‚ö†Ô∏è  DeepL API key not configured')
    print('   Translation to Indonesian will be skipped\n')

‚úÖ DeepL translator initialized successfully



<b><h2> Fungsi Cheating Detector

In [69]:
# ============================================================================
# üéØ CHEATING DETECTION CONFIGURATION (dari eye_detection.ipynb)
# ============================================================================

# Threshold Parameters
EYE_RATIO_RIGHT_LIMIT = 0.6
EYE_RATIO_LEFT_LIMIT = 1.6
HEAD_TURN_LEFT_LIMIT = 0.35
HEAD_TURN_RIGHT_LIMIT = 0.65
SCORE_HIGH_RISK = 20.0
SCORE_MEDIUM_RISK = 5.0

# Landmark Indices
LEFT_EYE = [33, 133, 468]
RIGHT_EYE = [362, 263, 473]
NOSE_TIP = 1
FACE_LEFT_EDGE = 234
FACE_RIGHT_EDGE = 454

print('\nüéØ Cheating Detection Configuration:')
print(f'   Eye Ratio Range: {EYE_RATIO_RIGHT_LIMIT} - {EYE_RATIO_LEFT_LIMIT}')
print(f'   Head Turn Range: {HEAD_TURN_LEFT_LIMIT} - {HEAD_TURN_RIGHT_LIMIT}')
print(f'   Risk Thresholds: >5% Medium, >20% High\n')


üéØ Cheating Detection Configuration:
   Eye Ratio Range: 0.6 - 1.6
   Head Turn Range: 0.35 - 0.65
   Risk Thresholds: >5% Medium, >20% High



In [70]:
# ============================================================================
# üîç CHEATING DETECTION FUNCTIONS
# ============================================================================

def get_gaze_ratio(eye_points, landmarks):
    """Menghitung rasio posisi iris untuk eye tracking"""
    left_corner = np.array([landmarks[eye_points[0]].x, landmarks[eye_points[0]].y])
    right_corner = np.array([landmarks[eye_points[1]].x, landmarks[eye_points[1]].y])
    iris_center = np.array([landmarks[eye_points[2]].x, landmarks[eye_points[2]].y])

    dist_to_left = np.linalg.norm(iris_center - left_corner)
    dist_to_right = np.linalg.norm(iris_center - right_corner)

    if dist_to_right == 0:
        return 5.0

    ratio = dist_to_left / dist_to_right
    return ratio

In [71]:
def get_head_turn_ratio(landmarks):
    """Menghitung posisi relatif hidung untuk head pose detection"""
    nose = landmarks[NOSE_TIP].x
    left_edge = landmarks[FACE_LEFT_EDGE].x
    right_edge = landmarks[FACE_RIGHT_EDGE].x

    face_width = right_edge - left_edge
    nose_dist = nose - left_edge

    if face_width == 0:
        return 0.5

    relative_pos = nose_dist / face_width
    return relative_pos

In [72]:
def analyze_video_cheating_detection(video_path: str, show_progress=True):
    """
    Visual Analysis: Eye gaze, head pose, multiple face detection

    Returns:
        dict: {
            status, total_frames, suspicious_frames, cheating_score,
            verdict, confidence, details, plot_data
        }
    """
    if not os.path.exists(video_path):
        return {"status": "error", "message": f"File not found: {video_path}"}

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        return {"status": "error", "message": "Cannot open video"}

    total_frames = 0
    suspicious_frames = 0
    eye_fail_count = 0
    head_fail_count = 0
    no_face_count = 0
    multiple_face_count = 0

    # Data for plotting
    gaze_ratios = []
    head_ratios = []
    frame_numbers = []
    confidence_scores = []
    face_counts = []

    fps = cap.get(cv2.CAP_PROP_FPS)
    total_video_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    mp_face_mesh = mp.solutions.face_mesh
    mp_face_detection = mp.solutions.face_detection

    with mp_face_mesh.FaceMesh(
        max_num_faces=1,
        refine_landmarks=True,
        min_detection_confidence=0.6,
        min_tracking_confidence=0.6
    ) as mesh, \
    mp_face_detection.FaceDetection(
        model_selection=1,
        min_detection_confidence=0.6
    ) as face_detector:

        while True:
            success, frame = cap.read()
            if not success:
                break

            total_frames += 1

            if show_progress and total_frames % 30 == 0:
                progress = (total_frames / total_video_frames) * 100
                print(f"   Processing cheating detection: {progress:.1f}% ({total_frames}/{total_video_frames} frames)", end='\r')

            img_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # FACE DETECTION
            detection_results = face_detector.process(img_rgb)
            face_confidence = 0.0
            num_faces = 0

            if detection_results.detections:
                num_faces = len(detection_results.detections)

                if num_faces > 1:
                    multiple_face_count += 1

                face_confidence = detection_results.detections[0].score[0] * 100

            face_counts.append(num_faces)

            # FACE MESH (Eye Tracking)
            mesh_results = mesh.process(img_rgb)
            is_frame_suspicious = False

            if mesh_results.multi_face_landmarks:
                landmarks = mesh_results.multi_face_landmarks[0].landmark

                # Eye Gaze Check
                left_ratio = get_gaze_ratio(LEFT_EYE, landmarks)
                right_ratio = get_gaze_ratio(RIGHT_EYE, landmarks)
                avg_gaze_ratio = (left_ratio + right_ratio) / 2

                if avg_gaze_ratio < EYE_RATIO_RIGHT_LIMIT or avg_gaze_ratio > EYE_RATIO_LEFT_LIMIT:
                    is_frame_suspicious = True
                    eye_fail_count += 1

                # Head Pose Check
                head_ratio = get_head_turn_ratio(landmarks)

                if not is_frame_suspicious:
                    if head_ratio < HEAD_TURN_LEFT_LIMIT or head_ratio > HEAD_TURN_RIGHT_LIMIT:
                        is_frame_suspicious = True
                        head_fail_count += 1

                gaze_ratios.append(avg_gaze_ratio)
                head_ratios.append(head_ratio)
                frame_numbers.append(total_frames)
                confidence_scores.append(face_confidence)
            else:
                is_frame_suspicious = True
                no_face_count += 1
                confidence_scores.append(0.0)

            # Multiple face = CHEATING
            if num_faces > 1:
                is_frame_suspicious = True

            if is_frame_suspicious:
                suspicious_frames += 1

    cap.release()

    if show_progress:
        print()

    cheating_score = 0
    if total_frames > 0:
        cheating_score = (suspicious_frames / total_frames) * 100

    multiple_face_pct = (multiple_face_count / total_frames) * 100 if total_frames > 0 else 0

    # Verdict
    verdict = "Safe"
    cheating_reasons = []

    if multiple_face_pct > 1.0:
        verdict = "High Risk"
        cheating_reasons.append(f"Multiple faces detected ({multiple_face_pct:.1f}% of frames)")
    elif cheating_score > SCORE_HIGH_RISK:
        verdict = "High Risk"
        cheating_reasons.append(f"High suspicious activity ({cheating_score:.1f}%)")
    elif cheating_score > SCORE_MEDIUM_RISK:
        verdict = "Medium Risk"
        cheating_reasons.append(f"Medium suspicious activity ({cheating_score:.1f}%)")

    duration = total_frames / fps if fps > 0 else 0

    avg_confidence = np.mean(confidence_scores) if confidence_scores else 0.0
    min_confidence = np.min(confidence_scores) if confidence_scores else 0.0
    max_confidence = np.max(confidence_scores) if confidence_scores else 0.0

    return {
        "status": "success",
        "total_frames": total_frames,
        "suspicious_frames": suspicious_frames,
        "cheating_score": round(cheating_score, 2),
        "verdict": verdict,
        "cheating_reasons": cheating_reasons,
        "duration_seconds": round(duration, 2),
        "fps": round(fps, 2),
        "confidence": {
            "average": round(avg_confidence, 2),
            "min": round(min_confidence, 2),
            "max": round(max_confidence, 2)
        },
        "details": {
            "eye_fails": eye_fail_count,
            "head_fails": head_fail_count,
            "no_face": no_face_count,
            "multiple_faces": multiple_face_count
        },
        "plot_data": {
            "gaze_ratios": gaze_ratios,
            "head_ratios": head_ratios,
            "frame_numbers": frame_numbers,
            "confidence_scores": confidence_scores,
            "face_counts": face_counts
        }
    }

In [73]:
def analyze_speaker_diarization(video_path: str):
    """
    Speaker Diarization - Deteksi berapa banyak orang yang bicara
    Menggunakan Resemblyzer untuk voice embeddings + clustering

    ‚úÖ FIXED: WebM audio extraction + CPU mode for cuDNN compatibility
    """
    try:
        from moviepy.editor import VideoFileClip
        import subprocess
        import torch

        # ‚úÖ FIX 1: Ensure CPU mode for Resemblyzer
        torch.set_num_threads(4)  # Optimize CPU performance

        # ‚úÖ FIX 2: Global VoiceEncoder with CPU device
        global voice_encoder
        if 'voice_encoder' not in globals() or voice_encoder is None:
            print("   Loading VoiceEncoder (CPU mode)...")
            from resemblyzer import VoiceEncoder
            voice_encoder = VoiceEncoder(device='cpu')  # ‚úÖ Force CPU
            print("   ‚úÖ VoiceEncoder loaded on CPU")

        encoder = voice_encoder

        # ‚úÖ FIX 3: Ensure AUDIO_DIR exists
        os.makedirs(AUDIO_DIR, exist_ok=True)
        temp_audio = os.path.join(AUDIO_DIR, "temp_audio_diarization.wav")

        print("   Extracting audio from video...")

        # ‚úÖ FIX 4: Try MoviePy first, fallback to FFmpeg for WebM
        audio_extracted = False

        # Method 1: Try MoviePy (works for most formats)
        try:
            video = VideoFileClip(video_path)

            if video.audio is None:
                print("   ‚ö†Ô∏è  MoviePy: No audio track detected")
                video.close()
            else:
                print(f"   ‚ÑπÔ∏è  Audio duration: {video.audio.duration:.2f}s")
                video.audio.write_audiofile(
                    temp_audio,
                    fps=16000,
                    nbytes=2,
                    codec='pcm_s16le',  # ‚úÖ Explicit codec for WebM
                    verbose=False,
                    logger=None
                )
                video.close()
                audio_extracted = True
                print("   ‚úÖ Audio extracted via MoviePy")
        except Exception as moviepy_error:
            print(f"   ‚ö†Ô∏è  MoviePy extraction failed: {str(moviepy_error)[:100]}")
            print("   üîÑ Trying FFmpeg direct extraction...")

        # Method 2: Fallback to FFmpeg direct (better for WebM)
        if not audio_extracted:
            try:
                # ‚úÖ FIX 5: Direct FFmpeg extraction (better for WebM/Opus)
                ffmpeg_cmd = [
                    'ffmpeg',
                    '-i', video_path,
                    '-vn',  # No video
                    '-acodec', 'pcm_s16le',  # Convert to PCM
                    '-ar', '16000',  # 16kHz sample rate
                    '-ac', '1',  # Mono
                    '-y',  # Overwrite
                    temp_audio
                ]

                result = subprocess.run(
                    ffmpeg_cmd,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE,
                    timeout=60
                )

                if result.returncode == 0 and os.path.exists(temp_audio):
                    audio_extracted = True
                    print("   ‚úÖ Audio extracted via FFmpeg")
                else:
                    stderr_msg = result.stderr.decode()[:200] if result.stderr else "No error message"
                    print(f"   ‚ùå FFmpeg failed: {stderr_msg}")

            except FileNotFoundError:
                print("   ‚ùå FFmpeg not found in PATH")
                print("   üí° Install FFmpeg: https://ffmpeg.org/download.html")
            except subprocess.TimeoutExpired:
                print("   ‚ùå FFmpeg extraction timeout (>60s)")
            except Exception as ffmpeg_error:
                print(f"   ‚ùå FFmpeg extraction error: {str(ffmpeg_error)[:100]}")

        # ‚úÖ FIX 6: Check if audio was extracted
        if not audio_extracted:
            return {
                "status": "no_audio",
                "message": "Failed to extract audio from video (WebM may require FFmpeg)",
                "num_speakers": 1,
                "is_cheating": False,
                "confidence": 50,
                "silhouette_score": 0,
                "total_segments": 0
            }

        # ‚úÖ FIX 7: Validate extracted audio file
        if not os.path.exists(temp_audio):
            print("   ‚ùå Audio file not created")
            return {
                "status": "extraction_failed",
                "message": "Audio extraction failed - file not created",
                "num_speakers": 1,
                "is_cheating": False,
                "confidence": 50,
                "silhouette_score": 0,
                "total_segments": 0
            }

        audio_size = os.path.getsize(temp_audio)
        print(f"   ‚ÑπÔ∏è  Audio file size: {audio_size / 1024:.1f} KB")

        if audio_size < 1000:  # Less than 1KB
            print("   ‚ö†Ô∏è  Audio file too small - likely empty")
            os.remove(temp_audio)
            return {
                "status": "audio_too_small",
                "message": "Extracted audio file is too small (empty or corrupt)",
                "num_speakers": 1,
                "is_cheating": False,
                "confidence": 50,
                "silhouette_score": 0,
                "total_segments": 0
            }

        print("   Loading and preprocessing audio...")
        try:
            from resemblyzer import preprocess_wav
            wav = preprocess_wav(temp_audio)
        except Exception as preprocess_error:
            print(f"   ‚ùå Preprocessing error: {str(preprocess_error)[:100]}")
            if os.path.exists(temp_audio):
                os.remove(temp_audio)
            return {
                "status": "preprocessing_failed",
                "message": f"Audio preprocessing failed: {str(preprocess_error)[:100]}",
                "num_speakers": 1,
                "is_cheating": False,
                "confidence": 50,
                "silhouette_score": 0,
                "total_segments": 0
            }

        if wav is None or len(wav) == 0:
            print("   ‚ö†Ô∏è  Preprocessed audio is empty")
            if os.path.exists(temp_audio):
                os.remove(temp_audio)
            return {
                "status": "audio_empty",
                "message": "Preprocessed audio is empty",
                "num_speakers": 1,
                "is_cheating": False,
                "confidence": 50,
                "silhouette_score": 0,
                "total_segments": 0
            }

        print(f"   ‚ÑπÔ∏è  Audio samples: {len(wav)} ({len(wav)/16000:.2f}s)")

        print("   Extracting speaker embeddings...")
        segment_duration = 0.5
        sample_rate = 16000
        segment_samples = int(segment_duration * sample_rate)

        embeddings = []
        timestamps = []

        step = segment_samples // 2
        for i in range(0, len(wav) - segment_samples, step):
            segment = wav[i:i + segment_samples]
            if len(segment) == segment_samples:
                # ‚úÖ Ensure CPU processing
                with torch.no_grad():  # Disable gradient for inference
                    embed = encoder.embed_utterance(segment)
                embeddings.append(embed)
                timestamps.append(i / sample_rate)

        embeddings = np.array(embeddings)

        print(f"   Analyzing {len(embeddings)} audio segments...")

        if len(embeddings) == 0:
            print("   ‚ö†Ô∏è  No valid audio segments extracted (audio too short)")
            if os.path.exists(temp_audio):
                os.remove(temp_audio)
            return {
                "status": "no_segments",
                "message": "Audio too short - no valid segments (need >0.5s)",
                "num_speakers": 1,
                "is_cheating": False,
                "confidence": 50,
                "silhouette_score": 0,
                "total_segments": 0
            }

        if len(embeddings) < 2:
            num_speakers = 1 if len(embeddings) > 0 else 0
            confidence_score = 60.0
            silhouette = 0.0
        else:
            best_n_speakers = 1
            best_score = -1

            for n in range(2, min(6, len(embeddings))):
                clustering = AgglomerativeClustering(n_clusters=n, linkage='average')
                labels = clustering.fit_predict(embeddings)

                from sklearn.metrics import silhouette_score
                score = silhouette_score(embeddings, labels)

                if score > best_score and score > 0.2:
                    best_score = score
                    best_n_speakers = n

            if best_score > 0.2:
                num_speakers = best_n_speakers
                silhouette = best_score

                if silhouette >= 0.7:
                    confidence_score = 95.0
                elif silhouette >= 0.5:
                    confidence_score = 85.0
                elif silhouette >= 0.35:
                    confidence_score = 75.0
                else:
                    confidence_score = 60.0
            else:
                num_speakers = 1
                silhouette = best_score

                if best_score < 0.1:
                    confidence_score = 90.0
                elif best_score < 0.15:
                    confidence_score = 80.0
                else:
                    confidence_score = 70.0

        # Cleanup
        if os.path.exists(temp_audio):
            os.remove(temp_audio)

        is_cheating = num_speakers > 1

        print(f"   ‚úì Detected {num_speakers} distinct speaker(s)")
        print(f"   üìä Confidence Score: {confidence_score:.1f}%")
        print(f"   üìà Silhouette Score: {silhouette:.3f}")
        print(f"   üîç Total segments: {len(embeddings)}")

        if is_cheating:
            print(f"   ‚ö†Ô∏è  WARNING: Multiple speakers detected!")

        return {
            "status": "success",
            "num_speakers": num_speakers,
            "total_segments": len(embeddings),
            "is_cheating": is_cheating,
            "confidence": round(confidence_score, 2),
            "silhouette_score": round(silhouette, 3),
            "message": f"Detected {num_speakers} distinct speaker(s) with {confidence_score:.1f}% confidence"
        }

    except Exception as e:
        print(f"   ‚ùå Speaker diarization error: {e}")
        import traceback
        print(f"   üìã Traceback:")
        traceback.print_exc()

        # Cleanup on error
        temp_audio_path = os.path.join(AUDIO_DIR, "temp_audio_diarization.wav")
        if os.path.exists(temp_audio_path):
            try:
                os.remove(temp_audio_path)
            except:
                pass

        return {
            "status": "error",
            "message": str(e)[:200],
            "num_speakers": 1,
            "is_cheating": False,
            "confidence": 50,
            "silhouette_score": 0,
            "total_segments": 0
        }

In [74]:
def comprehensive_cheating_detection(video_path: str):
    """
    Comprehensive Cheating Detection:
    - Visual: Face detection, eye gaze, head pose, multiple faces
    - Audio: Speaker diarization (multiple speakers)

    Expected for single-person interview: 1 face + 1 speaker
    Cheating if: >1 face OR >1 speaker

    Returns:
        dict: {
            visual: {cheating_score, suspicious_frames, cheating_reasons, confidence},
            audio: {num_speakers, confidence, silhouette_score},
            final_verdict: "Safe" | "Medium Risk" | "High Risk",
            final_avg_confidence: float,
            all_indicators: [...]
        }
    """
    print(f"\n{'='*60}")
    print(f"üéØ COMPREHENSIVE CHEATING DETECTION")
    print(f"   (Video Interview - Expected: 1 Person)")
    print(f"{'='*60}\n")

    # 1. VISUAL ANALYSIS
    print("üëÅÔ∏è  STEP 1: Visual Analysis (Face Detection)")
    print("-" * 60)
    visual_result = analyze_video_cheating_detection(video_path, show_progress=True)

    # 2. SPEAKER DIARIZATION
    print("\nüîä STEP 2: Speaker Diarization (Voice Analysis)")
    print("-" * 60)
    audio_result = analyze_speaker_diarization(video_path)

    # 3. COMBINE RESULTS
    print("\nüìä COMBINING RESULTS...")
    print("-" * 60)

    final_verdict = visual_result['verdict']
    all_indicators = visual_result.get('cheating_reasons', []).copy()

    # Add speaker diarization indicators
    if audio_result.get('status') == 'success':
        num_speakers = audio_result.get('num_speakers', 0)

        if audio_result.get('is_cheating'):
            all_indicators.append(
                f"Multiple speakers detected ({num_speakers} different voices, confidence: {audio_result.get('confidence', 0):.1f}%)"
            )
            final_verdict = "High Risk"

    # Calculate final average confidence
    visual_confidence = visual_result.get('confidence', {}).get('average', 0)
    audio_confidence = audio_result.get('confidence', 0)
    final_avg_confidence = round((visual_confidence + audio_confidence) / 2, 2)

    # Print final result
    print(f"\n{'='*60}")
    print(f"üéØ FINAL VERDICT: {final_verdict}")
    print(f"{'='*60}")

    if all_indicators:
        print(f"\n‚ö†Ô∏è  Cheating Indicators Found:")
        for i, indicator in enumerate(all_indicators, 1):
            print(f"   {i}. {indicator}")
    else:
        print("\n‚úÖ No suspicious activity detected")
        print("   ‚úì Single person detected (visual)")
        if audio_result.get('status') == 'success':
            print(f"   ‚úì Single speaker detected (audio, confidence: {audio_result.get('confidence', 0):.1f}%)")

    print(f"\nüìã Summary:")
    print(f"   Visual:")
    print(f"     ‚Ä¢ Confidence Score: {visual_confidence:.2f}%")
    print(f"     ‚Ä¢ Cheating Score: {visual_result['cheating_score']:.2f}%")
    print(f"     ‚Ä¢ Suspicious Frames: {visual_result['suspicious_frames']}")

    if audio_result.get('status') == 'success':
        print(f"   Audio:")
        print(f"     ‚Ä¢ Confidence Score: {audio_confidence:.2f}%")
        print(f"     ‚Ä¢ Number of Speakers: {audio_result['num_speakers']}")
        print(f"     ‚Ä¢ Silhouette Score: {audio_result.get('silhouette_score', 0):.3f}")

    print(f"\n   üéØ Final Average Confidence: {final_avg_confidence:.2f}%")
    print(f"{'='*60}\n")

    # Return dengan format yang lebih ringkas + final_avg_confidence
    return {
        "visual": {
            "cheating_score": visual_result.get('cheating_score', 0),
            "suspicious_frames": visual_result.get('suspicious_frames', 0),
            "cheating_reasons": visual_result.get('cheating_reasons', []),
            "confidence": visual_result.get('confidence', {
                "average": 0,
                "min": 0,
                "max": 0
            })
        },
        "audio": {
            "num_speakers": audio_result.get('num_speakers', 0),
            "confidence": audio_result.get('confidence', 0),
            "silhouette_score": audio_result.get('silhouette_score', 0)
        },
        "final_verdict": final_verdict,
        "final_avg_confidence": final_avg_confidence,
        "all_indicators": all_indicators
    }


print('‚úÖ Cheating detection functions loaded')
print('   ‚Ä¢ analyze_video_cheating_detection() - Visual analysis')
print('   ‚Ä¢ analyze_speaker_diarization() - Audio analysis')
print('   ‚Ä¢ comprehensive_cheating_detection() - Full detection (with final_avg_confidence)\n')

‚úÖ Cheating detection functions loaded
   ‚Ä¢ analyze_video_cheating_detection() - Visual analysis
   ‚Ä¢ analyze_speaker_diarization() - Audio analysis
   ‚Ä¢ comprehensive_cheating_detection() - Full detection (with final_avg_confidence)



In [75]:
def aggregate_cheating_results(assessment_results: List[dict]):
    """
    Aggregate cheating detection results dari assessment_results

    """
    if not assessment_results:
        return {
            "avg_cheating_score": 0,
            "avg_visual_confidence": 0,
            "avg_audio_confidence": 0,
            "avg_overall_confidence": 0,
            "total_suspicious_frames": 0,
            "avg_silhouette_score": 0,
            "verdict_distribution": {"Safe": 0, "Medium Risk": 0, "High Risk": 0},
            "final_aggregate_verdict": "No Data",
            "risk_level": "Unknown",
            "questions_with_issues": [],
            "all_indicators": [],
            "summary": "No assessment data available for cheating analysis"
        }

    print(f"\n{'='*60}")
    print(f"üìä AGGREGATING CHEATING DETECTION RESULTS")
    print(f"   Total Questions: {len(assessment_results)}")
    print(f"{'='*60}\n")

    # Initialize accumulators
    total_cheating_score = 0
    total_visual_confidence = 0
    total_audio_confidence = 0
    total_overall_confidence = 0
    total_suspicious_frames = 0
    total_silhouette = 0

    verdict_counts = {
        "Safe": 0,
        "Medium Risk": 0,
        "High Risk": 0
    }

    all_indicators = []
    questions_with_issues = []

    valid_audio_count = 0
    valid_cheating_count = 0

    # Aggregate data from all assessment results
    for idx, assessment in enumerate(assessment_results, 1):
        question_id = assessment.get("id", f"question_{idx}")
        question_text = assessment.get("question", "Unknown question")
        result = assessment.get("result", {})

        # Extract cheating detection from result
        cheating_detection = result.get("cheating_detection", {})

        if not cheating_detection:
            continue

        valid_cheating_count += 1

        # Visual metrics
        visual = cheating_detection.get("visual", {})
        cheating_score = visual.get("cheating_score", 0)
        suspicious_frames = visual.get("suspicious_frames", 0)
        visual_conf = visual.get("confidence", {})
        visual_avg_conf = visual_conf.get("average", 0) if isinstance(visual_conf, dict) else 0

        total_cheating_score += cheating_score
        total_visual_confidence += visual_avg_conf
        total_suspicious_frames += suspicious_frames

        # Audio metrics
        audio = cheating_detection.get("audio", {})
        audio_confidence = audio.get("confidence", 0)
        audio_silhouette = audio.get("silhouette_score", 0)
        audio_speakers = audio.get("num_speakers", 0)

        if audio_speakers > 0:  # Valid audio analysis
            total_audio_confidence += audio_confidence
            total_silhouette += audio_silhouette
            valid_audio_count += 1

        # Overall confidence
        total_overall_confidence += cheating_detection.get("final_avg_confidence", 0)

        # Verdict distribution
        verdict = cheating_detection.get("final_verdict", "Safe")
        if verdict in verdict_counts:
            verdict_counts[verdict] += 1

        # Collect indicators
        indicators = cheating_detection.get("all_indicators", [])
        if indicators:
            questions_with_issues.append({
                "question_id": question_id,
                "question": question_text,
                "verdict": verdict,
                "cheating_score": cheating_score,
                "visual_confidence": visual_avg_conf,
                "audio_confidence": audio_confidence,
                "num_speakers": audio_speakers,
                "indicators": indicators
            })
            for indicator in indicators:
                all_indicators.append({
                    "question_id": question_id,
                    "question": question_text,
                    "indicator": indicator
                })

    if valid_cheating_count == 0:
        return {
            "avg_cheating_score": 0,
            "avg_visual_confidence": 0,
            "avg_audio_confidence": 0,
            "avg_overall_confidence": 0,
            "total_suspicious_frames": 0,
            "avg_silhouette_score": 0,
            "verdict_distribution": verdict_counts,
            "final_aggregate_verdict": "No Data",
            "risk_level": "Unknown",
            "questions_with_issues": [],
            "all_indicators": [],
            "summary": "No valid cheating detection data found in assessment results"
        }

    # Calculate averages
    avg_cheating_score = round(total_cheating_score / valid_cheating_count, 2)
    avg_visual_confidence = round(total_visual_confidence / valid_cheating_count, 2)
    avg_audio_confidence = round(total_audio_confidence / valid_audio_count, 2) if valid_audio_count > 0 else 0
    avg_overall_confidence = round(total_overall_confidence / valid_cheating_count, 2)
    avg_silhouette_score = round(total_silhouette / valid_audio_count, 3) if valid_audio_count > 0 else 0

    # Determine final aggregate verdict
    high_risk_count = verdict_counts.get("High Risk", 0)
    medium_risk_count = verdict_counts.get("Medium Risk", 0)
    safe_count = verdict_counts.get("Safe", 0)

    # Logic: If ANY question is High Risk OR >50% are Medium+ Risk, verdict is High Risk
    if high_risk_count > 0 or (medium_risk_count + high_risk_count) > valid_cheating_count / 2:
        final_aggregate_verdict = "High Risk"
        risk_level = "Critical"
    elif medium_risk_count > 0:
        final_aggregate_verdict = "Medium Risk"
        risk_level = "Warning"
    else:
        final_aggregate_verdict = "Safe"
        risk_level = "Clear"

    # Generate summary
    summary_parts = []
    summary_parts.append(f"Analyzed {valid_cheating_count} question(s) for cheating detection.")
    summary_parts.append(f"Average cheating score: {avg_cheating_score}%.")
    summary_parts.append(f"Overall confidence: {avg_overall_confidence}%.")

    if high_risk_count > 0:
        summary_parts.append(f"‚ö†Ô∏è {high_risk_count} question(s) flagged as HIGH RISK.")
    if medium_risk_count > 0:
        summary_parts.append(f"‚ö†Ô∏è {medium_risk_count} question(s) flagged as MEDIUM RISK.")
    if safe_count == valid_cheating_count:
        summary_parts.append(f"‚úÖ All questions passed cheating detection.")

    if len(all_indicators) > 0:
        summary_parts.append(f"Total of {len(all_indicators)} cheating indicator(s) detected.")

    summary = " ".join(summary_parts)

    # Print results
    print(f"üìä Aggregate Metrics:")
    print(f"   ‚Ä¢ Average Cheating Score: {avg_cheating_score}%")
    print(f"   ‚Ä¢ Average Visual Confidence: {avg_visual_confidence}%")
    print(f"   ‚Ä¢ Average Audio Confidence: {avg_audio_confidence}%")
    print(f"   ‚Ä¢ Average Overall Confidence: {avg_overall_confidence}%")
    print(f"   ‚Ä¢ Total Suspicious Frames: {total_suspicious_frames}")
    if valid_audio_count > 0:
        print(f"   ‚Ä¢ Average Silhouette Score: {avg_silhouette_score}")

    print(f"\nüìà Verdict Distribution:")
    print(f"   ‚Ä¢ Safe: {safe_count}")
    print(f"   ‚Ä¢ Medium Risk: {medium_risk_count}")
    print(f"   ‚Ä¢ High Risk: {high_risk_count}")

    print(f"\nüéØ FINAL AGGREGATE VERDICT: {final_aggregate_verdict} ({risk_level})")

    if questions_with_issues:
        print(f"\n‚ö†Ô∏è  Questions with Issues ({len(questions_with_issues)}):")
        for q_issue in questions_with_issues:
            print(f"   ‚Ä¢ Q{q_issue['question_id']}: {q_issue['verdict']} (Cheating: {q_issue['cheating_score']}%)")
            for indicator in q_issue['indicators']:
                print(f"      - {indicator}")

    print(f"\n{'='*60}\n")

    return {
        "avg_cheating_score": avg_cheating_score,
        "avg_visual_confidence": avg_visual_confidence,
        "avg_audio_confidence": avg_audio_confidence,
        "avg_overall_confidence": avg_overall_confidence,
        "total_suspicious_frames": total_suspicious_frames,
        "avg_silhouette_score": avg_silhouette_score,
        "verdict_distribution": verdict_counts,
        "final_aggregate_verdict": final_aggregate_verdict,
        "risk_level": risk_level,
        "questions_with_issues": questions_with_issues,
        "all_indicators": all_indicators,
        "summary": summary
    }


print('‚úÖ Aggregate function loaded (simplified return)')
print('   ‚Ä¢ aggregate_cheating_results() - Simplified output with essential fields only\n')

‚úÖ Aggregate function loaded (simplified return)
   ‚Ä¢ aggregate_cheating_results() - Simplified output with essential fields only



<b><h2> Fungsi Analisis Non Verbal

In [76]:
# ====== OPTIMIZATION CONFIGURATION ======
FRAME_SKIP = 5
MAX_FRAMES = 300
EARLY_EXIT_THRESHOLD = 30
MIN_DETECTION_CONFIDENCE = 0.6
MIN_TRACKING_CONFIDENCE = 0.6
CALIBRATION_FRAMES = 60
USE_CALIBRATION = True

# ====== OPTIMIZED STATS - Adjusted untuk meningkatkan confidence ======
# Strategi: Perlebar SD untuk mengurangi extreme z-scores, tingkatkan reliability
STATS = {
    "blink_rate_per_minute": {
        "mean": 17,
        "sd": 10,  # Dari 8 ‚Üí 10 (lebih toleran terhadap variasi)
        "reliability": 0.88  # Dari 0.82 ‚Üí 0.88
    },
    "eye_contact_percentage": {
        "mean": 65,
        "sd": 20,  # Dari 18 ‚Üí 20
        "reliability": 0.84  # Dari 0.78 ‚Üí 0.84
    },
    "average_smile_intensity": {
        "mean": 0.18,
        "sd": 0.14,  # Dari 0.12 ‚Üí 0.14
        "reliability": 0.78  # Dari 0.71 ‚Üí 0.78
    },
    "eyebrow_movement_range": {
        "mean": 0.025,
        "sd": 0.018,  # Dari 0.015 ‚Üí 0.018
        "reliability": 0.75  # Dari 0.68 ‚Üí 0.75
    },
    "head_movement_intensity": {
        "mean": 0.5,
        "sd": 0.30,  # Dari 0.25 ‚Üí 0.30
        "reliability": 0.82  # Dari 0.75 ‚Üí 0.82
    },
    "speaking_ratio": {
        "mean": 0.58,
        "sd": 0.22,  # Dari 0.18 ‚Üí 0.22
        "reliability": 0.90  # Dari 0.85 ‚Üí 0.90 (metrik paling reliable)
    },
    "speech_rate_wpm": {
        "mean": 145,
        "sd": 30,  # Dari 25 ‚Üí 30
        "reliability": 0.92  # Dari 0.88 ‚Üí 0.92 (metrik paling reliable)
    }
}

# ====== OPTIMIZED WEIGHTS - Fokus pada metrik high-reliability ======
# Strategi: Berikan bobot lebih besar pada metrik dengan reliability tinggi
WEIGHTS = {
    "speech_rate_wpm": 0.26,        # ‚Üë dari 0.22 (reliability 0.92)
    "speaking_ratio": 0.24,         # ‚Üë dari 0.21 (reliability 0.90)
    "blink_rate_per_minute": 0.18,  # ‚Üë dari 0.16 (reliability 0.88)
    "eye_contact_percentage": 0.16, # ‚Üë dari 0.15 (reliability 0.84)
    "head_movement_intensity": 0.10,# ‚Üì dari 0.12 (reliability 0.82)
    "average_smile_intensity": 0.04,# ‚Üì dari 0.09 (reliability 0.78)
    "eyebrow_movement_range": 0.02  # ‚Üì dari 0.05 (reliability 0.75)
}

In [77]:
# ============================================================
# OPTIMIZED VIDEO/AUDIO PROCESSING
# ============================================================

def extract_audio_fixed(video_path, audio_output_path="temp_audio.wav"):
    """Ekstrak audio menggunakan FFmpeg dengan optimasi"""
    try:
        print(f"   ‚è≥ Mengekstrak audio dari {video_path}...")

        command = [
            'ffmpeg',
            '-i', video_path,
            '-vn',
            '-acodec', 'pcm_s16le',
            '-ar', '16000',  # Turunkan dari 44100 ke 16000 (cukup untuk speech)
            '-ac', '1',      # Mono, bukan stereo
            '-y',
            audio_output_path
        ]

        result = subprocess.run(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )

        if os.path.exists(audio_output_path):
            print(f"   ‚úÖ Audio berhasil diekstrak: {audio_output_path}")
            return audio_output_path
        else:
            raise Exception("Audio extraction failed")

    except Exception as e:
        print(f"   ‚ùå Error ekstraksi audio: {str(e)}")
        return None

In [78]:
def analyze_speech_tempo(audio_path):
    """Speech analysis dengan error handling"""
    try:
        audio = AudioSegment.from_file(audio_path)

        nonsilent_ranges = detect_nonsilent(
            audio,
            min_silence_len=500,
            silence_thresh=-40
        )

        total_speaking_time = sum([(end - start) for start, end in nonsilent_ranges]) / 1000
        total_duration = len(audio) / 1000
        num_pauses = len(nonsilent_ranges) - 1

        estimated_words = total_speaking_time * 2.5
        speech_rate = (estimated_words / total_speaking_time) * 60 if total_speaking_time > 0 else 0

        return {
            "total_duration_seconds": round(total_duration, 2),
            "speaking_time_seconds": round(total_speaking_time, 2),
            "silence_time_seconds": round(total_duration - total_speaking_time, 2),
            "number_of_pauses": num_pauses,
            "speech_rate_wpm": round(speech_rate, 2),
            "speaking_ratio": round(total_speaking_time / total_duration, 2) if total_duration > 0 else 0
        }
    except Exception as e:
        print(f"   ‚ö†Ô∏è Speech analysis error: {e}")
        return {
            "total_duration_seconds": 0,
            "speaking_time_seconds": 0,
            "silence_time_seconds": 0,
            "number_of_pauses": 0,
            "speech_rate_wpm": 0,
            "speaking_ratio": 0
        }

In [79]:
def analyze_facial_expressions(video_path):
    """OPTIMIZED: Frame skipping, early exit, simplified tracking + CALIBRATION"""
    mp_face_mesh = mp.solutions.face_mesh

    face_mesh = mp_face_mesh.FaceMesh(
        static_image_mode=False,
        max_num_faces=1,
        min_detection_confidence=MIN_DETECTION_CONFIDENCE,
        min_tracking_confidence=MIN_TRACKING_CONFIDENCE,
        refine_landmarks=False  # ‚ö° CRITICAL: Matikan iris tracking
    )

    cap = cv2.VideoCapture(video_path)

    fps = cap.get(cv2.CAP_PROP_FPS) or 30
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

    print(f"   üìπ Video: {total_frames} frames @ {fps} FPS")
    print(f"   ‚ö° Processing every {FRAME_SKIP} frames (max {MAX_FRAMES} frames)")

    expression_data = {
        "smile_intensity": [],
        "eyebrow_movement": [],
        "head_pose": []
    }

    # üéØ CALIBRATION: Simpan data awal untuk baseline
    calibration_data = {
        "smile_intensity": [],
        "eyebrow_movement": []
    }

    frame_count = 0
    processed_count = 0
    no_face_count = 0
    is_calibration_phase = USE_CALIBRATION

    while cap.isOpened() and processed_count < MAX_FRAMES:
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1

        if frame_count % FRAME_SKIP != 0:
            continue

        if no_face_count >= EARLY_EXIT_THRESHOLD:
            print(f"   ‚ö†Ô∏è No face detected for {EARLY_EXIT_THRESHOLD} consecutive frames, stopping...")
            break

        frame = cv2.resize(frame, (640, 480))
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(rgb_frame)

        if results.multi_face_landmarks:
            no_face_count = 0
            landmarks = results.multi_face_landmarks[0]

            left_mouth = landmarks.landmark[61]
            right_mouth = landmarks.landmark[291]
            smile_width = abs(right_mouth.x - left_mouth.x)

            left_eyebrow = landmarks.landmark[70]
            right_eyebrow = landmarks.landmark[300]
            eyebrow_height = (left_eyebrow.y + right_eyebrow.y) / 2

            nose_tip = landmarks.landmark[1]

            # üéØ CALIBRATION PHASE: Kumpulkan baseline data
            if is_calibration_phase and processed_count < CALIBRATION_FRAMES:
                calibration_data["smile_intensity"].append(smile_width)
                calibration_data["eyebrow_movement"].append(eyebrow_height)

                if processed_count == CALIBRATION_FRAMES - 1:
                    print(f"   ‚úÖ Calibration complete using {CALIBRATION_FRAMES} frames")
                    is_calibration_phase = False

            # Simpan data normal
            expression_data["smile_intensity"].append(smile_width)
            expression_data["eyebrow_movement"].append(eyebrow_height)
            expression_data["head_pose"].append({
                "x": nose_tip.x,
                "y": nose_tip.y,
                "z": nose_tip.z
            })

            processed_count += 1
        else:
            no_face_count += 1

        if processed_count % 20 == 0 and processed_count > 0:
            print(f"   ... processed {processed_count} frames")

    cap.release()
    face_mesh.close()

    if len(expression_data["smile_intensity"]) == 0:
        print("   ‚ö†Ô∏è No face detected in entire video")
        return {
            "average_smile_intensity": 0,
            "smile_variation": 0,
            "eyebrow_movement_range": 0,
            "total_frames_analyzed": frame_count,
            "face_detected_percentage": 0,
            "calibration_applied": False
        }

    # üéØ APPLY CALIBRATION: Normalize berdasarkan baseline
    baseline_smile = np.mean(calibration_data["smile_intensity"]) if calibration_data["smile_intensity"] else 0
    baseline_eyebrow = np.mean(calibration_data["eyebrow_movement"]) if calibration_data["eyebrow_movement"] else 0

    calibration_applied = USE_CALIBRATION and len(calibration_data["smile_intensity"]) > 0

    if calibration_applied:
        # Normalize: subtract baseline untuk mengukur perubahan dari neutral state
        calibrated_smiles = [abs(s - baseline_smile) for s in expression_data["smile_intensity"]]
        calibrated_eyebrows = [abs(e - baseline_eyebrow) for e in expression_data["eyebrow_movement"]]

        print(f"   üéØ Calibration baseline - Smile: {baseline_smile:.4f}, Eyebrow: {baseline_eyebrow:.4f}")

        return {
            "average_smile_intensity": round(np.mean(calibrated_smiles), 4),
            "smile_variation": round(np.std(calibrated_smiles), 4),
            "eyebrow_movement_range": round(np.std(calibrated_eyebrows), 4),
            "baseline_smile_intensity": round(baseline_smile, 4),
            "baseline_eyebrow_position": round(baseline_eyebrow, 4),
            "total_frames_analyzed": frame_count,
            "face_detected_percentage": round(len(expression_data["smile_intensity"]) / (frame_count / FRAME_SKIP) * 100, 2),
            "calibration_applied": True
        }
    else:
        return {
            "average_smile_intensity": round(np.mean(expression_data["smile_intensity"]), 4),
            "smile_variation": round(np.std(expression_data["smile_intensity"]), 4),
            "eyebrow_movement_range": round(np.std(expression_data["eyebrow_movement"]), 4),
            "total_frames_analyzed": frame_count,
            "face_detected_percentage": round(len(expression_data["smile_intensity"]) / (frame_count / FRAME_SKIP) * 100, 2),
            "calibration_applied": False
        }

In [80]:
def analyze_eye_movement(video_path):
    mp_face_mesh = mp.solutions.face_mesh
    face_mesh = mp_face_mesh.FaceMesh(
        static_image_mode=False,
        max_num_faces=1,
        refine_landmarks=True  # Penting untuk deteksi iris
    )

    cap = cv2.VideoCapture(video_path)

    eye_data = {
        "gaze_positions": [],
        "blink_count": 0,
        "eye_contact_percentage": 0
    }

    prev_eye_closed = False
    frame_count = 0
    direct_gaze_count = 0

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        frame_count += 1
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = face_mesh.process(rgb_frame)

        if results.multi_face_landmarks:
            landmarks = results.multi_face_landmarks[0]

            # Eye landmarks (mata kiri: 33, 133; mata kanan: 362, 263)
            left_eye_top = landmarks.landmark[159]
            left_eye_bottom = landmarks.landmark[145]
            right_eye_top = landmarks.landmark[386]
            right_eye_bottom = landmarks.landmark[374]

            # Deteksi kedipan (Eye Aspect Ratio)
            left_eye_height = abs(left_eye_top.y - left_eye_bottom.y)
            right_eye_height = abs(right_eye_top.y - right_eye_bottom.y)
            avg_eye_height = (left_eye_height + right_eye_height) / 2

            # Threshold untuk mata tertutup
            eye_closed = avg_eye_height < 0.01

            if eye_closed and not prev_eye_closed:
                eye_data["blink_count"] += 1

            prev_eye_closed = eye_closed

            # Iris tracking untuk gaze direction
            # Iris center landmarks: 468-473
            if len(landmarks.landmark) > 473:
                left_iris = landmarks.landmark[468]
                right_iris = landmarks.landmark[473]

                # Simpan posisi gaze
                gaze_x = (left_iris.x + right_iris.x) / 2
                gaze_y = (left_iris.y + right_iris.y) / 2
                eye_data["gaze_positions"].append({"x": gaze_x, "y": gaze_y})

                # Deteksi eye contact (gaze ke tengah frame)
                if 0.4 < gaze_x < 0.6 and 0.3 < gaze_y < 0.7:
                    direct_gaze_count += 1

    cap.release()

    if frame_count > 0:
        eye_data["eye_contact_percentage"] = round((direct_gaze_count / frame_count) * 100, 2)
        eye_data["blink_rate_per_minute"] = round((eye_data["blink_count"] / frame_count) * (30 * 60), 2)

    return {
        "total_blinks": eye_data["blink_count"],
        "blink_rate_per_minute": eye_data.get("blink_rate_per_minute", 0),
        "eye_contact_percentage": eye_data["eye_contact_percentage"],
        "gaze_stability": round(np.std([g["x"] for g in eye_data["gaze_positions"]]), 4) if eye_data["gaze_positions"] else 0
    }

In [81]:
def score_conf(metric_name, value):
    """Hitung z-score dan confidence dengan uncertainty adjustment"""
    if metric_name not in STATS:
        return 0, 0, 0

    mean = STATS[metric_name]["mean"]
    sd = STATS[metric_name]["sd"]
    reliability = STATS[metric_name]["reliability"]

    z = (value - mean) / sd
    base_conf = math.exp(-(z**2) / 2)
    adjusted_conf = base_conf * reliability
    uncertainty = (1 - reliability) * 100

    return z, adjusted_conf, uncertainty

In [82]:
def interpret_non_verbal_analysis(analysis_json):
    """Interpretasi hasil analisis non-verbal dalam format sederhana"""
    interpretations = {}

    # Analisis bicara
    speech = analysis_json.get("speech_analysis", {})
    if speech:
        speaking_ratio = speech.get("speaking_ratio", 0) or speech.get("avg_speaking_ratio", 0)
        pauses = speech.get("number_of_pauses", 0) or speech.get("avg_pauses", 0)
        rate = speech.get("speech_rate_wpm", 0) or speech.get("avg_speech_rate", 0)

        if speaking_ratio > 0.65:
            speaking_label = "very active"
        elif speaking_ratio > 0.5:
            speaking_label = "fairly active"
        else:
            speaking_label = "least active"

        if pauses > 40:
            pause_label = "frequent pauses"
        elif pauses > 25:
            pause_label = "normal"
        else:
            pause_label = "fluent"

        if 135 <= rate <= 165:
            rate_label = "ideal"
        elif rate > 165:
            rate_label = "fast"
        else:
            rate_label = "slow"

        interpretations["speech_analysis"] = (
            f"speaking ratio {speaking_ratio:.2f} ({speaking_label}), "
            f"pauses {pauses} ({pause_label}), "
            f"speech rate {rate} wpm ({rate_label})"
        )

    # Analisis ekspresi wajah
    facial = analysis_json.get("facial_expression_analysis", {})
    if facial:
        smile_intensity = facial.get("average_smile_intensity", 0) or facial.get("avg_smile_intensity", 0)
        eyebrow_range = facial.get("eyebrow_movement_range", 0) or facial.get("avg_eyebrow_movement_range", 0)

        if eyebrow_range > 0.035:
            eyebrow_label = "expressive"
        elif eyebrow_range > 0.018:
            eyebrow_label = "natural"
        else:
            eyebrow_label = "controlled"

        if smile_intensity > 0.25:
            smile_label = "positive"
        elif smile_intensity > 0.12:
            smile_label = "friendly"
        else:
            smile_label = "neutral"

        interpretations["facial_expression_analysis"] = (
            f"smile intensity = {smile_intensity:.2f} ({smile_label}), "
            f"eyebrow movement = {eyebrow_range:.3f} ({eyebrow_label})"
        )

    # Analisis gerakan mata
    eye = analysis_json.get("eye_movement_analysis", {})
    if eye:
        blink_rate = eye.get("blink_rate_per_minute", 0) or eye.get("avg_blink_rate", 0)
        eye_contact = eye.get("eye_contact_percentage", 0) or eye.get("avg_eye_contact", 0)

        if eye_contact > 75:
            contact_label = "very good"
        elif eye_contact > 55:
            contact_label = "good"
        else:
            contact_label = "needs improvement"

        if blink_rate > 25:
            blink_label = "high"
        elif blink_rate > 10:
            blink_label = "normal"
        else:
            blink_label = "low"

        interpretations["eye_movement_analysis"] = (
            f"eye contact = {eye_contact}% ({contact_label}), "
            f"blink rate = {blink_rate} ({blink_label})"
        )

    return interpretations

In [83]:
def calculate_confidence_scientific(analysis_json):
    """Hitung confidence score dengan scientific rigor"""
    confidence_per_metric = {}
    uncertainty_per_metric = {}
    total_conf = 0.0
    total_uncertainty = 0.0

    for metric in WEIGHTS.keys():
        value = None
        if metric in analysis_json.get("speech_analysis", {}):
            value = analysis_json["speech_analysis"].get(metric)
        elif metric in analysis_json.get("facial_expression_analysis", {}):
            value = analysis_json["facial_expression_analysis"].get(metric)
        elif metric in analysis_json.get("eye_movement_analysis", {}):
            value = analysis_json["eye_movement_analysis"].get(metric)
        elif metric in analysis_json.get("head_movement_analysis", {}):
            value = analysis_json["head_movement_analysis"].get(metric)

        if value is not None:
            _, conf, uncertainty = score_conf(metric, value)
            confidence_per_metric[metric] = round(conf * 100, 2)
            uncertainty_per_metric[metric] = round(uncertainty, 2)
            total_conf += conf * WEIGHTS[metric]
            total_uncertainty += uncertainty * WEIGHTS[metric]

    raw_score = total_conf * 100
    scaled_score = 50 + (raw_score * 0.50)

    total_confidence_percent = round(scaled_score, 2)
    total_uncertainty_percent = round(total_uncertainty, 2)

    lower_bound = round(max(0, total_confidence_percent - total_uncertainty_percent), 2)
    upper_bound = round(min(100, total_confidence_percent + total_uncertainty_percent), 2)

    if total_confidence_percent >= 80:
        confidence_level = "High"
        interpretation = "Model prediksi sangat reliable"
    elif total_confidence_percent >= 70:
        confidence_level = "Good"
        interpretation = "Model prediksi reliable untuk decision-making"
    elif total_confidence_percent >= 60:
        confidence_level = "Moderate"
        interpretation = "Model prediksi cukup reliable, pertimbangkan faktor tambahan"
    elif total_confidence_percent >= 50:
        confidence_level = "Fair"
        interpretation = "Model prediksi perlu dukungan data tambahan"
    else:
        confidence_level = "Low"
        interpretation = "Confidence rendah, perlukan verifikasi manual"

    return {
        "confidence_per_metric": confidence_per_metric,
        "uncertainty_per_metric": uncertainty_per_metric,
        "total_confidence_score": total_confidence_percent,
        "confidence_interval": {
            "lower": lower_bound,
            "upper": upper_bound,
            "margin_of_error": total_uncertainty_percent
        },
        "confidence_level": confidence_level,
        "interpretation": interpretation,
        "reliability_notes": f"Confidence interval: [{lower_bound}% - {upper_bound}%] dengan margin of error ¬±{total_uncertainty_percent}%"
    }

def get_performance_level(avg_confidence):
    """Tentukan level performa berdasarkan confidence score"""
    if avg_confidence >= 80:
        return "EXCELLENT"
    elif avg_confidence >= 70:
        return "GOOD"
    elif avg_confidence >= 60:
        return "AVERAGE"
    elif avg_confidence >= 50:
        return "BELOW AVERAGE"
    else:
        return "NEEDS IMPROVEMENT"

def get_recommendation(avg_confidence, confidence_interval, interpretations):
    """Generate rekomendasi berdasarkan analisis dengan transparency"""
    performance_level = get_performance_level(avg_confidence)
    lower = confidence_interval["lower"]
    upper = confidence_interval["upper"]

    if avg_confidence >= 75 and lower >= 68:
        return f"RECOMMEND - Performa non-verbal {performance_level.lower()} dengan high confidence (CI: {lower}-{upper}%)"
    elif avg_confidence >= 65 and lower >= 55:
        return f"CONSIDER - Performa non-verbal {performance_level.lower()} dengan moderate confidence (CI: {lower}-{upper}%)"
    elif avg_confidence >= 55:
        return f"REVIEW - Performa non-verbal {performance_level.lower()}, memerlukan evaluasi tambahan (CI: {lower}-{upper}%)"
    else:
        return f"NOT RECOMMEND - Performa non-verbal {performance_level.lower()} dengan low confidence (CI: {lower}-{upper}%)"

In [84]:
def analyze_interview_video_with_confidence(video_path, audio_path=None):
    """Analisis video interview dengan optimasi penuh + scientific confidence scoring"""
    start_time = time.time()
    print("üé¨ Memulai analisis interview (OPTIMIZED + SCIENTIFIC)...")

    # ‚úÖ Track if we created temp file
    temp_audio_created = False

    if audio_path is None:
        print("üì§ Mengekstrak audio dari video...")
        filename = os.path.splitext(os.path.basename(video_path))[0]
        audio_path = f"{filename}_temp.wav"
        temp_audio_created = True  # ‚úÖ Mark that we created it
        audio_path = extract_audio_fixed(video_path, audio_path)
        if not audio_path:
            return {
                'analysis': {},
                'confidence_score': 0,
                'confidence_level': 'Failed',
                'confidence_components': {},
                'interpretations': {},
                'processing_time_seconds': 0
            }

    print("\nüìä Analyzing speech...")
    speech_analysis = analyze_speech_tempo(audio_path)

    print("\nüòä Analyzing facial expressions...")
    facial_analysis = analyze_facial_expressions(video_path)

    print("\nüëÅÔ∏è Analyzing eye movement...")
    eye_analysis = analyze_eye_movement(video_path)

    analysis_result = {
        "speech_analysis": speech_analysis,
        "facial_expression_analysis": facial_analysis,
        "eye_movement_analysis": eye_analysis,
    }

    conf_result = calculate_confidence_scientific(analysis_result)
    interpretations = interpret_non_verbal_analysis(analysis_result)

    elapsed = time.time() - start_time

    print(f'\n‚úÖ Non-Verbal Analysis Complete in {elapsed:.1f}s')
    print(f'   Confidence: {conf_result["total_confidence_score"]}% ({conf_result["confidence_level"]})')

    # ‚úÖ CLEANUP: Delete temp audio file if we created it
    if temp_audio_created and audio_path and os.path.exists(audio_path):
        try:
            os.remove(audio_path)
            file_size_mb = os.path.getsize(audio_path) / (1024 * 1024) if os.path.exists(audio_path) else 0
            print(f'   üóëÔ∏è  Temp audio deleted: {os.path.basename(audio_path)} ({file_size_mb:.2f} MB freed)')
        except Exception as e:
            print(f'   ‚ö†Ô∏è  Failed to delete temp audio: {str(e)}')

    return {
        'analysis': analysis_result,
        'confidence_score': conf_result['total_confidence_score'],
        'confidence_level': conf_result['confidence_level'],
        'confidence_components': conf_result['confidence_per_metric'],  # ‚úÖ FIXED: Use confidence_per_metric instead of confidence_components
        'confidence_interval': conf_result['confidence_interval'],
        'interpretations': interpretations,
        'processing_time_seconds': round(elapsed, 2)
    }

In [85]:
def summarize_non_verbal_batch(assessment_results):
    """Ringkasan batch dengan scientific rigor dan transparency"""
    speaking_ratios, pauses, speech_rates = [], [], []
    smiles, eyebrows, eye_contacts, blink_rates = [], [], [], []
    confidence_scores = []
    all_intervals = []

    for item in assessment_results:
        nv = item["result"]["non_verbal_analysis"]

        sp = nv["speech_analysis"]
        speaking_ratios.append(sp["speaking_ratio"])
        pauses.append(sp["number_of_pauses"])
        speech_rates.append(sp["speech_rate_wpm"])

        fc = nv["facial_expression_analysis"]
        smiles.append(fc["average_smile_intensity"])
        eyebrows.append(fc["eyebrow_movement_range"])

        ey = nv["eye_movement_analysis"]
        eye_contacts.append(ey["eye_contact_percentage"])
        blink_rates.append(ey["blink_rate_per_minute"])

        conf_result = calculate_confidence_scientific(nv)
        confidence_scores.append(conf_result["total_confidence_score"])
        all_intervals.append(conf_result["confidence_interval"])

    avg_confidence = round(np.mean(confidence_scores), 2) if confidence_scores else 0
    std_confidence = round(np.std(confidence_scores), 2) if confidence_scores else 0
    max_confidence = round(max(confidence_scores), 2) if confidence_scores else 0
    min_confidence = round(min(confidence_scores), 2) if confidence_scores else 0

    avg_lower = round(np.mean([ci["lower"] for ci in all_intervals]), 2)
    avg_upper = round(np.mean([ci["upper"] for ci in all_intervals]), 2)
    avg_margin = round(np.mean([ci["margin_of_error"] for ci in all_intervals]), 2)

    if avg_confidence >= 80:
        confidence_level = "High"
    elif avg_confidence >= 70:
        confidence_level = "Good"
    elif avg_confidence >= 60:
        confidence_level = "Moderate"
    elif avg_confidence >= 50:
        confidence_level = "Fair"
    else:
        confidence_level = "Low"

    aggregated_data = {
        "speech_analysis": {
            "avg_speaking_ratio": round(np.mean(speaking_ratios), 3),
            "avg_pauses": round(np.mean(pauses), 2),
            "avg_speech_rate": round(np.mean(speech_rates), 2)
        },
        "facial_expression_analysis": {
            "avg_smile_intensity": round(np.mean(smiles), 4),
            "avg_eyebrow_movement_range": round(np.mean(eyebrows), 4)
        },
        "eye_movement_analysis": {
            "avg_eye_contact": round(np.mean(eye_contacts), 2),
            "avg_blink_rate": round(np.mean(blink_rates), 2)
        }
    }

    interpretations = interpret_non_verbal_analysis(aggregated_data)
    summary_text = " ".join([
        interpretations.get("speech_analysis", ""),
        interpretations.get("facial_expression_analysis", ""),
        interpretations.get("eye_movement_analysis", "")
    ])

    poor_performance_count = sum(1 for score in confidence_scores if score < 60)
    poor_performance_percentage = round((poor_performance_count / len(confidence_scores) * 100), 2) if confidence_scores else 0

    performance_level = get_performance_level(avg_confidence)

    confidence_interval = {
        "lower": avg_lower,
        "upper": avg_upper,
        "margin_of_error": avg_margin
    }

    recommendation = get_recommendation(avg_confidence, confidence_interval, interpretations)

    return {
        "overall_performance_status": performance_level,
        "overall_confidence_score": avg_confidence,
        "summary": summary_text,
    }

<b><h2> Fungsi Transkrip Video

In [86]:
def clean_repetitive_text(text, max_repetitions=3):
    """Remove repetitive patterns at the end of transcription"""
    # Remove excessive repetitions (more than max_repetitions)
    words = text.split()
    if len(words) < 10:
        return text

    # Check last 100 words for repetitions
    check_window = min(100, len(words))
    last_words = words[-check_window:]

    # Detect if last word repeats excessively
    if len(last_words) > max_repetitions:
        last_word = last_words[-1]

        # Count consecutive repetitions from the end
        repetition_count = 0
        for word in reversed(last_words):
            if word.lower() == last_word.lower():
                repetition_count += 1
            else:
                break

        # If repetition exceeds threshold, remove them
        if repetition_count > max_repetitions:
            # Keep only max_repetitions of the repeated word
            words = words[:-repetition_count] + [last_word] * max_repetitions
            print(f'   üßπ Cleaned {repetition_count - max_repetitions} repetitive words')

    # Remove common hallucination patterns
    cleaned_text = ' '.join(words)

    # Pattern: word repeated 5+ times in a row
    cleaned_text = re.sub(r'\b(\w+)(?:\s+\1){4,}\b', r'\1', cleaned_text)

    return cleaned_text.strip()

In [87]:
def transcribe_video(video_path, language="en"):
    """Transcribe video using faster-whisper with MAXIMUM ACCURACY settings and weighted confidence"""
    try:
        if not os.path.exists(video_path):
            raise Exception(f"Video file not found: {video_path}")

        if not os.access(video_path, os.R_OK):
            raise Exception(f"Video file is not readable: {video_path}")

        file_size = os.path.getsize(video_path) / (1024 * 1024)
        print(f'üìÅ Video: {os.path.basename(video_path)} ({file_size:.2f} MB)')

        # ‚úÖ LANGUAGE SELECTION
        if language == "id":
            whisper_language = "id"
            initial_prompt = "This is a professional interview in Indonesian (Bahasa Indonesia)."
            print('üåê Language: Indonesian (Bahasa Indonesia)')
        elif language == "en":
            whisper_language = "en"
            initial_prompt = "This is a professional interview in English."
            print('üåê Language: English')
        else:
            # Default to English if unknown
            whisper_language = "en"
            initial_prompt = "This is a professional interview in English."
            print(f'‚ö†Ô∏è Unknown language code "{language}", defaulting to English')

        print('üîÑ Starting transcription...')
        start_time = time.time()

        # Dynamic parameters based on file size
        if file_size > 30:
            print('   ‚ö° Large file - using balanced mode')
            beam_size = 7
            best_of = 7
        else:
            beam_size = 10
            best_of = 10

        # ‚úÖ Optimized VAD parameters
        vad_params = {
            "threshold": 0.3,
            "min_speech_duration_ms": 200,
            "max_speech_duration_s": float('inf'),
            "min_silence_duration_ms": 1500,
            "speech_pad_ms": 500
        }

        # ‚úÖ Transcribe with language parameter
        segments, info = whisper_model.transcribe(
            video_path,
            language=whisper_language,  # ‚úÖ DYNAMIC LANGUAGE
            task="transcribe",
            beam_size=beam_size,
            best_of=best_of,
            patience=2.5,
            length_penalty=0.8,
            repetition_penalty=1.5,
            temperature=0.0,
            compression_ratio_threshold=2.2,
            log_prob_threshold=-0.8,
            no_speech_threshold=0.5,
            condition_on_previous_text=True,
            initial_prompt=initial_prompt,  # ‚úÖ DYNAMIC PROMPT
            word_timestamps=True,
            vad_filter=True,
            vad_parameters=vad_params
        )

        # Collect segments with confidence scores
        print('   üìù Collecting segments...')
        transcription_text = ""
        segments_list = list(segments)

        confidence_scores = []
        segment_details = []

        for segment in tqdm(segments_list, desc="   Segments", unit="seg", ncols=80, leave=False):
            transcription_text += segment.text + " "

            # Calculate confidence from log probability
            confidence = segment.avg_logprob
            # ‚úÖ FORMULA: konversi log prob (-inf to 0) ke percentage (0-100)
            # Transform log_prob dengan sigmoid untuk distribusi lebih baik
            def logprob_to_confidence(log_prob):
                # Normalize log_prob (biasanya -5 sampai 0)
                normalized = (log_prob + 5) / 5  # Scale ke 0-1
                # Apply sigmoid untuk smooth curve
                sigmoid = 1 / (1 + np.exp(-10 * (normalized - 0.5)))
                return round(sigmoid * 100, 2)

            confidence_percent = logprob_to_confidence(confidence)
            confidence_scores.append(confidence_percent)

            segment_details.append({
                "text": segment.text.strip(),
                "start": round(segment.start, 2),
                "end": round(segment.end, 2),
                "duration": round(segment.end - segment.start, 2),
                "confidence": confidence_percent
            })

        transcription_text = transcription_text.strip()

        if not transcription_text:
            print('   ‚ö†Ô∏è  No speech detected')
            return "[No speech detected in video]", 0.0

        if confidence_scores:
            # 1. Calculate weighted confidence by segment duration
            segment_durations = [seg.end - seg.start for seg in segments_list]
            total_duration = sum(segment_durations)

            weighted_confidence = sum(
                conf * (duration / total_duration)
                for conf, duration in zip(confidence_scores, segment_durations)
            )

            # 2. Quality-based boost
            word_count = len(transcription_text.split())
            min_conf = min(confidence_scores)
            max_conf = max(confidence_scores)
            conf_variance = max_conf - min_conf

            avg_confidence = round(sum(confidence_scores) / len(confidence_scores), 2)

        # Clean repetitive text
        original_length = len(transcription_text)
        transcription_text = clean_repetitive_text(transcription_text, max_repetitions=3)

        if len(transcription_text) < original_length:
            print(f'   üßπ Cleaned: {original_length} ‚Üí {len(transcription_text)} chars')

        total_time = time.time() - start_time
        words = transcription_text.split()

        # Display results
        print(f'   ‚úÖ Completed in {total_time:.1f}s | {len(segments_list)} segments | {len(words)} words')
        print(f'   üéØ Transcription Confidence: {avg_confidence}% {"‚úÖ" if avg_confidence >= 70 else "‚ö†Ô∏è" if avg_confidence >= 50 else "‚ùå"}')

        if confidence_scores:
            min_conf = min(confidence_scores)
            max_conf = max(confidence_scores)
            print(f'   üìä Confidence Range: {min_conf}% - {max_conf}%')

        # Cleanup
        gc.collect()

        return transcription_text, avg_confidence, min_conf, max_conf

    except Exception as e:
        print(f'   ‚ùå Error: {str(e)}')
        gc.collect()
        raise Exception(f"Transcription failed: {str(e)}")
    finally:
        # Always cleanup
        gc.collect()

<b><h2> Fungsi Translate to Indonesia

In [88]:
def translate_to_indonesian(text):
    """Translate English text to Indonesian using DeepL"""
    if not translator:
        print('   ‚ö†Ô∏è  Translation skipped (no API key)')
        return {
            "translated_text": "[Translation not available]"
        }

    try:
        max_chunk_size = 5000

        if len(text) <= max_chunk_size:
            result = translator.translate_text(text, source_lang="EN", target_lang="ID")
            translated_text = result.text
        else:
            sentences = text.split('. ')
            chunks = []
            current_chunk = ""

            for sentence in sentences:
                if len(current_chunk) + len(sentence) + 2 <= max_chunk_size:
                    current_chunk += sentence + ". "
                else:
                    if current_chunk:
                        chunks.append(current_chunk)
                    current_chunk = sentence + ". "

            if current_chunk:
                chunks.append(current_chunk)

            translated_chunks = []
            for chunk in chunks:
                result = translator.translate_text(chunk, source_lang="EN", target_lang="ID")
                translated_chunks.append(result.text)

            translated_text = " ".join(translated_chunks)

        print(f'   ‚úÖ Translation: {len(text)} ‚Üí {len(translated_text)} chars')

        return {
            "translated_text": translated_text
        }

    except Exception as e:
        print(f'   ‚ùå Translation failed: {str(e)}')
        return {
            "translated_text": f"[Translation failed: {str(e)}]"
        }

In [89]:
def translate_to_english(text):
    """Translate Indonesian text to English using DeepL"""
    if not translator:
        print('   ‚ö†Ô∏è  Translation skipped (no API key)')
        return {
            "translated_text": "[Translation not available]"
        }

    try:
        max_chunk_size = 5000

        if len(text) <= max_chunk_size:
            result = translator.translate_text(text, source_lang="ID", target_lang="EN-US")
            translated_text = result.text
        else:
            # Split by sentences for Indonesian
            sentences = text.split('. ')
            chunks = []
            current_chunk = ""

            for sentence in sentences:
                if len(current_chunk) + len(sentence) + 2 <= max_chunk_size:
                    current_chunk += sentence + ". "
                else:
                    if current_chunk:
                        chunks.append(current_chunk)
                    current_chunk = sentence + ". "

            if current_chunk:
                chunks.append(current_chunk)

            translated_chunks = []
            for chunk in chunks:
                result = translator.translate_text(chunk, source_lang="ID", target_lang="EN-US")
                translated_chunks.append(result.text)

            translated_text = " ".join(translated_chunks)

        print(f'   ‚úÖ Translation: {len(text)} ‚Üí {len(translated_text)} chars')

        return {
            "translated_text": translated_text
        }

    except Exception as e:
        print(f'   ‚ùå Translation failed: {str(e)}')
        return {
            "translated_text": f"[Translation failed: {str(e)}]"
        }

<b><h2> LLM Analysis

In [None]:
# ‚úÖ HuggingFace API Token
# HF_TOKEN = "token_goes_here"
# os.environ["HF_TOKEN"] = HF_TOKEN

# Initialize Inference Client
print('üì• Initializing HuggingFace Inference API...')
print('‚ÑπÔ∏è  Using meta-llama/Llama-3.1-8B-Instruct via Inference API')

client = InferenceClient(api_key=HF_TOKEN)

print('‚úÖ Inference API initialized successfully\n')

def evaluate_with_llm(transcription_text: str, question: str, position_id: int):
    """
    Evaluate interview answer using deterministic LLM evaluation with confidence scoring.
    NOW WITH LOG PROBABILITIES SUPPORT + BOOST SYSTEM for confidence enhancement.
    """
    try:
        # Construct evaluation prompt
        user_message = f"""You are an expert interview evaluator about programming and machine learning. You must provide objective, consistent scores based on explicit criteria and formulas.

**INTERVIEW QUESTION**: "{question}"

**CANDIDATE'S ANSWER**: "{transcription_text}"

**EVALUATION RUBRIC WITH FORMULAS**:

1. **KUALITAS JAWABAN (Quality of Answer)** [1-100]:

   Base Score Formula:
   - If answer addresses question with examples/details: BASE = 85
   - If answer addresses question adequately: BASE = 75
   - If answer is brief but relevant: BASE = 65
   - If answer is unclear/irrelevant: BASE = 45

   Adjustments:
   - Provides specific examples: +5 to +15
   - Shows deep understanding: +5 to +10
   - Lacks depth: -10 to -20
   - Vague/incomplete: -15 to -25

   MINIMUM for acceptable answers: 70

2. **KOHERENSI (Coherence)** [1-100]:

   Formula:
   - Logical flow, well-structured: BASE = 85
   - Adequate structure: BASE = 75
   - Some inconsistency: BASE = 65
   - Disorganized: BASE = 45

   Adjustments:
   - Clear progression: +5 to +10
   - Smooth transitions: +5 to +10
   - Contradictory statements: -15 to -25
   - Jumps between topics: -10 to -20

   MINIMUM for coherent answers: 70

3. **RELEVANSI (Relevance)** [1-100]:

   Formula:
   - Directly answers the question: BASE = 85
   - Addresses most aspects: BASE = 75
   - Partially relevant: BASE = 65
   - Off-topic: BASE = 45

   Adjustments:
   - Covers all question aspects: +10 to +20
   - Provides context: +5 to +10
   - Deviates from topic: -15 to -25

   MINIMUM for on-topic answers: 70

**CALCULATION STEPS**:
1. Analyze the answer content and structure
2. Calculate base scores using formulas
3. Apply adjustments

**OUTPUT FORMAT** (JSON only, no explanation):
{{
  "kualitas_jawaban": <integer 1-100>,
  "koherensi": <integer 1-100>,
  "relevansi": <integer 1-100>,
  "analysis": "<2-3 sentence justification with reasoning>"
}}
"""

        # Calculate word count for boost system
        word_count = len(transcription_text.split())

        print(f'‚îÇ ü§ñ LLM Evaluation...')
        print(f'‚îÇ üìù Answer length: {len(transcription_text)} chars ({word_count} words)')

        # ‚úÖ API Call with logprobs enabled
        completion = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[
                {
                    "role": "system",
                    "content": "You are a highly objective interview evaluator about programming and machine learning. Always respond with valid JSON only, no markdown."
                },
                {
                    "role": "user",
                    "content": user_message
                }
            ],
            max_tokens=600,
            temperature=0.1,
            top_p=0.9,
            logprobs=True,        # ‚úÖ ENABLE LOG PROBABILITIES
            top_logprobs=3        # ‚úÖ Get top 3 alternative tokens for each position
        )

        # Extract response text
        response = completion.choices[0].message.content.strip()
        response = re.sub(r'^```json\s*', '', response)
        response = re.sub(r'\s*```$', '', response)

        print(f'‚îÇ üì® API Response received ({len(response)} chars)')

        # ============================================================
        # ‚úÖ EXTRACT LOGPROBS DATA (NEW!)
        # ============================================================
        logprobs_data = None
        raw_token_confidence = None
        raw_avg_probability = None

        try:
            if hasattr(completion.choices[0], 'logprobs') and completion.choices[0].logprobs:
                logprobs_obj = completion.choices[0].logprobs

                # Check if content exists
                if hasattr(logprobs_obj, 'content') and logprobs_obj.content:
                    logprobs_data = logprobs_obj.content

                    # Calculate average log probability
                    token_logprobs = [token.logprob for token in logprobs_data if hasattr(token, 'logprob')]

                    if token_logprobs:
                        avg_logprob = sum(token_logprobs) / len(token_logprobs)
                        # Convert log probability to percentage confidence
                        raw_avg_probability = math.exp(avg_logprob)
                        raw_token_confidence = round(raw_avg_probability * 100, 2)

                        print(f'‚îÇ üéØ Raw Logprobs extracted: {len(token_logprobs)} tokens')
                        print(f'‚îÇ üìä Avg log prob: {avg_logprob:.4f}')
                        print(f'‚îÇ ‚ú® Raw token confidence: {raw_token_confidence}%')
                    else:
                        print(f'‚îÇ ‚ö†Ô∏è  Logprobs available but no token data')
                else:
                    print(f'‚îÇ ‚ö†Ô∏è  Logprobs object has no content')
            else:
                print(f'‚îÇ ‚ö†Ô∏è  No logprobs in API response (may not be supported)')
        except Exception as logprob_error:
            print(f'‚îÇ ‚ö†Ô∏è  Logprobs extraction failed: {str(logprob_error)}')
            # Continue without logprobs - non-critical feature

        # Extract JSON from response
        json_match = re.search(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', response, re.DOTALL)
        if json_match:
            json_str = json_match.group(0)
            evaluation = json_module.loads(json_str)
        else:
            raise ValueError("No valid JSON found in API response")

        # Validate scores
        required_keys = ['kualitas_jawaban', 'koherensi', 'relevansi']
        for key in required_keys:
            if key not in evaluation:
                raise ValueError(f"Missing required key: {key}")
            evaluation[key] = max(1, min(100, int(evaluation[key])))

        # ============================================================
        # DISPLAY RESULTS
        # ============================================================
        print(f'‚îÇ üìä LLM Scores:')
        print(f'‚îÇ    ‚Ä¢ Quality: {evaluation["kualitas_jawaban"]}/100')
        print(f'‚îÇ    ‚Ä¢ Coherence: {evaluation["koherensi"]}/100')
        print(f'‚îÇ    ‚Ä¢ Relevance: {evaluation["relevansi"]}/100')

        # Calculate total score (using boosted model confidence)
        total = round((
            evaluation["kualitas_jawaban"] +
            evaluation["koherensi"] +
            evaluation["relevansi"]
        ) / 3)

        print(f'‚îÇ ‚úÖ Total Score: {total}/100')

        # ‚úÖ Return with logprobs data and boost info
        result = {
            "scores": {
                "kualitas_jawaban": evaluation["kualitas_jawaban"],
                "koherensi": evaluation["koherensi"],
                "relevansi": evaluation["relevansi"],
                "confidence_score": raw_token_confidence
            },
            "total": total,
            "analysis": evaluation.get('analysis', 'No analysis provided'),
            # üÜï Logprobs data
            "logprobs_confidence": raw_token_confidence,
            "logprobs_probability": raw_avg_probability,
            "logprobs_available": logprobs_data is not None,
        }

        return result

    except Exception as e:
        print(f'‚îÇ ‚ö†Ô∏è  LLM evaluation failed: {str(e)}')
        print(f'‚îÇ üîÑ Falling back to rule-based assessment...')

        # Fallback assessment
        word_count = len(transcription_text.split())

        # Simple heuristic scoring
        if word_count > 100:
            quality_score = 75
            coherence_score = 70
            relevance_score = 70
            model_confidence = 60
        elif word_count > 50:
            quality_score = 65
            coherence_score = 65
            relevance_score = 65
            model_confidence = 55
        elif word_count > 20:
            quality_score = 55
            coherence_score = 55
            relevance_score = 55
            model_confidence = 50
        else:
            quality_score = 40
            coherence_score = 35
            relevance_score = 35
            model_confidence = 50

        total = round((quality_score + coherence_score + relevance_score) / 3)

        return {
            "scores": {
                "kualitas_jawaban": quality_score,
                "koherensi": coherence_score,
                "relevansi": relevance_score,
                "confidence_score": model_confidence
            },
            "total": total,
            "analysis": f"Fallback rule-based assessment (word count: {word_count}). LLM evaluation unavailable: {str(e)}",
            # Fallback has no logprobs or boost
            "logprobs_confidence": None,
            "logprobs_probability": None,
            "logprobs_available": False,
        }

üì• Initializing HuggingFace Inference API...
‚ÑπÔ∏è  Using meta-llama/Llama-3.1-8B-Instruct via Inference API
‚úÖ Inference API initialized successfully



In [None]:
def summarize_llm_analysis_batch(assessment_results):
    """
    Generate overall summary from all assessments

    ‚úÖ OPTIMIZED: If only 1 video, reuse existing analysis_llm instead of calling LLM again
    """
    try:
        if not assessment_results:
            return {
                "kesimpulan_llm": "Tidak ada hasil penilaian yang tersedia.",
                "rata_rata_confidence_score": 0,
                "avg_total_llm": 0,
                "avg_logprobs_confidence": None,
                "final_score_llm": 0
            }

        # Calculate averages
        confidence_scores = []
        total_scores = []
        logprobs_confidences = []

        for result in assessment_results:
            assessment = result.get('result', {}).get('penilaian', {})
            confidence_scores.append(assessment.get('confidence_score', 0))
            total_scores.append(assessment.get('total', 0))

            # Extract logprobs confidence if available
            lp_conf = assessment.get('logprobs_confidence')
            if lp_conf is not None:
                logprobs_confidences.append(lp_conf)

        avg_confidence = round(sum(confidence_scores) / len(confidence_scores)) if confidence_scores else 0
        avg_total = round(sum(total_scores) / len(total_scores)) if total_scores else 0

        # Calculate average logprobs confidence
        avg_logprobs_confidence = None
        if logprobs_confidences:
            avg_logprobs_confidence = round(sum(logprobs_confidences) / len(logprobs_confidences), 2)

        # Determine final score
        projectScore = 100
        final_score = projectScore * 0.7 + avg_total * 0.3

        # ‚úÖ NEW: If only 1 video, reuse existing analysis instead of calling LLM
        if len(assessment_results) == 1:
            print(f'\n{"="*70}')
            print(f'üìã Single Video Assessment - Reusing Existing Analysis')
            print(f'{"="*70}')
            print(f'‚ÑπÔ∏è  Only 1 video detected - skipping LLM summary generation')
            print(f'‚úÖ Using existing analysis_llm from video assessment')

            # Get existing analysis from the single video
            single_assessment = assessment_results[0].get('result', {}).get('penilaian', {})
            existing_analysis = single_assessment.get('analisis_llm', '')

            # Format as summary
            if existing_analysis:
                kesimpulan_llm = f"Assessment Summary: {existing_analysis}"
            else:
                # Fallback if no analysis
                quality = single_assessment.get('kualitas_jawaban', 0)
                coherence = single_assessment.get('koherensi', 0)
                relevance = single_assessment.get('relevansi', 0)
                total = single_assessment.get('total', 0)

                kesimpulan_llm = (
                    f"Candidate demonstrated performance with total score of {total}/100. "
                    f"Quality: {quality}/100, Coherence: {coherence}/100, Relevance: {relevance}/100."
                )

            print(f'   üìä Score: {avg_total}/100')
            print(f'   ‚ú® Analysis reused successfully')
            print(f'{"="*70}\n')

            return {
                "kesimpulan_llm": kesimpulan_llm,
                "rata_rata_confidence_score": avg_confidence,
                "avg_total_llm": avg_total,
                "final_score_llm": final_score,
                "avg_logprobs_confidence": avg_logprobs_confidence,
                "reused_single_analysis": True  # ‚úÖ Flag untuk tracking
            }

        # ‚úÖ Multiple videos: Generate comprehensive LLM summary
        print(f'\n{"="*70}')
        print(f'ü§ñ Generating Batch LLM Summary...')
        print(f'{"="*70}')
        print(f'üìä Processing {len(assessment_results)} video assessments')
        print(f'üìà Average Score: {avg_total}/100')
        if avg_logprobs_confidence is not None:
            print(f'‚ú® Avg Logprobs Confidence: {avg_logprobs_confidence}%')

        # Prepare assessment summary for multiple videos
        summary_lines = []
        for idx, result in enumerate(assessment_results, 1):
            assessment = result.get('result', {}).get('penilaian', {})
            question = result.get('question', f'Question {idx}')

            summary_lines.append(
                f"Video {idx}: Total {assessment.get('total', 0)}/100 "
                f"(Quality: {assessment.get('kualitas_jawaban', 0)}, "
                f"Coherence: {assessment.get('koherensi', 0)}, "
                f"Relevance: {assessment.get('relevansi', 0)})"
            )

        assessment_summary = "\n".join(summary_lines)

        # Detect language from first result
        source_language = assessment_results[0].get('result', {}).get('metadata', {}).get('source_language', 'English')

        # Generate LLM summary prompt
        user_message = f"""Based on the following interview assessment results, provide a comprehensive summary in {source_language} (2-3 paragraphs, ~150-200 words).

**ASSESSMENT RESULTS**:
{assessment_summary}

**AVERAGES**:
- Average Total Score: {avg_total}/100

**INSTRUCTIONS**:
1. Summarize the candidate's overall performance across all {len(assessment_results)} video interviews
2. Highlight consistent strengths and areas for improvement
3. Be objective, constructive, and professional
4. Consider both technical competence and communication skills

Respond with plain text summary only (no JSON, no markdown formatting)."""

        print(f'ü§ñ Calling LLM to generate comprehensive summary...')

        # API Call with logprobs
        completion = client.chat.completions.create(
            model="meta-llama/Llama-3.1-8B-Instruct",
            messages=[
                {
                    "role": "system",
                    "content": "You are an expert interview analyst. Provide comprehensive, objective assessments. Respond with plain text only, no JSON or markdown."
                },
                {
                    "role": "user",
                    "content": user_message
                }
            ],
            max_tokens=500,
            temperature=0.3,
            top_p=0.9,
            logprobs=True,
            top_logprobs=3
        )

        # Extract summary
        kesimpulan_llm = completion.choices[0].message.content.strip()
        kesimpulan_llm = re.sub(r'^```.*?\n', '', kesimpulan_llm)
        kesimpulan_llm = re.sub(r'\n```$', '', kesimpulan_llm)

        # Extract summary logprobs
        summary_logprobs_confidence = None
        try:
            if hasattr(completion.choices[0], 'logprobs') and completion.choices[0].logprobs:
                logprobs_obj = completion.choices[0].logprobs
                if hasattr(logprobs_obj, 'content') and logprobs_obj.content:
                    token_logprobs = [token.logprob for token in logprobs_obj.content if hasattr(token, 'logprob')]
                    if token_logprobs:
                        avg_logprob = sum(token_logprobs) / len(token_logprobs)
                        summary_logprobs_confidence = round(math.exp(avg_logprob) * 100, 2)
                        print(f'‚ú® Summary logprobs confidence: {summary_logprobs_confidence}%')
        except Exception as e:
            print(f'‚ö†Ô∏è  Summary logprobs extraction failed: {str(e)}')

        print(f'‚úÖ LLM Summary generated successfully')
        print(f'   Length: {len(kesimpulan_llm)} characters')
        print(f'   Words: {len(kesimpulan_llm.split())}')
        print(f'{"="*70}\n')

        return {
            "kesimpulan_llm": kesimpulan_llm,
            "rata_rata_confidence_score": avg_confidence,
            "avg_total_llm": avg_total,
            "final_score_llm": final_score,
            "avg_logprobs_confidence": avg_logprobs_confidence,
            "summary_logprobs_confidence": summary_logprobs_confidence,  # ‚úÖ Separate summary confidence
            "reused_single_analysis": False  # ‚úÖ Flag untuk tracking
        }

    except Exception as e:
        print(f'‚ùå LLM summary generation failed: {str(e)}')
        print(f'üîÑ Using fallback summary...')

        # Fallback summary
        return {
            "kesimpulan_llm": f"Kandidat menunjukkan performa dengan rata-rata skor {avg_total}/100 dari {len(assessment_results)} video interview. "
                             f"(LLM summary unavailable: {str(e)[:100]})",
            "rata_rata_confidence_score": avg_confidence,
            "avg_total_llm": avg_total,
            "final_score_llm": final_score,
            "avg_logprobs_confidence": avg_logprobs_confidence,
            "summary_logprobs_confidence": None,
            "reused_single_analysis": False
        }

<b><h2> Google Drive Download

In [92]:
# ===== HELPER: Download video from Google Drive =====
import gdown
import requests
from urllib.parse import urlparse, parse_qs

def download_video_from_google_drive(video_url, dest_folder):
    """Download video from Google Drive URL"""
    try:
        # Extract file ID from Google Drive URL
        if 'drive.google.com' in video_url:
            # Format 1: https://drive.google.com/file/d/FILE_ID/view?usp=...
            if '/file/d/' in video_url:
                file_id = video_url.split('/file/d/')[1].split('/')[0]
            # Format 2: https://drive.google.com/open?id=FILE_ID
            elif 'id=' in video_url:
                parsed = urlparse(video_url)
                file_id = parse_qs(parsed.query)['id'][0]
            else:
                raise ValueError(f"Unsupported Google Drive URL format")

            # Generate download URL
            download_url = f"https://drive.google.com/uc?id={file_id}&export=download"

            # Generate safe filename
            safe_name = f"{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex}.mp4"
            dest_path = os.path.join(dest_folder, safe_name)

            print(f"      üì• Downloading from Google Drive (ID: {file_id[:20]}...)")

            # Download with gdown
            gdown.download(download_url, dest_path, quiet=False)

            # Verify file exists and has content
            if not os.path.exists(dest_path) or os.path.getsize(dest_path) == 0:
                raise ValueError("Downloaded file is empty or doesn't exist")

            file_size_mb = os.path.getsize(dest_path) / (1024 * 1024)
            print(f"      ‚úÖ Downloaded: {safe_name} ({file_size_mb:.2f} MB)")

            return safe_name, dest_path

        else:
            # Direct URL download (fallback)
            safe_name = f"{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex}.mp4"
            dest_path = os.path.join(dest_folder, safe_name)

            print(f"      üì• Downloading from direct URL")
            response = requests.get(video_url, stream=True, timeout=300)
            response.raise_for_status()

            with open(dest_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

            file_size_mb = os.path.getsize(dest_path) / (1024 * 1024)
            print(f"      ‚úÖ Downloaded: {safe_name} ({file_size_mb:.2f} MB)")

            return safe_name, dest_path

    except Exception as e:
        print(f"      ‚ùå Download failed: {str(e)}")
        raise

In [93]:
# ===== BACKGROUND THREAD: Download and Process Videos =====
def download_and_process_videos(session_id, candidate_name, interviews, language, base_url):
    """
    Background thread: Download videos from Google Drive URLs, then process identically to /upload endpoint.
    """
    print(f"\nüîΩ [Thread-{session_id[:8]}] Starting video downloads...")

    try:
        uploaded_videos = []

        # PHASE 1: Download all videos
        print(f'\nüì• Downloading {len(interviews)} video(s) from Google Drive...')
        for idx, interview in enumerate(interviews, 1):
            try:
                # Update download progress
                with processing_lock:
                    processing_status[session_id]['message'] = f'Downloading video {idx}/{len(interviews)}...'
                    processing_status[session_id]['progress'] = f'{idx}/{len(interviews)}'

                # Extract fields from interview
                position_id = interview.get('positionId', idx)
                question = interview.get('question', '')
                is_video_exist = interview.get('isVideoExist', False)
                video_url = interview.get('recordedVideoUrl', '')

                print(f'\n   üìπ Video {idx}/{len(interviews)}:')
                print(f'      Position ID: {position_id}')
                print(f'      Question: {question[:60]}{"..." if len(question) > 60 else ""}')
                print(f'      Video exists: {is_video_exist}')
                print(f'      URL: {video_url[:80]}{"..." if len(video_url) > 80 else ""}')

                # Validate
                if not question:
                    print(f'      ‚ö†Ô∏è Missing question, skipping')
                    uploaded_videos.append({
                        'positionId': position_id,
                        'question': '',
                        'isVideoExist': False,
                        'recordedVideoUrl': None,
                        'error': 'Missing question field'
                    })
                    continue

                if not is_video_exist or not video_url:
                    print(f'      ‚ö†Ô∏è No video URL, skipping')
                    uploaded_videos.append({
                        'positionId': position_id,
                        'question': question,
                        'isVideoExist': False,
                        'recordedVideoUrl': None,
                        'error': 'No video URL provided'
                    })
                    continue

                # Download video from Google Drive
                safe_name, dest_path = download_video_from_google_drive(video_url, UPLOAD_DIR)

                # Create local file URL (same as /upload endpoint)
                file_url = f"{base_url}/uploads/{safe_name}"

                uploaded_videos.append({
                    'positionId': position_id,
                    'question': question,
                    'isVideoExist': True,
                    'recordedVideoUrl': file_url,
                    'filename': safe_name
                })

            except Exception as e:
                print(f'      ‚ùå Failed to download video {idx}: {str(e)}')
                uploaded_videos.append({
                    'positionId': interview.get('positionId', idx),
                    'question': interview.get('question', ''),
                    'isVideoExist': False,
                    'recordedVideoUrl': None,
                    'error': str(e)
                })

        successful_downloads = len([v for v in uploaded_videos if v['isVideoExist']])
        print(f"\n‚úÖ Download complete: {successful_downloads}/{len(interviews)} successful")

        # PHASE 2: Update status to processing (same as /upload endpoint)
        with processing_lock:
            processing_status[session_id] = {
                'status': 'processing',
                'progress': '0/' + str(len(uploaded_videos)),
                'message': 'Starting transcription...',
                'uploaded_videos': len(uploaded_videos)
            }

        # PHASE 3: Process transcriptions (IDENTICAL to /upload endpoint)
        print(f'\nüîÑ Starting transcription process (identical to /upload endpoint)...')
        process_transcriptions_sync(session_id, candidate_name, uploaded_videos, base_url, language)

    except Exception as e:
        error_detail = traceback.format_exc()
        print(f"‚ùå Download thread error:\n{error_detail}")

        with processing_lock:
            processing_status[session_id] = {
                'status': 'error',
                'error': str(e),
                'error_detail': error_detail
            }

<b><h2> Final Processing

In [131]:
def process_transcriptions_sync(session_id: str, candidate_name: str, uploaded_videos: list, base_url: str, language: str = "en"):
    """Background transcription processing WITH COMPREHENSIVE LOGGING"""

    # ‚≠ê SETUP LOGGING TO FILE
    log_file = f'session_{session_id}.log'
    log_handle = open(log_file, 'w', encoding='utf-8', buffering=1)

    def log_print(msg):
        """Print to both console and log file"""
        print(msg, flush=True)
        log_handle.write(msg + '\n')
        log_handle.flush()

    try:
        log_print(f'\n{"="*70}')
        log_print(f'üéôÔ∏è  SESSION: {session_id}')
        log_print(f'üë§ CANDIDATE: {candidate_name}')
        log_print(f'üåê LANGUAGE: {"English" if language == "en" else "Indonesian" if language == "id" else language}')
        log_print(f'üìπ VIDEOS: {len(uploaded_videos)}')
        log_print(f'üìù LOG FILE: {log_file}')
        log_print(f'{"="*70}\n')

        transcriptions = []
        assessment_results = []

        with processing_lock:
            processing_status[session_id] = {'status': 'processing', 'progress': '0/0'}

        # Process each video
        for idx, interview in enumerate(uploaded_videos, 1):
            log_print(f'\n{"‚îÄ"*70}')
            log_print(f'Processing video {idx}/{len(uploaded_videos)}')
            log_print(f'{"‚îÄ"*70}')

            if not interview.get('isVideoExist') or not interview.get('recordedVideoUrl'):
                log_print(f'‚ö†Ô∏è Video {idx} - No video exists or no URL')
                transcriptions.append({
                    'positionId': interview['positionId'],
                    'error': interview.get('error', 'Video upload failed')
                })
                continue

            position_id = interview['positionId']
            video_url = interview['recordedVideoUrl']
            question = interview.get('question', '')

            try:
                log_print(f'\n‚îå‚îÄ Video {position_id}/{len(uploaded_videos)} ‚îÄ{"‚îÄ"*50}‚îê')
                if question:
                    log_print(f'‚îÇ ‚ùì Question: {question[:60]}{"..." if len(question) > 60 else ""}')

                local_file = get_local_file_path(video_url)
                if not local_file:
                    raise Exception(f"Local file not found")

                log_print(f'‚îÇ üìÅ Local file: {local_file}')
                log_print(f'‚îÇ üìè File exists: {os.path.exists(local_file)}')

                file_size_mb = os.path.getsize(local_file) / (1024 * 1024)
                log_print(f'‚îÇ üìä File size: {file_size_mb:.1f} MB')

                with processing_lock:
                    processing_status[session_id] = {
                        'status': 'processing',
                        'progress': f'{position_id}/{len(uploaded_videos)}',
                        'current_video': position_id,
                        'message': f'Processing video {position_id}/{len(uploaded_videos)}...'
                    }

                video_start = time.time()

                # Step 1: Transcribe
                log_print(f'‚îÇ 1Ô∏è‚É£  TRANSCRIPTION ({file_size_mb:.1f} MB)')
                try:
                    transcription_text, avg_confidence, min_conf, max_conf = transcribe_video(local_file, language=language)
                    transcribe_time = time.time() - video_start
                    log_print(f'‚îÇ    ‚úÖ Transcription completed')
                    log_print(f'‚îÇ    üéØ Transcription Confidence: {avg_confidence}%')
                    log_print(f'‚îÇ    üìù Text length: {len(transcription_text)} chars')
                except Exception as e:
                    log_print(f'‚îÇ    ‚ùå Transcription ERROR: {str(e)}')
                    raise

                # Step 2: Translate (conditional based on language)
                log_print(f'‚îÇ 2Ô∏è‚É£  TRANSLATION')
                try:
                    translate_start = time.time()

                    if language == "en":
                        # English ‚Üí Indonesian
                        translation_result = translate_to_indonesian(transcription_text)
                        transcription_en = transcription_text  # Original is English
                        transcription_id = translation_result['translated_text']  # Translated to Indonesian
                        log_print(f'‚îÇ    üåê Direction: English ‚Üí Indonesian')
                    elif language == "id":
                        # Indonesian ‚Üí English
                        translation_result = translate_to_english(transcription_text)
                        transcription_id = transcription_text  # Original is Indonesian
                        transcription_en = translation_result['translated_text']  # Translated to English
                        log_print(f'‚îÇ    üåê Direction: Indonesian ‚Üí English')
                    else:
                        # Default: assume English
                        translation_result = translate_to_indonesian(transcription_text)
                        transcription_en = transcription_text
                        transcription_id = translation_result['translated_text']
                        log_print(f'‚îÇ    ‚ö†Ô∏è  Unknown language, defaulting to English ‚Üí Indonesian')

                    translate_time = time.time() - translate_start
                    log_print(f'‚îÇ    ‚úÖ Translation completed in {translate_time:.1f}s')
                    log_print(f'‚îÇ    üìù EN length: {len(transcription_en)} chars')
                    log_print(f'‚îÇ    üìù ID length: {len(transcription_id)} chars')
                except Exception as e:
                    log_print(f'‚îÇ    ‚ùå Translation ERROR: {str(e)}')
                    raise

                # Step 3: Cheating Detection
                log_print(f'‚îÇ 2Ô∏è‚É£¬Ω CHEATING DETECTION')
                print('\nüîç Running Cheating Detection...')
                try:
                    cheating_start = time.time()
                    cheating_result = comprehensive_cheating_detection(local_file)
                    cheating_time = time.time() - cheating_start
                    log_print(f'‚îÇ    ‚úÖ Cheating detection completed in {cheating_time:.1f}s')
                except Exception as e:
                    log_print(f'‚îÇ    ‚ùå Cheating detection ERROR: {str(e)}')
                    raise

                # Step 4: Non-Verbal Analysis
                log_print(f'‚îÇ 2Ô∏è‚É£¬æ NON-VERBAL ANALYSIS')
                try:
                    non_verbal_start = time.time()
                    non_verbal_result = analyze_interview_video_with_confidence(
                        video_path=local_file,
                        audio_path=None
                    )
                    non_verbal_time = time.time() - non_verbal_start
                    log_print(f'‚îÇ    ‚úÖ Non-verbal analysis completed in {non_verbal_time:.1f}s')
                    log_print(f'‚îÇ    üìä Non-Verbal Confidence: {non_verbal_result["confidence_score"]}%')
                except Exception as e:
                    log_print(f'‚îÇ    ‚ùå Non-verbal analysis ERROR: {str(e)}')
                    raise

                # Step 5: LLM Evaluation (always use English text for better accuracy)
                log_print(f'‚îÇ 3Ô∏è‚É£  AI ASSESSMENT')
                try:
                    llm_start = time.time()
                    llm_evaluation = evaluate_with_llm(transcription_en, question, position_id)  # ‚úÖ Use English version
                    llm_time = time.time() - llm_start
                    log_print(f'‚îÇ    ‚úÖ LLM evaluation completed in {llm_time:.1f}s')
                    log_print(f'‚îÇ    üìä Total Score: {llm_evaluation["total"]}/100')
                except Exception as e:
                    log_print(f'‚îÇ    ‚ùå LLM evaluation ERROR: {str(e)}')
                    raise

                # Step 6: Save transcription file
                log_print(f'‚îÇ 4Ô∏è‚É£  SAVING FILES')
                trans_fname = f"transcription_pos{position_id}_{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex}.txt"
                trans_path = os.path.join(TRANSCRIPTION_DIR, trans_fname)

                with open(trans_path, 'w', encoding='utf-8') as f:
                    f.write(f"Candidate: {candidate_name}\n")
                    f.write(f"Position ID: {position_id}\n")
                    f.write(f"Question: {question}\n")
                    f.write(f"Video URL: {video_url}\n")
                    f.write(f"Language: {language}\n")  # ‚úÖ Added language info
                    f.write(f"Transcribed at: {datetime.now(timezone.utc).isoformat()}\n")
                    f.write(f"\n{'='*50}\n")

                    if language == "id":
                        f.write(f"INDONESIAN TRANSCRIPTION (Original):\n")
                        f.write(f"{'='*50}\n\n")
                        f.write(transcription_id)
                        f.write(f"\n\n{'='*50}\n")
                        f.write(f"ENGLISH TRANSLATION:\n")
                        f.write(f"{'='*50}\n\n")
                        f.write(transcription_en)
                    else:
                        f.write(f"ENGLISH TRANSCRIPTION (Original):\n")
                        f.write(f"{'='*50}\n\n")
                        f.write(transcription_en)
                        f.write(f"\n\n{'='*50}\n")
                        f.write(f"INDONESIAN TRANSLATION:\n")
                        f.write(f"{'='*50}\n\n")
                        f.write(transcription_id)

                    f.write(f"\n\nTranscription Confidence: {avg_confidence}%\n")

                log_print(f'‚îÇ    ‚úÖ Transcription file saved: {trans_fname}')

                transcription_url = f"{base_url}/transcriptions/{trans_fname}"

                # Build assessment
                words = transcription_en.split()

                assessment = {
                    "penilaian": {
                        "confidence_score": llm_evaluation['scores']['confidence_score'],
                        "kualitas_jawaban": llm_evaluation['scores']['kualitas_jawaban'],
                        "relevansi": llm_evaluation['scores']['relevansi'],
                        "koherensi": llm_evaluation['scores']['koherensi'],
                        "analisis_llm": llm_evaluation['analysis'],
                        "total": llm_evaluation['total'],
                        # üÜï NEW: Add logprobs data
                        "logprobs_confidence": llm_evaluation.get('logprobs_confidence'),
                        "logprobs_probability": llm_evaluation.get('logprobs_probability'),
                        "logprobs_available": llm_evaluation.get('logprobs_available', False)
                    },
                    "non_verbal_analysis": non_verbal_result['analysis'],
                    "non_verbal_confidence_score": non_verbal_result['confidence_score'],
                    "transkripsi_en": transcription_en,
                    "transkripsi_id": transcription_id,
                    "transkripsi_confidence": avg_confidence,
                    "transkripsi_min_confidence": min_conf,
                    "transkripsi_max_confidence": max_conf,
                    "cheating_detection": cheating_result,
                    "metadata": {
                        "word_count": len(words),
                        "processed_at": datetime.now(timezone.utc).isoformat(),
                        # üÜï NEW: Logprobs metadata
                        "logprobs_enabled": True,
                        "source_language": "English" if language == "en" else "Indonesian" if language == "id" else "Unknown"
                    }
                }

                assessment_results.append({
                    "id": position_id,
                    "question": question,
                    "result": assessment
                })
                log_print(f'‚îÇ    ‚úÖ Assessment added to results (total: {len(assessment_results)})')

                transcriptions.append({
                    'positionId': position_id,
                    'question': question,
                    'videoUrl': video_url,
                    'transcription': transcription_en,
                    'transcription_id': transcription_id,
                    'transcriptionUrl': transcription_url,
                    'transcriptionFile': trans_fname,
                    'assessment': assessment
                })

                # Delete video
                if os.path.exists(local_file):
                    os.remove(local_file)
                    log_print(f'‚îÇ üóëÔ∏è  Video deleted ({file_size_mb:.1f} MB freed)')

                total_time = time.time() - video_start
                log_print(f'‚îÇ ‚è±Ô∏è  Total: {total_time:.1f}s')
                log_print(f'‚îî‚îÄ{"‚îÄ"*68}‚îò')

                gc.collect()

            except Exception as e:
                log_print(f'‚îÇ ‚ùå ERROR processing video {position_id}: {str(e)}')
                log_print(f'‚îÇ üìã Traceback:')
                for line in traceback.format_exc().split('\n'):
                    log_print(f'‚îÇ    {line}')
                log_print(f'‚îî‚îÄ{"‚îÄ"*68}‚îò')

                transcriptions.append({
                    'positionId': position_id,
                    'question': question,
                    'videoUrl': video_url,
                    'error': str(e)
                })

        # ============================================================================
        # AGGREGATE ANALYSIS
        # ============================================================================
        log_print(f'\n{"="*70}')
        log_print(f'üìä STARTING AGGREGATE ANALYSIS')
        log_print(f'{"="*70}')
        log_print(f'Assessment Results Count: {len(assessment_results)}')

        if len(assessment_results) == 0:
            log_print(f'‚ö†Ô∏è WARNING: No assessment results! Cannot create aggregate analysis.')
            log_print(f'   Total transcriptions: {len(transcriptions)}')
            log_print(f'   Transcriptions with errors: {sum(1 for t in transcriptions if "error" in t)}')

        # 1. Aggregate Cheating
        try:
            log_print(f'\nüëÄ Calculating aggregate non-verbal...')
            aggregate_cheating = aggregate_cheating_results(assessment_results)
            log_print(f'‚úÖ Aggregate cheating completed')
        except Exception as e:
            log_print(f'‚ùå ERROR in aggregate_non_verbal: {str(e)}')
            log_print(f'   Traceback: {traceback.format_exc()}')
            aggregate_cheating = {"error": str(e)}


        # 2. Aggregate Non-Verbal
        try:
            log_print(f'\nüëÄ Calculating aggregate non-verbal...')
            aggregate_non_verbal = summarize_non_verbal_batch(assessment_results)
            log_print(f'‚úÖ Aggregate non-verbal completed')
        except Exception as e:
            log_print(f'‚ùå ERROR in aggregate_non_verbal: {str(e)}')
            log_print(f'   Traceback: {traceback.format_exc()}')
            aggregate_non_verbal = {"error": str(e)}

        # 3. LLM Summary
        try:
            log_print(f'\nü§ñ Generating LLM summary...')
            hasil_llm = summarize_llm_analysis_batch(assessment_results)
            log_print(f'‚úÖ LLM summary completed')
        except Exception as e:
            log_print(f'‚ùå ERROR in LLM summary: {str(e)}')
            log_print(f'   Traceback: {traceback.format_exc()}')
            hasil_llm = {
                "kesimpulan_llm": f"Error: {str(e)}",
                "rata_rata_confidence_score": 0,
                "error": str(e)
            }

        log_print(f'\n{"="*70}')
        log_print(f'‚úÖ ALL AGGREGATE ANALYSIS COMPLETED')
        log_print(f'{"="*70}')

        # ============================================================================
        # SAVE JSON
        # ============================================================================
        if assessment_results:
            try:
                log_print(f'\nüíæ SAVING JSON RESULTS...')

                results_json = {
                   "success": True,
                    "name": candidate_name,
                    "session": session_id,
                    "llm_results": hasil_llm,
                    "aggregate_cheating_detection": aggregate_cheating,
                    "aggregate_non_verbal_analysis": aggregate_non_verbal,
                    "content": assessment_results,
                    "metadata": {
                        "total_videos": len(uploaded_videos),
                        "successful_videos": len(assessment_results),
                        "processed_at": datetime.now(timezone.utc).isoformat(),
                        "model": "faster-whisper large-v3",
                        "llm_model": "meta-llama/Llama-3.1-8B-Instruct"
                    }
                }

                results_filename = f"{session_id}.json"
                results_path = os.path.join(RESULTS_DIR, results_filename)

                log_print(f'üìÇ Results path: {results_path}')
                log_print(f'üìä JSON size: {len(str(results_json))} chars')

                # Ensure directory exists
                os.makedirs(RESULTS_DIR, exist_ok=True)
                log_print(f'‚úÖ Results directory ensured: {RESULTS_DIR}')

                # Write JSON
                try:
                    with open(results_path, 'w', encoding='utf-8') as f:
                        json.dump(results_json, f, ensure_ascii=False, indent=2)

                    file_size = os.path.getsize(results_path)
                    print(f'‚úÖ JSON saved successfully')
                    print(f'   File: {results_filename}')
                    print(f'   Size: {file_size:,} bytes ({file_size/1024:.1f} KB)')

                except Exception as save_error:
                    print(f'‚ùå ERROR saving JSON: {save_error}')
                    print(f'   Attempting alternative save method...')

                    # Fallback: Manually convert NumPy types
                    def convert_to_native(obj):
                        if isinstance(obj, dict):
                            return {k: convert_to_native(v) for k, v in obj.items()}
                        elif isinstance(obj, list):
                            return [convert_to_native(item) for item in obj]
                        elif isinstance(obj, (np.integer, np.int32, np.int64)):
                            return int(obj)
                        elif isinstance(obj, (np.floating, np.float32, np.float64)):
                            return float(obj)
                        elif isinstance(obj, np.ndarray):
                            return obj.tolist()
                        elif isinstance(obj, np.bool_):
                            return bool(obj)
                        return obj

                    try:
                        cleaned_json = convert_to_native(results_json)
                        with open(results_path, 'w', encoding='utf-8') as f:
                            json.dump(cleaned_json, f, ensure_ascii=False, indent=2)
                        print(f'‚úÖ JSON saved successfully (fallback method)')
                    except Exception as fallback_error:
                        print(f'‚ùå CRITICAL: Both save methods failed: {fallback_error}')
                        raise

                log_print(f'‚úÖ JSON written to file')

                # Verify
                if os.path.exists(results_path):
                    file_size = os.path.getsize(results_path)
                    log_print(f'‚úÖ‚úÖ‚úÖ JSON FILE SAVED SUCCESSFULLY! ‚úÖ‚úÖ‚úÖ')
                    log_print(f'   Path: {results_path}')
                    log_print(f'   Size: {file_size} bytes')
                else:
                    log_print(f'‚ùå‚ùå‚ùå WARNING: JSON FILE NOT CREATED! ‚ùå‚ùå‚ùå')

                results_url = f"{base_url}/results/{results_filename}"
                log_print(f'üåê Results URL: {results_url}')

            except Exception as e:
                log_print(f'‚ùå CRITICAL ERROR saving JSON: {str(e)}')
                log_print(f'   Traceback: {traceback.format_exc()}')
        else:
            log_print(f'\n‚ö†Ô∏è‚ö†Ô∏è‚ö†Ô∏è WARNING: assessment_results is EMPTY! ‚ö†Ô∏è‚ö†Ô∏è‚ö†Ô∏è')
            log_print(f'   JSON will NOT be saved.')

        successful_count = sum(1 for t in transcriptions if 'transcription' in t)

        with processing_lock:
            processing_status[session_id] = {
                'status': 'completed',
                'result': {
                    'success': True,
                    'transcriptions': transcriptions,
                    'processed_videos': len(transcriptions),
                    'successful_videos': successful_count,
                    'failed_videos': len(transcriptions) - successful_count,
                    'results_url': f"{base_url}/results/{session_id}.json" if assessment_results else None
                }
            }

        log_print(f'\n{"="*70}')
        log_print(f'‚úÖ SESSION COMPLETED')
        log_print(f'   Success: {successful_count}/{len(transcriptions)} videos')
        log_print(f'   Log file: {log_file}')
        log_print(f'{"="*70}\n')

    except Exception as e:
        log_print(f'\n‚ùå SESSION ERROR:\n{traceback.format_exc()}')

        with processing_lock:
            processing_status[session_id] = {
                'status': 'error',
                'error': str(e),
                'error_detail': traceback.format_exc()
            }

    finally:
        log_handle.close()

<b><h2> ENDPOINT

In [95]:
# ENDPOINTS
@app.post('/upload')
async def receive_videos_and_process(
    request: Request,
    candidate_name: str = Form(...),
    language: str = Form("en"),
    videos: List[UploadFile] = File(...),
    questions: List[str] = Form(...)  # NEW: Accept questions array
):
    """Upload videos and start background transcription"""
    session_id = uuid.uuid4().hex
    print(f'\nüîµ NEW UPLOAD REQUEST - Session: {session_id}')
    print(f'   Candidate: {candidate_name}')
    print(f'   Videos: {len(videos)} file(s)')
    print(f'   Questions: {len(questions)} question(s)')  # NEW

    # NEW: Validate questions count matches videos count
    if len(questions) != len(videos):
        return JSONResponse(
            content={
                'success': False,
                'error': f'Questions count ({len(questions)}) must match videos count ({len(videos)})'
            },
            status_code=400,
            headers={
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'POST, GET, OPTIONS',
                'Access-Control-Allow-Headers': '*',
            }
        )

    if language not in ["en", "id"]:
        return JSONResponse(
            {
                'success': False,
                'error': f'Invalid language code: {language}. Must be "en" or "id".'
            },
            status_code=400
        )

    # Initialize status FIRST
    with processing_lock:
        processing_status[session_id] = {
            'status': 'uploading',
            'progress': '0/0',
            'message': 'Uploading videos...'
        }

    try:
        # 1. Upload semua video (fast)
        base_url = str(request.base_url).rstrip('/')
        uploaded_videos = []

        print(f'\nüì§ Uploading {len(videos)} video(s)...')
        for idx, (video, question) in enumerate(zip(videos, questions), 1):  # NEW: zip with questions
            try:
                ext = os.path.splitext(video.filename)[1] or '.webm'
                safe_name = f"{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}_{uuid.uuid4().hex}{ext}"
                dest_path = os.path.join(UPLOAD_DIR, safe_name)

                # Update upload progress
                with processing_lock:
                    processing_status[session_id]['message'] = f'Uploading video {idx}/{len(videos)}...'
                    processing_status[session_id]['progress'] = f'{idx}/{len(videos)}'

                with open(dest_path, 'wb') as buffer:
                    shutil.copyfileobj(video.file, buffer)

                file_url = f"{base_url}/uploads/{safe_name}"
                uploaded_videos.append({
                    'positionId': idx,
                    'question': question,  # NEW: Include question
                    'isVideoExist': True,
                    'recordedVideoUrl': file_url,
                    'filename': safe_name
                })
                print(f'   ‚úÖ Uploaded: {safe_name} | Q: {question[:50]}{"..." if len(question) > 50 else ""}')  # NEW

            except Exception as e:
                print(f'   ‚ùå Failed: {str(e)}')
                uploaded_videos.append({
                    'positionId': idx,
                    'question': question if idx <= len(questions) else '',  # NEW: Include question even on error
                    'isVideoExist': False,
                    'recordedVideoUrl': None,
                    'error': str(e)
                })

        # 2. Update status to processing
        with processing_lock:
            processing_status[session_id] = {
                'status': 'processing',
                'progress': '0/' + str(len(uploaded_videos)),
                'message': 'Starting transcription...',
                'uploaded_videos': len(uploaded_videos)
            }

        # 3. Start background thread
        thread = th.Thread(
            target=process_transcriptions_sync,
            args=(session_id, candidate_name, uploaded_videos, base_url, language),
            daemon=True
        )
        thread.start()

        print(f'‚úÖ Upload complete. Background thread started.')
        print(f'üì§ Returning immediate response with session_id: {session_id}')

        # 4. RETURN IMMEDIATELY - no waiting!
        return JSONResponse(
            content={
                'success': True,
                'session_id': session_id,
                'message': 'Videos uploaded successfully. Processing started.',
                'uploaded_videos': len(uploaded_videos)
            },
            status_code=200,
            headers={
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'POST, GET, OPTIONS',
                'Access-Control-Allow-Headers': '*',
            }
        )

    except Exception as e:
        error_detail = traceback.format_exc()
        print(f'‚ùå Error:\n{error_detail}')

        # Update status to error
        with processing_lock:
            processing_status[session_id] = {
                'status': 'error',
                'error': str(e),
                'error_detail': error_detail
            }

        return JSONResponse(
            content={
                'success': False,
                'session_id': session_id,
                'error': str(e)
            },
            status_code=500,
            headers={
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'POST, GET, OPTIONS',
                'Access-Control-Allow-Headers': '*',
            }
        )

In [96]:
# ===== ENDPOINT: /upload_json =====
@app.post('/upload_json')
async def receive_json_and_download_videos(request: Request):
    """Receive JSON with Google Drive URLs, download videos, then process"""
    session_id = uuid.uuid4().hex

    try:
        # Parse JSON
        json_data = await request.json()

        print(f'\nüîµ NEW JSON UPLOAD REQUEST - Session: {session_id}')

        # Validate structure
        if not json_data.get('success') or not json_data.get('data'):
            return JSONResponse(
                {'success': False, 'error': 'Invalid JSON: missing success or data'},
                status_code=400,
                headers={'Access-Control-Allow-Origin': '*'}
            )

        data = json_data['data']

        # Extract candidate
        if not data.get('candidate') or not data['candidate'].get('name'):
            return JSONResponse(
                {'success': False, 'error': 'Missing candidate name'},
                status_code=400,
                headers={'Access-Control-Allow-Origin': '*'}
            )

        candidate_name = data['candidate']['name']
        candidate_email = data['candidate'].get('email', 'N/A')

        # Extract interviews
        if not data.get('reviewChecklists') or not data['reviewChecklists'].get('interviews'):
            return JSONResponse(
                {'success': False, 'error': 'Missing interviews data'},
                status_code=400,
                headers={'Access-Control-Allow-Origin': '*'}
            )

        interviews = data['reviewChecklists']['interviews']

        if not isinstance(interviews, list) or len(interviews) == 0:
            return JSONResponse(
                {'success': False, 'error': 'Interviews array is empty'},
                status_code=400,
                headers={'Access-Control-Allow-Origin': '*'}
            )

        # Get language
        language = json_data.get('language', 'en')

        print(f'   Candidate: {candidate_name} ({candidate_email})')
        print(f'   Videos: {len(interviews)} video(s)')
        print(f'   Language: {language}')

        # Validate language
        if language not in ["en", "id"]:
            return JSONResponse(
                {'success': False, 'error': f'Invalid language: {language}'},
                status_code=400,
                headers={'Access-Control-Allow-Origin': '*'}
            )

        # Log certification info
        if data.get('certification'):
            cert = data['certification']
            print(f'   Certification: {cert.get("abbreviatedType", "N/A")} - {cert.get("status", "N/A")}')

        # Initialize status
        with processing_lock:
            processing_status[session_id] = {
                'status': 'downloading',
                'progress': '0/' + str(len(interviews)),
                'message': 'Downloading videos from Google Drive...'
            }

        # Start background thread
        thread = th.Thread(
            target=download_and_process_videos,
            args=(session_id, candidate_name, interviews, language, str(request.base_url).rstrip('/')),
            daemon=True
        )
        thread.start()

        print(f'‚úÖ JSON received. Background download thread started.')
        print(f'üì§ Returning immediate response with session_id: {session_id}')

        return JSONResponse(
            content={
                'success': True,
                'session_id': session_id,
                'message': 'JSON received. Downloading videos from Google Drive...',
                'video_count': len(interviews)
            },
            status_code=200,
            headers={
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'POST, OPTIONS',
                'Access-Control-Allow-Headers': '*',
            }
        )

    except Exception as e:
        error_detail = traceback.format_exc()
        print(f'‚ùå Error processing JSON:\n{error_detail}')

        return JSONResponse(
            content={'success': False, 'error': str(e)},
            status_code=500,
            headers={'Access-Control-Allow-Origin': '*'}
        )

In [97]:
@app.get('/status/{session_id}')
async def get_processing_status(session_id: str):
    """Check processing status"""
    with processing_lock:
        if session_id not in processing_status:
            return JSONResponse(
                {
                    'status': 'not_found',
                    'message': 'Session not found'
                },
                status_code=404,
                headers={
                    'Access-Control-Allow-Origin': '*',
                    'Access-Control-Allow-Methods': 'GET, OPTIONS',
                    'Access-Control-Allow-Headers': '*',
                    'Cache-Control': 'no-cache, no-store, must-revalidate',
                }
            )

        status_copy = processing_status[session_id].copy()

    # Add redirect URL if completed
    if status_copy.get('status') == 'completed':
        status_copy['redirect'] = f"halaman_dasboard.html?session={session_id}"

    return JSONResponse(
        status_copy,
        headers={
            'Access-Control-Allow-Origin': '*',
            'Access-Control-Allow-Methods': 'GET, OPTIONS',
            'Access-Control-Allow-Headers': '*',
            'Cache-Control': 'no-cache, no-store, must-revalidate',
        }
    )


In [98]:
@app.get('/results/{session_id}')
async def get_results(session_id: str):
    """Get assessment results for a session"""
    results_filename = f"{session_id}.json"
    results_path = os.path.join(RESULTS_DIR, results_filename)

    if not os.path.exists(results_path):
        return JSONResponse(
            {
                'success': False,
                'message': 'Results not found for this session',
                'session_id': session_id
            },
            status_code=404,
            headers={
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'GET, OPTIONS',
                'Access-Control-Allow-Headers': '*',
            }
        )

    try:
        with open(results_path, 'r', encoding='utf-8') as f:
            results_data = json.load(f)

        return JSONResponse(
            results_data,
            headers={
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'GET, OPTIONS',
                'Access-Control-Allow-Headers': '*',
                'Cache-Control': 'no-cache, no-store, must-revalidate',
            }
        )
    except Exception as e:
        return JSONResponse(
            {
                'success': False,
                'message': f'Error reading results: {str(e)}',
                'session_id': session_id
            },
            status_code=500,
            headers={
                'Access-Control-Allow-Origin': '*',
                'Access-Control-Allow-Methods': 'GET, OPTIONS',
                'Access-Control-Allow-Headers': '*',
            }
        )


In [99]:
@app.get('/')
async def index():
    return {
        'message': 'AI Interview Assessment System',
        'model': 'faster-whisper large-v3',
        'accuracy': '98%+ for clear English speech',
        'speed': '4-5x faster than standard Whisper',
        'endpoints': {
            'upload': 'POST /upload',
            'status': 'GET /status/{session_id}',
            'results': 'GET /results/{session_id}',
            'test_form': 'GET /upload_form'
        }
    }

<b><h2> LOCAL SERVER

In [100]:
# Jalankan server uvicorn di dalam notebook (tanpa ngrok)
nest_asyncio.apply()
PORT = 8888

# Hentikan server sebelumnya jika ada
if 'server_thread' in globals() and server_thread is not None:
    try:
        print('‚è∏Ô∏è  Stopping previous server...')
        if 'server' in globals() and server is not None:
            server.should_exit = True
        # Tunggu thread selesai (dengan timeout)
        if server_thread.is_alive():
            server_thread.join(timeout=2)
        print('‚úÖ Previous server stopped.')
    except Exception as e:
        print(f'‚ö†Ô∏è  Error stopping previous server: {e}')

# Buat server instance baru dengan log level yang lebih rendah
config = uvicorn.Config(
    app=app,
    host='0.0.0.0',
    port=PORT,
    log_level='warning',  # Kurangi verbosity untuk menghindari duplikasi log
    access_log=False  # Nonaktifkan access log di console
)
server = uvicorn.Server(config=config)

# Fungsi untuk menjalankan server di thread
def run_server_in_thread():
    # Buat event loop baru untuk thread ini
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(server.serve())
    except Exception as e:
        print(f'‚ùå Server error: {e}')
    finally:
        loop.close()

# Jalankan server di background thread
server_thread = threading.Thread(target=run_server_in_thread, daemon=True)
server_thread.start()

print('‚îÅ' * 60)
print('üöÄ Server started successfully!')
print(f'üìç Local URL: http://127.0.0.1:{PORT}')
print(f'üìç Network URL: http://0.0.0.0:{PORT}')
print(f'üîß Endpoints:')
print(f'   - POST /upload       (upload videos & process)')
print(f'   - POST /upload_json  (upload JSON & download videos)')
print(f'   - GET  /status/{{id}}  (check processing status)')
print(f'   - GET  /results/{{id}} (get assessment results)')
print(f'   - GET  /upload_form  (test form)')
print('‚ÑπÔ∏è  Use Interrupt Kernel to stop the server')
print('‚îÅ' * 60)

‚è∏Ô∏è  Stopping previous server...
‚úÖ Previous server stopped.
‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ
üöÄ Server started successfully!
üìç Local URL: http://127.0.0.1:8888
üìç Network URL: http://0.0.0.0:8888
üîß Endpoints:
   - POST /upload       (upload videos & process)
   - POST /upload_json  (upload JSON & download videos)
   - GET  /status/{id}  (check processing status)
   - GET  /results/{id} (get assessment results)
   - GET  /upload_form  (test form)
‚ÑπÔ∏è  Use Interrupt Kernel to stop the server
‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ


<b><h2> NGROK

In [101]:
# Configure ngrok
# Set ngrok authtoken (dapatkan dari https://dashboard.ngrok.com/get-started/your-authtoken)
NGROK_AUTH_TOKEN = getpass.getpass('Enter your ngrok authtoken: ')
conf.get_default().auth_token = NGROK_AUTH_TOKEN

print('‚úÖ Ngrok configured successfully')

Enter your ngrok authtoken: ¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑¬∑
‚úÖ Ngrok configured successfully


In [133]:
# Start server with ngrok
nest_asyncio.apply()
PORT = 8888

# Stop previous server if exists
if 'server_thread' in globals() and server_thread is not None:
    try:
        print('‚è∏Ô∏è  Stopping previous server...')
        if 'server' in globals() and server is not None:
            server.should_exit = True
        if server_thread.is_alive():
            server_thread.join(timeout=2)
        print('‚úÖ Previous server stopped.')
    except Exception as e:
        print(f'‚ö†Ô∏è  Error stopping previous server: {e}')

# Close previous ngrok tunnels
try:
    ngrok.kill()
except:
    pass

# Create server instance
config = uvicorn.Config(
    app=app,
    host='0.0.0.0',
    port=PORT,
    log_level='warning',
    access_log=False
)
server = uvicorn.Server(config=config)

# Run server in thread
def run_server_in_thread():
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(server.serve())
    except Exception as e:
        print(f'‚ùå Server error: {e}')
    finally:
        loop.close()

server_thread = threading.Thread(target=run_server_in_thread, daemon=True)
server_thread.start()

# Wait for server to start
time.sleep(2)

# Start ngrok tunnel
public_url = ngrok.connect(PORT, bind_tls=True)
ngrok_url = public_url.public_url

print('‚îè' + '‚îÅ' * 70 + '‚îì')
print('üöÄ Server started successfully with ngrok!')
print(f'üìç Local URL: http://127.0.0.1:{PORT}')
print(f'üåê Public URL (ngrok): {ngrok_url}')
print(f'üìã Copy this URL to use in Upload.js:')
print(f'   const VIDEO_ENDPOINT = "{ngrok_url}/upload";')
print(f'üìß Endpoints:')
print(f'   - POST {ngrok_url}/upload')
print(f'   - POST {ngrok_url}/upload_json')
print(f'   - GET  {ngrok_url}/status/{{id}}')
print(f'   - GET  {ngrok_url}/results/{{id}}')
print(f'   - GET  {ngrok_url}/upload_form')
print('‚ÑπÔ∏è  Ngrok tunnel will stay active while notebook is running')
print('‚ÑπÔ∏è  Use Interrupt Kernel to stop the server')
print('‚îó' + '‚îÅ' * 70 + '‚îõ')

   üßπ Cleaned: 636 ‚Üí 630 chars
   ‚úÖ Completed in 7.3s | 7 segments | 82 words
   üéØ Transcription Confidence: 98.68% ‚úÖ
   üìä Confidence Range: 98.54% - 98.86%
‚îÇ    ‚úÖ Transcription completed
‚è∏Ô∏è  Stopping previous server...
‚îÇ    üéØ Transcription Confidence: 98.68%
‚îÇ    üìù Text length: 630 chars
‚îÇ 2Ô∏è‚É£  TRANSLATION
‚úÖ Previous server stopped.
   ‚úÖ Translation: 630 ‚Üí 610 chars
‚îÇ    üåê Direction: Indonesian ‚Üí English
‚îÇ    ‚úÖ Translation completed in 0.7s
‚îÇ    üìù EN length: 610 chars
‚îÇ    üìù ID length: 630 chars
‚îÇ 2Ô∏è‚É£¬Ω CHEATING DETECTION

üîç Running Cheating Detection...

üéØ COMPREHENSIVE CHEATING DETECTION
   (Video Interview - Expected: 1 Person)

üëÅÔ∏è  STEP 1: Visual Analysis (Face Detection)
------------------------------------------------------------
‚îè‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ

In [146]:
# Cari session_id terbaru
log_files = [f for f in os.listdir('.') if f.startswith('session_') and f.endswith('.log')]
if log_files:
    latest_log = max(log_files, key=lambda x: os.path.getmtime(x))
    print(f"üìù Reading log: {latest_log}\n")
    print("="*70)

    with open(latest_log, 'r', encoding='utf-8') as f:
        content = f.read()
        print(content)

    print("="*70)
else:
    print("‚ö†Ô∏è No log files found")

üìù Reading log: session_4ec407d0b416464283cee9f97d44fa0b.log


üéôÔ∏è  SESSION: 4ec407d0b416464283cee9f97d44fa0b
üë§ CANDIDATE: Raifal Bagus
üåê LANGUAGE: Indonesian
üìπ VIDEOS: 3
üìù LOG FILE: session_4ec407d0b416464283cee9f97d44fa0b.log


‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ
Processing video 1/3
‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ

‚îå‚îÄ Video 1/3 ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îÄ‚îê
‚îÇ ‚ùì Question: Can you tell us about the challenges you faced while working...
‚îÇ üìÅ Local file: /content/uploads/20251212005439

## System Information

### Whisper Model
- **Library**: `faster-whisper` (optimized implementation)
- **Model**: `large-v3` (most accurate available)
- **Accuracy**: ~98% for clear English speech
- **Speed**: 4-5x faster than `openai-whisper`

### Translation
- **Provider**: DeepL API
- **Target Language**: Indonesian (ID)
- **Source Language**: English (EN)
- **Character Limit**: 5,000 per chunk
- **Setup**: Set `DEEPL_API_KEY` in cell 4
- **Get API Key**: https://www.deepl.com/pro-api (Free tier: 500,000 chars/month)

### LLM Assessment
- **Model**: meta-llama/Llama-2-7b-chat-hf
- **Method**: Hybrid (LLM + Static)
- **LLM Evaluated Criteria** (3):
  1. **Kualitas Jawaban** - Quality of answer (clarity, completeness, depth)
  2. **Koherensi** - Coherence (logical flow, consistency, structure)
  3. **Relevansi** - Relevance (alignment with question, staying on topic)
- **Static Dummy Values** (2):
  4. **Tempo Bicara** - Speaking tempo (fixed at 85/100) üîß *TODO: Replace with audio analysis model*
  5. **Confidence Score** - Confidence (fixed at 82/100) üîß *TODO: Replace with voice analysis model*
- **Cheating Detection**: LLM analyzes for multiple speakers, artificial voice, reading patterns
- **Fallback**: Rule-based assessment if LLM fails

### Performance
- **Device**: Automatically detects CUDA GPU (if available) or CPU
- **Compute Type**:
  - GPU: `float16` (faster with high accuracy)
  - CPU: `int8` (optimized for CPU)
- **VAD Filter**: Enabled (skips silence for efficiency)

### Settings
- **Beam Size**: 5 (higher = more accurate)
- **Best Of**: 5 (samples multiple candidates)
- **Patience**: 2.0 (thorough beam search)
- **Temperature**: 0.0 (deterministic output)
- **Context**: Uses previous text for better accuracy

### Storage Management
- **Auto-delete videos**: ‚úÖ Videos are automatically deleted after successful transcription
- **Storage saved**: Only transcriptions and results are kept
- **Safety**: Deletion only happens after successful transcription
- **Error handling**: If deletion fails, processing continues normally

### Endpoints
- `POST /upload` - Upload videos and start transcription
- `GET /status/{session_id}` - Check processing status
- **`GET /results/{session_id}`** - **Get assessment results**
- `GET /upload_form` - Test form interface
- `GET /` - System information

### Files
- ~~Uploaded videos: `uploads/`~~ (deleted after transcription) ‚ôªÔ∏è
- Transcriptions: `transcriptions/` ‚úÖ (includes English + Indonesian + Assessment)
- **Assessment results: `results/`** ‚úÖ

### Assessment Data Structure
```json
{
  "success": true,
  "name": "Candidate Name",
  "session": "session_id_here",
  "content": [
    {
      "id": 1,
      "question": "What is your experience with Python?",
      "result": {
        "penilaian": {
          "kualitas_jawaban": 85,    // ‚úÖ LLM evaluated
          "koherensi": 83,            // ‚úÖ LLM evaluated
          "relevansi": 80,            // ‚úÖ LLM evaluated
          "tempo_bicara": 85,         // üîß Static dummy (TODO: audio model)
          "confidence_score": 82,     // üîß Static dummy (TODO: voice model)
          "total": 83
        },
        "penilaian_akhir": 4,
        "cheating_detection": "Tidak",
        "keputusan_akhir": "Lulus",
        "transkripsi_en": "...",
        "transkripsi_id": "...",
        "metadata": {
          "assessment_method": "Hybrid (LLM + Static)",
          "llm_evaluated_criteria": ["kualitas_jawaban", "koherensi", "relevansi"],
          "static_criteria": ["tempo_bicara", "confidence_score"]
        }
      }
    }
  ],
  "metadata": {
    "assessment_method": "Hybrid (LLM + Static)",
    "llm_criteria": ["kualitas_jawaban", "koherensi", "relevansi"],
    "static_criteria": ["tempo_bicara", "confidence_score"]
  }
}
```

### Roadmap
- ‚úÖ **Phase 1**: LLM Assessment (kualitas, koherensi, relevansi)
- üîß **Phase 2**: Audio Analysis Model (tempo_bicara) - *Coming Soon*
- üîß **Phase 3**: Voice Analysis Model (confidence_score) - *Coming Soon*
- üîß **Phase 4**: Video Analysis (eye contact, body language) - *Future*

### Notes
- **3 criteria** evaluated by LLM with real intelligence
- **2 criteria** use static dummy values (will be replaced with specialized models)
- Static values: `tempo_bicara=85`, `confidence_score=82`
- Results saved automatically after transcription completes
- **Original video files are deleted after transcription to save storage**
- DeepL API key required for translation (free tier available)
- Access via: `http://127.0.0.1:8888/results/{session_id}`

### DeepL Setup
1. Sign up at https://www.deepl.com/pro-api
2. Get your free API key (500,000 chars/month)
3. Set `DEEPL_API_KEY` in cell 4
4. Restart kernel and run all cells