In [None]:
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
from datasets import load_dataset
import torch
import soundfile as sf
from peft import LoraConfig, PeftModel
import os
import numpy as np
from huggingface_hub import hf_hub_download
import zipfile

# -----------------------------
# Load base models
# -----------------------------
processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
base_model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")

# -----------------------------
# Function to create or load LoRA adapter
# -----------------------------
def get_tone_adapter(model, tone: str, save_dir="models", r=8, alpha=16, dropout=0.05):
    """
    Load LoRA adapter if it exists, otherwise create and save it.
    """
    tone_dir = os.path.join(save_dir, tone)
    os.makedirs(tone_dir, exist_ok=True)

    nested_dir = os.path.join(tone_dir, tone)

    if os.path.exists(os.path.join(nested_dir, "adapter_config.json")):
        print(f"🔄 Loading existing adapter for tone '{tone}' from {nested_dir}")
        peft_model = PeftModel.from_pretrained(model, nested_dir, adapter_name=tone)
    else:
        print(f"✨ Creating new adapter for tone '{tone}'...")
        config = LoraConfig(
            r=r,
            lora_alpha=alpha,
            target_modules=["q_proj", "v_proj"],
            lora_dropout=dropout,
            bias="none",
            task_type="FEATURE_EXTRACTION"
        )
        peft_model = PeftModel(model, config, adapter_name=tone)
        peft_model.save_pretrained(tone_dir, safe_serialization=True)
        print(f"✅ Adapter for tone '{tone}' saved at: {nested_dir}")

    return peft_model

# -----------------------------
# Function to load or cache speaker embedding (gender-based selection)
# -----------------------------
def get_speaker_embedding(gender: str = "female", cache_dir="speaker_embeddings"):
    """
    Load cached speaker embedding if available, otherwise fetch and cache it.
    Gender: 'male' or 'female'.
    """
    os.makedirs(cache_dir, exist_ok=True)
    cache_path = os.path.join(cache_dir, f"speaker_embedding_{gender.lower()}.npy")

    if os.path.exists(cache_path):
        print(f"🔄 Loading cached {gender} speaker embedding from {cache_path}")
        xvector = np.load(cache_path)
    else:
        print(f"✨ Fetching and caching new {gender} speaker embedding...")
        try:
            # Try the HF dataset normally
            embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")

            if gender.lower() == "female":
                sample = next(x for x in embeddings_dataset if "cmu_us_slt_" in x["filename"])
                print("🎙 Using female speaker: slt (Susan)")
            else:
                sample = next(x for x in embeddings_dataset if "cmu_us_bdl_" in x["filename"])
                print("🎙 Using male speaker: bdl")

            xvector = sample["xvector"]

        except Exception as e:
            print("⚠️ Falling back to manual download...")
            zip_path = hf_hub_download("Matthijs/cmu-arctic-xvectors", "spkrec-xvect.zip", repo_type="dataset")
            extract_dir = "cmu-arctic-xvectors"
            os.makedirs(extract_dir, exist_ok=True)
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                zip_ref.extractall(extract_dir)

            # Find all npy files
            npy_files = [os.path.join(root, f)
                         for root, _, files in os.walk(extract_dir) for f in files if f.endswith(".npy")]
            if not npy_files:
                raise RuntimeError("No .npy files found in extracted speaker embeddings.")

            # Filter by gender selection
            if gender.lower() == "female":
                female_files = [f for f in npy_files if "cmu_us_slt" in f or "cmu_us_clb" in f]
                if not female_files:
                    raise RuntimeError("No female speaker embeddings found.")
                xvector = np.load(female_files[0])
                print(f"🎙 Using female speaker from file: {os.path.basename(female_files[0])}")
            else:
                male_files = [f for f in npy_files if "cmu_us_bdl" in f or "cmu_us_rms" in f]
                if not male_files:
                    raise RuntimeError("No male speaker embeddings found.")
                xvector = np.load(male_files[0])
                print(f"🎙 Using male speaker from file: {os.path.basename(male_files[0])}")

        np.save(cache_path, xvector)
        print(f"✅ {gender.capitalize()} speaker embedding cached at {cache_path}")

    return torch.tensor(xvector).unsqueeze(0)


# -----------------------------
# Example usage
# -----------------------------
tone = "happy"
gender = "female"

# model = get_tone_adapter(base_model, tone)
inputs = processor(text=f"I am very {tone} right now!", return_tensors="pt")

speaker_embeddings = get_speaker_embedding(gender=gender)

# Generate speech with adapter
speech = base_model.generate_speech(inputs["input_ids"], speaker_embeddings, vocoder=vocoder)
sf.write(f"speech_{tone}_{gender}.wav", speech.numpy(), samplerate=16000)
print(f"🎤 Saved generated speech with tone: {tone} and gender: {gender}")


✨ Fetching and caching new female speaker embedding...
⚠️ Falling back to manual download...
🎙 Using female speaker from file: cmu_us_clb_arctic-wav-arctic_a0001.npy
✅ Female speaker embedding cached at speaker_embeddings\speaker_embedding_female.npy
🎤 Saved generated speech with tone: happy and gender: female


In [None]:
import os
import json
import torch
import soundfile as sf
from torch.utils.data import DataLoader
from datasets import Dataset
from transformers import SpeechT5Processor, SpeechT5ForTextToSpeech, SpeechT5HifiGan
from peft import PeftModel, LoraConfig
from huggingface_hub import hf_hub_download
import numpy as np
import zipfile

# -----------------------------
# Load base SpeechT5 models
# -----------------------------
processor = SpeechT5Processor.from_pretrained("microsoft/speecht5_tts")
base_model = SpeechT5ForTextToSpeech.from_pretrained("microsoft/speecht5_tts")
vocoder = SpeechT5HifiGan.from_pretrained("microsoft/speecht5_hifigan")


# -----------------------------
# Create/load LoRA adapter
# -----------------------------
def get_tone_adapter(model, tone: str, save_dir="models", r=8, alpha=16, dropout=0.05):
    """
    Load LoRA adapter if it exists, otherwise create and save it.
    """
    # top-level directory
    tone_dir = os.path.join(save_dir, tone)
    os.makedirs(tone_dir, exist_ok=True)

    # nested path where PEFT actually saves (tone_dir/tone/adapter_config.json)
    nested_dir = os.path.join(tone_dir, tone)

    if os.path.exists(os.path.join(nested_dir, "adapter_config.json")):
        print(f"🔄 Loading existing adapter for tone '{tone}' from {nested_dir}")
        peft_model = PeftModel.from_pretrained(model, nested_dir, adapter_name=tone)
    else:
        print(f"✨ Creating new adapter for tone '{tone}'...")
        config = LoraConfig(
            r=r,
            lora_alpha=alpha,
            target_modules=["q_proj", "v_proj"],
            lora_dropout=dropout,
            bias="none",
            task_type="FEATURE_EXTRACTION"
        )
        peft_model = PeftModel(model, config, adapter_name=tone)
        peft_model.save_pretrained(tone_dir, safe_serialization=True)  # PEFT will create tone_dir/tone/
        print(f"✅ Adapter for tone '{tone}' saved at: {nested_dir}")

    return peft_model

# -----------------------------
# Speaker embedding loader
# -----------------------------
def get_speaker_embedding(gender: str = "female", cache_dir="speaker_embeddings"):
    """
    Load cached speaker embedding if available, otherwise fetch and cache it.
    Gender: 'male' or 'female'
    """
    os.makedirs(cache_dir, exist_ok=True)
    cache_path = os.path.join(cache_dir, f"speaker_embedding_{gender.lower()}.npy")

    if os.path.exists(cache_path):
        print(f"🔄 Loading cached {gender} speaker embedding from {cache_path}")
        xvector = np.load(cache_path)
    else:
        print(f"✨ Fetching new {gender} speaker embedding...")
        try:
            from datasets import load_dataset
            embeddings_dataset = load_dataset("Matthijs/cmu-arctic-xvectors", split="validation")

            if gender.lower() == "female":
                sample = next(x for x in embeddings_dataset if "cmu_us_slt_" in x["filename"])
                print("🎙 Using female speaker: slt (Susan)")
            else:
                sample = next(x for x in embeddings_dataset if "cmu_us_bdl_" in x["filename"])
                print("🎙 Using male speaker: bdl")

            xvector = sample["xvector"]

        except Exception:
            print("⚠️ Falling back to manual download...")
            zip_path = hf_hub_download("Matthijs/cmu-arctic-xvectors", "spkrec-xvect.zip", repo_type="dataset")
            extract_dir = "cmu-arctic-xvectors"
            os.makedirs(extract_dir, exist_ok=True)
            with zipfile.ZipFile(zip_path, "r") as zip_ref:
                zip_ref.extractall(extract_dir)

            npy_files = [os.path.join(root, f)
                         for root, _, files in os.walk(extract_dir) for f in files if f.endswith(".npy")]
            if not npy_files:
                raise RuntimeError("No .npy files found in speaker embeddings.")

            if gender.lower() == "female":
                female_files = [f for f in npy_files if "cmu_us_slt" in f or "cmu_us_clb" in f]
                if not female_files:
                    raise RuntimeError("No female speaker embeddings found.")
                xvector = np.load(female_files[0])
                print(f"🎙 Using female speaker from file: {os.path.basename(female_files[0])}")
            else:
                male_files = [f for f in npy_files if "cmu_us_bdl" in f or "cmu_us_rms" in f]
                if not male_files:
                    raise RuntimeError("No male speaker embeddings found.")
                xvector = np.load(male_files[0])
                print(f"🎙 Using male speaker from file: {os.path.basename(male_files[0])}")

        np.save(cache_path, xvector)
        print(f"✅ {gender.capitalize()} speaker embedding cached at {cache_path}")

    return torch.tensor(xvector).unsqueeze(0)


# -----------------------------
# Adapter weight updater
# -----------------------------
def update_emotion_adapters(model, dataset: list, save_dir: str):
    texts = [item["text"] for item in dataset]
    emotions = [item["major_emotion"] for item in dataset]

    # ✅ Convert features back into torch tensors
    features_list = [
        torch.tensor(item["features"], dtype=torch.float32)
        if isinstance(item["features"], (list, tuple)) else item["features"]
        for item in dataset
    ]

    ds = Dataset.from_dict({"text": texts, "emotion": emotions})
    dataloader = DataLoader(ds, batch_size=1)

    for i, batch in enumerate(dataloader):
        emotion = batch["emotion"][0]
        features = features_list[i]

        adapter_dir = os.path.join(save_dir, emotion, emotion)
        model_peft = PeftModel.from_pretrained(model, adapter_dir, adapter_name=emotion)

        # ✅ Unfreeze LoRA parameters
        for name, param in model_peft.named_parameters():
            if "lora" in name:
                param.requires_grad = True

        print(f"🔄 Updating adapter for [{emotion}]...")

        for name, param in model_peft.named_parameters():
            if "lora" in name and param.requires_grad:
                before = param.data.clone().detach().cpu()
                param.data += 0.01 * features.mean()
                after = param.data.clone().detach().cpu()

                print(f"➡️ {name}: Δmean={after.mean() - before.mean():.8f}")

        # Save updated adapter
        model_peft.save_pretrained(adapter_dir, safe_serialization=True)

# -----------------------------
# Main Example
# -----------------------------
if __name__ == "__main__":
    # Load dataset and convert features back
    dataset_path = "results.json"   # <-- put your dataset JSON file here
    with open(dataset_path, "r") as f:
        dataset = json.load(f)

    dataset = dataset[:10]

    # (Already handled above inside update_emotion_adapters)
    save_dir = "models"

    # Create adapters first (one per emotion)
    unique_emotions = set(item["major_emotion"] for item in dataset)
    for emotion in unique_emotions:
        _ = get_tone_adapter(base_model, emotion, save_dir=save_dir)

    # Update weights with features
    update_emotion_adapters(base_model, dataset, save_dir=save_dir)


✨ Creating new adapter for tone 'frustrated'...
✅ Adapter for tone 'frustrated' saved at: models\frustrated\frustrated
✨ Creating new adapter for tone 'angry'...
✅ Adapter for tone 'angry' saved at: models\angry\angry
🔄 Updating adapter for [frustrated]...
➡️ base_model.model.speecht5.encoder.wrapped_encoder.layers.0.attention.v_proj.lora_A.frustrated.weight: Δmean=0.00128259
➡️ base_model.model.speecht5.encoder.wrapped_encoder.layers.0.attention.v_proj.lora_A.angry.weight: Δmean=0.00128259
➡️ base_model.model.speecht5.encoder.wrapped_encoder.layers.0.attention.v_proj.lora_B.frustrated.weight: Δmean=0.00128259
➡️ base_model.model.speecht5.encoder.wrapped_encoder.layers.0.attention.v_proj.lora_B.angry.weight: Δmean=0.00128259
➡️ base_model.model.speecht5.encoder.wrapped_encoder.layers.0.attention.q_proj.lora_A.frustrated.weight: Δmean=0.00128259
➡️ base_model.model.speecht5.encoder.wrapped_encoder.layers.0.attention.q_proj.lora_A.angry.weight: Δmean=0.00128259
➡️ base_model.model.speech

In [None]:
import os
import json
import soundfile as sf

# ✅ Base directory
base_dir = "audios"
os.makedirs(base_dir, exist_ok=True)

# ✅ Subdirectory for baseline
baseline_dir = os.path.join(base_dir, "baseline")
os.makedirs(baseline_dir, exist_ok=True)

# ✅ Iterate over dataset
for idx, sample in enumerate(dataset[:3]):  # limit to 3 for demo
    emotion = sample["major_emotion"]
    text = sample["text"]

    # Inputs
    inputs = processor(text=text, return_tensors="pt")
    speaker_embeddings = get_speaker_embedding(gender='female')

    # -----------------------------
    # 1) 🔊 Baseline (no adapter)
    # -----------------------------
    print(f"[{idx}] 🎤 Generating baseline speech (no adapter)...")
    speech_baseline = base_model.generate_speech(
        inputs["input_ids"],
        speaker_embeddings,
        vocoder=vocoder
    )

    out_path_base = os.path.join(baseline_dir, f"speech_baseline_{idx}.wav")
    sf.write(out_path_base, speech_baseline.numpy(), samplerate=16000)
    print(f"✅ Saved baseline speech at {out_path_base}")

    # Store metrics for baseline
    baseline_metrics = {
        "id": idx,
        "text": text,
        "emotion": "neutral",
        "params": {
            "samplerate": 16000,
            "length": len(speech_baseline.numpy())
        }
    }
    with open(os.path.join(baseline_dir, f"metrics_{idx}.json"), "w") as f:
        json.dump(baseline_metrics, f, indent=4)

    # -----------------------------
    # 2) 🔊 With adapter
    # -----------------------------
    print(f"[{idx}] 🎤 Generating speech with adapter for [{emotion}]...")

    # Emotion-specific directory
    adapter_dir = os.path.join(base_dir, emotion)
    os.makedirs(adapter_dir, exist_ok=True)

    model_with_adapter = get_tone_adapter(base_model, emotion, save_dir=save_dir)
    speech_adapter = model_with_adapter.generate_speech(
        inputs["input_ids"],
        speaker_embeddings,
        vocoder=vocoder
    )

    out_path_adapter = os.path.join(adapter_dir, f"speech_{emotion}_{idx}.wav")
    sf.write(out_path_adapter, speech_adapter.numpy(), samplerate=16000)
    print(f"✅ Saved adapter speech with emotion [{emotion}] at {out_path_adapter}")

    # Store metrics for adapter
    adapter_metrics = {
        "id": idx,
        "text": text,
        "emotion": emotion,
        "params": {
            "samplerate": 16000,
            "length": len(speech_adapter.numpy())
        }
    }
    with open(os.path.join(adapter_dir, f"metrics_{idx}.json"), "w") as f:
        json.dump(adapter_metrics, f, indent=4)


[0] 🎤 Generating baseline speech (no adapter)...
✅ Saved baseline speech at audios\baseline\speech_baseline_0.wav
[0] 🎤 Generating speech with adapter for [frustrated]...
🔄 Loading existing adapter for tone 'frustrated' from models\frustrated\frustrated
✅ Saved adapter speech with emotion [frustrated] at audios\frustrated\speech_frustrated_0.wav
[1] 🎤 Generating baseline speech (no adapter)...
✅ Saved baseline speech at audios\baseline\speech_baseline_1.wav
[1] 🎤 Generating speech with adapter for [frustrated]...
🔄 Loading existing adapter for tone 'frustrated' from models\frustrated\frustrated
✅ Saved adapter speech with emotion [frustrated] at audios\frustrated\speech_frustrated_1.wav
[2] 🎤 Generating baseline speech (no adapter)...
✅ Saved baseline speech at audios\baseline\speech_baseline_2.wav
[2] 🎤 Generating speech with adapter for [frustrated]...
🔄 Loading existing adapter for tone 'frustrated' from models\frustrated\frustrated
✅ Saved adapter speech with emotion [frustrated] a

In [None]:
import numpy as np
import soundfile as sf
from Implicit import extract_prosody_features


def test_prosody_shift(sample, base_model, processor, vocoder, whisper_model, save_dir, n_trials=5):
    emotion = sample["major_emotion"]
    text = sample["text"]

    inputs = processor(text=text, return_tensors="pt")
    speaker_embeddings = get_speaker_embedding(gender='female')

    results = {"baseline": [], "adapter": []}

    # 1) Baseline (no adapter)
    for i in range(n_trials):
        out_path = f"speech_baseline_{i}.wav"
        speech = base_model.generate_speech(
            inputs["input_ids"],
            speaker_embeddings,
            vocoder=vocoder
        )
        sf.write(out_path, speech.numpy(), samplerate=16000)

        feats = extract_prosody_features(out_path, whisper_model)
        results["baseline"].append(feats)

    # 2) Adapter
    model_with_adapter = get_tone_adapter(base_model, emotion, save_dir=save_dir)
    for i in range(n_trials):
        out_path = f"speech_{emotion}_{i}.wav"
        speech = model_with_adapter.generate_speech(
            inputs["input_ids"],
            speaker_embeddings,
            vocoder=vocoder
        )
        sf.write(out_path, speech.numpy(), samplerate=16000)

        feats = extract_prosody_features(out_path, whisper_model)
        results["adapter"].append(feats)

    # Average results
    def mean_dict(list_of_dicts):
        keys = list(list_of_dicts[0].keys())
        return {k: float(np.mean([d[k] for d in list_of_dicts])) for k in keys}

    mean_baseline = mean_dict(results["baseline"])
    mean_adapter = mean_dict(results["adapter"])

    print("\n📊 Prosody comparison:")
    print(f"Emotion: {emotion}")
    print("Baseline:", mean_baseline)
    print("Adapter :", mean_adapter)

    return mean_baseline, mean_adapter


In [16]:
import whisper

whisper_model = whisper.load_model("base")
baseline_feats, adapter_feats = test_prosody_shift(
    sample,
    base_model,
    processor,
    vocoder,
    whisper_model,
    save_dir,
    n_trials=5
)


DEBUG:Implicit:extract_prosody_features called with audio_path=speech_baseline_0.wav
DEBUG:Implicit:librosa.load -> y.shape=(22016,), sr=16000
DEBUG:Implicit:parselmouth.Sound loaded successfully
DEBUG:Implicit:Raw pitch array length: 134
DEBUG:Implicit:Pitch finite count: 134
DEBUG:Implicit:Pitch after range filter (50-500 Hz) count: 59
DEBUG:Implicit:mean_pitch(raw)=112.39760373187262, std_pitch(raw)=12.180130577664011
DEBUG:Implicit:RMS array length: 44, sample values (first 5): [0.00109624 0.00447103 0.03039728 0.0408951  0.0438465 ]
DEBUG:Implicit:mean_rms(raw)=0.03467020019888878
DEBUG:Implicit:duration=1.376 seconds
DEBUG:Implicit:voice_to_text returned text length=22
DEBUG:Implicit:word_count=6, speaking_rate(raw)=4.36046511627907
DEBUG:Implicit:{'mean_pitch': np.float64(112.39760373187262), 'std_pitch': np.float64(12.180130577664011), 'mean_rms': 0.03467020019888878, 'speaking_rate': 4.36046511627907, 'mean_pitch_norm': np.float64(0.12397603731872621), 'std_pitch_norm': np.flo

🔄 Loading existing adapter for tone 'frustrated' from models\frustrated\frustrated


DEBUG:Implicit:extract_prosody_features called with audio_path=speech_frustrated_0.wav
DEBUG:Implicit:librosa.load -> y.shape=(21504,), sr=16000
DEBUG:Implicit:parselmouth.Sound loaded successfully
DEBUG:Implicit:Raw pitch array length: 131
DEBUG:Implicit:Pitch finite count: 131
DEBUG:Implicit:Pitch after range filter (50-500 Hz) count: 49
DEBUG:Implicit:mean_pitch(raw)=109.58417344353661, std_pitch(raw)=11.422154695958238
DEBUG:Implicit:RMS array length: 43, sample values (first 5): [0.00115959 0.00307664 0.03163126 0.0394759  0.04260156]
DEBUG:Implicit:mean_rms(raw)=0.03526454418897629
DEBUG:Implicit:duration=1.344 seconds
DEBUG:Implicit:voice_to_text returned text length=22
DEBUG:Implicit:word_count=6, speaking_rate(raw)=4.464285714285714
DEBUG:Implicit:{'mean_pitch': np.float64(109.58417344353661), 'std_pitch': np.float64(11.422154695958238), 'mean_rms': 0.03526454418897629, 'speaking_rate': 4.464285714285714, 'mean_pitch_norm': np.float64(0.09584173443536613), 'std_pitch_norm': np


📊 Prosody comparison:
Emotion: frustrated
Baseline: {'mean_pitch': 0.1346076577929143, 'std_pitch': 0.2874173041967934, 'mean_rms': 1.3236416205763815, 'speaking_rate': 1.1707943219571129}
Adapter : {'mean_pitch': 0.08918977065745667, 'std_pitch': 0.2189056776831105, 'mean_rms': 1.1971696726977823, 'speaking_rate': 1.1707943219571129}


In [18]:
print("Baseline Feats: ", baseline_feats)
print("Adapter Feats: ", adapter_feats)

Baseline Feats:  {'mean_pitch': 0.1346076577929143, 'std_pitch': 0.2874173041967934, 'mean_rms': 1.3236416205763815, 'speaking_rate': 1.1707943219571129}
Adapter Feats:  {'mean_pitch': 0.08918977065745667, 'std_pitch': 0.2189056776831105, 'mean_rms': 1.1971696726977823, 'speaking_rate': 1.1707943219571129}


In [19]:
import json
import pandas as pd

# -------------------------------
# Load dataset (replace with your path or JSON variable)
# -------------------------------
with open("results.json", "r") as f:
    dataset = json.load(f)

# -------------------------------
# Convert into DataFrame
# -------------------------------
rows = []
for sample in dataset:
    emotion = sample["major_emotion"]
    prosody = sample["prosody"]
    rows.append({
        "emotion": emotion,
        "mean_pitch": prosody["mean_pitch"],
        "std_pitch": prosody["std_pitch"],
        "mean_rms": prosody["mean_rms"],
        "speaking_rate": prosody["speaking_rate"]
    })

df = pd.DataFrame(rows)

# -------------------------------
# Group by emotion: mean + count
# -------------------------------
summary = df.groupby("emotion").agg(
    count=("emotion", "size"),
    mean_pitch=("mean_pitch", "mean"),
    std_pitch=("std_pitch", "mean"),
    mean_rms=("mean_rms", "mean"),
    speaking_rate=("speaking_rate", "mean")
).reset_index()

print(summary)

# Optional: save to CSV for later analysis
summary.to_csv("prosody_summary.csv", index=False)


      emotion  count  mean_pitch  std_pitch  mean_rms  speaking_rate
0       angry   1269    1.265345   1.168601  1.600285       0.281477
1  frustrated   2917    0.965802   1.020676  0.437580       0.281469
