# Objective: This model classifies STFT arrays
## Description: The model is very large, so running it on a GPU is preferable; GPU support is included.

### Importing libraries

In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
from sklearn.model_selection import train_test_split
from scipy import signal
from scipy.io import wavfile
from torch.utils.data import TensorDataset
import torch.nn.functional as F
from torchvision.datasets import ImageFolder
from torchvision.utils import make_grid
from torchvision.transforms import ToTensor
from torch.utils.data.dataloader import DataLoader
import matplotlib.pyplot as plt
%matplotlib inline

### Saving the combined STFT outputs to NumPy array (.npy) files

In [None]:
# Walk every directory under rootdir, compute an STFT magnitude array for each
# file found, and save the stacked arrays and their integer labels as .npy files.
rootdir = "C:\\Users\\vinay\\Downloads\\FEIS_v1_1\\augmented_wavs"
folder_count = 1  # number of the next "NN\\combined_wavs" folder the first branch looks for
# Class names; a sample's integer label is the position of its name in this list.
labels = ['f', 'fleece', 'goose', 'k', 'm', 'n', 'ng', 'p', 's', 'sh', 't', 'thought', 'trap', 'v', 'z', 'zh']
dataset = []
dataset_labels = []
for subdir, dirs, files in os.walk(rootdir):
    # NOTE(review): the two path comparisons below are rooted at "...\\FEIS_v1_1\\wavs\\",
    # while rootdir is "...\\FEIS_v1_1\\augmented_wavs". os.walk yields paths that start
    # with rootdir, so these branches look unreachable with the current rootdir — confirm.
    if subdir == "C:\\Users\\vinay\\Downloads\\FEIS_v1_1\\wavs\\" + f'{folder_count:02d}' + "\\combined_wavs":
        # Skip this folder and advance the expected folder number (5 is jumped over).
        folder_count += 1
        if folder_count == 5:
            folder_count += 1
        continue
    elif subdir == "C:\\Users\\vinay\\Downloads\\FEIS_v1_1\\wavs\\chinese-1":
        # Stop the whole walk at this directory.
        break
    else:
        count = 0  # NOTE(review): never read afterwards
        for file in files:
            # Treat "_" like "." so the text before the first "_" or "." is the class name.
            dummy = file.replace("_", ".")
            file_name_list = dummy.split('.')
            file_path = subdir + "\\" + file
            # NOTE(review): every file is passed to wavfile.read; presumably the
            # directories contain only .wav files — confirm.
            samplerate, data = wavfile.read(file_path)
            # Rectangular ('boxcar') window, 512-sample segments, 103 samples of overlap.
            f, t, Zxx = signal.stft(data, samplerate, window='boxcar', nperseg=512, nfft=512, noverlap=103)
            # Prepend a length-1 axis in front of the two STFT axes.
            Zxx = Zxx.reshape((1, Zxx.shape[0], Zxx.shape[1]))
            # Keep only the magnitude of the complex STFT.
            dataset.append(abs(Zxx))
            # labels.index raises ValueError when the file-name prefix is not a known class.
            dataset_labels.append(labels.index(file_name_list[0]))

# NOTE(review): stacking assumes every clip produced an STFT of the same shape — confirm.
dataset = np.array(dataset)
dataset_labels = np.array(dataset_labels)
np.save("C:\\Users\\vinay\\Downloads\\FEIS_v1_1\\dataset.npy", dataset)
np.save("C:\\Users\\vinay\\Downloads\\FEIS_v1_1\\dataset_labels.npy", dataset_labels)

### Loading the STFT dataset from local drive

In [None]:
# Reload the arrays written by the dataset-building cell, so that cell can be skipped on later runs.
dataset = np.load("C:\\Users\\vinay\\Downloads\\FEIS_v1_1\\dataset.npy")
dataset_labels = np.load("C:\\Users\\vinay\\Downloads\\FEIS_v1_1\\dataset_labels.npy")

### Train test split

In [None]:
# Hold out 20% of the samples for testing. No random_state is passed, so the
# shuffled split differs between runs.
X_train, X_test, y_train, y_test = train_test_split(dataset, dataset_labels, test_size=0.2, shuffle=True)

### Converting the training data to tensors and pairing it with the labels

In [None]:
# Features: NumPy array -> tensor of the default floating-point dtype.
X_train_tensor = torch.Tensor(X_train)
# Labels: converted the same way, then cast to 64-bit integer class indices.
y_train_tensor = torch.Tensor(y_train).type(torch.LongTensor)

# Pair each feature array with its label so both can be batched together.
my_dataset = TensorDataset(X_train_tensor, y_train_tensor)

In [None]:
# Display the shape of the training feature tensor as the cell output.
X_train_tensor.shape

### Splitting training data into batches

In [None]:
# Number of samples per batch; reused later for the test loader.
batchsize = 8

# Reshuffle the training set every epoch and load batches with two worker processes.
train_dl = DataLoader(
    my_dataset,
    batch_size=batchsize,
    shuffle=True,
    num_workers=2,
    pin_memory=True,
)

### displaying one batch of images

In [None]:
def show_batch(dl):
    """Draw the first batch yielded by ``dl`` as a single image grid.

    Only the first batch is used; nothing is drawn when ``dl`` yields no
    batches. The labels of the batch are ignored.
    """
    first_batch = next(iter(dl), None)
    if first_batch is None:
        return
    images, _ = first_batch
    _, axes = plt.subplots(figsize=(12, 6))
    axes.set_xticks([])
    axes.set_yticks([])
    # make_grid returns a channel-first image; imshow wants channel-last.
    grid = make_grid(images, nrow=16)
    axes.imshow(grid.permute(1, 2, 0))


show_batch(train_dl)

### Training, Validation step. Accuracy function

In [None]:
class ClassificationBase(nn.Module):
    """Base class adding cross-entropy train/validation steps to a classifier.

    Subclasses provide ``forward``; every batch is an ``(inputs, labels)`` pair.
    """

    def training_step(self, batch):
        """Return ``(loss, accuracy)`` for one training batch."""
        inputs, targets = batch
        logits = self(inputs)
        return F.cross_entropy(logits, targets), accuracy(logits, targets)

    def validation_step(self, batch):
        """Return ``{'loss': ..., 'acc': ...}`` for one validation batch."""
        inputs, targets = batch
        logits = self(inputs)
        return {
            'loss': F.cross_entropy(logits, targets),
            'acc': accuracy(logits, targets),
        }

    def validation_epoch_end(self, outputs):
        """Average the per-batch results into plain Python floats.

        ``outputs`` is a list of the dictionaries returned by
        ``validation_step``; each batch carries equal weight in the mean.
        """
        mean_loss = torch.stack([step['loss'] for step in outputs]).mean()
        mean_acc = torch.stack([step['acc'] for step in outputs]).mean()
        return {'loss': mean_loss.item(), 'acc': mean_acc.item()}

    def epoch_end(self, epoch, result):
        """Print the training loss and accuracy recorded for ``epoch``."""
        summary_line = "Epoch [{}], train_loss: {:.4f}, train_acc: {:.4f}".format(
            epoch, result['train_loss'], result['train_acc']
        )
        print(summary_line)
    
def accuracy(outputs, labels):
    """Return the fraction of rows whose highest-scoring class equals the label.

    ``outputs`` holds one row of class scores per sample. The result is a
    new 0-dimensional tensor built from a Python float.
    """
    predicted = torch.max(outputs, dim=1).indices
    n_correct = torch.sum(predicted == labels).item()
    return torch.tensor(n_correct / len(predicted))

### CNN model defining layers

In [None]:
class CNNModel(ClassificationBase):
    """CNN classifier: three conv-conv-pool stages followed by four linear layers.

    The input size of the first linear layer is derived from ``input_shape``
    instead of being hard-coded. The defaults reproduce the original
    architecture exactly (26624 flattened features, 16 output classes), and
    all layers stay in ``self.network`` at the same positions.

    Args:
        input_shape: ``(channels, height, width)`` of a single sample.
        num_classes: number of output scores per sample.

    Raises:
        ValueError: if ``input_shape`` does not have three entries, or if
            height or width is too small to survive three 2x2 poolings.
    """

    def __init__(self, input_shape=(1, 257, 109), num_classes=16):
        super().__init__()
        if len(input_shape) != 3:
            raise ValueError(
                "input_shape must be (channels, height, width), got {!r}".format(input_shape)
            )
        in_channels, height, width = input_shape
        flat_features = self._flattened_size(height, width)
        self.network = nn.Sequential(
            nn.Conv2d(in_channels, 8, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(8, 16, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),

            nn.Flatten(),
            # NOTE(review): 8196 may have been meant as 8192; left unchanged so
            # the layer sizes of already-trained models still match.
            nn.Linear(flat_features, 8196),
            nn.ReLU(),
            nn.Linear(8196, 1024),
            nn.ReLU(),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Linear(512, num_classes),
        )

    @staticmethod
    def _flattened_size(height, width):
        """Return the feature count after the three conv/pool stages.

        The 3x3 convolutions (stride 1, padding 1) keep height and width;
        each 2x2 max-pool with stride 2 halves them, rounding down. The
        last convolution outputs 64 channels.
        """
        for _ in range(3):
            height //= 2
            width //= 2
        if height < 1 or width < 1:
            raise ValueError("height and width must each be at least 8")
        return 64 * height * width

    def forward(self, xb):
        """Run a batch through the network and return one score row per sample."""
        return self.network(xb)


# Instantiate the default architecture (parameters are created on the CPU).
model = CNNModel()

### Checking for GPU, sending data if present

In [None]:
def get_default_device():
    """Return the CUDA device when one is available, otherwise the CPU device."""
    device_name = 'cuda' if torch.cuda.is_available() else 'cpu'
    return torch.device(device_name)
    
def to_device(data, device):
    """Move ``data`` to ``device``.

    A list or tuple is handled element by element (recursively) and always
    comes back as a list; anything else is moved with a non-blocking ``.to``.
    """
    if not isinstance(data, (list, tuple)):
        return data.to(device, non_blocking=True)
    return [to_device(item, device) for item in data]

class DeviceDataLoader():
    """Iterable wrapper that moves every batch of a dataloader to one device."""

    def __init__(self, dl, device):
        self.dl = dl
        self.device = device

    def __iter__(self):
        """Yield each batch of the wrapped loader after moving it to the device."""
        yield from (to_device(batch, self.device) for batch in self.dl)

    def __len__(self):
        """Return the number of batches in the wrapped loader."""
        return len(self.dl)

In [None]:
# Pick the compute device once; the bare name on the last line shows it as the cell output.
device = get_default_device()
device

In [None]:
# Wrap the training loader so each batch is moved to `device` as it is yielded.
train_dl = DeviceDataLoader(train_dl, device)
# Move the model to `device`; the trailing ';' keeps the notebook from echoing the return value.
to_device(model, device);

### Initializing fit function

In [None]:
@torch.no_grad()
def evaluate(model, val_loader):
    """Score ``model`` on every batch of ``val_loader`` with gradients disabled.

    The model is switched to evaluation mode and left in it. Returns whatever
    ``model.validation_epoch_end`` produces from the per-batch results.
    """
    model.eval()
    step_results = []
    for batch in val_loader:
        step_results.append(model.validation_step(batch))
    return model.validation_epoch_end(step_results)

def fit(epochs, lr, model, train_loader, opt_func=torch.optim.SGD):
    """Train ``model`` for ``epochs`` passes over ``train_loader``.

    Args:
        epochs: number of passes over the training data.
        lr: learning rate, passed as the second argument to ``opt_func``.
        model: module providing ``training_step(batch) -> (loss, acc)`` and
            ``epoch_end(epoch, result)``.
        train_loader: iterable of training batches.
        opt_func: optimizer constructor, called as
            ``opt_func(model.parameters(), lr)``.

    Returns:
        A list with one dict per epoch holding ``'epoch'``, ``'train_loss'``
        and ``'train_acc'`` (means over the batches, as Python floats).

    Raises:
        RuntimeError: from ``torch.stack`` when ``train_loader`` yields no
            batches.
    """
    history = []
    # The optimizer updates exactly the parameters handed to it here.
    optimizer = opt_func(model.parameters(), lr)
    for epoch in range(epochs):
        model.train()  # training mode; evaluate() switches the model to eval mode
        train_losses = []
        train_acc = []
        for batch in train_loader:
            loss, acc = model.training_step(batch)
            loss.backward()        # accumulate gradients on the parameters
            optimizer.step()       # apply the update from those gradients
            optimizer.zero_grad()  # clear them before the next batch
            # Store a detached value: only the number is needed for reporting,
            # so the list must not keep every batch's loss tied to autograd.
            train_losses.append(loss.detach())
            train_acc.append(acc)
        result = {
            'epoch': epoch,
            'train_loss': torch.stack(train_losses).mean().item(),
            'train_acc': torch.stack(train_acc).mean().item(),
        }
        model.epoch_end(epoch, result)
        history.append(result)
    return history

In [None]:
# Create a freshly initialised model on `device`; this replaces the `model` object created earlier.
model = to_device(CNNModel(), device)

### Training the CNN model

In [None]:
# Training hyper-parameters: 30 epochs of Adam with learning rate 0.001.
num_epochs = 30
opt_func = torch.optim.Adam
lr = 0.001

# `history` receives one dict of training metrics per epoch.
history = fit(num_epochs, lr, model, train_dl, opt_func)

### Printing the model training history

In [None]:
import pandas as pd
# `history` is the list of per-epoch dicts returned by fit(); each dict becomes one DataFrame row.
hist_df = pd.DataFrame(history)
hist_df

### Saving the model history to a CSV file (the copy command is specific to Google Drive)

In [None]:
# Write the per-epoch metrics to a CSV file in the current working directory.
hist_df.to_csv('model_acc_loss_4.csv')
# IPython shell escape: copy the CSV to a folder under /content/gdrive.
# NOTE(review): presumably requires Google Drive to be mounted in Colab first — confirm.
!cp model_acc_loss_4.csv "/content/gdrive/MyDrive/Colab Notebooks/EE626P_PGP/"

### PyTorch has no built-in model summary command, so an external library is used for it

In [None]:
# !pip install torchsummary

In [None]:
from torchsummary import summary
# Print a layer-by-layer summary for a single input of size (1, 257, 109).
# NOTE(review): presumably this matches the per-sample shape of X_train_tensor — confirm.
summary(model, (1,257,109))

### Plotting training accuracy

In [None]:
def plot_accuracies(history):
    """Plot the training accuracy of every epoch recorded in ``history``.

    ``history`` is a list of dicts that each contain a ``'train_acc'`` entry.
    """
    per_epoch_acc = []
    for record in history:
        per_epoch_acc.append(record['train_acc'])
    plt.plot(per_epoch_acc, '-x')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')


plot_accuracies(history)

### Plotting training loss

In [None]:
def plot_losses(history):
    """Plot the training loss of every epoch recorded in ``history``.

    Epochs whose dict has no ``'train_loss'`` entry contribute ``None``.
    """
    per_epoch_loss = []
    for record in history:
        per_epoch_loss.append(record.get('train_loss'))
    plt.plot(per_epoch_loss, '-bx')
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['Training'])
    plt.title('Loss vs. No. of epochs')


plot_losses(history)

### Converting the test data to tensors and pairing it with the labels

In [None]:
# Features: NumPy array -> tensor of the default floating-point dtype.
X_test_tensor = torch.Tensor(X_test)
# Labels: converted the same way, then cast to 64-bit integer class indices.
y_test_tensor = torch.Tensor(y_test).type(torch.LongTensor)

# Pair each held-out feature array with its label.
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

### Testing the model for testing dataset

In [None]:
# Batch the held-out set with the training batch size and move each batch to `device`.
test_dl = DeviceDataLoader(
    DataLoader(
        test_dataset,
        batch_size=batchsize,
        shuffle=True,
        num_workers=2,
        pin_memory=True,
    ),
    device,
)
# Mean loss and accuracy over all test batches.
result = evaluate(model, test_dl)

In [None]:
# `result` is the {'loss': ..., 'acc': ...} dict returned by evaluate() on the test loader.
print("test data loss and accuracy is", result)

### Predicting a random array from test dataset

In [None]:
def predict_image(img, model):
    """Return the predicted class index for one un-batched sample.

    Args:
        img: a single sample tensor without a batch dimension.
        model: module called on the batched sample to produce class scores.

    Returns:
        The index of the highest score, as a Python int.
    """
    # Add a batch dimension of 1 and move the sample to the global `device`.
    xb = to_device(img.unsqueeze(0), device)
    # Inference only: run the forward pass without recording it for autograd.
    with torch.no_grad():
        yb = model(xb)
    # Index of the highest score in each row.
    _, preds = torch.max(yb, dim=1)
    return preds[0].item()


img, label = test_dataset[0]
print('Label:', label, ', Predicted:', predict_image(img, model))

### Confusion matrix for testing dataset

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import seaborn as sn
import pandas as pd

# Class names in label-index order (position i is the name of class i).
class_names = ['f', 'fleece', 'goose', 'k', 'm', 'n', 'ng', 'p', 's', 'sh', 't', 'thought', 'trap', 'v', 'z', 'zh']

y_pred = []
y_true = []

# Iterate over the test data, predicting one sample at a time.
# NOTE: the loop variable `labels` rebinds any earlier global of that name.
for inputs, labels in test_dl:
    for img in inputs:
        y_pred.append(predict_image(img, model))
    # extend() grows the list in place instead of copying it on every batch.
    y_true.extend(labels.tolist())

# Fix the label set so the matrix always has one row and column per class,
# even when a class is missing from this split; otherwise the matrix would
# be smaller than the list of display labels.
cf_matrix = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))
cmd = ConfusionMatrixDisplay(cf_matrix, display_labels=class_names)
fig, ax = plt.subplots(figsize=(15,15))
cmd.plot(ax=ax)
#plt.savefig('output.png')

### Confusion matrix for training dataset

In [None]:
# Class names in label-index order (position i is the name of class i).
class_names = ['f', 'fleece', 'goose', 'k', 'm', 'n', 'ng', 'p', 's', 'sh', 't', 'thought', 'trap', 'v', 'z', 'zh']

y_pred = []
y_true = []

# Iterate over the training data, predicting one sample at a time.
# NOTE: the loop variable `labels` rebinds any earlier global of that name.
for inputs, labels in train_dl:
    for img in inputs:
        y_pred.append(predict_image(img, model))
    # extend() grows the list in place instead of copying it on every batch.
    y_true.extend(labels.tolist())

# Fix the label set so the matrix always has one row and column per class,
# even when a class is missing; otherwise the matrix would be smaller than
# the list of display labels.
cf_matrix = confusion_matrix(y_true, y_pred, labels=list(range(len(class_names))))
cmd = ConfusionMatrixDisplay(cf_matrix, display_labels=class_names)
fig, ax = plt.subplots(figsize=(15,15))
cmd.plot(ax=ax)
#plt.savefig('output.png')