In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

In [None]:
# !pwd

In [3]:
%load_ext autoreload
%autoreload 2

In [4]:
import warnings
warnings.filterwarnings('ignore')

In [5]:
import os

import numpy as np
import pandas as pd
# import cv2

import matplotlib.pyplot as plt
import IPython.display as ipd
from PIL import Image
from tqdm import tqdm

import torch
import torch.nn as nn

from torch.optim import Adam
from torch.optim.lr_scheduler import ExponentialLR
from torch.utils.data import Dataset, DataLoader
# from torchvision.models import resnet34
import torchvision.transforms as transforms


import librosa
from librosa.display import specshow
from librosa.display import waveshow
from librosa.feature import melspectrogram
from librosa.feature import mfcc

###### Exploratory analysis

###### Case study

In [22]:
"""
Wave & features
"""
def transform_image(resize_shape=(32, 32)):
    """Build the image pipeline used on spectrogram images.

    Args:
        resize_shape: target size handed to ``transforms.Resize``.

    Returns:
        A ``transforms.Compose`` that first resizes and then applies
        ``transforms.ToTensor``.
    """
    steps = [
        transforms.Resize(resize_shape),
        transforms.ToTensor(),
    ]
    return transforms.Compose(steps)
    

def spec_to_image(spec, eps=1e-6):
    """Rescale a spectrogram array to an 8-bit image (values 0..255).

    The array is standardised (zero mean, unit variance, ``eps`` guarding the
    division by the standard deviation) and then min-max scaled to 0..255.

    Args:
        spec: numpy array holding the spectrogram.
        eps: small constant added to the standard deviation.

    Returns:
        ``np.uint8`` array with the same shape as ``spec``. A constant input
        (no dynamic range) yields an all-zero image.
    """
    spec_norm = (spec - spec.mean()) / (spec.std() + eps)
    spec_min, spec_max = spec_norm.min(), spec_norm.max()
    value_range = spec_max - spec_min

    # A flat spectrogram (e.g. digital silence) has zero range; dividing by it
    # would produce NaN, and casting NaN to uint8 is undefined.
    if value_range == 0:
        return np.zeros(spec_norm.shape, dtype=np.uint8)

    spec_scaled = 255 * (spec_norm - spec_min) / value_range
    return spec_scaled.astype(np.uint8)


def get_wave(path, sr=None):
    """Load an audio file with ``librosa.load``.

    Args:
        path: location of the audio file.
        sr: sampling rate forwarded to ``librosa.load``.

    Returns:
        Tuple ``(signal, sampling_rate)`` as produced by ``librosa.load``.
    """
    signal, rate = librosa.load(path, sr=sr)
    return signal, rate


def get_feature(wav,
                data_transform=None,
                power_to_db=True,
                to_img=True,
                feature=melspectrogram,
                sr=None,
                n_fft=2048,
                hop_length=512,
                n_mels=128,
                fmin=20,
                fmax=8300,
                top_db=80):
    """Compute a time-frequency feature for one waveform.

    Args:
        wav: audio signal (as returned by ``get_wave``).
        data_transform: optional callable applied to the feature after it has
            been wrapped in a PIL image (e.g. the result of ``transform_image``).
        power_to_db: if True, convert the feature with ``librosa.power_to_db``.
        to_img: if True, rescale the feature to uint8 with ``spec_to_image``.
        feature: feature extractor; it must accept the signal as keyword ``y``
            together with ``sr``, ``n_fft``, ``hop_length``, ``n_mels``,
            ``fmin`` and ``fmax`` (``melspectrogram`` and ``mfcc`` both do).
        sr: sampling rate of ``wav``, forwarded to ``feature``.
        n_fft, hop_length, n_mels, fmin, fmax: forwarded to ``feature``.
        top_db: forwarded to ``librosa.power_to_db``.

    Returns:
        The feature array, or whatever ``data_transform`` returns when one is
        supplied.
    """
    # Pass the signal by keyword: librosa >= 0.10 makes ``y`` keyword-only, so
    # the positional form raises TypeError there; the keyword form also works
    # on older releases.
    spec = feature(y=wav,
                   sr=sr,
                   n_fft=n_fft,
                   hop_length=hop_length,
                   n_mels=n_mels,
                   fmin=fmin,
                   fmax=fmax)

    if power_to_db:
        spec = librosa.power_to_db(spec, top_db=top_db)

    if to_img:
        spec = spec_to_image(spec)

    if data_transform is not None:
        # The torchvision transforms used here expect a PIL image, not an ndarray.
        spec = Image.fromarray(spec)
        spec = data_transform(spec)

    return spec



"""
Plot
"""
def plot_audio(path):
    """Return an ``IPython.display.Audio`` player for the file at ``path``."""
    player = ipd.Audio(path)
    return player


def plot_wave(wav, title="wave"):
    """Draw the waveform ``wav`` against time and show the figure.

    NOTE(review): the sampling rate is not forwarded to ``waveshow``, so the
    time axis presumably uses librosa's default rate - confirm this is intended.
    """
    waveshow(wav, x_axis='time')
    plt.title(title)
    plt.show()
    
    
def plot_feature(feature, title="melspectrogram"):
    """Render a 2-D feature with ``specshow``, add a colour bar and show it.

    NOTE(review): the y axis is labelled in Hz; for mel-scaled or cepstral
    features this is presumably not the right scale - confirm.
    """
    specshow(feature, x_axis='time', y_axis='hz')
    plt.colorbar()
    plt.title(title)
    plt.show()
    

    
"""
Case study
"""
def audio_info(path, 
               sr=None, 
               features_dct={"melspectrogram": melspectrogram,
                             "mfcc": mfcc}):
    """Show the waveform, each requested feature and an audio player for a file.

    Args:
        path: location of the audio file.
        sr: sampling rate used when loading the file.
        features_dct: mapping of plot title -> feature extractor passed to
            ``get_feature``. The default dict is only read, never modified.

    Returns:
        The audio player created by ``plot_audio``.
    """
    # Use the notebook's own loader so loading behaves the same everywhere.
    wav, sr = get_wave(path, sr=sr)
    plot_wave(wav)
    for name, feature in features_dct.items():
        # MFCCs are cepstral coefficients, not power values; sending them
        # through power_to_db would clamp every negative coefficient.
        f = get_feature(wav,
                        sr=sr,
                        feature=feature,
                        power_to_db=feature is not mfcc)
        plot_feature(f, title=name)
    return plot_audio(path)

In [25]:
wav, sr = get_wave('../ESC-50-master/audio/1-100032-A-0.wav')
spec = get_feature(wav, sr=sr, to_img=True, data_transform=transform_image(resize_shape=(32, 32)))

In [26]:
spec

tensor([[[0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         ...,
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.],
         [0., 0., 0.,  ..., 0., 0., 0.]]])

In [27]:
spec.shape

torch.Size([1, 32, 32])

In [29]:
# Image.fromarray(spec)

In [None]:
X_float = get_feature(wav, sr=sr, to_img=False)

In [None]:
X_float

In [None]:
X_float.shape

In [None]:
X = get_feature(wav, sr=sr, to_img=True)

In [None]:
X

In [None]:
X.shape

In [None]:
X

In [None]:
plt.imshow(X, origin='lower')
# plt.axis("off")
plt.show()

In [None]:
plt.imshow(X_float, origin='lower')
# plt.axis("off")
plt.show()

In [None]:
_X = get_feature(wav, sr=sr, to_img=True)

In [None]:
plt.imshow(_X)

In [None]:
_X[:100, :150]

In [None]:
# audio_info('drive/MyDrive/DEEPSOUND/ESC-50-master/audio/1-100032-A-0.wav')
audio_info('../ESC-50-master/audio/1-100032-A-0.wav')

###### Construct Datasets

In [12]:
"""
DF
"""
def _rename_targets(df):
    map_dct = dict()
    for i, target in enumerate(df.target.unique()):
        map_dct[target] = i
    df.target = df.target.map(map_dct)
    return df
        
    
def _get_df(path, only_esc10=True):
    df = pd.read_csv(path)
    if only_esc10:
        print(f"Use only 10 classes!")
        print()
        df = df[df.esc10 == True]
        df = _rename_targets(df)
    return df


def _get_target_description(df):
    return df[["target", "category"]].groupby("target").first()


def _get_target_distribution(df, title="distribution_of_targets", ax=None, show=False):    
    """Plot a histogram of ``df.target``.

    Args:
        df: DataFrame with a ``target`` column.
        title: title applied only when ``show`` is True.
        ax: axes to draw on; passed straight to ``Series.hist``.
        show: if True, set the title and call ``plt.show()``.

    Returns:
        The axes that were drawn on when ``show`` is False, otherwise None.
    """
    ax = df.target.hist(bins=50, alpha=0.5, edgecolor="black", ax=ax)
    ax.set_xlabel("target")
    ax.set_ylabel("number_of_data_points")
    _min = df.target.min()
    _max = df.target.max() + 1
    # Configure the axes we actually drew on. plt.xticks / plt.title act on the
    # *current* axes, which is a different subplot when ``ax`` comes from a grid.
    ax.set_xticks(np.arange(_min, _max, 1))
    if show:
        ax.set_title(title)
        plt.show()
        return None
    return ax
    
    
def _get_folds(df):
    fold_dct = dict()
    for fold in df.fold.unique():
        fold_dct[fold] = df[df.fold == fold]
    return fold_dct


def _get_folds_info(dct):
    print(f"Number_of_folds: {len(dct)}")
    print()
    for i, fold in dct.items():
        print(f"fold: {i} | size: {fold.shape[0]}")
    print()
    
    
def _get_folds_distribution(dct):
    """Plot the target histogram of every fold side by side.

    Args:
        dct: mapping of fold id -> DataFrame (as built by ``_get_folds``).
    """
    print("Distribution of targets across folds:")
    if not dct:
        # Nothing to draw; a zero-column subplot grid cannot be created.
        return

    # Place folds by sorted id instead of indexing with ``fold_id - 1``: that
    # only worked for ids 1..N. For ids 1..N the layout is unchanged.
    fold_ids = sorted(dct)
    # squeeze=False keeps ``axs`` two-dimensional even for a single fold.
    fig, axs = plt.subplots(nrows=1,
                            ncols=len(fold_ids),
                            figsize=(20, 2),
                            sharey=True,
                            squeeze=False)
    for _ax, fold_id in zip(axs[0], fold_ids):
        _get_target_distribution(dct[fold_id], ax=_ax)
        _ax.set_title(f"fold_{fold_id}")
    plt.show()

    
def _get_number_of_classes(df):
    return df.target.unique().shape[0]


def _get_point_from_df(df, 
                       ind=0, 
                       base_dir='../ESC-50-master/audio', 
                       in_col='filename', 
                       out_col='target',
                       feature=melspectrogram):
    """Load one audio file referenced by ``df`` and compute its feature.

    Args:
        df: metadata frame.
        ind: positional row index (used with ``iloc``).
        base_dir: directory the file names are relative to.
        in_col: column holding the file name.
        out_col: column holding the label.
        feature: feature extractor forwarded to ``get_feature``.

    Returns:
        Tuple ``(feature_array, label)``.
    """
    record = df.iloc[ind]
    audio_path = os.path.join(base_dir, record[in_col])
    target = record[out_col]

    signal, rate = get_wave(audio_path)
    # TODO (from the original author): rework how the feature is built here.
    return get_feature(signal, sr=rate, feature=feature), target


def _train_test_split(df, test_fold=1):
    train_df = df[df.fold != test_fold]
    test_df = df[df.fold == test_fold]
    return train_df, test_df



"""
Dataset
"""   
class ESC50Data(Dataset):
    """Dataset that computes and keeps every clip's feature in memory.

    All rows of ``df`` are processed eagerly in the constructor; items are
    ``(feature_with_leading_axis, label)`` pairs.
    """

    def __init__(self,
                 df,
                 feature=melspectrogram,
                 base_dir='../ESC-50-master/audio',
                 in_col='filename',
                 out_col='target'):
        self.df = df
        self.data = []
        self.labels = []

        # Arguments shared by every per-row lookup.
        lookup = dict(base_dir=base_dir,
                      in_col=in_col,
                      out_col=out_col,
                      feature=feature)

        for row_pos in tqdm(range(len(df))):
            features, target = _get_point_from_df(df, ind=row_pos, **lookup)
            # Prepend one axis to the feature array before storing it.
            self.data.append(features[np.newaxis, ...])
            self.labels.append(target)

    def __len__(self):
        """Number of stored samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(feature, label)`` pair stored at position ``idx``."""
        sample = self.data[idx]
        target = self.labels[idx]
        return sample, target
        
    
    



    
"""
Loaders
"""
def get_loaders(df,
                base_dir='../ESC-50-master/audio',
                feature=melspectrogram,
                test_fold=None, 
                batch_size=16):
    """Build the train loader and, optionally, a held-out test loader.

    Args:
        df: metadata frame.
        base_dir: directory containing the audio files.
        feature: feature extractor forwarded to ``ESC50Data``.
        test_fold: fold id to hold out; None means all of ``df`` is used for
            training and no test loader is built.
        batch_size: batch size of both loaders.

    Returns:
        Tuple ``(train_loader, test_loader)``; ``test_loader`` is None when
        ``test_fold`` is None.
    """
    # 1. split
    if test_fold is None:
        train_df, test_df = df, None
    else:
        train_df, test_df = _train_test_split(df, test_fold=test_fold)

    # 2. loaders
    print("train_data:")
    train_data = ESC50Data(train_df, base_dir=base_dir, feature=feature)
    train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

    test_loader = None
    if test_df is not None:
        print("test_data:")
        test_data = ESC50Data(test_df, base_dir=base_dir, feature=feature)
        # Evaluation data is not shuffled: order does not affect the metrics,
        # and a fixed order keeps predictions aligned with the dataset rows.
        test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

    return train_loader, test_loader


def get_cross_validation_loaders(df,
                                 base_dir='../ESC-50-master/audio',
                                 feature=melspectrogram, 
                                 batch_size=16):
    """Build one ``(train_loader, test_loader)`` pair per fold found in ``df``.

    Each fold id (in order of first appearance) is held out once via
    ``get_loaders``.

    Returns:
        List of ``(train_loader, test_loader)`` tuples, one per fold.
    """
    loaders = []
    for held_out_fold in df.fold.unique():
        print(f"--- FOLD: {held_out_fold} ---")
        fold_train, fold_test = get_loaders(df,
                                            base_dir=base_dir,
                                            feature=feature,
                                            test_fold=held_out_fold,
                                            batch_size=batch_size)
        loaders.append((fold_train, fold_test))
    return loaders



"""
Data Storage
"""
class DataStorage:
    """Loads the dataset metadata and builds every loader used for training.

    Constructing an instance does all the work up front: it reads the CSV,
    prints summaries, draws the distribution plots and builds both the
    cross-validation loaders and the full-data loader.
    """
    
    def __init__(self,
                 base_dir="../ESC-50-master",
                 only_esc10=True,
                 feature=melspectrogram,
                 batch_size=16):
        """Read metadata from ``base_dir`` and build all loaders.

        Args:
            base_dir: dataset root; must contain ``meta/esc50.csv`` and an
                ``audio`` directory (both are asserted).
            only_esc10: forwarded to ``_get_df``.
            feature: feature extractor used for every clip.
            batch_size: batch size of every loader.
        """
        
        csv_path = os.path.join(base_dir, "meta", "esc50.csv")
        audio_dir_path = os.path.join(base_dir, "audio")
        
        # Fail early if the dataset is not where it is expected to be.
        assert os.path.isfile(csv_path)
        assert os.path.isdir(audio_dir_path)
        
        self.csv_path = csv_path
        self.audio_dir_path = audio_dir_path
        self.feature = feature
        self.batch_size = batch_size
        
        df = _get_df(csv_path, only_esc10=only_esc10)
        
        self.df = df
        self.folds = _get_folds(df)
        self.target_description = _get_target_description(df)  
        self.number_of_classes = _get_number_of_classes(df)
        # One randomly chosen clip is featurised just to record the feature shape.
        self.random_point_feature = self.get_ind_point_feature(ind=np.random.choice(df.shape[0]))
        self.feature_shape = self.random_point_feature.shape
        
        print(f"Number of points: {self.df.shape[0]}")
        print()
        print(self.get_target_description())
        print()
        print(f"Feature: {feature} | Shape: {self.feature_shape}")
        print()
        # Both calls below store their loaders on the instance.
        self.get_cross_validation_loaders()
        self.get_full_data_loader() ### ???
        
        
    def get_ind_point_feature(self, ind=0):
        """Return the feature of the row at position ``ind`` (label discarded)."""
        point_feature, _ = _get_point_from_df(self.df, 
                                              ind=ind, 
                                              base_dir=self.audio_dir_path, 
                                              feature=self.feature)
        return point_feature
        
        
    def get_folds_description(self):
        """Print the number of folds and the size of each one."""
        _get_folds_info(self.folds)
        
        
    def get_folds_distribution(self):
        """Plot the target histogram of every fold."""
        _get_folds_distribution(self.folds)
        
        
    def get_target_description(self):
        """Return the target -> category table computed in the constructor."""
        return self.target_description
    
    
    def get_target_distribution(self):
        """Plot and show the target histogram of the whole frame."""
        _get_target_distribution(self.df, show=True)
        
        
    def get_cross_validation_loaders(self):
        """Print/plot fold summaries and store ``self.cross_validation_loaders``.

        Despite the ``get_`` prefix nothing is returned; the list of
        ``(train_loader, test_loader)`` pairs is kept on the instance.
        """
        print("------ CONSTRUCT CROSS VALIDATION LOADERS------")
        print()
        self.get_folds_description()
        self.get_folds_distribution()
        self.cross_validation_loaders = \
        get_cross_validation_loaders(self.df,
                                     base_dir=self.audio_dir_path,
                                     feature=self.feature, 
                                     batch_size=self.batch_size)
    
    
    def get_full_data_loader(self):
        """Plot the target distribution and store ``self.full_data_loader``.

        The loader covers every row of ``self.df`` (no fold is held out).
        Nothing is returned.
        """
        print("------ CONSTRUCT FULL DATA LOADER------")
        print()
        print(f"Number of points: {self.df.shape[0]}")
        self.get_target_distribution()
        # test_fold=None -> get_loaders returns (train_loader, None).
        self.full_data_loader, _ = \
        get_loaders(self.df,
                    base_dir=self.audio_dir_path,
                    feature=self.feature,
                    test_fold=None, 
                    batch_size=self.batch_size)

In [10]:
DS = DataStorage()

# DS.get_cross_validation_loaders()
# DS.get_full_data_loader()

Use only 10 classes!



TypeError: img should be PIL Image. Got <class 'numpy.ndarray'>

In [None]:
DS.df

In [None]:
DS.random_point_feature

In [None]:
DS.feature_shape

In [None]:
DS.number_of_classes

In [13]:
from PIL import Image

###### MODEL

In [None]:
def get_device():
    """Return the first CUDA device when CUDA is available, otherwise the CPU."""
    device_name = 'cuda:0' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)


"""
Model
"""
# class Model(nn.Module):
#     def __init__(self, in_dim=1*128*431, out_dim=50):
#         super().__init__()
#         self.network = nn.Sequential(
#             nn.Linear(in_dim, out_dim))
        
#     def forward(self, xb):
#         x = xb.flatten(start_dim=1)
#         return self.network(x)

    
class Model(nn.Module):
    """Small VGG-style CNN classifier for single-channel spectrogram images.

    Three conv/conv/max-pool stages are followed by an adaptive pooling layer
    and a three-layer fully connected head.

    Args:
        in_dim: unused by the convolutional network; kept so existing callers
            that pass it keep working.
        out_dim: number of output classes.
        in_channels: channels of the input image. The dataset in this notebook
            yields one channel per spectrogram, hence the default of 1.
    """

    def __init__(self, in_dim=1*128*431, out_dim=50, in_channels=1):
        super().__init__()
        self.network = nn.Sequential(
            nn.Conv2d(in_channels, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2), # output: 64 x H/2 x W/2

            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2), # output: 128 x H/4 x W/4

            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2), # output: 256 x H/8 x W/8

            # Force a 4 x 4 map so the linear head fits any input size
            # (a 32 x 32 input is already 4 x 4 here, so this is a no-op for it).
            nn.AdaptiveAvgPool2d((4, 4)),

            nn.Flatten(), 
            nn.Linear(256*4*4, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, out_dim))

    def forward(self, xb):
        """Return class logits for a batch shaped (batch, in_channels, H, W)."""
        return self.network(xb)
    
    
def _init_model(in_dim=1*128*431, out_dim=50):
    """Create a ``Model`` and move it to the module-level ``device``.

    Args:
        in_dim: forwarded to ``Model``.
        out_dim: forwarded to ``Model``.

    Returns:
        The model after ``.to(device)``.
    """
    network = Model(in_dim=in_dim, out_dim=out_dim)
    # Alternative backbone kept for reference (disabled):
    #     model = resnet34(pretrained=True)
    #     model.fc = nn.Linear(512, 50)
    #     model.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    return network.to(device)


"""
Train
"""
def train(model, 
          train_loader, 
          valid_loader,
          epochs=50,
          lr=2e-4,
          gamma=0.96,
          optim=Adam,
          sheduler=ExponentialLR,
          loss_fn=nn.CrossEntropyLoss()):
    """Train ``model`` and optionally validate it after every epoch.

    Args:
        model: network to optimise; batches are moved to the device its
            parameters live on.
        train_loader: iterable of ``(x, y)`` training batches.
        valid_loader: iterable of ``(x, y)`` validation batches, or None to
            skip validation.
        epochs: number of passes over ``train_loader``.
        lr: initial learning rate handed to ``optim``.
        gamma: decay factor handed to ``sheduler``.
        optim: optimizer class, called as ``optim(params, lr=lr)``.
        sheduler: scheduler class, called as
            ``sheduler(optimizer=..., gamma=gamma)``.
        loss_fn: callable ``loss_fn(predictions, targets)``.

    Returns:
        Tuple ``(model, optimizer, train_losses, valid_losses)`` where the two
        loss lists hold one mean loss per epoch (``valid_losses`` is empty
        when ``valid_loader`` is None).
    """
    # Follow the model's own device instead of a notebook-global variable.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    optimizer = optim(model.parameters(), lr=lr)
    lr_scheduler = sheduler(optimizer=optimizer, gamma=gamma)

    train_losses = []
    valid_losses = []
    accuracies = []

    for epoch in tqdm(range(1, epochs + 1)):

        # 1. train
        model.train()

        batch_losses = []
        for x, y in train_loader:
            optimizer.zero_grad()
            x = x.to(device, dtype=torch.float32)
            y = y.to(device, dtype=torch.long)

            y_hat = model(x)
            loss = loss_fn(y_hat, y)
            loss.backward()
            batch_losses.append(loss.item())
            optimizer.step()

        train_loss = np.mean(batch_losses)
        train_losses.append(train_loss)
        print(f'Epoch: {epoch} | Train_Loss : {train_loss}')

        # 2. validate
        if valid_loader is not None:
            model.eval()
            batch_losses = []
            trace_y = []
            trace_y_hat = []

            # No gradients are needed for evaluation.
            with torch.no_grad():
                for x, y in valid_loader:
                    x = x.to(device, dtype=torch.float32)
                    y = y.to(device, dtype=torch.long)

                    y_hat = model(x)
                    loss = loss_fn(y_hat, y)
                    trace_y.append(y.cpu().numpy())
                    trace_y_hat.append(y_hat.cpu().numpy())
                    batch_losses.append(loss.item())

            # Average over *all* validation batches, not just the last one.
            valid_loss = np.mean(batch_losses)
            valid_losses.append(valid_loss)

            trace_y = np.concatenate(trace_y)
            trace_y_hat = np.concatenate(trace_y_hat)

            # Fraction of samples whose arg-max logit equals the label.
            accuracy = np.mean(trace_y_hat.argmax(axis=1) == trace_y)
            accuracies.append(accuracy)

            print(f"Epoch: {epoch} | Valid_Loss : {valid_loss}") 
            print(f"Valid_Accuracy : {accuracy}")

        # Decay the learning rate only after this epoch's optimizer steps, so
        # the first epoch really runs at ``lr``.
        lr_scheduler.step()

    return model, optimizer, train_losses, valid_losses

    
def plot_losses(train_losses, valid_losses):
    """Plot per-epoch training loss and, when present, validation loss.

    Args:
        train_losses: one loss value per epoch.
        valid_losses: one loss value per epoch, or an empty sequence; when
            non-empty it must be as long as ``train_losses`` (asserted).
    """
    n_epochs = len(train_losses)
    epoch_axis = np.linspace(1, n_epochs, n_epochs)

    plt.plot(epoch_axis, train_losses, label="train_loss")
    if len(valid_losses) != 0:
        assert len(train_losses) == len(valid_losses)
        plt.plot(epoch_axis, valid_losses, label="validation_loss")

    plt.xlabel("epoch")
    plt.ylabel("loss")
    plt.title("Training process")
    plt.legend()
    plt.show()
    
    
    
"""
Main class
"""
class AudioClassifier:
    """Trains and evaluates ``Model`` on the loaders held by a data storage.

    The storage object must expose ``feature_shape``, ``number_of_classes``,
    ``full_data_loader`` and ``cross_validation_loaders``.
    """

    ### add lr plot
    ### add std to mean score

    def __init__(self, data_storage):
        self.data_storage = data_storage

    def _set_hyperparameters(self, epochs, lr, gamma, optim, sheduler, loss_fn):
        """Remember the training settings used by the next ``_fit`` call."""
        self.epochs = epochs
        self.lr = lr
        self.gamma = gamma
        self.optim = optim
        self.sheduler = sheduler
        self.loss_fn = loss_fn

    def _fit(self, 
             train_loader, 
             test_loader,
             plot_training=True):
        """Train a fresh model on ``train_loader`` and store the results.

        Sets ``self.model``, ``self.optimizer``, ``self.train_losses`` and
        ``self.test_losses``; optionally plots the loss curves.
        """
        # np.prod handles a feature shape of any rank; np.multiply(*shape)
        # only works for exactly two dimensions.
        in_dim = int(np.prod(self.data_storage.feature_shape))
        model = _init_model(in_dim=in_dim,
                            out_dim=self.data_storage.number_of_classes)

        model, optimizer, train_losses, test_losses = train(model,
                                                            train_loader,
                                                            test_loader,
                                                            epochs=self.epochs,
                                                            lr=self.lr,
                                                            gamma=self.gamma,
                                                            optim=self.optim,
                                                            sheduler=self.sheduler,
                                                            loss_fn=self.loss_fn)

        self.model = model
        self.optimizer = optimizer
        self.train_losses = train_losses
        self.test_losses = test_losses
        if plot_training:
            plot_losses(train_losses, test_losses)

    def fit(self, 
            epochs=50, 
            lr=2e-4,
            gamma=0.96,
            optim=Adam,
            sheduler=ExponentialLR,
            loss_fn=nn.CrossEntropyLoss(), 
            plot_training=True):
        """Train on the storage's full-data loader (no validation)."""
        self._set_hyperparameters(epochs, lr, gamma, optim, sheduler, loss_fn)
        self._fit(self.data_storage.full_data_loader,
                  None,
                  plot_training=plot_training)

    def cross_validation(self, 
                         epochs=50, 
                         lr=2e-4,
                         gamma=0.96,
                         optim=Adam,
                         sheduler=ExponentialLR,
                         loss_fn=nn.CrossEntropyLoss(), 
                         plot_training=True):
        """Train once per fold and record the final validation loss of each.

        Sets ``self.cross_val_score_s`` (one final validation loss per fold)
        and ``self.cross_val_score`` (their mean).
        """
        self._set_hyperparameters(epochs, lr, gamma, optim, sheduler, loss_fn)

        cross_val_score_s = []
        for i, (train_loader, test_loader) in enumerate(self.data_storage.cross_validation_loaders):
            print(f"--- CROSS VALIDATION | FOLD: {i + 1} ---")
            self._fit(train_loader,
                      test_loader,
                      plot_training=plot_training)

            # The score of a fold is its validation loss after the last epoch.
            cross_val_score_s.append(self.test_losses[-1])

        self.cross_val_score_s = cross_val_score_s
        self.cross_val_score = np.mean(cross_val_score_s)
        print()
        print(f"--- CROSS VALIDATION SCORE: {self.cross_val_score} ---")
        print()

    def predict(self, loader): 
        """Run the trained model over ``loader``.

        Args:
            loader: iterable of ``(x, y)`` batches; the labels are ignored.

        Returns:
            List with one numpy array of raw model outputs per batch.
        """
        ### improve
        # Follow the model's own device instead of a notebook-global variable.
        first_param = next(self.model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

        trace_y_hat = []
        self.model.eval()
        # Inference only: skip gradient tracking.
        with torch.no_grad():
            for x, _ in loader:
                x = x.to(device, dtype=torch.float32)
                trace_y_hat.append(self.model(x).cpu().numpy())

        return trace_y_hat
    

In [None]:
device = get_device()

clf = AudioClassifier(DS)

In [None]:
clf.data_storage

In [None]:
clf.epochs = 5

In [None]:
clf.cross_validation(epochs=5)

In [None]:
# Mean of the per-fold scores; cross_validation() stores it as `cross_val_score`.
clf.cross_val_score

In [None]:
# Per-fold scores; cross_validation() stores them as `cross_val_score_s`.
clf.cross_val_score_s

In [None]:
clf.fit()

In [None]:
# `full_loader` was never defined; the full-data loader lives on the storage object.
clf.predict(DS.full_data_loader)