# Quasi Tracheostomy

This notebook introduces the experimental set-up that we used to bring regular Czech voices closer to that of Jasmi.

## Setting Up

Let's start by importing libraries. We will need [pyannote/embedding](https://huggingface.co/pyannote/embedding) model that requires granted access from the model providers (for details see the [Hugging Face page](https://huggingface.co/pyannote/embedding)). In order to use this notebook successfully, ask for the access, generate a HuggingFace authentication token and store it in `HUGGING_FACE_TOKEN` variable of the [personal_settings.py](personal_settings.py) file. 

In [None]:
import librosa, os, csv, librosa, io, scipy, torch
import matplotlib.pyplot as plt
import numpy as np
from io import BytesIO
from data_processing.tokenize_audio import load_audio
from IPython.display import Audio
import soundfile as sf
from pydub import AudioSegment
from pyannote.audio import Model
from pyannote.audio import Inference
from personal_settings import HUGGING_FACE_TOKEN
from sklearn.decomposition import PCA
from sklearn.manifold import TSNE
from sklearn.cluster import KMeans
from scipy.stats import entropy
from tqdm import tqdm
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
from asr.whisper_config import REGULAR_SPEECH_DIR, PATIENT_SEGMENTED_AUDIO_DIR

model = Model.from_pretrained("pyannote/embedding", 
                              use_auth_token=HUGGING_FACE_TOKEN)
inference = Inference(model, window="whole")


And also define several functions that will help us later

In [None]:
def compute_entropy(classes, labels):
    """
    Compute entropy for each cluster's class distribution.
    """
    unique_clusters = np.unique(labels)
    total_entropy = 0
    for cluster in unique_clusters:
        indices = np.where(labels == cluster)[0]
        class_counts = np.bincount(classes[indices], minlength=np.max(classes) + 1)
        cluster_entropy = entropy(class_counts, base=2)
        total_entropy += cluster_entropy
    return total_entropy / len(unique_clusters)

def plot_clusters(embedding_matrix, cluster_labels, original_classes, class_labels = {0: "Transformed Regular", 1: "Patient"}):

    # PCA reduction to 2 dimensions for visualization
    pca_2d = TSNE(n_components=2)
    spectrograms_pca_2d = pca_2d.fit_transform(embedding_matrix)

    # Scatter plot: colored by K-Means clusters
    plt.figure(figsize=(12, 6))

    plt.subplot(1, 2, 1)
    plt.scatter(
        spectrograms_pca_2d[:, 0],
        spectrograms_pca_2d[:, 1],
        c=cluster_labels,
        cmap='viridis',
        alpha=0.7
    )
    plt.title("t-SNE Projection (Colored by K-Means Clusters)")
    plt.xlabel("Component 1")
    plt.ylabel("Component 2")
    plt.legend()

    # Scatter plot: colored by original classes
    # Assuming `original_classes` contains two distinct classes: 0 and 1
    colors = ['blue', 'red']  # Colors corresponding to the classes

    plt.subplot(1, 2, 2)
    scatter = plt.scatter(
        spectrograms_pca_2d[:, 0],
        spectrograms_pca_2d[:, 1],
        c=original_classes,
        cmap='coolwarm',
        alpha=0.7
    )
    plt.title("t-SNE Projection (Colored by Original Classes)")
    plt.xlabel("Component 1")
    plt.ylabel("Component 2")

    # Create a legend manually
    legend_handles = [
        mpatches.Patch(color=color, label=label) for label, color in zip(class_labels.values(), colors)
    ]
    plt.legend(handles=legend_handles, title="Classes")
    plt.tight_layout()
    plt.show()

Then define the key preprocessing function (not all of them are used to create our quasi tracheostomy speech) but we provide them for your own experiments:
1. `apply_lpc_noise_resynthesis` removes the periodic source of the speech signal.
2. `insert_silence` inserts silence segment into the audio segments.
3. `add_noise` adds gaussian noise to the audio (not used to generate the quasi tracheostomy speech).

In [None]:
def apply_lpc_noise_resynthesis(input_audio: AudioSegment, radLPC=12) -> AudioSegment:
    # Convert AudioSegment to numpy array (mono, float32)
    samples = np.array(input_audio.get_array_of_samples()).astype(np.float32)
    if input_audio.channels == 2:
        samples = samples.reshape((-1, 2))
        samples = samples.mean(axis=1)  # Convert to mono

    sr = input_audio.frame_rate

    # Remove DC component
    samples -= np.mean(samples)

    # Pre-emphasis
    pre_emphasis = 0.8
    samples = scipy.signal.lfilter([1, -pre_emphasis], 1, samples)

    # Parameters for segmentation
    window_length = 512
    shift_ratio = 0.5
    shift = int(window_length * shift_ratio)

    # Hamming window and scaling
    window = np.hamming(window_length + 1)[:-1]
    constant = np.sum(window) / window_length / shift_ratio
    window /= constant

    # Output and coefficient placeholders
    output = np.zeros(len(samples))
    num_segments = (len(samples) - shift) // shift
    coefficients = np.full((num_segments, radLPC), np.nan)

    segment_index = 0
    for i1 in range(0, len(samples) - window_length + 1, shift):
        i2 = i1 + window_length
        segment = samples[i1:i2] * window

        if np.var(segment) > 0:
            r = np.correlate(segment, segment, mode='full')[window_length-1:window_length+radLPC]
            try:
                ar_coeffs = scipy.linalg.solve_toeplitz((r[:-1], r[:-1]), r[1:])
                coefficients[segment_index, :len(ar_coeffs)] = ar_coeffs

                excitation = np.random.uniform(-1, 1, window_length)
                estimated_segment = scipy.signal.lfilter(
                    [np.sqrt(np.var(segment))],
                    np.concatenate([[1], -ar_coeffs]),
                    excitation
                )

                output[i1:i2] += estimated_segment
            except np.linalg.LinAlgError:
                pass  # Skip segments that can't be solved due to numerical issues

        segment_index += 1

    # De-emphasis
    output = scipy.signal.lfilter([1], [1, -pre_emphasis], output)

    # Normalize
    output /= np.max(np.abs(output))

    # Convert back to AudioSegment using in-memory buffer
    buffer = io.BytesIO()
    sf.write(buffer, output, sr, format='WAV')
    buffer.seek(0)
    return AudioSegment.from_file(buffer, format="wav")

def insert_silence(y, sr, num_pauses, length):
    """
    Insert silent segments into an audio signal.
    
    Args:
        y (np.ndarray): The audio signal.
        sr (int): The sampling rate of the audio.
        num_pauses (int): The number of silent segments to insert.
        length (float): The length of each silent segment in seconds.
    
    Returns:
        np.ndarray: The audio signal with silent segments inserted.
    """
    # Calculate the number of samples for the silence
    silence_samples = int(length * sr)
    silence = np.zeros(silence_samples, dtype=y.dtype)
    
    '''# Ensure num_pauses does not exceed the possible number of insertion points
    max_insertions = len(y) + num_pauses * silence_samples
    num_pauses = min(num_pauses, max_insertions // (len(y) + silence_samples))'''
    
    # Generate random insertion points
    insertion_points = np.sort(np.random.choice(len(y), size=num_pauses, replace=False))
    
    # Insert silence at the specified points
    y_with_silence = y.copy()
    offset = 0
    for point in insertion_points:
        point += offset  # Adjust for previously inserted silences
        y_with_silence = np.concatenate((y_with_silence[:point], silence, y_with_silence[point:]))
        offset += silence_samples
    
    return y_with_silence

def add_noise(y, noise_std = 0.01):
    noise = np.random.normal(0, noise_std, y.shape)
    return y + noise

# OPTIONAL: Add your own preprocessing functions 

## Experiment out the Modifications

In the cell below, we actually:
1. Load regular speech and patient's speech audio objects
2. Apply the modifications on the regular speech
3. Cluster the modified audios
4. Compute the intra-cluster entropy

You can specify the modifications yourselves into the `modification` variable.


In [None]:
def process_and_cluster(modification, plot = False, patient_waveforms = None, regular_waveforms = None, **kwargs):
    """
    Process and cluster audio files from two directories.
    
    Args:
        patient_DIR (str): Directory with tracheostomy audios.
        regular_DIR (str): Directory with normal voice audios.
        
    Returns:
        float: Average entropy of original classes in each cluster.
    """
    if patient_waveforms is None or regular_waveforms is None:
        # Load MP3 files
        patient_files = [os.path.join(PATIENT_SEGMENTED_AUDIO_DIR, f) for f in os.listdir(PATIENT_SEGMENTED_AUDIO_DIR)[:60]]
        regular_files = [os.path.join(REGULAR_SPEECH_DIR, f) for f in os.listdir(REGULAR_SPEECH_DIR)[:60]]
        
        # Load waveforms
        patient_waveforms = [librosa.load(f, sr=16_000) for f in tqdm(patient_files, desc="Loading patient_DIR")]
        regular_waveforms = [librosa.load(f, sr=16_000) for f in tqdm(regular_files, desc="Loading regular_DIR")]
        
        # Modify the regular speech to sound more like that of the patient
        regular_waveforms = [modification(waveform, sr, **kwargs) for waveform, sr in regular_waveforms]
        
    # Combine all waveforms
    all_waveforms = regular_waveforms + patient_waveforms
    
    # Compute 64 mel spectrograms
    with torch.no_grad():
        embedding_matrix = []
        for w in all_waveforms:
            sf.write("tmp_sound.wav", w, 16000)
            embedding_matrix.append(inference("tmp_sound.wav")) 
    embedding_matrix = np.array(embedding_matrix)
    print("Embedding shape:",embedding_matrix.shape)
    
    # K-Means clustering into 2 clusters
    kmeans = KMeans(n_clusters=2, random_state=42)
    cluster_labels = kmeans.fit_predict(embedding_matrix)
    
    # Original class labels: 0 for regular_DIR, 1 for PATIENT_DIR
    original_classes = np.array([0] * len(regular_waveforms) + [1] * len(patient_waveforms))
    if plot: plot_clusters(embedding_matrix, cluster_labels, original_classes)
    
    # Compute entropy of original classes within each cluster
    avg_entropy = compute_entropy(original_classes, cluster_labels)
    
    return avg_entropy

# Example usage

# OPTIONAL: Define how the regular speech should be modified to resemble that of our patient
modification = lambda waveform, sr, **kwargs: insert_silence(librosa.effects.time_stretch(apply_lpc_noise_resynthesis(waveform), rate = 1/4), sr, np.random.randint(7,12), 0.125) 

avg_entropy = process_and_cluster(modification, plot = True)
print(f"Average entropy: {avg_entropy}")


## Refine Modifications by Local Hill Climbing

Here we can further refine the results of manual experiments above using a local hill climbing. In order to do that, you have to:

1. Use your version of `modification` from above and replace the specific parameter settings by kwargs.
2. Set the manually discovered parameter setting into the dictionary `kwargs`.
3. In the dictionary `param_ranges` specify ranges of values that should be tried.

In [None]:
import random

def hill_climbing(initial_kwargs, param_ranges, max_iters=100, plot=False):
    """
    Perform hill climbing to maximize intra-cluster entropy.

    Args:
        initial_kwargs (dict): Initial parameter configuration.
        param_ranges (dict): Ranges for each parameter (min, max, step).
        max_iters (int): Maximum iterations to run hill climbing.
        plot (bool): Whether to plot during clustering.

    Returns:
        dict: Optimized parameters.
        float: Maximum entropy achieved.
    """
    # Initialize parameters and compute initial entropy
    current_kwargs = initial_kwargs.copy()
    current_entropy = process_and_cluster(PATIENT_SEGMENTED_AUDIO_DIR, REGULAR_SPEECH_DIR, plot=plot, **current_kwargs)

    print(f"Initial entropy: {current_entropy}")

    for iteration in range(max_iters):
        neighbors = []

        # Generate neighboring configurations by perturbing each parameter
        for param, (min_val, max_val, step) in param_ranges.items():
            # Increase parameter
            if current_kwargs[param] + step <= max_val:
                neighbor_up = current_kwargs.copy()
                neighbor_up[param] += step
                neighbors.append(neighbor_up)

            # Decrease parameter
            if current_kwargs[param] - step >= min_val:
                neighbor_down = current_kwargs.copy()
                neighbor_down[param] -= step
                neighbors.append(neighbor_down)

        # Evaluate neighbors
        best_neighbor = current_kwargs
        best_entropy = current_entropy

        for neighbor in neighbors:
            neighbor_entropy = process_and_cluster(PATIENT_SEGMENTED_AUDIO_DIR, REGULAR_SPEECH_DIR, plot=False, **neighbor)

            # Update if the neighbor is better
            if neighbor_entropy > best_entropy:
                best_entropy = neighbor_entropy
                best_neighbor = neighbor

        # Check if no improvement was found
        if best_entropy == current_entropy:
            print(f"No improvement found at iteration {iteration}. Stopping.")
            break

        # Update current configuration
        current_kwargs = best_neighbor
        current_entropy = best_entropy

        print(f"Iteration {iteration + 1}: Current entropy = {current_entropy}")

    return current_kwargs, current_entropy

# OPTIONAL: Define how the regular speech should be modified to resemble that of our patient
modification = lambda waveform, sr, **kwargs: insert_silence(librosa.effects.time_stretch(apply_lpc_noise_resynthesis(waveform), rate = kwargs['rate']), sr, np.random.randint(kwargs['lower_pauses'],kwargs['higher_pauses']), kwargs['pause_length']) 

# OPTIONAL: Set the parameter settings from above here
kwargs = {
    "rate": 0.25,
    "lower_pauses": 7,
    "higher_pauses": 12,
    "pause_length": 0.125
}

# OPTIONAL: Specify the value ranges that will be searched through here
# Define parameter ranges: (min, max, step)
param_ranges = {
    "rate": (0.1,1,0.1),
    "lower_pauses": (5,8,1),
    "higher_pauses": (10,14,1),
    "pause_length": (0.125,0.5,0.1)
}

# Run hill climbing
optimized_kwargs, max_entropy = hill_climbing(
    kwargs,
    param_ranges,
    max_iters=50,
    plot=True
)

print(f"Optimized parameters: {optimized_kwargs}")
print(f"Maximum entropy: {max_entropy}")
_= process_and_cluster(plot=True, **optimized_kwargs)