### **Imports**

In [1]:
!python --version

Python 3.10.12


In [None]:
# !pip install datasets

In [None]:
import torch
import torchaudio
import librosa
import numpy as np

from datasets import Dataset
from collections import Counter
from transformers import pipeline

from concurrent.futures import ThreadPoolExecutor, as_completed

### **Constants**

In [None]:
AUDIO_FILE = "./harvard.wav"

SAMPLING_RATE = 20000
CHUNK_DURATION = 10 # in seconds
CHUNK_OVERLAP_DURATION = 2 # in seconds

EMOTION_TOP_N_RESULTS = 3

### **Preprocessing**

In [None]:
def create_chunks_with_padding(audio, sr, start, end, chunk_duration, step_size, min_size):

    local_chunks = []
    for i in range(start, end, int(step_size * sr)):
        chunk_end = i + int(chunk_duration * sr)
        chunk = audio[i:chunk_end]

        if len(chunk) < min_size:
            chunk = np.pad(chunk, (0, min_size - len(chunk)), mode="constant")

        local_chunks.append((chunk, i / sr, chunk_end / sr))
    return local_chunks

def chunk_audio_parallel_with_padding(audio_path, chunk_duration=CHUNK_DURATION, overlap=CHUNK_OVERLAP_DURATION, sample_rate=SAMPLING_RATE, num_workers=4):

    audio, sr = librosa.load(audio_path, sr=sample_rate, mono=True)
    step_size = chunk_duration - overlap
    min_size = int(chunk_duration * sr)

    # Divide the audio range into sections for parallel processing
    total_length = len(audio)
    section_size = total_length // num_workers
    sections = [(audio, sr, i, min(i + section_size, total_length), chunk_duration, step_size, min_size)
                for i in range(0, total_length, section_size)]

    # Process each section in parallel
    chunks = []
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        results = executor.map(lambda args: create_chunks_with_padding(*args), sections)
        for result in results:
            chunks.extend(result)

    return chunks

### **Tone / Emotion Analysis**

In [None]:
pipe = pipeline(
    "audio-classification",
    model="ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition",
    device=0
)

Some weights of the model checkpoint at ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition were not used when initializing Wav2Vec2ForSequenceClassification: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.output.bias', 'classifier.output.weight']
- This IS expected if you are initializing Wav2Vec2ForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing Wav2Vec2ForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of Wav2Vec2ForSequenceClassification were not initialized from the model checkpoint at ehcalabres/wav2vec2-lg-xlsr-en-speech-emotion-recognition and are newly initialized: ['classifier.bias', 'classifier.weight', '

In [None]:
def process_chunk(chunk_data):
    chunk, start, end = chunk_data
    chunk_results = pipe(chunk)

    print("Debug : ", chunk_results, "\n\n")

    return {
        "timestep": f"{start:.2f}-{end:.2f}",
        "emotion": chunk_results[0]["label"],
        "score": chunk_results[0]["score"]
    }

def analyze_chunks_parallel(chunks, max_workers=8):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(process_chunk, chunks))
    return results

### **Pitch and Modulation Analysis**

In [None]:
def extract_pitch(audio, sr):
    pitches, magnitudes = librosa.core.piptrack(y=audio, sr=sr)
    pitch = [np.max(pitches[:, t]) for t in range(pitches.shape[1])]
    return np.array(pitch)

def extract_loudness(audio):
    rms = librosa.feature.rms(y=audio)
    return rms[0]

def analyze_modulation_and_pitch(audio, sr):
    pitch = extract_pitch(audio, sr)
    loudness = extract_loudness(audio)

    pitch_variation = np.std(pitch)
    loudness_variation = np.std(loudness)

    return pitch, loudness, pitch_variation, loudness_variation

### **Feedback Generation**

In [None]:
def generate_feedback(pitch, loudness, pitch_variation, loudness_variation, emotion):
    feedback = {}

    if emotion == 'happy':
        feedback["modulation"] = "Great job! Your speech has excellent variation in volume and pitch. It conveys positivity and energy."
    elif emotion == 'anger':
        feedback["modulation"] = "Your speech has strong variation in pitch and volume, conveying intensity. Be mindful of the tone to avoid sounding overly aggressive."
    elif emotion == 'sad':
        feedback["modulation"] = "Your speech lacks variation, which is typical for a sad tone. Try to vary your pitch and loudness to convey more nuance."
    elif emotion == 'neutral':
        feedback["modulation"] = "Your speech is steady, which is good for neutral delivery, but varying pitch and loudness could make it more engaging."

    if emotion == 'happy':
        if np.mean(pitch) < 180:
            feedback["pitch"] = "Your pitch could be a little higher to reflect more excitement. Try to increase your pitch to convey more energy."
        else:
            feedback["pitch"] = "Your pitch is in a great range for happy speech. Keep the energy high!"
    elif emotion == 'anger':
        if np.mean(pitch) < 220:
            feedback["pitch"] = "Your pitch is on the lower side for an angry tone. Consider raising your pitch for more intensity and to emphasize anger."
        else:
            feedback["pitch"] = "Your pitch is strong and conveys anger well, but be careful not to sound too harsh."
    elif emotion == 'sad':
        if np.mean(pitch) > 150:
            feedback["pitch"] = "For a sad tone, your pitch is higher than expected. Try lowering your pitch slightly to better convey sorrow."
        else:
            feedback["pitch"] = "Your pitch matches the sad tone well. Keep it steady and low to maintain the emotional depth."
    elif emotion == 'neutral':
        feedback["pitch"] = "Your pitch is within a comfortable range. It's neutral, but adding more variation could enhance engagement."

    return feedback

### **Speech Analyzer**

In [None]:
def classify_audio(examples):
    audio_list = [np.array(audio, dtype=np.float32) for audio in examples["audio"]]

    # Classify emotions for each chunk of audio
    results = pipe(audio_list)

    emotions, scores = [], []

    for res in results :
      t1, t2 = [], []
      for i in range(EMOTION_TOP_N_RESULTS) :
        t1.append(res[i]["label"])
        t2.append(res[i]["score"])
      emotions.append(t1)
      scores.append(t2)

    # emotions = [res[0]["label"] for res in results]
    # scores = [res[0]["score"] for res in results]

    # Initialize the results
    feedback_list = []

    # Process each audio chunk
    for i, audio in enumerate(audio_list):
        # Extract pitch and loudness
        pitch, loudness, pitch_variation, loudness_variation = analyze_modulation_and_pitch(audio, SAMPLING_RATE)

        feedback = generate_feedback(pitch, loudness, pitch_variation, loudness_variation, emotions[i])

        feedback_list.append({
            "timestep": examples["timestep"][i],
            "emotion": emotions[i],
            "score": scores[i],
            # "pitch": pitch.tolist(),
            # "loudness": loudness.tolist(),
            # "feedback": feedback
        })

    return {"results": feedback_list}

In [None]:
chunks = chunk_audio_parallel_with_padding(AUDIO_FILE, chunk_duration=CHUNK_DURATION, overlap=CHUNK_OVERLAP_DURATION)
data = [
    {"audio": np.array(chunk[0], dtype=np.float32), "timestep": chunk[1]}
    for chunk in chunks
]
dataset = Dataset.from_list(data)

In [None]:
dataset = dataset.map(classify_audio, batched=True, batch_size=8)

Map:   0%|          | 0/4 [00:00<?, ? examples/s]

### **Just an idea : (Smoothing and N-Gram Like Analysis)**

- Instead of taking the top emotion, take the top 2 or 3
- For every 2-3 timesteps (depending on the `CHUNK_DURATION`), <br>take the 2 most occurring emotion and assign to all of them (**smoothing basically**)
- Using this combination on emotions, make a judgement

In [None]:
results = dataset.to_pandas()["results"].to_list()

In [None]:
emotions = [res["emotion"] for res in results]

In [None]:
import numpy as np
from collections import Counter

SMOOTHING_FACTOR = 3

smoothed_emotions = []

for i in range(0, len(emotions), SMOOTHING_FACTOR):

    temp = emotions[i : i + SMOOTHING_FACTOR]

    flat_temp = [emotion.tolist() if isinstance(emotion, np.ndarray) else emotion for emotion in temp]

    flat_temp = [item for sublist in flat_temp for item in (sublist if isinstance(sublist, list) else [sublist])]

    emotion_counts = Counter(flat_temp)

    most_frequent_emotion = emotion_counts.most_common(2)
    smoothed_emotions.append(most_frequent_emotion)

    print(i, flat_temp, "Most frequent:", most_frequent_emotion)

print("Smoothed Emotions:", smoothed_emotions)

0 ['calm', 'disgust', 'happy', 'disgust', 'calm', 'neutral', 'disgust', 'neutral', 'angry'] Most frequent: [('disgust', 3), ('calm', 2)]
3 ['disgust', 'happy', 'surprised'] Most frequent: [('disgust', 1), ('happy', 1)]
Smoothed Emotions: [[('disgust', 3), ('calm', 2)], [('disgust', 1), ('happy', 1)]]
