In [3]:

# Environment setup: install the third-party packages used by the cells below.
# NOTE(review): none of these installs pin a version, so a fresh run may resolve
# different releases than the ones that produced the recorded outputs.
# NOTE(review): `moviepy` is imported by a later cell but is not installed here —
# presumably it is preinstalled on the Colab image; confirm before running elsewhere.
!pip install -q transformers torchaudio librosa soundfile numpy pandas scikit-learn tensorflow tensorflow_hub torch torchvision
# Whisper is installed straight from the upstream GitHub repository (unpinned HEAD).
!pip install -q git+https://github.com/openai/whisper.git
!pip install -q deepface


  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
  Preparing metadata (pyproject.toml) ... [?25l[?25hdone


In [4]:
import os, json, uuid, tempfile, math
import numpy as np
import torch
import librosa
import tensorflow_hub as hub
import tensorflow as tf
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from deepface import DeepFace
from moviepy.editor import VideoFileClip


25-10-25 05:35:26 - Directory /root/.deepface has been created
25-10-25 05:35:26 - Directory /root/.deepface/weights has been created


In [5]:
# Load the shared, module-level models used by the analysis helpers below.
# Text emotion classifier; the calibration output further down shows its six
# labels: sadness, joy, love, anger, fear, surprise.
TEXT_MODEL_NAME = "bhadresh-savani/distilbert-base-uncased-emotion"
tokenizer = AutoTokenizer.from_pretrained(TEXT_MODEL_NAME)
text_model = AutoModelForSequenceClassification.from_pretrained(TEXT_MODEL_NAME)
# Switch to inference mode and move the weights to the GPU when one is available.
text_model.eval().to("cuda" if torch.cuda.is_available() else "cpu")

# Audio model backbone (YAMNet for embeddings; optional).
# NOTE(review): yamnet_model is not referenced by any other cell visible here.
yamnet_model = hub.load('https://tfhub.dev/google/yamnet/1')
# Colab cell
def analyze_text(text):
    """Score `text` with the module-level emotion classifier.

    The text is tokenized (with truncation), run through `text_model` without
    gradient tracking, and the logits of the first row are soft-maxed.

    Returns:
        dict mapping each label in ``text_model.config.id2label`` to its
        probability as a Python float.
    """
    encoded = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    # Inputs must live on the same device as the model weights.
    encoded = encoded.to(text_model.device)
    with torch.no_grad():
        outputs = text_model(**encoded)
    first_row = torch.nn.functional.softmax(outputs.logits, dim=-1)[0].cpu().numpy()
    id2label = text_model.config.id2label
    scores = {}
    for idx in range(len(id2label)):
        scores[id2label[idx]] = float(first_row[idx])
    return scores


tokenizer_config.json:   0%|          | 0.00/291 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/768 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/112 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/268M [00:00<?, ?B/s]

In [6]:
# Speech-to-text: load the Whisper checkpoint once at module level so that
# transcribe_audio can reuse it across calls.
import whisper
asr_model = whisper.load_model("small")  # small = good accuracy/speed in Colab GPU

def transcribe_audio(wav_path, language=None):
    """Transcribe an audio file with the module-level Whisper model.

    Args:
        wav_path: path of the audio file handed to ``asr_model.transcribe``.
        language: optional language hint forwarded unchanged to Whisper.

    Returns:
        The ``"text"`` field of Whisper's result.
    """
    result = asr_model.transcribe(wav_path, language=language)
    transcript = result["text"]
    return transcript
# Colab cell
def extract_audio_features(wav_path, sr=16000, n_mfcc=40):
    """Summarise an audio file as a fixed-length float32 feature vector.

    The file is loaded at `sr` Hz and truncated to its first 10 seconds;
    shorter clips are used as-is (no padding is applied).

    Args:
        wav_path: path of the audio file to load with librosa.
        sr: sample rate used for loading and for every feature computation.
        n_mfcc: number of MFCC coefficients.

    Returns:
        1-D float32 array of length ``2 * n_mfcc + 3`` laid out as
        ``[mfcc means, mfcc stds, mean RMS, mean zero-crossing rate, mean pitch]``.
        simple_audio_emotion reads the last three positions.
    """
    y, _ = librosa.load(wav_path, sr=sr)
    # Keep at most the first 10 seconds.
    max_samples = sr * 10
    if y.shape[0] > max_samples:
        y = y[:max_samples]
    mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
    mfcc_mean = np.mean(mfcc, axis=1)
    mfcc_std = np.std(mfcc, axis=1)
    rms = np.mean(librosa.feature.rms(y=y))
    zcr = np.mean(librosa.feature.zero_crossing_rate(y=y))
    # Pitch (YIN). librosa.yin assumes sr=22050 unless told otherwise, so the
    # actual sample rate must be passed or the f0 estimate is mis-scaled.
    try:
        pitch = np.mean(librosa.yin(y, fmin=80, fmax=400, sr=sr))
    except Exception:
        # Best effort: fall back to 0.0 when pitch tracking fails on this clip.
        pitch = 0.0
    feat = np.concatenate([mfcc_mean, mfcc_std, [rms, zcr, pitch]])
    return feat.astype(np.float32)


100%|███████████████████████████████████████| 461M/461M [00:18<00:00, 26.0MiB/s]


In [7]:
# Colab cell
def simple_audio_emotion(feat):
    """Map an audio feature vector to a coarse emotion guess.

    Only the tail of `feat` is used: ``feat[-3]`` is read as RMS energy and
    ``feat[-1]`` as pitch. Exactly one of the three labels receives a
    non-zero score:

    * very quiet audio (rms < 0.01)        -> "sad" = 0.8
    * high pitch and loud (>180, >0.02)    -> "happy" = 0.8
    * anything else                        -> "neutral" = 0.9

    Returns:
        dict with keys "sad", "happy", "neutral" (in that order).
    """
    rms, pitch = float(feat[-3]), float(feat[-1])
    if rms < 0.01:
        label, score = "sad", 0.8
    elif pitch > 180 and rms > 0.02:
        label, score = "happy", 0.8
    else:
        label, score = "neutral", 0.9
    scores = dict.fromkeys(("sad", "happy", "neutral"), 0.0)
    scores[label] = score
    return scores


In [8]:
# Colab cell
def extract_frames(video_path, out_dir="/tmp/frames", fps=1):
    """Save still frames from a video at a fixed sampling rate.

    Args:
        video_path: path of the video opened with VideoFileClip.
        out_dir: directory that receives the JPEGs (created if missing).
        fps: frames to sample per second of video; must be positive.

    Returns:
        List of written file paths, named ``frame_0000.jpg``, ``frame_0001.jpg``, ...

    Raises:
        ValueError: if `fps` is not positive.
    """
    if fps <= 0:
        raise ValueError(f"fps must be positive, got {fps!r}")
    os.makedirs(out_dir, exist_ok=True)
    clip = VideoFileClip(video_path)
    try:
        # Sample timestamps 0, 1/fps, 2/fps, ... strictly below the clip duration.
        times = np.arange(0, clip.duration, 1.0 / fps)
        paths = []
        for i, t in enumerate(times):
            p = os.path.join(out_dir, f"frame_{i:04d}.jpg")
            clip.save_frame(p, t)
            paths.append(p)
    finally:
        # Release the reader even when saving a frame fails.
        clip.close()
    return paths


In [9]:
# Colab cell
def analyze_frame_emotion(frame_path):
    """Run DeepFace emotion analysis on one image and return its emotion scores.

    Returns:
        dict mapping emotion label -> score as a plain Python float, or None
        when DeepFace fails or reports no emotion scores. Only the first
        result is used when DeepFace returns several (one per detected face).
    """
    try:
        res = DeepFace.analyze(frame_path, actions=['emotion'], enforce_detection=False)
        # Recent DeepFace releases return a list with one dict per detected
        # face; older ones return a single dict. Accept both.
        if isinstance(res, (list, tuple)):
            if not res:
                return None
            res = res[0]
        emotions = res.get('emotion', None)
        if not emotions:
            return None
        # Coerce to built-in floats so downstream json.dump of the baseline
        # does not fail on numpy scalar types.
        return {label: float(score) for label, score in emotions.items()}
    except Exception as e:
        # Best effort: a frame that cannot be analysed is skipped by the caller.
        print("DeepFace error:", e)
        return None

def aggregate_video_emotions(frame_paths):
    """Average per-frame emotion scores over all frames that could be analysed.

    Frames for which analyze_frame_emotion returns nothing are skipped. The
    label set is taken from the first usable frame.

    Returns:
        dict mapping label -> mean score across usable frames, or None when
        no frame produced a result.
    """
    totals = {}
    n_valid = 0
    for frame_path in frame_paths:
        scores = analyze_frame_emotion(frame_path)
        if scores:
            if not totals:
                totals = dict.fromkeys(scores.keys(), 0.0)
            for label, value in scores.items():
                totals[label] += value
            n_valid += 1
    if n_valid == 0:
        return None
    return {label: totals[label] / n_valid for label in totals}


In [10]:
# Colab cell
BASELINE_DIR = "/content/baselines"
os.makedirs(BASELINE_DIR, exist_ok=True)

def baseline_file_for_user(user_id):
    """Return the path of the JSON baseline file for `user_id` inside BASELINE_DIR."""
    filename = f"baseline_{user_id}.json"
    return os.path.join(BASELINE_DIR, filename)

def init_baseline(user_id):
    """Load the stored baseline for `user_id`, creating an empty one if absent.

    When no baseline file exists, a default record with ``text``, ``audio``
    and ``video`` set to None and ``count`` set to 0 is written to disk and
    returned.

    Returns:
        The baseline dict (either parsed from the file or freshly created).
    """
    path = baseline_file_for_user(user_id)
    if not os.path.exists(path):
        # First contact with this user: persist an empty baseline.
        fresh = {"text": None, "audio": None, "video": None, "count":0}
        with open(path, "w") as fh:
            json.dump(fresh, fh)
        return fresh
    with open(path, "r") as fh:
        return json.load(fh)

def ema_update(old, new, alpha=0.2):
    """Blend `new` into `old` with an exponential moving average.

    result = alpha * new + (1 - alpha) * old

    Args:
        old: previous value (dict, list/array) or None for "no history yet".
        new: current observation; a dict is blended key by key (missing keys
            count as 0.0), anything else is blended element-wise via numpy.
        alpha: weight given to `new`.

    Returns:
        `new` unchanged when `old` is None; otherwise a dict (for dict input)
        or a plain list (for list/array input).
    """
    if old is None:
        return new
    if not isinstance(new, dict):
        blended = alpha * np.array(new) + (1 - alpha) * np.array(old)
        return blended.tolist()
    all_keys = set(old.keys()).union(new.keys())
    return {
        key: alpha * new.get(key, 0.0) + (1 - alpha) * old.get(key, 0.0)
        for key in all_keys
    }

def update_baseline(user_id, text_vec=None, audio_feat=None, video_vec=None, alpha=0.2):
    """Fold one session's observations into the user's stored baseline.

    Each modality that is provided is blended into the stored value with
    ema_update; modalities passed as None keep their stored value. The
    session counter is incremented and the baseline is written back to disk.

    Args:
        user_id: identifier used to locate the baseline file.
        text_vec: dict of text emotion scores, or None.
        audio_feat: audio feature array (must support ``.tolist()``), or None.
        video_vec: dict of video emotion scores, or None.
        alpha: EMA weight given to the new observation.

    Returns:
        The updated baseline dict.
    """
    path = baseline_file_for_user(user_id)
    baseline = init_baseline(user_id)

    if text_vec is not None:
        baseline["text"] = ema_update(baseline.get("text"), text_vec, alpha)
    else:
        baseline["text"] = baseline.get("text")

    if audio_feat is not None:
        # Stored as a plain list so the baseline stays JSON-serialisable.
        baseline["audio"] = ema_update(baseline.get("audio"), audio_feat.tolist(), alpha)
    else:
        baseline["audio"] = baseline.get("audio")

    if video_vec is not None:
        baseline["video"] = ema_update(baseline.get("video"), video_vec, alpha)
    else:
        baseline["video"] = baseline.get("video")

    baseline["count"] = baseline.get("count",0) + 1
    with open(path, "w") as fh:
        json.dump(baseline, fh)
    return baseline


In [11]:
# Colab cell
def compute_audio_zscore(baseline_audio, current_audio, eps=1e-6):
    """Mean absolute relative deviation of the current features from the baseline.

    Despite the name, no variance is available, so each feature's deviation is
    scaled by the magnitude of its baseline value (floored at `eps` to avoid
    division by zero) rather than by a standard deviation.

    Args:
        baseline_audio: stored baseline feature vector (list or array).
        current_audio: feature vector of the current session.
        eps: lower bound for the per-feature denominator.

    Returns:
        Scalar: mean over features of ``|current - baseline| / max(|baseline|, eps)``.
    """
    reference = np.array(baseline_audio)
    observed = np.array(current_audio)
    scale = np.maximum(np.abs(reference), eps)
    relative = (observed - reference) / scale
    return np.mean(np.abs(relative))

def compare_text_baseline(baseline_text, current_text):
    """Mean absolute difference between two emotion-score dicts.

    Labels missing from either dict count as 0. The sum of absolute
    differences is divided by the number of distinct labels.

    Args:
        baseline_text: stored baseline scores, or None when no baseline exists.
        current_text: scores of the current session.

    Returns:
        0.0 when there is no baseline or when neither dict has any label;
        otherwise the mean absolute per-label difference.
    """
    if baseline_text is None:
        return 0.0
    keys = set(baseline_text.keys()).union(current_text.keys())
    if not keys:
        # Both dicts are empty: nothing to compare, and dividing by
        # len(keys) would raise ZeroDivisionError.
        return 0.0
    total = sum(abs(baseline_text.get(k, 0) - current_text.get(k, 0)) for k in keys)
    return total / len(keys)


In [12]:
def normalize_dict(d):
    """Scale the values of `d` so they sum to (approximately) 1.

    A small constant (1e-9) is added to the total so an all-zero or empty
    dict does not cause a division by zero.

    Returns:
        New dict with the same keys, in the same order, and rescaled values.
    """
    total = sum(d.values()) + 1e-9
    scaled = {}
    for key, value in d.items():
        scaled[key] = value / total
    return scaled

def fuse_modality_scores(text_vec, audio_vec, video_vec=None, w_text=0.5, w_audio=0.35, w_video=0.15):
    """Combine per-modality emotion scores into one weighted score per label.

    Each non-empty input dict is first normalised with normalize_dict; a
    missing or empty modality contributes nothing. Scores are combined by
    exact label match, so a label absent from a modality counts as 0.0 there.

    Args:
        text_vec, audio_vec, video_vec: dicts of label -> score (or None/empty).
        w_text, w_audio, w_video: weights applied to the respective modality.

    Returns:
        dict mapping every label seen in any modality to its weighted sum.
    """
    text_n, audio_n, video_n = (
        normalize_dict(vec) if vec else {}
        for vec in (text_vec, audio_vec, video_vec)
    )
    labels = set(text_n.keys()).union(audio_n.keys()).union(video_n.keys())
    return {
        label: w_text * text_n.get(label,0.0) + w_audio * audio_n.get(label,0.0) + w_video * video_n.get(label,0.0)
        for label in labels
    }


In [13]:
# Colab cell
def process_session(user_id, media_path, consent_video=False, do_calibration=False, text_override=None):
    """
    Run one check-in session end to end: transcription, per-modality emotion
    scoring, optional baseline calibration, and comparison against the baseline.

    media_path: path to uploaded video OR audio-only file.
    consent_video: boolean; frames are only extracted and analysed when this is
        True and media_path ends with .mp4/.mov/.avi.
    do_calibration: if True, we will update baseline with this session (used during initial baseline collection)
    text_override: optional text string (if user types text instead of speaking)

    Returns a dict. For calibration sessions it has the keys "status", "text",
    "audio_sample" and "video_sample". Otherwise it has "transcript",
    "text_vec", "audio_vec", "video_vec", "fused", "primary", "message",
    "baseline_audio_dev", "baseline_text_dev" and "flag" ("stable",
    "mild_concern" or "elevated_concern").

    Raises ValueError when a consented video file has no audio track.

    Temporary artefacts (the extracted WAV and the frame directory under /tmp)
    are removed before returning, whether or not the session succeeds.
    """
    import shutil  # local import: only needed to remove the temporary frame directory

    tmp_wav = None
    frames_dir = None
    frame_paths = None
    try:
        # 1) if media is video and consent_video True, extract audio + frames
        if consent_video and media_path.lower().endswith((".mp4", ".mov", ".avi")):
            clip = VideoFileClip(media_path)
            try:
                if clip.audio is None:
                    raise ValueError(f"Video has no audio track: {media_path}")
                tmp_wav = f"/tmp/{uuid.uuid4().hex}.wav"
                clip.audio.write_audiofile(tmp_wav, fps=16000, codec='pcm_s16le', verbose=False, logger=None)
            finally:
                # Release the reader even if audio extraction fails.
                clip.close()
            audio_path = tmp_wav
            # extract frames (1 fps) into a per-session directory
            frames_dir = f"/tmp/frames_{uuid.uuid4().hex}"
            frame_paths = extract_frames(media_path, out_dir=frames_dir, fps=1)
        else:
            # assume media_path is audio file already
            audio_path = media_path

        # 2) ASR (skipped when the caller supplies the text directly)
        transcript = transcribe_audio(audio_path) if text_override is None else text_override

        # 3) text analysis
        text_vec = analyze_text(transcript)

        # 4) audio features + audio emotion heuristics
        audio_feat = extract_audio_features(audio_path)
        audio_vec = simple_audio_emotion(audio_feat)

        # 5) video analysis if consent
        video_vec = None
        if consent_video and frame_paths:
            video_vec = aggregate_video_emotions(frame_paths)

        # 6) update baseline if calibration session
        if do_calibration:
            update_baseline(user_id, text_vec=text_vec, audio_feat=audio_feat, video_vec=video_vec)
            return {"status":"calibrated", "text":text_vec, "audio_sample": audio_vec, "video_sample": video_vec}

        # 7) load baseline for user and compare (0.0 when a modality has no baseline yet)
        base = init_baseline(user_id)
        audio_dev = compute_audio_zscore(base.get("audio"), audio_feat) if base.get("audio") is not None else 0.0
        text_dev = compare_text_baseline(base.get("text"), text_vec) if base.get("text") is not None else 0.0

        # 8) fuse modalities
        fused = fuse_modality_scores(text_vec, audio_vec, video_vec, w_text=0.55, w_audio=0.35, w_video=0.10)

        # 9) interpret final state + use baseline diffs to raise flags
        primary = max(fused, key=fused.get)
        msg = interpret_state_simple(primary)
        # Both deviations above threshold -> elevated; exactly one -> mild.
        flag = "stable"
        if audio_dev > 0.25 and text_dev > 0.15:
            flag = "elevated_concern"
        elif (audio_dev > 0.25) or (text_dev > 0.15):
            flag = "mild_concern"

        return {
            "transcript": transcript,
            "text_vec": text_vec,
            "audio_vec": audio_vec,
            "video_vec": video_vec,
            "fused": fused,
            "primary": primary,
            "message": msg,
            "baseline_audio_dev": audio_dev,
            "baseline_text_dev": text_dev,
            "flag": flag
        }
    finally:
        # Do not leave the user's extracted audio or video frames behind in /tmp.
        if tmp_wav is not None and os.path.exists(tmp_wav):
            os.remove(tmp_wav)
        if frames_dir is not None:
            shutil.rmtree(frames_dir, ignore_errors=True)


In [14]:
def interpret_state_simple(primary_label):
    """Turn the dominant emotion label into a short user-facing message.

    Matching is case-insensitive. Labels in the "low" group produce a
    supportive warning, labels in the "positive" group a positive note, and
    every other label is reported as neutral.
    """
    label = primary_label.lower()
    if label in {"sadness","sad","depressed","worry","fear","disgust"}:
        return "⚠️ You appear lower than usual. Consider reaching out to someone you trust."
    if label in {"joy","happy","love","admiration","surprise"}:
        return "😊 You seem in a positive mood today."
    return "🙂 Your mood appears neutral."


In [15]:
!pip install pydub ffmpeg
from pydub import AudioSegment
# Load the .m4a file
audio = AudioSegment.from_file("ak1.m4a", format="m4a")

# Export as .wav
audio.export("output.wav", format="wav")


Collecting ffmpeg
  Downloading ffmpeg-1.4.tar.gz (5.1 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: ffmpeg
  Building wheel for ffmpeg (setup.py) ... [?25l[?25hdone
  Created wheel for ffmpeg: filename=ffmpeg-1.4-py3-none-any.whl size=6083 sha256=a82b04004089e9324d8aaac97dabed79fc93d5c6c37cebf06f11a75b6823afc5
  Stored in directory: /root/.cache/pip/wheels/26/21/0c/c26e09dff860a9071683e279445262346e008a9a1d2142c4ad
Successfully built ffmpeg
Installing collected packages: ffmpeg
Successfully installed ffmpeg-1.4


<_io.BufferedRandom name='output.wav'>

In [18]:
# Convert the second recording (ak2.m4a) to WAV; same steps as for ak1.m4a.
audio = AudioSegment.from_file("ak2.m4a", format="m4a")

# Export as .wav
audio.export("output2.wav", format="wav")

<_io.BufferedRandom name='output2.wav'>

In [16]:
# Calibration run: do_calibration=True makes process_session fold this recording
# into the stored baseline for user "ak123" and return a "calibrated" status.
# NOTE(review): the conversion cell writes the relative path "output.wav" while
# this call reads "/content/output.wav" — this only lines up when the working
# directory is /content; confirm.
process_session(user_id="ak123",media_path="/content/output.wav",consent_video=False,do_calibration=True)

{'status': 'calibrated',
 'text': {'sadness': 0.0007108055287972093,
  'joy': 0.997940719127655,
  'love': 0.0004175841750111431,
  'anger': 0.00040509592508897185,
  'fear': 0.00034439656883478165,
  'surprise': 0.00018138778978027403},
 'audio_sample': {'sad': 0.0, 'happy': 0.8, 'neutral': 0.0},
 'video_sample': None}

In [19]:
# Comparison run: with do_calibration=False the second recording is scored and
# compared against the stored baseline instead of updating it.
res=process_session(user_id="ak123",media_path="/content/output2.wav",consent_video=False,do_calibration=False)


In [20]:
# Show the interpreted mood message and the baseline-deviation flag.
print(res["message"],res["flag"])

😊 You seem in a positive mood today. mild_concern


In [21]:
# Convert the third recording (ak3.m4a) to WAV; same steps as for ak1.m4a.
audio = AudioSegment.from_file("ak3.m4a", format="m4a")

# Export as .wav
audio.export("output3.wav", format="wav")

<_io.BufferedRandom name='output3.wav'>

In [22]:
# Comparison run for the third recording against the stored baseline
# (do_calibration=False, so the baseline itself is not updated).
res=process_session(user_id="ak123",media_path="/content/output3.wav",consent_video=False,do_calibration=False)
# Show the interpreted mood message and the baseline-deviation flag.
print(res["message"],res["flag"])

😊 You seem in a positive mood today. mild_concern
