In [1]:
import numpy as np
import scipy as sp

import torch
from torch import nn
import os

## Choose computation device (CPU)

In [2]:
if torch.backends.mps.is_available():
    mps_device = torch.device("mps")
    x = torch.ones(1, device=mps_device)
    print ("MPS device found.")
else:
    print ("MPS device not found.")

cpu_device = torch.device("cpu")
    
# Select the device for training
device = cpu_device

print(f"Using {device} device")

MPS device found.
Using cpu device


## User inputs

In [3]:
# EDIT THIS SECTION FOR USER INPUTS
#
name = 'model_0'
in_file = '../data/ts9_test1_in_FP32.wav'
out_file = '../data/ts9_test1_out_FP32.wav'
epochs = 1

train_mode = 0     # 0 = speed training, 
                   # 1 = accuracy training 
                   # 2 = extended training

input_size = 150 
batch_size = 4096 
test_size = 0.2

if not os.path.exists('models/'+name):
    os.makedirs('models/'+name)
else:
    print("A model with the same name already exists. Please choose a new name.")
    exit

A model with the same name already exists. Please choose a new name.


## Define some helper functions

In [4]:
def save_wav(name, data):
    sp.io.wavfile.write(name, 44100, data.flatten().astype(np.float32))

def normalize(data):
    data_max = max(data)
    data_min = min(data)
    data_norm = max(data_max,abs(data_min))
    return data / data_norm

## Pre-processing the data

In [5]:
# Load and Preprocess Data ###########################################
in_rate, in_data = sp.io.wavfile.read(in_file)
out_rate, out_data = sp.io.wavfile.read(out_file)

X_all = in_data.astype(np.float32).flatten()  
X_all = normalize(X_all)
y_all = out_data.astype(np.float32).flatten() 
y_all = normalize(y_all)

# Get the last 20% of the wav data for testing and thee rest for training
X_training, X_testing = np.split(X_all, [int(len(X_all)*.8)])
y_training, y_testing = np.split(y_all, [int(len(y_all)*.8)])
print(f"X_training shape (pre-processing): {X_training.shape}")
print(f"y_training shape (pre-processing): {y_training.shape}")
print(f"X_testing shape (pre-processing): {X_testing.shape}")
print(f"y_testing shape (pre-processing): {y_testing.shape}")

# Create a new array where each element is an array of input_size samples in time order
# Each element of the new array is shifted by one sample from the previous element
indices = np.arange(input_size) + np.arange(len(X_training)-input_size+1)[:,np.newaxis]
indices = torch.from_numpy(indices)
X_training = torch.from_numpy(X_training)
X_ordered_training = torch.zeros_like(indices, dtype=torch.float32)
for i, j in enumerate(indices):
    X_ordered_training[i] = torch.gather(X_training, 0, indices[i])
print(f"X_ordered_training shape: {X_ordered_training.shape}")

# The input size defines the number of samples used for each prediction
# Therefore the first output value that we get is at index input_size-1
y_ordered_training = y_training[input_size-1:]
y_ordered_training = torch.from_numpy(y_ordered_training)
print(f"y_ordered_training shape: {y_ordered_training.shape}")

print(f"The X_ordered_training data is an array, where each element is an array of input_size samples in time order. Therefore the lenght is smaller than the original X_training array (the first {input_size} samples are grouped).")
print(f"The y_ordered_training data is an array, where each element is a single sample. This single sample is the target output for the corresponding X_random_training element, which consists of input samples.")

X_training shape (pre-processing): (6587907,)
y_training shape (pre-processing): (6587907,)
X_testing shape (pre-processing): (1646977,)
y_testing shape (pre-processing): (1646977,)
X_ordered_training shape: torch.Size([6587758, 150])
y_ordered_training shape: torch.Size([6587758])
The X_ordered_training data is an array, where each element is an array of input_size samples in time order. Therefore the lenght is smaller than the original X_training array (the first 150 samples are grouped).
The y_ordered_training data is an array, where each element is a single sample. This single sample is the target output for the corresponding X_random_training element, which consists of input samples.


## Create dataloaders

In [6]:
training_dataset = torch.utils.data.TensorDataset(X_ordered_training, y_ordered_training)
training_dataloader = torch.utils.data.DataLoader(training_dataset, batch_size=batch_size, shuffle=True)

for batch, (X, y) in enumerate(training_dataloader):
    print(f"Batch: {batch}")
    print(f"Shape of X: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    print(f"This is the first audio chunk in the batch: {X[0]}")
    print(f"This is the first target value in the batch: {y[0]}")
    break

Batch: 0
Shape of X: torch.Size([4096, 150])
Shape of y: torch.Size([4096]) torch.float32
This is the first audio chunk in the batch: tensor([ 1.6002e-02,  1.4742e-02,  1.3524e-02,  1.2222e-02,  1.0920e-02,
         9.5762e-03,  8.2322e-03,  6.8882e-03,  5.5441e-03,  4.2001e-03,
         2.8981e-03,  1.5960e-03,  3.3601e-04, -9.2402e-04, -2.1000e-03,
        -3.2761e-03, -4.3681e-03, -5.3761e-03, -6.3001e-03, -7.1402e-03,
        -7.8542e-03, -8.4422e-03, -8.9042e-03, -9.2822e-03, -9.4922e-03,
        -9.5762e-03, -9.4922e-03, -9.3662e-03, -9.1562e-03, -8.8622e-03,
        -8.5262e-03, -8.1482e-03, -7.7282e-03, -7.2662e-03, -6.8041e-03,
        -6.3841e-03, -5.8801e-03, -5.4181e-03, -4.9141e-03, -4.4101e-03,
        -3.8221e-03, -3.2341e-03, -2.5621e-03, -1.8480e-03, -1.0080e-03,
        -1.2600e-04,  9.2402e-04,  2.0160e-03,  3.2761e-03,  4.5361e-03,
         5.9221e-03,  7.3502e-03,  8.8622e-03,  1.0416e-02,  1.2054e-02,
         1.3650e-02,  1.5288e-02,  1.6884e-02,  1.8480e-02,  2.

## Define the model

In [7]:
'''This is a similar PyTorch implementation of the LSTM model from the paper:
    "Real-Time Guitar Amplifier Emulation with Deep Learning"
    https://www.mdpi.com/2076-3417/10/3/766/htm

    Uses a stack of two 1-D Convolutional layers, followed by LSTM, followed by 
    a Dense (fully connected) layer. Three preset training modes are available, 
    with further customization by editing the code. A PyTorch model 
    is implemented here.

    Note: RAM may be a limiting factor for the parameter "input_size". The wav data
      is preprocessed and stored in RAM, which improves training speed but quickly runs out
      if using a large number for "input_size".  Reduce this if you are experiencing
      RAM issues.
'''

if train_mode == 0:         # Speed Training
    learning_rate = 0.01 
    conv1d_strides = 12   
    conv1d_1_strides = 12
    conv1d_filters = 16
    hidden_units = 36
elif train_mode == 1:       # Accuracy Training (~10x longer than Speed Training)
    learning_rate = 0.01 
    conv1d_strides = 4
    conv1d_filters = 36
    hidden_units= 64
else:                       # Extended Training (~60x longer than Accuracy Training)
    learning_rate = 0.0005 
    conv1d_strides = 3
    conv1d_filters = 36
    hidden_units= 96

# Define model ########################################################

class NeuralNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.pad = nn.ConstantPad1d(padding=12, value=0)
        self.conv1 = nn.Conv1d(in_channels=1, out_channels=conv1d_filters, kernel_size=12, stride=conv1d_strides)
        self.conv2 = nn.Conv1d(in_channels=conv1d_filters, out_channels=conv1d_filters, kernel_size=12, stride=conv1d_strides)
        self.lstm = nn.LSTM(input_size=3, hidden_size=hidden_units, batch_first = True)
        self.linear = nn.Linear(in_features=hidden_units, out_features=1)

    def forward(self, x):
        x = self.pad(x)
        x = self.conv1(x)
        x = self.pad(x)
        x = self.conv2(x)
        output, (hidden, cell) =  self.lstm(x)
        x = self.linear(cell)
        return x

model = NeuralNetwork().to(device)
print(model)

# Define loss function and optimizer ##################################
loss_fn = nn.MSELoss(reduction='mean')
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

NeuralNetwork(
  (pad): ConstantPad1d(padding=(12, 12), value=0)
  (conv1): Conv1d(1, 16, kernel_size=(12,), stride=(12,))
  (conv2): Conv1d(16, 16, kernel_size=(12,), stride=(12,))
  (lstm): LSTM(3, 36, batch_first=True)
  (linear): Linear(in_features=36, out_features=1, bias=True)
)


In [8]:
# Define training procedure ############################################

def train(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Preprocess input and target data
        X, y = X.to(device), y.to(device)
        X = X.unsqueeze(1)
        y = y.unsqueeze(0)
        y = y.unsqueeze(2)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")

## Train the model

In [9]:
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(training_dataloader, model, loss_fn, optimizer)
print("Done!")

torch.save(model.state_dict(), "models/"+name+"/"+name+".pth")
print("Saved PyTorch Model State to model.pth")

Epoch 1
-------------------------------
loss: 0.026680  [ 4096/6587758]
loss: 0.001680  [413696/6587758]
loss: 0.000810  [823296/6587758]
loss: 0.000564  [1232896/6587758]
loss: 0.000443  [1642496/6587758]
loss: 0.000426  [2052096/6587758]
loss: 0.000371  [2461696/6587758]
loss: 0.000399  [2871296/6587758]
loss: 0.000357  [3280896/6587758]
loss: 0.000343  [3690496/6587758]
loss: 0.000332  [4100096/6587758]
loss: 0.000327  [4509696/6587758]
loss: 0.000317  [4919296/6587758]
loss: 0.000328  [5328896/6587758]
loss: 0.000331  [5738496/6587758]
loss: 0.000304  [6148096/6587758]
loss: 0.000317  [6557696/6587758]
Done!
Saved PyTorch Model State to model.pth


## Run predictions
### 1. On the test audio data

In [10]:
# Set the model to evaluate mode #################################
model.eval()

# Run Prediction #################################################
# Test the model on the testing data #############################

# pre-processing X_testing data
indices_testing = np.arange(input_size) + np.arange(len(X_testing)-input_size+1)[:,np.newaxis]
indices_testing = torch.from_numpy(indices_testing)
X_testing_tensor = torch.from_numpy(X_testing)
X_ordered_testing = torch.zeros_like(indices_testing, dtype=torch.float32)
for i, j in enumerate(indices_testing):
    X_ordered_testing[i] = torch.gather(X_testing_tensor, 0, indices[i])

# pre-processing y_testing data
y_ordered_testing = y_testing[input_size-1:]
y_ordered_testing = torch.from_numpy(y_ordered_testing)

# Create dataloader for testing ##################################
testing_dataset = torch.utils.data.TensorDataset(X_ordered_testing, y_ordered_testing)
testing_dataloader = torch.utils.data.DataLoader(testing_dataset, batch_size=batch_size)

# Run prediction ##################################################
prediction = torch.zeros(0).to(device)

print("Running prediction..")
with torch.no_grad():
    for X, _ in testing_dataloader:
        X = X.to(device)
        X = X.unsqueeze(1)
        predicted_batch = model(X)
        prediction = torch.cat((prediction, predicted_batch.flatten()), 0)

save_wav('models/'+name+'/y_pred.wav', prediction.cpu().numpy())
save_wav('models/'+name+'/x_test.wav', X_testing)
save_wav('models/'+name+'/y_test.wav', y_testing)

print("X_testing shape: ", X_testing.shape)
print("X_ordered_testing shape: ", X_ordered_testing.shape)
print("y_testing shape: ", y_testing.shape)
print("prediction shape: ", prediction.shape)

print("Note that the prediction shape is smaller than the y_testing shape. This is because the first predicted sample needs input_size samples for prediction.\n")


Running prediction..
X_testing shape:  (1646977,)
X_ordered_testing shape:  torch.Size([1646828, 150])
y_testing shape:  (1646977,)
prediction shape:  torch.Size([1646828])
Note that the prediction shape is smaller than the y_testing shape. This is because the first predicted sample needs input_size samples for prediction.



### 2. On a number sequence (to control inference)

In [11]:
# Test the model simple number sequence to compare with inference #
X_testing_2 = np.array([], dtype=np.float64)

for i in range(0, 150):
    X_testing_2 = np.append(X_testing_2, i*0.001)

X_testing_2 = np.expand_dims(X_testing_2, axis=0)
X_testing_2 = np.expand_dims(X_testing_2, axis=0)

X_testing_2 = np.reshape(X_testing_2, (1, 1, 150))

X_testing_2 = torch.from_numpy(X_testing_2).double()

print("Running prediction..")
model = model.float()

prediction_2 = model(X_testing_2.to(device).float())

print(f"prediction {prediction_2}")

print("X_testing_2 shape: ", X_testing_2.shape)
print("prediction_2 shape: ", prediction_2.shape)

Running prediction..
prediction tensor([[[-0.2674]]], grad_fn=<ViewBackward0>)
X_testing_2 shape:  torch.Size([1, 1, 150])
prediction_2 shape:  torch.Size([1, 1, 1])


## Export as pt model

In [12]:
# An example input you would normally provide to your model's forward() method.
example = torch.rand(1, 1, 150).to(device)

# Use torch.jit.trace to generate a torch.jit.ScriptModule via tracing.
traced_script_module = torch.jit.trace(model, example)
traced_script_module.save("models/"+name+"/"+name+".pt")