In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import os

In [None]:
import torch
import torchvision
from torchvision import datasets
from torchvision import transforms as T # for simplifying the transforms
from torch import nn, optim
from torch.nn import functional as F
from torch.utils.data import DataLoader, sampler, random_split
from torchvision import models

In [None]:
## Now, we import timm, torchvision image models
!pip install timm # kaggle doesnt have it installed by default
import timm
from timm.loss import LabelSmoothingCrossEntropy # This is better than normal nn.CrossEntropyLoss

In [None]:
# remove warnings
import warnings
warnings.filterwarnings("ignore")

In [None]:
import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
import sys
from tqdm import tqdm
import time
import copy

In [None]:
def get_classes(data_dir):
    """Return the class names that ``ImageFolder`` reports for ``data_dir``."""
    # Build the folder dataset only to read its ``classes`` attribute.
    return datasets.ImageFolder(data_dir).classes

In [None]:
def get_data_loaders(data_dir, batch_size, train = False, num_workers = 4):
    """Build the DataLoader(s) for either the training or the evaluation split.

    Args:
        data_dir: Root directory that contains the ``train/``, ``valid/`` and
            ``test/`` sub-folders, each loaded with ``datasets.ImageFolder``.
        batch_size: Batch size for every loader that is returned.
        train: If True, build the augmented training loader; otherwise build
            the validation and test loaders.
        num_workers: Worker count passed to each DataLoader. Defaults to 4,
            the value that used to be hard-coded.

    Returns:
        ``(train_loader, len(train_data))`` when ``train`` is True, otherwise
        ``(val_loader, test_loader, len(val_data), len(test_data))``.
    """
    # Resize / crop / tensor / normalize steps shared by both pipelines.
    to_model_input = [
        T.Resize(256),
        T.CenterCrop(224),
        T.ToTensor(),
        T.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)), # imagenet mean / std
    ]

    if train:
        # NOTE(review): ColorJitter() is built with its default arguments, which
        # appear to apply no jitter at all - confirm, and pass brightness /
        # contrast / saturation / hue if this augmentation is meant to do something.
        transform = T.Compose([
            T.RandomHorizontalFlip(),
            T.RandomVerticalFlip(),
            T.RandomApply(torch.nn.ModuleList([T.ColorJitter()]), p=0.25),
            *to_model_input,
            T.RandomErasing(p=0.2, value='random'), # operates on the normalized tensor
        ])
        train_data = datasets.ImageFolder(os.path.join(data_dir, "train/"), transform=transform)
        train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=num_workers)
        return train_loader, len(train_data)

    # val/test: no augmentation, and no shuffling - sample order does not
    # change the aggregate metrics and a fixed order keeps evaluation repeatable.
    transform = T.Compose(to_model_input)
    val_data = datasets.ImageFolder(os.path.join(data_dir, "valid/"), transform=transform)
    test_data = datasets.ImageFolder(os.path.join(data_dir, "test/"), transform=transform)
    val_loader = DataLoader(val_data, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=num_workers)
    return val_loader, test_loader, len(val_data), len(test_data)


In [None]:
# Root folder of the dataset; get_data_loaders() joins "train/", "valid/" and "test/" onto it.
# NOTE(review): absolute, environment-specific path - adjust when running elsewhere.
dataset_path = "/content/drive/MyDrive/sunflowerdataset/Augmented Image/new_dataset_v1/"

In [None]:
# Training runs with batches of 128; validation and test use batches of 32.
train_loader, train_data_len = get_data_loaders(dataset_path, batch_size=128, train=True)
val_loader, test_loader, valid_data_len, test_data_len = get_data_loaders(
    dataset_path, batch_size=32, train=False
)

In [None]:
# Sanity check: prints the validation DataLoader object itself (not its batches).
print(val_loader)

In [None]:
# Derive the train folder from dataset_path instead of repeating the absolute
# path, so the class list always comes from the same dataset the loaders read.
classes = get_classes(os.path.join(dataset_path, "train"))
print(classes, len(classes))

In [None]:
# Per-phase lookup tables read by train_model(): the loader to iterate and the
# number of samples used to average loss / accuracy for each phase.
dataloaders = dict(train=train_loader, val=val_loader)
dataset_sizes = dict(train=train_data_len, val=valid_data_len)

In [None]:
# Length of each loader (train, validation, test), i.e. how many batches each one yields.
print(len(train_loader), len(val_loader), len(test_loader))

In [None]:
# Number of samples in the train, validation and test datasets.
print(train_data_len, valid_data_len, test_data_len)

In [None]:
# now, for the model
# Use the GPU when torch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device

In [None]:
# Load the pretrained 'deit_tiny_patch16_224' entry point from the facebookresearch/deit hub repo.
# NOTE(review): the ':main' ref is a moving branch, so the fetched code can change between runs.
model = torch.hub.load('facebookresearch/deit:main', 'deit_tiny_patch16_224', pretrained=True)

In [None]:
# Freeze every parameter currently in the model; the head assigned below is
# created afterwards, so its parameters stay trainable.
model.requires_grad_(False)

# Replace the classifier with a small MLP whose output width is the class count.
n_inputs = model.head.in_features
model.head = nn.Sequential(
    nn.Linear(n_inputs, 512),
    nn.ReLU(),
    nn.Dropout(0.3),
    nn.Linear(512, len(classes)),
)
model = model.to(device)
print(model.head)

In [None]:
# Label-smoothing cross entropy (from timm), moved to the same device as the model.
criterion = LabelSmoothingCrossEntropy().to(device)
# Adam is handed only the head's parameters.
optimizer = optim.Adam(params=model.head.parameters(), lr=0.001)

In [None]:
# lr scheduler
# StepLR configured with step_size=3 and gamma=0.97; train_model() calls
# scheduler.step() once per epoch, after the training phase.
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.97)

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=50):
    """Train ``model`` and return it loaded with its best-validation weights.

    Every epoch runs a 'train' pass followed by a 'val' pass. The function
    reads the module-level ``dataloaders`` and ``dataset_sizes`` dicts (keyed
    by 'train' / 'val') and the module-level ``device``.

    Args:
        model: Network to optimise; it is updated in place.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped after every training batch.
        scheduler: LR scheduler stepped once per epoch, after the train phase.
        num_epochs: Number of epochs to run.

    Returns:
        The same ``model`` object after ``load_state_dict`` with the weights
        from the epoch that had the highest validation accuracy.
    """
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print("-"*10)

        for phase in ['train', 'val']: # We do training and validation phase per epoch
            is_train = phase == 'train'
            if is_train:
                model.train() # model to training mode
            else:
                model.eval() # model to evaluate

            running_loss = 0.0
            # Kept as a plain Python int: the accuracy below is then an ordinary
            # float (not a device tensor) and the division still works when a
            # loader yields no batches.
            running_corrects = 0

            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(is_train): # no autograd makes validation go faster
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1) # used for accuracy
                    loss = criterion(outputs, labels)

                    if is_train:
                        loss.backward()
                        optimizer.step()

                # Weight the batch-mean loss by the batch size so the epoch
                # figure is a per-sample average.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += (preds == labels).sum().item()

            if is_train:
                scheduler.step() # step at end of epoch

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]

            print("{} Loss: {:.4f} Acc: {:.4f}".format(phase, epoch_loss, epoch_acc))

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict()) # keep the best validation accuracy model
        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print("Best Val Acc: {:.4f}".format(best_acc))

    model.load_state_dict(best_model_wts)
    return model

In [None]:
# Run training with the default num_epochs (50). train_model() returns the model
# object it was given, so model_ft and model refer to the same network.
model_ft = train_model(model, criterion, optimizer, exp_lr_scheduler) # now it is a lot faster
# I will come back after 50 epochs

In [None]:
# Evaluate on the test split: sample-averaged loss plus per-class and overall accuracy.
test_loss = 0.0
class_correct = [0] * len(classes)
class_total = [0] * len(classes)
model.eval()

for data, target in tqdm(test_loader):
    data, target = data.to(device), target.to(device)
    with torch.no_grad(): # turn off autograd for faster testing
        output = model(data)
        loss = criterion(output, target)
    # Accumulate (the loss used to be overwritten each batch, so only the last
    # batch counted); weight by batch size so the final division by
    # test_data_len yields a per-sample mean.
    test_loss += loss.item() * data.size(0)
    _, pred = torch.max(output, 1)
    # Plain Python lists work for every batch size, including a trailing
    # partial batch and a batch of one.
    hits = pred.eq(target.view_as(pred)).cpu().tolist()
    labels = target.cpu().tolist()
    for label, hit in zip(labels, hits):
        class_correct[label] += int(hit)
        class_total[label] += 1

test_loss = test_loss / test_data_len
print('Test Loss: {:.4f}'.format(test_loss))
for i in range(len(classes)):
    if class_total[i] > 0:
        print("Test Accuracy of %5s: %2d%% (%2d/%2d)" % (
            classes[i], 100*class_correct[i]/class_total[i], np.sum(class_correct[i]), np.sum(class_total[i])
        ))
    else:
        print("Test accuracy of %5s: NA" % (classes[i]))
print("Test Accuracy of %2d%% (%2d/%2d)" % (
            100*np.sum(class_correct)/np.sum(class_total), np.sum(class_correct), np.sum(class_total)
        ))

In [None]:
# our model earns 93% test accuracy, which is very high. lets save it
# Trace on the CPU with the model explicitly in eval mode: tracing records the
# operations that actually run, so the export must not depend on an earlier
# cell having switched off training-mode behaviour such as dropout.
example = torch.rand(1, 3, 224, 224)
traced_script_module = torch.jit.trace(model.cpu().eval(), example)
traced_script_module.save("sunflower.pt")

In [None]:

import os
import tensorflow as tf
import numpy as np
import PIL
import matplotlib.pyplot as plt
from keras.applications.imagenet_utils import decode_predictions

In [None]:
!pip list | grep scikit-image
     

In [None]:

import tensorflow as tf
from typing import Tuple

# Maps each EfficientNet variant name to its Keras constructor and the square
# input resolution used when resizing images for it.
EFFICIENTNET_VERSION = {
    variant: {
        'model': getattr(tf.keras.applications, f'EfficientNet{variant}'),
        'img_size': resolution,
    }
    for variant, resolution in (
        ('B0', 224),
        ('B1', 240),
        ('B2', 260),
        ('B3', 300),
        ('B4', 380),
        ('B5', 456),
        ('B6', 528),
        ('B7', 600),
    )
}


def get_efficientnet(
        version: str = 'B0', weights: str = 'imagenet'
) -> Tuple[tf.keras.Model, callable]:
    """
    Build one of the EfficientNet models together with its resize helper.

    ``version`` selects an entry of ``EFFICIENTNET_VERSION`` (a ``KeyError``
    is raised for an unknown key) and ``weights`` is forwarded to the Keras
    constructor. The selected image size is printed. Returns a tuple of
    ``(model, preprocess_image)``, where ``preprocess_image(x)`` resizes ``x``
    to the square resolution of the chosen variant.

    For image resolution and parameter counts of each variant see:
    https://keras.io/examples/vision/image_classification_efficientnet_fine_tuning/
    """
    spec = EFFICIENTNET_VERSION[version]
    img_size = spec['img_size']
    print(img_size)

    def preprocess_image(x):
        # Resize to the (img_size, img_size) resolution of the chosen variant.
        target_shape = (img_size, img_size)
        return tf.image.resize(x, target_shape)

    network = spec['model'](weights=weights)
    return network, preprocess_image

In [None]:
# Move the process working directory up one level.
# NOTE(review): a relative chdir is not idempotent - re-running this cell moves up again.
os.chdir('..')

In [None]:

# Get model and corresponding preprocess function
# NOTE(review): this rebinds `model`, which the earlier cells use for the DeiT
# classifier; from here on `model` refers to the Keras EfficientNet-B0.
model, preprocess_image = get_efficientnet(version='B0', weights='imagenet')

In [None]:

import numpy as np

In [None]:
# Load one test image. PNG files can be RGBA or palette based, so force
# 3-channel RGB before handing the pixels to a model that expects 3 channels.
img = PIL.Image.open('/content/drive/MyDrive/sunflowerdataset/Augmented Image/new_dataset_v1/test/Downy_mildew/DownyMildew (470).png').convert('RGB')
img = preprocess_image(np.array(img).astype(int))
plt.imshow(img.numpy().astype(int));

In [None]:
# Get predictions (expand dims for single image as model expects size (None, 224, 224, 3))
# `model` is called directly on a batch that holds just this one image.
preds = model(np.expand_dims(img, 0))

In [None]:

# Decode it based on keras imagenet_utils package
# [0] selects the entry for the single image in the batch; the first decoded
# item's fields [1] and [2] are printed as the label and its probability.
# NOTE(review): the model was loaded with weights='imagenet', so these labels
# are ImageNet classes, not the dataset's own classes.
decoded_preds = decode_predictions(preds.numpy())[0]
print(f"Predicted species: {decoded_preds[0][1]} | Probability: {decoded_preds[0][2]:.2f}")

In [None]:
import sys 
sys.path.insert(0, '/content')
%cd /content/
%pwd

In [None]:
# Predict over the whole test iterator and print a per-class report.
# NOTE(review): `test_set` and `batch_size` are not defined in any cell visible
# here - presumably a Keras directory iterator and its batch size; confirm they
# exist before running this cell.
import sklearn.metrics as metrics
from sklearn.metrics import classification_report

# `preds` is the prediction tensor from the single-image cell and cannot be
# called; batch inference goes through model.predict. `steps` must be a whole
# number of batches, hence the ceil.
Y_pred = model.predict(test_set, steps=int(np.ceil(test_set.samples / batch_size)))
val_preds = np.argmax(Y_pred, axis=1)
val_trues = test_set.classes
print(classification_report(val_trues, val_preds))