Make sure you are on the correct conda environment (sound).

In [None]:
!conda info

In [None]:
!dir

In [None]:
#some basic imports
import numpy as np
import scipy as sp
import matplotlib
import matplotlib.pyplot as plt
import math

# Change this to 'cpu' if your machine doesn't have cuda capability.
device = 'cuda'

### Load an audio file
Import code for loading wav files from disk.

In [None]:
from code.lib.mel_features import read_wav
# the code below allows us to convert the wav's sample rate to a desired sample rate; 8kHz frequency maximum is OK for us
wav_sample_rate=16000
wav1 = read_wav("wavs/Motorcycle.wav", target_sample_rate=wav_sample_rate)
wav2 = read_wav("wavs/Yakov_Golman_-_10_-_Valse.wav", target_sample_rate=wav_sample_rate)

wav1_time = len(wav1)/wav_sample_rate
wav2_time = len(wav2)/wav_sample_rate

In [None]:
print("Audio sample length: "+str(len(wav1)/wav_sample_rate)+" seconds")

In [None]:
# alternatively, you could use scipy.io and its wavfile 
# (but we won't use this here, because there are troubles with playback in IPython)
# see https://colab.research.google.com/drive/1--xY78_ZTFwpI7F2ZfaeyFKiAOG2nkwd

from scipy.io import wavfile
wav1_scipy = wavfile.read("wavs/Motorcycle.wav")

# Separete the object elements
framerate = wav1_scipy[0]
sounddata = wav1_scipy[1]
time      = np.arange(0,len(sounddata))/framerate

# Show information about the object
print('Sample rate:',framerate,'Hz')
print('Total time:',len(sounddata)/framerate,'s')

In [None]:
# Generate a player for the (mono) sound, play it back
from IPython.display import Audio
Audio(wav1,rate=wav_sample_rate)

#### &lt;back to the presentation&gt;

### Visualize / represent the data

#### 1) as a wave

In [None]:
# enable plotting inside the notebook
%matplotlib inline

In [None]:
xlabels = [x/wav_sample_rate for x in range(len(wav1))]
plt.figure(figsize=(16, 4))
plt.plot(xlabels, wav1)
plt.ylabel('amplitude')
plt.xlabel('time [s]')
plt.show()

In [None]:
# let's visualize just the first second
plt.figure(figsize=(16, 4))
plt.plot(xlabels[:wav_sample_rate], wav1[:wav_sample_rate])
plt.ylabel('amplitude')
plt.xlabel('time [s]')
plt.show()

In [None]:
# and the last second
plt.figure(figsize=(16, 4))
plt.plot(xlabels[-wav_sample_rate:], wav1[-wav_sample_rate:])
plt.ylabel('amplitude')
plt.xlabel('time [s]')
plt.show()

#### &lt;back to the presentation&gt;

#### 2) as spectrum and phase

First compute fourier transform. Below is code that computes FFT for an example series - a 5Hz sine wave.

In [None]:
# inspired by https://plot.ly/matplotlib/fft/

# first show FFT of a 5Hz signal, duration 1 second:
five_hz_sine_sample_rate = 400.0;  # sampling rate
ts = 1.0/five_hz_sine_sample_rate; # sampling interval
t = np.arange(0,1,ts) # time vector

ff = 5;   # frequency of the signal
five_hz_sine = np.sin(2*np.pi*ff*t)

def compute_fft(data, times, sampling_rate):
    n = len(data)
    total_time = n/sampling_rate
    frq = sp.linspace(0, sampling_rate, n)
    frq = frq[range(n//2)] # one side frequency range

    fft = np.fft.fft(data)/n # fft computing and normalization
    fft = fft[range(n//2)]

    fig, ax = plt.subplots(3, 1, figsize=(8, 10))
    ax[0].plot(times,data)
    ax[0].set_xlabel('Time')
    ax[0].set_ylabel('Amplitude')
    ax[1].plot(frq,abs(fft),'r') # plotting the magnitudes
    ax[1].set_xlabel('Freq (Hz)')
    ax[1].set_ylabel('magnitude')
    ax[2].plot(frq,np.angle(fft),'g') # plotting the phase
    ax[2].set_xlabel('Freq (Hz)')
    ax[2].set_ylabel('phase')

    plt.show()
    # note that the frequency is 0.5 and not 1 since we're showing 
    # only the positive frequencies of the FFT

    print("maximum frequency magnitude: " + str(np.amax(abs(fft))) + 
          " at frequency: "+str(np.argmax(abs(fft))/total_time) + " Hz and " +
          "phase: "+str(np.angle(fft[np.argmax(abs(fft))])))
    print("value of FT at the maximum magnitude: "+str(fft[np.argmax(abs(fft))]))
    
    return fft
    
example_fft = compute_fft(five_hz_sine, t, five_hz_sine_sample_rate)

The following is the fft of our sound sample. Notice the peak at the low frequencies.

In [None]:
wav1_fft = compute_fft(wav1, xlabels, wav_sample_rate)

#### &lt;back to the presentation&gt;

Spectrum of a signal expressed using color coding:

In [None]:
# Let's plot the spectrogram magnitude data as a 1-D color-coded graph:

xx, yy = sp.meshgrid(
    sp.linspace(0,1, 2),
    sp.linspace(0,wav_sample_rate//2, len(wav1_fft))
    )

zz = sp.zeros(xx.shape)
for i in range(xx.shape[0]):
    for j in range(xx.shape[1]):
        zz[i,j] = abs(wav1_fft[i])

# plot the calculated function values
fig, ax = plt.subplots(1,figsize=(1, 8),sharey='all')

p = ax.pcolormesh(xx,yy,zz,cmap='viridis')

# and a color bar to show the correspondence between function value and color
fig.colorbar(p)

plt.show() 

In [None]:
# plot the calculated function values, log values
fig, ax = plt.subplots(1,figsize=(1, 8),sharey='all')

p = ax.pcolormesh(xx,yy,np.log(zz+0.00001),cmap='viridis')

# and a color bar to show the correspondence between function value and color
fig.colorbar(p)

plt.show() 

#### 3) as a spectrogram 

fft computed over short-length time windows, a.k.a. short-time FT (STFT)

In [None]:
from code.lib.mel_features import spectrogram

def print_spectrogram(spect, time, sample_rate, fig=None, ax=None, show_bar=True):
    xx, yy = sp.meshgrid(
        sp.linspace(0,time, len(spect)),
        sp.linspace(0,sample_rate/2, len(spect[0]))
        )

    zz = sp.zeros(xx.shape)
    for i in range(xx.shape[1]):
        for j in range(xx.shape[0]):
            zz[j,i] = spect[i,j]

    if fig is None:
        # plot the calculated function values
        fig, ax = plt.subplots(1,figsize=(8, 8))
    
    p = ax.pcolormesh(xx,yy,zz)  # can use different colormaps, e.g. cmap='jet'

    if show_bar:
        # and a color bar to show the correspondence between function value and color
        fig.colorbar(p)

In [None]:
spect = spectrogram(five_hz_sine, sample_rate=five_hz_sine_sample_rate, logarithmic=True, window_length_secs=0.1)
# notice how the window length_secs has an effect on the localization of frequencies. Try using 0.5 or 0.9

print_spectrogram(spect, 1.0, five_hz_sine_sample_rate)

In [None]:
spect = spectrogram(wav1, sample_rate=wav_sample_rate, logarithmic=False)
  
print_spectrogram(spect, wav1_time, wav_sample_rate)

In [None]:
spect = spectrogram(wav1, sample_rate=wav_sample_rate, logarithmic=True)

print_spectrogram(spect, wav1_time, wav_sample_rate)

#### &lt;back to the presentation&gt;

#### 4) as a mel spectrogram (computes a spectrogram, then converts it to mel)

In [None]:
from code.lib.mel_features import mel_spectrogram

mel_sp = mel_spectrogram(wav1, 
                         sample_rate=wav_sample_rate, 
                         log_offset=0.01, 
                         window_length_secs=0.1, 
                         hop_length_secs=0.01, 
                         logarithmic=True,
                         num_mel_bins=256,
                         lower_edge_hertz=10,
                         upper_edge_hertz=8000)

print_spectrogram(mel_sp, wav1_time, wav_sample_rate)
# warning: the numbers on the Y axis no longer match the real frequencies

Cool visualizations may be achieved using external tools, like [Sonic Visualizer](https://www.sonicvisualiser.org/)

#### &lt;back to the presentation&gt;

#### 5) as a wavelet

In [None]:
from code.lib.wavelet_transform import continuous_wavelet_transform

In [None]:
# wavelet transform on the 5Hz signal:
wavelet_five_hz = continuous_wavelet_transform(five_hz_sine)
wavelet_five_hz_print = np.transpose(np.flipud(wavelet_five_hz))

#hack to make the data visible in the plot (it's 1D otherwise):
wavelet_five_hz_print = wavelet_five_hz_print + wavelet_five_hz_print

print_spectrogram(wavelet_five_hz_print + wavelet_five_hz_print, 1.0, five_hz_sine_sample_rate)

#ignore the future warning below, it has been fixed in the newest fftpack

The y labels are not accurate - they no longer refer to frequency, but to scale and position used by the wavelet transform.

In [None]:
# Warning: This uses a lot of RAM!

# wavelet_wav = continuous_wavelet_transform(wav1)

# wavelet_wav_print = np.transpose(np.flipud(wavelet_wav))


# print_spectrogram(wavelet_wav_print, wav1_time, wav_sample_rate)

#### &lt;back to the presentation&gt;

### Feature extraction

An example using pyAudioAnalysis library [for feature extraction](https://github.com/tyiannak/pyAudioAnalysis/wiki/3.-Feature-Extraction). For higher-performance models, we'd recommend feature extraction using VGGish.

In [None]:
from code.lib.pyAudioAnalysis import audioBasicIO
from code.lib.pyAudioAnalysis import audioFeatureExtraction

Fs = wav_sample_rate
data = wav1

features, f_names = audioFeatureExtraction.stFeatureExtraction(data, Fs, 0.050*Fs, 0.025*Fs);
# features now contain an array of 32 standard audio features
# see https://github.com/tyiannak/pyAudioAnalysis/wiki/3.-Feature-Extraction
plt.subplot(2,1,1); plt.plot(features[0,:]); plt.xlabel('Frame no'); plt.ylabel(f_names[0]); 
plt.subplot(2,1,2); plt.plot(features[1,:]); plt.xlabel('Frame no'); plt.ylabel(f_names[1]); 

plt.show()

VGGish feature extraction. For more information, check out [this colab notebook](https://colab.research.google.com/drive/1TbX92UL9sYWbdwdGE0rJ9owmezB-Rl1C#scrollTo=9BKF-1dzDhnz).

In [None]:
# first make sure your vggish is installed correctly
from code.lib.vggish.vggish_smoke_test import *

Let's go through the smoke test bit by bit, because it actually loads a trained network and uses it to compute an embedding of a test signal.

In [None]:
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

# modified by GoodAI, 2019

from __future__ import print_function

import numpy as np
import tensorflow as tf

import code.lib.vggish.vggish_input as vggish_input
import code.lib.vggish.vggish_params as vggish_params
import code.lib.vggish.vggish_postprocess as vggish_postprocess
import code.lib.vggish.vggish_slim as vggish_slim

# Paths to downloaded VGGish files.
checkpoint_path = 'code/lib/vggish/vggish_model.ckpt'
pca_params_path = 'code/lib/vggish/vggish_pca_params.npz'

# Relative tolerance of errors in mean and standard deviation of embeddings.
rel_error = 0.1  # Up to 10%

# Generate a 1 kHz sine wave at 44.1 kHz (we use a high sampling rate
# to test resampling to 16 kHz during feature extraction).
num_secs = 3
freq = 1000
sr = 44100
t = np.linspace(0, num_secs, int(num_secs * sr))
x = np.sin(2 * np.pi * freq * t)

In [None]:
vggish_params.EXAMPLE_HOP_SECONDS = 0.01  # hop by 0.01s

# Produce a batch of log mel spectrogram examples.
input_batch = vggish_input.waveform_to_examples(x, sr)
#input_batch = vggish_input.waveform_to_examples(wav1[0:160000], wav_sample_rate)  # only 10 seconds to limit memory

# Define VGGish, load the checkpoint, and run the batch through the model to
# produce embeddings.
with tf.Graph().as_default(), tf.Session() as sess:
    vggish_slim.define_vggish_slim()
    vggish_slim.load_vggish_slim_checkpoint(sess, checkpoint_path)

    features_tensor = sess.graph.get_tensor_by_name(
        vggish_params.INPUT_TENSOR_NAME)
    embedding_tensor = sess.graph.get_tensor_by_name(
        vggish_params.OUTPUT_TENSOR_NAME)
    [embedding_batch] = sess.run([embedding_tensor],
                                 feed_dict={features_tensor: input_batch})
    # print('VGGish embedding: ', embedding_batch[0])

    # Postprocess the results to produce whitened quantized embeddings.
    pproc = vggish_postprocess.Postprocessor(pca_params_path)
    postprocessed_batch = pproc.postprocess(embedding_batch)

print('Postprocessed VGGish embedding (first item): ', postprocessed_batch[0], '\ntotal items: ', len(postprocessed_batch))

# End of modified vggish_smoke_test.

There are 203 items for a 3-second audio, because an embedding is computed for 0.96s slices with hop length equal to 0.01s and minimal sample length equal to 0.98s.

#### &lt;back to the presentation&gt;

### Simple classifiers

SVM, SVM+RBF, RandomForest, ...

In [None]:
from code.lib.pyAudioAnalysis import audioTrainTest as aT

# calls sklearn.svm.SVC with a linear kernel or an rbf kernel
# or sklearn.ensemble.RandomForestClassifier

wav1_slices = [wav1[i*16000:(i+1)*16000] for i in range(0,20)]
wav2_slices = [wav2[i*16000:(i+1)*16000] for i in range(0,20)]
aT.featureWavAndTrain([wav1_slices, wav2_slices],
                      wav_sample_rate,
                      ["motorcycle","piano"],
                      1.0, 
                      1.0, 
                      aT.shortTermWindow, 
                      aT.shortTermStep, 
                      "svm", # or "svm_rbf" or "randomforest" or other...
                      "svmMusicGenre3", 
                      True)


#### &lt;back to the presentation&gt;

### Slicing

In [None]:
def slice_spectrogram(spectrogram, width):
    right = width
    slices = []
    while right <= spectrogram.shape[0]:
        slices.append(spectrogram[right-width:right, :])
        right += width
    
    return slices

In [None]:
mel_sp2 = mel_spectrogram(wav2, 
                         sample_rate=wav_sample_rate, 
                         log_offset=0.01, 
                         window_length_secs=0.1, 
                         hop_length_secs=0.01, 
                         logarithmic=True,
                         num_mel_bins=256,
                         lower_edge_hertz=10,
                         upper_edge_hertz=8000)
print_spectrogram(mel_sp2, wav2_time, wav_sample_rate)

In [None]:
# Create slices of the whole-sample spectrogram.
slices_1 = slice_spectrogram(mel_sp, width=256)
slices_2 = slice_spectrogram(mel_sp2, width=256)

all_slices = slices_1 + slices_2

In [None]:
# Show a couple of slices.
n_to_show = 4
fig, ax = plt.subplots(1, n_to_show, sharey=True, figsize=(14, 8)) #, figsize=(8, 10))
for i, (s, ax_) in enumerate(zip(slices_2[:n_to_show], ax)):
    print_spectrogram(s, 256, wav_sample_rate, fig, ax_, show_bar=i==0)

#### &lt;back to the presentation&gt;

### NN classifiers

### Fully connected single layer feedforward network

In [None]:
# Extract features for the network.

def extract_features(slice_data, n_features, eps=0.0001):
    # Normalize the slice to (mean = 0, std = 1).
    data_std = max(np.std(slice_data, ddof=1), eps)
    
    slice_data = (slice_data - np.mean(slice_data)) / data_std
    
    # Get frequencies present in each line of the spectrogram (rfft computes dft on real input).
    freqs = np.fft.fft(slice_data)
    
    # Get magnitude of the frequencies present (absolute value of a complex number).
    # rfft returns the result multiplied by the shape, divide.
    magnitudes = np.abs(freqs) / slice_data.shape[1]
    
    # Take only first n_features.
    result = magnitudes[..., :n_features].astype(np.float32)
    return result

n_features = 20
data = [extract_features(slice_data, n_features) for slice_data in all_slices]

In [None]:
# Prepare the labels.
labels_1 = [0 for _ in range(len(slices_1))]
labels_2 = [1 for _ in range(len(slices_2))]

labels = labels_1 + labels_2

n_classes = len(set(labels))

In [None]:
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

# Create the train/test split.
indices = list(range(len(data)))

# Q: Notice a problem here?
training_indices, testing_indices = train_test_split(indices, test_size=0.2)

In [None]:
import torch

training_data = torch.tensor(np.array([data[i] for i in training_indices]))
training_labels = torch.tensor(np.array([labels[i] for i in training_indices]), dtype=torch.long)
testing_data = torch.tensor(np.array([data[i] for i in testing_indices]))
testing_labels = torch.tensor(np.array([labels[i] for i in testing_indices]), dtype=torch.long)

training_dataset = TensorDataset(training_data, training_labels)
testing_dataset = TensorDataset(testing_data, testing_labels)

In [None]:
# Calculate class weights for the loss function.
class_weights = [len(cls_labels)/len(labels) for cls_labels in (labels_1, labels_2)]
print(f"Class weights:\t{class_weights}")
print(f"Testing labels:\t{testing_labels}")

class_weights_tensor = torch.tensor(class_weights, device=device, dtype=torch.float32)

In [None]:
class FullyConnectedNet(torch.nn.Module):
    """Fully connected feed-forward neural network. """
    def __init__(self, input_height: int, n_classes: int, n_features: int):
        super().__init__()

        # Create the fully connected layer.
        self.output = torch.nn.Linear(input_height * n_features, n_classes)

    def forward(self, data):
        data = data.view(data.size(0), -1)  # View as (batch_size, n_features).
        out = self.output(data)

        return out

In [None]:
model = FullyConnectedNet(input_height=256, n_classes=n_classes, n_features=n_features)

criterion = torch.nn.CrossEntropyLoss(weight=class_weights_tensor)

optimizer = torch.optim.Adam(model.parameters(), lr=0.4)

In [None]:
from tqdm import tqdm

def train(model, criterion, optimizer, training_dataset):
    model.to(device)
    model.train()

    training_loader = DataLoader(dataset=training_dataset, batch_size=5, shuffle=True)
    
    with tqdm(training_loader) as progress_bar:
        for slices, labels in progress_bar:
            slices = slices.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(slices)

            # The criterion is CrossEntropyLoss, which is log-softmax and negative log-likelihood.
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

In [None]:
def test(model, testing_dataset):
    model.to(device)
    model.eval()

    testing_loader = DataLoader(dataset=testing_dataset, batch_size=1, shuffle=False)

    softmax = torch.nn.Softmax(dim=1)
    all_predictions = []
    all_labels = []

    for slices, labels in testing_loader:
        slices = slices.to(device)

        labels = labels.cpu().numpy()
        outputs = softmax(model(slices)).detach().cpu().numpy()
        
        all_predictions.extend(outputs)
        all_labels.extend(labels)

    return all_predictions, all_labels

In [None]:
train(model, criterion, optimizer, training_dataset)

In [None]:
def print_result(all_predictions, all_labels):
    best_guess = np.argmax(all_predictions, axis=1)

    print(f"Predicted:\t{best_guess}")
    print(f"Ground truth:\t{np.array(all_labels)}")

In [None]:
all_predictions, all_labels = test(model, testing_dataset)
print("Predictions:")
for p in all_predictions:
    print(p)
    
print()
print_result(all_predictions, all_labels)

#### &lt;back to the presentation&gt;

### ResNet - transfer learning

In [None]:
# ResNet is originally trained on images with 3 channels (RGB). We will simply expand the one channel that we have.
slices_expanded = [torch.tensor(s, dtype=torch.float32).view(1, 256, 256).expand(3, 256, 256) for s in all_slices]

training_images = torch.stack([slices_expanded[i] for i in training_indices])
testing_images = torch.stack([slices_expanded[i] for i in testing_indices])

training_dataset_images = TensorDataset(training_images, training_labels)
testing_dataset_images = TensorDataset(testing_images, testing_labels)

In [None]:
from torchvision.models.resnet import resnet18

resnet = resnet18(pretrained=True)
resnet.fc = torch.nn.Linear(resnet.fc.in_features, n_classes)

resnet.avgpool = torch.nn.AdaptiveAvgPool2d((1, 1))  # Fix copied from torchvision master.

optimizer = torch.optim.SGD(resnet.parameters(), lr=0.1, momentum=0.9)

In [None]:
for _ in range(2):
    # The criterion is the same as before - class-weighted cross entropy loss.
    train(resnet, criterion, optimizer, training_dataset_images)

In [None]:
all_predictions, all_labels = test(resnet, testing_dataset_images)
print_result(all_predictions, all_labels)

#### &lt;back to the presentation&gt;

### Cross-validation

In [None]:
from sklearn.model_selection import StratifiedKFold, KFold
from sklearn.metrics import confusion_matrix
from sklearn import datasets

iris = datasets.load_iris()

kfold = StratifiedKFold(n_splits=4)
splits = kfold.split(iris.data, iris.target)

# We're going to collect per-class recall values.
recalls = []
for train_indices, test_indices in splits:
    X_train, y_train = iris.data[train_indices], iris.target[train_indices]
    X_test, y_test = iris.data[test_indices], iris.target[test_indices]
    
    model = FullyConnectedNet(input_height=256, n_classes=n_classes, n_features=n_features)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.4)
    
    # Train and test the model.
    # The criterion is the same as before - class-weighted cross entropy loss.
    train(model, criterion, optimizer, training_dataset)
    all_predictions, all_labels = test(model, testing_dataset)
    
    print_result(all_predictions, all_labels)
    
    # Calculate per-class recall.
    cm = confusion_matrix(np.array(all_labels), np.argmax(all_predictions, axis=1))
    per_class_rec = []
    for c in range(n_classes):
        recall = cm[c][c] / cm[c].sum()  # True positives/all samples from c.
        per_class_rec.append(recall)
        
    recalls.append(per_class_rec)

In [None]:
for class_recalls in recalls:
    print(class_recalls)

In [None]:
means = []
for c in range(n_classes):
    recs = [rec[c] for rec in recalls]
    print(f"Class {c}:")
    mean = np.mean(recs)
    means.append(mean)
    print(f"\tRecall mean:\t\t{mean}")
    print(f"\tRecall stddev:\t{np.std(recs)}")

In [None]:
print(f"Class weights: {class_weights}")
class_weighted_recall = sum(mean*weight for mean, weight in zip(means, class_weights))
print(f"Class-weighted recall: {class_weighted_recall}")

#### &lt;back to the presentation&gt;