# Change detection algorithm for arousal and valence values.


In [None]:
import random
import os
import matplotlib.pyplot as plt
import numpy as np
import math
from typing import List, Tuple
from pydantic import BaseModel
from datetime import datetime, timedelta, time

from IPython.display import Audio, HTML

from welford import Welford

from matplotlib.collections import LineCollection
from matplotlib.colors import Normalize

# Pydantic model pairing a timestamp with a single float reading.
# It is not referenced by any other code visible in this notebook.
class TimeStampFloat (BaseModel):
    timestamp: datetime  # moment the reading belongs to
    value: float  # the reading itself

# Pydantic record describing one audio clip; instances are built by
# audio_split_to_file.
class ClipInfo(BaseModel):
    start: time  # clip start, expressed as a time-of-day offset from 00:00:00
    end: time  # clip end, same representation as `start`
    file_name: str  # base name of the written clip file (no directory part)
    arousal: float  # arousal value returned by the speech-features service
    valence: float  # valence value returned by the speech-features service

import requests
import librosa
import soundfile as sf

import pandas as pd

In [None]:
# Locations of the source recording, the folder for per-clip files and the
# CSV table that the analysis cells read.
# NOTE(review): base_path is an absolute, machine-specific mount point; adjust
# it before running the notebook elsewhere.
base_path = "/run/media/chr1s/chr1sdrive1/MuInf/kleiner-prinz/"
audio_path = f"{base_path}audio.mp3"
audio_clip_file_path = f"{base_path}audio_cuts"
csv_file_path = f"{base_path}audio_cuts.csv"
# sr=None asks librosa for the file's native sampling rate; mono=True requests
# a single-channel signal. `sr` is reused below to convert samples to seconds.
y, sr = librosa.load(audio_path, sr=None, mono=True)

In [None]:
# Clip length in samples: sampling rate times the clip duration in seconds.
split_length = sr * 10  # 10 seconds

In [None]:
# Cut the signal into consecutive, non-overlapping frames: the hop equals the
# frame length, so no sample is shared between two frames.
# NOTE(review): a trailing partial frame is presumably dropped by
# librosa.util.frame; confirm if the last seconds of the recording matter.
y_splits = librosa.util.frame(y, frame_length=split_length, hop_length=split_length, axis=0)
len(y_splits)  # cell output: length of the first axis of the framed array

In [None]:
def get_arousal_valence(audio_path: str) -> Tuple[float, float]:
    """Upload an audio file to the local speech service and return its emotion scores.

    The file is posted as the multipart field ``file`` to
    ``http://localhost:8000/recommend/from-speech``. The scores are read from
    the ``features`` object of the JSON reply.

    Args:
        audio_path: Path of the audio file to upload.

    Returns:
        ``(arousal, valence)``. A score that is missing from the reply is
        reported as ``0.0``.

    Raises:
        Exception: If the service answers with any status other than 200; the
            message carries the status code and the response text.
        OSError: If ``audio_path`` cannot be opened.
    """
    # The context manager closes the handle even when the request itself
    # raises; the original left the file open for the garbage collector.
    with open(audio_path, "rb") as audio_file:
        response = requests.post(
            "http://localhost:8000/recommend/from-speech",
            files={"file": audio_file},
        )

    if response.status_code != 200:
        raise Exception(f"Error: {response.status_code} - {response.text}")

    features = response.json().get("features", {})
    return features.get("arousal", 0.0), features.get("valence", 0.0)

In [None]:
def audio_split_to_file(split: np.ndarray, index: int, clip_seconds: float = 10) -> ClipInfo:
    """Write one audio frame to disk, score it, and describe it as a ClipInfo.

    The frame is written as ``clip_<index>.wav`` inside the notebook-level
    ``audio_clip_file_path`` folder using the notebook-level sampling rate
    ``sr``, then sent to ``get_arousal_valence``.

    Args:
        split: Samples of the clip.
        index: Zero-based position of the clip in the recording; used for the
            file name and for the start offset.
        clip_seconds: Spacing between the starts of consecutive clips, in
            seconds. The default of 10 matches the 10-second frames the
            notebook cuts.

    Returns:
        A ClipInfo with start/end offsets (as ``time`` values counted from
        00:00:00), the clip's base file name and its arousal/valence scores.

    Raises:
        Exception: Propagated from ``get_arousal_valence`` when the service
            does not answer with status 200.
    """
    # Actual duration of this clip, derived from its sample count.
    audio_length = len(split) / sr

    start_seconds = index * clip_seconds
    end_seconds = start_seconds + audio_length

    # datetime.min is 00:00:00, so adding the offset and taking .time() turns
    # a number of seconds into a time-of-day value.
    start_time = (datetime.min + timedelta(seconds=start_seconds)).time()
    end_time = (datetime.min + timedelta(seconds=end_seconds)).time()

    # sf.write does not create missing folders, so make sure the target exists.
    os.makedirs(audio_clip_file_path, exist_ok=True)
    audio_path = os.path.join(audio_clip_file_path, f"clip_{index}.wav")
    sf.write(audio_path, split, sr)

    arousal, valence = get_arousal_valence(audio_path)

    return ClipInfo(
        start=start_time,
        end=end_time,
        file_name=os.path.basename(audio_path),
        arousal=arousal,
        valence=valence,
    )

In [None]:
# Inject a CSS rule that stretches audio widgets to the full cell width, then
# embed a player for the source recording as the cell output.
audio_style = "<style>audio { width: 100%; }</style>"
display(HTML(audio_style))
Audio(audio_path)

In [None]:
# Load the per-clip table; later cells read its 'start', 'arousal' and
# 'valence' columns.
data = pd.read_csv(csv_file_path)

In [None]:
# Preview the first rows of the clip table.
data.head()

In [None]:
# Overview plot: arousal and valence per clip, one point per CSV row.
plt.figure(figsize=(17, 5))

# Plain Python lists; these, `x` and `time_str` are reused by later plot cells.
arousal_values = data['arousal'].tolist()
valence_values = data['valence'].tolist()
time_str = data['start'].tolist()  # Already in string format

# The x axis is the row index; clip start times appear only as tick labels.
x = range(len(arousal_values))

plt.plot(x, arousal_values, label='Arousal', color='red')
plt.plot(x, valence_values, label='Valence', color='blue')

plt.ylabel('Emotion Values')
plt.legend()

n = 6  # Show every 6th label
plt.xticks(x[::n], time_str[::n], rotation=90)
plt.xlabel('Time')

plt.title('Arousal and Valence Over Time')
plt.tight_layout()
plt.show()

In [None]:
class SlopeDetector:
    """Streaming detector that scores upward jumps of a scalar signal.

    The detector keeps the last ``window_size`` raw samples in a circular
    buffer. On every new value it smooths that buffer with a trailing moving
    average, measures how far the new value lies above the recent level, and
    maps that rise to a score in ``[0, 1]`` with a shifted, scaled ``tanh``.
    Falling values always score ``0.0`` because negative results are clamped.

    Attributes:
        samples: Circular buffer of raw values, pre-filled with 0.5.
        samples_smoothed: Moving-average version of ``samples``, recomputed on
            every call to ``get_slope_probablity``.
        window_size: Number of slots in both buffers.
        sma: Number of samples averaged per smoothed slot.
        index: Slot of ``samples`` that the next value will overwrite.
        gain: Steepness of the ``tanh`` mapping.
        threshold: Rise that maps to a score of exactly 0.
    """

    def __init__(self, window_size: int = 10, sma: int = 3, gain: int = 10, threshold: float = 0.2) -> None:
        """Create a detector whose buffers are pre-filled with 0.5."""
        self.samples = np.full(window_size, 0.5, dtype=np.float32)
        self.samples_smoothed = np.full(window_size, 0.5, dtype=np.float32)
        self.window_size = window_size
        self.sma = sma
        self.index = 0

        self.gain = gain
        self.threshold = threshold

    def calc_samples_smoothed(self) -> None:
        """Recompute ``samples_smoothed`` as a trailing moving average.

        Slot ``i`` becomes the mean of slots ``i - (sma - 1) .. i`` of
        ``samples``, with indices wrapping around the buffer. An ``sma`` of 1
        or less leaves each slot equal to its raw sample.
        """
        lookback = max(0, self.sma - 1)
        for i in range(self.window_size):
            total = 0.0
            # Oldest neighbour first; the modulo wraps around the buffer start.
            for j in range(lookback, -1, -1):
                total += self.samples[(i - j) % self.window_size]
            self.samples_smoothed[i] = total / (lookback + 1)

    def get_slope_probablity(self, value: float) -> float:
        """Score ``value`` against the buffered history, then store it.

        Args:
            value: The newest sample of the signal.

        Returns:
            A float in ``[0.0, 1.0]``. It is ``0.0`` when the smoothed window
            is perfectly flat, which is always the case on the first call.
        """
        # Statistics are taken before `value` enters the buffer, so the new
        # sample is compared against history only.
        self.calc_samples_smoothed()

        average = np.mean(self.samples_smoothed)
        std_dev = np.std(self.samples_smoothed)

        # Smoothed value at the slot written by the previous call.
        last_value = self.samples_smoothed[(self.index - 1) % self.window_size]

        # Mean of the step from the previous smoothed value to the new one and
        # the step from the window average to the previous smoothed value.
        slope1 = value - last_value
        slope2 = last_value - average
        slope = (slope1 + slope2) / 2.0

        # Store the sample and advance the circular write position.
        self.samples[self.index] = value
        self.index = (self.index + 1) % self.window_size

        # A perfectly flat smoothed window reports no change. Nothing below
        # divides by std_dev; the guard only mutes the detector in this case.
        # NOTE(review): this also hides a jump that follows a constant stretch;
        # confirm that is intended.
        if std_dev == 0:
            return 0.0

        slope_probablity = math.tanh((slope - self.threshold) * self.gain)
        # Clamp with float bounds so the result is always a float, as annotated.
        return min(max(slope_probablity, 0.0), 1.0)

In [None]:
# Run one SlopeDetector per signal over the whole table, in row order, and
# store the scores as new columns. The detectors are stateful; the fresh
# instances created here make the cell safe to re-run.
# Alternative valence configuration, currently disabled:
#valence_slope_detector = SlopeDetector(window_size=25, sma=3, gain=10, threshold=0.03)
# NOTE(review): valence runs with the class defaults while arousal uses the
# tuned values shown in the disabled line above; confirm the asymmetry is
# intended.
valence_slope_detector = SlopeDetector()
valence_sp_values = [
    valence_slope_detector.get_slope_probablity(value) for value in data['valence']
]
data['valence_slope_probablity'] = valence_sp_values

arousal_slope_detector = SlopeDetector(window_size=25, sma=3, gain=10, threshold=0.03)
arousal_sp_values = [
    arousal_slope_detector.get_slope_probablity(value) for value in data['arousal']
]
data['arousal_slope_probablity'] = arousal_sp_values
# Combined score: plain average of the two per-signal scores. Note that this
# column is spelled 'probability' while the two source columns are spelled
# 'probablity'.
data['combined_slope_probability'] = (
    data['valence_slope_probablity'] + data['arousal_slope_probablity']
) / 2

In [None]:
# Plot valence and arousal as lines coloured by their own slope score and mark
# the rows where the combined score crosses the threshold.
# Relies on `x`, `valence_values`, `arousal_values` and `time_str` from the
# overview-plot cell and on the score lists/columns from the previous cell.
probablity_threshold = 0.3
minimum_free_samples = 5 # in x10 seconds

# Create figure with two subplots
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 5), sharex=True)

# --- Valence Plot (Top) ---
# Create segments for valence
# Each segment joins point k to point k + 1, giving len(x) - 1 segments.
points_valence = np.array([x, valence_values]).T.reshape(-1, 1, 2)
segments_valence = np.concatenate([points_valence[:-1], points_valence[1:]], axis=1)

# Create LineCollection for valence
lc_valence = LineCollection(segments_valence, cmap='viridis', norm=plt.Normalize(min(valence_sp_values), max(valence_sp_values)))
# NOTE(review): the colour array has one entry per point, i.e. one more than
# there are segments.
lc_valence.set_array(np.array(valence_sp_values))
lc_valence.set_linewidth(2)
ax1.add_collection(lc_valence)

# Add colorbar for valence
cbar_valence = plt.colorbar(lc_valence, ax=ax1)
cbar_valence.set_label('Valence Slope Probability')

# Configure valence plot
ax1.set_ylabel('Valence')
ax1.set_title('Valence Over Time (Colored by Slope Probability)')

# --- Arousal Plot (Bottom) ---
# Create segments for arousal
points_arousal = np.array([x, arousal_values]).T.reshape(-1, 1, 2)
segments_arousal = np.concatenate([points_arousal[:-1], points_arousal[1:]], axis=1)

# Create LineCollection for arousal
lc_arousal = LineCollection(segments_arousal, cmap='plasma', norm=plt.Normalize(min(arousal_sp_values), max(arousal_sp_values)))
lc_arousal.set_array(np.array(arousal_sp_values))
lc_arousal.set_linewidth(2)
ax2.add_collection(lc_arousal)

# Add colorbar for arousal
cbar_arousal = plt.colorbar(lc_arousal, ax=ax2)
cbar_arousal.set_label('Arousal Slope Probability')

# Configure arousal plot
ax2.set_ylabel('Arousal')
ax2.set_title('Arousal Over Time (Colored by Slope Probability)')

# --- Highlight regions with combined_slope_probability >= probablity_threshold ---
combined_high = data['combined_slope_probability'] >= probablity_threshold
# Work on a copy so the raw threshold mask stays available.
blocked_high = combined_high.copy()
# Hold-off pass: after every accepted detection, suppress the following
# minimum_free_samples rows so one change is not reported repeatedly.
# NOTE(review): blocked_high[i] indexes the Series by label; this matches the
# position only while `data` keeps its default integer index.
i = 0
while i < len(blocked_high):
    if blocked_high[i]:  # When we find a True value
        # Block the next minimum_free_samples samples by setting them to False
        end_block = min(i + 1 + minimum_free_samples, len(blocked_high))
        blocked_high[i+1:end_block] = False
        i = end_block  # Jump to the end of the blocked region
    else:
        i += 1  # Move to the next sample

# Find contiguous regions where condition is True
# diff is +1 where a run starts (at the following index) and -1 where it ends.
changes = np.diff(blocked_high.astype(int))
starts = np.where(changes == 1)[0] + 1
ends = np.where(changes == -1)[0]

# Handle edge cases
# A run touching the first or last row has no transition inside `changes`.
if blocked_high.iloc[0]:
    starts = np.insert(starts, 0, 0)
if blocked_high.iloc[-1]:
    ends = np.append(ends, len(blocked_high)-1)

# Add horizontal lines for each region
# NOTE(review): after the hold-off pass every surviving True is isolated, so
# start == end for each region; the spans below have zero width and the
# horizontal lines zero length. Consider axvline, or extend `end` over the
# hold-off window, if a visible marker is wanted.
# NOTE(review): axhline's xmin/xmax are axes fractions (0..1), not data
# coordinates, so start/len(x) only approximates the intended position.
for start, end in zip(starts, ends):
    # Valence plot
    ax1.axvspan(start, end, color='red', alpha=0.3)
    ax1.axhline(y=valence_values[start], color='red', linestyle='--', alpha=0.5, xmin=start/len(x), xmax=end/len(x))
    
    # Arousal plot
    ax2.axvspan(start, end, color='red', alpha=0.3)
    ax2.axhline(y=arousal_values[start], color='red', linestyle='--', alpha=0.5, xmin=start/len(x), xmax=end/len(x))

# --- Shared X-axis Configuration ---
n = 6  # Show every 6th label
ax2.set_xticks(x[::n])
ax2.set_xticklabels(time_str[::n], rotation=90)
ax2.set_xlabel('Time')

plt.tight_layout()
plt.show()

In [None]:
class WelfordSlopeDetector:
    """Streaming change detector over paired values, based on window statistics.

    The detector keeps the last ``window_size`` two-component samples in a
    circular buffer. On every new pair it derives the per-component mean and
    variances of the buffer through ``Welford``, and scores how far the window
    mean moved since the previous call. Movement in either direction counts,
    because the absolute difference is used.

    Attributes:
        samples: ``(window_size, 2)`` circular buffer, pre-filled with 0.5.
        window_size: Number of rows in ``samples``.
        index: Row of ``samples`` that the next pair will overwrite.
        gain: Steepness of the ``tanh`` mapping.
        threshold: Mean shift that maps to a score of exactly 0.
        prev_mean: Window mean seen by the previous ``get_slope_probablity``
            call; starts at ``[0.5, 0.5]``.
    """

    def __init__(self, window_size: int = 10, gain: int = 10, threshold: float = 0.2) -> None:
        """Create a detector whose buffer and previous mean start at 0.5."""
        self.samples = np.full((window_size, 2), 0.5, dtype=np.float32)
        self.window_size = window_size
        self.index = 0
        self.gain = gain
        self.threshold = threshold
        self.prev_mean = np.array([0.5, 0.5], dtype=np.float32)

    def get_welford_values(self, values: np.ndarray) -> np.ndarray:
        """Store a new pair and return statistics of the current window.

        Args:
            values: The two components of the newest sample.

        Returns:
            A ``(3, 2)`` array whose rows are the mean, the sample variance
            and the population variance, one column per component.
        """
        # Overwrite the oldest row and advance the circular write position.
        self.samples[self.index] = values
        self.index = (self.index + 1) % self.window_size

        # A fresh Welford object is built from the whole buffer on each call,
        # so the statistics describe the buffer contents only. Until the
        # buffer has been filled once, that includes the 0.5 placeholders.
        welford = Welford(self.samples)

        mean = welford.mean
        var_s = welford.var_s  # sample variance
        var_p = welford.var_p  # population variance

        # Stack the statistics along a new axis to create (3, 2) array
        return np.stack([mean, var_s, var_p])

    def get_slope_probablity(self, values: np.ndarray) -> float:
        """Store a new pair and score the shift of the window mean.

        Args:
            values: The two components of the newest sample.

        Returns:
            A float in ``[0.0, 1.0]``; ``0.0`` when the mean moved by no more
            than ``threshold``.
        """
        mean = self.get_welford_values(values)[0]

        # Average absolute movement of the two component means.
        delta = np.sum(np.abs(mean - self.prev_mean)) / 2

        slope_probablity = math.tanh((delta - self.threshold) * self.gain)
        # Clamp with float bounds so the result is always a float, as annotated.
        slope_probablity = min(max(slope_probablity, 0.0), 1.0)

        # Update previous mean for next calculation
        self.prev_mean = mean

        return slope_probablity

In [None]:
# Feed every row through a default WelfordSlopeDetector and collect the
# window statistics it reports after each row.
welford_slope_detector = WelfordSlopeDetector()
# Each row yields a (3, 2) array, so the stacked result has shape
# (rows, 3, 2): axis 1 is mean / sample variance / population variance,
# axis 2 is arousal / valence (the order used to build the input pair).
welford_values = np.stack(
    data.apply(
        lambda row: welford_slope_detector.get_welford_values(np.array([row['arousal'], row['valence']])),
        axis=1
    )
)

# Unpack the statistics into one named column per (signal, statistic) pair.
welford_df = pd.DataFrame(
    {
        'arousal_mean': welford_values[:, 0, 0],
        'arousal_var_s': welford_values[:, 1, 0],
        'arousal_var_p': welford_values[:, 2, 0],
        'valence_mean': welford_values[:, 0, 1],
        'valence_var_s': welford_values[:, 1, 1],
        'valence_var_p': welford_values[:, 2, 1]
    }
)

In [None]:
# Compare each raw signal with the window mean and the two variances stored
# in welford_df; all curves share the row index as x axis.
# Create figure with two subplots
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 8), sharex=True)

# Valence Subplot
ax1.plot(data['valence'], label='Valence (Original)', color='blue', alpha=0.7)
ax1.plot(welford_df['valence_mean'], label='Valence (Welford Mean)', linestyle='--', color='red')
ax1.plot(welford_df['valence_var_s'], label='Valence (Sample Variance)', linestyle=':', color='green')
ax1.plot(welford_df['valence_var_p'], label='Valence (Population Variance)', linestyle='-.', color='purple')
ax1.set_ylabel('Valence')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Arousal Subplot
ax2.plot(data['arousal'], label='Arousal (Original)', color='blue', alpha=0.7)
ax2.plot(welford_df['arousal_mean'], label='Arousal (Welford Mean)', linestyle='--', color='red')
ax2.plot(welford_df['arousal_var_s'], label='Arousal (Sample Variance)', linestyle=':', color='green')
ax2.plot(welford_df['arousal_var_p'], label='Arousal (Population Variance)', linestyle='-.', color='purple')
ax2.set_ylabel('Arousal')
ax2.legend()
ax2.grid(True, alpha=0.3)

# plt.xlabel acts on the current axes, i.e. the bottom subplot.
plt.xlabel('Time (index)')
plt.suptitle('Valence & Arousal with Welford Statistics')
plt.tight_layout()
plt.show()


In [None]:
# Score every row with a freshly created, tuned WelfordSlopeDetector. A new
# instance is required because the detector is stateful and the previous one
# has already consumed the whole table.
welford_slope_detector = WelfordSlopeDetector(gain=30, threshold=0.012)
# One score per row; the detector sees arousal and valence together, so the
# same score series applies to both signals.
welford_slope_probabilities = np.stack(
    data.apply(
        lambda row: welford_slope_detector.get_slope_probablity(np.array([row['arousal'], row['valence']])),
        axis=1
    )
)

# Create figure with two subplots
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 8), sharex=True)

# Valence Subplot
# The red curve is the slope probability, not a Welford mean; the labels and
# title below say so (they were previously copied from the statistics plot).
ax1.plot(data['valence'], label='Valence (Original)', color='blue', alpha=0.7)
ax1.plot(welford_slope_probabilities, label='Welford Slope Probability', linestyle='--', color='red')
ax1.set_ylabel('Valence')
ax1.legend()
ax1.grid(True, alpha=0.3)

# Arousal Subplot
ax2.plot(data['arousal'], label='Arousal (Original)', color='blue', alpha=0.7)
ax2.plot(welford_slope_probabilities, label='Welford Slope Probability', linestyle='--', color='red')
ax2.set_ylabel('Arousal')
ax2.legend()
ax2.grid(True, alpha=0.3)

# plt.xlabel acts on the current axes, i.e. the bottom subplot.
plt.xlabel('Time (index)')
plt.suptitle('Valence & Arousal with Welford Slope Probability')
plt.tight_layout()
plt.show()

In [None]:
# Plot valence and arousal as lines coloured by the shared Welford slope score
# and mark the rows where that score crosses the threshold.
# Relies on `x`, `valence_values`, `arousal_values` and `time_str` from the
# overview-plot cell and on `welford_slope_probabilities` from the cell above.
probablity_threshold = 0.3
minimum_free_samples = 5  # in x10 seconds

# Create figure with two subplots
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 5), sharex=True)

# --- Valence Plot (Top) ---
# Create segments for valence
# Each segment joins point k to point k + 1, giving len(x) - 1 segments.
points_valence = np.array([x, valence_values]).T.reshape(-1, 1, 2)
segments_valence = np.concatenate([points_valence[:-1], points_valence[1:]], axis=1)

# Create LineCollection for valence - using welford_slope_probabilities for coloring
lc_valence = LineCollection(segments_valence, cmap='viridis', norm=plt.Normalize(min(welford_slope_probabilities), max(welford_slope_probabilities)))
# NOTE(review): the colour array has one entry per point, i.e. one more than
# there are segments.
lc_valence.set_array(np.array(welford_slope_probabilities))
lc_valence.set_linewidth(2)
ax1.add_collection(lc_valence)

# Add colorbar for valence
cbar_valence = plt.colorbar(lc_valence, ax=ax1)
cbar_valence.set_label('Slope Probability')

# Configure valence plot
ax1.set_ylabel('Valence')
ax1.set_title('Valence Over Time (Colored by Welford Slope Probability)')

# --- Arousal Plot (Bottom) ---
# Create segments for arousal
points_arousal = np.array([x, arousal_values]).T.reshape(-1, 1, 2)
segments_arousal = np.concatenate([points_arousal[:-1], points_arousal[1:]], axis=1)

# Create LineCollection for arousal - using welford_slope_probabilities for coloring
lc_arousal = LineCollection(segments_arousal, cmap='plasma', norm=plt.Normalize(min(welford_slope_probabilities), max(welford_slope_probabilities)))
lc_arousal.set_array(np.array(welford_slope_probabilities))
lc_arousal.set_linewidth(2)
ax2.add_collection(lc_arousal)

# Add colorbar for arousal
cbar_arousal = plt.colorbar(lc_arousal, ax=ax2)
cbar_arousal.set_label('Slope Probability')

# Configure arousal plot
ax2.set_ylabel('Arousal')
ax2.set_title('Arousal Over Time (Colored by Welford Slope Probability)')

# --- Highlight regions with welford_slope_probabilities >= probablity_threshold ---
combined_high = welford_slope_probabilities >= probablity_threshold
# Work on a copy so the raw threshold mask stays available.
blocked_high = combined_high.copy()
# Hold-off pass: after every accepted detection, suppress the following
# minimum_free_samples rows so one change is not reported repeatedly.
i = 0
while i < len(blocked_high):
    if blocked_high[i]:  # When we find a True value
        # Block the next minimum_free_samples samples by setting them to False
        end_block = min(i + 1 + minimum_free_samples, len(blocked_high))
        blocked_high[i+1:end_block] = False
        i = end_block  # Jump to the end of the blocked region
    else:
        i += 1  # Move to the next sample

# Find contiguous regions where condition is True
# diff is +1 where a run starts (at the following index) and -1 where it ends.
changes = np.diff(blocked_high.astype(int))
starts = np.where(changes == 1)[0] + 1
ends = np.where(changes == -1)[0]

# Handle edge cases
# A run touching the first or last row has no transition inside `changes`.
if blocked_high[0]:
    starts = np.insert(starts, 0, 0)
if blocked_high[-1]:
    ends = np.append(ends, len(blocked_high)-1)

# Add horizontal lines for each region
# NOTE(review): after the hold-off pass every surviving True is isolated, so
# start == end for each region; the spans below have zero width and the
# horizontal lines zero length. Consider axvline, or extend `end` over the
# hold-off window, if a visible marker is wanted.
# NOTE(review): axhline's xmin/xmax are axes fractions (0..1), not data
# coordinates, so start/len(x) only approximates the intended position.
for start, end in zip(starts, ends):
    # Valence plot
    ax1.axvspan(start, end, color='red', alpha=0.3)
    ax1.axhline(y=valence_values[start], color='red', linestyle='--', alpha=0.5, xmin=start/len(x), xmax=end/len(x))
    
    # Arousal plot
    ax2.axvspan(start, end, color='red', alpha=0.3)
    ax2.axhline(y=arousal_values[start], color='red', linestyle='--', alpha=0.5, xmin=start/len(x), xmax=end/len(x))

# --- Shared X-axis Configuration ---
n = 6  # Show every 6th label
ax2.set_xticks(x[::n])
ax2.set_xticklabels(time_str[::n], rotation=90)
ax2.set_xlabel('Time')

plt.tight_layout()
plt.show()