# Computing with Oscillators: A Speech Demo

* author: Nand Chandravadia

***

This tutorial notebook is organized around two main parts:

- **Part 1: Speech Data**
- **Part 2: Oscillator Model**

In [None]:
# Install the Airavata Python SDK with its "notebook" extra (quiet, forced reinstall).
%pip install -q --force-reinstall airavata-python-sdk[notebook]

# NOTE(review): importing this package presumably registers the %authenticate,
# %request_runtime and %switch_runtime magics used below — confirm against the SDK docs.
import airavata_jupyter_magic


# NOTE(review): presumably starts a login flow for the remote service — confirm.
%authenticate

# Request a runtime named "hpc_cpu" described by cybershuttle.yml.
# NOTE(review): the unit of --walltime (minutes?) and the meaning of the --use
# entries are not visible here — confirm against the SDK docs.
%request_runtime hpc_cpu --file=cybershuttle.yml --walltime=120 --use=NeuroData25VC1:cloud,expanse:shared,anvil:shared

# NOTE(review): presumably routes the following cells to the "hpc_cpu" runtime — confirm.
%switch_runtime hpc_cpu

# Clone the tutorial repository into ./workspace and make it the working directory;
# the relative paths used later (./data, ./models, hyperparams.yaml) are resolved from there.
# NOTE(review): re-running this cell will fail at `git clone` if ./workspace already exists.
!git clone https://github.com/cyber-shuttle/NeuroDATA_2025 workspace
%cd workspace

## Part 1: Speech Data

Speech is typically recorded with a microphone, such as the one in your cell phone or laptop. Standard audio recorders usually record at a sampling rate of 48 kHz. Here, we will look at two speech datasets: one of English speech and one of Arabic speech.

In [None]:
import numpy as np
import torch
import torchaudio
import matplotlib.pyplot as plt
import seaborn as sns
from IPython.display import Audio, Image, display
from torch.distributions.uniform import Uniform

# Prefer a CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
device  = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print("Running on Device: {}".format(device))


# Apply matplotlib's "seaborn-v0_8" style sheet to every figure created below.
plt.style.use('seaborn-v0_8')


### Load Speech Data

All the speech data is found in `./data`.

In [None]:
# Recordings available for this demo (paths are relative to the working directory).
AUDIO_SAMPLES = [
    "./data/7_06_47.wav",
    "./data/0_60_25.wav",
]

# Choose the recording to analyse by its position in AUDIO_SAMPLES.
# Previously both paths were assigned to `audio_path` back to back, so the first
# one was silently discarded; the selection is now explicit.
audio_path = AUDIO_SAMPLES[1]

In [None]:
def load_speech(path, target_sampling_rate=8000, duration_seconds=1.0):
    """Load a recording, force it to a fixed duration, downsample and peak-normalise it.

    Parameters
    ----------
    path : str
        Audio file passed to ``torchaudio.load``.
    target_sampling_rate : int, optional
        Sampling rate (Hz) of the returned signal. Defaults to 8000.
    duration_seconds : float, optional
        Length the clip is padded or truncated to before resampling.
        Defaults to 1.0, i.e. 48000 samples for a 48 kHz recording.

    Returns
    -------
    (torch.Tensor, int)
        The first channel of the resampled clip as a 1-D tensor scaled so that
        its largest absolute value is 1 (left untouched if the clip is silent),
        and ``target_sampling_rate``.
    """
    # load raw data; `audio` is indexed as (channel, sample) below
    audio, sample_rate = torchaudio.load(path)

    # Fix the clip length using the file's own sample rate, so recordings that
    # were not captured at 48 kHz are still cut/padded to the same duration.
    target_length = int(round(sample_rate * duration_seconds))
    current_length = audio.shape[1]

    if current_length < target_length:
        # Too short: append trailing silence with the same dtype/device as the audio.
        padding = torch.zeros((audio.shape[0], target_length - current_length),
                              dtype=audio.dtype, device=audio.device)
        audio = torch.cat((audio, padding), dim=1)
    else:
        # Too long (or exact): keep the leading samples. A negative padding size
        # would otherwise make torch.zeros raise.
        audio = audio[:, :target_length]

    # downsample from the file's real rate to the target rate
    transform = torchaudio.transforms.Resample(orig_freq=sample_rate,
                                               new_freq=target_sampling_rate)
    signal = transform(audio)

    # apply a normalization [-1, 1] to the first channel only
    new_signal = signal[0, :]
    max_value = new_signal.abs().max()
    if max_value > 0:
        # Skipped for an all-zero clip, where 1/max_value would produce NaNs.
        new_signal = new_signal * (1 / max_value)

    return new_signal, target_sampling_rate


In [None]:
# Load and preprocess the selected recording; load_speech returns the normalised signal and its sampling rate.
new_signal, new_sampling_rate = load_speech(audio_path)

In [None]:
# Display an in-notebook audio player for the preprocessed clip.
Audio(data=new_signal, rate=new_sampling_rate)

In [None]:
# Time stamp (in seconds) of every sample: sample index divided by the sampling rate.
# (The previous linspace(0, N, N) axis was a sample index with non-integer spacing.)
time = np.arange(len(new_signal)) / new_sampling_rate

fig, axes = plt.subplots(1, 1, figsize=(18, 5))

axes.plot(time, new_signal, color='black')

# Label the figure so it can be read on its own.
axes.set_title("Preprocessed Speech Waveform")
axes.set_xlabel("Time (in seconds)")
axes.set_ylabel("Normalized amplitude")
axes.grid(True)

## Part 2: Oscillator Model — A Network of Coupled Oscillators

Now, let's look at the form and structure of the model. 

<div style="text-align: center;">
  <img src="assets/network_oscillators.png" alt="Oscillatory Network" width="500" height="500"/>
</div>

In [None]:
from model import coRNN
import yaml

In [None]:
# ---- User-selectable model -------------------------------------------
language = "English"   # one of: "English", "Arabic"
model_number = 1       # one of: 1, 2
isTrained = True       # NOTE(review): not read by any cell shown here

# ---- Derived identifiers (do not edit) -------------------------------
# model_id is the key looked up in hyperparams.yaml and the stem of both checkpoint files.
model_id = f"{language}_{model_number}"
network_path_trained = f"./models/{model_id}_trained.pth"
network_path_untrained = f"./models/{model_id}_untrained.pth"

In [None]:
def load_hyperparameters(model_id, device, config_path="hyperparams.yaml"):
    """Read one model's hyperparameters and draw its oscillator parameters.

    Parameters
    ----------
    model_id : str
        Key of the model inside the ``models`` section of the YAML file.
    device : torch.device or str
        Device on which the random generator and both tensors are created.
    config_path : str, optional
        YAML file to read. Defaults to ``"hyperparams.yaml"``.

    Returns
    -------
    (dict, torch.Tensor, torch.Tensor)
        ``params`` with the keys network_type, n_inp, n_hid, n_out, dt,
        learning_rate and random_seed; ``gamma_tensor`` of shape (1, n_hid)
        drawn uniformly from [0.1, 20); ``epsilon_tensor`` of shape (1, n_hid)
        drawn uniformly from [0.1, 80).

    Raises
    ------
    KeyError
        If ``model_id`` is not listed in the file (the message names the
        available ids) or a required hyperparameter is missing.
    """
    # Load the hyperparams file
    with open(config_path, "r") as f:
        config = yaml.safe_load(f)

    # Choose which model config to use, failing with a readable message
    available_models = config["models"]
    if model_id not in available_models:
        raise KeyError(
            "Unknown model_id {!r}; available models: {}".format(
                model_id, sorted(available_models))
        )
    model_config = available_models[model_id]

    # Seed a dedicated generator so the draws below are reproducible
    generator = torch.Generator(device=device)
    generator.manual_seed(model_config["random_seed"])

    n_hid = model_config["n_hid"]

    # Both tensors consume the same generator, so gamma MUST be drawn before
    # epsilon: swapping the two draws would change both sets of values.
    # NOTE(review): presumably the saved checkpoints expect exactly these values — confirm.

    # Define frequency range
    low_frequency, high_frequency = 0.1, 20
    gamma_tensor = (high_frequency - low_frequency) * torch.rand((1, n_hid), generator=generator, device=device) + low_frequency

    # Define damping range
    low_damping, high_damping = 0.1, 80
    epsilon_tensor = (high_damping - low_damping) * torch.rand((1, n_hid), generator=generator, device=device) + low_damping

    # Hyperparameters handed back to the caller
    exported_keys = ("network_type", "n_inp", "n_hid", "n_out",
                     "dt", "learning_rate", "random_seed")
    params = {key: model_config[key] for key in exported_keys}

    return params, gamma_tensor, epsilon_tensor
    

In [None]:
# Look up the selected model's hyperparameters and draw its gamma / epsilon tensors on `device`.
params, gamma_tensor, epsilon_tensor = load_hyperparameters(model_id, device)

In [None]:
# Hyperparameter dict: network_type, n_inp, n_hid, n_out, dt, learning_rate, random_seed.
print(params)

In [None]:
# Shape (1, n_hid); values drawn uniformly from the frequency range [0.1, 20).
print(gamma_tensor)

In [None]:
# Shape (1, n_hid); values drawn uniformly from the damping range [0.1, 80).
print(epsilon_tensor)

In [None]:
#load the untrained and trained model

def load_model(network_path, params, gamma_tensor, epsilon_tensor, device):
    """Build a coRNN from `params` and restore its weights from a checkpoint.

    Parameters
    ----------
    network_path : path or file-like
        Checkpoint handed to ``torch.load``; it must hold a state dict
        compatible with the constructed network.
    params : dict
        Must provide "network_type", "n_inp", "n_hid", "n_out" and "dt".
    gamma_tensor, epsilon_tensor : torch.Tensor
        Forwarded unchanged to the coRNN constructor as ``gamma`` / ``epsilon``.
    device : torch.device or str
        Used as ``map_location`` when reading the checkpoint.

    Returns
    -------
    coRNN
        The network with the checkpoint loaded, switched to evaluation mode.
    """
    # Architecture settings come from the hyperparameter dict; the oscillator
    # tensors are passed through as given.
    network = coRNN(
        network_type=params["network_type"],
        n_inp=params["n_inp"],
        n_hid=params["n_hid"],
        n_out=params["n_out"],
        dt=params["dt"],
        gamma=gamma_tensor,
        epsilon=epsilon_tensor,
    )

    # NOTE(review): torch.load unpickles the file — only load checkpoints you trust
    # (consider weights_only=True where the installed torch supports it).
    # NOTE(review): `device` only remaps the checkpoint tensors; the network itself
    # is never moved with .to(device) here — confirm that is intended.
    checkpoint = torch.load(network_path, map_location=device)
    network.load_state_dict(checkpoint)

    # Evaluation mode: this notebook only runs inference.
    network.eval()
    return network

In [None]:
# Both networks share the architecture and oscillator tensors; only the checkpoint differs.
untrained_model = load_model(
    network_path=network_path_untrained,
    params=params,
    gamma_tensor=gamma_tensor,
    epsilon_tensor=epsilon_tensor,
    device=device,
)
untrained_model_weights = untrained_model.state_dict()

trained_model = load_model(
    network_path=network_path_trained,
    params=params,
    gamma_tensor=gamma_tensor,
    epsilon_tensor=epsilon_tensor,
    device=device,
)
trained_model_weights = trained_model.state_dict()

In [None]:
#LOAD MODEL WEIGHTS

def load_model_weights(model):
    """Pull the two recurrent weight matrices out of a model state dict.

    Parameters
    ----------
    model : mapping
        Despite the name, this is indexed by string key; the cells in this
        notebook pass it the result of ``state_dict()``.

    Returns
    -------
    tuple
        ``(model["cell.R.weight"], model["cell.F.weight"])`` — the recurrent
        weights followed by the recurrent velocity weights.
    """
    return model["cell.R.weight"], model["cell.F.weight"]

In [None]:
# Visualize model weights: columns = untrained / trained, rows = cell.R / cell.F matrices
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

untrained_weights, untrained_damping = load_model_weights(untrained_model_weights)
trained_weights, trained_damping = load_model_weights(trained_model_weights)

# Only the top-left index_start:index_end square of each matrix is drawn.
index_start, index_end = 0, 64
window = slice(index_start, index_end)

# (matrix, target axis, title) for each panel, listed in drawing order.
panels = [
    (untrained_weights, axes[0,0], 'Untrained Weight Matrix'),
    (untrained_damping, axes[1,0], 'Untrained Damping Matrix'),
    (trained_weights, axes[0,1], 'Trained Weight Matrix'),
    (trained_damping, axes[1,1], 'Trained Damping Matrix'),
]

for matrix, panel_axis, panel_title in panels:
    sns.heatmap(matrix[window, window], ax=panel_axis, cmap='coolwarm', linewidth=0.5)
    panel_axis.set_title(panel_title)

# Keep titles and colorbars from overlapping, then render the figure.
plt.tight_layout()
plt.show()

In [None]:
#Feed the Model Speech!

# Format the input: reshape the 1-D waveform to (1, 1, T), then move T to the front,
# giving a tensor of shape (T, 1, 1).
# NOTE(review): presumably the axes are (time, batch, input feature) — confirm against coRNN.
# A separate name is used so `new_signal` keeps its 1-D shape; rebinding it to the
# 3-D tensor broke re-running the earlier audio-player and waveform-plot cells.
# The length is inferred (-1) instead of hard-coding 8000 samples.
input_signal = new_signal.reshape(1, 1, -1).permute(2, 0, 1)


save_output, save_hy, save_hz, save_activation = trained_model(input_signal)

In [None]:
#What does the model think?

def plot_response(signal, output, axes, sampling_frequency=8000):
    """Plot the per-class softmax probabilities of the model output over time.

    Parameters
    ----------
    signal : torch.Tensor
        The input fed to the model. Kept for interface compatibility; it is
        not drawn.
    output : torch.Tensor
        Model output indexed as [time step, batch element, class]; only batch
        element 0 is plotted.
    axes : matplotlib.axes.Axes
        Axes to draw on.
    sampling_frequency : int, optional
        Samples per second of the time axis. Defaults to 8000.

    Returns
    -------
    None
    """
    # One colour per class index; reused cyclically if there are more classes.
    class_colors = ("navy", "darkgreen", "maroon", "purple", "teal",
                    "olive", "sienna", "royalblue", "darkorange", "indigo")

    # Softmax across the class dimension turns the scores of the first batch
    # element into probabilities; detach so plotting never touches autograd.
    scores = output[:, 0, :]
    probabilities = torch.nn.Softmax(dim=1)(scores).detach()

    # Derive the time axis and class count from the data itself rather than
    # assuming exactly one second at 8 kHz and exactly ten classes.
    n_steps, n_classes = probabilities.shape
    time = torch.arange(n_steps) / sampling_frequency

    for target in range(n_classes):
        axes.plot(time, probabilities[:, target], label = target,
                  color = class_colors[target % len(class_colors)],
                  alpha = 0.7, linewidth=3)

    #set axes
    title = "Model Prediction"
    axes.set_title(title, fontsize=24)

    axes.set_xlabel("Time (in seconds)", fontsize=18)
    axes.set_ylabel("Probability", fontsize=18)
    axes.set_ylim(-0.05,1.05)
    axes.tick_params(axis='both', labelsize=18)  # Set tick label size
    # Legend sits outside the plot area, to the right.
    axes.legend(loc='center left', bbox_to_anchor=(1, 0.5), fontsize=15)
    axes.grid(True)

    return


In [None]:
# One wide panel with the class probabilities the trained model assigns over time.
fig, axes = plt.subplots(1, 1, figsize=(18,5))

plot_response(signal=input_signal, 
            output=save_output, 
            axes = axes)