### Imports

In [1]:
import librosa
import os
import numpy as np
import matplotlib.pyplot as plt
import IPython

In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms

### CSV

In [6]:
import csv

In [7]:
markers = []
p = '2021-2-11-kick_markers/2021-2-12-kick_markers_markers.csv'
with open(p, 'r', newline='\n') as csvfile:
    r = csv.reader(csvfile, delimiter=',')
    markers = [np.array(row) for row in r]

In [8]:
kick_markers = [int(x[2]) for x in markers[1:]]
# print(kick_markers)

In [9]:
markers = []
p = '2021-2-11-kick_markers/2021-2-12-snare_markers_markers.csv'
with open(p, 'r', newline='\n') as csvfile:
    r = csv.reader(csvfile, delimiter=',')
    markers = [np.array(row) for row in r]

In [10]:
snare_markers = [int(x[2]) for x in markers[1:]]
# print(snare_markers)

## Data chopping

In [12]:
RATE = 44100
def plotSound(data):
    fig, ax = plt.subplots(nrows=1,ncols=1, figsize=(20,4))
    plt.plot(data, color='blue')
    ax.set_xlim((0, len(data)))
    plt.show()
    IPython.display.display(IPython.display.Audio(data=data, rate=RATE))

In [13]:
p = "2021-2-11-kick_markers/2021-2-11-kick_markers.wav"
a1, sr = librosa.load(p, sr=RATE)
print(len(a1))

9051983


In [14]:
kick_windows = [(m, a1[m-256:m+512*8]) for m in kick_markers]

In [15]:
# for w in kick_windows:
#     plotSound(w[1])

In [16]:
p = "2021-2-11-kick_markers/2021-2-12-snare_markers.wav"
a2, sr = librosa.load(p, sr=RATE)
print(len(a2))

9096083


In [17]:
snare_windows = [(m, a2[m-256:m+512*8]) for m in snare_markers]

In [18]:
# for w in snare_windows:
#     plotSound(w[1])

In [19]:
# since latest_start is earliest+128, the onset will never be at 0, but
# always shifted to the right. I'm doing this to encourage the model 
# to see onsets earlier (when the onset represents later data in the frame)
# rather than later, but I'm not sure if it's a good idea or not. 
# Might want to do latest_start = earliest_start+256

length = 512
earliest_start = 0
latest_start   = earliest_start+128
earliest_stop  = earliest_start+length # 0+512=512
latest_stop    = earliest_stop+128 # 512+128=640

labels = []
frames = []
markers = []

In [20]:
for marker, data in kick_windows:
    fs = [data[i:i+length] for i in range(earliest_start, latest_start)]
    labels.extend(['kick']*len(fs))
    frames.extend(fs)
    markers.extend([marker]*len(fs))

In [21]:
for marker, data in snare_windows:
    fs = [data[i:i+length] for i in range(earliest_start, latest_start)]
    labels.extend(['snare']*len(fs))
    frames.extend(fs)
    markers.extend([marker]*len(fs))

In [22]:
p = "2021-2-11-kick_markers/2021-2-12-snare_markers.wav"
nd, sr = librosa.load(p, sr=RATE)
print(len(nd))

9096083


In [23]:
noises = []
noise_window_end = RATE*30 # 30 is the number of seconds of noise we allow from start of file
for _ in range(len(frames) // 2):
    start = np.random.randint(noise_window_end-length)
    stop = start + length
    frame = nd[start:stop]
    labels.append('noise')
    frames.append(frame)
    markers.append(start)

it might be better to use ALL of the noise and balance this by multiplying the number of positive samples. It might also be better to use the noise from the same file [done here]

In [24]:
data = list(zip(frames, labels, markers))
np.random.shuffle(data)
print(len(data))

56064


In [25]:
print(set([x[1] for x in data]))

{'snare', 'noise', 'kick'}


In [26]:
train_data = data[:7*len(data)//10]    
X_train = [sample[0] for sample in train_data]
Y_train = [sample[1] for sample in train_data]
markers_train = [sample[2] for sample in train_data]

val_data = data[7*len(data)//10:8*len(data)//10]
X_val = [sample[0] for sample in val_data]
Y_val = [sample[1] for sample in val_data]
markers_val = [sample[2] for sample in val_data]

test_data = data[8*len(data)//10:]
X_test = [sample[0] for sample in test_data]
Y_test = [sample[1] for sample in test_data]
markers_test = [sample[2] for sample in test_data]

X_train = torch.tensor(X_train)
X_test = torch.tensor(X_test)
X_val = torch.tensor(X_val)

In [27]:
batch_size = 32
print(len(X_train)//batch_size, len(X_val)//batch_size, len(X_test)//batch_size)
labels = ['kick', 'snare', 'noise']

1226 175 350


In [28]:
train_data = []
for i in range(0, len(Y_train)-batch_size, batch_size):
    x = X_train[i:i+batch_size]
    y = torch.tensor([labels.index(y) for y in Y_train[i:i+batch_size]])
    train_data.append((x, y))
#     print(x.shape)
#     print(Y_train[i:i+batch_size])
#     print(y)
#     print("~~~~~~~~~~~~~~~~~~")

In [29]:
val_data = []
for i in range(0, len(Y_val)-batch_size, batch_size):
    x = X_val[i:i+batch_size]
    y = torch.tensor([labels.index(y) for y in Y_val[i:i+batch_size]])
    val_data.append((x, y))
#     print(x.shape)
#     print(Y_val[i:i+batch_size])
#     print(y)
#     print("~~~~~~~~~~~~~~~~~~")

In [30]:
test_data = []
for i in range(0, len(Y_test)-batch_size, batch_size):
    x = X_test[i:i+batch_size]
    y = torch.tensor([labels.index(y) for y in Y_test[i:i+batch_size]])
    test_data.append((x, y))
#     print(x.shape)
#     print(Y_test[i:i+batch_size])
#     print(y)
#     print("~~~~~~~~~~~~~~~~~~")

In [27]:
print(len(train_data), len(val_data), len(test_data))

1226 175 350


## Initialize model

In [88]:
n_fft = 32
input_size = 512
num_classes = 3
learning_rate = 0.01
batch_size = 32
num_epochs = 100

In [58]:
c1 = nn.Conv2d(1, 6, 3)
pool = nn.MaxPool2d(2, 2)
c2 = nn.Conv2d(6, 12, 3)
fc1 = nn.Linear(336, 256)
fc2 = nn.Linear(256, 128)
fc3 = nn.Linear(128, num_classes)

x = torch.randn(batch_size, input_size)
print(x.shape)
x = torch.abs(torch.stft(x, n_fft=32, return_complex=True))
print(x.shape)
x = torch.where(x.sum() > 0, x / torch.max(x), x)
print(x.shape)
x = x.unsqueeze(1)
print(x.shape)
x = c1(x)
print(x.shape)
x = pool(x)
print(x.shape)
x = c2(x)
print(x.shape)
x = pool(x)
print(x.shape)
x = torch.flatten(x, start_dim=1)
print(x.shape)
x = fc1(x)
print(x.shape)
x = fc2(x)
print(x.shape)
x = fc3(x)
print(x.shape)

torch.Size([32, 512])
torch.Size([32, 17, 65])
torch.Size([32, 17, 65])
torch.Size([32, 1, 17, 65])
torch.Size([32, 6, 15, 63])
torch.Size([32, 6, 7, 31])
torch.Size([32, 12, 5, 29])
torch.Size([32, 12, 2, 14])
torch.Size([32, 336])
torch.Size([32, 256])
torch.Size([32, 128])
torch.Size([32, 3])


In [59]:
class STFT_CNN(nn.Module):
    def __init__(self, num_classes):
        super(STFT_CNN, self).__init__()
        self.c1 = nn.Conv2d(1, 6, 3)
        self.pool = nn.MaxPool2d(2, 2)
        self.drop = nn.Dropout(p=0.2)
        self.c2 = nn.Conv2d(6, 12, 3)
        self.fc1 = nn.Linear(336, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, num_classes)
        
    def forward(self, x):
        x = torch.abs(torch.stft(x, n_fft=32, return_complex=True))
        x = torch.where(x.sum() > 0, x / torch.max(x), x)
        x = x.unsqueeze(1)
        
        x = self.pool(F.relu(self.c1(x)))
        x = self.drop(x)
        x = self.pool(F.relu(self.c2(x)))
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

In [60]:
stft_model = STFT_CNN(num_classes)

In [61]:
x = torch.randn(batch_size, input_size)
print(x.shape)
print(stft_model(x).shape)

torch.Size([32, 512])
torch.Size([32, 3])


In [89]:
criterion = nn.CrossEntropyLoss()

optimizer = optim.Adam(stft_model.parameters(), lr=learning_rate)
for epoch in range(num_epochs):
    print('.', end='')
    for batch_idx, (data, targets) in enumerate(train_data):
        scores = stft_model(data)
        loss = criterion(scores, targets)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

....................................................................................................

In [90]:
def stft_check_accuracy(data, model):
    num_correct = 0
    num_samples = 0
    model.eval()
    with torch.no_grad():
        for x, y in data:
            scores = model(x)
            _, predictions = scores.max(1)
            num_correct += (predictions == y).sum()
            num_samples += predictions.size(0)
        print(f'Got {num_correct}/{num_samples} with accuracy {float(num_correct)/float(num_samples)*100:.2f}')

In [91]:
stft_check_accuracy(train_data, stft_model)

Got 39061/39232 with accuracy 99.56


In [92]:
stft_check_accuracy(test_data, stft_model)

Got 11141/11200 with accuracy 99.47


# Test on other dataset (different microphone)

In [71]:
# Right now I'm using files that are 3072 +/- 1 samples long
directory = os.path.join('./dataset/kick')

In [72]:
kd = {}
for file in os.listdir(directory):
    if file[-3:] == 'wav':
        file_path = os.path.join(directory, file)
        d, sr = librosa.load(file_path, sr=RATE)
        kd[file] = d

In [73]:
directory = os.path.join('./dataset/snare')

In [74]:
sd = {}
for file in os.listdir(directory):
    if file[-3:] == 'wav':
        file_path = os.path.join(directory, file)
        d, sr = librosa.load(file_path, sr=RATE)
        sd[file] = d

In [75]:
noise_file = './dataset/noise_all/noise_all.wav'
nd, sr = librosa.load(noise_file, sr=RATE)
print(len(nd))

17970826


In [76]:
length = 512
earliest_start = 1000-256
latest_start   = earliest_start+128
earliest_stop  = earliest_start+length
latest_stop    = earliest_stop+128

labels = []
files = []
frames = []

In [77]:
for k in kd:
    data = kd[k]
    fs = [data[i:i+length] for i in range(earliest_start, latest_start)]
    labels.extend(['kick']*len(fs))
    files.extend([k]*len(fs))
    frames.extend(fs)

In [78]:
for k in sd:
    data = sd[k]
    fs = [data[i:i+length] for i in range(earliest_start, latest_start)]
    labels.extend(['snare']*len(fs))
    files.extend([k]*len(fs))
    frames.extend(fs)

In [79]:
noises = []
for _ in range(len(frames) // 2):
    start = np.random.randint(len(nd)-length)
    stop = start + length
    frame = nd[start:stop]
    labels.append('noise')
    files.append('noise_all.wav')
    frames.append(frame)

In [80]:
data = list(zip(frames, labels, files))
np.random.shuffle(data)
print(len(data))

12288


In [81]:
print(set([x[1] for x in data]))

{'snare', 'noise', 'kick'}


In [82]:
np.random.shuffle(data)
labels = ['kick', 'snare', 'noise']
second_test_data = data   
X_second_test = torch.tensor([sample[0] for sample in second_test_data])
Y_second_test = torch.tensor([labels.index(sample[1]) for sample in second_test_data])
files_second_test = [sample[2] for sample in second_test_data]

In [93]:
stft_model.eval()
scores = stft_model(X_second_test)

In [94]:
_, predictions = scores.max(1)
print(predictions.shape)

torch.Size([12288])


In [95]:
num_correct = (predictions == Y_second_test).sum()
# num_samples += predictions.size(0)

In [96]:
float(num_correct)/float(predictions.size(0))*100

93.51399739583334

In [97]:
x = torch.randn(batch_size, input_size)
stft_model.eval()
traced_script_module = torch.jit.trace(stft_model, x)
print(traced_script_module(x).shape)

torch.Size([32, 3])


In [98]:
traced_script_module.save("traced_model_cnn_0.pt")

The model performs at what feels like 3.5/5.0. In other words it appears to be trying but clearly needs work. Limiting contiguous repeats helps but the main issue seems to be confusion between kick and snare, especially in the direction of kick being confused for snare. Especially when used with limiting contiguous repeats, it appears that the tail of the kick is confused for a snare. It also seems that the first opportunity frames are not getting inferred on and then a wrong choice is made on the second or third opportunity. This points to possibly augmenting the data so that more left-sided (late) opportunity windows are seen in training. 