In [9]:
%matplotlib inline

import matplotlib.pyplot as plt
import torch
import torchvision
from torchvision import datasets, transforms
import helper
import numpy as np
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset
import torchvision.models as models
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import classification_report,accuracy_score
from google.colab import files
from torchsummary import summary
# import onnx
# from onnx2pytorch import ConvertModel
from tqdm import trange
from tqdm import tqdm
import h5py

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


links to fine tuned models:


*   http://ptak.felk.cvut.cz/personal/sulcmila/models/LifeCLEF2018/

*   http://ptak.felk.cvut.cz/personal/sulcmila/models/LifeCLEF2019/

*   www.leafnet.pbarre.de 

*   https://www.kaggle.com/datasets/maksymshkliarevskyi/cassava-leaf-disease-models?select=EfNetB0_275_16.h5



In [6]:
path_to_model = 'drive/MyDrive/trained_models/resnet.hdf5'
#path_to_model = 'drive/MyDrive/trained_models/deepweeds_resnet50_pretrained'
feature_extract = True

In [10]:

def set_parameter_requires_grad(model, feature_extracting):
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False

h5 = h5py.File(path_to_model, 'r')

# build model to map pretrained deepweeds model
model = models.resnet50()
set_parameter_requires_grad(model, True)
model.fc = nn.Linear(2048, 9)
model.load_state_dict(h5, strict=False)

# change output later to map csic data
model.fc = nn.Linear(2048, 4)


In [11]:
# Detect if we have a GPU available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [12]:
# onnx_model = onnx.load(path_to_model)
# onnx.checker.check_model(onnx_model)
# pytorch_model = ConvertModel(onnx_model , experimental = True)

In [13]:
model = model.to(device)

In [14]:
# build dataloader
mean = torch.tensor([135.31470632, 124.53849418, 103.39646082])
std = torch.tensor([5.18153318, 4.14170719, 5.17011963])

transform = transforms.Compose([transforms.Resize(255),
                                transforms.CenterCrop(224),
                                transforms.ToTensor(),
                                transforms.RandomRotation(degrees=360),
                                transforms.RandomHorizontalFlip(p=0.5),
                                transforms.ColorJitter(brightness=.5, hue=.3),
                                transforms.RandomInvert(),
                                transforms.Normalize(mean, std)])

# transform = transforms.Compose([transforms.Resize(255),
#                                 transforms.CenterCrop(224),
#                                 transforms.ToTensor(),
#                                 transforms.Normalize(mean, std)])

balanced_path_data = 'drive/MyDrive/balanced_csic_data_64/training'


def train_val_dataset(dataset, val_split=0.1):
    train_idx, val_idx = train_test_split(list(range(len(dataset))), test_size=val_split)
    datasets = {}
    datasets['train'] = Subset(dataset, train_idx)
    datasets['val'] = Subset(dataset, val_idx)
    return datasets

# load the balanced dataset
balanced_dataset = datasets.ImageFolder(balanced_path_data, transform=transform)
balanced_datasets = train_val_dataset(balanced_dataset)


class_dict = balanced_dataset.class_to_idx

In [15]:
batch_size = 32
balanced_dataloaders_dict = {x: torch.utils.data.DataLoader(balanced_datasets[x], batch_size=batch_size, shuffle=True, num_workers=2) for x in ['train', 'val']}

In [16]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=20, is_inception=False):
    since = time.time()

    val_acc_history = []
    train_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            with tqdm(dataloaders[phase],unit = 'batch') as tepoch:
              # Iterate over data.
              for inputs, labels in tepoch:
                  inputs = inputs.to(device)
                  labels = labels.to(device)

                  # zero the parameter gradients
                  optimizer.zero_grad()

                  # forward
                  # track history if only in train
                  with torch.set_grad_enabled(phase == 'train'):
                      # Get model outputs and calculate loss
                      # Special case for inception because in training it has an auxiliary output. In train
                      #   mode we calculate the loss by summing the final output and the auxiliary output
                      #   but in testing we only consider the final output.
                      if is_inception and phase == 'train':
                          # From https://discuss.pytorch.org/t/how-to-optimize-inception-model-with-auxiliary-classifiers/7958
                          outputs, aux_outputs = model(inputs)
                          loss1 = criterion(outputs, labels)
                          loss2 = criterion(aux_outputs, labels)
                          loss = loss1 + 0.4*loss2
                      else:
                          outputs = model(inputs)
                          loss = criterion(outputs, labels)

                      _, preds = torch.max(outputs, 1)
                      

                      # backward + optimize only if in training phase
                      if phase == 'train':
                          loss.backward()
                          optimizer.step()

                  # statistics
                  running_loss += loss.item() * inputs.size(0)
                  #getCategoricalAccuracy(preds, labels.data, class_dict)
                  running_corrects += torch.sum(preds == labels.data)

              epoch_loss = running_loss / len(dataloaders[phase].dataset)
              epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

              print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

              # deep copy the model
              if phase == 'val' and epoch_acc > best_acc:
                  best_acc = epoch_acc
                  best_model_wts = copy.deepcopy(model.state_dict())
              if phase == 'val':
                  val_acc_history.append(epoch_acc)
              if phase == 'train':
                  train_acc_history.append(epoch_acc)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, val_acc_history, train_acc_history

In [None]:

# Gather the parameters to be optimized/updated in this run. If we are
#  finetuning we will be updating all parameters. However, if we are
#  doing feature extract method, we will only update the parameters
#  that we have just initialized, i.e. the parameters with requires_grad
#  is True.
params_to_update = model.parameters()
print("Params to learn:")
if feature_extract:
    params_to_update = []
    for name,param in model.named_parameters():
        if param.requires_grad == True:
            params_to_update.append(param)
            print("\t",name)
else:
    for name,param in model.named_parameters():
        if param.requires_grad == True:
            print("\t",name)

# Observe that all parameters are being optimized
optimizer_ft = optim.Adam(params_to_update, lr=1e-4)

# Setup the loss fxn
import time
import copy
criterion = nn.CrossEntropyLoss()
epoch = 100

# Training a pretrained Resnet18 on a balanced dataset
pytorch_model, balanced_val_hist, balanced_tr_hist  = train_model(model, balanced_dataloaders_dict, criterion, optimizer_ft,num_epochs= epoch)


 96%|█████████▌| 90/94 [00:38<00:01,  2.29batch/s]

In [None]:
b_train_acc = []
for val in balanced_tr_hist:
  b_train_acc.append(val.cpu().data.numpy())

b_val_acc = []
for val in balanced_val_hist:
  b_val_acc.append(val.cpu().data.numpy())

plt.plot(b_val_acc, 'm-', label='Finetuned Resnet50 Validation accuracy')
plt.plot(b_train_acc,'m--', label='Finetuned Resnet50 Training accuracy')
plt.legend()
plt.savefig('64_100epochs_finetuned.png')
plt.show()
files.download('64_100epochs_finetuned.png')

In [None]:
test_path_data = 'drive/MyDrive/balanced_csic_data_64/testing'
testing_dataset = torchvision.datasets.ImageFolder(test_path_data, transform=transform)
batch_size = 32
testing_dataloader =torch.utils.data.DataLoader(testing_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
print(testing_dataset.class_to_idx)

In [None]:
# send model to GPU
if torch.cuda.is_available():
    model.cuda()

In [None]:
def getTestingMetrics(dataloader, model):
  # send model to GPU
  if torch.cuda.is_available():
    model.cuda()

  # with tqdm(testing_dataloader) as tepoch:
  lbs = []
  preds = []
  #   # Iterate over data.
  for inputs, labels in dataloader:
    test_inputs = inputs.to(device)
    #test_labels = labels.to(device)
    outputs = model(test_inputs)
    _, pred = torch.max(outputs, 1)
    lbs+=(list(labels.numpy()))
    preds+= list(pred.cpu().data.numpy())
  print('Overall testing accuracy:',accuracy_score(lbs, preds))
  print(classification_report(lbs, preds, target_names=class_dict.keys()))


In [None]:
getTestingMetrics(testing_dataloader, model)

In [None]:
    accuracy                           0.78       850
   macro avg       0.70      0.55      0.57       850
weighted avg       0.79      0.78      0.76       850
