### Prepare data ###

In [1]:
import torch
from torchvision import transforms, datasets

import torch
import torch.nn as nn
import torch.nn.functional as F

import matplotlib.pyplot as plt

from torchsummary import summary

from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score
import numpy as np

torch.manual_seed(1234)

# from sklearn.metrics import classification_report

In [None]:
# Root folder of the dataset; the '/PLD_train_test/...' and '/PLQ_train_test/...'
# sub-paths used when building the datasets are appended to this string.
data_path = './Marine Litter Dataset developed for APLASTIC-Q publication'

In [None]:
# Training-time augmentation: convert to a tensor, then flip randomly along
# each spatial axis with probability 0.5.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomVerticalFlip(p=0.5),
])

# NOTE: ImageFolder takes the dataset directory as ``root``; the previous
# ``data=`` keyword is not a parameter of ImageFolder and raised TypeError.

# PLD: augmented, shuffled training set; test set only converted to tensors.
PLD_train = datasets.ImageFolder(root=data_path + '/PLD_train_test/train', transform=transform)
PLD_train_loader = torch.utils.data.DataLoader(PLD_train, batch_size=32, shuffle=True)

PLD_test = datasets.ImageFolder(root=data_path + '/PLD_train_test/test', transform=transforms.ToTensor())
PLD_test_loader = torch.utils.data.DataLoader(PLD_test, batch_size=1000)

# PLQ: same layout as PLD, read from the PLQ sub-folders.
PLQ_train = datasets.ImageFolder(root=data_path + '/PLQ_train_test/train', transform=transform)
PLQ_train_loader = torch.utils.data.DataLoader(PLQ_train, batch_size=32, shuffle=True)

PLQ_test = datasets.ImageFolder(root=data_path + '/PLQ_train_test/test', transform=transforms.ToTensor())
PLQ_test_loader = torch.utils.data.DataLoader(PLQ_test, batch_size=1000)

### APLASTIC-Q Model ###

In [3]:
# Use the first CUDA device when PyTorch reports one, otherwise the CPU.
if torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'

In [4]:
class APALASTIC_Q(nn.Module):
    """CNN with four unpadded 3x3 convolutions and a two-layer dense head.

    Layout: conv-conv-pool-dropout, conv-conv-pool-dropout, flatten,
    fc1-dropout-fc2. The ``task`` argument selects the head size:

    * ``'d'``: ``fc1`` takes 30976 features, ``fc2`` emits 6 scores.
      30976 == 64 * 22 * 22, which is what a 3x100x100 input produces.
    * ``'q'``: ``fc1`` takes 5184 features, ``fc2`` emits 18 scores.
      5184 == 64 * 9 * 9, which is what a 3x48x48 input produces.

    Args:
        task: ``'d'`` (default) or ``'q'``.

    Raises:
        ValueError: if ``task`` is neither ``'d'`` nor ``'q'``.
    """

    # task -> (fc1 input features, number of output scores)
    _HEAD_SIZES = {'d': (30976, 6), 'q': (5184, 18)}

    def __init__(self, task='d'):
        super(APALASTIC_Q, self).__init__()
        # Validate up front: previously an unknown task left fc1/fc2 undefined
        # and only failed later, inside forward().
        if task not in self._HEAD_SIZES:
            raise ValueError(
                "task must be 'd' or 'q', got {!r}".format(task))
        flat_features, n_outputs = self._HEAD_SIZES[task]

        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3)
        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3)
        self.conv4 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3)
        self.fc1 = nn.Linear(flat_features, 512)
        self.fc2 = nn.Linear(512, n_outputs)

    def forward(self, x):
        """Return the per-class scores for a batch of images ``x``.

        Dropout is tied to ``self.training`` so it is disabled after
        ``model.eval()``. The functional ``F.dropout`` defaults to
        ``training=True``, which previously kept dropout active during
        evaluation and made predictions non-deterministic.
        """
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, kernel_size=2)
        x = F.dropout(x, p=0.25, training=self.training)

        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = F.max_pool2d(x, kernel_size=2)
        x = F.dropout(x, p=0.25, training=self.training)

        # Collapse everything except the batch dimension.
        x = x.flatten(start_dim=1)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, p=0.5, training=self.training)

        # NOTE(review): the ReLU on the final scores is kept from the original
        # code. It clamps negative scores to zero before the loss is applied;
        # confirm this is intended before removing it.
        return F.relu(self.fc2(x))

In [5]:
# One network per task ('d' first, then 'q'), each placed on the selected device.
PLD_CNN = APALASTIC_Q(task='d').to(device)
PLQ_CNN = APALASTIC_Q(task='q').to(device)

### Training Loop ###


In [None]:
# Each network gets its own Adam optimizer (lr 1e-3, betas 0.9 / 0.999)
# and its own cross-entropy criterion.
PLD_criterion = torch.nn.CrossEntropyLoss()
PLD_optimizer = torch.optim.Adam(
    PLD_CNN.parameters(),
    lr=1e-3,
    betas=(0.9, 0.999),
)

PLQ_criterion = torch.nn.CrossEntropyLoss()
PLQ_optimizer = torch.optim.Adam(
    PLQ_CNN.parameters(),
    lr=1e-3,
    betas=(0.9, 0.999),
)

### You can define your own training function

In [None]:
def train(model, train_loader, criterion, optimizer, device):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        model: network to optimise; it is switched to training mode.
        train_loader: iterable of ``(data, target)`` batches.
        criterion: loss callable invoked as ``criterion(output, target)``.
            Assumed to return the mean loss over the batch (the default
            reduction of ``CrossEntropyLoss``) - confirm if another reduction
            is used.
        optimizer: optimizer that updates ``model``'s parameters.
        device: device the batches are moved to.

    Returns:
        ``(train_loss, train_acc)``: the mean loss per sample and the fraction
        of samples whose arg-max prediction matched the target. Both are
        ``0.0`` when the loader yields no samples.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for data, target in train_loader:
        data, target = data.float().to(device), target.long().to(device)

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        batch_size = target.size(0)
        # Weight the batch-mean loss by the batch size so that the final
        # division yields a per-sample mean. Summing batch means and dividing
        # by the sample count (as before) shrank the loss by ~1/batch_size.
        loss_sum += loss.item() * batch_size
        n_correct += (output.argmax(dim=1) == target).sum().item()
        n_seen += batch_size

    if n_seen == 0:
        return 0.0, 0.0

    return loss_sum / n_seen, n_correct / n_seen

In [None]:
def test(model, test_loader, criterion, device):
    """Evaluate ``model`` on ``test_loader`` and compute per-class metrics.

    Args:
        model: network to evaluate; it is switched to evaluation mode.
        test_loader: iterable of ``(data, target)`` batches whose ``dataset``
            exposes a ``classes`` list of class names.
        criterion: loss callable invoked as ``criterion(output, target)``.
            Assumed to return the mean loss over the batch - confirm if
            another reduction is used.
        device: device the batches are moved to.

    Returns:
        A 7-tuple ``(test_loss, test_acc, precisions, recalls, f1s, y_preds,
        targets)``. ``test_loss`` is the mean loss per sample, ``test_acc`` the
        fraction of correct arg-max predictions, the three dicts map each
        class name to its score, and ``y_preds`` / ``targets`` are NumPy
        arrays of predicted and true class indices.

    Side effects:
        Prints the support-weighted F1 score.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    # Collect per-batch arrays and concatenate once; np.append inside the loop
    # copies the whole accumulated array on every batch.
    pred_chunks = []
    target_chunks = []

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.float().to(device), target.long().to(device)

            output = model(data)
            batch_size = target.size(0)
            # Weight the batch-mean loss so the final value is per sample.
            loss_sum += criterion(output, target).item() * batch_size

            y_pred = output.argmax(dim=1)
            n_correct += (y_pred == target).sum().item()
            n_seen += batch_size
            pred_chunks.append(y_pred.cpu().numpy())
            target_chunks.append(target.cpu().numpy())

    if pred_chunks:
        y_preds = np.concatenate(pred_chunks)
        targets = np.concatenate(target_chunks)
    else:
        y_preds = np.array([], dtype=np.int64)
        targets = np.array([], dtype=np.int64)

    class_names = test_loader.dataset.classes
    # Passing explicit labels keeps the score arrays aligned with class_names
    # even when a class appears in neither the targets nor the predictions.
    labels = list(range(len(class_names)))

    # scikit-learn metrics take (y_true, y_pred). The arguments used to be
    # swapped, which exchanged precision with recall and weighted the F1 score
    # by predicted counts instead of true counts.
    precisions = precision_score(targets, y_preds, labels=labels, average=None)
    precisions = dict(zip(class_names, precisions))
    recalls = recall_score(targets, y_preds, labels=labels, average=None)
    recalls = dict(zip(class_names, recalls))
    f1s = f1_score(targets, y_preds, labels=labels, average=None)
    f1s = dict(zip(class_names, f1s))
    print(f1_score(targets, y_preds, average='weighted'))

    # max(..., 1) avoids dividing by zero when the loader yielded no samples.
    denom = max(n_seen, 1)
    test_loss = loss_sum / denom
    test_acc = n_correct / denom

    return test_loss, test_acc, precisions, recalls, f1s, y_preds, targets

In [None]:
def train_test(model, train_loader, test_loader, criterion, optimizer, device, n_epochs=100):
    """Train for ``n_epochs`` epochs, evaluating on the test set every 20th epoch.

    Args:
        model: network passed through to ``train`` and ``test``.
        train_loader: batches used by ``train`` each epoch.
        test_loader: batches used by ``test`` on evaluation epochs.
        criterion: loss callable shared by training and evaluation.
        optimizer: optimizer passed to ``train``.
        device: device the batches are moved to.
        n_epochs: number of training epochs (default 100).

    Side effects:
        Prints the training loss/accuracy after every epoch and the test
        loss/accuracy after every 20th epoch.
    """
    for epoch in range(1, n_epochs + 1):
        train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)
        print('\n Epoch: {}/{}, Train loss: {:.4f}, Accuracy: {:.6f}\n'.format(
            epoch, n_epochs, train_loss, train_acc))

        if epoch % 20 == 0:
            # test() returns a 7-tuple; only the loss and accuracy are reported
            # here. Unpacking it into two names raised ValueError.
            test_loss, test_acc, *_ = test(model, test_loader, criterion, device)
            print('\n Epoch: {}/{}, Test loss: {:.4f}, Accuracy: {:.6f}\n'.format(
                epoch, n_epochs, test_loss, test_acc))