In [0]:
import io
import torch
from torchvision import models, transforms, datasets
import torch.utils.data as data
import numpy as np

import os
import os.path
import pickle
import hashlib
import librosa


In [0]:
# Install gdown and use it to fetch the dataset archive from Google Drive
# (saved as speech_commands_v0.01_with_splits.tar.gz, see the cell output).
!pip install gdown
!gdown "https://drive.google.com/uc?id=1HR28jRFwrveq5zxjkzri61jm9F4-M7p7"



Downloading...
From: https://drive.google.com/uc?id=1HR28jRFwrveq5zxjkzri61jm9F4-M7p7
To: /content/speech_commands_v0.01_with_splits.tar.gz
1.49GB [00:09, 152MB/s]


In [0]:
# Fetch speech_commands_cached.tar.gz from Google Drive and unpack it in the
# working directory.
!gdown "https://drive.google.com/uc?id=125ghIxEQVIfOvizEU9ZhdO8kjNEta5Kc"
!tar -zxf speech_commands_cached.tar.gz

Downloading...
From: https://drive.google.com/uc?id=125ghIxEQVIfOvizEU9ZhdO8kjNEta5Kc
To: /content/speech_commands_cached.tar.gz
864MB [00:15, 56.7MB/s]


In [0]:
!ls -laorth

total 2.2G
drwxr-xr-x  1 root 4.0K Mar 27 20:25 .config
drwxr-xr-x  1 root 4.0K Mar 27 20:26 sample_data
drwxr-xr-x 32 1000 4.0K Mar 29 11:48 mini
drwxr-xr-x  1 root 4.0K Apr  2 17:54 ..
-rw-r--r--  1 root  362 Apr  2 17:56 cookie
-rw-r--r--  1 root 2.2M Apr  2 17:56 mini_gcommands.tar.gz
-rw-r--r--  1 root 1.4G Apr  2 18:08 speech_commands_v0.01_with_splits.tar.gz
-rw-r--r--  1 root 825M Apr  2 18:08 speech_commands_cached.tar.gz
drwxr-xr-x  1 root 4.0K Apr  2 18:08 .


In [0]:
# Unpack the dataset archive downloaded above into the working directory.
!tar -zxf speech_commands_v0.01_with_splits.tar.gz

In [0]:
class AudioReader(data.Dataset):
    """Dataset of mel-filterbank features for a keyword-spotting file list.

    ``list_path`` is a text file with one ``<class>/<file>`` entry per line;
    the part of the file name before the first ``_`` is used as the speaker
    identity.  Audio is read from ``<dir of list_path>/audio/`` and, when
    ``use_cache`` is true, computed features are stored as pickles under
    ``<dir of list_path>/cache/``.

    Every class that is not in ``target_class_names`` is mapped to
    ``'unknown'``.  With ``add_noise=True`` the ``.wav`` clips found in
    ``audio/_background_noise_/`` are loaded and the dataset is extended with
    ``num_silence`` synthetic ``'silence'`` items (as many as there are
    ``'yes'`` entries in the list); each one is a random chunk of a clip.

    Each item is ``[params, target_id]`` where ``params`` is the tensor
    produced by :meth:`getParams` and ``target_id`` an index into
    ``target_class_names``.
    """

    def __init__(self, list_path, add_noise=False, use_cache=True):
        """Read the file list and, if requested, load the background noises.

        Args:
            list_path: path of the list file (see the class docstring).
            add_noise: load background noise clips and append silence items.
            use_cache: read/write pickled features in the cache directory.
        """
        self.list_path = list_path
        self.database_path = os.path.dirname(list_path) + '/audio/'
        self.cache_path = os.path.dirname(list_path) + '/cache/'
        self.add_noise = add_noise
        self.use_cache = use_cache

        self.target_class_names = ['unknown','silence', 'yes', 'no', 'up', 'down', 'left', 'right', 'on', 'off', 'stop', 'go']
        # name -> index and index -> name lookups for the target classes
        self.target_class = {name: i for i, name in enumerate(self.target_class_names)}
        self.target_class_idx_to_name = {i: name for i, name in enumerate(self.target_class_names)}
        self.audio_class = {}
        self.audio_speaker = {}
        self.audios = []

        self.read_list_data()

        # Contiguous speaker ids in order of first appearance.  Enumerating
        # the (repeated) per-file speaker values directly would leave gaps.
        self.speaker_ids = {}
        for spk_id in self.audio_speaker.values():
            self.speaker_ids.setdefault(spk_id, len(self.speaker_ids))

        self.num_silence = 0

        if self.add_noise:
            self.background_noises_names = []
            self.background_noises = []
            for f in os.listdir(self.database_path + '/_background_noise_/'):
                if f.endswith(".wav"):
                    self.background_noises_names.append('_background_noise_/' + f)
                    print(self.background_noises_names[-1])
                    self.background_noises.append( self.load_audio(self.background_noises_names[-1]))

            # As many silence items as there are 'yes' utterances.
            self.num_silence = sum(1 for i in self.audio_class.values() if i == 'yes')

        # The numpy RNG is seeded lazily on the first __getitem__ call.
        self.seeded = False

    def read_list_data(self):
        """Fill ``audios``, ``audio_class`` and ``audio_speaker`` from the list file.

        Blank lines (e.g. a trailing newline) are skipped.
        """
        with open(self.list_path, 'r') as stream:
            for line in stream:
                file_path = line.strip()
                if not file_path:
                    continue
                file_class, file_name = file_path.split('/')
                identity = file_name.split('_')[0]
                self.audio_class[file_path] = file_class
                self.audio_speaker[file_path] = identity
                self.audios.append(file_path)

    def __len__(self):
        """Number of listed audio files plus the synthetic silence items."""
        # Count ``audios`` (not ``audio_class``) so the length agrees with the
        # indexing done in __getitem__ even if the list has repeated entries.
        return len(self.audios) + self.num_silence

    def __getitem__(self, index):
        """Return ``[params, target_id]`` for ``index``.

        Indices past the listed audio files produce random silence chunks.

        Raises:
            IndexError: if ``index`` is not smaller than ``len(self)``; this
                also lets plain ``for item in dataset`` iteration terminate.
        """
        if index >= len(self):
            raise IndexError('index %d out of range for dataset of length %d' % (index, len(self)))

        if not self.seeded:
            # Seed once per dataset copy with the first index it is asked for.
            self.seeded = True
            np.random.seed(index)

        if index >= len(self.audios):
            # 22050 samples: one second at getParams' default sample rate.
            audio = self.get_silence_chunk(22050)
            params = self.getParams(audio)
            return [params, self.target_class['silence']]

        audio_id = self.audios[index]
        cache_file = self.cache_path + audio_id + '.pickle'
        if self.use_cache and os.path.isfile(cache_file):
            # NOTE(security): pickle.load can execute arbitrary code; only
            # use cache directories that come from a trusted source.
            with open(cache_file, 'rb') as stream:
                params = pickle.load(stream)
        else:
            audio = self.load_audio(audio_id)
            params = self.getParams(audio)
            if self.use_cache:
                # exist_ok avoids a FileExistsError when several DataLoader
                # workers create the same class directory at the same time.
                os.makedirs(os.path.dirname(cache_file), exist_ok=True)
                with open(cache_file, 'wb') as stream:
                    pickle.dump(params, stream)

        target = self.audio_class[audio_id]
        if target not in self.target_class_names:
            target = 'unknown'
        target_id = self.target_class[target]

        return [params, target_id]

    def load_audio(self, audio_name):
        """Load ``audio_name`` (relative to ``database_path``) with librosa.

        The sample rate returned by ``librosa.load`` is discarded.
        """
        audio_path = self.database_path + audio_name
        audio, fs = librosa.load(audio_path)
        return audio

    def getParams(self, y, sfr=22050, window_stride=0.01, window_size=0.02, window_type='hamming', n_fft=512, normalize=True, max_len=97, n_mels=40):
        """Compute mel-filterbank features for the waveform ``y``.

        Args:
            y: waveform samples.
            sfr: sample rate used to derive window sizes and mel band edges.
            window_stride: hop between frames, in seconds.
            window_size: analysis window length, in seconds.
            window_type: window passed to ``librosa.stft``.
            n_fft: FFT size.
            normalize: apply z-score normalisation over the whole tensor.
            max_len: number of frames in the output (left zero-padded or
                cropped to the last ``max_len`` frames).
            n_mels: number of mel bands.

        Returns:
            ``torch.FloatTensor`` of shape ``(1, n_mels, max_len)``.
        """
        win_length = int(sfr * window_size)
        hop_length = int(sfr * window_stride)
        lowfreq = 20
        highfreq = sfr/2 - 400
        S = librosa.stft(y, n_fft=n_fft, hop_length=hop_length, win_length=win_length, window=window_type, center=False)
        D = np.abs(S)
        param = librosa.feature.melspectrogram(S=D, sr=sfr, n_mels=n_mels, fmin=lowfreq, fmax=highfreq, norm=None)

        # Add zero padding (on the left) to make all params the same width
        if param.shape[1] < max_len:
            pad = np.zeros((param.shape[0], max_len - param.shape[1]))
            param = np.hstack((pad, param))
        # If it exceeds max_len keep the last frames
        elif param.shape[1] > max_len:
            param = param[:, -max_len:]

        # Add the channel axis using the actual dimensions instead of a
        # hard-coded (1, 40, 97), so non-default max_len / n_mels work too.
        param = param[np.newaxis, :, :]

        param = torch.FloatTensor(param)

        # z-score normalization
        if normalize:
            mean = param.mean()
            std = param.std()
            if std != 0:
                param.add_(-mean)
                param.div_(std)

        return param

    def get_silence_chunk(self, length):
        """Return ``length`` consecutive samples from a random background noise.

        If the chosen clip is too short for a random offset, the chunk starts
        at sample 0 (and may be shorter than ``length``).
        """
        i = np.random.randint(0, len(self.background_noises))
        silence = self.background_noises[i]
        max_start = silence.shape[0] - length -1
        # np.random.randint(0, n) raises for n <= 0, so guard short clips.
        random_start = np.random.randint(0, max_start) if max_start > 0 else 0
        chunk = silence[random_start:(random_start + length)]
        return chunk

    def get_class_weights(self):
        """Return 'balanced' class weights as a float tensor.

        The label multiset is: one label per listed file, ``num_silence``
        silence labels, plus one extra ``'unknown'`` label.  For every class
        that occurs, the weight is ``n_samples / (n_present_classes * count)``
        (the formula behind scikit-learn's ``'balanced'`` mode).  The result
        always has ``get_n_classes()`` entries, indexed by class id; classes
        that never occur get weight 0, which keeps the vector aligned with
        the class indices expected by ``nn.CrossEntropyLoss(weight=...)``.
        """
        unknown_id = self.target_class['unknown']
        class_ids = [self.target_class.get(target, unknown_id)
                     for target in self.audio_class.values()]
        class_ids.extend([self.target_class['silence']] * self.num_silence)
        class_ids.append(unknown_id)

        counts = np.bincount(class_ids, minlength=self.get_n_classes()).astype(np.float64)
        present = counts > 0
        weights = np.zeros_like(counts)
        weights[present] = counts.sum() / (present.sum() * counts[present])
        return torch.from_numpy(weights).float()

    def get_n_classes(self):
        """Number of target classes (including 'unknown' and 'silence')."""
        return len(self.target_class_names)
    


In [0]:
# Settings shared by the three loaders; only the training loader shuffles.
loader_kwargs = dict(batch_size=50, num_workers=2, pin_memory=True)

# Training set: background noises are loaded and silence items are added.
train_loader = data.DataLoader(
    AudioReader('gcommands/training_list.txt', add_noise=True),
    shuffle=True,
    **loader_kwargs
)

valid_loader = data.DataLoader(
    AudioReader('gcommands/validation_list.txt'),
    shuffle=False,
    **loader_kwargs
)

test_loader = data.DataLoader(
    AudioReader('gcommands/testing_list.txt'),
    shuffle=False,
    **loader_kwargs
)

_background_noise_/pink_noise.wav
_background_noise_/white_noise.wav
_background_noise_/running_tap.wav
_background_noise_/dude_miaowing.wav
_background_noise_/exercise_bike.wav
_background_noise_/doing_the_dishes.wav


In [0]:

# Pull a single batch from the training loader for inspection: the loop is
# left after the first iteration, so `batch` holds the first batch.
for batch in train_loader:
    break

In [0]:
# Unpack the batch into features and class ids, then show the feature shape.
X, target = batch
print(X.shape)

torch.Size([50, 1, 40, 97])


In [0]:
# Per-class loss weights computed from the training set's label counts.
train_loader.dataset.get_class_weights()

tensor([0.1356, 2.3723, 2.3723, 2.3812, 2.3941, 2.3954, 2.3994, 2.3825, 2.3672,
        2.3994, 2.3408, 2.3710])

In [0]:
import math
import torch.nn as nn
import torch.nn.functional as F

class LeNet(nn.Module):
    """LeNet-style CNN: two 5x5 convolutions followed by two linear layers.

    ``fc1`` expects 2940 flattened features, which is what the two
    conv + 2x2 max-pool stages produce for a ``(1, 40, 97)`` input
    (20 channels x 7 x 21).
    """

    def __init__(self, num_classes=31):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 20, kernel_size=5)
        self.conv2 = nn.Conv2d(20, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(2940, 1000)
        self.fc2 = nn.Linear(1000, num_classes)

    def forward(self, x):
        """Map a ``(batch, 1, H, W)`` input to ``(batch, num_classes)`` scores."""
        # Stage 1: conv -> 2x2 max-pool -> ReLU
        feats = self.conv1(x)
        feats = F.relu(F.max_pool2d(feats, 2))

        # Stage 2: conv -> channel dropout -> 2x2 max-pool -> ReLU
        feats = self.conv2_drop(self.conv2(feats))
        feats = F.relu(F.max_pool2d(feats, 2))

        # Classifier head; dropout is only active in training mode.
        flat = feats.view(feats.size(0), -1)
        hidden = F.relu(self.fc1(flat))
        hidden = F.dropout(hidden, training=self.training)
        return self.fc2(hidden)

In [0]:
#example https://github.com/GRAAL-Research/poutyne/blob/master/examples/mnist.ipynb
!pip install poutyne



In [0]:
!pip install torchsummary



In [0]:
import torch.nn as nn
import torch.optim as optim
from poutyne.framework import Model
from torchsummary import summary
cuda_device = 0
device = torch.device("cuda:%d" % cuda_device if torch.cuda.is_available() else "cpu")

mymodel = LeNet(num_classes = train_loader.dataset.get_n_classes())
#mymodel = TDNN(num_classes = train_loader.dataset.get_n_classes())
#mymodel = VGG('VGG11',num_classes = train_loader.dataset.get_n_classes())
#mymodel = MyVGG(num_classes = train_loader.dataset.get_n_classes())
#mymodel = MyVGGUpsample(num_classes = train_loader.dataset.get_n_classes())
mymodel.to(device)
print(mymodel)
# torchsummary defaults to device="cuda"; pass the device type actually in
# use so this cell also runs on a CPU-only runtime.
summary(mymodel, input_size=(1, 40, 97), device=device.type)

learning_rate = 0.001
# The class weights only depend on the training labels: compute them once.
class_weights = train_loader.dataset.get_class_weights()


def build_trainer(network, weights, lr, target_device):
    """Wrap ``network`` in a Poutyne Model with a fresh Adam optimizer.

    Only parameters that currently have ``requires_grad=True`` are handed to
    the optimizer; the loss is a class-weighted cross entropy.  Returns
    ``(model, optimizer, loss_function)``.
    """
    trainable = filter(lambda p: p.requires_grad, network.parameters())
    new_optimizer = optim.Adam(trainable, lr=lr)
    new_loss = nn.CrossEntropyLoss(weight=weights)
    trainer = Model(network, new_optimizer, new_loss, metrics=['accuracy'])
    # Send model on GPU
    trainer.to(target_device)
    return trainer, new_optimizer, new_loss


# Stage 1: one epoch with whatever parameters the network left trainable.
model, optimizer, loss_function = build_trainer(mymodel, class_weights, learning_rate, device)
model.fit_generator(train_loader, valid_loader, epochs=1)

# Stage 2: unfreeze every parameter and train with a new optimizer (a no-op
# change for networks that have no frozen parameters).
for param in mymodel.parameters():
    param.requires_grad = True

model, optimizer, loss_function = build_trainer(mymodel, class_weights, learning_rate, device)
model.fit_generator(train_loader, valid_loader, epochs=10)


# Test
test_loss, test_acc = model.evaluate_generator(test_loader)
print('Test:\n\tLoss: {}\n\tAccuracy: {}'.format(test_loss, test_acc))

LeNet(
  (conv1): Conv2d(1, 20, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(20, 20, kernel_size=(5, 5), stride=(1, 1))
  (conv2_drop): Dropout2d(p=0.5)
  (fc1): Linear(in_features=2940, out_features=1000, bias=True)
  (fc2): Linear(in_features=1000, out_features=12, bias=True)
)
----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1           [-1, 20, 36, 93]             520
            Conv2d-2           [-1, 20, 14, 42]          10,020
         Dropout2d-3           [-1, 20, 14, 42]               0
            Linear-4                 [-1, 1000]       2,941,000
            Linear-5                   [-1, 12]          12,012
Total params: 2,963,552
Trainable params: 2,963,552
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.01
Forward/backward pass size (MB): 0.70
Params size (MB): 11.31
Estimated Total Size (MB): 12.02
----

In [0]:
 # Evaluate the trained model on the held-out test loader and report the
 # loss and accuracy it returns.
test_loss, test_acc = model.evaluate_generator(test_loader)
print('Test:\n\tLoss: {}\n\tAccuracy: {}'.format(test_loss, test_acc))

In [0]:
# Print every parameter tensor of the network that is currently trainable
# (requires_grad=True).  The output is large: it dumps the full tensors.
for p in filter(lambda p: p.requires_grad, mymodel.parameters()):
  print(p)

Parameter containing:
tensor([[-0.0101,  0.0063,  0.0089,  ...,  0.0052, -0.0070,  0.0103],
        [-0.0115, -0.0152, -0.0133,  ..., -0.0118, -0.0284, -0.0201],
        [-0.0128,  0.0035,  0.0087,  ...,  0.0211, -0.0018, -0.0268],
        ...,
        [ 0.0137,  0.0242, -0.0078,  ...,  0.0081, -0.0040,  0.0002],
        [-0.0064, -0.0052,  0.0116,  ...,  0.0010,  0.0238,  0.0113],
        [ 0.0118,  0.0138, -0.0226,  ...,  0.0075, -0.0243, -0.0253]],
       device='cuda:0', requires_grad=True)
Parameter containing:
tensor([-0.0181, -0.0250, -0.0204,  ..., -0.0225, -0.0211, -0.0100],
       device='cuda:0', requires_grad=True)
Parameter containing:
tensor([[-0.0046,  0.0052, -0.0101,  ..., -0.0130, -0.0116, -0.0010],
        [-0.0029,  0.0023, -0.0067,  ...,  0.0043,  0.0054, -0.0095],
        [ 0.0107,  0.0035,  0.0114,  ..., -0.0126,  0.0160,  0.0016],
        ...,
        [ 0.0041,  0.0075, -0.0055,  ..., -0.0203,  0.0002, -0.0007],
        [ 0.0043, -0.0104,  0.0129,  ...,  0.0165,

In [0]:
import torch.nn as nn
import torch.nn.functional as F

class TDNN(nn.Module):
    """Time-delay network: a stack of (dilated) 1-D convolutions over time.

    The 40 feature bands are treated as input channels.  The classifier's
    first layer expects 9000 features, which is 450 channels x 20 frames,
    i.e. what the convolution / pooling stack yields for 97 input frames.
    """

    def __init__(self, num_classes=12):
        super(TDNN, self).__init__()
        self.tdnn = nn.Sequential(
            nn.Conv1d(40, 450, stride=1, dilation=1, kernel_size=3),
            nn.ReLU(True),
            nn.Conv1d(450, 450, stride=1, dilation=1, kernel_size=4),
            nn.ReLU(True),
            nn.Conv1d(450, 450, stride=1, dilation=3, kernel_size=3),
            nn.ReLU(True),
            nn.Conv1d(450, 450, stride=1, dilation=3, kernel_size=3),
            nn.ReLU(True),
            nn.Conv1d(450, 450, stride=1, dilation=3, kernel_size=3),
            nn.ReLU(True),
            nn.Conv1d(450, 450, stride=1, dilation=3, kernel_size=3),
            nn.ReLU(True),
            nn.Conv1d(450, 450, stride=1, dilation=3, kernel_size=3),
            nn.ReLU(True),
            nn.MaxPool1d(3, stride=3),
        )
        self.classifier = nn.Sequential(
            nn.Linear(9000, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        """Map a ``(batch, 1, 40, frames)`` input to class scores.

        The singleton channel axis is dropped so the bands become Conv1d
        channels.  The out-of-place ``squeeze`` is used on purpose: the
        in-place ``squeeze_`` would change the shape of the caller's tensor.
        """
        x = x.squeeze(1)
        x = self.tdnn(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

In [0]:
import torch.nn as nn
import torch.nn.functional as F

#https://github.com/pytorch/vision/blob/master/torchvision/models/vgg.py
class VGG(nn.Module):
    """VGG-style network for single-channel inputs.

    The convolutional part is built by ``make_layers(cfg[vgg_name])``.  The
    classifier expects ``1 * 3 * 512`` flattened features, i.e. a 512-channel
    1x3 map after the feature extractor.
    """

    def __init__(self, vgg_name, num_classes=12):
        """Build the network.

        Args:
            vgg_name: key into ``cfg`` selecting the layer configuration.
            num_classes: size of the output layer.
        """
        super(VGG, self).__init__()
        self.features = make_layers(cfg[vgg_name])
        self.classifier = nn.Sequential(
            nn.Linear(1 * 3 * 512, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, num_classes),
        )
        self._initialize_weights()

    def forward(self, x):
        """Run the feature extractor, flatten, and classify.

        Debug ``print`` calls were removed: they wrote two lines to stdout
        for every batch.
        """
        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

    def _initialize_weights(self):
        """Initialise conv, batch-norm and linear layers.

        Conv weights are drawn from N(0, sqrt(2 / (kh * kw * out_channels))),
        linear weights from N(0, 0.01); batch-norm weights are set to 1 and
        all biases to 0.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                nn.init.normal_(m.weight, 0, math.sqrt(2. / n))
                if m.bias is not None:
                    nn.init.zeros_(m.bias)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.zeros_(m.bias)


def make_layers(cfg, batch_norm=True):
    """Build the convolutional feature extractor described by ``cfg``.

    ``cfg`` is a sequence whose items are either the string ``'M'`` (a 2x2
    max-pool with stride 2) or an output channel count (a 3x3 convolution
    with padding 1, optionally followed by batch norm, then an in-place
    ReLU).  The first convolution takes a single input channel.

    Returns:
        ``nn.Sequential`` containing the layers in order.
    """
    modules = []
    channels = 1
    for spec in cfg:
        if spec == 'M':
            modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
            continue

        modules.append(nn.Conv2d(channels, spec, kernel_size=3, padding=1))
        if batch_norm:
            modules.append(nn.BatchNorm2d(spec))
        modules.append(nn.ReLU(inplace=True))
        # The next convolution consumes what this one produced.
        channels = spec
    return nn.Sequential(*modules)


# Layer configurations consumed by make_layers(): an integer is the output
# channel count of a 3x3 convolution, 'M' is a 2x2 max-pooling layer.
cfg = {
    'VGG11': [64, 'M', 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG13': [64, 64, 'M', 128, 128, 'M', 256, 256, 'M', 512, 512, 'M', 512, 512, 'M'],
    'VGG16': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 'M', 512, 512, 512, 'M', 512, 512, 512, 'M'],
    'VGG19': [64, 64, 'M', 128, 128, 'M', 256, 256, 256, 256, 'M', 512, 512, 512, 512, 'M', 512, 512, 512, 512, 'M'],
}

In [0]:
import torchvision.models as models
import torch.nn as nn
import torch.nn.functional as F

class MyVGG(nn.Module):
    """torchvision VGG11 with a frozen body, a new classifier and a 1->3 stem.

    All parameters of the torchvision model are frozen first; the classifier
    is then replaced by a new (trainable) three-layer head taking
    ``1 * 3 * 512`` features.  A trainable 3x3 convolution maps the
    single-channel input to the three channels the VGG features expect.

    NOTE(review): ``models.vgg11()`` is called without requesting pretrained
    weights, so the frozen feature extractor appears to be randomly
    initialised - confirm this is intended.
    """

    def __init__(self, num_classes=12):
        super().__init__()
        self.origVGG = models.vgg11()
        # Freeze the torchvision parameters before attaching the new head.
        for frozen in self.origVGG.parameters():
            frozen.requires_grad = False

        head = [
            nn.Linear(1 * 3 * 512, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(True),
            nn.Dropout(),
            nn.Linear(4096, num_classes),
        ]
        self.origVGG.classifier = nn.Sequential(*head)
        self.conv1 = nn.Conv2d(1, 3, kernel_size=3, padding = 1)

    def forward(self, x):
        """Stem conv -> VGG features -> flatten -> new classifier."""
        rgb_like = self.conv1(x)
        feature_map = self.origVGG.features(rgb_like)
        flat = feature_map.view(feature_map.size(0), -1)
        return self.origVGG.classifier(flat)

In [0]:
import torchvision.models as models
import torch.nn as nn
import torch.nn.functional as F

class MyVGGUpsample(nn.Module):
    """torchvision VGG11 fed through two learned upsampling layers.

    All parameters of the torchvision model are frozen.  The last four
    children of its classifier are dropped and replaced by new, trainable
    ``Linear(4096, 4096) -> ReLU -> Dropout -> Linear(4096, num_classes)``
    layers (presumably leaving the first Linear/ReLU/Dropout of the stock
    classifier in place - verify against the torchvision version in use).
    Two transposed convolutions enlarge the single-channel input and expand
    it to three channels before it enters the VGG.
    """

    def __init__(self, num_classes=12):
        super(MyVGGUpsample, self).__init__()
        self.origVGG = models.vgg11()
        for param in self.origVGG.parameters():
            param.requires_grad = False

        # Keep everything but the last four classifier layers, then append
        # the new trainable head.
        removed = list(self.origVGG.classifier.children())[:-4]
        composed = removed + [nn.Linear(4096, 4096), nn.ReLU(True), nn.Dropout(), nn.Linear(4096, num_classes)]
        self.origVGG.classifier = nn.Sequential(*composed)

        # Upsample along the first spatial axis only, then along both axes
        # while going from 1 to 3 channels.
        self.upsample1 = nn.ConvTranspose2d(in_channels = 1, out_channels = 1, kernel_size = (4,1), stride = (2,1), padding = 0)
        self.upsample2 = nn.ConvTranspose2d(in_channels = 1, out_channels = 3, kernel_size = (6,4), stride = (3,2), padding = 0)

    def forward(self, x):
        """Upsample the input and run it through the modified VGG."""
        x = self.upsample2(self.upsample1(x))
        # Call the module itself rather than .forward() so that any hooks
        # registered on the wrapped VGG are executed.
        x = self.origVGG(x)
        return x