In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision 
from tqdm import tqdm
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import os

device = 'cpu'
if torch.cuda.is_available():
  device = 'cuda'
  torch.cuda.manual_seed_all(1)

In [None]:
from google.colab import drive
drive.mount('/content/drive')

train_dir = '/content/drive/My Drive/BitsXlaMarató/sp_covid/TRAIN'
test_dir = '/content/drive/My Drive/BitsXlaMarató/sp_covid/TEST'
transform = torchvision.transforms.Compose([torchvision.transforms.CenterCrop(217),
                                            torchvision.transforms.Resize(112),
                                            torchvision.transforms.Grayscale(),
                                            torchvision.transforms.ToTensor()])

batch_size = 64

train_dataset = torchvision.datasets.ImageFolder(train_dir, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_dataset = torchvision.datasets.ImageFolder(test_dir, transform=transform)
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

Mounted at /content/drive


In [None]:
for data_batch, labels_batch in train_loader:
    print('data batch shape:', data_batch.shape)
    print('labels batch shape:', labels_batch.shape)
    break

data batch shape: torch.Size([64, 1, 112, 112])
labels batch shape: torch.Size([64])


In [None]:
class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

def train_model(model, optimizer, loss_fn, train_loader, val_loader, epochs):

    train_accuracies, train_losses, val_accuracies, val_losses = [], [], [], []
    val_loss = AverageMeter()
    val_accuracy = AverageMeter()
    train_loss = AverageMeter()
    train_accuracy = AverageMeter()

    for epoch in range(epochs):
        # train
        model.train()
        train_loss.reset()
        train_accuracy.reset()
        train_loop = tqdm(train_loader, unit=" batches")  # For printing the progress bar
        for data, target in train_loop:
            train_loop.set_description('[TRAIN] Epoch {}/{}'.format(epoch + 1, epochs))
            data, target = data.float().to(device), target.float().to(device)
            target = target.unsqueeze(-1)
            data, target = data.cuda(), target.cuda()
            optimizer.zero_grad()
            output = model(data)
            #print(output.shape,target.shape,output,target,len(output),target.view(-1).shape)
            loss = loss_fn(output, target.view(-1).long()).to(device)
            loss.backward()
            optimizer.step()

            train_loss.update(loss.item(), n=len(target))
            pred = output.argmax(dim=1, keepdim=True) # get the prediction
            #print(pred)
            #print(confusion_matrix(target,pred,labels = [0,1,2]))
            acc = pred.eq(target.view_as(pred)).sum().item()/len(target)
            train_accuracy.update(acc, n=len(target))
            #Update the progress_bar
            #prec, recall = precision_and_recall(confusion_matrix(target,pred,labels = [0,1,2]))
            train_loop.set_postfix(loss=train_loss.avg, accuracy=train_accuracy.avg)

        train_losses.append(train_loss.avg)
        train_accuracies.append(train_accuracy.avg)

        # validation
        model.eval()
        val_loss.reset()
        val_accuracy.reset()
        val_loop = tqdm(val_loader, unit=" batches")  # For printing the progress bar
        with torch.no_grad():
            for data, target in val_loop:
                val_loop.set_description('[VAL] Epoch {}/{}'.format(epoch + 1, epochs))
                data, target = data.float().to(device), target.float().to(device)
                target = target.unsqueeze(-1)
                output = model(data)
                loss = loss_fn(output, target.view(-1).long())
                val_loss.update(loss.item(), n=len(target))
                pred = output.argmax(dim=1, keepdim=True) # get the prediction
                acc = pred.eq(target.view_as(pred)).sum().item()/len(target)
                val_accuracy.update(acc, n=len(target))
                val_loop.set_postfix(loss=val_loss.avg, accuracy=val_accuracy.avg)

        val_losses.append(val_loss.avg)
        val_accuracies.append(val_accuracy.avg)
        
    return train_accuracies, train_losses, val_accuracies, val_losses

# Building Network

In [None]:
cnn = nn.Sequential(
    nn.Conv2d(1, 64, 3),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Conv2d(64, 128, 3),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Conv2d(128, 256, 3),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Conv2d(256, 256, 3),
    nn.ReLU(),
    nn.MaxPool2d(2)
)

In [None]:
class Combine(nn.Module):
    def __init__(self):
        super(Combine, self).__init__()
        self.flatten = nn.Flatten()
        self.cnn = cnn
        self.rnn = nn.LSTM(
            input_size=5*5*256, 
            hidden_size=100, 
            num_layers=1,
            batch_first=True)
        self.dout = nn.Dropout(0.4)

        self.mlp = nn.Sequential(
          nn.Linear(100, 512), # Linear layer
          nn.ReLU(), # Activation function
          nn.Dropout(0.4),
          nn.Linear(512, 3), # Linear layer
          nn.LogSoftmax() # Activation function
        )


    def forward(self, x, state=None):
      batch_size, channels, height, width = x.size()
      hidden = self.init_hidden(batch_size,1,100)
      cell = torch.randn(batch_size,1,100)
      pack=(hidden,cell)
      x=self.cnn(x)
      x = x.view(batch_size, 1, -1)
      print(x.shape)
      ht, state = self.rnn(x, pack)
      ht = self.dout(ht)
      #h = ht.contiguous().view(-1, self.rnn_size)
      h=ht[:, -1, :]
      logprob = self.mlp(h)
      return logprob

    def init_hidden(self, a, batch_size, c):
        hidden = torch.randn( a, batch_size, c).to(device)
        return hidden

In [None]:
model = Combine()
model.to(device)

Combine(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (cnn): Sequential(
    (0): Conv2d(1, 64, kernel_size=(3, 3), stride=(1, 1))
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1))
    (4): ReLU()
    (5): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (9): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1))
    (10): ReLU()
    (11): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (rnn): LSTM(6400, 100, batch_first=True)
  (dout): Dropout(p=0.4, inplace=False)
  (mlp): Sequential(
    (0): Linear(in_features=100, out_features=512, bias=True)
    (1): ReLU()
    (2): Dropout(p=0.4, inplace=False)
    (3): Linear(in_features=512, out_features=3, bias=Tru

In [None]:
optimizer = optim.SGD(model.parameters(), lr=0.1)
loss_fn = F.nll_loss

In [None]:
epochs = 10
train_accuracies, train_losses, val_accuracies, val_losses = train_model(model, optimizer, loss_fn, train_loader, val_loader, epochs)

[TRAIN] Epoch 1/10:   0%|          | 0/28 [00:15<?, ? batches/s]

torch.Size([64, 1, 6400])


RuntimeError: ignored

In [None]:
epochs = range(len(train_accuracies))

plt.plot(epochs, train_accuracies, 'b', label='Training acc')
plt.plot(epochs, val_accuracies, 'r', label='Validation acc')
plt.title('Training and validation accuracy')
plt.legend()

plt.figure()

plt.plot(epochs, train_losses, 'b', label='Training loss')
plt.plot(epochs, val_losses, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()

In [None]:
model_dropout = nn.Sequential(
    nn.Conv2d(1, 64, 3),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Conv2d(64, 128, 3),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Conv2d(128, 256, 3),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Conv2d(256, 256, 3),
    nn.ReLU(),
    nn.MaxPool2d(2),
    nn.Flatten(),
    nn.Dropout(0.5),
    nn.Linear(256*5*5, 512),
    nn.ReLU(),
    nn.Linear(512, 1),
    nn.Sigmoid()
)

model.to(device)

In [None]:
epochs = 60
train_accuracies_dropout, train_losses_dropout, val_accuracies_dropout, val_losses_dropout = train_model(model_dropout, optimizer, loss_fn, train_loader, val_loader, epochs)

In [None]:
epochs = range(len(train_accuracies_dropout))

plt.plot(epochs, train_accuracies_dropout, 'b', label='Training acc')
plt.plot(epochs, val_accuracies_dropout, 'r', label='Validation acc')
plt.title('Training and validation accuracy')
plt.legend()

plt.figure()

plt.plot(epochs, train_losses_dropout, 'b', label='Training loss')
plt.plot(epochs, val_losses_dropout, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()

In [None]:
optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=0.001)
loss_fn = nn.BCELoss()
epochs = 30
train_accuracies_l2, train_losses_l2, val_accuracies_l2, val_losses_l2 = train_model(model, optimizer, loss_fn, train_loader, val_loader, epochs)

In [None]:
epochs = range(len(train_accuracies_l2))

plt.plot(epochs, train_accuracies_l2, 'b', label='Training acc')
plt.plot(epochs, val_accuracies_l2, 'r', label='Validation acc')
plt.title('Training and validation accuracy')
plt.legend()

plt.figure()

plt.plot(epochs, train_losses_l2, 'b', label='Training loss')
plt.plot(epochs, val_losses_l2, 'r', label='Validation loss')
plt.title('Training and validation loss')
plt.legend()

plt.show()

In [None]:
torch.save(model, '/content/drive/My Drive/BitsXlaMarató/cough_detection_model_from_scratch.pt')
'''
To load it we can do:
model = torch.load(PATH)
model.eval()
'''