In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here are several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the read-only input mount and show each directory path. File names
# are left out on purpose to keep the output short.
for walk_entry in os.walk('/kaggle/input'):
    dirname, _, filenames = walk_entry
    print(dirname)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
# Install librosa from the conda-forge channel; -y answers the confirmation
# prompt automatically. librosa is used below to load audio and build the
# mel spectrograms.
!conda install -y -c conda-forge librosa

In [None]:
!pip install vit-pytorch

# Data Preprocessing

## Helper Functions

In [None]:
import numpy as np
import librosa
def get_melspectrogram_db(file_path, sr=22937, n_fft=2048, hop_length=512, n_mels=224, fmin=20, fmax=8300, top_db=80, duration=5):
    """Load an audio file and return its mel spectrogram on a decibel scale.

    The waveform is forced to exactly ``duration * sr`` samples before the
    spectrogram is computed: shorter clips are reflect-padded on both sides,
    longer clips are truncated to their first ``duration`` seconds.

    Args:
        file_path: path of the audio file handed to ``librosa.load``.
        sr: target sampling rate passed to ``librosa.load``.
        n_fft: FFT window size.
        hop_length: hop between successive frames, in samples.
        n_mels: number of mel bands.
        fmin: lowest frequency of the mel filter bank.
        fmax: highest frequency of the mel filter bank.
        top_db: dynamic-range threshold forwarded to ``librosa.power_to_db``.
        duration: clip length in seconds (defaults to the previous fixed 5).

    Returns:
        The array produced by ``librosa.power_to_db`` for the mel spectrogram.
        NOTE(review): the defaults are presumably chosen to give a 224x224
        result for the 224-pixel model input - confirm.
    """
    wav, sr = librosa.load(file_path, sr=sr)
    target_len = int(duration * sr)
    missing = target_len - wav.shape[0]
    if missing > 0:
        # Pad ceil(missing / 2) samples on each side, then drop the one extra
        # trailing sample that appears when ``missing`` is odd so every clip
        # has exactly ``target_len`` samples.
        pad = (missing + 1) // 2
        wav = np.pad(wav, pad, mode='reflect')[:target_len]
    else:
        wav = wav[:target_len]
    # ``y`` must be passed by keyword: librosa >= 0.10 rejects a positional
    # waveform argument.
    spec = librosa.feature.melspectrogram(y=wav, sr=sr, n_fft=n_fft,
                                          hop_length=hop_length, n_mels=n_mels,
                                          fmin=fmin, fmax=fmax)
    spec_db = librosa.power_to_db(spec, top_db=top_db)
    return spec_db

In [None]:
def spec_to_image(spec, eps=1e-6):
    """Convert a spectrogram array into an 8-bit image array.

    The input is standardised (zero mean, unit variance) and then min-max
    scaled to the range 0..255 before being cast to ``uint8``.

    Args:
        spec: numeric NumPy array holding the spectrogram.
        eps: small constant added to the standard deviation so that the
            standardisation never divides by zero.

    Returns:
        ``np.uint8`` array with the same shape as ``spec``. A spectrogram
        whose values are all identical yields an all-zero image.
    """
    mean = spec.mean()
    std = spec.std()
    spec_norm = (spec - mean) / (std + eps)
    spec_min, spec_max = spec_norm.min(), spec_norm.max()
    value_range = spec_max - spec_min
    if value_range == 0:
        # A flat spectrogram has no contrast to stretch; without this guard
        # the scaling below would compute 0/0 and cast NaN to uint8.
        return np.zeros(spec_norm.shape, dtype=np.uint8)
    spec_scaled = 255 * (spec_norm - spec_min) / value_range
    spec_scaled = spec_scaled.astype(np.uint8)
    return spec_scaled

## Dataset Initialization

In [None]:
# Import data from cough dataset
import os
import numpy as np
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

POSITIVE_FOLDER = "pos"
NEGATIVE_FOLDER = "neg"

ROOT = "/kaggle/input/coughnetwithwavegangen"

class CoughDataSet(Dataset):
    """In-memory dataset of audio clips rendered as 3-channel spectrogram images.

    Files are read from ``<root>/<dataset_type>/pos`` and
    ``<root>/<dataset_type>/neg``. Every file is converted once, inside
    ``__init__``, with ``get_melspectrogram_db`` and ``spec_to_image`` and kept
    in memory, so construction is slow but indexing is cheap.

    Attributes:
        root: dataset root directory.
        data: list of arrays, one per file, with the single-channel image
            repeated along a new leading axis of size 3.
        labels: list of int labels aligned with ``data``.
    """

    # (folder name, label) pairs, processed in this order.
    # NOTE(review): files in the "pos" folder get label 0 and files in "neg"
    # get label 1, exactly as before - confirm this mapping is intended.
    _LABEL_BY_FOLDER = ((POSITIVE_FOLDER, 0), (NEGATIVE_FOLDER, 1))

    def __init__(self, root=ROOT, dataset_type="train"):
        """Load every file of the requested split.

        Args:
            root: directory that contains the split sub-directories.
            dataset_type: name of the split sub-directory (the notebook uses
                "train", "val" and "test").
        """
        self.root = root
        data = []
        labels = []
        for folder, label in self._LABEL_BY_FOLDER:
            folder_path = os.path.join(self.root, dataset_type, folder)
            for file_path in self._list_files(folder_path):
                data.append(self._load_image(file_path))
                labels.append(label)
        self.data = data
        self.labels = labels

    @staticmethod
    def _list_files(folder_path):
        """Return the paths of the regular files in ``folder_path``, sorted."""
        # The context manager closes the scandir iterator; sorting makes the
        # sample order independent of the file system's listing order.
        with os.scandir(folder_path) as entries:
            return sorted(entry.path for entry in entries if entry.is_file())

    @staticmethod
    def _load_image(file_path):
        """Turn one audio file into a 3-channel uint8 spectrogram image."""
        mel_db = get_melspectrogram_db(file_path)
        spec_img = spec_to_image(mel_db)
        # Repeat the single channel three times along a new leading axis.
        return np.repeat(spec_img[np.newaxis, ...], 3, 0)

    def __len__(self):
        """Return the number of loaded samples."""
        return len(self.data)

    def __getitem__(self, index):
        """Return the ``(image, label)`` pair stored at ``index``."""
        return self.data[index], self.labels[index]
        

In [None]:
# test_data = CoughDataSet(dataset_type="test")
# test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

# test_iter = iter(test_loader)
# batch = iter(test_iter)
# for x, y in batch:
#     print(x.shape)

In [None]:
# Build the three splits. Every clip is decoded and converted to a spectrogram
# image inside CoughDataSet.__init__, so this cell can take a while.
train_data = CoughDataSet()
test_data = CoughDataSet(dataset_type="test")
valid_data = CoughDataSet(dataset_type="val")

# Only the training batches are shuffled. Validation is iterated in a fixed
# order so the recorded per-batch losses are comparable between epochs.
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_data, batch_size=32, shuffle=False)

In [None]:
# The test split is iterated in a fixed order so that the per-batch
# predictions and labels printed during the final evaluation are reproducible.
test_loader = DataLoader(test_data, batch_size=32, shuffle=False)

# Initialize Metrics

In [None]:
from ignite.metrics import Accuracy, Precision, Recall
from ignite.contrib.metrics import RocCurve

def _make_eval_metrics():
    """Build one independent set of evaluation metrics.

    Returns:
        list: ``[Accuracy, Precision, Recall, F1]`` where Precision and Recall
        are created with ``average=False`` and F1 is the mean over the
        element-wise ``2 * P * R / (P + R)`` of those two metrics.
    """
    precision = Precision(average=False)
    recall = Recall(average=False)
    # The tiny constant keeps F1 at 0 instead of NaN for a class whose
    # precision and recall are both 0 (e.g. a class that is never predicted).
    f1 = (precision * recall * 2 / (precision + recall + 1e-20)).mean()
    return [Accuracy(), precision, recall, f1]


# Setup Training Metrics
training_metrics = [Accuracy()]

# Setup Validation Metrics (Accuracy, Precision, Recall, F1)
metrics = _make_eval_metrics()

# Setup Test Metrics (same layout, separate metric objects)
test_metrics = _make_eval_metrics()

# This cell has always left `F1` bound to the test-set F1 metric; keep that
# name available for any code that relies on it.
F1 = test_metrics[-1]

# Start every metric from a clean state.
for metric_group in (training_metrics, metrics, test_metrics):
    for metric in metric_group:
        metric.reset()

## Tensorboard Logging

In [None]:
from torch.utils.tensorboard import SummaryWriter

# TensorBoard event files for this run are written below the working directory.
summary_writer = SummaryWriter(log_dir="/kaggle/working/bleh_waveganv2")

# Model Initialization

## Imports

In [None]:
import os
import torch
from torch import nn
import torch.nn.functional as F
import pytorch_lightning as pl

## Set Constant Values

In [None]:
# Step size handed to the optimizer.
learning_rate = 3e-4
# Epoch counter bounds consumed by the training loop.
start_epoch, epochs = 1, 100
# Loss applied to the model outputs during training and validation.
loss_fn = nn.CrossEntropyLoss()

## Set Device

In [None]:
# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
if device.type == 'cuda':
    # Only the GPU case is reported.
    print(device)

## Setup Transformer

In [None]:
from torch import optim
from vit_pytorch import ViT

# Vision Transformer with two output classes; image_size matches the
# n_mels=224 default used when the spectrograms are built.
model = ViT(image_size=224, patch_size=32, num_classes=2,
            dim=128, depth=12, heads=8, mlp_dim=128)
model = model.to(device)

# Cross-entropy criterion kept under this name alongside `loss_fn`.
loss_function = torch.nn.CrossEntropyLoss()

# Adam over all model parameters with the configured learning rate.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

## Setup Training Iteration for Model

In [None]:
# Per-epoch lists of batch losses, filled in by the training loop.
train_loss_history, test_loss_history = [], []

In [None]:
import time
from tqdm import tqdm

def _evaluate(loader, metric_list):
    """Run ``model`` over ``loader`` in eval mode and feed ``metric_list``.

    Args:
        loader: iterable yielding ``(data, label)`` batches.
        metric_list: metrics whose ``update((output, label))`` is called for
            every batch with the raw model outputs.

    Returns:
        list of float: the loss of each batch, in iteration order.
    """
    batch_losses = []
    model.eval()
    with torch.no_grad():
        for data, label in loader:
            data = data.to(device, dtype=torch.float32)
            label = label.to(device, dtype=torch.long)

            output = model(data)
            batch_losses.append(loss_fn(output, label).item())

            for metric in metric_list:
                metric.update((output, label))
    return batch_losses


def _log_metrics(split, metric_list, epoch):
    """Compute, log, print and finally reset every metric in ``metric_list``.

    Args:
        split: prefix for TensorBoard tags and printed lines (for example
            "Validation" or "Test"), so different splits never share a tag.
        metric_list: metrics to report.
        epoch: global step used for the TensorBoard scalars.
    """
    for metric in metric_list:
        metric_name = metric.__class__.__name__
        if metric_name == "MetricsLambda":
            # The only composed metric in the lists built above is the F1 score.
            metric_name = "F1"
        computed_metric = metric.compute()
        tag = f"{split} {metric_name}"
        if torch.is_tensor(computed_metric) and computed_metric.dim() > 0:
            # Non-scalar tensor results are logged as one scalar per entry.
            for idx, row in enumerate(computed_metric):
                summary_writer.add_scalar(f"{tag} of {idx}", row, epoch)
        else:
            summary_writer.add_scalar(tag, computed_metric, epoch)
        print("{} {} is {}".format(split, metric_name, computed_metric))

    # Reset only after everything was computed: F1 reads Precision and Recall.
    for metric in metric_list:
        metric.reset()


start_time = time.time()
# `epochs` is the number of the last epoch to run, so the bound is inclusive
# (the previous range(start_epoch, epochs) stopped one epoch early).
for epoch in range(start_epoch, epochs + 1):
    print('Epoch:', epoch)

    # ---- Training ----
    model.train()
    batch_losses = []
    train_accuracy = training_metrics[0]

    for data, label in tqdm(train_loader):
        data = data.to(device, dtype=torch.float32)
        label = label.to(device, dtype=torch.long)

        optimizer.zero_grad()

        output = model(data)
        loss = loss_fn(output, label)

        train_accuracy.update((output, label))

        loss.backward()

        batch_losses.append(loss.item())
        optimizer.step()
    train_loss_history.append(batch_losses)

    # Report the training metrics of this epoch.
    computed_train_accuracy = train_accuracy.compute()
    summary_writer.add_scalar("Training Accuracy", computed_train_accuracy, epoch)
    summary_writer.add_scalar("Training Loss", np.mean(batch_losses), epoch)
    print(f'Epoch - {epoch} Train-Loss : {np.mean(train_loss_history[-1])} Train-Accuracy: {computed_train_accuracy}')
    train_accuracy.reset()  # reset for next epoch's use

    # ---- Validation ----
    validation_losses = _evaluate(valid_loader, metrics)
    # NOTE: despite its name, test_loss_history stores the validation losses.
    test_loss_history.append(validation_losses)
    summary_writer.add_scalar("Validation Loss", np.mean(validation_losses), epoch)
    print(f'Epoch - {epoch} Validation-Loss : {np.mean(validation_losses)}')
    _log_metrics("Validation", metrics, epoch)

    # ---- Test ----
    # The raw model outputs are passed to the metrics, exactly as for the
    # validation split, so both splits report the same per-class values.
    _evaluate(test_loader, test_metrics)
    _log_metrics("Test", test_metrics, epoch)

    summary_writer.flush()

print('Execution time:', '{:5.2f}'.format(time.time() - start_time), 'seconds')

In [None]:
# !mkdir /kaggle/working/checkpoints

In [None]:
# Print every directory below /kaggle (inputs and working files alike).
for walk_entry in os.walk("/kaggle"):
    dirname, _, _ = walk_entry
    print(dirname)

In [None]:
# Location of the checkpoint written for the last epoch that ran.
checkpoint_dir = '/kaggle/working/checkpoints'
checkpoint_path = os.path.join(checkpoint_dir, '{}_{}.pth'.format("nonwavegan", epoch))

# The mkdir cell above is commented out, so create the directory here;
# otherwise open() fails with FileNotFoundError on a fresh run.
os.makedirs(checkpoint_dir, exist_ok=True)

# Store the model and optimizer state.
with open(checkpoint_path, 'wb') as f:
    torch.save({
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
    }, f)

# Read the file straight back to confirm that it can be deserialised.
checkpoint = torch.load(checkpoint_path)

In [None]:
# Training is finished: close the TensorBoard writer.
summary_writer.close()

In [None]:
# Recursively pack the checkpoint directory into checkpoints.zip in the
# current working directory.
!zip -r checkpoints.zip /kaggle/working/checkpoints

In [None]:
from IPython.display import FileLinks
# As the last expression of the cell, this displays the FileLinks object for
# the working directory (the saved outputs live under /kaggle/working/).
FileLinks('/kaggle/working/')

In [None]:
# Final pass over the test split: print the predictions and labels of every
# batch and report the overall accuracy.
correct, total = 0, 0

model.eval()
with torch.no_grad():
    for data, label in test_loader:
        data = data.to(device, dtype=torch.float32)
        label = label.to(device, dtype=torch.long)

        test_output = model(data)
        # Index of the largest output per sample is the predicted class.
        predicted = torch.max(test_output, 1).indices
        print(predicted)
        print(label)

        correct = correct + (predicted == label).sum().item()
        total = total + label.shape[0]
print(f"Test Accuracy: {correct / total}")