<a href="https://colab.research.google.com/github/DorAzaria/Sentiment-Analysis-Deep-Learning-Methods-For-Speech-Recognition/blob/main/train_model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/data/')

Mounted at /content/data/


# **IMPORTS**
---

In [82]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
import pickle
import numpy as np
import pandas as pd
import os
import datetime
import torchaudio
from numpy import mat

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")



# **PREPROCESS**

---



In [83]:
class Data:

    def __init__(self):
        file_handler = open('/content/data/MyDrive/dl/dataset1.pth', 'rb')
        data = pickle.load(file_handler)
        
        x_dataset = [embedding[1] for embedding in data]
        y_dataset = [label[2] for label in data]
        train_x, test_x, train_y, test_y = train_test_split(np.array(x_dataset), np.array(y_dataset), test_size=0.20)
        train_x = torch.from_numpy(train_x)
        train_y = torch.from_numpy(train_y)
        torch_train = TensorDataset(train_x, train_y)
        
        test_x = torch.from_numpy(test_x)
        test_y = torch.from_numpy(test_y)
        torch_test = TensorDataset(test_x, test_y)

        self.train_loader = DataLoader(torch_train, batch_size=28, drop_last=True, shuffle=True)
        self.test_loader = DataLoader(torch_test, batch_size=28, drop_last=True, shuffle=False)

# **TRAIN**

---



In [92]:
DROP_OUT = 0.5
NUM_OF_CLASSES = 3


class ConvNet(nn.Module):

    def __init__(self, num_of_classes, dataset):
        super().__init__()
        # Hyper parameters
        self.epochs = 100
        self.batch_size = 28
        self.learning_rate = 0.001
        self.dataset = dataset
        # Model Architecture
        self.first_conv = nn.Conv2d(1, 96, kernel_size=(5, 5), padding=1)
        self.first_bn = nn.BatchNorm2d(96)
        self.first_polling = nn.MaxPool2d(kernel_size=(3, 3), stride=(2, 2))

        self.second_conv = nn.Conv2d(96, 256, kernel_size=(5, 5), padding=1)
        self.second_bn = nn.BatchNorm2d(256)
        self.second_polling = nn.MaxPool2d(kernel_size=(3, 3), stride=(1, 1))

        self.third_conv = nn.Conv2d(256, 384, kernel_size=(3, 3), padding=1)
        self.third_bn = nn.BatchNorm2d(384)

        self.forth_conv = nn.Conv2d(384, 256, kernel_size=(3, 3), padding=1)
        self.forth_bn = nn.BatchNorm2d(256)

        self.fifth_conv = nn.Conv2d(256, 256, kernel_size=(3, 3), padding=1)
        self.fifth_bn = nn.BatchNorm2d(256)
        self.fifth_polling = nn.MaxPool2d(kernel_size=(5, 3), stride=(3, 2))

        self.sixth_conv = nn.Conv2d(256, 64, kernel_size=(2, 2), padding=1)
        self.first_drop = nn.Dropout(p=DROP_OUT)

        self.avg_polling = nn.AdaptiveAvgPool2d((1, 1))
        self.first_dense = nn.Linear(64, 1024)
        self.second_drop = nn.Dropout(p=DROP_OUT)

        self.second_dense = nn.Linear(1024, num_of_classes)

    def forward(self, X):
        x = nn.ReLU()(self.first_conv(X))
        x = self.first_bn(x)
        x = self.first_polling(x)

        x = nn.ReLU()(self.second_conv(x))
        x = self.second_bn(x)
        x = self.second_polling(x)

        x = nn.ReLU()(self.third_conv(x))
        x = self.third_bn(x)

        x = nn.ReLU()(self.forth_conv(x))
        x = self.forth_bn(x)

        x = nn.ReLU()(self.fifth_conv(x))
        x = self.fifth_bn(x)
        x = self.fifth_polling(x)

        x = nn.ReLU()(self.sixth_conv(x))
        x = self.first_drop(x)
        x = self.avg_polling(x)

        x = x.view(-1, x.shape[1])  # output channel for flatten before entering the dense layer

        x = nn.ReLU()(self.first_dense(x))
        x = self.second_drop(x)

        x = self.second_dense(x)
        y = nn.LogSoftmax(dim=1)(x)  # consider using Log-Softmax

        return y

    def get_epochs(self):
        return self.epochs

    def get_learning_rate(self):
        return self.learning_rate

    def get_batch_size(self):
        return self.batch_size

    def train_model(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        criterion = torch.nn.CrossEntropyLoss(weight=torch.tensor([2.103336921, 3.203278689, 1]))

        n_total_steps = len(self.dataset.train_loader)

        for epoch in range(self.get_epochs()):
            for i, (embedding, labels) in enumerate(self.dataset.train_loader):

                embedding = embedding.type(torch.FloatTensor)
                labels = labels.type(torch.LongTensor)
                labels = labels.to(device)
                embedding = embedding.to(device)
                
                # Forward pass
                outputs = self.forward(embedding)
                loss = criterion(outputs, labels)

                # Backward and optimize
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

                if i == 98:
                    print(f'Epoch [{epoch + 1}/{self.epochs}], Step [{i + 1}/{n_total_steps}], Loss: {loss.item():.4f}')


# **TEST**

---



In [85]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


class TestConvNet:
    def __init__(self, model, dataset):
        self.model = model
        self.dataset = dataset
        self.results = []

    def test(self):

        with torch.no_grad():
            n_correct = 0
            n_samples = 0
            n_class_correct = [0 for i in range(3)]
            n_class_samples = [0 for i in range(3)]
            for embedding, labels in self.dataset.test_loader:

                embedding = labels.type(torch.FloatTensor)
                labels = labels.type(torch.LongTensor)

                outputs = self.model.forward(embedding)

                # max returns (value ,index)
                _, predicted = torch.max(outputs, 1)
                n_samples += labels.size(0)
                n_correct += (predicted == labels).sum().item()

                for i in range(self.model.batch_size):
                    label = labels[i]
                    pred = predicted[i]
                    if label == pred:
                        n_class_correct[label] += 1
                    n_class_samples[label] += 1

            acc = 100.0 * n_correct / n_samples
            print(f'Accuracy of the network: {acc} %')
            self.results.append(f'Accuracy of the network: {acc} %')

            for i in range(3):
                acc = 100.0 * n_class_correct[i] / n_class_samples[i]
                print(f'Accuracy of {self.dataset.classes[i]}: {acc} %')
                self.results.append(f'Accuracy of {self.dataset.classes[i]}: {acc} %')

        saved_time = datetime.datetime.now().strftime("%d-%m-%Y-%H-%M")
        file_name = 'result.txt'
        directory = '/content/data/' + str(saved_time)
        os.mkdir(directory)

        with open(directory + "/" + file_name, 'w') as f:
            for line in self.results:
                f.write(line)
                f.write('\n')

        torch.save(self.model, directory + "/model.pth")


# **Main**

---


## NORM AND INFERENCE

In [86]:
bundle = torchaudio.pipelines.WAV2VEC2_ASR_BASE_960H
model = bundle.get_model().to(device)


def Norm(X):
    embedding = X.detach().cpu().numpy()
    for i in range(len(embedding)):
        mlist = embedding[0][i]
        embedding[0][i] = 2 * (mlist - np.max(mlist)) / (np.max(mlist) - np.min(mlist)) + 1
        if embedding[0][i] < -1 or embedding[0][i] > 1:
            print("NISHBAR HAZAIN")
            break
    return torch.from_numpy(embedding)


def recording(name):
    # import sounddevice
    # # from scipy.io.wavefile import write
    # filename = name
    # fps = 16000
    # duration = 3
    # print("Recording ..")
    # recording = sounddevice.rec(int(duration * fps), samplerate = fps, channels = 2)
    # sounddevice.wait()
    # print("Done.")
    # write(filename, fps, recording)
    # return filename + ".wav"
    pass


def inference(file_name):
    waveform, sample_rate = torchaudio.load(recording(file_name))
    waveform = waveform.to(device)

    if sample_rate != bundle.sample_rate:
        waveform = torchaudio.functional.resample(waveform, sample_rate, bundle.sample_rate)

    with torch.inference_mode():
        embedding, _ = model(waveform)

    return embedding

## START TRAIN

---



In [93]:
aer_dataset = Data()
cnn = ConvNet(3, aer_dataset)
cnn.train_model()

RuntimeError: ignored

## START TEST

---



In [None]:
test = TestConvNet(cnn, aer_dataset)
test.test()
X = Norm(inference("dor_angry"))
predict = [mat.exp(c) for c in cnn.forward(X)]