In [None]:
import json
import os
import numpy as np
import matplotlib.pyplot as plt
from dataclasses import dataclass
from typing import Optional, Dict, Any

try:
    from sklearn.decomposition import PCA
    from sklearn.feature_extraction.text import TfidfVectorizer
except Exception as e:
    PCA = None
    TfidfVectorizer = None

from scipy.signal import spectrogram

def zcr(signal: np.ndarray) -> float:
    """Return the zero-crossing rate of ``signal``.

    The rate is the fraction of adjacent sample pairs whose values have
    strictly opposite signs. A pair that contains an exact zero is not
    counted as a crossing.

    Args:
        signal: Array of samples; neighbours are paired along the first axis.

    Returns:
        A Python float in ``[0, 1]``. ``0.0`` is returned when there are no
        adjacent pairs to compare (fewer than two samples).
    """
    samples = np.asarray(signal)
    prev, nxt = samples[:-1], samples[1:]
    # Compare signs rather than testing ``prev * nxt < 0``: the product wraps
    # around for integer PCM (e.g. int16) and can underflow to zero for very
    # small floats, both of which miscount crossings.
    crossings = ((prev < 0) & (nxt > 0)) | ((prev > 0) & (nxt < 0))
    if crossings.size == 0:
        # Avoid the NaN (and RuntimeWarning) that the mean of an empty array gives.
        return 0.0
    return float(crossings.mean())

def rms_energy(signal: np.ndarray) -> float:
    """Return the root-mean-square energy of ``signal``.

    A small constant (``1e-12``) is added to the mean square before the
    square root, so the result is never exactly zero.

    Args:
        signal: Array of samples of any numeric dtype.

    Returns:
        A Python float. For an empty signal the mean square is taken as zero,
        so the result is ``sqrt(1e-12)``.
    """
    # Work in float64: squaring integer PCM in its own dtype (e.g. int16)
    # wraps around and can even produce a negative "mean square".
    samples = np.asarray(signal, dtype=np.float64)
    mean_square = float(np.mean(np.square(samples))) if samples.size else 0.0
    return float(np.sqrt(mean_square + 1e-12))

def normalize_minmax(x: float, xmin: float, xmax: float) -> float:
    """Linearly rescale ``x`` so that ``xmin`` maps to 0 and ``xmax`` maps to 1.

    The result is not clamped: values of ``x`` outside ``[xmin, xmax]`` map
    outside ``[0, 1]``.

    Args:
        x: Value to rescale.
        xmin: Value that maps to 0.
        xmax: Value that maps to 1.

    Returns:
        The rescaled value, or ``0.5`` when ``xmax - xmin`` is smaller than
        ``1e-12`` (which includes ``xmax < xmin``).
    """
    span = xmax - xmin
    # A degenerate (or inverted) range has no meaningful scale; use the midpoint.
    return 0.5 if span < 1e-12 else (x - xmin) / span

@dataclass
class AudioFeatureExtractor:
    """Extract a log-spectrogram, a fixed-size embedding and prosody statistics.

    The embedding is derived from the spectrogram with scikit-learn's ``PCA``
    when it is available, and by subsampling the flattened spectrogram
    otherwise.
    """

    # Sample rate (Hz) that inputs are resampled to before the spectrogram.
    target_sr: int = 16000
    # Number of samples per spectrogram segment.
    nperseg: int = 512
    # Number of samples shared by consecutive segments.
    noverlap: int = 256

    def _resample(self, x: np.ndarray, sr: int) -> np.ndarray:
        """Resample ``x`` from ``sr`` to ``target_sr`` by linear interpolation.

        Args:
            x: Input samples.
            sr: Sample rate of ``x`` in Hz.

        Returns:
            ``x`` itself (unchanged, same dtype) when ``sr`` already equals
            ``target_sr``; otherwise a new float32 array with
            ``int(len(x) * target_sr / sr)`` samples.
        """
        if sr == self.target_sr:
            return x
        duration = len(x) / sr
        t_old = np.linspace(0, duration, num=len(x), endpoint=False)
        t_new = np.linspace(0, duration, num=int(len(x) * self.target_sr / sr), endpoint=False)
        return np.interp(t_new, t_old, x).astype(np.float32)

    def compute_spectrogram(self, x: np.ndarray, sr: int) -> np.ndarray:
        """Return the ``log1p`` power spectrogram of ``x``.

        Args:
            x: Input samples.
            sr: Sample rate of ``x`` in Hz; the signal is resampled to
                ``target_sr`` first.

        Returns:
            The array produced by ``scipy.signal.spectrogram`` after
            ``np.log1p``.
        """
        x = self._resample(x, sr)
        n_samples = np.shape(x)[-1] if np.ndim(x) else 0
        nperseg, noverlap = self.nperseg, self.noverlap
        if 0 < n_samples < nperseg:
            # For a signal shorter than one segment scipy shrinks ``nperseg``
            # to the signal length but keeps ``noverlap``, and then fails with
            # "noverlap must be less than nperseg". Shrink both, keeping the
            # configured overlap ratio.
            noverlap = n_samples * noverlap // nperseg
            nperseg = n_samples
        _freqs, _times, Sxx = spectrogram(x, fs=self.target_sr, nperseg=nperseg, noverlap=noverlap)
        return np.log1p(Sxx)

    @staticmethod
    def _subsample_flat(S: np.ndarray, dim: int) -> np.ndarray:
        """Reduce ``S`` to ``dim`` float32 values without any model.

        The flattened array is sampled at ``dim`` evenly spaced positions when
        it has more than ``dim`` entries, and zero-padded on the right
        otherwise.
        """
        flat = np.asarray(S).ravel()
        if flat.size > dim:
            idx = np.linspace(0, flat.size - 1, dim).astype(int)
            return flat[idx].astype(np.float32)
        out = np.zeros(dim, dtype=np.float32)
        out[:flat.size] = flat.astype(np.float32)
        return out

    def _pca_fallback(self, S: np.ndarray, dim: int = 128) -> np.ndarray:
        """Summarise spectrogram ``S`` as a float32 vector of length ``dim``.

        With PCA available, ``S`` is projected onto at most ``dim`` principal
        components and the spread (standard deviation) of the scores along
        each component is used as the feature, zero-padded to ``dim``. If PCA
        is unavailable, cannot be applied to ``S``, or fails, the flattened
        spectrogram is subsampled / zero-padded instead.

        Args:
            S: Spectrogram array.
            dim: Length of the returned vector.

        Returns:
            A float32 array of shape ``(dim,)``.
        """
        n_components = min(dim, min(S.shape) - 1) if S.ndim == 2 else 0
        if PCA is None or n_components < 1:
            return self._subsample_flat(S, dim)

        X = S - S.mean(axis=0, keepdims=True)
        try:
            Z = PCA(n_components=n_components).fit_transform(X)
        except Exception:
            # Deliberate best-effort: any PCA failure falls back to the
            # model-free summary rather than aborting feature extraction.
            return self._subsample_flat(S, dim)

        # The scores of centred data average to ~0 along every component, so
        # their mean carries no information. The per-component standard
        # deviation does, and it does not depend on the arbitrary sign of
        # each component.
        feat = Z.std(axis=0)
        out = np.zeros(dim, dtype=np.float32)
        out[:feat.size] = feat
        return out

    def extract(self, x: np.ndarray, sr: int) -> Dict[str, Any]:
        """Compute all audio features for one signal.

        Args:
            x: Input samples.
            sr: Sample rate of ``x`` in Hz.

        Returns:
            A dict with keys ``"embedding"`` (length-128 float32 vector),
            ``"embedding_source"``, ``"spectrogram"``, ``"spectrogram_shape"``
            and ``"prosody"`` (``"rms"`` and ``"zcr"`` as floats). The prosody
            values are computed on ``x`` as passed in, not on the resampled
            signal.
        """
        S = self.compute_spectrogram(x, sr)
        audio_emb = self._pca_fallback(S, dim=128)
        prosody = {
            "rms": float(rms_energy(x)),
            "zcr": float(zcr(x)),
        }
        return {
            "embedding": audio_emb,
            "embedding_source": "PCA-fallback",
            "spectrogram": S,
            "spectrogram_shape": S.shape,
            "prosody": prosody,
        }

@dataclass
class TextEmbedder:
    """Embed a transcript as a 20-dimensional vector over a fixed sentiment lexicon."""

    def embed(self, text: str) -> Dict[str, Any]:
        """Return the lexicon embedding of ``text``.

        When ``TfidfVectorizer`` is available the vector is its TF-IDF output
        restricted to the lexicon; otherwise it holds raw counts of each
        lexicon word among the whitespace-split, punctuation-stripped,
        lower-cased tokens.

        Args:
            text: Transcript to embed.

        Returns:
            A dict with ``"embedding"`` (float32 array, one entry per lexicon
            word, positive words first) and ``"embedding_source"``.
        """
        positive_terms = [
            "good", "great", "excellent", "happy", "satisfied",
            "love", "amazing", "nice", "pleasant", "delight",
        ]
        negative_terms = [
            "bad", "terrible", "awful", "angry", "upset",
            "hate", "disappointed", "poor", "rude", "horrible",
        ]
        lexicon = positive_terms + negative_terms

        if TfidfVectorizer is None:
            # No scikit-learn: count lexicon hits by hand.
            tokens = [raw.strip(".,!?").lower() for raw in text.split()]
            counts = [tokens.count(term) for term in lexicon]
            vector = np.array(counts, dtype=np.float32)
        else:
            tfidf = TfidfVectorizer(vocabulary=lexicon)
            matrix = tfidf.fit_transform([text])
            vector = matrix.toarray().squeeze().astype(np.float32)

        return {"embedding": vector, "embedding_source": "TFIDF-lexicon-fallback"}

@dataclass
class SentimentHead:
    """Rule-based sentiment classifier that fuses a lexicon text score with prosody."""

    # Sentiment lexicons. They carry no annotation on purpose, so they are
    # plain class attributes rather than dataclass fields, and are built once
    # instead of on every call.
    _POS_WORDS = frozenset({
        "good", "great", "excellent", "happy", "satisfied",
        "love", "amazing", "nice", "pleasant", "delight",
    })
    _NEG_WORDS = frozenset({
        "bad", "terrible", "awful", "angry", "upset",
        "hate", "disappointed", "poor", "rude", "horrible",
    })

    def predict(self, audio_emb: np.ndarray, text_emb: np.ndarray, prosody: Dict[str, float], text: str) -> Dict[str, Any]:
        """Predict the sentiment of one utterance.

        Args:
            audio_emb: Audio embedding; accepted for interface compatibility
                but not used by this rule-based head.
            text_emb: Text embedding; accepted for interface compatibility
                but not used by this rule-based head.
            prosody: Mapping that may contain ``"rms"`` and ``"zcr"``; missing
                keys default to ``0.0``.
            text: Transcript; tokenised by whitespace, with ``.,!?`` stripped
                from token ends and lower-cased.

        Returns:
            A dict with ``"text_score"``, ``"audio_score"``, ``"fused_score"``,
            ``"prob_positive"`` (floats) and ``"label"`` (``"positive"`` when
            ``prob_positive >= 0.5``, else ``"negative"``).
        """
        toks = [t.strip(".,!?").lower() for t in text.split()]
        pos = sum(w in self._POS_WORDS for w in toks)
        neg = sum(w in self._NEG_WORDS for w in toks)
        # The epsilon keeps an empty transcript from dividing by zero.
        text_score = (pos - neg) / (len(toks) + 1e-6)

        rms = prosody.get("rms", 0.0)
        zc = prosody.get("zcr", 0.0)
        # normalize_minmax does not clamp, so values above the 0.2 reference
        # (e.g. loud or full-scale audio) would push audio_score below 0 and
        # the audio term below -1, letting it swamp the text score. Clamp to
        # the unit interval the fusion formula below assumes.
        rms_n = float(np.clip(normalize_minmax(rms, 0.0, 0.2), 0.0, 1.0))
        zc_n = float(np.clip(normalize_minmax(zc, 0.0, 0.2), 0.0, 1.0))
        arousal = (rms_n + zc_n) / 2.0
        audio_score = 1.0 - arousal

        # audio_score is mapped from [0, 1] to [-1, 1] before the weighted sum.
        fused = 0.7 * text_score + 0.3 * (audio_score * 2 - 1)
        prob_pos = 1 / (1 + np.exp(-3 * fused))
        label = "positive" if prob_pos >= 0.5 else "negative"
        return {
            "text_score": float(text_score),
            "audio_score": float(audio_score),
            "fused_score": float(fused),
            "prob_positive": float(prob_pos),
            "label": label
        }

def generate_sine_speech_like(duration_s: float = 2.0, sr: int = 16000, f0: float = 200.0,
                              noise: float = 0.01, seed: Optional[int] = None):
    """Generate a synthetic test tone: an amplitude-modulated sine plus Gaussian noise.

    The carrier is a sine at ``f0`` Hz with peak amplitude 0.3, modulated by a
    2 Hz envelope that swings between 0 and 1.

    Args:
        duration_s: Length of the signal in seconds.
        sr: Sample rate in Hz.
        f0: Carrier frequency in Hz.
        noise: Scale factor applied to standard-normal noise added to the signal.
        seed: Optional seed for reproducible noise. When ``None`` (the
            default) the noise is drawn from NumPy's global random state, as
            before, so callers that rely on ``np.random.seed`` are unaffected.

    Returns:
        A ``(signal, sr)`` tuple where ``signal`` is a float32 array of
        ``int(sr * duration_s)`` samples.
    """
    t = np.linspace(0, duration_s, int(sr * duration_s), endpoint=False)
    am = 0.5 * (1 + np.sin(2 * np.pi * 2 * t))
    x = 0.3 * am * np.sin(2 * np.pi * f0 * t)
    if seed is None:
        jitter = np.random.randn(len(t))
    else:
        # A private generator keeps seeded calls from disturbing global state.
        jitter = np.random.default_rng(seed).standard_normal(len(t))
    x += noise * jitter
    return x.astype(np.float32), sr

def _plot_pipeline_results(S: np.ndarray, pred: Dict[str, Any]) -> None:
    """Show the spectrogram image and a bar chart of the predicted class probabilities."""
    plt.figure(figsize=(10, 4))
    plt.imshow(S, aspect="auto", origin="lower")
    plt.colorbar(label="Log Power")
    plt.title("Spectrogram")
    plt.xlabel("Time frames")
    plt.ylabel("Frequency bins")
    plt.show()

    plt.figure(figsize=(4, 4))
    plt.bar(["Negative", "Positive"], [1 - pred["prob_positive"], pred["prob_positive"]], color=["red", "green"])
    plt.title(f"Sentiment Prediction: {pred['label'].upper()}")
    plt.ylim(0, 1)
    plt.show()


def run_pipeline(manual_transcript: Optional[str] = None,
                 out_path: str = "/mnt/data/pipeline_output.json",
                 show_plots: bool = True) -> Dict[str, Any]:
    """Run the demo pipeline end to end on a synthetic signal.

    A synthetic tone is generated, audio and text features are extracted, a
    sentiment prediction is made, and the summary is written as JSON, printed
    and (optionally) plotted.

    Args:
        manual_transcript: Transcript to score. A built-in positive sentence
            is used when ``None``.
        out_path: Where the JSON summary is written. Missing parent
            directories are created.
        show_plots: Whether to display the spectrogram and prediction plots.

    Returns:
        The summary dict that was written to ``out_path``.
    """
    x, sr = generate_sine_speech_like()
    aout = AudioFeatureExtractor().extract(x, sr)

    if manual_transcript is None:
        manual_transcript = "I am very happy with the service, it was excellent and the staff was nice."
    text = manual_transcript

    tout = TextEmbedder().embed(text)
    pred = SentimentHead().predict(aout["embedding"], tout["embedding"], aout["prosody"], text)

    result = {
        "audio_source": "synthetic",
        "audio_embedding_source": aout["embedding_source"],
        "spectrogram_shape": aout["spectrogram_shape"],
        "prosody": aout["prosody"],
        "text_embedding_source": tout.get("embedding_source", "unknown"),
        "prediction": pred,
        "transcript": text
    }

    # The default target directory does not exist on most machines; create it
    # so the write below does not fail with FileNotFoundError.
    out_dir = os.path.dirname(out_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    print("Saved pipeline outputs to:", out_path)
    print(json.dumps(result, indent=2))

    if show_plots:
        _plot_pipeline_results(aout["spectrogram"], pred)

    return result

# Run the demo pipeline once with the default transcript when this cell/module
# executes, and keep the returned summary dict for later inspection.
outputs = run_pipeline()
