# Libraries and Imports

In [22]:
%pip install pydub


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m24.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [23]:
import sys
import os
import subprocess
import torch
import torch.nn.functional as F
import json
import numpy as np
import pandas as pd
import random
import shutil

# adding the directory containing asr/asr_pipeline.py to the Python path
sys.path.append(os.path.abspath("../asr"))
#add prosody path to the python path
sys.path.append(os.path.abspath("../prosody/utils"))

sys.path.append(os.path.abspath("../prosody/training_scripts"))

In [24]:
from pydub import AudioSegment
from pydub.silence import split_on_silence, detect_silence
from asr_pipeline import asr_infer_pipeline
from prosody_feature_extraction import extract_prosody_features
from prosody_bilstm_features_only import Encoder, Decoder, Seq2Seq

# ASR
Section to load audio from disk and get transcript

In [25]:
def convert_to_wav(input_file: str, output_file: str) -> str:
    """
    Converts an audio file to .wav format.

    Args:
        input_file (str): The path to the input audio file.
        output_file (str): The path to the output .wav file.

    Returns:
        str: The path to the converted .wav file.
    """
    # Load the audio file
    audio = AudioSegment.from_file(input_file)

    # Export as .wav
    audio.export(output_file, format="wav")

    return output_file

In [26]:
def strip_silence_from_audio(audio_path, output_path, silence_thresh=-40, min_silence_len=10, keep_silence=0):
    # Load the audio file
    audio = AudioSegment.from_file(audio_path)

    # Split audio on silence
    chunks = split_on_silence(
        audio,
        min_silence_len=min_silence_len,
        silence_thresh=silence_thresh,
        keep_silence=keep_silence
    )

    # Concatenate the chunks back together
    stripped_audio = AudioSegment.empty()
    for chunk in chunks:
        stripped_audio += chunk

    # Export the new audio file without silence
    stripped_audio.export(output_path, format="wav")
    print(f"Saved stripped audio: {output_path}")

Covert audio file to wav format

# Create TextGrid File

In [27]:
def call_aeneas_dynamic(audio_file_path: str, text_transcript: str, temp_dir: str) -> str:
    """
    Call Aeneas to generate a TextGrid file for the given audio file and transcript.

    Args:
        audio_file_path (str): The path to the audio file.
        text_transcript (str): The transcription text.
        temp_dir (str): Directory for storing temporary files.

    Returns:
        str: The relative path to the generated TextGrid file.

    Raises:
        subprocess.CalledProcessError: If the command to run Aeneas fails.
    """
    # Define directories based on the current working directory
    current_dir = os.getcwd()
    PROJECT_DIR = os.path.abspath(os.path.join(current_dir, '..'))
    AENEAS_DIR = os.path.join(PROJECT_DIR, 'aeneas')
    TEMP_TEXT_FILE_PATH = os.path.join(temp_dir, 'temp_transcription.txt')
    TEXT_GRID_DIR = os.path.join(temp_dir, 'text_grid_files')  # Create a text_grid_files folder in the temp directory

    # Ensure the output directory exists
    if not os.path.exists(TEXT_GRID_DIR):
        os.makedirs(TEXT_GRID_DIR)
        print(f"Created directory {TEXT_GRID_DIR}")

    # Write the transcription text to the temp file
    with open(TEMP_TEXT_FILE_PATH, 'w', encoding='utf-8') as temp_text_file:
        for word in text_transcript.split():
            temp_text_file.write(word + '\n')

    # Define the output file path
    output_file_path = os.path.join(TEXT_GRID_DIR, f"{os.path.splitext(os.path.basename(audio_file_path))[0]}.TextGrid")

    try:
        # Change to the aeneas directory
        os.chdir(AENEAS_DIR)

        # Define the command to run aeneas
        command = [
            'python3', '-m', 'aeneas.tools.execute_task',
            audio_file_path,
            TEMP_TEXT_FILE_PATH,
            'task_language=eng|is_text_type=plain|os_task_file_format=aud',
            output_file_path
        ]

        # Run the command
        subprocess.run(command, check=True)
        print(f"Generated TextGrid for {audio_file_path} to {output_file_path}")
    except subprocess.CalledProcessError as e:
        print(f"Failed to generate TextGrid for {audio_file_path}: {e}")
    finally:
        # Return to the original directory
        os.chdir(current_dir)
        # Clean up the temporary text file
        if os.path.exists(TEMP_TEXT_FILE_PATH):
            os.remove(TEMP_TEXT_FILE_PATH)

    return os.path.relpath(output_file_path, start=current_dir)

# Create Audio Slices

In [28]:
def create_audio_slices(audio_path, textgrid_path, output_folder):
    # Load the audio file
    audio = AudioSegment.from_file(audio_path)
    print(audio)

    # Load the TextGrid file and read its contents
    with open(textgrid_path, 'r') as f:
        file_contents = f.readlines()

    # Iterate through each line in the TextGrid file
    for index, line in enumerate(file_contents):
        # Split the line into start time, end time, and word
        start_time_str, end_time_str, word = line.strip().split('\t')
        start_time = float(start_time_str)
        end_time = float(end_time_str)

        # Skip empty words
        if not word:
            continue

        # Create a buffer window around the word, except for the first word
        if index != 0:
            start_time = max(0, start_time - 0.010)  # Subtract 5 milliseconds
        end_time = end_time + 0.035  # Add 35 milliseconds

        # Create a slice of the original audio
        start_ms = int(start_time * 1000)  # Convert to milliseconds
        end_ms = int(end_time * 1000)  # Convert to milliseconds
        audio_slice = audio[start_ms:end_ms]

        # Construct the filename
        filename = f"{os.path.basename(audio_path).split('.')[0]}_{word}_{index}_{start_time:.3f}_{end_time:.3f}.wav"
        output_path = os.path.join(output_folder, filename)

        # Export the slice
        audio_slice.export(output_path, format="wav")
        print(f"Saved: {output_path}")

## Create Json Output

In [29]:
def prepare_features_json(input_folder, output_json_path):
    """
    Processes audio slices to extract prosodic features and save them in a JSON file.

    Args:
        input_folder (str): Folder containing sliced audio files.
        output_json_path (str): Path to save the JSON file.
    """
    # Create a dictionary to store the results
    results = {}

    # Iterate through each file in the input folder
    for filename in os.listdir(input_folder):
        if filename.endswith(".wav"):
            # Construct the full file path
            audio_path = os.path.join(input_folder, filename)

            # Extract the root name, word, and position from the filename
            parts = filename.split('_')
            #remove the first index in parts
            parts.pop(0)

            root_name = parts[0]
            word = parts[1]
            position = int(parts[2])
            start_time = float(parts[3])
            end_time = float(parts[4].replace('.wav', ''))

            # Extract prosodic features
            features = extract_prosody_features(audio_path)
            features[torch.isnan(features)] = 0

            # Convert the tensor to a list
            features_list = features.tolist()

            # get current directory
            current_dir = os.getcwd()

            # Prepare data structure for the file
            if root_name not in results:
                results[root_name] = {
                    "filepath": os.path.join(current_dir, root_name + ".wav"),
                    "words": [],
                    "positions": [],
                    "features": []
                }

            # Append the word, position, and features to the respective lists
            results[root_name]["words"].append((position, word))
            results[root_name]["positions"].append((position, position))
            results[root_name]["features"].append((position, features_list))

    # Sort the words, positions, and features by their positions
    for root_name, data in results.items():
        data["words"] = [word for position, word in sorted(data["words"])]
        data["positions"] = [position for position, _ in sorted(data["positions"])]
        data["features"] = [features for position, features in sorted(data["features"])]

    # Write the results to a JSON file
    with open(output_json_path, 'w') as json_file:
        json.dump(results, json_file, indent=4)
    print(f"Saved results to {output_json_path}")

# Infer with Trained Model

In [30]:
def load_model(model_path, feature_dim, hidden_dim=128, output_dim=4, num_layers=2, dropout=0.418213662555253, num_attention_layers=2, device="cpu"):
    """
    Load the trained model from a specified file.

    Args:
        model_path (str): Path to the model file.
        feature_dim (int): Dimension of the input features.
        hidden_dim (int): Dimension of the hidden state in LSTM.
        output_dim (int): Dimension of the output layer.
        num_layers (int): Number of LSTM layers.
        dropout (float): Dropout rate.
        num_attention_layers (int): Number of attention layers.
        device (torch.device): Device to load the model onto (CPU or GPU).

    Returns:
        nn.Module: Loaded Seq2Seq model.
    """
    encoder = Encoder(feature_dim, hidden_dim, num_layers, dropout).to(device)
    decoder = Decoder(hidden_dim, output_dim, num_layers, dropout, num_attention_layers).to(device)
    model = Seq2Seq(encoder, decoder).to(device)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.eval()
    return model

In [31]:
def predict_labels(json_path, model, device, num_classes=2):
    """
    Perform inference on a JSON file to predict labels for words.

    Args:
        json_path (str): Path to the input JSON file.
        model (nn.Module): Trained model for prediction.
        device (torch.device): Device to perform inference on (CPU or GPU).
        num_classes (int): Number of classes in the classification task.

    Returns:
        dict: Dictionary with word predictions.
    """
    # Load data from JSON
    with open(json_path, 'r') as file:
        data = json.load(file)

    results = {}
    for key, entry in data.items():
        words = entry['words']
        features = torch.tensor(entry['features'], dtype=torch.float32).unsqueeze(0).to(device)
        lengths = torch.tensor([len(f) for f in features])

        # Perform inference
        with torch.no_grad():
            output = model(features,lengths)
            if num_classes > 2:
                print(f'Here and num_classes is {num_classes}')
                # Apply softmax to get probabilities for multi-class classification
                preds = F.softmax(output.squeeze(0), dim=-1).cpu().numpy()

                print(f'preds: {preds}')
                
                preds = preds.argmax(axis=-1)
            else:
                # Apply sigmoid for binary classification
                preds = (output.squeeze(0) > 0.4).cpu().numpy()

        # Map predictions to words
        word_predictions = {
            word: int(pred) if num_classes > 2 else int(pred[0])
            for word, pred in zip(words, preds)
        }
        results[key] = word_predictions

    return results


In [32]:
def save_predictions(json_path, predictions, output_path, transcript):
    """
    Save the predictions into a JSON file.

    Args:
        json_path (str): Path to the original JSON file.
        predictions (dict): Dictionary of predictions.
        output_path (str): Path to save the new JSON with predictions.
        transcript (str): The transcription of the text.
    """
    # Load original data to preserve structure
    with open(json_path, 'r') as file:
        data = json.load(file)

    # Prepare the final output with filename, transcript, words, and predictions
    final_output = {}

    for key, entry in data.items():
        final_output[key] = {
            'filename': key+"wav",  # Assuming the key is the filename
            'transcription': transcript,
            'words': entry['words'],
            'labels': predictions[key]
        }

    # Save updated data
    with open(output_path, 'w') as file:
        json.dump(final_output, file, indent=4)


In [33]:
def set_seed(seed):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # if you are using multi-GPU.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

In [34]:
def process_audio_to_prosody_labels(audio_file_path, model_path, device='cpu'):
    """
    Processes an audio file to return text transcript and prosody labels.

    Args:
        audio_file_path (str): Path to the input audio file.
        model_path (str): Path to the trained model for prosody label prediction.
        device (str): Device to perform inference on (CPU or GPU).

    Returns:
        tuple: A tuple containing the text transcript and a dictionary of prosody labels.
    """
    # Use the current working directory as the base directory
    current_dir = os.getcwd()

    # Create a temporary directory inside the current working directory
    temp_dir = os.path.join(current_dir, 'temp')
    os.makedirs(temp_dir, exist_ok=True)

    try:
        # Step 1: Convert to WAV format if needed
        if not audio_file_path.endswith('.wav'):
            wav_path = os.path.join(temp_dir, os.path.basename(audio_file_path).replace('.m4a', '.wav'))
            convert_to_wav(audio_file_path, wav_path)
        else:
            wav_path = audio_file_path

        # Step 2: Strip silence from the audio
        stripped_audio_path = os.path.join(temp_dir, 'stripped_' + os.path.basename(wav_path))
        strip_silence_from_audio(wav_path, stripped_audio_path)

        # Step 3: Transcribe audio using ASR pipeline
        transcript = asr_infer_pipeline(stripped_audio_path).strip()

        # Step 4: Generate TextGrid using Aeneas
        textgrid_path = call_aeneas_dynamic(stripped_audio_path, transcript, temp_dir)

        # Step 5: Create audio slices from TextGrid
        audio_slices_dir = os.path.join(temp_dir, 'audio_slices')
        os.makedirs(audio_slices_dir, exist_ok=True)
        create_audio_slices(stripped_audio_path, textgrid_path, audio_slices_dir)

        # Step 6: Extract prosodic features and prepare JSON
        features_json_path = os.path.join(temp_dir, 'features.json')
        prepare_features_json(audio_slices_dir, features_json_path)

        # Step 7: Load the trained model
        with open(features_json_path, 'r') as file:
            sample_data = json.load(file)
        feature_dim = len(next(iter(sample_data.values()))['features'][0])

        model = load_model(model_path, feature_dim, device=device)

        # Step 8: Predict prosody labels
        predictions = predict_labels(features_json_path, model, device, num_classes=4)

    finally:
        # Clean up the temporary directory
        shutil.rmtree(temp_dir)
        # pass

    return transcript, predictions

In [35]:
if __name__ == "__main__":
    # Set seed for reproducibility
    seed = 42
    set_seed(seed)

    # Parameters
    audio_file_path = '../ltl/test_samples/test05.m4a'
    model_path = '../prosody/models/best-model-ambiguous_instructions-prosody_multiclass.pt'

    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Device: {device}")

    # Process audio and get results
    transcript, prosody_labels = process_audio_to_prosody_labels(audio_file_path, model_path, device=device)

    # Output results
    print("Transcript:", transcript)
    print("Prosody Labels:", prosody_labels)

Device: cpu
Saved stripped audio: /Users/kayems/Documents/GitHub/IUDvPP/ltl/temp/stripped_test05.wav
Created directory /Users/kayems/Documents/GitHub/IUDvPP/ltl/temp/text_grid_files
[INFO] Validating config string (specify --skip-validator to bypass)...
[INFO] Validating config string... done
[INFO] Creating task...
[INFO] Creating task... done
[INFO] Executing task...
[INFO] Executing task... done
[INFO] Creating output sync map file...
[INFO] Creating output sync map file... done
[92m[INFO] Created file '/Users/kayems/Documents/GitHub/IUDvPP/ltl/temp/text_grid_files/stripped_test05.TextGrid'[0m
Generated TextGrid for /Users/kayems/Documents/GitHub/IUDvPP/ltl/temp/stripped_test05.wav to /Users/kayems/Documents/GitHub/IUDvPP/ltl/temp/text_grid_files/stripped_test05.TextGrid
<pydub.audio_segment.AudioSegment object at 0x32f541610>
Saved: /Users/kayems/Documents/GitHub/IUDvPP/ltl/temp/audio_slices/stripped_test05_Fill_0_0.000_0.355.wav
Saved: /Users/kayems/Documents/GitHub/IUDvPP/ltl/t