In [None]:
#Imports
import os
import sys
import glob
import torch
import torchvision

import numpy    as np
import datetime as dt
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot   as plt

from PIL               import Image
from torch.utils.data  import Dataset
from torch.autograd    import Variable
from torch.optim       import lr_scheduler

from torch.utils.data  import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from torchvision       import transforms, datasets, models
from os                import listdir, makedirs, getcwd, remove
from os.path           import isfile, join, abspath, exists, isdir, expanduser
import pathlib

%matplotlib inline

In [None]:
from __future__ import print_function, division
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.backends.cudnn as cudnn

import torchvision
import matplotlib.pyplot as plt
import time
import os
import copy
from torch.optim import Adam

# Let cuDNN benchmark the available convolution algorithms and keep the fastest one.
cudnn.benchmark = True
# Switch matplotlib to interactive mode.
plt.ion()  

In [None]:
# Root of the Kaggle competition data; every split lives in a doubled sub-folder
# (e.g. "train/train").
data_path = "/kaggle/input/ammi-2022-convnets/"
train_path, test_path, extraimage_path = (
    join(data_path, sub_dir)
    for sub_dir in ("train/train", "test/test", "extraimages/extraimages")
)
# Alternative training source that was tried:
# train_path="https://drive.google.com/drive/folders/1h6yxl8HS7aPsgeJuSdf2_77S4x_yRzoF?usp=sharing"
# Parent folder of the training split.
path = "/kaggle/input/ammi-2022-convnets/train"

In [None]:
# Per-channel statistics used for normalisation: maps [0, 1] pixels to [-1, 1].
# (ImageNet statistics would be mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225].)
mean=[0.5,0.5,0.5]
std=[0.5,0.5,0.5]

# Training pipeline: fixed resize plus light augmentation (random horizontal flip).
# Other augmentations to try: RandomRotation(30), RandomResizedCrop(224).
train_transforms = transforms.Compose([
    transforms.Resize((150,150)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
])

# Evaluation pipeline: identical resize/normalisation but NO random augmentation,
# so repeated evaluation of the same image gives the same input tensor.
test_transforms = transforms.Compose([
    transforms.Resize((150,150)),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std),
])

# Stand-alone normalisation transform with the same statistics.
normalize = transforms.Normalize(mean=mean, std=std)

In [None]:
# Build one ImageFolder dataset per split, each with its own transform pipeline.
train_data=datasets.ImageFolder(root=train_path, transform=train_transforms)
test_data=datasets.ImageFolder(root=test_path, transform=test_transforms)

In [None]:
# Class names are the sub-directory names of the training folder, sorted alphabetically.
# Only directories are kept so that a stray file in the folder cannot shift the
# index -> name mapping used when decoding predictions.
root=pathlib.Path(train_path)
Classes=sorted(entry.name for entry in root.iterdir() if entry.is_dir())

In [None]:
# Show the class names discovered in the training directory.
print(Classes)

In [None]:
# Hold-out configuration: fraction of samples for validation, and seeded shuffling.
validation_split = .2
shuffle_dataset = True
random_seed = 42

# Number of samples that go to the validation split (rounded down).
dataset_size = len(train_data)
split = int(np.floor(validation_split * dataset_size))

# Candidate indices, optionally shuffled with a fixed seed for reproducibility.
indices = list(range(dataset_size))
if shuffle_dataset:
    np.random.seed(random_seed)
    np.random.shuffle(indices)

# The first `split` indices form the validation set, the remainder the training set.
val_indices = indices[:split]
train_indices = indices[split:]
len(train_indices), len(val_indices)

In [None]:
# Samplers draw (in random order) only from their own index subset of train_data.
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(val_indices)

# Training and validation loaders share the same underlying dataset object and
# differ only in the sampler; the test loader iterates test_data sequentially.
train_loader = DataLoader(train_data, sampler=train_sampler, batch_size=256)
valid_loader = DataLoader(train_data, sampler=valid_sampler, batch_size=256)
test_loader = DataLoader(test_data, batch_size=32)

In [None]:
# Number of samples in the training split.
train_count= len(train_indices)
# NOTE: despite its name, test_count holds the size of the validation split.
test_count= len(val_indices)
print(train_count,test_count)

In [None]:
# Device configuration: use the first CUDA GPU when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
import torchvision.models as models

In [None]:
class ConvNet(nn.Module):
    """Small three-convolution CNN for 150x150 RGB images.

    Architecture: conv(3->12) + BN + ReLU -> 2x2 max-pool -> conv(12->20) + ReLU
    -> conv(20->32) + BN + ReLU -> flatten -> linear(32*75*75 -> num_classes).

    The previous version additionally pushed the logits through a
    ``densenet161`` instance. That network expects 4-D image batches, so the
    forward pass failed on the 2-D logits; the stray layer has been removed.

    Args:
        num_classes: Number of output classes (size of the logit vector).
    """

    def __init__(self,num_classes=5):
        super(ConvNet,self).__init__()

        # Spatial output size of a conv/pool layer: ((w - f + 2P) / s) + 1

        # Input shape: (N, 3, 150, 150)
        self.conv1=nn.Conv2d(in_channels=3,out_channels=12,kernel_size=3,stride=1,padding=1)
        # -> (N, 12, 150, 150); BatchNorm and ReLU keep the shape.
        self.bn1=nn.BatchNorm2d(num_features=12)
        self.relu1=nn.ReLU()

        # Halves height and width: -> (N, 12, 75, 75)
        self.pool=nn.MaxPool2d(kernel_size=2)

        self.conv2=nn.Conv2d(in_channels=12,out_channels=20,kernel_size=3,stride=1,padding=1)
        # -> (N, 20, 75, 75)
        self.relu2=nn.ReLU()

        self.conv3=nn.Conv2d(in_channels=20,out_channels=32,kernel_size=3,stride=1,padding=1)
        # -> (N, 32, 75, 75); BatchNorm and ReLU keep the shape.
        self.bn3=nn.BatchNorm2d(num_features=32)
        self.relu3=nn.ReLU()

        # Classifier over the flattened feature map.
        self.fc=nn.Linear(in_features=32*75*75,out_features=num_classes)

    def forward(self,input):
        """Return class logits of shape (N, num_classes) for a (N, 3, 150, 150) batch."""
        output=self.conv1(input)
        output=self.bn1(output)
        output=self.relu1(output)

        output=self.pool(output)

        output=self.conv2(output)
        output=self.relu2(output)

        output=self.conv3(output)
        output=self.bn3(output)
        output=self.relu3(output)

        # Feature map is (N, 32, 75, 75) here; flatten everything except the batch
        # dimension so a wrongly sized input is reported by the linear layer instead
        # of being silently re-batched.
        output=output.view(output.size(0),-1)

        output=self.fc(output)

        return output
        

In [None]:
# Build the 5-class ConvNet and place it on the selected device.
model=ConvNet(num_classes=5).to(device)

In [None]:
# Adam over all model parameters with a small weight decay; cross-entropy as the classification loss.
optimizer=Adam(model.parameters(),lr=0.001,weight_decay=0.0001)
loss_function=nn.CrossEntropyLoss()

In [None]:
# Number of passes over the training split in the manual training loop.
num_epochs=10


In [None]:
# Manual training loop: one training pass and one validation pass per epoch;
# the weights with the best validation accuracy are written to disk.
best_accuracy=0.0

for epoch in range(num_epochs):

    # ---- Training pass ----
    model.train()
    train_accuracy=0.0
    train_loss=0.0

    for images,labels in train_loader:
        # Move the batch to the same device the model was placed on.
        images=images.to(device)
        labels=labels.to(device)

        optimizer.zero_grad()

        outputs=model(images)
        loss=loss_function(outputs,labels)
        loss.backward()
        optimizer.step()

        # loss is the batch mean, so weight it by the batch size; .item() keeps the
        # running total a plain float (it prints as a number, not as "tensor(...)").
        train_loss+=loss.item()*images.size(0)
        # Named `predicted` so re-running this cell cannot overwrite the
        # `prediction()` helper defined further down the notebook.
        predicted=outputs.argmax(dim=1)
        train_accuracy+=int((predicted==labels).sum())

    train_accuracy=train_accuracy/train_count
    train_loss=train_loss/train_count

    # ---- Validation pass (variables keep their historical "test_" prefix) ----
    model.eval()

    test_accuracy=0.0
    # No gradients are needed for evaluation; skipping them saves memory and time.
    with torch.no_grad():
        for images,labels in valid_loader:
            images=images.to(device)
            labels=labels.to(device)

            outputs=model(images)
            predicted=outputs.argmax(dim=1)
            test_accuracy+=int((predicted==labels).sum())

    test_accuracy=test_accuracy/test_count

    print('Epoch: '+str(epoch)+' Train Loss: '+str(train_loss)+' Train Accuracy: '+str(train_accuracy)+' Test Accuracy: '+str(test_accuracy))

    # Save the best model seen so far.
    if test_accuracy>best_accuracy:
        torch.save(model.state_dict(),'best_checkpoint.model')
        best_accuracy=test_accuracy
    
       

In [None]:
# Reload the best weights into a fresh CPU model for inference.
# map_location='cpu' lets a checkpoint that was saved from a GPU run be loaded
# on a machine without CUDA as well.
checkpoint=torch.load('best_checkpoint.model',map_location='cpu')
model=ConvNet(num_classes=5)
model.load_state_dict(checkpoint)
# eval() switches BatchNorm to its running statistics for inference.
model.eval()

In [None]:
# Inference pipeline: same resize and normalisation constants as training, without augmentation.
transformer=transforms.Compose([
    transforms.Resize((150,150)),
    transforms.ToTensor(),  # image -> float tensor with values scaled from 0-255 to 0-1
    transforms.Normalize([0.5,0.5,0.5], # 0-1 to [-1,1] , formula (x-mean)/std
                        [0.5,0.5,0.5])
])

In [None]:
def prediction(img_path,transformer):
    """Predict the class name of a single image file.

    Uses the notebook-level ``model`` and ``Classes``.

    Args:
        img_path: Path of the image to classify.
        transformer: Callable turning a PIL image into a (C, H, W) tensor.

    Returns:
        The entry of ``Classes`` whose index has the highest model output.
    """
    # Close the file handle promptly and force three channels so grayscale or
    # RGBA files match the 3-channel normalisation and the model input.
    with Image.open(img_path) as img:
        image=img.convert('RGB')

    # Add the batch dimension: (C, H, W) -> (1, C, H, W).
    image_tensor=transformer(image).float().unsqueeze(0)

    # Run on whatever device the model lives on. `.cuda()`/`.to()` return a new
    # tensor, so the result has to be assigned.
    first_param=next(model.parameters(),None)
    if first_param is not None:
        image_tensor=image_tensor.to(first_param.device)

    with torch.no_grad():
        output=model(image_tensor)

    # .item() works for CPU and CUDA tensors alike.
    index=int(output.argmax(dim=1).item())

    return Classes[index]

In [None]:
# Collect the .jpg files one directory level below test_path
# (glob is called without recursive=True, so '**' matches like a plain '*').
images_path=glob.glob(test_path+'/**/*.jpg')


In [None]:
# Map each test image's file name (text after the last '/') to its predicted class name.
pred_dict = {
    img_file.rsplit('/', 1)[-1]: prediction(img_file, transformer)
    for img_file in images_path
}

pred_dict

In [None]:
import pandas as pd

In [None]:
# Views over the prediction mapping: file names (keys) ...
image_name=pred_dict.keys()
#image_name
# ... and the predicted class names (values), in matching order.
image_pred=pred_dict.values()
#image_pred

In [None]:
# Load the competition's sample submission file and preview its first five rows.
donne=pd.read_csv("/kaggle/input/ammi-2022-convnets/sample_submission_file.csv")
donne.head(5)

In [None]:
import os

# Dictionary -> DataFrame: one single-column frame per field, then placed side by
# side as (Category, Id).
df_1 = pd.Series(image_name).to_frame()
df_2 = pd.Series(image_pred).to_frame()
data_pred = pd.concat([df_2, df_1], axis=1).set_axis(["Category", "Id"], axis=1)

# DataFrame -> CSV.
# NOTE(review): this is a Windows-style absolute path while the inputs live under
# /kaggle; on Linux it creates a relative "C:" directory. Confirm the intended
# output location (the next cell reads the same path back).
os.makedirs("C:/Users/LENOVO/Desktop/project_cassava", exist_ok=True)
data_pred.to_csv('C:/Users/LENOVO/Desktop/project_cassava/out.csv', index=False)


In [None]:
# Read the exported CSV back and preview its first five rows as a sanity check.
donnee=pd.read_csv("C:/Users/LENOVO/Desktop/project_cassava/out.csv")
donnee.head(5)

In [None]:
  def train_model(model, criterion, optimizer, scheduler,train_loader,valid_loader, num_epochs=25):
      """Train ``model`` and return it with the best-validation weights loaded.

      Each epoch runs a training pass over ``train_loader`` and an evaluation
      pass over ``valid_loader``. Loss and accuracy are averaged over the number
      of samples actually seen in each pass.

      Args:
          model: Network to optimise; batches are moved to the device of its
              first parameter (CPU when it has no parameters).
          criterion: Loss called as ``criterion(outputs, labels)``; assumed to
              return the batch mean, since it is re-weighted by batch size.
          optimizer: Optimiser stepped once per training batch.
          scheduler: LR scheduler stepped once per epoch, after the training pass.
          train_loader: Iterable of ``(inputs, labels)`` training batches.
          valid_loader: Iterable of ``(inputs, labels)`` validation batches.
          num_epochs: Number of epochs to run.

      Returns:
          The same ``model`` object, with the weights of the epoch that had the
          highest validation accuracy.

      Raises:
          ValueError: If a loader yields no samples.
      """
      since = time.time()

      # Follow the model's own device instead of relying on a notebook global.
      first_param = next(model.parameters(), None)
      run_device = first_param.device if first_param is not None else torch.device('cpu')

      best_model_wts = copy.deepcopy(model.state_dict())
      best_acc = 0.0

      for epoch in range(num_epochs):
          print(f'Epoch {epoch}/{num_epochs - 1}')
          print('-' * 10)

          # Each epoch has a training and validation phase
          for phase in ['train', 'val']:
              if phase == 'train':
                  model.train()  # Set model to training mode
                  data_loader = train_loader
              else:
                  model.eval()   # Set model to evaluate mode
                  data_loader = valid_loader

              running_loss = 0.0
              running_corrects = 0
              dataset_sizes = 0

              # Iterate over data.
              for inputs, labels in data_loader:
                  inputs = inputs.to(run_device)
                  labels = labels.to(run_device)
                  # Count samples, not batch indices: this is the denominator
                  # of the per-epoch averages below.
                  dataset_sizes += inputs.size(0)

                  # zero the parameter gradients
                  optimizer.zero_grad()

                  # forward; track history only in the training phase
                  with torch.set_grad_enabled(phase == 'train'):
                      outputs = model(inputs)
                      _, preds = torch.max(outputs, 1)
                      loss = criterion(outputs, labels)

                      # backward + optimize only if in training phase
                      if phase == 'train':
                          loss.backward()
                          optimizer.step()

                  # statistics, kept as plain Python numbers
                  running_loss += loss.item() * inputs.size(0)
                  running_corrects += torch.sum(preds == labels).item()

              if phase == 'train':
                  scheduler.step()

              if dataset_sizes == 0:
                  raise ValueError(f'{phase} loader produced no samples')

              epoch_loss = running_loss / dataset_sizes
              epoch_acc = running_corrects / dataset_sizes

              print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

              # deep copy the model
              if phase == 'val' and epoch_acc > best_acc:
                  best_acc = epoch_acc
                  best_model_wts = copy.deepcopy(model.state_dict())

          print()

      time_elapsed = time.time() - since
      print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
      print(f'Best val Acc: {best_acc:.4f}')

      # load best model weights
      model.load_state_dict(best_model_wts)
      return model


In [None]:
# Start from a ResNet-18 requested with pretrained weights.
model_ft = models.resnet18(pretrained=True)
num_ftrs = model_ft.fc.in_features
# Replace the final fully connected layer so each output sample has 5 values.
# Alternatively, it can be generalized to nn.Linear(num_ftrs, len(Classes)).
model_ft.fc = nn.Linear(num_ftrs, 5)

model_ft = model_ft.to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_ft = torch.optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [None]:
# Fine-tune for 25 epochs; train_model returns the model with its best validation weights loaded.
model_ft = train_model(model_ft, criterion, optimizer_ft, exp_lr_scheduler,train_loader,valid_loader, num_epochs=25)

In [None]:
def test(model, data_loader):
    """Measures the accuracy of a model on a data set.""" 
    # Make sure the model is in evaluation mode.
    model.eval()
    correct = 0
    print('----- Model Evaluation -----')
    # We do not need to maintain intermediate activations while testing.
    with torch.no_grad():
        
        # Loop over test data.
        for features, target in data_loader:
          
            # Forward pass.
            output = model(features.to(device))
            
            # Get the label corresponding to the highest predicted probability.
            pred = output.argmax(dim=1, keepdim=True)
            
            # Count number of correct predictions.
            correct += pred.cpu().eq(target.view_as(pred)).sum().item()

    # Print test accuracy.
    percent = 100. * correct / len(data_loader.dataset)
    print(f'Test accuracy: {correct} / {len(data_loader.dataset)} ({percent:.0f}%)')
    torch.save(model.state_dict(), 'model.ckpt')
    return percent

In [None]:
# Evaluate the fine-tuned model on the validation loader (test() also writes 'model.ckpt').
test(model_ft, valid_loader)