In [19]:
#Imports
import os
import sys
import glob
import torch

import torchvision
from tqdm import trange

import numpy as np
import pandas as pd
import datetime as dt
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot   as plt
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2
import time
from datetime import datetime

from PIL import Image
from torch.utils.data  import Dataset
from torch.autograd    import Variable
from torch.optim       import lr_scheduler

from torch.utils.data  import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision       import transforms, datasets, models
from os                import listdir, makedirs, getcwd, remove
from os.path           import isfile, join, abspath, exists, isdir, expanduser
from efficientnet_pytorch import EfficientNet


%matplotlib inline

In [20]:
origin_path = "./dataset"
train_path = join(origin_path, "train/train")
test_path = join(origin_path,"test/test")
extraimage_path = join(origin_path, "extraimages/extraimages")

In [21]:
# Transformations for both the training and testing data
mean=[0.485, 0.456, 0.406]
std=[0.229, 0.224, 0.225]

# Do data transforms here, Try many others
train_transforms = A.Compose(
        [
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            A.Rotate(),
            A.Resize(height=256, width=256, p=1),
            A.Cutout(num_holes=8, max_h_size=32, max_w_size=32, fill_value=0, p=0.5),
             A.ToFloat(max_value=255.0),
            ToTensorV2(p=1.0),
        ]
    )

test_transforms = A.Compose(
        [
            A.Resize(height=256, width=256, p=1),
             A.ToFloat(max_value=255.0),
            ToTensorV2(p=1.0),
        ]
    )



In [22]:
class CassavaDataset(Dataset):
    def __init__(self, path, transform=None):
        self.classes = os.listdir(path)
        self.path = [f"{path}/{className}" for className in self.classes]
        self.file_list = [glob.glob(f"{x}/*") for x in self.path]
        self.transform = transform
        self.classes_dict = {}

        files = []
        for i, className in enumerate(self.classes):
            for fileName in self.file_list[i]:
                self.classes_dict[i] = className
                files.append([i, className, fileName])
        self.file_list = files
        files = None

    def __len__(self):
        return len(self.file_list)
    
    def get_image_filename(self, idx):
        file_path = self.file_list[idx][2]
        file_name = file_path.split("/")[-1]
        return file_name

    def __getitem__(self, idx):
        fileName = self.file_list[idx][2]
        classCategory = self.file_list[idx][0]
        im = Image.open(fileName)
        if self.transform:
            im = self.transform(image= np.asarray(im))["image"]
            
        return im, classCategory

In [23]:
train_data = CassavaDataset(train_path, transform=train_transforms)
test_data = CassavaDataset(test_path, transform=test_transforms)

In [24]:
validation_split = .2
shuffle_dataset = True
random_seed= 42

# Creating data indices for training and validation splits:
dataset_size = len(train_data)
indices = list(range(dataset_size))
split = int(np.floor(validation_split * dataset_size))

if shuffle_dataset :
    np.random.seed(random_seed)
    np.random.shuffle(indices)

train_indices, val_indices = indices[split:], indices[:split]

In [25]:
# Creating PT data samplers and loaders:
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(val_indices)
BATCH_SIZE = 16

train_loader = torch.utils.data.DataLoader(train_data, batch_size=BATCH_SIZE,
                                             sampler=train_sampler)
valid_loader = torch.utils.data.DataLoader(train_data, batch_size=BATCH_SIZE,
                                             sampler=valid_sampler)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=BATCH_SIZE)

In [26]:
# Device configuration
device = torch.device('mps' if torch.backends.mps.is_available() else 'cpu')
device

device(type='mps')

In [31]:
def make_model():
    model = EfficientNet.from_pretrained('efficientnet-b4') 
    model._fc = nn.Linear(in_features=1792, out_features=5, bias=True)
    return model

model = make_model()

Loaded pretrained weights for efficientnet-b4


In [32]:
# Define Models 

class Classifier(nn.Module):
    def __init__(self, num_classes):
        super(Classifier, self).__init__()
        # Block 1
        self.conv1 = nn.Conv2d(in_channels = 3, out_channels = 32, kernel_size = 3, stride = 2, padding = 1)
        self.relu1 = nn.ReLU()
        self.maxpool1 = nn.MaxPool2d(kernel_size = 2)

        #Block 2
        self.conv2 = nn.Conv2d(in_channels = 32, out_channels = 64, kernel_size = 3, stride = 2)
        self.relu2 = nn.ReLU()
        self.maxpool2 = nn.MaxPool2d(kernel_size=2)

        #Block 3
        self.conv3 = nn.Conv2d(in_channels = 64, out_channels = 32, kernel_size = 3, stride = 2)
        self.relu3 = nn.ReLU()
        self.maxpool3 = nn.MaxPool2d(kernel_size=2)

        # last fully-connected layer
        self.fc = nn.Linear(32*3*3, num_classes)


    def forward(self, input):

        x = self.maxpool1(self.relu1(self.conv1(input)))
        x = self.maxpool2(self.relu2(self.conv2(x)))
        x = self.maxpool3(self.relu3(self.conv3(x)))

        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x


# model = Classifier(5)

In [33]:
criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
scheduler = torch.optim.lr_scheduler.OneCycleLR(
                                    optimizer,
                                    max_lr=0.001,
                                    epochs=30,
                                    steps_per_epoch=int(len(train_data) / BATCH_SIZE), 
                                    pct_start=0.1,
                                    anneal_strategy='cos',
                                    final_div_factor=10**5)

In [34]:
def train(model, criterion, data_loader, optimizer, scheduler, num_epochs):
    """Simple training loop for a PyTorch model.""" 
    
    # Make sure model is in training mode.
    model.train()
    
    # Move model to the device (CPU or MPS or GPU).
    model.to(device)
    
    # Exponential moving average of the loss.
    ema_loss = None

    print('----- Training Loop -----')
    # Loop over epochs.
    for epoch in trange(num_epochs):
        
      # Loop over data.
      for batch_idx, (features, target) in enumerate(data_loader):
            
          # Forward pass.
        output = model(features.to(device))
        loss = criterion(output.to(device), target.to(device))

          # Backward pass.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()

      # NOTE: It is important to call .item() on the loss before summing.
        if ema_loss is None:
            ema_loss = loss.item()
        else:
            ema_loss += (loss.item() - ema_loss) * 0.01 

      # Print out progress the end of epoch.
      print('Epoch: {} \tLoss: {:.6f}'.format(epoch, ema_loss),)
  

In [35]:
def test(model, data_loader):
    """Measures the accuracy of a model on a data set.""" 
    # Make sure the model is in evaluation mode.
    model.eval()
    correct = 0
    print('----- Model Evaluation -----')
    # We do not need to maintain intermediate activations while testing.
    with torch.no_grad():
        
        # Loop over test data.
        for images, target in data_loader:
            # Forward pass.
            output = model(images.to(device))
            
            # Get the label corresponding to the highest predicted probability.
            pred = output.argmax(dim=1, keepdim=True)
            # Count number of correct predictions.
            correct += pred.cpu().eq(target.view_as(pred)).sum().item()

    # Print test accuracy.
    percent = 100. * correct / len(data_loader.dataset)
    print(f'Validation accuracy: {correct} / {len(data_loader.dataset)} ({percent:.0f}%)')
    torch.save(model.state_dict(), 'model.ckpt')
    return percent

In [36]:
num_epochs = 5
train(model, criterion, train_loader, optimizer, scheduler, num_epochs=num_epochs)
test(model, valid_loader)

----- Training Loop -----


 20%|██        | 1/5 [02:01<08:05, 121.38s/it]

Epoch: 0 	Loss: 0.937459


 40%|████      | 2/5 [03:59<05:57, 119.22s/it]

Epoch: 1 	Loss: 0.622544


 60%|██████    | 3/5 [05:57<03:57, 118.75s/it]

Epoch: 2 	Loss: 0.590200


 80%|████████  | 4/5 [07:55<01:58, 118.42s/it]

Epoch: 3 	Loss: 0.568897


100%|██████████| 5/5 [09:52<00:00, 118.58s/it]

Epoch: 4 	Loss: 0.495079
----- Model Evaluation -----





Test accuracy: 894 / 5656 (16%)


15.806223479490805

In [37]:
def predict(model, data_loader):
    model.eval()
    preds = []
    for images, _ in data_loader:
        # Forward pass.
        output = model(images.to(device))
        # Get the label corresponding to the highest predicted probability.
        pred = output.argmax(dim=1, keepdim=True)
        for p in pred:
            pred_class = train_data.classes_dict[p.item()]
            preds.append(pred_class)
    return preds

In [38]:
def generate_submission_file(predictions):
    mapped_preds = []
    for idx in range(len(test_data)):
        mapped_preds.append({'Category': predictions[idx], 'Id': test_data.get_image_filename(idx)})
    pd.DataFrame(mapped_preds).to_csv("sample_submission_file.csv", index=False)

In [39]:
predictions = predict(model, test_loader)
# Make submission here
generate_submission_file(predictions)