In [1]:
from IPython.display import Audio
import torch
import os
from scipy.io import wavfile
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, Sampler, DataLoader
import numpy as np

# Play the .wav file
Audio("digits/6.wav")

In [2]:


class WaveformDataset(Dataset):
    def __init__(self, directory, t_input, max_len, terminal_pad):
        """
        directory: Directory containing the .wav files.
        t_input: Time input array for all files.
        max_len: Maximum length of time steps needed for all files.
        terminal_pad: Number of zeros to pad at the end of each audio file.
        """
        self.directory = directory
        self.files = sorted([f for f in os.listdir(directory) if f.endswith('.wav')], key=lambda x: int(x.split('.')[0]))
        self.t_input = t_input[:max_len]  # Truncate t_input to the maximum required length
        self.terminal_pad = terminal_pad  # Fixed number of zeros to pad
        self.wav_data_list = [self._load_and_pad(os.path.join(directory, f)) for f in self.files]  # Load and pad files
        self.file_indices = []
        self.total_length = 0

        # Calculate lengths of all files and their indices
        for i, wav_data in enumerate(self.wav_data_list):
            length = wav_data.size(1)  # Assuming data is [channels, time], we take the time dimension
            self.file_indices.extend([(i, j) for j in range(length)])
            self.total_length += length

    def _load_and_pad(self, file_path):
        """
        Helper function to load, normalize, and pad the audio file.
        """
        sample_rate, data = wavfile.read(file_path)
        data = torch.tensor(data).unsqueeze(0)  # Convert to tensor and add channel dimension

        # Normalize the data to the range [-1, 1] based on int16
        if data.dtype == torch.int16:
            data = data / 32768.0  # Normalize int16 data
        elif data.dtype == torch.int32:
            data = data / 2147483648.0  # Normalize int32 data
        elif data.dtype == torch.float32:
            pass  # If it's already float, assume it's in [-1, 1]

        # Pad the data with zeros at the end
        pad_length = self.terminal_pad
        data_padded = torch.nn.functional.pad(data, (0, pad_length), mode='constant', value=0)
        #print(data_padded.shape, "data padded")
        return data_padded

    def _generate_target(self, wav_data):
        """
        Helper function to generate the target tensor.
        The target will have 1 in all positions except for the final terminal_pad zeros.
        """
        target = torch.ones_like(wav_data)  # Create a target tensor with all ones
        # Set the last terminal_pad positions to zero
        target[:, -self.terminal_pad:] = 0
        #print(target.shape, "target shape")
        return target

    def __len__(self):
        return self.total_length

    def __getitem__(self, idx):
        file_idx, local_idx = self.file_indices[idx]
        wav_data = self.wav_data_list[file_idx][:, local_idx]  # Slice based on channel and index
        t_step = self.t_input[local_idx]
        target = self._generate_target(self.wav_data_list[file_idx])[:, local_idx]  # Generate the target tensor
        return wav_data, t_step, target, file_idx


In [3]:
class RandomConsecutiveSampler(Sampler):
    def __init__(self, data_source, batch_size, consecutive_size):
        """
        data_source: Dataset that returns (wav_data, time_step, file_idx)
        batch_size: Number of consecutive segments in each batch
        consecutive_size: How many consecutive steps to take for each sampled segment
        """
        self.data_source = data_source
        self.batch_size = batch_size
        self.consecutive_size = consecutive_size

    def __iter__(self):
        indices = np.random.permutation(len(self.data_source) - self.consecutive_size + 1)
        for i in range(0, len(indices), self.batch_size):
            batch_indices = []
            for j in range(i, min(i + self.batch_size, len(indices))):
                start_idx = indices[j]
                batch_indices.extend(range(start_idx, start_idx + self.consecutive_size))
            yield batch_indices

    def __len__(self):
        return (len(self.data_source) - self.consecutive_size ) // self.batch_size


In [4]:
class ConsecutiveDifferenceHigherOrderLossBatch(nn.Module):
    def __init__(self, consecutive_size,order=1):
        super(ConsecutiveDifferenceHigherOrderLossBatch, self).__init__()
        self.consecutive_size = consecutive_size
        self.order = order
    def forward(self, prediction, target):
        pred_reshape = prediction.view(-1, self.consecutive_size)
        target_reshape = target.view(-1, self.consecutive_size)
        result = torch.tensor([0.0])
        
        pred_a = pred_reshape[ 1:, :]
        pred_b = pred_reshape[:-1,:]
        target_a = target_reshape[ 1:, :]
        target_b = target_reshape[:-1,:]
        for i in range(self.order):
            
            pred_dif = pred_a - pred_b
            target_dif = target_a - target_b
            pred_a = pred_dif[ 1:, :]
            pred_b = pred_dif[:-1,:]
            target_a = target_dif[ 1:, :]
            target_b = target_dif[:-1,:]
            
            result +=  torch.mean((pred_dif - target_dif) ** 2)/self.order
        return result
    
class ConsecutiveDifferenceHigherOrderLoss(nn.Module):
    def __init__(self, consecutive_size,order=1):
        super(ConsecutiveDifferenceHigherOrderLoss, self).__init__()
        self.consecutive_size = consecutive_size
        self.order = order
    def forward(self, prediction, target):
        pred_reshape = prediction.view(-1, self.consecutive_size)
        target_reshape = target.view(-1, self.consecutive_size)
        result = torch.tensor([0.0])
        
        pred_a = pred_reshape[:, 1:]
        pred_b = pred_reshape[:, :-1]
        target_a = target_reshape[:, 1:]
        target_b = target_reshape[:, :-1]
        for i in range(self.order):
            
            pred_dif = pred_a - pred_b
            target_dif = target_a - target_b
            pred_a = pred_dif[:, 1:]
            pred_b = pred_dif[:, :-1]
            target_a = target_dif[:, 1:]
            target_b = target_dif[:, :-1]
            
            result +=  torch.mean((pred_dif - target_dif) ** 2)/self.order
        return result



In [5]:
directory = "digits/"

In [6]:
import os
from scipy.io import wavfile
import torch
def get_max_required_length(dir):
    max_length = 0
    
    # Iterate over all files in the directory
    for filename in os.listdir(dir):
        if filename.endswith('.wav'):  # Only process .wav files
            file_path = os.path.join(dir, filename)
            
            # Read the .wav file
            sample_rate, data = wavfile.read(file_path)
            
            # Get the length of the audio file (number of samples)
            file_length = data.shape[0]  # shape[0] gives the number of samples (time dimension)
            
            # Update the max length if this file is longer
            if file_length > max_length:
                max_length = file_length

    return max_length

#import torch
#import torch.nn as nn
#import torch.optim as optim



def binary_sequence_tensor(num_bits, length):
    # Create a tensor of shape (length,) with values from 0 to length - 1
    t_values = torch.arange(1,length+1) #start with 1

    # Create a tensor to store the binary representations
    binary_tensor = ((t_values.unsqueeze(1) >> torch.arange(num_bits)) & 1).float()
    binary_tensor[binary_tensor == 0] = -1
    return binary_tensor

In [10]:
target_pad = 20
bits = 16

max_len = get_max_required_length(directory)

t_input = binary_sequence_tensor(bits, max_len+ target_pad)  # Example, adjust this to match your real t_input



#this is for validating
#t_input = np.linspace(1,max_len + target_pad,max_len + target_pad)

# Instantiate the dataset
dataset = WaveformDataset(directory, t_input, max_len, target_pad)

# Sampler setup as before
batch_size = 10
consecutive_size = 10
sampler = RandomConsecutiveSampler(dataset, batch_size, consecutive_size)

# DataLoader
dataloader = DataLoader(dataset, batch_sampler=sampler)

# Example iteration through dataloader
for batch in dataloader:
    wav_data, t_step, target, file_idx = batch #right now this wraps arround, just fyi.  not sure its a bad thing.

    if sum(target)%10 != 0:
        print("Waveform data:", wav_data)
        print("Time step:", t_step)
        print("Target tensor:", target)
        print("File index:", file_idx.shape)
        break

Waveform data: tensor([[-0.1154],
        [-0.1323],
        [-0.1520],
        [-0.1548],
        [-0.1709],
        [-0.2025],
        [-0.2343],
        [-0.1993],
        [-0.1673],
        [-0.1551],
        [-0.0085],
        [-0.0062],
        [-0.0178],
        [-0.0084],
        [-0.0082],
        [-0.0073],
        [-0.0117],
        [-0.0203],
        [-0.0154],
        [-0.0219],
        [-0.0154],
        [-0.0246],
        [-0.0321],
        [-0.0349],
        [-0.0323],
        [-0.0319],
        [-0.0423],
        [-0.0629],
        [-0.0828],
        [-0.1007],
        [ 0.1937],
        [ 0.1816],
        [ 0.1628],
        [ 0.1466],
        [ 0.1357],
        [ 0.1313],
        [ 0.1233],
        [ 0.1129],
        [ 0.0952],
        [ 0.0854],
        [-0.0769],
        [-0.0817],
        [-0.0603],
        [-0.0706],
        [ 0.0197],
        [ 0.0768],
        [ 0.1479],
        [ 0.2090],
        [ 0.1808],
        [ 0.1524],
        [-0.0365],
        [-0.0377

In [13]:
#SA model here
from swissarmy import SeqModel

config = {
    't_seq_bits': 10,  # Example value for the input bit size
    't_seq_len': 5,    # Example value for the sequence length
    't_bits': 10,      # Example value for the bits used in the decoder

    'encoder': {
        't_layer_dim': 64,               # Example hidden layer dimension for encoder
        't_num_layers': 2,                # Example number of layers in the encoder's initial layer
        'fc_layers': 3,                   # Example number of fully connected layers in the encoder
        'encoder_layers': 2,              # Example number of encoder layers
        'one_hot_vocab_len': 10,          # Vocabulary size for one-hot encoding
        'one_hot_embedding_dim': 32       # Embedding dimension for one-hot encoding
    },

    'decoder': {
        't_layer_dim': 64,                # Example hidden layer dimension for decoder
        't_num_layers': 2,                # Example number of layers in the decoder's initial layer
        'fc_layers': 3,                   # Example number of fully connected layers in the decoder
        'decoder_layers': 2                # Example number of decoder layers
    },

    'output': {
        'mse_output_layers': 2,           # Number of layers in the MSE output head
        'mse_dim': 64,                     # Hidden dimension for the MSE output head
        'bce_output_layers': 2,            # Number of layers in the BCE output head
        'bce_dim': 64                      # Hidden dimension for the BCE output head
    }
}


model = SeqModel(config)

In [14]:
model

SeqModel(
  (encoder): SeqEncoder(
    (initial_layer): SwissArmyLayer(
      (t_layers): ModuleList(
        (0): Linear(in_features=10, out_features=64, bias=True)
        (1): Linear(in_features=64, out_features=64, bias=True)
      )
      (embedding): Embedding(11, 32, padding_idx=10)
      (layers): ModuleList(
        (0-2): 3 x Linear(in_features=96, out_features=96, bias=True)
      )
    )
    (encoder_layers): ModuleList(
      (0): SwissArmyLayer(
        (t_layers): ModuleList(
          (0): Linear(in_features=10, out_features=64, bias=True)
          (1): Linear(in_features=64, out_features=64, bias=True)
        )
        (embedding): Embedding(11, 32, padding_idx=10)
        (layers): ModuleList(
          (0-2): 3 x Linear(in_features=192, out_features=192, bias=True)
        )
      )
      (1): SwissArmyLayer(
        (t_layers): ModuleList(
          (0): Linear(in_features=10, out_features=64, bias=True)
          (1): Linear(in_features=64, out_features=64, bias=

In [None]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

# Example usage:
total_params = count_parameters(model)
print(f"Total trainable parameters: {total_params}")
model

In [None]:


# Hyperparameters
hyperparams = {
    "num_digits": 10,        # Number of possible digits
    "embedding_dim": 64,      # Size of the embedding space
    "bits": 16,               # Number of bits in the input tensor
    "hidden_dim": 512,        # Number of units in the hidden layers
    "num_layers": 5,         # Number of hidden layers
    "batch_size": 100,        # Batch size
    "consecutive_size": 20,   
    "terminal_pad": 50      # Amount of padding for each audio file
}

# Instantiate the model
model = DigitEmbeddingModel(
    num_digits=hyperparams['num_digits'], 
    embedding_dim=hyperparams['embedding_dim'], 
    bits=hyperparams['bits'], 
    hidden_dim=hyperparams['hidden_dim'], 
    num_layers=hyperparams['num_layers']
)

##add more layers onto the embedding before t? res blocks? 

In [None]:
#hyperparams['consecutive_size'] = 50
#hyperparams['batch_size'] = 10

In [None]:
# Optimizer and loss functions
optimizer = optim.Adam(model.parameters(), lr=0.00001)
mse_loss_fn = nn.MSELoss()
bce_loss_fn = nn.BCELoss()
cdifb_loss = ConsecutiveDifferenceHigherOrderLossBatch(hyperparams['consecutive_size'],order=3)
cdif_loss = ConsecutiveDifferenceHigherOrderLoss(hyperparams['consecutive_size'],order=3)

# Dataset and DataLoader setup (using the dataset and sampler we worked on)
directory = "digits/" # Replace with your actual directory path

terminal_pad = hyperparams['terminal_pad']
max_len = get_max_required_length(directory) + terminal_pad
t_input = binary_sequence_tensor( hyperparams['bits'], max_len)



# Instantiate the dataset and sampler
dataset = WaveformDataset(directory, t_input, max_len + terminal_pad, terminal_pad)
sampler = RandomConsecutiveSampler(dataset, hyperparams['batch_size'], consecutive_size=hyperparams['consecutive_size'])
dataloader = DataLoader(dataset, batch_sampler=sampler)

num_epochs = 2

# Training loop
for epoch in range(num_epochs):
    model.train()
    
    for batch in dataloader:
        wav_data, t_step, target, file_idx = batch
        
        bce_output, mse_output = model(file_idx, t_step)
        
        # Compute losses
        mse_loss = mse_loss_fn(mse_output*target, wav_data)  # Assuming the target is for MSE
        bce_loss = bce_loss_fn(bce_output, target)  # Assuming the target is for BCE
        cdif = cdif_loss(mse_output*target, wav_data)
        #bc = bc_loss(outputs, targets)
        cdif_b = cdifb_loss(mse_output*target, wav_data)
        
        
        # Combine losses (you can weight them if needed)
        total_loss = mse_loss + 0.1*bce_loss + 1.6*cdif + 0.4*cdif_b
        
        # Backpropagation and optimization
        optimizer.zero_grad()
        total_loss.backward()
        optimizer.step()

    # Print progress for each epoch
    print(f"Epoch {epoch+1}/{num_epochs} MSE: {mse_loss.item():.6f} BCE: {bce_loss.item():.6f} CDIF: {cdif.item():.6f} CDIF_B: {cdif_b.item():.6f} Total Loss: {total_loss.item():.8f}")

print("all done sweetheart <3")

In [None]:
import scipy.io.wavfile as wavfile
file_path = 'digits/2.wav'
sample_rate, file_data = wavfile.read(file_path)


In [None]:
file_num = 6
file_idx = (torch.ones((t_input.shape[0])) * file_num).to(int)

t_step = t_input

all_bce_outputs = []
all_mse_outputs = []

batch_size = 20
# Loop through batches without shuffling
for i in range(0, len(file_idx), batch_size):
    batch_file_idx = file_idx[i:i+batch_size]
    batch_t_step = t_step[i:i+batch_size]
    
    # Pass the batch through the model
    with torch.no_grad():
        bce_output, mse_output = model(batch_file_idx, batch_t_step)
        
    # Append outputs to the lists
    all_bce_outputs.append(bce_output)
    all_mse_outputs.append(mse_output)
    
    print(f"Processed batch {i // batch_size + 1}")

# Concatenate all batches into final tensors
final_bce_output = torch.cat(all_bce_outputs, dim=0)
final_mse_output = torch.cat(all_mse_outputs, dim=0)

# Now you have the complete outputs for all batches
print("Final BCE output shape:", final_bce_output.shape)
print("Final MSE output shape:", final_mse_output.shape)


Processed batch 418
Processed batch 419
Processed batch 420
Processed batch 421
Processed batch 422
Processed batch 423
Processed batch 424
Processed batch 425
Processed batch 426
Processed batch 427
Processed batch 428
Processed batch 429
Processed batch 430
Processed batch 431
Processed batch 432
Processed batch 433
Processed batch 434
Processed batch 435
Processed batch 436
Processed batch 437
Processed batch 438
Processed batch 439
Processed batch 440
Processed batch 441
Processed batch 442
Processed batch 443
Processed batch 444
Processed batch 445
Processed batch 446
Processed batch 447
Processed batch 448
Processed batch 449
Processed batch 450
Processed batch 451
Processed batch 452
Processed batch 453
Processed batch 454
Processed batch 455
Processed batch 456
Processed batch 457
Processed batch 458
Processed batch 459
Processed batch 460
Processed batch 461
Processed batch 462
Processed batch 463
Processed batch 464
Processed batch 465
Processed batch 466
Processed batch 467


In [None]:
import torch
import matplotlib.pyplot as plt

# Assuming final_bce_output and final_mse_output are already computed
# and have been concatenated from batches

# Example plotting code for final_bce_output and final_mse_output

# Plot BCE Output
plt.figure(figsize=(15, 6))
plt.plot(final_bce_output.numpy(), label="BCE Output")
plt.title("BCE Output over Time Steps")
plt.xlabel("Time Step")
plt.ylabel("BCE Output")
plt.legend()
plt.grid(True)
plt.show()

# Plot MSE Output
plt.figure(figsize=(15, 6))

plt.plot(final_mse_output.numpy(), label="MSE Output")

plt.title("MSE Output over Time Steps")
plt.xlabel("Time Step")
plt.ylabel("MSE Output")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
# Assuming final_bce_output is a 1D tensor with the accumulated BCE outputs

# Find the index of the first value below 0.5
below_threshold_indices = (final_bce_output < 0.5).nonzero(as_tuple=True)[0]
first_below_threshold_idx = 0
if len(below_threshold_indices) > 0:
    first_below_threshold_idx = below_threshold_indices[0].item()  # Get the first index
    print(f"The first instance where BCE output is below 0.5 is at index {first_below_threshold_idx}")
else:
    print("No value below 0.5 was found in the BCE output.")


In [None]:
first_below_threshold_idx

In [None]:
# Plot MSE Output
plt.figure(figsize=(15, 6))
plt.plot(file_data/32768, label="Target Output")
plt.plot(final_mse_output.numpy()[:first_below_threshold_idx], label="MSE Output")

plt.title("MSE Output over Time Steps")
plt.xlabel("Time Step")
plt.ylabel("MSE Output")
plt.legend()
plt.grid(True)
plt.show()
##sibilant

In [None]:
import torch
import numpy as np
from scipy.io.wavfile import write


def tensor_to_wav(tensor, filename, sample_rate=44100,cut_off=-1 ):
    # Convert tensor to numpy array and detach if needed
    data = tensor.detach().cpu().numpy()[:cut_off]
    # Normalize to the range [-1, 1]
    #data = data / np.max(np.abs(data))

    # Convert to 16-bit PCM format (values between -32768 and 32767)
    data_int16 = np.int16(data * 32768)

    # Write the .wav file
    write(filename, sample_rate, data_int16)
    print(f"Saved as {filename}")

# Example usage with your model predictions (assuming predictions are in range -1 to 1):
# predictions is the output tensor from the model
tensor_to_wav(final_mse_output, "test_2.wav", sample_rate=sample_rate,cut_off = first_below_threshold_idx)

In [None]:
from IPython.display import Audio

# Play the .wav file
Audio("test_2.wav")

In [None]:
##torch.save(model, "first_try_digits.pth") #this is the first one!!!!! it is intelligible, even tho it is grainy

In [None]:
#torch.save(model, "first_try_digits_cdif.pth")

In [None]:
#torch.save(model, "two_wide_digits_cdif.pth")


In [None]:
#torch.save(model, "five_wide_digits_cdif.pth")

In [None]:
model = torch.load("five_wide_digits_cdif.pth")

# Set the model to evaluation mode (if you're using it for inference)
model.eval()

In [None]:
if you have a simple embedding for each digit, what if its more complicated? every digit was a little neural network.

forget about that.  
higher than 16, 256? fundamentally if you are 
limited by categories.  those are the words.  

maybe start with some embeddings.  combined in some way? 

get this un noisy.  

could you use this to classify digits.  

text to speach kind of thing.  learn the phoneme.  

multiple outputs that sum? look at what the before thing looks like.  

split those earlier? 

why don't we use a sequential model.  

mamba?

mess with the t for the embedding? 

position encoding?!

residual block? 

