In [None]:
# Author : Yagan Arun
from google.colab import drive
import torch
import torch.nn as nn
from torch.nn.functional import softmax, interpolate
from torchvision.io import read_image
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision.transforms.functional import normalize, resize, to_pil_image
import torch.backends.cudnn as cudnn
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
from torchvision.models import resnet50
from torchcam.methods import SmoothGradCAMpp, LayerCAM, ScoreCAM
from torchcam.utils import overlay_mask

import matplotlib.pyplot as plt
import time
import os
from PIL import Image
from tempfile import TemporaryDirectory

cudnn.benchmark = True

In [None]:
drive.mount('/content/drive')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(" Computation unit used : {}".format(device))

In [None]:
ROOT = "drive/MyDrive/"
pg = os.path.join(ROOT, "playground")
data_dir = os.path.join(ROOT, "MangoLeafBD")
models_dir = os.path.join(ROOT, "models")

In [None]:
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

In [None]:
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
                                          data_transforms[x])
                  for x in ['train', 'val']}
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=32,
                                             shuffle=True, num_workers=4)
              for x in ['train', 'val']}
dataset_sizes = {x: len(image_datasets[x]) for x in ['train', 'val']}
class_names = image_datasets['train'].classes

In [None]:
def imshow(inp, title=None):
    """Display image for Tensor."""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt.imshow(inp)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)

In [None]:
inputs, classes = next(iter(dataloaders['train']))
# Make a grid from batch
out = torchvision.utils.make_grid(inputs)
imshow(out, title=[class_names[x] for x in classes])

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=25, check_pt_freq=5, model_name = None):
    since = time.time()
    PATH = os.path.join(models_dir, f'{model_name}.pth')
    statsrec = {
        "train": {
            "epoch":[],
            "loss":[],
            "acc":[],
        },
        "val" : {
            "epoch":[],
            "loss":[],
            "acc":[],
        }
    }
    if f'{model_name}.pth' in os.listdir(models_dir):
      return torch.load(PATH)
    # Create a temporary directory to save training checkpoints
    with TemporaryDirectory() as tempdir:
        best_model_params_path = os.path.join(tempdir, 'best_model_params.pt')

        torch.save(model.state_dict(), best_model_params_path)
        best_acc = 0.0

        for epoch in range(num_epochs):
            print(f'Epoch {epoch}/{num_epochs - 1}')
            print('-' * 10)

            # Each epoch has a training and validation phase
            for phase in ['train', 'val']:
                if phase == 'train':
                    model.train()  # Set model to training mode
                else:
                    model.eval()   # Set model to evaluate mode

                running_loss = 0.0
                running_corrects = 0

                # Iterate over data.
                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    # zero the parameter gradients
                    optimizer.zero_grad()

                    # forward
                    # track history if only in train
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        # backward + optimize only if in training phase
                        if phase == 'train':
                            loss.backward()
                            optimizer.step()

                    # statistics
                    running_loss += loss.item() * inputs.size(0)
                    running_corrects += torch.sum(preds == labels.data)
                if phase == 'train':
                    scheduler.step()

                epoch_loss = running_loss.double() / dataset_sizes[phase]
                epoch_acc = running_corrects.double() / dataset_sizes[phase]
                statsrec[phase]["epoch"].append(epoch)
                statsrec[phase]["loss"].append(epoch_loss)
                statsrec[phase]["acc"].append(epoch_acc)

                print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

                # deep copy the model
                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    torch.save(model.state_dict(), best_model_params_path)

            print()

        time_elapsed = time.time() - since
        print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
        print(f'Best val Acc: {best_acc:4f}')

        # load best model weights
        model.load_state_dict(torch.load(best_model_params_path))

        # save model in permanent dir
        torch.save({"state_dict": model, "stats": statsrec}, PATH)
    return model

In [None]:
def get_gradcam(model, img_path, layer='layer3'):
    model = model.eval()
    cam_extractor = SmoothGradCAMpp(model, '{}'.format(layer))
    # Get your input
    img = Image.open(img_path)
    transform = transforms.PILToTensor()
    img = transform(img)
    # Preprocess it for your chosen model
    input_tensor = normalize(resize(img.to(device), (224, 224)) / 255., [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

    # Preprocess your data and feed it to the model
    out = model(input_tensor.unsqueeze(0))
    # Retrieve the CAM by passing the class index and the model output
    cams = cam_extractor(out.squeeze(0).argmax().item(), out)
    _, predicted = torch.max(out, 1)

    for name, cam in zip(cam_extractor.target_names, cams):
      _, axes = plt.subplots(1, 1)
      result = overlay_mask(to_pil_image(img), to_pil_image(cam.squeeze(0), mode='F'), alpha=0.5)
      return result

In [None]:
def get_localization_cues(model, img_path, layer='layer3'):
    model = model.eval()
    cam_extractor = SmoothGradCAMpp(model, '{}'.format(layer))
    # Get your input
    img = Image.open(img_path)
    transform = transforms.PILToTensor()
    img = transform(img)
    # Preprocess it for your chosen model
    input_tensor = normalize(resize(img.to(device), (224, 224)) / 255., [0.485, 0.456, 0.406], [0.229, 0.224, 0.225])

    # Preprocess your data and feed it to the model
    out = model(input_tensor.unsqueeze(0))
    # Retrieve the CAM by passing the class index and the model output
    cams = cam_extractor(out.squeeze(0).argmax().item(), out)

    _, predicted = torch.max(out, 1)

    # Resize it
    resized_cams = [resize(to_pil_image(cam.squeeze(0)), img.shape[-2:]) for cam in cams]
    segmaps = [to_pil_image((resize(cam, img.shape[-2:]).squeeze(0) >= 0.5).to(dtype=torch.float32)) for cam in cams]
    # Plot it
    for name, cam, seg in zip(cam_extractor.target_names, resized_cams, segmaps):
      _, axes = plt.subplots(1, 2)
      return seg

In [None]:
def visualize_model(model, num_images=6):
    was_training = model.training
    model.eval()
    images_so_far = 0
    fig = plt.figure()

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(dataloaders['val']):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            for j in range(inputs.size()[0]):
                images_so_far += 1
                ax = plt.subplot(num_images//2, 2, images_so_far)
                ax.axis('off')
                ax.set_title(f'predicted: {class_names[preds[j]]}')
                imshow(inputs.cpu().data[j])

                if images_so_far == num_images:
                    model.train(mode=was_training)
                    return
        model.train(mode=was_training)

In [None]:
model_ft = models.resnet50(weights='IMAGENET1K_V1')
num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Linear(num_ftrs, 7) # output features
model_ft = model_ft.to(device)
criterion = nn.CrossEntropyLoss()
optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [None]:
model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=5, model_name="resnet_50_mango")

In [None]:
dataiter = iter(dataloaders['val'])
images, labels = next(dataiter)
imshow(torchvision.utils.make_grid(images))
print(images.size()[0])
images = images.to(device)
labels = labels.to(device)
print('GroundTruth: ', ' '.join(f'{class_names[labels[j]]:5s}' for j in range(32)))

In [None]:
outputs = model_ft(images)
_, predicted = torch.max(outputs, 1)
print('Predicted: ', ' '.join(f'{class_names[predicted[j]]:5s}' for j in range(3)))

In [None]:
!wget hhttps://previews.123rf.com/images/muslian/muslian1811/muslian181100367/116673148-mango-leaves-infected-by-powdery-mildew-oidium-mangiferae-powdery-mildew-is-one-of-the-most.jpg
img_path = "116673148-mango-leaves-infected-by-powdery-mildew-oidium-mangiferae-powdery-mildew-is-one-of-the-most.jpg"

In [None]:
layer = 'layer4'
plt.imshow(get_gradcam(model_ft, img_path, layer))