In [1]:
"""
Fixed and consolidated audio -> split -> audio-emotion + text-emotion pipeline.

Key fixes:
- Correct, robust recording loop using sounddevice + pynput.
- Save chunks to "chunks/" folder and check paths before using them.
- Use whisper.transcribe(file_path) instead of passing raw numpy array.
- Replace deprecated return_all_scores with top_k=None in transformers pipeline.
- Robust handling if silence splitting creates zero chunks (falls back to whole file).
- Clean imports and helpful debug prints.
"""

import os
import threading
import numpy as np
import sounddevice as sd
import soundfile as sf
import librosa
from pydub import AudioSegment, silence
import matplotlib.pyplot as plt

import torch
import torch.nn.functional as F
from transformers import pipeline, AutoModelForAudioClassification, AutoFeatureExtractor
import whisper

# Optional: speechbrain import if you need it elsewhere (kept from your original file)
# from speechbrain.inference import EncoderClassifier

# ---------------- Config ----------------
# Path of the recorded input WAV; also the default `reference_wav` of tts_and_combine().
FILENAME = "neutral_input.wav"
# Sample rate (Hz) used for recording and as the default load rate in plot_waveform().
SAMPLERATE = 16000
# Directory that save_chunks() writes the per-chunk WAV files into.
CHUNKS_DIR = "chunks"
MIN_SILENCE_LEN = 500     # ms; passed as `min_silence_len` to pydub's split_on_silence
SILENCE_THRESH_DELTA = 10 # dB subtracted from the file's dBFS to get the silence threshold
KEEP_SILENCE_MS = 150     # ms; passed as `keep_silence` to pydub's split_on_silence

# ---------------- Recording utilities ----------------
# Module-level state shared between start_recording(), its key handler and
# _audio_callback(). start_recording() resets all three at the start of a session.
# Set when 's' is pressed; _audio_callback only stores blocks once this is set.
recording_started = threading.Event()
# Set when 'q' is pressed; ends the wait loop in start_recording().
recording_stopped = threading.Event()
_recorded_frames = []  # blocks appended by _audio_callback, one entry per callback

def _audio_callback(indata, frames, time, status):
    """Store one block of input samples while a recording is in progress.

    Used as the ``callback`` of ``sd.InputStream`` in ``start_recording``.
    A block is kept only after ``recording_started`` has been set and before
    ``recording_stopped`` has been set. ``frames``, ``time`` and ``status``
    are part of the callback signature and are not inspected here.
    """
    if not recording_started.is_set():
        return
    if recording_stopped.is_set():
        return
    # Keep a private copy so the stored block does not alias the buffer passed in.
    _recorded_frames.append(indata.copy())

def start_recording(filename=FILENAME, samplerate=SAMPLERATE):
    """
    Record one-channel audio from the microphone and write it to a WAV file.

    A pynput keyboard listener drives the session: 's' begins capturing and
    'q' (only once capturing has begun) ends it. The function blocks until
    the stop key is seen.

    Args:
        filename: destination path of the WAV file.
        samplerate: sample rate passed to the input stream and to the writer.

    Returns:
        The `filename` that was written.

    Raises:
        RuntimeError: if no audio blocks were captured.
    """
    from pynput import keyboard

    # Every session starts from a clean slate.
    _recorded_frames.clear()
    recording_started.clear()
    recording_stopped.clear()

    print("🎙️ Press 's' to start recording, and 'q' to stop.")

    def on_press(key):
        # Special keys carry no `.char`; they are treated as "no character".
        pressed = getattr(key, "char", None)
        if pressed == 's' and not recording_started.is_set():
            print("🔴 Recording started...")
            recording_started.set()
        elif pressed == 'q' and recording_started.is_set():
            print("🛑 Recording stopped.")
            # The listener itself is shut down by the caller's `finally` below.
            recording_stopped.set()

    key_listener = keyboard.Listener(on_press=on_press)
    key_listener.start()

    try:
        mic_stream = sd.InputStream(callback=_audio_callback, samplerate=samplerate, channels=1)
        with mic_stream:
            # Poll the stop flag, sleeping between checks instead of spinning.
            while not recording_stopped.is_set():
                sd.sleep(100)
    finally:
        key_listener.stop()

    if not _recorded_frames:
        raise RuntimeError("No audio recorded. Did you press 's' to start?")

    # Join the per-callback blocks along the sample axis, then drop any extra
    # dimension so a flat 1-D array is written.
    samples = np.concatenate(_recorded_frames, axis=0)
    samples = samples.reshape(-1) if samples.ndim > 1 else samples

    # subtype='PCM_16' asks soundfile to store the samples as 16-bit PCM.
    sf.write(filename, samples, samplerate, subtype='PCM_16')
    print(f"✅ Audio saved to {filename} (samples: {samples.shape[0]}, sr: {samplerate})")
    return filename

# ---------------- pydub helpers ----------------
def float_to_audiosegment(audio: np.ndarray, sr: int):
    """
    Build a mono 16-bit PCM pydub.AudioSegment from a float waveform.

    Args:
        audio: numpy array of samples, expected in the range [-1, 1];
            values outside that range are clipped.
        sr: frame rate given to the resulting segment.

    Returns:
        An AudioSegment with sample_width=2 and channels=1.
    """
    # Cast to float32 and clamp before scaling so the int16 conversion cannot wrap.
    clamped = np.clip(audio.astype(np.float32), -1.0, 1.0)
    pcm16 = (clamped * 32767).astype(np.int16)
    return AudioSegment(pcm16.tobytes(), frame_rate=sr, sample_width=2, channels=1)

def split_audio_on_silence_from_file(filepath, min_silence_len=MIN_SILENCE_LEN, silence_thresh_delta=SILENCE_THRESH_DELTA, keep_silence=KEEP_SILENCE_MS):
    """
    Read a WAV file with pydub and cut it wherever it goes quiet.

    The silence threshold is not absolute: it is the file's own dBFS minus
    `silence_thresh_delta`.

    Args:
        filepath: path of the WAV file to load.
        min_silence_len: forwarded to split_on_silence as `min_silence_len`.
        silence_thresh_delta: dB subtracted from the file's dBFS.
        keep_silence: forwarded to split_on_silence as `keep_silence`.

    Returns:
        The list of AudioSegment chunks (may be empty).
    """
    recording = AudioSegment.from_file(filepath, format="wav")
    threshold_dbfs = recording.dBFS - silence_thresh_delta
    pieces = silence.split_on_silence(
        recording,
        min_silence_len=min_silence_len,
        silence_thresh=threshold_dbfs,
        keep_silence=keep_silence,
    )
    print(f"🔪 Split into {len(pieces)} chunks (silence_thresh={threshold_dbfs:.1f} dBFS)")
    return pieces

def save_chunks(chunks, out_dir=CHUNKS_DIR, prefix="chunk"):
    """
    Export each chunk as `<out_dir>/<prefix>_<index>.wav`.

    The output directory is created if it does not exist.

    Args:
        chunks: iterable of objects exposing `export(path, format=...)`.
        out_dir: directory to write into.
        prefix: file-name prefix placed before the chunk index.

    Returns:
        List of the written file paths, in chunk order.
    """
    os.makedirs(out_dir, exist_ok=True)
    written = []
    for index, piece in enumerate(chunks):
        target = os.path.join(out_dir, f"{prefix}_{index}.wav")
        piece.export(target, format="wav")
        written.append(target)
    return written

# ---------------- Visualization (optional) ----------------
def plot_waveform(filepath, sr=SAMPLERATE, figsize=(14,4), ytick_step=0.1, save_path=None):
    """
    Plot the waveform of an audio file, saving it to disk or showing it.

    Args:
        filepath: audio file to load (via librosa, mixed down to mono).
        sr: sample rate requested from librosa.load.
        figsize: matplotlib figure size.
        ytick_step: spacing of the y-axis ticks between -1.0 and 1.0.
        save_path: when truthy, the figure is written there and closed;
            otherwise the figure is shown with plt.show().
    """
    # Local import: only needed here, so the file's import block is untouched.
    import warnings

    # Use the rate librosa reports so the time axis is right even if the
    # caller passes sr=None.
    audio, loaded_sr = librosa.load(filepath, sr=sr, mono=True)
    duration = len(audio) / loaded_sr
    time_axis = np.linspace(0., duration, len(audio))

    plt.figure(figsize=figsize)
    plt.plot(time_axis, audio)
    plt.title(f"Waveform: {os.path.basename(filepath)}")
    plt.xlabel("Time (s)")
    plt.ylabel("Amplitude")
    plt.yticks(np.arange(-1.0, 1.1, ytick_step))
    plt.grid(True)
    plt.tight_layout()

    if save_path:
        try:
            plt.savefig(save_path)
        finally:
            # Close even when saving fails so the figure is not leaked.
            plt.close()
        print(f"📊 Waveform plot saved to {save_path}")
    else:
        # Warnings are not raised as exceptions by default, so an
        # `except UserWarning` cannot silence them; filter them instead.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", UserWarning)
            plt.show()

# ---------------- Model / Inference Setup ----------------
# NOTE: the statements below run when the module/cell is executed, not lazily.
print("🔁 Loading models (this may take a while)...")
# Audio emotion classifier (Hugging Face)
AUDIO_MODEL_ID = "firdhokk/speech-emotion-recognition-with-openai-whisper-large-v3"
# Model and feature extractor are loaded from the same model id.
audio_model = AutoModelForAudioClassification.from_pretrained(AUDIO_MODEL_ID)
audio_feature_extractor = AutoFeatureExtractor.from_pretrained(AUDIO_MODEL_ID)
# Class-id -> label mapping; predict_audio_emotion() indexes it with the predicted id.
audio_id2label = audio_model.config.id2label

def preprocess_audio_for_model(audio_path, feature_extractor, max_duration=30.0):
    """
    Load an audio file and turn it into fixed-length model inputs.

    The waveform is loaded as mono at the feature extractor's sampling rate,
    then truncated or zero-padded at the end to exactly `max_duration` seconds.

    Args:
        audio_path: path of the audio file to load.
        feature_extractor: object exposing `sampling_rate` and callable on a
            waveform with `sampling_rate=` / `return_tensors=` keywords.
        max_duration: target length in seconds.

    Returns:
        Whatever the feature extractor returns for `return_tensors="pt"`.
    """
    rate = feature_extractor.sampling_rate
    waveform, _ = librosa.load(audio_path, sr=rate, mono=True)

    target_len = int(rate * max_duration)
    shortfall = target_len - len(waveform)
    if shortfall < 0:
        # Too long: keep only the leading `target_len` samples.
        waveform = waveform[:target_len]
    else:
        # Too short (or exact): pad the tail with zeros.
        waveform = np.pad(waveform, (0, shortfall))

    return feature_extractor(waveform, sampling_rate=rate, return_tensors="pt")

def predict_audio_emotion(audio_path, model, feature_extractor, id2label):
    """
    Classify the emotion of one audio file.

    Args:
        audio_path: path of the audio file.
        model: classifier whose output exposes `.logits`.
        feature_extractor: passed to preprocess_audio_for_model.
        id2label: mapping from predicted class id to label.

    Returns:
        (label, confidence): the label of the highest-probability class and
        its softmax probability as a float.
    """
    features = preprocess_audio_for_model(audio_path, feature_extractor)

    # Run on the GPU when one is available, otherwise on the CPU.
    target = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
    model = model.to(target)
    batch = {name: tensor.to(target) for name, tensor in features.items()}

    with torch.no_grad():
        result = model(**batch)

    probabilities = F.softmax(result.logits, dim=-1)
    best_prob, best_idx = probabilities.max(dim=-1)
    return id2label[best_idx.item()], float(best_prob.item())

# Whisper (for transcription)
# Loaded once at module level; process_audio_file() calls whisper_model.transcribe()
# on each chunk path.
whisper_model = whisper.load_model("base")  # uses CPU if no GPU

# Text emotion pipeline: use top_k=None to return all scores (replacement for return_all_scores)
# This object is the default `pipeline_obj` of predict_text_emotion().
text_emotion_pipeline = pipeline(
    "text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    top_k=None,   # gives all scores
    framework="pt"
)

def predict_text_emotion(text, pipeline_obj=text_emotion_pipeline):
    """
    Return the most likely emotion label for a piece of text.

    Args:
        text: the text to classify.
        pipeline_obj: callable whose result, indexed with [0], is a list of
            dicts carrying 'label' and 'score' keys.

    Returns:
        (label, score) of the highest-scoring entry, with score as a float.
    """
    scored_labels = pipeline_obj(text)[0]
    # Rank explicitly rather than relying on the order the pipeline returns.
    ranked = sorted(scored_labels, key=lambda entry: entry['score'], reverse=True)
    label, score = ranked[0]['label'], ranked[0]['score']
    return label, float(score)

# ---------------- Main flow ----------------
def process_audio_file(audio_file=FILENAME):
    """
    Split a recording on silence and analyse every chunk.

    Steps:
      1. Best-effort waveform plot saved to "input_waveform.png".
      2. Split on silence; if that yields nothing, the whole file is used
         as a single chunk. Chunks are written to CHUNKS_DIR.
      3. For each chunk: audio-based emotion, Whisper transcription and,
         when the transcript is non-empty, text-based emotion.

    Args:
        audio_file: path of the WAV file to process.

    Returns:
        (chunk_paths, audio_emotions, text_emotions, transcripts): four
        parallel lists, one entry per chunk. A chunk that fails gets
        None / None / "" in the last three lists; a chunk with no detected
        speech gets None as its text emotion.
    """
    # 1) Optionally visualize. Plotting must never abort the pipeline, but a
    #    failure is reported instead of being swallowed silently.
    try:
        plot_waveform(audio_file, sr=SAMPLERATE, figsize=(12,3), ytick_step=0.2, save_path="input_waveform.png")
    except Exception as plot_err:
        print(f"⚠️ Could not plot waveform for {audio_file}: {plot_err}")

    # 2) Split into chunks on silence
    chunks = split_audio_on_silence_from_file(audio_file)
    if not chunks:
        # fallback: use entire file as single chunk
        print("⚠️ No silence-based chunks found — using the entire audio as one chunk.")
        full = AudioSegment.from_file(audio_file, format="wav")
        chunks = [full]

    # save chunks
    chunk_paths = save_chunks(chunks)
    print(f"✅ Saved {len(chunk_paths)} chunk files to '{CHUNKS_DIR}/'")

    # 3) For each chunk: audio-emotion, whisper transcription, text-emotion
    audio_emotions_list = []
    text_emotions_list = []
    transcripts_list = []

    print("\n🎧 Combined Emotion Predictions (Audio + Text):")
    for path in chunk_paths:
        try:
            # audio-based emotion
            audio_emotion, audio_conf = predict_audio_emotion(path, audio_model, audio_feature_extractor, audio_id2label)

            # whisper transcription (use file path)
            trans = whisper_model.transcribe(path, fp16=False)  # ensure fp16 off on CPU
            transcript = trans.get("text", "").strip()

            # text-based emotion
            if transcript:
                text_emotion, text_conf = predict_text_emotion(transcript)
                print(f"{os.path.basename(path)} ➤ Audio: {audio_emotion} ({audio_conf:.2f}) | Text: {text_emotion} ({text_conf:.2f})")
                print(f"📝 Transcript: \"{transcript}\"\n")
            else:
                text_emotion, text_conf = None, None
                print(f"{os.path.basename(path)} ➤ Audio: {audio_emotion} ({audio_conf:.2f}) | Text: [No speech detected]\n")

            audio_emotions_list.append(audio_emotion)
            text_emotions_list.append(text_emotion)
            transcripts_list.append(transcript)

        except Exception as e:
            # Keep the four result lists aligned with chunk_paths even when
            # one chunk fails.
            print(f"{os.path.basename(path)} ➤ Error: {e}\n")
            audio_emotions_list.append(None)
            text_emotions_list.append(None)
            transcripts_list.append("")

    # 4) Return all collected data for downstream TTS
    return chunk_paths, audio_emotions_list, text_emotions_list, transcripts_list


# NOTE: the entry point below is disabled by being wrapped in a string literal,
# so it is evaluated as a plain string expression and never executed. It covers
# recording and processing only (no TTS step).
"""if __name__ == "__main__":
    # 0) Record from mic (press 's' to start, 'q' to stop)
    try:
        # Only record if neutral_input.wav doesn't already exist or you want to re-record
        if not os.path.exists(FILENAME):
            print("No existing recording found. Starting recorder.")
            start_recording()
        else:
            print(f"Using existing file {FILENAME} (delete it to re-record).")

        # 1) Process file
        process_audio_file(FILENAME)

    except Exception as main_e:
        print("Fatal error:", main_e)"""


🔁 Loading models (this may take a while)...


Device set to use cpu


'if __name__ == "__main__":\n    # 0) Record from mic (press \'s\' to start, \'q\' to stop)\n    try:\n        # Only record if neutral_input.wav doesn\'t already exist or you want to re-record\n        if not os.path.exists(FILENAME):\n            print("No existing recording found. Starting recorder.")\n            start_recording()\n        else:\n            print(f"Using existing file {FILENAME} (delete it to re-record).")\n\n        # 1) Process file\n        process_audio_file(FILENAME)\n\n    except Exception as main_e:\n        print("Fatal error:", main_e)'

In [5]:
# ---------------- Coqui TTS setup ----------------
from TTS.api import TTS
from pydub import AudioSegment

# Use a voice cloning / multi-speaker model
# NOTE: the TTS model is instantiated when this cell/module is executed.
TTS_MODEL_NAME = "tts_models/multilingual/multi-dataset/your_tts"  # replace with your chosen TTS model
tts = TTS(TTS_MODEL_NAME)

# Directory that tts_and_combine() writes the per-chunk synthesized WAVs into.
SYNTH_DIR = "synthesized"
# Default path of the concatenated result written by combine_chunks().
FINAL_OUTPUT = "final_output.wav"
# Create the output directory up front so later writes into it do not fail.
os.makedirs(SYNTH_DIR, exist_ok=True)

# ---------------- Synthesize one chunk ----------------
def synthesize_chunk(text, reference_wav, emotion, output_path):
    """
    Synthesize one piece of text to a WAV file with the module-level `tts` object.

    text: string to synthesize
    reference_wav: path to the voice reference recording (passed as `speaker_wav`)
    emotion: string such as "happy" or "sad", forwarded as `style`
    output_path: path the synthesized audio is written to

    NOTE(review): whether `style` has any effect depends on the loaded TTS
    model -- confirm that the configured model actually consumes it.
    """
    synthesis_args = {
        "text": text,
        "speaker_wav": reference_wav,
        "style": emotion,
        "language": "en",  # fixed to English here
        "file_path": output_path,
    }
    tts.tts_to_file(**synthesis_args)
    print(f"✅ Synthesized chunk saved to {output_path}")

# ---------------- Combine all chunks ----------------
def combine_chunks(synth_dir=SYNTH_DIR, output_path=FINAL_OUTPUT):
    """
    Concatenate every ".wav" file in `synth_dir` into a single WAV file.

    Files are joined in natural (numeric-aware) order, so "synth_2.wav"
    comes before "synth_10.wav"; a plain lexicographic sort would put them
    the other way round and scramble the audio once there are more than
    ten chunks.

    Args:
        synth_dir: directory holding the synthesized chunk files.
        output_path: destination of the combined WAV.

    Returns:
        None. Prints a warning and writes nothing when no ".wav" file exists.
    """
    # Local import: only needed here, so the file's import block is untouched.
    import re

    def _natural_key(name):
        # re.split with a capturing group alternates text / digit runs, so
        # odd positions are always the numeric parts.
        parts = re.split(r"(\d+)", name)
        return [int(part) if pos % 2 else part for pos, part in enumerate(parts)]

    wav_names = sorted(
        (name for name in os.listdir(synth_dir) if name.endswith(".wav")),
        key=_natural_key,
    )

    combined = None
    for file_name in wav_names:
        chunk_audio = AudioSegment.from_file(os.path.join(synth_dir, file_name))
        combined = chunk_audio if combined is None else combined + chunk_audio

    # Compare against None explicitly rather than relying on the segment's
    # truthiness.
    if combined is None:
        print("⚠️ No synthesized chunks found to combine.")
        return

    combined.export(output_path, format="wav")
    print(f"🎉 Final combined audio saved to {output_path}")

# ---------------- Full TTS + Combine Pipeline ----------------
def tts_and_combine(chunk_paths, reference_wav=FILENAME, audio_emotions=None, text_emotions=None, transcripts=None):
    """
    Synthesize every transcribed chunk and merge the results into FINAL_OUTPUT.

    chunk_paths: list of chunk wav paths
    reference_wav: your neutral recording for voice cloning
    audio_emotions: list of predicted audio emotions per chunk (may contain None)
    text_emotions: list of predicted text emotions per chunk (may contain None)
    transcripts: list of transcribed texts per chunk

    The emotion applied to a chunk is its audio emotion; if that is missing
    the text emotion is used, and "neutral" as a last resort. Chunks without
    a transcript are skipped.

    Side effect: files named "synth_<number>.wav" already present in
    SYNTH_DIR are removed first, so output of an earlier run cannot end up
    in the combined result.
    """
    # Local import: only needed here, so the file's import block is untouched.
    import re

    def _entry(values, position):
        # Tolerate a missing list and a list shorter than chunk_paths.
        if values and position < len(values):
            return values[position]
        return None

    os.makedirs(SYNTH_DIR, exist_ok=True)

    # combine_chunks() merges every .wav in SYNTH_DIR, so leftovers from a
    # previous run (more chunks then, or a chunk that is skipped now) would be
    # mixed into the final audio. Only this function's own outputs are removed.
    own_output = re.compile(r"synth_\d+\.wav")
    for existing in os.listdir(SYNTH_DIR):
        if own_output.fullmatch(existing):
            os.remove(os.path.join(SYNTH_DIR, existing))

    for idx, path in enumerate(chunk_paths):
        transcript = _entry(transcripts, idx) or ""
        if not transcript.strip():
            print(f"{path} ➤ No speech detected, skipping TTS.")
            continue

        # Decide which emotion to apply: audio first, then text, then neutral.
        chosen_emotion = (
            _entry(audio_emotions, idx)
            or _entry(text_emotions, idx)
            or "neutral"
        )

        output_wav = os.path.join(SYNTH_DIR, f"synth_{idx}.wav")
        synthesize_chunk(transcript, reference_wav, chosen_emotion, output_wav)

    # Combine all synthesized chunks
    combine_chunks(SYNTH_DIR, FINAL_OUTPUT)

# ---------------- Example Usage ----------------
if __name__ == "__main__":
    # End-to-end run: record (if needed) -> analyse chunks -> TTS -> plot result.
    try:
        # 0) Reuse an existing recording; otherwise record from the mic
        #    (press 's' to start, 'q' to stop).
        if os.path.exists(FILENAME):
            print(f"Using existing file {FILENAME} (delete it to re-record).")
        else:
            print("No existing recording found. Starting recorder.")
            start_recording()

        # 1) Split the recording, transcribe each chunk and predict emotions.
        chunk_paths, audio_emotions, text_emotions, transcripts = process_audio_file(FILENAME)

        # 2) Synthesize each chunk and merge the synthesized audio.
        tts_and_combine(
            chunk_paths,
            reference_wav=FILENAME,
            audio_emotions=audio_emotions,
            text_emotions=text_emotions,
            transcripts=transcripts,
        )

        print("🎉 Pipeline completed successfully!")

        # 3) Plot the combined output, but only if it was actually produced.
        if os.path.exists(FINAL_OUTPUT):
            print("\n📊 Displaying Final Output Waveform:")
            plot_waveform(FINAL_OUTPUT, save_path="output_waveform.png")

    except Exception as fatal_err:
        print("Fatal error:", fatal_err)


No existing recording found. Starting recorder.
🎙️ Press 's' to start recording, and 'q' to stop.
🔴 Recording started...
🛑 Recording stopped.
✅ Audio saved to neutral_input.wav (samples: 164320, sr: 16000)
📊 Waveform plot saved to input_waveform.png
🔪 Split into 2 chunks (silence_thresh=-64.5 dBFS)
✅ Saved 2 chunk files to 'chunks/'

🎧 Combined Emotion Predictions (Audio + Text):
chunk_0.wav ➤ Audio: neutral (0.93) | Text: joy (0.99)
📝 Transcript: "What a beautiful day, everything feels bright for love joy and bursting with energy."

chunk_1.wav ➤ Audio: neutral (1.00) | Text: surprise (0.57)
📝 Transcript: "I can't stop smiling, life just feels amazing right now."

✅ Synthesized chunk saved to synthesized\synth_0.wav
✅ Synthesized chunk saved to synthesized\synth_1.wav
🎉 Final combined audio saved to final_output.wav
🎉 Pipeline completed successfully!

📊 Displaying Final Output Waveform:
📊 Waveform plot saved to output_waveform.png
