In [None]:
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader, random_split
from torchsummary import summary
import pytorch_lightning as pl
from pytorch_lightning.loggers import WandbLogger
import torchmetrics
import wandb
import gc

from dataset import CoughDataset
from utils import plot_confusion_matrix

In [None]:
class CoughDataModule(pl.LightningDataModule):
    """Lightning data module that builds a ``CoughDataset`` and splits it three ways.

    The dataset and the random train/validation/test split are created eagerly
    in ``__init__`` so that ``classes`` and the three subsets are available as
    soon as the module is constructed.

    Args:
        df: Metadata frame handed to ``CoughDataset``.
        data_path: Root directory handed to ``CoughDataset``.
        batch_size: Batch size used by all three dataloaders.
        num_workers: Worker process count used by all three dataloaders.
        train_size: Fraction of the dataset used for training.
        val_size: Fraction of the dataset used for validation.
        test_size: Fraction of the dataset used for testing.
        duration, sample_rate, channels, n_mels, n_fft, top_db: Forwarded
            unchanged to ``CoughDataset``.

    Raises:
        ValueError: If the three split fractions do not add up to 1.0
            (within floating-point tolerance).
    """

    def __init__(self, 
                 df, 
                 data_path, 
                 batch_size=32, 
                 num_workers=4, 
                 train_size=0.8, 
                 val_size=0.1, 
                 test_size=0.1,
                 duration=10.0,
                 sample_rate=48000,
                 channels=1,
                 n_mels=64,
                 n_fft=1024, 
                 top_db=80):
        super().__init__()

        # Compare with a tolerance: an exact `!= 1.0` test rejects valid splits
        # such as 0.7 / 0.2 / 0.1, whose float sum is 0.9999999999999999.
        split_total = train_size + val_size + test_size
        if abs(split_total - 1.0) > 1e-6:
            raise ValueError('train_size + val_size + test_size must be equal to 1.0')

        self.df = df
        self.data_path = data_path
        self.batch_size = batch_size
        self.num_workers = num_workers

        self.train_size = train_size
        self.val_size = val_size
        self.test_size = test_size

        self.duration = duration
        self.sample_rate = sample_rate
        self.channels = channels

        self.n_mels = n_mels
        self.n_fft = n_fft
        self.top_db = top_db

        self.dataset = CoughDataset(df=self.df,
                                    data_path=self.data_path,
                                    duration=self.duration,
                                    sample_rate=self.sample_rate,
                                    channels=self.channels,
                                    n_mels=self.n_mels,
                                    n_fft=self.n_fft,
                                    top_db=self.top_db)
        # Class labels as exposed by the dataset's label encoder.
        self.classes = self.dataset.label_encoder.classes_

        # Fractional lengths: random_split turns them into subset sizes.
        self.train_dataset, self.val_dataset, self.test_dataset = random_split(
            self.dataset, [self.train_size, self.val_size, self.test_size])

    def _make_loader(self, subset, shuffle):
        """Build a DataLoader over ``subset`` with the module's batch/worker settings."""
        return DataLoader(subset, batch_size=self.batch_size, shuffle=shuffle, num_workers=self.num_workers)

    def train_dataloader(self):
        """Return the shuffled dataloader over the training subset."""
        return self._make_loader(self.train_dataset, shuffle=True)

    def val_dataloader(self):
        """Return the unshuffled dataloader over the validation subset."""
        return self._make_loader(self.val_dataset, shuffle=False)

    def test_dataloader(self):
        """Return the unshuffled dataloader over the test subset."""
        return self._make_loader(self.test_dataset, shuffle=False)

In [None]:
class CoughModel(nn.Module):
    """Three-stage CNN classifier over single-channel 2-D inputs.

    Each stage is Conv2d(3x3, padding=1) -> BatchNorm2d -> ReLU -> MaxPool2d(2x2),
    so every stage halves both spatial dimensions. The flattened feature map
    feeds a 512-unit hidden layer with dropout and a final linear classifier.

    Args:
        n_class: Number of output classes.
        fc_in_features: Size of the flattened feature map entering ``fc1``.
            The default, ``128 * 3 * 104``, corresponds to a 3 x 104 map with
            128 channels after the third pooling stage; pass a different value
            when the input spectrogram shape differs.
    """

    def __init__(self, n_class=3, fc_in_features=128 * 3 * 104) -> None:
        super().__init__()

        # Convolutional layers
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=(3, 3), padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=(3, 3), padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=(3, 3), padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))

        # Fully-connected layers
        self.fc1 = nn.Linear(in_features=fc_in_features, out_features=512)
        self.dropout = nn.Dropout(0.5)
        self.fc2 = nn.Linear(in_features=512, out_features=n_class)

        # Activation functions. Softmax is only used by `predict_proba`.
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return raw class logits of shape ``(batch, n_class)``.

        The output is deliberately NOT passed through softmax:
        ``nn.CrossEntropyLoss`` applies log-softmax itself, so feeding it
        probabilities would normalise twice and flatten the gradients.
        Arg-max based metrics are unaffected because softmax is monotonic.
        """
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
        )
        for conv, bn, pool in stages:
            x = pool(self.relu(bn(conv(x))))

        # Flatten everything but the batch dimension for the dense layers.
        x = torch.flatten(x, start_dim=1)

        x = self.dropout(self.relu(self.fc1(x)))
        return self.fc2(x)

    def predict_proba(self, x):
        """Return class probabilities (softmax over the logits from ``forward``)."""
        return self.softmax(self(x))

In [None]:
class LitCoughClassifier(pl.LightningModule):
    """Lightning wrapper that trains, validates and tests a ``CoughModel``.

    Args:
        classes: Sequence of class labels. Its length sets the number of model
            outputs and metric classes, and it is passed to
            ``plot_confusion_matrix`` when the test epoch ends.
        learning_rate: Learning rate for the Adam optimizer.
    """

    def __init__(self, classes, learning_rate=1e-3):
        super().__init__()

        # Classes
        self.classes = classes
        num_classes = len(self.classes)

        # Model
        self.model = CoughModel(num_classes)

        # Hyperparameters
        self.learning_rate = learning_rate

        # Loss and metrics
        self.criterion = nn.CrossEntropyLoss()
        self.accuracy = torchmetrics.Accuracy(task='multiclass', num_classes=num_classes)
        self.f1_score = torchmetrics.F1Score(task='multiclass', num_classes=num_classes)
        self.confusion_matrix = torchmetrics.ConfusionMatrix(task='multiclass', num_classes=num_classes)
        # Plain tensor attribute (not a buffer), so it stays on the CPU even
        # when the module itself is moved to an accelerator.
        self.test_confusion_matrix = torch.zeros(num_classes, num_classes)

    def forward(self, x):
        """Run the wrapped model on ``x``."""
        return self.model(x)

    def _predict_and_loss(self, batch):
        """Unpack ``batch`` and return ``(predictions, targets, loss)``."""
        x, y = batch
        y_pred = self.model(x)
        return y_pred, y, self.criterion(y_pred, y)

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss, accuracy and F1 for one batch."""
        y_pred, y, loss = self._predict_and_loss(batch)
        self.log('train_loss', loss)

        # Metrics
        self.log('train_accuracy', self.accuracy(y_pred, y), on_step=True, on_epoch=True, prog_bar=True)
        self.log('train_f1_score', self.f1_score(y_pred, y), on_step=True, on_epoch=True, prog_bar=False)

        return loss

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss, accuracy and F1 for one batch."""
        y_pred, y, loss = self._predict_and_loss(batch)
        self.log('val_loss', loss)

        # Metrics
        self.log('val_accuracy', self.accuracy(y_pred, y), on_step=True, on_epoch=True, prog_bar=True)
        self.log('val_f1_score', self.f1_score(y_pred, y), on_step=True, on_epoch=True, prog_bar=False)

        return loss

    def on_test_epoch_start(self):
        """Clear the accumulated confusion matrix so repeated test runs do not add up."""
        self.test_confusion_matrix.zero_()

    def test_step(self, batch, batch_idx):
        """Compute and log test loss/metrics and accumulate the confusion matrix."""
        y_pred, y, loss = self._predict_and_loss(batch)
        self.log('test_loss', loss)

        # Metrics
        self.log('test_accuracy', self.accuracy(y_pred, y))
        self.log('test_f1_score', self.f1_score(y_pred, y))

        # The batch matrix lives on the module's device; bring it to the CPU
        # before adding it to the CPU-resident accumulator, otherwise the
        # in-place add fails with a device mismatch on GPU runs.
        batch_cm = self.confusion_matrix(y_pred, y)
        self.test_confusion_matrix += batch_cm.detach().cpu()

        return loss

    def test_epoch_end(self, outputs):
        """Plot the accumulated test confusion matrix to ``confusion_matrix.png``.

        NOTE(review): this is the Lightning 1.x hook name; Lightning 2.x
        replaced it with ``on_test_epoch_end`` - rename when upgrading.
        """
        cm = self.test_confusion_matrix.numpy()
        plot_confusion_matrix(cm, self.classes, filename='confusion_matrix.png')

    def configure_optimizers(self):
        """Return an Adam optimizer over the wrapped model's parameters."""
        return torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)

In [None]:
# Dataset root directory and the compiled metadata CSV inside it.
DATA_PATH = 'data/'
METADATA_FILE = 'data/metadata_compiled.csv'

# Load the metadata table into a DataFrame.
metadata_df = pd.read_csv(filepath_or_buffer=METADATA_FILE)

In [None]:
# Rows are kept only when `cough_detected` is strictly above this value.
COUGH_DETECTED_THRESHOLD = 0.9    # TODO: Set as a hyperparameter

# Drop rows without a `status` label, then keep confident cough detections.
not_nan_df = metadata_df[metadata_df['status'].notna()]
filtered_df = not_nan_df[not_nan_df['cough_detected'] > COUGH_DETECTED_THRESHOLD]

# Preview the columns of interest (first rows only, to keep the output small).
filtered_df[['uuid', 'cough_detected', 'SNR', 'age', 'gender', 'status']].head()

In [None]:
# Build the data module and classifier once to inspect the architecture.
data_module = CoughDataModule(df=filtered_df, data_path=DATA_PATH, n_fft=1024, n_mels=64, sample_rate=16000)
classes = data_module.classes

classifier = LitCoughClassifier(classes, learning_rate=1e-3)

# First element of a dataset item is the model input.
test_sample = data_module.test_dataset[0][0]
print('Sample input size:', test_sample.shape)

# The classifier was just constructed and has not been moved to an accelerator.
# torchsummary defaults to device="cuda" and then feeds a CUDA probe tensor
# whenever CUDA is available, which fails against CPU weights - so pin the
# summary to the CPU explicitly.
summary(classifier, tuple(test_sample.shape), device='cpu')

In [None]:
def _fit_and_test(config):
    """Build the data module, classifier and trainer from ``config``, then fit and test.

    Everything created here is local, so all references are dropped when the
    function returns and the caller can reclaim memory afterwards.
    """
    torch.manual_seed(69)  # fixed seed for the split and weight initialisation

    wandb_logger = WandbLogger(log_model=True)

    data_module = CoughDataModule(
        df=filtered_df,
        data_path=DATA_PATH,
        batch_size=config.batch_size,
        sample_rate=config.sample_rate,
        n_fft=config.n_fft,
        n_mels=config.n_mels)

    classifier = LitCoughClassifier(data_module.classes, learning_rate=config.learning_rate)

    # Use the GPU when CUDA is present; everything else (including Apple
    # MPS machines, as before) runs on the CPU.
    accelerator = 'gpu' if torch.cuda.is_available() else 'cpu'

    trainer = pl.Trainer(
        max_epochs=config.max_epochs,
        logger=wandb_logger,
        accelerator=accelerator
    )

    trainer.fit(classifier, data_module)
    trainer.test(classifier, data_module)


def train(config=None):
    """Run one training + test cycle inside a wandb run.

    Intended as the ``function`` of a wandb sweep agent: hyperparameters
    (``batch_size``, ``sample_rate``, ``n_fft``, ``n_mels``, ``learning_rate``,
    ``max_epochs``) are read from ``wandb.config``.

    Args:
        config: Optional configuration passed to ``wandb.init``.

    Raises:
        Exception: Any error raised while building, fitting or testing is
            printed and re-raised.
    """
    # The context manager closes the run on exit. The run is intentionally not
    # finished by hand on failure: calling wandb.finish() before re-raising
    # would record a crashed run as a successful one.
    with wandb.init(config=config):
        gc.collect()
        torch.cuda.empty_cache()

        try:
            _fit_and_test(wandb.config)
        except Exception as e:
            print(e)
            raise
        finally:
            # Release memory whether the run succeeded or failed, so the next
            # sweep run starts from a clean slate.
            gc.collect()
            torch.cuda.empty_cache()


In [None]:
# Bayesian sweep definition. The target metric must match a key that actually
# reaches the logger: validation accuracy is logged with both on_step=True and
# on_epoch=True, so Lightning emits it as `val_accuracy_step` and
# `val_accuracy_epoch` - a bare `val_accuracy` key is never logged.
sweep_config = {
    "method": "bayes",
    "metric": {
        "name": "val_accuracy_epoch",
        "goal": "maximize"
    },
    "parameters": {
        "batch_size": {
            "values": [32, 64, 128]
        },
        "max_epochs": {
            "values": [1]
        },
        "learning_rate": {
            "min": 0.001,
            "max": 0.01
        },
        "sample_rate": {
            "values": [16000]
        },
        "n_fft": {
            "values": [1024]
        },
        "n_mels": {
            "values": [64]
        },
    }
}

# Register the sweep, then run a single trial of `train` through the agent.
sweep_id = wandb.sweep(sweep_config, project='cough-classifier', entity='dl-miniproject')

wandb.agent(sweep_id, function=train, count=1)