In [1]:
!pip install torch-summary



## imports

In [2]:
import torch
from torch import randn
import torch.nn as nn
from torchsummary import summary
import torch.optim as optim
import numpy as np
from torch.utils.data import Dataset, DataLoader

## the model

In [3]:
Tx = 1198
n_freq = 101
Ty = 296

In [4]:
class TW_model(nn.Module):

    def __init__(self, input_channels, conv_out,hidden_size, dropout_prob):
        super(TW_model, self).__init__()

        self.conv_layer_0 = nn.Conv1d(input_channels, conv_out, kernel_size=15, stride=4)
        self.batch_norm_layer_0 = nn.BatchNorm2d(num_features=conv_out)
        self.relu_0 = nn.ReLU()
        self.dropout_layer_0 = nn.Dropout(p=dropout_prob)

        self.gru_layer_1 = nn.GRU(conv_out, hidden_size, batch_first=True)
        self.dropout_layer_1 = nn.Dropout(p=dropout_prob)
        self.batch_norm_layer_1 = nn.BatchNorm2d(hidden_size)

        self.gru_layer_2 = nn.GRU(hidden_size, hidden_size, batch_first=True)
        self.dropout_layer_2_0 = nn.Dropout(p=dropout_prob)
        self.batch_norm_layer_2 = nn.BatchNorm2d(hidden_size)
        self.dropout_layer_2_1 = nn.Dropout(p=dropout_prob)

        self.dense_3 = nn.Linear(in_features=hidden_size, out_features=1)
        self.sigmoid_3 = nn.Sigmoid()


    def forward(self, x):
        x = x.reshape(n_freq,Tx)

        x = self.conv_layer_0(x)

        x = x.reshape(1, 1,x.shape[1],x.shape[0])
        x = x.permute(0, 3, 1, 2)
        x = self.batch_norm_layer_0(x)
        x = self.relu_0(x)
        x = self.dropout_layer_0(x)
        x=x.reshape(1,x.shape[3],x.shape[1])
        x,_ = self.gru_layer_1(x)
        x = self.dropout_layer_1(x)
        x = x.reshape(1, 1,x.shape[1],x.shape[2])
        x = x.permute(0, 3, 1, 2)
        x = self.batch_norm_layer_1(x)
        x=x.reshape(1,x.shape[3],x.shape[1])
        x,_ = self.gru_layer_2(x)
        x = self.dropout_layer_2_0(x)
        x = x.reshape(1, 1,x.shape[1],x.shape[2])
        x = x.permute(0, 3, 1, 2)
        x = self.batch_norm_layer_2(x)
        x = self.dropout_layer_2_1(x)
        x = x.reshape(1, 1,x.shape[3],x.shape[1])
        x = self.dense_3(x)
        output = self.sigmoid_3(x)
        return output


## hyperparameters

In [5]:
model = TW_model(input_channels=n_freq,conv_out=196,hidden_size=128,dropout_prob=0.2)
random_sample = torch.randn(n_freq, Tx)

In [6]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
next(model.parameters()).device

device(type='cuda', index=0)

In [7]:
criterion = nn.BCELoss()
criterion = criterion.cuda()
optimizer = optim.Adam(model.parameters(), lr=0.00002,betas=(0.9,0.999),weight_decay=0.1)
num_epochs = 500

In [8]:
x_train = np.load('/content/drive/MyDrive/Colab Notebooks/my-Trigger-Word-Detection/x_train_small.npy')
y_train = np.load('/content/drive/MyDrive/Colab Notebooks/my-Trigger-Word-Detection/y_train_small.npy')

In [None]:
x_eval = np.load('/content/drive/MyDrive/Colab Notebooks/Trigger-Word-Detection/x_train_small.npy')
y_eval = np.load('/content/drive/MyDrive/Colab Notebooks/Trigger-Word-Detection/y_train_small.npy')

## for training

In [9]:
x_train = np.array_split(x_train,4000)
y_train = np.array_split(y_train,4000)
x_train_m = [i.astype(dtype=np.float32) for i in x_train]
y_train_m = [i.astype(dtype=np.float32) for i in y_train]

class MyDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index], self.labels[index]



train_dataset = MyDataset(x_train_m, y_train)
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, pin_memory=False)

In [11]:
train_x_gpu = []
train_y_gpu = []
for data, labels in train_loader:
    train_x_gpu.append(data.to(device))
    train_y_gpu.append(labels.to(device))
train_dataset = MyDataset(train_x_gpu, train_y_gpu)
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, pin_memory=False)



## for the evaluating

In [10]:
x_eval = np.array_split(x_eval,10)
y_eval = np.array_split(y_eval,10)
x_eval_m = [i.astype(dtype=np.float32) for i in x_eval]
y_eval_m = [i.astype(dtype=np.float32) for i in y_eval]

class MyDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        return self.data[index], self.labels[index]



test_dataset = MyDataset(x_eval_m, y_eval_m)
# train_dataset = train_dataset.to(device)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=True, pin_memory=False)
# train_loader = train_loader.to(device)
test_x_gpu = []
test_y_gpu = []
for data, labels in test_loader:
    test_x_gpu.append(data.to(device))
    test_y_gpu.append(labels.to(device))
test_dataset = MyDataset(test_x_gpu, test_y_gpu)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=True, pin_memory=False)

NameError: ignored

In [None]:
sample, labels = next(iter(test_loader))
data = data.to('cpu')
print(data.device)

cpu


# Training

In [None]:
for epoch in range(num_epochs):
    for batch_idx, (data, labels) in enumerate(train_loader):
        labels = torch.tensor(labels, dtype=torch.float32)
        optimizer.zero_grad() # zero gradients
        output = model(data) # forward pass
        loss = criterion(output.squeeze(), labels.squeeze()) # calculate loss
        loss.backward() # backward pass
        optimizer.step() # update weights
    if (epoch+1) % 5 == 0:
      print('Epoch: {}, Loss: {:.4f}'.format(epoch+1, loss.item()))

  labels = torch.tensor(labels, dtype=torch.float32)


Epoch: 5, Loss: 0.0097
Epoch: 10, Loss: 0.2036
Epoch: 15, Loss: 0.1310
Epoch: 20, Loss: 0.2669
Epoch: 25, Loss: 0.2706
Epoch: 30, Loss: 0.1390
Epoch: 35, Loss: 0.1411
Epoch: 40, Loss: 0.1487
Epoch: 45, Loss: 0.1371
Epoch: 50, Loss: 0.2876
Epoch: 55, Loss: 0.3460
Epoch: 60, Loss: 0.1404
Epoch: 65, Loss: 0.2678
Epoch: 70, Loss: 0.2721
Epoch: 75, Loss: 0.2318
Epoch: 80, Loss: 0.2249
Epoch: 85, Loss: 0.1478
Epoch: 90, Loss: 0.2798
Epoch: 95, Loss: 0.2830
Epoch: 100, Loss: 0.1583
Epoch: 105, Loss: 0.2956
Epoch: 110, Loss: 0.1790
Epoch: 115, Loss: 0.1368
Epoch: 120, Loss: 0.1687
Epoch: 125, Loss: 0.1512
Epoch: 130, Loss: 0.1447
Epoch: 135, Loss: 0.2783
Epoch: 140, Loss: 0.1406
Epoch: 145, Loss: 0.1671
Epoch: 150, Loss: 0.3205
Epoch: 155, Loss: 0.2745
Epoch: 160, Loss: 0.3739
Epoch: 165, Loss: 0.2055
Epoch: 170, Loss: 0.1577
Epoch: 175, Loss: 0.1532
Epoch: 180, Loss: 0.3265
Epoch: 185, Loss: 0.2892
Epoch: 190, Loss: 0.3395
Epoch: 195, Loss: 0.2713
Epoch: 200, Loss: 0.2749
Epoch: 205, Loss: 0.

In [None]:
sample_x,sample_y = next(iter(train_loader))
pred = model(sample_x)
print(sample_y)
print(pred)

tensor([[[[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
           0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
           0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
           0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
           0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
           0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
           0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
           0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
           0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 1., 1., 1., 1., 1., 1., 1.,
           1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
           1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
           1., 1., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
           0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0

# saving the model

In [None]:
torch.save(model.state_dict(), "TWDetection_model_cpu.pt")

In [None]:
model.load_state_dict(torch.load('/content/example_model.pt', map_location=device))

<All keys matched successfully>

In [9]:
model = model.to('cpu')
# model = model.to('cuda')
next(model.parameters()).device

NameError: ignored

In [None]:
def detect_tw(filename):
  x = graph_spectrogram(filename)
  print(x.shape)
  x = torch.from_numpy(x)
  print(x.dtype)
  x = x.to(device=device,dtype=torch.float32)
  print(x.device)
  prediction = model(x)
  return prediction

In [None]:
chime_file = "/content/drive/MyDrive/Colab Notebooks/my-Trigger-Word-Detection/chime.wav"
def chime_on_activate(filename, predictions, threshold):
    audio_clip = AudioSegment.from_wav(filename)
    chime = AudioSegment.from_wav(chime_file)
    Ty = predictions.shape[2]
    # Step 1: Initialize the number of consecutive output steps to 0
    consecutive_timesteps = 0
    # Step 2: Loop over the output steps in the y
    for i in range(Ty):
        # Step 3: Increment consecutive output steps
        consecutive_timesteps += 1
        # Step 4: If prediction is higher than the threshold and more than 75 consecutive output steps have passed
        if predictions[0,0,i,0] > threshold and consecutive_timesteps > 75:
            # Step 5: Superpose audio and background using pydub
            audio_clip = audio_clip.overlay(chime, position = ((i / Ty) * audio_clip.duration_seconds)*1000)
            # Step 6: Reset consecutive output steps to 0
            consecutive_timesteps = 0

    audio_clip.export("chime_output.wav", format='wav')

In [None]:
filename = "/content/drive/MyDrive/Colab Notebooks/Trigger-Word-Detection/audio_examples/my_audio.wav"
prediction = detect_tw(filename)
chime_on_activate(filename, prediction,0.7)
IPython.display.Audio("./chime_output.wav")