# Download the dataset with the Kaggle API

In [None]:
# Install the Kaggle API token: kaggle.json is expected in the current working
# directory; it is copied into ~/.kaggle and restricted to owner read/write.
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

# Fetch the dataset archive (100-bird-species.zip) into the working directory.
!kaggle datasets download -d gpiosenka/100-bird-species

# Unpack quietly; the cells below read ./train, ./valid and ./test.
!unzip -q 100-bird-species.zip

# imports

# Download packages

In [None]:
# Extra packages for the notebook environment. torcheval supplies the F1 metric
# imported further down.
# NOTE(review): no pytorch_lightning import is visible in this notebook —
# confirm the first install is still needed.
!pip install pytorch-lightning
!pip install torcheval

# Build and train model
## define transforms and device

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
from PIL import Image
from tempfile import TemporaryDirectory

# Legacy pipeline: convert to a tensor, then resize straight to 224x224.
# NOTE(review): nothing visible in this notebook uses `transform` any more.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((224, 224)),
])

# Pipeline handed to every ImageFolder below: resize to 256, centre-crop to
# 224, convert to a tensor, then apply ColorJitter.
# NOTE(review): ColorJitter() is built with no arguments — presumably the
# library defaults leave colours unchanged; confirm against torchvision docs.
transformsss = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.ColorJitter(),
])

# Seed torch's RNG, then pick the GPU when one is available.
torch.manual_seed(42)
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

## Load the datasets
We use ImageFolder from torchvision  

In [None]:
# Training images are reshuffled on every pass over the loader.
train_set = torchvision.datasets.ImageFolder("./train", transform=transformsss)
# `DataLoader` was never imported by name, so reference it through torch.utils.data
# like the other two loaders do.
train_loader = torch.utils.data.DataLoader(train_set, batch_size=64, shuffle=True)

# Evaluation loaders keep the dataset order. test() records sample positions in
# iteration order and those positions are later compared between models, which
# is only meaningful when every model sees the samples in the same order.
validset = torchvision.datasets.ImageFolder("./valid", transform=transformsss)
validloader = torch.utils.data.DataLoader(validset, batch_size=64, shuffle=False)

testset = torchvision.datasets.ImageFolder("./test", transform=transformsss)
test_loader = torch.utils.data.DataLoader(testset, batch_size=64, shuffle=False)

## Define transfer training function

In [None]:
def train_model(model, trainLoader, valid_loader, epochs, optimizer, scheduler, criterion):
    '''
    Train ``model`` and return it with the best-validation-accuracy weights loaded.

    Each epoch makes one optimisation pass over ``trainLoader`` followed by one
    evaluation pass over ``valid_loader``. Whenever the validation accuracy
    improves, the weights are checkpointed to a temporary file; the best
    checkpoint is loaded back into ``model`` before returning.
    The structure is inspired by the official PyTorch transfer-learning tutorial.

    Args:
        model: network to train; batches are moved to the device its
            parameters live on.
        trainLoader: iterable of ``(images, target)`` training batches.
        valid_loader: iterable of ``(images, target)`` validation batches.
        epochs: number of epochs to run.
        optimizer: optimizer stepped once per training batch.
        scheduler: learning-rate scheduler stepped once per epoch.
        criterion: loss called as ``criterion(output, target)``.

    Returns:
        ``(model, trainloss, validloss)`` where the two lists hold the loss of
        every training / validation batch in the order they were processed.
    '''
    since = time.time()  # start of the wall-clock measurement reported at the end

    trainloss = []
    validloss = []

    # Follow the model's device instead of assuming CUDA, so the function also
    # runs on CPU-only machines.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    # Checkpoints live in a temporary directory that is removed on exit
    with TemporaryDirectory() as tempdir:
        best_model_params_path = os.path.join(tempdir, 'best_model_params.pt')
        torch.save(model.state_dict(), best_model_params_path)
        best_acc = 0.

        for epoch in range(epochs):
            print(f'Epoch {epoch}/{epochs - 1}')
            print('-' * 10)

            # ---- training phase ----
            model.train()
            train_loss_sum = 0.0
            train_batches = 0
            for images, target in trainLoader:
                images = images.to(run_device)
                target = target.to(run_device)

                optimizer.zero_grad()
                with torch.set_grad_enabled(True):
                    output = model(images)
                    loss = criterion(output, target)
                    loss.backward()
                optimizer.step()

                batch_loss = loss.item()
                trainloss.append(batch_loss)
                train_loss_sum += batch_loss
                train_batches += 1

            scheduler.step()

            # max(..., 1) keeps an empty loader from raising ZeroDivisionError
            print(f'Train | Loss: {train_loss_sum / max(train_batches, 1):.4f}')
            print()

            # ---- validation phase ----
            model.eval()
            valid_loss_sum = 0.0
            valid_batches = 0
            running_corrects = 0
            total_samples = 0
            with torch.no_grad():
                # Iterate the loader that was passed in, not a notebook global.
                for images, target in valid_loader:
                    images = images.to(run_device)
                    target = target.to(run_device)

                    output = model(images)
                    _, preds = torch.max(output, 1)

                    # Each batch loss is accumulated exactly once.
                    batch_loss = criterion(output, target).item()
                    validloss.append(batch_loss)
                    valid_loss_sum += batch_loss
                    valid_batches += 1

                    running_corrects += (preds == target).sum().item()
                    total_samples += target.size(0)

            epoch_loss = valid_loss_sum / max(valid_batches, 1)
            epoch_acc = running_corrects / total_samples if total_samples else 0.0
            print(f'Valid | Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # Checkpoint only on a strict improvement in validation accuracy
            if epoch_acc > best_acc:
                best_acc = epoch_acc
                torch.save(model.state_dict(), best_model_params_path)

        time_elapsed = time.time() - since
        print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
        print(f'Best val Acc: {best_acc:.4f}')
        model.load_state_dict(torch.load(best_model_params_path))
    return model, trainloss, validloss


## Define test function

In [None]:
def test(model, test_loader, criterion, batch_size):
    '''
    This func just iterate over test_loader and calculate accuracy and the indices where the model got correct and incorrect predictions
    '''
    model.eval()
    sum_losses = 0
    total_samples = 0
    running_corrects = 0

    unique_correct_indices = []
    unique_error_indices = []


    with torch.no_grad():
        for iter_num, (images, target) in enumerate(test_loader):
            images = images.cuda()
            target = target.cuda()

            output = model(images)
            loss = criterion(output, target)
            sum_losses += loss.item()

            _, preds = torch.max(output.data, 1)
            total_samples += target.size(0)

            running_corrects += torch.sum(preds == target.data)

            unique_correct_indices.extend((preds == target.data).nonzero().cpu().numpy() + (batch_size*iter_num))
            unique_error_indices.extend((preds != target.data).nonzero().cpu().numpy() + (batch_size*iter_num))



        test_acc = running_corrects/total_samples
        test_loss = sum_losses / len(test_loader)
    print(f"Test acc: {test_acc}, Test loss: {test_loss}")
    return unique_correct_indices, unique_error_indices


## Train the models and print results

In [None]:
# Number of classes for bird types; every replaced classification head below
# gets this many outputs.
out_features = 525

# vgg19 (head replaced, but not included in `pretrained_models` below)
vgg19_model = torchvision.models.vgg19(weights=torchvision.models.VGG19_Weights.DEFAULT)
vgg19_model.classifier[6] = nn.Linear(vgg19_model.classifier[6].in_features, out_features)

# AlexNet: freeze the pretrained weights first, then swap in a fresh head so
# only the new layer is trainable. Its head lives in `classifier[6]`, not `fc`.
alexNet_model = torchvision.models.alexnet(weights=torchvision.models.AlexNet_Weights.IMAGENET1K_V1)
for param in alexNet_model.parameters():
    param.requires_grad = False
alexNet_model.classifier[6] = nn.Linear(alexNet_model.classifier[6].in_features, out_features)

# googleNet — loaded through the `weights=` API like the other models
# (`pretrained=True` is the deprecated spelling of the same ImageNet weights).
googlenet_model = torchvision.models.googlenet(weights=torchvision.models.GoogLeNet_Weights.IMAGENET1K_V1)

# resNet18
resNet18_model = torchvision.models.resnet18(weights='IMAGENET1K_V1')

# resnet34
resNet34_model = torchvision.models.resnet34(weights='IMAGENET1K_V1')


# All four entries belong to one dict literal (the original closed the brace
# after the first entry, which is a syntax error).
pretrained_models = {
    'AlexNet': alexNet_model,
    'googlenet': googlenet_model,
    'resNet18': resNet18_model,
    'resNet34': resNet34_model,
}

# model name -> (positions classified correctly, positions misclassified)
uniques_indices = {}
for model_name, model in pretrained_models.items():
    print('\n\n #### Model:', model_name, " ####\nnum of parameters:", sum(p.numel() for p in model.parameters()))

    # AlexNet was frozen and re-headed above; the remaining models expose
    # their head as `fc`, so freeze them and replace that layer here.
    if model_name != "AlexNet":
        for param in model.parameters():
            param.requires_grad = False

        in_features = model.fc.in_features
        model.fc = nn.Linear(in_features, out_features)

    model = model.to(device)

    # Keep the loss on the same device as the model rather than forcing CUDA.
    criterion = nn.CrossEntropyLoss().to(device)
    optimizer_conv = optim.Adam(model.parameters(), lr=0.001)

    # Decay LR by a factor of 0.1 every 7 epochs
    exp_lr_scheduler = lr_scheduler.StepLR(optimizer_conv, step_size=7, gamma=0.1)

    new_model, train_losses, val_losses = train_model(model, train_loader, validloader, 10, optimizer_conv, exp_lr_scheduler, criterion)

    # Evaluate the model returned by training (best validation weights loaded).
    correct_indices, error_indices = test(new_model, test_loader, criterion, 64)

    uniques_indices[model_name] = correct_indices, error_indices



## Count unique errors and corrects

In [None]:
from copy import deepcopy as dc

def count_uniques(lsts):
    '''
    Count, for every list, the indices that appear in no other list.

    Args:
        lsts: one list per model; each element is an indexable whose first
            item is a sample index (e.g. a one-element array).

    Returns:
        A list with one integer per input list: the number of distinct indices
        found in that list and in none of the others. The lists may have
        different lengths; an empty input gives an empty result.
    '''
    from collections import Counter

    # Reduce every list to the set of plain-int indices it contains.
    index_sets = [{int(entry[0]) for entry in lst} for lst in lsts]

    # For each index, count how many lists contain it; an index is unique to a
    # list exactly when that count is 1.
    membership = Counter(index for indices in index_sets for index in indices)

    return [sum(1 for index in indices if membership[index] == 1) for indices in index_sets]


# Number of test positions that only this model classified correctly
corrects_counts = count_uniques([pair[0] for pair in uniques_indices.values()])

# Number of test positions that only this model misclassified
errors_counts = count_uniques([pair[1] for pair in uniques_indices.values()])

# Report per model, in the order the models were added to the dict
for i, model_name in enumerate(uniques_indices):
    print(f"model: {model_name}\nunique corrects: {corrects_counts[i]}, unique errors: {errors_counts[i]}\n\n")

# Train classic ML model
## Process the data

In [None]:
## Pass-through layer: swapped in for a network's final layer so that the model
## returns the values of the layer before it.
class Identity(nn.Module):
  """Module whose forward pass hands its input back untouched."""

  def __init__(self):
    super().__init__()

  def forward(self, x):
    return x


# Feature extractor: a copy of GoogLeNet whose final `fc` layer is replaced by
# Identity, so a forward pass returns the inputs of that layer.
FE_googlenet_model = dc(googlenet_model)
FE_googlenet_model.fc = Identity()
FE_googlenet_model = FE_googlenet_model.to(device)
# Put the copy in evaluation mode explicitly instead of relying on whatever
# mode an earlier cell left the original model in.
FE_googlenet_model.eval()
batch_size = 64


def _extract_features(feature_model, loader):
    """Run ``loader`` through ``feature_model``; return (features, labels) as lists."""
    features_out = []
    labels_out = []
    # No gradients are needed for feature extraction.
    with torch.no_grad():
        for images, labels in loader:
            batch_features = feature_model(images.to(device))  # original note: shape (64, 1024)
            features_out += batch_features.cpu().tolist()
            labels_out += labels.tolist()
    return features_out, labels_out


x_train, y_train = _extract_features(FE_googlenet_model, train_loader)
x_test, y_test = _extract_features(FE_googlenet_model, test_loader)
  # break




## Train random forest and print results

In [None]:
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.metrics import f1_score


# Hold out 20% of the extracted training features for validation. The split is
# bound to new names so x_train / y_train stay intact: overwriting them made
# every re-run of this cell (and any later split) work on a smaller subset.
x_fit, x_val, y_fit, y_val = train_test_split(x_train, y_train, test_size=0.2, random_state=52)
clf = RandomForestClassifier(max_depth=10, random_state=358)
clf.fit(x_fit, y_fit)

# Test-set predictions and metrics
y_pred = clf.predict(x_test)

cm = confusion_matrix(y_test, y_pred)
test_acc = accuracy_score(y_test, y_pred)

# Held-out validation accuracy
y_val_pred = clf.predict(x_val)
val_acc = accuracy_score(y_val, y_val_pred)


print(f"test acc: {test_acc}, val_acc: {val_acc}")
# Last expression of the cell, so the notebook displays the micro-averaged F1.
f1_score(y_test, y_pred, average='micro')

## Train KNN and print results

In [None]:
from sklearn.neighbors import KNeighborsClassifier

# Hold out 20% of the training features for validation. The split is bound to
# new names so this cell never overwrites x_train / y_train; re-splitting in
# place shrank the training data each time the cell ran.
x_fit, x_val, y_fit, y_val = train_test_split(x_train, y_train, test_size=0.2, random_state=52)
clf = KNeighborsClassifier(n_neighbors=30)
clf.fit(x_fit, y_fit)

# Test-set predictions and metrics
y_pred = clf.predict(x_test)

cm = confusion_matrix(y_test, y_pred)
test_acc = accuracy_score(y_test, y_pred)

# Held-out validation accuracy
y_val_pred = clf.predict(x_val)
val_acc = accuracy_score(y_val, y_val_pred)


print(f"test acc: {test_acc}, val_acc: {val_acc}")
# Last expression of the cell, so the notebook displays the micro-averaged F1.
f1_score(y_test, y_pred, average='micro')

# Define a test function for additional metrics such as the F1 score

In [None]:
from torcheval.metrics.functional import multiclass_f1_score

def test_metrics(model, test_loader, criterion, batch_size):
    '''
    Print the F1 score of ``model`` over the whole of ``test_loader``.

    Predictions and targets from every batch are collected and scored in one
    call, so each sample carries the same weight even when the last batch is
    smaller than the others (averaging per-batch scores does not guarantee that).

    Args:
        model: network to evaluate; batches are moved to the device its
            parameters live on.
        test_loader: iterable of ``(images, target)`` batches.
        criterion: unused; kept so existing calls keep working.
        batch_size: unused; kept so existing calls keep working.

    Returns:
        None. The score is printed.
    '''
    model.eval()

    # Follow the model's device instead of assuming CUDA.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device("cpu")

    all_preds = []
    all_targets = []
    num_classes = 525  # fallback; replaced by the model's output width below

    with torch.no_grad():
        for images, target in test_loader:
            output = model(images.to(run_device))
            _, preds = torch.max(output, 1)

            num_classes = output.size(1)
            # Collect on the CPU; only class indices are stored.
            all_preds.append(preds.cpu())
            all_targets.append(target.cpu())

    if all_preds:
        F1_score = multiclass_f1_score(
            torch.cat(all_preds), torch.cat(all_targets), num_classes=num_classes
        ).item()
    else:
        # An empty loader has nothing to score.
        F1_score = 0.0

    print(f"Test F1_score: {F1_score}")
    return None