<a href="https://colab.research.google.com/github/ElektrosStulpas/DeepLearningVU22/blob/main/GMMFirst.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Pirma užduotis reikalaus realizuoti efektyvią duomenų nuskaitymo programą ir pirmo klasifikatoriaus sukūrimą. Atsiskaitinėjant pratybų dėstytojas atsiųs testinių vaizdų/garsų, su kuriais turėsite pademonstruoti, kaip jūsų realizuotas modelis veikia. Atsiskaitymo metu turėsite gebėti papasakoti, kaip realizuotas jūsų užduoties varianto, t.y. duotas specifinis, klasifikavimo modelis. Programinės įrangos sprendimą galite naudoti savo nuožiūra. Reikalingos sąvokos (rinkinys (angl. batch), duomenų klasė (angl. Dataset), duomenų paruošėjas (angl. DataLoader), darbininkai (angl. workers), duomenų išankstinis išsaugojimas (angl. pre-fetching, caching), duomenų nuskaitymo paralelizavimas, duomenų transformacijų (augmentacijų) paralelizavimas).
Mano užduoties Dataset: http://m3c.web.auth.gr/research/aesdd-speech-emotion-recognition/

In [None]:
# Mount Google Drive inside the Colab VM so the dataset folders can be read.
# The mount point is absolute ('/content/drive'), while later cells use paths
# relative to the working directory ('drive/MyDrive/...').
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


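Function that scans the dataset directory on Drive, collects every file path together with its class id into a DataFrame, and also returns the mean clip length (in frames) across the dataset.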

In [None]:
import os
import pandas as pd
import numpy as np
import torch
import torchaudio

def load_data_from_drive(data_dir="drive/MyDrive/AESDD/",
                         emotions=('anger', 'disgust', 'fear', 'happiness', 'sadness')):
  """Index the emotion dataset and measure its mean clip length.

  Each emotion has its own sub-directory of ``data_dir``; the position of the
  emotion in ``emotions`` is used as the numeric class id.

  Args:
    data_dir: Directory that contains one sub-directory per emotion.
    emotions: Sub-directory names, in class-id order.

  Returns:
    A tuple ``(frame, mean_frames)`` where ``frame`` is a DataFrame with the
    columns ``relative_path`` and ``class_id`` (one row per file) and
    ``mean_frames`` is the mean number of frames per file, truncated to int.

  Raises:
    ValueError: If no files are found under any emotion directory.
  """
  records = []
  frame_counts = []

  for class_id, emotion in enumerate(emotions):
    # The context manager closes the directory handle deterministically.
    with os.scandir(os.path.join(data_dir, emotion)) as entries:
      # scandir yields entries in arbitrary order; sort by name so the
      # resulting frame is the same on every run. Sub-directories are skipped
      # because they cannot be opened as audio.
      files = sorted((e for e in entries if e.is_file()), key=lambda e: e.name)

    for entry in files:
      records.append([entry.path, class_id])
      # Only the header is inspected here; the signal itself is not decoded.
      frame_counts.append(torchaudio.info(entry.path).num_frames)

  if not records:
    # Without this guard the mean of an empty list is NaN and int() fails
    # with an unhelpful "cannot convert float NaN to integer".
    raise ValueError(f"no audio files found under {data_dir!r}")

  frame = pd.DataFrame(records, columns=["relative_path", "class_id"])
  return frame, int(np.mean(frame_counts))

Loading data from drive to a dataframe and checking if it's in the expected format

In [None]:
# Build the file index once. MEAN_OF_SOUNDS is the mean clip length in frames
# and is the length AudioUtil.pad_trunc pads or truncates every clip to.
df, MEAN_OF_SOUNDS = load_data_from_drive()
print(MEAN_OF_SOUNDS)
# Last expression of the cell: rendered as a table to sanity-check the layout.
df.head()

180994


Unnamed: 0,relative_path,class_id
0,drive/MyDrive/AESDD/anger/a01 (3).wav,0
1,drive/MyDrive/AESDD/anger/a02 (3).wav,0
2,drive/MyDrive/AESDD/anger/a03 (3).wav,0
3,drive/MyDrive/AESDD/anger/a04 (3).wav,0
4,drive/MyDrive/AESDD/anger/a05 (3).wav,0


In [None]:
import math, random
from torchaudio import transforms
from IPython.display import Audio

class AudioUtil():
  """Stateless helpers that turn an audio file into an augmented spectrogram.

  An ``aud`` argument is always the ``(signal, sample_rate)`` tuple produced by
  :meth:`open`. The signal is treated as a 2-D tensor whose last dimension is
  the frame axis.
  """

  @staticmethod
  def open(audio_file):
    """Load ``audio_file`` and return ``(signal, sample_rate)``."""
    sig, sr = torchaudio.load(audio_file)
    return (sig, sr)

  @staticmethod
  def pad_trunc(aud, max_len=None):
    """Force the signal to exactly ``max_len`` frames.

    Longer signals are cut at the end; shorter ones are zero-padded with the
    padding split at a random position between the beginning and the end.

    Args:
      aud: ``(signal, sample_rate)`` tuple.
      max_len: Target length in frames. Defaults to the notebook-level
        ``MEAN_OF_SOUNDS`` when omitted.

    Returns:
      ``(signal, sample_rate)`` with the signal resized along its last axis.
    """
    sig, sr = aud
    if max_len is None:
      max_len = MEAN_OF_SOUNDS
    num_rows, sig_len = sig.shape  # sig_len is the number of frames

    if sig_len > max_len:
      # Longer than the target: drop the tail.
      sig = sig[:, :max_len]
    elif sig_len < max_len:
      # Shorter than the target: surround with silence.
      dur_to_pad = max_len - sig_len
      pad_begin_len = random.randint(0, dur_to_pad)
      pad_end_len = dur_to_pad - pad_begin_len

      # Match dtype/device of the signal so the concatenation never has to
      # promote types or move data.
      pad_begin = torch.zeros((num_rows, pad_begin_len), dtype=sig.dtype, device=sig.device)
      pad_end = torch.zeros((num_rows, pad_end_len), dtype=sig.dtype, device=sig.device)

      sig = torch.cat((pad_begin, sig, pad_end), 1)

    return (sig, sr)

  @staticmethod
  def time_shift(aud, shift_limit):
    """Circularly shift the signal along time by a random amount.

    Args:
      aud: ``(signal, sample_rate)`` tuple.
      shift_limit: Upper bound of the shift as a fraction of the signal length.

    Returns:
      ``(shifted_signal, sample_rate)``.
    """
    sig, sr = aud
    _, sig_len = sig.shape
    shift_amt = int(random.random() * shift_limit * sig_len)
    # Roll along the frame axis only. Without `dims` the tensor is flattened
    # first, which leaks samples from one row into the next.
    return (sig.roll(shift_amt, dims=-1), sr)

  @staticmethod
  def spectro_gram(aud):
    """Convert ``(signal, sample_rate)`` to a spectrogram using torchaudio's default settings."""
    sig, sr = aud
    spec = transforms.Spectrogram()(sig)
    return (spec)

  @staticmethod
  def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
    """Mask random frequency and time bands of a spectrogram.

    Masked regions are filled with the mean of the input spectrogram.

    Args:
      spec: 3-D spectrogram tensor; the last two axes are frequency and time.
      max_mask_pct: Fraction of each axis passed to torchaudio as the
        mask-size parameter.
      n_freq_masks: Number of frequency masks to apply.
      n_time_masks: Number of time masks to apply.

    Returns:
      The masked spectrogram.
    """
    _, freq_dim, time_dim = spec.shape
    mask_value = spec.mean()
    aug_spec = spec

    freq_mask_param = max_mask_pct * freq_dim
    for _ in range(n_freq_masks):
      # Chain on aug_spec (not spec) so every mask is kept, not just the last.
      aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

    time_mask_param = max_mask_pct * time_dim
    for _ in range(n_time_masks):
      aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

    return aug_spec

In [None]:
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio

class SoundDS(Dataset):
  """Dataset that yields ``(spectrogram, class_id)`` pairs for audio files.

  Args:
    df: DataFrame with the columns ``relative_path`` (file to load) and
      ``class_id`` (label).
    augment: When True (default) a random time shift and frequency/time
      masking are applied to every item. Pass False for deterministic
      evaluation data. Note that the pad position chosen by
      ``AudioUtil.pad_trunc`` stays random either way.
  """

  def __init__(self, df, augment=True):
    self.df = df
    self.augment = augment

  def __len__(self):
    """Return the number of files in the dataset."""
    return len(self.df)

  def __getitem__(self, idx):
    """Load item ``idx`` and return ``(spectrogram, class_id)``.

    Raises:
      IndexError: If ``idx`` is out of range.
    """
    # Positional access: works for any DataFrame index (e.g. after filtering)
    # and raises IndexError past the end, which is what plain iteration over
    # the dataset relies on to stop.
    audio_file = self.df['relative_path'].iloc[idx]
    class_id = self.df['class_id'].iloc[idx]

    aud = AudioUtil.open(audio_file)
    # Bring every clip to the same length so items can be batched.
    aud = AudioUtil.pad_trunc(aud)
    if self.augment:
      aud = AudioUtil.time_shift(aud, shift_limit=0.4)

    sgram = AudioUtil.spectro_gram(aud)
    if self.augment:
      sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.2, n_freq_masks=2, n_time_masks=2)

    return sgram, class_id

In [None]:
from torch.utils.data import random_split

myds = SoundDS(df)

# 80/20 train/validation split.
num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
# Seed the split so the same files land in train/validation on every run;
# otherwise results are not comparable between runs.
split_generator = torch.Generator().manual_seed(42)
train_ds, val_ds = random_split(myds, [num_train, num_val], generator=split_generator)

# NOTE(review): both subsets wrap the same SoundDS instance, so validation
# items go through the same random augmentations as training items.

# data loaders
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=16, num_workers=2, shuffle=True)
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=16, num_workers=2)

In [None]:
import torch.nn.functional as F
from torch.nn import init
import torch.nn as nn

class AudioClassifier (nn.Module):
    """Small CNN that classifies single-channel spectrograms.

    Four stride-2 convolution blocks (Conv2d -> ReLU -> BatchNorm2d) are
    followed by global average pooling and one linear layer.

    Args:
        num_classes: Number of output logits. Defaults to 5.
    """

    def __init__(self, num_classes=5):
        super().__init__()
        conv_layers = []

        # (in_channels, out_channels, kernel_size, padding) for each block.
        block_specs = [
            (1, 8, 5, 2),
            (8, 16, 3, 1),
            (16, 32, 3, 1),
            (32, 64, 3, 1),
        ]
        for number, (in_ch, out_ch, kernel, pad) in enumerate(block_specs, start=1):
            conv_layers += self._make_block(number, in_ch, out_ch, kernel, pad)

        self.conv = nn.Sequential(*conv_layers)

        #pooling and linear output
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=num_classes)

    def _make_block(self, number, in_ch, out_ch, kernel, pad):
        """Create conv block ``number`` and register its layers on the module.

        The layers are stored as ``conv<number>``, ``relu<number>`` and
        ``bn<number>`` so parameter names stay the same as with hand-written
        attributes. Returns the layers in execution order.
        """
        conv = nn.Conv2d(in_ch, out_ch, kernel_size=(kernel, kernel), stride=(2, 2), padding=(pad, pad))
        relu = nn.ReLU()
        bn = nn.BatchNorm2d(out_ch)
        setattr(self, f"conv{number}", conv)
        setattr(self, f"relu{number}", relu)
        setattr(self, f"bn{number}", bn)

        # Re-initialise the convolution: Kaiming-normal weights, zero bias.
        init.kaiming_normal_(conv.weight, a=0.1)
        conv.bias.data.zero_()
        return [conv, relu, bn]

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for input ``x``."""
        #pass our data through conv
        x = self.conv(x)

        #do adaptive pool on data
        x = self.ap(x)
        # flatten (batch, 64, 1, 1) to (batch, 64) for the linear layer
        x = x.view(x.shape[0], -1)
        #put through final linear
        x = self.lin(x)

        #return outs
        return x


# Use the first GPU when one is available, otherwise stay on the CPU, and
# create the classifier directly on that device.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
myModel = AudioClassifier().to(device)

In [None]:
def training(model, train_dl, num_epochs):
  """Train ``model`` with Adam and cross-entropy loss, printing stats per epoch.

  Args:
    model: Network mapping a batch of inputs to class logits.
    train_dl: Iterable of batches; element 0 is the input tensor and
      element 1 the label tensor.
    num_epochs: Number of passes over ``train_dl``.

  Raises:
    ValueError: If ``train_dl`` yields no batches.
  """
  cross_loss_func = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters())

  # Run on whatever device the model already lives on rather than relying on
  # a notebook-level variable.
  run_device = next(model.parameters()).device

  # An earlier evaluation may have left the model in eval mode; BatchNorm must
  # use batch statistics while training.
  model.train()

  for epoch in range(num_epochs):
    running_loss = 0.0
    correct_prediction = 0
    total_prediction = 0
    num_batches = 0

    for data in train_dl:
        inputs, labels = data[0].to(run_device), data[1].to(run_device)

        #input normalization (statistics taken over the whole batch)
        inputs = (inputs - inputs.mean()) / inputs.std()

        #zero gradients so to avoid descent direction accumulation
        optimizer.zero_grad()

        #forward pass data through model
        outputs = model(inputs)
        loss = cross_loss_func(outputs, labels)
        loss.backward() #recalculates gradient and accumulates it
        optimizer.step() #applies values to weights

        #accumulate the per-batch loss to report the epoch average
        running_loss += loss.item()

        #predicted class is the index of the largest logit
        prediction = outputs.argmax(dim=1)
        correct_prediction += (prediction == labels).sum().item()
        total_prediction += prediction.shape[0]
        num_batches += 1

    if num_batches == 0:
      raise ValueError("train_dl produced no batches; nothing to train on")

    #once the epoch ends, get stats
    avg_loss = running_loss/num_batches
    acc = correct_prediction/total_prediction
    print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')



In [None]:
# Train for a small, fixed number of epochs; training() prints the average
# loss and the accuracy after each one.
num_epochs=4
training(myModel, train_dl, num_epochs)

In [None]:
def get_custom_val_data(data_dir="drive/MyDrive/CustomValData"):
  """Build a dataset from the hand-labelled validation folder.

  The label of each file is encoded in its name: the first character must be
  the digit of its class id.

  Args:
    data_dir: Folder that holds the validation files.

  Returns:
    A ``SoundDS`` over the files found in ``data_dir``.

  Raises:
    ValueError: If a file name does not start with a digit.
  """
  with os.scandir(data_dir) as entries:
    # Skip sub-directories and sort by name so item order is stable between
    # runs (scandir order is arbitrary).
    files = sorted((e for e in entries if e.is_file()), key=lambda e: e.name)

  records = [[entry.path, int(entry.name[0])] for entry in files]
  df = pd.DataFrame(records, columns=["relative_path", "class_id"])

  return SoundDS(df)

In [None]:
def inference(model, val_dl, custom_val_data=False):
  """Evaluate ``model`` and print per-batch outputs plus the overall accuracy.

  Args:
    model: Network mapping a batch of inputs to class logits.
    val_dl: Iterable of batches; element 0 is the input tensor and
      element 1 the label tensor. Ignored when ``custom_val_data`` is True.
    custom_val_data: When True, evaluate on the data returned by
      ``get_custom_val_data()`` instead of ``val_dl``.

  Raises:
    ValueError: If there is nothing to evaluate.
  """
  correct_prediction = 0
  total_prediction = 0

  if custom_val_data:
    custom = get_custom_val_data()
    # A bare Dataset yields single un-batched items whose labels are not
    # tensors, so wrap it in a loader to get proper batches.
    if isinstance(custom, torch.utils.data.DataLoader):
      val_dl = custom
    else:
      val_dl = torch.utils.data.DataLoader(custom, batch_size=16, shuffle=False)

  # Run on whatever device the model already lives on.
  run_device = next(model.parameters()).device
  was_training = model.training
  model.eval()
  try:
    #disable gradient tracking
    with torch.no_grad():
      # This loop runs for both the given loader and the custom data.
      for data in val_dl:
        inputs, labels = data[0].to(run_device), data[1].to(run_device)
        # same batch normalization of the inputs as during training
        inputs = (inputs - inputs.mean()) / inputs.std()

        outputs = model(inputs)
        print(f"outputs: {outputs}")

        _, prediction = torch.max(outputs,1)
        print(f"prediction: {prediction}")
        correct_prediction += (prediction == labels).sum().item()
        total_prediction += prediction.shape[0]
  finally:
    # Put the model back into the mode it was in before evaluation.
    model.train(was_training)

  if total_prediction == 0:
    raise ValueError("no samples were evaluated; the validation data is empty")

  acc = correct_prediction/total_prediction
  print(f'Accuracy: {acc:.2f}, Total items: {total_prediction}')



In [None]:
def upd_inference(model, ds):
  """Classify the first item of ``ds`` and print the logits and prediction.

  Args:
    model: Network mapping a batch of inputs to class logits.
    ds: Iterable of ``(input, label)`` items; only the first input is used.
  """
  # Run on whatever device the model already lives on.
  run_device = next(model.parameters()).device
  was_training = model.training
  # Evaluation mode: BatchNorm must use its running statistics and must not
  # update them from a single sample.
  model.eval()
  try:
    with torch.no_grad():
      sample = next(iter(ds))[0].to(run_device)
      sample = sample.unsqueeze(0)  # add the batch dimension
      # Same input normalization as in training(); without it the model sees
      # inputs on a different scale than the one it was trained on.
      sample = (sample - sample.mean()) / sample.std()
      outputs = model(sample)
      print(f"outputs: {outputs}")
      _, prediction = torch.max(outputs,1)
      print(f"prediction: {prediction}")
  finally:
    # Put the model back into the mode it was in before.
    model.train(was_training)

In [None]:
# Load the hand-labelled validation files and classify the first one.
dataS = get_custom_val_data()
upd_inference(myModel, dataS)

outputs: tensor([[-0.2714,  0.2761,  0.3571,  0.4055, -0.8635]], device='cuda:0')


In [None]:
# Debug check: one dataset item is the (spectrogram, class_id) tuple returned by SoundDS.__getitem__.
type(next(iter(dataS)))

tuple

In [None]:
# Evaluate on the custom validation folder; with custom_val_data=True the
# val_dl argument is replaced inside inference() by get_custom_val_data().
inference(myModel, val_dl, custom_val_data=True)

outputs: tensor([[-3.0785,  1.6839,  1.7832,  1.5563, -3.2746],
        [ 0.0842, -0.7205,  1.2582, -1.3787,  1.5998],
        [ 1.0313,  1.3934,  0.0692,  1.1620, -3.1640],
        [-0.0924,  1.7656,  0.6016,  0.6336, -2.2613]], device='cuda:0')
prediction: tensor([2, 4, 1, 1], device='cuda:0')
Accuracy: 0.75, Total items: 4


In [None]:
#drive.flush_and_unmount() #save to drive when finishing work