<a href="https://colab.research.google.com/github/gdoorash/Cassava-Disease-Classification/blob/master/Notebooks/ConvAutoencoder_cassava.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
import numpy
import torch 
import torchvision 
import os
import matplotlib.pyplot as plt
import random 
import pdb
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, sampler
import imageio
import PIL
from IPython import display
from torchvision import datasets
import torch.nn as nn




In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
!unzip "/content/gdrive/My Drive/Colab Notebooks/Data/Cassava_t_v_t.zip"

In [0]:
%matplotlib inline

def show(img):
    """Show PyTorch tensor img as an image in matplotlib."""
    npimg = img.cpu().detach().numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)), interpolation='nearest')
    plt.grid(False)
    plt.gca().axis('off')

def display_thumb(img):
  display.display(transforms.Resize(128)(img))

In [0]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

## **Prepare the dataset** (balancing, spiliting, data augmentation, ..etc)

In [0]:
# Load Cassava dataset
traindata_folder_path = '/content/ammi-2020-convnets/train/train'
valdata_folder_path = '/content/ammi-2020-convnets/train/validation'
testdata_folder_path = '/content/ammi-2020-convnets/test/test'


# load the extra data for self-supervie (Autoencodrer)..
extra_folder_path = '/content/ammi-2020-convnets/extraimages'


In [0]:
# compute the mean and standard deviation of the all images (train, val, test)

In [0]:
######################### Transformations of images #################################################


inference_transform = transforms.Compose([
    transforms.Resize((500, 500)),
    transforms.CenterCrop(256),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

# For visualization purposes we'll create a separate transform that operates in image space.
inference_transform_show = transforms.Compose([
    transforms.Resize(500),
    transforms.CenterCrop(256),
])


########################################   Transform and put the data into Dataset object #########################################################################

# transformed_cassava_trian = datasets.ImageFolder(traindata_folder_path, transform=inference_transform)
# transformed_cassava_validation = datasets.ImageFolder( validdata_folder_path, transform=inference_transform) 
# transformed_cassava_test = datasets.ImageFolder(testdata_folder_path,  transform=inference_transform)

# ##########################################  Load the data into DataLoader #########################################################
# transformed_cassava_trainloader = torch.utils.data.DataLoader(transformed_cassava_train, batch_size=32, shuffle=True, num_workers=8)
# transformed_cassava_valloader = torch.utils.data.DataLoader(transformed_cassava_val, batch_size=32, shuffle=False, num_workers=8)
# transformed_cassava_testloader = torch.utils.data.DataLoader(transformed_cassava_test, batch_size=32, shuffle=False, num_workers=8)




########## Extra data set loading ##########################################################################

transformed_cassava_extra = datasets.ImageFolder(extra_folder_path, transform=inference_transform)
transformed_cassava_extraloader = torch.utils.data.DataLoader(transformed_cassava_extra , batch_size=32, shuffle=True, num_workers=8)



In [0]:
transformed_cassava_extraloader.dataset.classes

['extraimages']

In [0]:
len(train_loader.dataset)

NameError: ignored

## **Basic function for training & testing**

In [0]:
def accuracy(output, target, topk=(1,)):
    with torch.no_grad():
        maxk = max(topk)
        batch_size = target.size(0)

        _, pred = output.topk(maxk, 1, True, True)
        pred = pred.t()
        correct = pred.eq(target.view(1, -1).expand_as(pred))

        res = []
        for k in topk:
            correct_k = correct[:k].view(-1).float().sum(0, keepdim=True)
            res.append(correct_k.mul_(100.0 / batch_size))
        return res

In [0]:
def train (model, criterion, dataloader, optimizer, num_epochs):
    
    # put the model in train mode
    model.train()
    
    # move the model to GPU
    model.to(device)
    


    losses = []
    for epoch in range(num_epochs):
      
      print("Epoch {}".format(epoch))
      avg_loss = 0
      correct = 0

      for batch_index, (batch, target) in enumerate(dataloader):
          batch = batch.to(device)
          target = target.to(device)
          
          optimizer.zero_grad()

          # Forward step 
          output = model(batch)
          loss = criterion(output.to(device), target)
          
          # Backward step 
          loss.backward()
          #gradient step
          optimizer.step()
          

          # compute the accuracy
          prediction = output.argmax(dim=1, keepdim=True)
          correct += prediction.cpu().eq(target.cpu().view_as(prediction)).sum().item()

          # avarge of loss
          avg_loss += loss.item()
          # print("Loss : {}".format(loss.item()))

      # losses.append(avg_loss/len(batch))    
      percent = 100. * correct / len(dataloader.dataset)
      # Print out progress the end of epoch.
      print('Train Epoch: {} \tLoss: {:.6f} \t Accuracy: {:.2f}%'.format(
              epoch, avg_loss/len(batch), percent)
        )

def test (model, criterion, dataloader):
    
    model.eval()
    correct = 0
    # avg_loss = 0
    with torch.no_grad():
        
        for data, target in dataloader:
             
            data = data.to(device)
            target = target.to(device)
            
            output = model(data).to(device)
            loss = criterion(output, target) 
            
            prediction = output.argmax(dim=1, keepdim=True) 
            correct += prediction.cpu().eq(target.view_as(prediction)).sum().item()
            
            # avg_loss += loss.item() 
    percent = 100. * correct / len(dataloader.dataset)
    print(f'Accuracy: {correct}/{len(dataloader.dataset)} ({percent:.0f}%)')
   
    return percent  
    
    
    
        
        
            

In [0]:
def train_auto (model, criterion, dataloader, optimizer, num_epochs):
    
    # put the model in train mode
    model.train()
    
    # move the model to GPU
    model.to(device)
    


    losses = []
    for epoch in range(num_epochs):
      
      print("Epoch {}".format(epoch))
      avg_loss = 0
      correct = 0

      for batch_index, (batch, _) in enumerate(dataloader):
          batch = batch.to(device)
          
          
          optimizer.zero_grad()

          # Forward step 
          output = model(batch)
          loss = criterion(output.to(device), batch)
          
          # Backward step 
          loss.backward()
          #gradient step
          optimizer.step()
          

          # compute the accuracy
          prediction = output.argmax(dim=1, keepdim=True)
          correct += prediction.cpu().eq(target.cpu().view_as(prediction)).sum().item()

          # avarge of loss
          avg_loss += loss.item()
          # print("Loss : {}".format(loss.item()))

      # losses.append(avg_loss/len(batch))    
      percent = 100. * correct / len(dataloader.dataset)
      # Print out progress the end of epoch.
      print('Train Epoch: {} \tLoss: {:.6f} \t Accuracy: {:.2f}%'.format(
              epoch, avg_loss/len(batch), percent)
        )

def test_auto (model, criterion, dataloader):
    
    model.eval()
    correct = 0
    # avg_loss = 0
    with torch.no_grad():
        
        for data, target in dataloader:
             
            data = data.to(device)
            target = target.to(device)
            
            output = model(data).to(device)
            loss = criterion(output, target) 
            
            prediction = output.argmax(dim=1, keepdim=True) 
            correct += prediction.cpu().eq(target.view_as(prediction)).sum().item()
            
            # avg_loss += loss.item() 
    percent = 100. * correct / len(dataloader.dataset)
    print(f'Accuracy: {correct}/{len(dataloader.dataset)} ({percent:.0f}%)')
   
    return percent  
    
    
    
        
        
            

## **Model Architecture (CNN+Decoder)** 
Train the CNN on the unlabel data to learn the featuers, then train it on the prepared train data

In [0]:
class CNNModel(nn.Module):

  def __init__(self):
    super(CNNModel, self).__init__()
    self.block1 = nn.Sequential(
        nn.Conv2d(3, 16, kernel_size=(3, 3), stride=(1, 1), bias=False),
        nn.BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
        nn.ReLU(inplace=True),
        nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), bias=False),
        nn.MaxPool2d(3),
        nn.ReLU(inplace=True),
    )

    self.block2 = nn.Sequential(
        nn.Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False),
        nn.BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
        nn.ReLU(inplace=True),
        nn.Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False),
        nn.BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
    )
    
  

 
    self.avpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
    # self.fc  =  nn.Linear(in_features=256, out_features=5, bias=True)


  def forward(self, x):
      x = self.block1(x)
      x = self.block2(x)
    
      return x

In [0]:
class ConvAutoencoder(nn.Module):
  def __init__(self):
      super(ConvAutoencoder, self).__init__()
      ## encoder layers ##
      
      self.encoder = CNNModel()
      
      ## decoder layers ##
      self.decoder = nn.Sequential(
          nn.Conv2d(32,  8, kernel_size=3, stride=1),
          nn.UpsamplingNearest2d(180),
          nn.Conv2d(8, 3, 3),
          nn.UpsamplingNearest2d(256),
      )


  def forward(self, x):
      ## encode ##
      x = self.encoder(x)
      

      ## decode ##
      x = self.decoder(x)
              
      return x


In [0]:
# Model Architectuer..
modelCNN = CNNModel()


In [0]:
sum(p.numel() for p in convAout.parameters() if p.requires_grad)

26163

## **Run the Model**  (checkpoint save and load, optimizer,...)

In [0]:
model = ConvAutoencoder()

In [0]:
path = "/content/gdrive/My Drive/Colab Notebooks/model_data_checkpoints/a284"
torch.save(model.state_dict(), path)

In [0]:
# load the checkpoint...
model = model.load_state_dict(torch.load("/content/gdrive/My Drive/Colab Notebooks/model_data_checkpoints/a284"))
# model

FileNotFoundError: ignored

In [0]:
if device == 'cuda':
    model = model.cuda()
    
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
train_auto(model, nn.MSELoss(), transformed_imagenet_loader, optimizer, 100)

NameError: ignored