In [3]:
# Install dependencies
!pip install tensorflow librosa transformers sounddevice numpy streamlit pydub

Collecting sounddevice
  Downloading sounddevice-0.5.2-py3-none-any.whl.metadata (1.6 kB)
Collecting streamlit
  Downloading streamlit-1.46.1-py3-none-any.whl.metadata (9.0 kB)
Collecting watchdog<7,>=2.1.5 (from streamlit)
  Downloading watchdog-6.0.0-py3-none-manylinux2014_x86_64.whl.metadata (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.3/44.3 kB[0m [31m2.6 MB/s[0m eta [36m0:00:00[0m
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading sounddevice-0.5.2-py3-none-any.whl (32 kB)
Downloading streamlit-1.46.1-py3-none-any.whl (10.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m96.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m131.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading watchdog-6.0.0-py3-none-manylinux201

In [4]:
import librosa
import numpy as np

def extract_audio_features(file_path):
    # Load audio file
    y, sr = librosa.load(file_path, sr=None)

    # Extract features
    features = {
        'mfcc': np.mean(librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13), axis=1),
        'chroma': np.mean(librosa.feature.chroma_stft(y=y, sr=sr), axis=1),
        'mel': np.mean(librosa.feature.melspectrogram(y=y, sr=sr), axis=1),
        'contrast': np.mean(librosa.feature.spectral_contrast(y=y, sr=sr), axis=1),
        'tonnetz': np.mean(librosa.feature.tonnetz(y=librosa.effects.harmonic(y), sr=sr), axis=1)
    }

    # Combine all features
    combined = np.concatenate([features['mfcc'], features['chroma'],
                             features['mel'], features['contrast'],
                             features['tonnetz']])
    return combined

In [5]:
from transformers import pipeline

text_classifier = pipeline(
    "text-classification",
    model="finiteautomata/bertweet-base-emotion-analysis",
    return_all_scores=True
)

def analyze_text_emotion(text):
    results = text_classifier(text)[0]
    return {item['label']: item['score'] for item in results}

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


config.json:   0%|          | 0.00/999 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/540M [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/295 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

bpe.codes: 0.00B [00:00, ?B/s]

added_tokens.json:   0%|          | 0.00/17.0 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/150 [00:00<?, ?B/s]

Device set to use cuda:0


In [6]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Concatenate, Dropout

# Audio model
audio_input = Input(shape=(180,))  # Adjust based on your feature size
audio_dense = Dense(128, activation='relu')(audio_input)
audio_dropout = Dropout(0.3)(audio_dense)

# Text model (using pre-extracted features)
text_input = Input(shape=(6,))  # 6 emotion scores from text model
text_dense = Dense(64, activation='relu')(text_input)

# Combine
combined = Concatenate()([audio_dropout, text_dense])
dense = Dense(64, activation='relu')(combined)
output = Dense(4, activation='softmax')(dense)  # 4 emotions

model = Model(inputs=[audio_input, text_input], outputs=output)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

In [7]:
!apt-get install -y libportaudio2
!pip install sounddevice

Reading package lists... Done
Building dependency tree... Done
Reading state information... Done
The following NEW packages will be installed:
  libportaudio2
0 upgraded, 1 newly installed, 0 to remove and 35 not upgraded.
Need to get 65.3 kB of archives.
After this operation, 223 kB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu jammy/universe amd64 libportaudio2 amd64 19.6.0-1.1 [65.3 kB]
Fetched 65.3 kB in 1s (84.1 kB/s)
Selecting previously unselected package libportaudio2:amd64.
(Reading database ... 126308 files and directories currently installed.)
Preparing to unpack .../libportaudio2_19.6.0-1.1_amd64.deb ...
Unpacking libportaudio2:amd64 (19.6.0-1.1) ...
Setting up libportaudio2:amd64 (19.6.0-1.1) ...
Processing triggers for libc-bin (2.35-0ubuntu3.8) ...
/sbin/ldconfig.real: /usr/local/lib/libtcm_debug.so.1 is not a symbolic link

/sbin/ldconfig.real: /usr/local/lib/libtbb.so.12 is not a symbolic link

/sbin/ldconfig.real: /usr/local/lib/libhwlo

In [8]:
import sounddevice as sd
import numpy as np
from pydub import AudioSegment

def record_audio(duration=5, sample_rate=44100):
    print(f"Recording for {duration} seconds...")
    recording = sd.rec(int(duration * sample_rate),
                      samplerate=sample_rate,
                      channels=1)
    sd.wait()
    return np.squeeze(recording)

def save_and_process(audio, filename="temp.wav", sample_rate=44100):
    # Save as WAV
    audio_segment = AudioSegment(
        audio.tobytes(),
        frame_rate=sample_rate,
        sample_width=audio.dtype.itemsize,
        channels=1
    )
    audio_segment.export(filename, format="wav")

    # Process
    audio_features = extract_audio_features(filename)
    return audio_features

In [9]:
import streamlit as st
import time

st.title("🎙️ Real-Time Emotion Detector")

if st.button("Start Recording"):
    with st.spinner("Recording for 5 seconds..."):
        audio = record_audio()
        audio_features = save_and_process(audio)

        st.audio("temp.wav")

        text = st.text_input("What did you say? (For text analysis)")
        if text:
            text_emotion = analyze_text_emotion(text)
            text_features = np.array(list(text_emotion.values()))

            # Predict (using our trained model)
            prediction = model.predict([audio_features.reshape(1, -1),
                                      text_features.reshape(1, -1)])
            emotions = ["happy", "sad", "angry", "neutral"]
            predicted_emotion = emotions[np.argmax(prediction)]

            st.success(f"Predicted emotion: {predicted_emotion}")

            # Show probabilities
            st.subheader("Emotion Probabilities")
            for e, p in zip(emotions, prediction[0]):
                st.write(f"{e}: {p:.2f}")

2025-07-07 14:00:00.838 
  command:

    streamlit run /usr/local/lib/python3.11/dist-packages/colab_kernel_launcher.py [ARGUMENTS]


In [10]:
!pip install kaggle



In [11]:
from google.colab import files
files.upload() # This will prompt you to upload the kaggle.json file

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"rishabchouhan","key":"25667319687bb061aae25f3b24cdef02"}'}

In [12]:
import os
import stat

# Create the .kaggle directory if it doesn't exist
kaggle_dir = os.path.expanduser('~/.kaggle')
os.makedirs(kaggle_dir, exist_ok=True)

# Define the path to the kaggle.json file
kaggle_json_path = os.path.join(kaggle_dir, 'kaggle.json')

# Move the uploaded kaggle.json to the correct directory
# Assuming the file was uploaded to the current working directory
if os.path.exists('kaggle.json'):
    os.rename('kaggle.json', kaggle_json_path)
    print(f"Moved kaggle.json to {kaggle_json_path}")
else:
    print("kaggle.json not found in the current directory. Please upload it again.")

# Set the permissions to read/write only for the owner (600)
if os.path.exists(kaggle_json_path):
    os.chmod(kaggle_json_path, stat.S_IREAD | stat.S_IWRITE)
    print(f"Set permissions for {kaggle_json_path} to 600")
else:
    print("Cannot set permissions, kaggle.json not found.")

Moved kaggle.json to /root/.kaggle/kaggle.json
Set permissions for /root/.kaggle/kaggle.json to 600


In [13]:
import os
import zipfile
import librosa
import numpy as np
import pandas as pd
from datasets import Dataset, Audio, ClassLabel, Features

# 1. Set the dataset identifier
KAGGLE_DATASET_ID = "ejlok1/cremad"
DOWNLOAD_DIR = "crema_d_raw" # Directory to download the zip file and extract to

# 2. Authenticate Kaggle API (if not done by files.upload() or manual placement)
# This step is usually handled by the `kaggle.json` file once placed correctly.
# from kaggle.api.kaggle_api_extended import KaggleApi
# api = KaggleApi()
# api.authenticate()

# 3. Create download directory if it doesn't exist
os.makedirs(DOWNLOAD_DIR, exist_ok=True)

# 4. Download the dataset
print(f"Downloading {KAGGLE_DATASET_ID} from Kaggle...")
# The !kaggle command uses the CLI, which is typically easier in notebooks
!kaggle datasets download -d {KAGGLE_DATASET_ID} -p {DOWNLOAD_DIR} --unzip

print(f"Dataset downloaded and unzipped to {DOWNLOAD_DIR}/AudioWAV")

# 5. Define the base path to the audio files after unzipping
# Kaggle's CREMA-D usually unzips into an 'AudioWAV' folder inside the download directory
AUDIO_BASE_PATH = os.path.join(DOWNLOAD_DIR, "AudioWAV")

# 6. Prepare data for Hugging Face Dataset
data_list = []
# These are the 6 emotions from CREMA-D's naming convention
EMOTION_MAP = {
    'ANG': 'angry', 'DIS': 'disgust', 'FEA': 'fear',
    'HAP': 'happy', 'NEU': 'neutral', 'SAD': 'sad'
}
all_emotion_names = sorted(list(EMOTION_MAP.values()))

# Walk through the directory and collect file paths and labels
print("Processing audio files and extracting labels...")
for filename in os.listdir(AUDIO_BASE_PATH):
    if filename.endswith('.wav'):
        file_path = os.path.join(AUDIO_BASE_PATH, filename)

        # Parse emotion from filename (e.g., 1001_DFA_ANG_XX.wav)
        parts = filename.split('_')
        if len(parts) >= 3:
            emotion_code = parts[2].upper() # Ensure uppercase for map lookup
            emotion_label = EMOTION_MAP.get(emotion_code, None)

            if emotion_label:
                data_list.append({
                    'audio': file_path, # Path to the audio file
                    'label': emotion_label # Categorical label string
                })

# Convert to a Pandas DataFrame (optional, but often helpful)
df_crema_d = pd.DataFrame(data_list)
print(f"Found {len(df_crema_d)} audio files in CREMA-D.")
print("Emotion distribution:\n", df_crema_d['label'].value_counts())

# 7. Create the Hugging Face Dataset
# Determine actual sampling rate by loading a sample, if not known
# You can load one file to check:
# y_sample, sr_sample = librosa.load(df_crema_d['audio'].iloc[0], sr=None)
# print(f"Sample audio sampling rate: {sr_sample}") # Usually 16000 or 44100 for CREMA-D

custom_features = Features({
    'audio': Audio(sampling_rate=16000), # Assuming 16kHz for CREMA-D, adjust if needed
    'label': ClassLabel(names=all_emotion_names)
})

crema_d_dataset = Dataset.from_pandas(df_crema_d, features=custom_features)

print("\nSuccessfully created Hugging Face Dataset for CREMA-D:")
print(crema_d_dataset)
print(crema_d_dataset.features)

# Access a sample to verify
print("\nFirst sample from CREMA-D dataset:")
# Access the first sample using integer indexing, as there are no predefined splits
print(crema_d_dataset[0])

Downloading ejlok1/cremad from Kaggle...
Dataset URL: https://www.kaggle.com/datasets/ejlok1/cremad
License(s): ODC Attribution License (ODC-By)
Downloading cremad.zip to crema_d_raw
 89% 400M/451M [00:06<00:00, 86.8MB/s]
100% 451M/451M [00:06<00:00, 73.8MB/s]
Dataset downloaded and unzipped to crema_d_raw/AudioWAV
Processing audio files and extracting labels...
Found 7442 audio files in CREMA-D.
Emotion distribution:
 label
sad        1271
angry      1271
disgust    1271
happy      1271
fear       1271
neutral    1087
Name: count, dtype: int64

Successfully created Hugging Face Dataset for CREMA-D:
Dataset({
    features: ['audio', 'label'],
    num_rows: 7442
})
{'audio': Audio(sampling_rate=16000, mono=True, decode=True, id=None), 'label': ClassLabel(names=['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad'], id=None)}

First sample from CREMA-D dataset:
{'audio': {'path': 'crema_d_raw/AudioWAV/1084_IWW_NEU_XX.wav', 'array': array([ 0.00000000e+00,  0.00000000e+00,  0.00000000e+0

Let's set up your Kaggle API key for the command line interface.

In [14]:
!kaggle datasets list

ref                                                            title                                                size  lastUpdated                 downloadCount  voteCount  usabilityRating  
-------------------------------------------------------------  ---------------------------------------------  ----------  --------------------------  -------------  ---------  ---------------  
urvishahir/electric-vehicle-specifications-dataset-2025        Electric Vehicle Specs Dataset (2025)               16631  2025-06-20 07:14:56.777000           4302        105  1.0              
divyaraj2006/social-media-engagement                           social media engagement                              2142  2025-07-02 15:34:21.323000           1641         34  1.0              
adilshamim8/salaries-for-data-science-jobs                     Data Science, AI & ML Job Salaries in 2025        2315744  2025-07-01 07:25:40.620000           3058         69  1.0              
chaudharisanika/smartphones-da

In [15]:
import os
import shutil
# Path to your Hugging Face cache directory
# This is typically ~/.cache/huggingface/datasets
# You can also find it by running: from huggingface_hub import HfFileSystem; print(HfFileSystem().root)
hf_cache_dir = os.path.expanduser("~/.cache/huggingface/datasets")

# You want to clear the cache specifically for 'go_emotions'
go_emotions_cache_path = os.path.join(hf_cache_dir, "go_emotions")

if os.path.exists(go_emotions_cache_path):
    print(f"Clearing cache for go_emotions at: {go_emotions_cache_path}")
    shutil.rmtree(go_emotions_cache_path)
else:
    print(f"GoEmotions cache directory not found at: {go_emotions_cache_path}")

# You might also want to clear any general fsspec cache if the problem persists
fsspec_cache_dir = os.path.expanduser("~/.cache/fsspec")
if os.path.exists(fsspec_cache_dir):
    print(f"Clearing fsspec cache at: {fsspec_cache_dir}")
    shutil.rmtree(fsspec_cache_dir)

GoEmotions cache directory not found at: /root/.cache/huggingface/datasets/go_emotions


In [16]:
!pip install --upgrade datasets fsspec huggingface_hub

Collecting fsspec
  Using cached fsspec-2025.5.1-py3-none-any.whl.metadata (11 kB)


In [17]:
!pip install emoji



In [18]:
import warnings
warnings.filterwarnings("ignore", category=UserWarning, module='librosa')

In [19]:
# --- 1. SETUP AND CONFIGURATION ---
import os
import zipfile
import librosa
import numpy as np
import pandas as pd
from datasets import Dataset, Audio, ClassLabel, Features, load_dataset
from transformers import pipeline
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Concatenate, Dropout
from tensorflow.keras.utils import to_categorical
import random
from collections import defaultdict
import sounddevice as sd
from scipy.io.wavfile import write as write_wav
import time

# --- Kaggle API Setup (if needed) ---
# Make sure your kaggle.json is in the correct directory (~/.kaggle/kaggle.json)
# Or upload it in your Colab environment.

# --- Core Configuration ---
# Define the target emotions for the final model
TARGET_EMOTIONS = ["happy", "sad", "angry", "neutral"]
NUM_OUTPUT_EMOTIONS = len(TARGET_EMOTIONS)
emotion_to_int = {emotion: i for i, emotion in enumerate(TARGET_EMOTIONS)}

# Define the expected order of labels from the text model
BERTWEET_EMOTION_LABELS = ['anger', 'joy', 'optimism', 'sadness'] # We only need these for our mapping
TEXT_FEATURE_SIZE = len(BERTWEET_EMOTION_LABELS)

# Audio feature settings
DUMMY_SR = 16000
# A quick check to determine the size of the audio feature vector
dummy_y = np.random.rand(DUMMY_SR * 3)
# Note: extract_audio_features function is defined in the next block
# AUDIO_FEATURE_SIZE = extract_audio_features(dummy_y, DUMMY_SR).shape[0]
# The size is known to be 166 from the original code. We can hardcode it for now.
AUDIO_FEATURE_SIZE = 166

# Training Configuration
TRAIN_RATIO = 0.8
BUFFER_SIZE = 1024
BATCH_SIZE = 32
EPOCHS = 20

In [26]:
# --- 2. FEATURE EXTRACTION AND MODEL DEFINITION ---

# --- Audio Feature Extraction ---
def extract_audio_features(y, sr):
    """
    Extracts a feature vector from an audio waveform.
    """
    y = np.array(y, dtype=np.float32)
    features = {
        'mfcc': np.mean(librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13), axis=1),
        'chroma': np.mean(librosa.feature.chroma_stft(y=y, sr=sr), axis=1),
        'mel': np.mean(librosa.feature.melspectrogram(y=y, sr=sr), axis=1),
        'contrast': np.mean(librosa.feature.spectral_contrast(y=y, sr=sr), axis=1),
        'tonnetz': np.mean(librosa.feature.tonnetz(y=librosa.effects.harmonic(y), sr=sr), axis=1)
    }
    return np.concatenate(list(features.values()))

# --- Text Emotion Analysis Pipeline ---
# Initialize the pipeline here. We will use it on a dataset later.
text_classifier = pipeline(
    "text-classification",
    model="finiteautomata/bertweet-base-emotion-analysis",
    return_all_scores=True,
    device=0 # Use GPU if available
)

def analyze_text_emotion_batch(text_batch):
    """
    Analyzes a batch of texts using the pipeline for efficiency.
    """
    results_list = text_classifier(text_batch)
    batch_scores = []
    for results in results_list:
        emotion_scores = {label: 0.0 for label in BERTWEET_EMOTION_LABELS}
        for item in results:
            if item['label'] in emotion_scores:
                emotion_scores[item['label']] = item['score']
        batch_scores.append([emotion_scores[label] for label in BERTWEET_EMOTION_LABELS])
    return np.array(batch_scores, dtype=np.float32)

# --- Multimodal Model Architecture ---
def create_multimodal_model():
    """
    Creates and compiles the Keras multimodal model.
    """
    # Audio branch
    audio_input = Input(shape=(AUDIO_FEATURE_SIZE,), name='audio_input')
    audio_dense = Dense(128, activation='relu')(audio_input)
    audio_dropout = Dropout(0.3)(audio_dense)

    # Text branch
    text_input = Input(shape=(TEXT_FEATURE_SIZE,), name='text_input')
    text_dense = Dense(64, activation='relu')(text_input)

    # Fusion
    combined = Concatenate()([audio_dropout, text_dense])
    dense = Dense(64, activation='relu')(combined)
    output = Dense(NUM_OUTPUT_EMOTIONS, activation='softmax', name='emotion_output')(dense)

    model = Model(inputs=[audio_input, text_input], outputs=output)
    model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
    return model

# Create the model and print its summary
model = create_multimodal_model()
model.summary()

Device set to use cuda:0


In [27]:
# --- 3. DATASET LOADING AND PREPROCESSING ---

# --- Emotion Mapping Dictionaries ---
CREMA_D_TO_TARGET = { 'happy': 'happy', 'sad': 'sad', 'angry': 'angry', 'neutral': 'neutral', 'disgust': 'angry', 'fear': 'neutral' }
TESS_TO_TARGET = { 'happy': 'happy', 'sad': 'sad', 'angry': 'angry', 'neutral': 'neutral', 'disgust': 'angry', 'fear': 'neutral', 'surprise': 'happy' }
GOEMOTIONS_TO_TARGET = {
    'joy': 'happy', 'amusement': 'happy', 'excitement': 'happy', 'optimism': 'happy', 'love': 'happy',
    'sadness': 'sad', 'grief': 'sad', 'disappointment': 'sad', 'remorse': 'sad', 'shame': 'sad',
    'anger': 'angry', 'annoyance': 'angry', 'disapproval': 'angry', 'frustration': 'angry', 'disgust': 'angry',
    'neutral': 'neutral', 'realization': 'neutral', 'relief': 'neutral', 'surprise': 'neutral', 'admiration': 'neutral',
    'caring': 'neutral', 'desire': 'neutral', 'embarrassment': 'neutral', 'gratitude': 'neutral',
    'nervousness': 'neutral', 'pride': 'neutral', 'curiosity': 'neutral', 'confusion': 'neutral', 'indifference': 'neutral', 'fear': 'neutral'
}

# --- Helper function for mapping labels ---
def map_and_filter_labels(example, mapping_dict, feature_obj):
    """Maps labels from a source dataset to the target emotion set."""
    original_label_str = feature_obj.int2str(example['label'])
    mapped_label_str = mapping_dict.get(original_label_str)
    if mapped_label_str and mapped_label_str in TARGET_EMOTIONS:
        example['label'] = emotion_to_int[mapped_label_str]
        return True
    return False

# --- Load and Process CREMA-D ---
print("--- Processing CREMA-D ---")
!kaggle datasets download -d ejlok1/cremad -p crema_d_raw --unzip
crema_d_paths = [os.path.join("crema_d_raw/AudioWAV", f) for f in os.listdir("crema_d_raw/AudioWAV") if f.endswith('.wav')]
crema_d_labels = [CREMA_D_TO_TARGET.get(f.split('_')[2], 'unknown') for f in os.listdir("crema_d_raw/AudioWAV") if f.endswith('.wav')]
df_crema_d = pd.DataFrame({'audio': crema_d_paths, 'label': crema_d_labels})
df_crema_d = df_crema_d[df_crema_d['label'].isin(TARGET_EMOTIONS)]
if not df_crema_d.empty:
    crema_d_dataset = Dataset.from_pandas(df_crema_d).cast_column("audio", Audio(sampling_rate=DUMMY_SR))
    crema_d_dataset = crema_d_dataset.class_encode_column('label').shuffle(seed=42)
    print(f"Loaded {len(crema_d_dataset)} CREMA-D samples.\n")
else:
    crema_d_dataset = Dataset.from_dict({'audio': [], 'label': []})
    print("No CREMA-D samples loaded after filtering.\n")


# --- Load and Process TESS ---
print("--- Processing TESS ---")
!kaggle datasets download -d ejlok1/toronto-emotional-speech-set-tess -p tess_raw --unzip
tess_base_path = "tess_raw/TESS Toronto emotional speech set data"
tess_data = []
for folder in os.listdir(tess_base_path):
    emotion = folder.split('_')[-1].lower()
    if emotion == 'ps': emotion = 'surprise' # pleasant surprise
    mapped_emotion = TESS_TO_TARGET.get(emotion)
    if mapped_emotion in TARGET_EMOTIONS:
        for file in os.listdir(os.path.join(tess_base_path, folder)):
            if file.endswith('.wav'):
                tess_data.append({
                    'audio': os.path.join(tess_base_path, folder, file),
                    'label': mapped_emotion
                })
df_tess = pd.DataFrame(tess_data)
tess_dataset = Dataset.from_pandas(df_tess).cast_column("audio", Audio(sampling_rate=DUMMY_SR))
tess_dataset = tess_dataset.class_encode_column('label').shuffle(seed=42)
print(f"Loaded {len(tess_dataset)} TESS samples.\n")

# --- Load and Process GoEmotions ---
print("--- Processing GoEmotions ---")
goemotions_ds = load_dataset("go_emotions", "simplified")
go_emotion_labels = goemotions_ds['train'].features['labels'].feature
goemotions_data = []
for split in ['train', 'validation', 'test']:
    for item in goemotions_ds[split]:
        for label_id in item['labels']:
            emotion_str = go_emotion_labels.int2str(label_id)
            mapped_emotion = GOEMOTIONS_TO_TARGET.get(emotion_str)
            if mapped_emotion in TARGET_EMOTIONS:
                goemotions_data.append({'text': item['text'], 'label': mapped_emotion})
                break # Take the first mappable emotion
df_goemotions = pd.DataFrame(goemotions_data)
goemotions_dataset = Dataset.from_pandas(df_goemotions).class_encode_column('label').shuffle(seed=42)
print(f"Loaded {len(goemotions_dataset)} GoEmotions samples.\n")

--- Processing CREMA-D ---
Dataset URL: https://www.kaggle.com/datasets/ejlok1/cremad
License(s): ODC Attribution License (ODC-By)
Downloading cremad.zip to crema_d_raw
 96% 432M/451M [00:08<00:00, 31.1MB/s]
100% 451M/451M [00:08<00:00, 58.3MB/s]
No CREMA-D samples loaded after filtering.

--- Processing TESS ---
Dataset URL: https://www.kaggle.com/datasets/ejlok1/toronto-emotional-speech-set-tess
License(s): Attribution-NonCommercial-NoDerivatives 4.0 International (CC BY-NC-ND 4.0)
Downloading toronto-emotional-speech-set-tess.zip to tess_raw
 96% 409M/428M [00:01<00:00, 271MB/s]
100% 428M/428M [00:01<00:00, 356MB/s]


Casting to class labels:   0%|          | 0/2600 [00:00<?, ? examples/s]

Loaded 2600 TESS samples.

--- Processing GoEmotions ---


Casting to class labels:   0%|          | 0/51896 [00:00<?, ? examples/s]

Loaded 51896 GoEmotions samples.



In [25]:
# --- 4. UNIFIED DATASET CREATION AND TRAINING ---

# --- Combine Datasets ---
print("--- Creating Unified Multimodal Dataset ---")
audio_data = defaultdict(list)
# Add checks for dataset existence before iterating
if 'crema_d_dataset' in locals() and crema_d_dataset is not None:
    for sample in crema_d_dataset:
        if 'label' in sample and 'audio' in sample: # Ensure keys exist
            audio_data[sample['label']].append(sample['audio'])
else:
    print("Warning: crema_d_dataset not found or is None. Skipping.")

if 'tess_dataset' in locals() and tess_dataset is not None:
    for sample in tess_dataset:
        if 'label' in sample and 'audio' in sample: # Ensure keys exist
            audio_data[sample['label']].append(sample['audio'])
else:
    print("Warning: tess_dataset not found or is None. Skipping.")

text_data = defaultdict(list)
if 'goemotions_dataset' in locals() and goemotions_dataset is not None:
    for sample in goemotions_dataset:
        if 'label' in sample and 'text' in sample: # Ensure keys exist
            text_data[sample['label']].append(sample['text'])
else:
    print("Warning: goemotions_dataset not found or is None. Skipping.")


# Balance the dataset
# Initialize with a large number to find the minimum correctly
min_samples_audio = float('inf')
if audio_data:
    min_samples_audio = min(len(lst) for lst in audio_data.values())
else:
    min_samples_audio = 0 # No audio data

min_samples_text = float('inf')
if text_data:
    min_samples_text = min(len(lst) for lst in text_data.values())
else:
    min_samples_text = 0 # No text data

min_samples = min(min_samples_audio, min_samples_text)

print(f"Balancing dataset to {min_samples} samples per emotion.")

multimodal_data = []
if min_samples > 0: # Only proceed if there are samples to balance
    for label_int, emotion_str in enumerate(TARGET_EMOTIONS):
        # Check if enough samples exist for the current emotion
        if len(audio_data[label_int]) >= min_samples and \
           len(text_data[label_int]) >= min_samples:
            audios = random.sample(audio_data[label_int], min_samples)
            texts = random.sample(text_data[label_int], min_samples)
            for i in range(min_samples):
                multimodal_data.append({
                    'audio': audios[i],
                    'text': texts[i],
                    'label': label_int
                })
        else:
            print(f"Skipping emotion '{emotion_str}' (label {label_int}) due to insufficient samples ({len(audio_data[label_int])} audio, {len(text_data[label_int])} text) for balancing to {min_samples} samples.")

random.shuffle(multimodal_data)
df_multimodal = pd.DataFrame(multimodal_data)
unified_dataset = Dataset.from_pandas(df_multimodal)
print(f"Created unified dataset with {len(unified_dataset)} samples.")

# --- BATCHED FEATURE EXTRACTION ---
# Ensure extract_audio_features, analyze_text_emotion_batch are defined and accessible
def process_in_batches(batch):
    # 1. Audio feature extraction (remains per-sample but is efficient)
    # Access the 'array' key within the audio dictionary
    batch['audio_features'] = [extract_audio_features(item['array'], item['sampling_rate']) for item in batch['audio']]

    # 2. Text feature extraction (now done in a batch)
    batch['text_features'] = analyze_text_emotion_batch(batch['text'])

    # 3. One-hot encode labels
    batch['label_one_hot'] = to_categorical(batch['label'], num_classes=NUM_OUTPUT_EMOTIONS)
    return batch

# Apply the batched processing
if len(unified_dataset) > 0: # Only map if dataset is not empty
    processed_dataset = unified_dataset.map(
        process_in_batches,
        batched=True,
        batch_size=BATCH_SIZE,
        remove_columns=['audio', 'text', 'label']
    )

    # --- Create TensorFlow Datasets ---
    processed_dataset.set_format(type='tensorflow', columns=['audio_features', 'text_features', 'label_one_hot'])
    train_size = int(len(processed_dataset) * TRAIN_RATIO)

    tf_train_dataset = processed_dataset.select(range(train_size))
    tf_val_dataset = processed_dataset.select(range(train_size, len(processed_dataset)))

    def as_tf_dataset(ds):
        return tf.data.Dataset.from_tensor_slices(
            (
                {'audio_input': ds['audio_features'], 'text_input': ds['text_features']},
                ds['label_one_hot']
            )
        )

    train_ds = as_tf_dataset(tf_train_dataset).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
    val_ds = as_tf_dataset(tf_val_dataset).batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
    print("TensorFlow datasets are ready for training.")


    # --- Train the Model ---
    print("\n--- Starting Model Training ---")
    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=EPOCHS
    )
    print("\nModel training complete.")

    # --- Save the Model ---
    model.save("multimodal_emotion_model.h5")
    print("Model saved to multimodal_emotion_model.h5")
else:
    print("Unified dataset is empty. Skipping feature extraction, TensorFlow dataset creation, and model training.")


--- Creating Unified Multimodal Dataset ---
Balancing dataset to 400 samples per emotion.
Created unified dataset with 1600 samples.


Map:   0%|          | 0/1600 [00:00<?, ? examples/s]

AttributeError: 'list' object has no attribute 'astype'

In [None]:
# --- 5. REAL-TIME PREDICTION (STREAMLIT APP) ---
# Save this code as a separate file, e.g., 'app.py' and run with 'streamlit run app.py'

import streamlit as st
import numpy as np
import tensorflow as tf
import librosa
import sounddevice as sd
from scipy.io.wavfile import write as write_wav
from transformers import pipeline

# --- Load Model and Helper Functions (should be in the same file or imported) ---

# NOTE: You would need to redefine or import the following from the training script:
# - TARGET_EMOTIONS, AUDIO_FEATURE_SIZE, TEXT_FEATURE_SIZE
# - extract_audio_features()
# - analyze_text_emotion_batch() or a single-instance version for prediction

# For simplicity, we redefine them here.
TARGET_EMOTIONS = ["happy", "sad", "angry", "neutral"]
AUDIO_FEATURE_SIZE = 166
TEXT_FEATURE_SIZE = 4 # Based on ['anger', 'joy', 'optimism', 'sadness']
BERTWEET_EMOTION_LABELS = ['anger', 'joy', 'optimism', 'sadness']

def extract_audio_features(y, sr):
    y = y.astype(np.float32)
    features = {
        'mfcc': np.mean(librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13), axis=1),
        'chroma': np.mean(librosa.feature.chroma_stft(y=y, sr=sr), axis=1),
        'mel': np.mean(librosa.feature.melspectrogram(y=y, sr=sr), axis=1),
        'contrast': np.mean(librosa.feature.spectral_contrast(y=y, sr=sr), axis=1),
        'tonnetz': np.mean(librosa.feature.tonnetz(y=librosa.effects.harmonic(y), sr=sr), axis=1)
    }
    return np.concatenate(list(features.values()))

@st.cache_resource
def load_resources():
    """Load models and pipelines once."""
    model = tf.keras.models.load_model("multimodal_emotion_model.h5")
    text_classifier = pipeline(
        "text-classification",
        model="finiteautomata/bertweet-base-emotion-analysis",
        return_all_scores=True
    )
    return model, text_classifier

def analyze_text_emotion(text, classifier):
    """Analyzes a single text string for prediction."""
    if not text.strip():
        return np.zeros(TEXT_FEATURE_SIZE, dtype=np.float32)
    results = classifier(text)[0]
    emotion_scores = {label: 0.0 for label in BERTWEET_EMOTION_LABELS}
    for item in results:
        if item['label'] in emotion_scores:
            emotion_scores[item['label']] = item['score']
    return np.array([emotion_scores[label] for label in BERTWEET_EMOTION_LABELS], dtype=np.float32)


# --- Streamlit App UI ---
st.title("🎙️ Real-Time Multimodal Emotion Detector")

trained_model, text_pipe = load_resources()
st.success("Model and text pipeline loaded successfully!")

if 'recording' not in st.session_state:
    st.session_state.recording = None
if 'sample_rate' not in st.session_state:
    st.session_state.sample_rate = None

if st.button("🎤 Start 5-Second Recording"):
    with st.spinner("Recording..."):
        st.session_state.recording = sd.rec(int(5 * 44100), samplerate=44100, channels=1, dtype='float32')
        sd.wait()
        st.session_state.sample_rate = 44100
    st.success("Recording finished!")
    st.audio(st.session_state.recording, format="audio/wav", sample_rate=st.session_state.sample_rate)

if st.session_state.recording is not None:
    user_text = st.text_input("Enter the text you spoke:", "")

    if st.button("Analyze Emotion") and user_text:
        with st.spinner("Analyzing..."):
            # 1. Process Audio
            audio_data = np.squeeze(st.session_state.recording)
            y, sr = librosa.load(write_wav("temp.wav", st.session_state.sample_rate, audio_data), sr=16000)
            audio_feats = extract_audio_features(y, sr).reshape(1, AUDIO_FEATURE_SIZE)

            # 2. Process Text
            text_feats = analyze_text_emotion(user_text, text_pipe).reshape(1, TEXT_FEATURE_SIZE)

            # 3. Predict
            prediction = trained_model.predict([audio_feats, text_feats])[0]
            predicted_emotion_index = np.argmax(prediction)
            predicted_emotion = TARGET_EMOTIONS[predicted_emotion_index]

            st.subheader(f"Predicted Emotion: **{predicted_emotion.upper()}**")

            # Display probabilities
            df_probs = pd.DataFrame({'Emotion': TARGET_EMOTIONS, 'Probability': prediction})
            st.bar_chart(df_probs.set_index('Emotion'))

In [None]:
# Install pyngrok
!pip install -q pyngrok

# Authenticate ngrok with your authtoken
# Replace 'YOUR_AUTHTOKEN' with the token you copied from ngrok.com
from pyngrok import ngrok
ngrok.set_auth_token("2zY1bHyRIpTKEakvuS0YU8NuxpB_7phzmz3f3wtEJdv3NBmy3")

In [None]:
!npm install localtunnel

In [None]:
# Run Streamlit in the background, redirecting output to a log file
!streamlit run app.py &>/content/streamlit_logs.txt &

# Give Streamlit a moment to start (optional, but good practice)
import time
time.sleep(5)

# Start ngrok tunnel to port 8501 (Streamlit's default port)
from pyngrok import ngrok
public_url = ngrok.connect(addr="8501", proto="http")
print("Your Streamlit App is accessible at:", public_url)

In [None]:
!cat /content/streamlit_logs.txt

In [23]:
# Create the app.py file with the Streamlit code
# This code is copied from cell dhRb1nEXyN-8
streamlit_code = '''
import streamlit as st
import numpy as np
import tensorflow as tf
import librosa
import sounddevice as sd
from scipy.io.wavfile import write as write_wav
from transformers import pipeline
import os

# --- Load Model and Helper Functions ---

# NOTE: You would need to redefine or import the following from the training script:
# - TARGET_EMOTIONS, AUDIO_FEATURE_SIZE, TEXT_FEATURE_SIZE
# - extract_audio_features()
# - analyze_text_emotion_batch() or a single-instance version for prediction

# For simplicity, we redefine them here.
TARGET_EMOTIONS = ["happy", "sad", "angry", "neutral"]
AUDIO_FEATURE_SIZE = 166
TEXT_FEATURE_SIZE = 4 # Based on ['anger', 'joy', 'optimism', 'sadness']
BERTWEET_EMOTION_LABELS = ['anger', 'joy', 'optimism', 'sadness']

def extract_audio_features(y, sr):
    y = y.astype(np.float32)
    features = {
        'mfcc': np.mean(librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13), axis=1),
        'chroma': np.mean(librosa.feature.chroma_stft(y=y, sr=sr), axis=1),
        'mel': np.mean(librosa.feature.melspectrogram(y=y, sr=sr), axis=1),
        'contrast': np.mean(librosa.feature.spectral_contrast(y=y, sr=sr), axis=1),
        'tonnetz': np.mean(librosa.feature.tonnetz(y=librosa.effects.harmonic(y), sr=sr), axis=1)
    }
    return np.concatenate(list(features.values()))

@st.cache_resource
def load_resources():
    """Load models and pipelines once.""" # Changed back to triple double quotes as the outer is triple single
    # Ensure the model file exists
    if not os.path.exists("multimodal_emotion_model.h5"):
         st.error("Model file 'multimodal_emotion_model.h5' not found. Please run the training steps first.")
         return None, None

    model = tf.keras.models.load_model("multimodal_emotion_model.h5")
    text_classifier = pipeline(
        "text-classification",
        model="finiteautomata/bertweet-base-emotion-analysis",
        return_all_scores=True
    )
    return model, text_classifier

def analyze_text_emotion(text, classifier):
    """Analyzes a single text string for prediction.""" # Changed back to triple double quotes
    if not text.strip():
        return np.zeros(TEXT_FEATURE_SIZE, dtype=np.float32)
    results = classifier(text)[0]
    emotion_scores = {label: 0.0 for label in BERTWEET_EMOTION_LABELS}
    for item in results:
        if item['label'] in emotion_scores:
            emotion_scores[item['label']] = item['score']
    return np.array([emotion_scores[label] for label in BERTWEET_EMOTION_LABELS], dtype=np.float32)

# --- Streamlit App UI ---
st.title("🎙️ Real-Time Multimodal Emotion Detector")

trained_model, text_pipe = load_resources()

if trained_model is not None and text_pipe is not None:
    st.success("Model and text pipeline loaded successfully!")

    if 'recording' not in st.session_state:
        st.session_state.recording = None
    if 'sample_rate' not in st.session_state:
        st.session_state.sample_rate = None

    if st.button("🎤 Start 5-Second Recording"):
        with st.spinner("Recording..."):
            st.session_state.recording = sd.rec(int(5 * 44100), samplerate=44100, channels=1, dtype='float32')
            sd.wait()
            st.session_state.sample_rate = 44100
        st.success("Recording finished!")
        # Save the recording to a temporary file for playback and feature extraction
        temp_audio_file = "temp_recording.wav"
        write_wav(temp_audio_file, st.session_state.sample_rate, st.session_state.recording)
        st.audio(temp_audio_file, format="audio/wav", sample_rate=st.session_state.sample_rate)
        os.remove(temp_audio_file) # Clean up the temporary file


    if st.session_state.recording is not None:
        user_text = st.text_input("Enter the text you spoke:", "")

        if st.button("Analyze Emotion") and user_text:
            with st.spinner("Analyzing..."):
                # 1. Process Audio
                audio_data = np.squeeze(st.session_state.recording)
                # Need to save to a file temporarily for librosa to load
                temp_audio_file_for_librosa = "temp_for_librosa.wav"
                write_wav(temp_audio_file_for_librosa, st.session_state.sample_rate, audio_data)

                y, sr = librosa.load(temp_audio_file_for_librosa, sr=16000)
                audio_feats = extract_audio_features(y, sr).reshape(1, AUDIO_FEATURE_SIZE)

                os.remove(temp_audio_file_for_librosa) # Clean up the temporary file

                # 2. Process Text
                text_feats = analyze_text_emotion(user_text, text_pipe).reshape(1, TEXT_FEATURE_SIZE)

                # 3. Predict
                prediction = trained_model.predict([audio_feats, text_feats])[0]
                predicted_emotion_index = np.argmax(prediction)
                predicted_emotion = TARGET_EMOTIONS[predicted_emotion_index]

                st.subheader(f"Predicted Emotion: **{predicted_emotion.upper()}**")

                # Display probabilities
                df_probs = pd.DataFrame({'Emotion': TARGET_EMOTIONS, 'Probability': prediction})
                st.bar_chart(df_probs.set_index('Emotion'))
else:
    st.error("Failed to load model or text pipeline. Please check previous steps.")

''' # Changed to triple single quotes

with open("app.py", "w") as f:
    f.write(streamlit_code)

print("app.py created successfully.")

# Run the Streamlit app in the background
get_ipython().system_raw('streamlit run app.py &>/content/logs.txt &')

# Get your public IP (needed for localtunnel password)
!curl ipv4.icanhazip.com

# Start localtunnel
!npx localtunnel --port 8501

app.py created successfully.
34.16.221.118
[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K⠋[1G[0K⠙[1G[0K⠹[1G[0K⠸[1G[0K⠼[1G[0K⠴[1G[0K⠦[1G[0K⠧[1G[0K⠇[1G[0K⠏[1G[0K[1G[0JNeed to install the following packages:
localtunnel@2.0.2
Ok to proceed? (y) [20G^C
