In [None]:
%pip install git+https://github.com/openai/whisper.git
%pip install torch torchvision torchaudio
%pip install tensorflow transformers fer librosa soundfile moviepy ffmpeg opencv-python-headless
# !apt-get install -y ffmpeg

Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /tmp/pip-req-build-0dr1r342
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /tmp/pip-req-build-0dr1r342
  Resolved https://github.com/openai/whisper.git to commit dd985ac4b90cafeef8712f2998d62c59c3e62d22
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


In [5]:
%pip install moviepy==1.0.3 ffmpeg==1.4

Note: you may need to restart the kernel to use updated packages.


In [12]:
%pip install -U fer

Note: you may need to restart the kernel to use updated packages.


In [6]:
import os, json, math, tempfile, subprocess
import torch, whisper, cv2, numpy as np
import librosa, soundfile as sf
import tensorflow as tf
from fer import FER
from transformers import pipeline

# Setup TF GPU
gpus = tf.config.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)

def extract_audio(video_path, audio_path):
    subprocess.run(["ffmpeg", "-y", "-i", video_path, "-ar", "16000", "-ac", "1", "-vn", audio_path],
                   check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

def transcribe_audio(audio_path, model_size, device):
    model = whisper.load_model(model_size, device=device)
    return model.transcribe(audio_path, word_timestamps=False)["segments"]

def extract_video_emotions(video_path, interval=0.5):
    cap = cv2.VideoCapture(video_path)
    fps = cap.get(cv2.CAP_PROP_FPS)
    duration = cap.get(cv2.CAP_PROP_FRAME_COUNT) / fps
    detector = FER(mtcnn=True)
    vid_emotions = {}
    t = 0.0
    while t < duration:
        cap.set(cv2.CAP_PROP_POS_MSEC, t * 1000)
        ret, frame = cap.read()
        if not ret:
            break
        emo, score = detector.top_emotion(frame)
        vid_emotions[round(t, 3)] = {
            "emotion": emo or "unknown",
            "score": float(score or 0.0)
        }
        t += interval
    cap.release()
    return vid_emotions

def extract_audio_emotions(audio_path, interval=0.5, sr=16000, device_idx=0):
    y, _ = librosa.load(audio_path, sr=sr)
    hop = int(interval * sr)
    clf = pipeline("audio-classification", model="ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition", device=device_idx)
    # clf = pipeline("audio-classification", model="superb/wav2vec2-base-superb-er", device=device_idx)
   
  
    audio_emotions = {}
    for i in range(0, len(y), hop):
        chunk = y[i:i+hop]
        if len(chunk) < hop:
            chunk = np.pad(chunk, (0, hop - len(chunk)))
        t = round(i / sr, 3)
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tf:
            sf.write(tf.name, chunk, sr)
            pred = clf(tf.name, top_k=1)[0]
        audio_emotions[t] = {"emotion": pred["label"], "score": float(pred["score"])}
        os.unlink(tf.name)
    return audio_emotions

def align_and_merge(segments, vid_e, aud_e, interval=0.5):
    rich = []
    for seg in segments:
        start, end, text = seg["start"], seg["end"], seg["text"].strip()
        mid = (start + end) / 2
        t_chunk = round(math.floor(mid / interval) * interval, 3)
        v = vid_e.get(t_chunk, {"emotion": "unknown", "score": 0.0})
        a = aud_e.get(t_chunk, {"emotion": "unknown", "score": 0.0})
        rich.append({
            "start": start,
            "end": end,
            "text": text,
            "video_emotion": v,
            "audio_emotion": a
        })
    return rich

def write_txt(rich, txt_path):
    def fmt_ts(s):
        m = int(s // 60)
        sec = s % 60
        return f"[{m:02d}:{sec:04.1f}]"
    with open(txt_path, "w", encoding="utf-8") as f:
        for item in rich:
            ts = fmt_ts(item["start"])
            ve = item["video_emotion"]["emotion"]
            ae = item["audio_emotion"]["emotion"]
            f.write(f"{ts} (face: {ve}, voice: {ae}) ‚Äú{item['text']}‚Äù\n")


In [None]:
from pathlib import Path

device = "cuda" if torch.cuda.is_available() else "cpu"
device_idx = 0 if device == "cuda" else -1
# video_path = Path("/content/katrina-stands-her-ground-to-the-almighty-harvey-specter-shorts-suits-ytshorts.savetube.me.mp4") # yt shorts - SUITS clip
# video_path = Path("/content/WhatsApp Video 2025-04-24 at 7.41.49 PM.mp4") # colab
video_path = Path("WhatsApp Video 2025-04-24 at 7.41.49 PM.mp4") # ishowspeed video

audio_path = "temp_audio.wav"
output_json = "rich_transcript_new_w2.json"
output_txt = "rich_transcript_new_w2.txt"
model_size = "base"

print("‚è∫ Extracting audio...")
extract_audio(video_path, audio_path)

print(f"ü§ñ Transcribing with Whisper on {device}...")
segments = transcribe_audio(audio_path, model_size, device)

print("üòä Detecting facial emotions...")
vid_e = extract_video_emotions(video_path)

print("üîä Detecting vocal emotions...")
aud_e = extract_audio_emotions(audio_path, device_idx=device_idx)

print("üîó Merging...")
rich = align_and_merge(segments, vid_e, aud_e)

print("üíæ Saving...")
with open(output_json, "w") as f:
    json.dump(rich, f, indent=2, ensure_ascii=False)
write_txt(rich, output_txt)
os.remove(audio_path)

print("‚úÖ Done!")


‚è∫ Extracting audio...


CalledProcessError: Command '['ffmpeg', '-y', '-i', PosixPath('/content/WhatsApp Video 2025-04-24 at 7.41.49 PM.mp4'), '-ar', '16000', '-ac', '1', '-vn', 'temp_audio.wav']' returned non-zero exit status 1.

In [2]:
%pip install accelerate 

Collecting accelerate
  Downloading accelerate-1.6.0-py3-none-any.whl.metadata (19 kB)
Downloading accelerate-1.6.0-py3-none-any.whl (354 kB)
Installing collected packages: accelerate
Successfully installed accelerate-1.6.0
Note: you may need to restart the kernel to use updated packages.


In [None]:
from typing import List, Dict, Any
import json
from pathlib import Path
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline
from accelerate import init_empty_weights, load_checkpoint_and_dispatch

class VideoLLM:
    def __init__(
        self,
        model_path: str = "dganochenko/llama-3-8b-chat",  # WE ARE NOT USING THIS
        device: str = "cuda" if torch.cuda.is_available() else "cpu",
        offload_folder: str = None,
        max_context_length: int = 4096,
        max_new_tokens: int = 512
    ):
        """Initialize the VideoLLM with a local Llama model."""
        self.device = device
        self.max_context_length = max_context_length
        self.max_new_tokens = max_new_tokens
        
        # Load model and tokenizer
        self.tokenizer = AutoTokenizer.from_pretrained(model_path)
        
        # Initialize model with empty weights and dispatch to device
        with init_empty_weights():
            self.model = AutoModelForCausalLM.from_pretrained(model_path,
                                                        torch_dtype=torch.float16 if device == "cuda" else torch.float32)
        self.model = load_checkpoint_and_dispatch(
            self.model, model_path, device_map="auto", offload_folder=offload_folder
        )
        
        self.pipeline = pipeline(
            "text-generation",
            model=self.model,
            tokenizer=self.tokenizer,
            # device=self.device
        )
        
        self.transcript_data = None
        
    def load_transcript(self, json_path: str) -> None:
        """Load the rich transcript from JSON file."""
        with open(json_path, 'r', encoding='utf-8') as f:
            self.transcript_data = json.load(f)
            
    def _format_transcript_for_context(self) -> str:
        """Format the transcript data into a context string."""
        if not self.transcript_data:
            raise ValueError("No transcript data loaded. Call load_transcript first.")
            
        context_parts = []
        for segment in self.transcript_data:
            timestamp = f"[{int(segment['start']//60):02d}:{segment['start']%60:04.1f}]"
            video_emotion = segment['video_emotion']['emotion']
            audio_emotion = segment['audio_emotion']['emotion']
            text = segment['text']
            
            segment_str = (
                f"{timestamp} "
                f"(facial expression: {video_emotion}, "
                f"voice emotion: {audio_emotion}) "
                f"\"{text}\""
            )
            context_parts.append(segment_str)
            
        return "\n".join(context_parts)
    
    def _create_prompt(self, question: str) -> str:
        """Create a prompt for the model combining context and question."""
        context = self._format_transcript_for_context()
        
        prompt = f"""Below is a transcript of a video with timestamps, facial expressions, and voice emotions detected.
Please analyze this information to answer the question.

Transcript:
{context}

Question: {question}

Please provide a detailed answer based on the video transcript and emotional information provided above.

Answer:"""
        return prompt
    
    def ask(self, question: str) -> str:
        """Ask a question about the video."""
        if not self.transcript_data:
            raise ValueError("No transcript data loaded. Call load_transcript first.")
            
        prompt = self._create_prompt(question)
        
        # Generate response
        response = self.pipeline(
            prompt,
            max_new_tokens=self.max_new_tokens,
            do_sample=True,
            temperature=0.7,
            top_p=0.95,
            num_return_sequences=1,
            eos_token_id=self.tokenizer.eos_token_id,
            pad_token_id=self.tokenizer.pad_token_id if self.tokenizer.pad_token_id else self.tokenizer.eos_token_id,
        )[0]['generated_text']
        
        # Extract the answer part (after "Answer:")
        answer = response.split("Answer:")[-1].strip()
        return answer

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
video_llm = VideoLLM(
    # model_path="meta-llama/Meta-Llama-3-8B",  # local repo or hf repo for models
    model_path="Meta-Llama-3-8B",
    device="auto",
    offload_folder="./offload",  # Optional: specify an offload folder for large models
)
video_llm.load_transcript("rich_transcript.json")

# Ask questions about the video
questions = [
    "What was the overall emotional state of the speaker in this video?",
    "Were there any moments where the facial expression didn't match the voice emotion?",
    "What was the main topic discussed in this video?",
    "At what timestamp did the speaker show the strongest emotional response?"
]

for question in questions:
    print(f"\nQ: {question}")
    answer = video_llm.ask(question)
    print(f"A: {answer}")

Loading checkpoint shards: 100%|‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà‚ñà| 4/4 [01:34<00:00, 23.51s/it]
Device set to use cpu



Q: What was the overall emotional state of the speaker in this video?
A: The speaker in this video was in a state of anger and frustration. They were upset and felt betrayed by the other person in the conversation. The speaker was also defensive and tried to justify their actions. The overall tone of the conversation was confrontational and aggressive.

Q: Were there any moments where the facial expression didn't match the voice emotion?
A: Yes, there were a few moments where the facial expression didn't match the voice emotion.

For example, at 00:24, the voice emotion is neutral, but the facial expression is angry. This could indicate that the speaker is trying to mask their true feelings or that they are feeling conflicted about the situation.

Another example is at 00:31, where the voice emotion is happy, but the facial expression is neutral. This could indicate that the speaker is trying to maintain a calm and collected demeanor, even though they may be feeling anxious or frustra

In [None]:
%pip install datasets

In [None]:
from datasets import Dataset

def prepare_rich_prompt_dataset(json_path):
    import json
    with open(json_path, 'r') as f:
        rich_data = json.load(f)

    def format_prompt(item):
        return {
            "prompt": [
                {
                    "role": "system",
                    "content": "You will see a transcript with facial and vocal emotion context. Answer in this format:\n<reasoning>\n...\n</reasoning>\n<answer>\n...\n</answer>"
                },
                {
                    "role": "user",
                    "content": f"At [{item['start']:.1f}s], the person said: \"{item['text']}\".\nFacial emotion: {item['video_emotion']['emotion']}, Vocal emotion: {item['audio_emotion']['emotion']}.\nWhat do you infer?"
                }
            ],
            "answer": None  # no reference answer
        }

    data = [format_prompt(item) for item in rich_data]
    return Dataset.from_list(data)


In [None]:
import re

def format_reward_func(completions, **kwargs):
    pattern = r"<reasoning>.*?</reasoning>\s*<answer>.*?</answer>"
    responses = [completion[0]["content"] for completion in completions]
    return [1.0 if re.search(pattern, r, re.DOTALL) else 0.0 for r in responses]

def emotion_word_match_reward_func(prompts, completions, **kwargs):
    responses = [completion[0]['content'].lower() for completion in completions]
    prompt_texts = [p[-1]['content'].lower() for p in prompts]
    rewards = []
    for prompt, response in zip(prompt_texts, responses):
        if any(em in response for em in ['happy', 'sad', 'angry', 'surprised']):
            rewards.append(0.5)
        else:
            rewards.append(0.0)
    return rewards


In [None]:
%pip install trl==0.15.2

In [None]:
from trl import GRPOConfig, GRPOTrainer

training_args = GRPOConfig(
    learning_rate=5e-6,
    per_device_train_batch_size=1,
    gradient_accumulation_steps=1,
    num_generations=4,
    max_prompt_length=256,
    max_completion_length=768,
    max_steps=100,
    save_steps=100,
    logging_steps=1,
    report_to="none",
    output_dir="./grpo_outputs",
)

trainer = GRPOTrainer(
    model=video_llm.model, # check
    processing_class=tokenizer,
    reward_funcs=[
        format_reward_func,
        emotion_word_match_reward_func
    ],
    args=training_args,
    train_dataset=prepare_rich_prompt_dataset("rich_transcript_new_w2.json"),
)
trainer.train()
