In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, TensorDataset
import torchvision
from matplotlib import pyplot as plt
%matplotlib inline
import PIL
from PIL import Image
import numpy as np
from torch.utils.tensorboard import SummaryWriter
import os
import math
import time

In [None]:
# Mount Google Drive at /content/drive; the trained weights are saved under that mount
# near the end of the notebook. `google.colab` is only importable inside a Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Work from /content: the Kaggle archive is downloaded to, and unzipped from, this directory.
cd "/content"

# Download Data from Kaggle

In [None]:
%%bash 
# %%bash runs this whole cell as a shell script.
# Remove any preinstalled kaggle CLI, upgrade pip, then install the pinned 1.5.6 release.
pip uninstall -y kaggle
pip install --upgrade pip
pip install kaggle==1.5.6

In [None]:
# Search Kaggle for datasets matching "plant" to look up the slug used in the download cell.
!kaggle datasets list -s plant

In [None]:
# Download the V2 Plant Seedlings archive (v2-plant-seedlings-dataset.zip) into the current directory.
!kaggle datasets download -d vbookshelf/v2-plant-seedlings-dataset

Downloading v2-plant-seedlings-dataset.zip to /content
100% 3.18G/3.19G [01:11<00:00, 59.9MB/s]
100% 3.19G/3.19G [01:11<00:00, 48.0MB/s]


In [None]:
# Extract the archive quietly (-q) into /content/data.
!unzip -q v2-plant-seedlings-dataset.zip -d "/content/data"

In [None]:
# Root folder of the non-segmented images; each sub-folder name is used as a class label below.
_dataset_parts=("/content","data","nonsegmentedv2")
img_path=os.path.join(*_dataset_parts)

# Exploratory Data Analysis

In [None]:
# Show the first listed image of every class folder on a 3x4 grid, titled with the class name.
figure,axis=plt.subplots(nrows=3,ncols=4,figsize=(10,10))
panels=iter(axis.flat)  # row-major walk over the grid, same order as the (row, col) counters
for class_name in os.listdir(img_path):
  class_dir=f"{img_path}/{class_name}"
  first_file=next(iter(os.listdir(class_dir)),None)
  if first_file is None:
    # Empty class folder: draw nothing and do not consume a panel.
    continue
  sample=np.asarray(Image.open(f"{class_dir}/{first_file}"))
  print(sample.shape)
  panel=next(panels)
  panel.imshow(sample)
  panel.set_title(class_name)

In [None]:
# Class names (the folder names) and, in matching order, the number of files in each class folder.
label_list=list(os.listdir(img_path))
img_count_list=[len(os.listdir(f"{img_path}/{class_name}")) for class_name in label_list]

In [None]:
# Bar chart of the number of images per class; tick labels are rotated so the long class names stay readable.
plt.bar(x=label_list,
        height=img_count_list)
plt.xticks(rotation=90)

In [None]:
# Share of the whole dataset held by each class, in percent rounded to 2 decimals.
# The total is computed once and every listed class is covered (no hard-coded class count).
total_images=sum(img_count_list)
percentage_class_dict={
    class_name:round((class_count_/total_images)*100,2)
    for class_name,class_count_ in zip(label_list,img_count_list)
}
percentage_class_dict

In [None]:
# Gather the three colour planes of every image (resized to 244x244, bilinear) so the
# per-channel mean and standard deviation can be computed from them in the next cell.
first_channel,second_channel,third_channel,num=[],[],[],0
for class_name in os.listdir(img_path):
  class_dir=img_path+"/"+class_name
  for file_name in os.listdir(class_dir):
    resized=PIL.Image.open(class_dir+"/"+file_name).resize((244,244),Image.BILINEAR)
    pixels=np.array(resized)
    # Plane k of the image goes to the k-th accumulator list.
    for plane,bucket in enumerate((first_channel,second_channel,third_channel)):
      bucket.append(pixels[:,:,plane])
    num+=1
  print(class_name,"done")

In [None]:
# Per-channel mean and standard deviation over all collected planes, rescaled from 0-255 to 0-1.
_channel_planes=(first_channel,second_channel,third_channel)
mean=[np.mean(planes)/255. for planes in _channel_planes]
std=[np.std(planes)/255. for planes in _channel_planes]

In [None]:
# Display the computed statistics so they can be compared with the hard-coded values in the next cell.
print(mean,std)

In [None]:
# Hard-coded per-channel mean/std (0-1 scale) that override whatever was computed above.
# NOTE(review): presumably copied from an earlier run so the slow pass over every image can be
# skipped -- confirm they still match the dataset.
mean,std=[0.32878234546825347, 0.28885041498392117, 0.20677955249812788],[0.1033289967821012, 0.1086720358391526, 0.12568620125984942]

In [None]:
# Splits every class folder into train (80%), validation (10%) and test (remainder) image paths.
def prepare_data(path):
  """Build train/validation/test lists of image paths and their labels.

  ``path`` is expected to contain one sub-folder per class; the folder name is
  used as the label. Within each class only images that decode to an
  ``(H, W, 3)`` array are kept. Of those, the first ``floor(0.8 * n_files)`` go
  to training, the next ``floor(0.1 * n_files)`` to validation and the rest to
  test, where ``n_files`` is the number of files in the class folder.

  Folder and file listings are sorted because ``os.listdir`` returns entries in
  arbitrary order; sorting makes the split reproducible between runs.

  Args:
    path: root directory holding the class sub-folders.

  Returns:
    A 4-tuple ``([train_paths, train_labels], [valid_paths, valid_labels],
    [test_paths, test_labels], num_grayscale)``. ``num_grayscale`` counts every
    skipped image, i.e. any image that is not exactly 3-channel (2-D grayscale
    arrays as well as e.g. 4-channel images).
  """
  imgs_train,labels_train=[],[]
  imgs_valid,labels_valid=[],[]
  imgs_test,labels_test=[],[]
  num_grayscale=0

  for label in sorted(os.listdir(path)):
    curr_path=path+"/"+label
    filenames=sorted(os.listdir(curr_path))

    num_images=len(filenames)
    print(num_images)
    num_train=math.floor(num_images*0.80)
    num_valid=math.floor(num_images*0.10)

    # Keep only 3-channel images. Checking ndim first avoids an IndexError on
    # 2-D (grayscale) arrays, which have no third axis to inspect.
    color_paths=[]
    for img in filenames:
      img_file=curr_path+"/"+img
      my_img=np.asarray(Image.open(img_file))
      if my_img.ndim==3 and my_img.shape[2]==3:
        color_paths.append(img_file)
      else:
        num_grayscale+=1

    # Slicing (rather than counting up to a quota) also behaves when a quota is 0:
    # that split simply stays empty instead of swallowing every remaining image.
    train_part=color_paths[:num_train]
    valid_part=color_paths[num_train:num_train+num_valid]
    test_part=color_paths[num_train+num_valid:]

    imgs_train.extend(train_part)
    labels_train.extend([label]*len(train_part))
    imgs_valid.extend(valid_part)
    labels_valid.extend([label]*len(valid_part))
    imgs_test.extend(test_part)
    labels_test.extend([label]*len(test_part))

  return [imgs_train,labels_train],[imgs_valid,labels_valid],[imgs_test,labels_test],num_grayscale

In [None]:
# Each *_prior is a [paths, labels] pair; num_gs_images is the number of images prepare_data skipped.
train_prior,valid_prior,test_prior,num_gs_images=prepare_data(img_path)

In [None]:
# Number of images excluded from all three splits because they were not 3-channel.
print(num_gs_images)

In [None]:
# Maps each class (folder) name to the integer index used as the training target.
# The apostrophe in "Shepherd’s Purse" is U+2019 (right single quotation mark); it is written
# as an escape so the key cannot be corrupted by an encoding round-trip of the notebook.
class_dictionary={'Maize':0,
                  'Cleavers':1,
                  'Sugar beet':2,
                  'Common Chickweed':3,
                  'Black-grass':4,
                  'Scentless Mayweed':5,
                  'Small-flowered Cranesbill':6,
                  'Shepherd\u2019s Purse':7,
                  'Loose Silky-bent':8,
                  'Common wheat':9,
                  'Charlock':10,
                  'Fat Hen':11}

In [None]:
class GetDataset(torch.utils.data.Dataset):
  """Dataset that loads images from disk and returns ``[tensor, class_index]`` pairs.

  Args:
    prior: ``[image_paths, labels]`` pair of equally long lists.
    mean: per-channel means passed to ``transforms.Normalize``.
    std: per-channel standard deviations passed to ``transforms.Normalize``.
    dict: mapping from label name to integer class index (the parameter name
      shadows the builtin but is kept so existing callers keep working).
    set: split name; ``"train"`` adds random horizontal flip and brightness/contrast
      jitter, any other value gets the deterministic pipeline only.
  """
  def __init__(self,prior,mean,std,dict,set):
    self.img_paths=prior[0]
    self.labels=prior[1]
    self.set=set
    self.mean=mean
    self.std=std
    self.dict=dict

    # Every split is resized to 512x512, converted to a tensor and normalised;
    # the random augmentations are inserted only for the training split.
    steps=[transforms.Resize((512,512))]
    if self.set=="train":
      steps.append(transforms.RandomHorizontalFlip(0.5))
      steps.append(transforms.ColorJitter(brightness=0.4,contrast=0.4))
    steps.append(transforms.ToTensor())
    steps.append(transforms.Normalize(self.mean,self.std))
    self.transform=transforms.Compose(steps)

  def __getitem__(self,index):
    """Return ``[transformed_image, class_index]`` for the sample at ``index``.

    Raises:
      KeyError: if the sample's label name is missing from the label mapping.
    """
    img_path=self.img_paths[index]
    label_name=self.labels[index]
    # Fail loudly on an unmapped label instead of handing a None target to the DataLoader.
    if label_name not in self.dict:
      raise KeyError("No class index for label %r (image %s)" % (label_name,img_path))
    label=self.dict[label_name]

    # convert("RGB") guarantees the 3 channels Normalize expects; the with-block closes the file.
    with Image.open(img_path) as opened:
      img=opened.convert("RGB")
    img=self.transform(img)

    return [img,label]

  def __len__(self):
    """Number of samples in this split."""
    return len(self.labels)

In [None]:
# One dataset per split. All three share the same normalisation statistics and label mapping;
# only set="train" enables the random augmentations.
train_dataset=GetDataset(train_prior,mean,std,class_dictionary,set="train")
valid_dataset=GetDataset(valid_prior,mean,std,class_dictionary,set="valid")
test_dataset=GetDataset(test_prior,mean,std,class_dictionary,set="test")

In [None]:
# Sanity check: number of samples in the train / validation / test splits.
print(len(train_dataset))
print(len(valid_dataset))
print(len(test_dataset))

In [None]:
# Batches of 32 loaded by 4 worker processes. Only the training loader shuffles;
# validation and test keep a fixed order.
train_loader=DataLoader(train_dataset,batch_size=32,num_workers=4,shuffle=True)
valid_loader=DataLoader(valid_dataset,batch_size=32,num_workers=4,shuffle=False)
test_loader=DataLoader(test_dataset,batch_size=32,num_workers=4,shuffle=False)

In [None]:
#To visualise images after transformation in train set
# Take the first training batch and show one sample together with the label of that same
# sample (the image and the printed label use the same index).
sample_index=10
for img,label in train_loader:
  # CHW tensor -> HWC array for matplotlib
  plt.imshow(np.transpose(np.array(img[sample_index]),[1,2,0]))
  print(label[sample_index])
  break

In [None]:
#To visualise images after transformation in validation set
# Only the first batch is needed for a preview, so stop after it instead of loading
# (and re-plotting) every validation batch.
for img,label in valid_loader:
  # CHW tensor -> HWC array for matplotlib
  plt.imshow(np.transpose(np.array(img[3]),[1,2,0]))
  break

# Training the Model

In [None]:
# ResNet-50 with pretrained weights; its layers are reassembled with a new 12-class head below.
resnet50=torchvision.models.resnet50(pretrained=True)

In [None]:
# Print "<top-level child> <parameter name>" for every parameter of the pretrained network.
for child_name, child_module in resnet50.named_children():
    for param_name, _ in child_module.named_parameters():
        print(child_name, param_name)

In [None]:
class Flatten(nn.Module):
  """Collapse every dimension after the first into one: ``(N, ...) -> (N, prod(...))``."""
  def forward(self,x):
    batch_size=x.shape[0]
    return torch.reshape(x,(batch_size,-1))

In [None]:
# Input width of the original classifier, reused for the replacement head.
net_num_features=resnet50.fc.in_features

# Rebuild the network as a flat nn.Sequential so stages can be addressed by position.
# Positions (later cells rely on them): 0 conv1, 1 bn1, 2 relu, 3 maxpool, 4 layer1, 5 layer2,
# 6 layer3, 7 layer4, 8 avgpool, 9 Flatten, 10 Dropout(0.5), 11 new Linear head with 12 outputs.
myresnet50=nn.Sequential(resnet50.conv1,
                         resnet50.bn1,
                         resnet50.relu,
                         resnet50.maxpool,
                         nn.Sequential(*resnet50.layer1),
                         nn.Sequential(*resnet50.layer2),
                         nn.Sequential(*resnet50.layer3),
                         nn.Sequential(*resnet50.layer4),
                         resnet50.avgpool,
                         Flatten(),
                         nn.Dropout(0.5),
                         nn.Linear(net_num_features,12,bias=True))

In [None]:
# Print "<position in the Sequential> <parameter name>" for every parameter of the rebuilt model.
for child_name, child_module in myresnet50.named_children():
    for param_name, _ in child_module.named_parameters():
        print(child_name, param_name)

In [None]:
# Display the module tree of the rebuilt model.
myresnet50

In [None]:
# Freeze every parameter first ...
for param in myresnet50.parameters():
  param.requires_grad=False

# ... then re-enable gradients for the classifier head (position 11) and for the
# conv/batch-norm weights of blocks 2 and 1 of the last residual stage (position 7).
classifier_head=myresnet50[11]
classifier_head.weight.requires_grad=True
classifier_head.bias.requires_grad=True

for block_index in (2,1):
  residual_block=myresnet50[7][block_index]
  for layer_index in (3,2,1):
    getattr(residual_block,f"conv{layer_index}").weight.requires_grad=True
    batch_norm=getattr(residual_block,f"bn{layer_index}")
    batch_norm.weight.requires_grad=True
    batch_norm.bias.requires_grad=True


In [None]:
# Move the model to the first CUDA device (requires a GPU runtime).
myresnet50=myresnet50.to(torch.device("cuda:0"))

In [None]:
# Loss weight per class = 1 / (class share of the dataset in percent), listed in
# class_dictionary order so that position k is the weight of class index k.
class_list=list(class_dictionary.keys())
class_weights=torch.tensor([1/percentage_class_dict[class_list[k]] for k in range(12)])
class_weights=class_weights.to(torch.device("cuda:0"))


In [None]:
# Display the per-class loss weights.
class_weights

In [None]:
# Cross-entropy weighted by class_weights, which were built as the inverse of each class's
# share of the dataset, so smaller classes get a larger weight.
loss_func=nn.CrossEntropyLoss(weight=class_weights)
# Device handed to the training functions.
device=torch.device("cuda:0")

In [None]:
#Train function with scheduled learning rate
def train_scheduled(net,valid,loss_func,epochs,learning_rate,weight_decay,device,run,t_max=30):
  """Train ``net`` with Adam under a cosine-annealed learning rate.

  Batches are read from the notebook-level ``train_loader`` and ``valid_loader``.
  Per-epoch loss, accuracy and learning rate are written to TensorBoard and printed.

  Args:
    net: model to optimise; it is moved to ``device``.
    valid: when truthy, evaluate on ``valid_loader`` after every epoch.
    loss_func: callable ``loss_func(logits, labels)`` returning a scalar loss tensor
      (assumed to be a per-batch mean, as with the default ``nn.CrossEntropyLoss``).
    epochs: number of passes over ``train_loader``.
    learning_rate: initial Adam learning rate.
    weight_decay: Adam weight decay.
    device: device the model and every batch are moved to.
    run: identifier that is only embedded in the TensorBoard run name.
    t_max: ``T_max`` of ``CosineAnnealingLR``; defaults to 30, the value that used
      to be hard-coded.

  Returns:
    None.
  """
  print("Training the model...")
  net.to(device)
  optimizer=optim.Adam(net.parameters(),lr=learning_rate,weight_decay=weight_decay)
  scheduler=optim.lr_scheduler.CosineAnnealingLR(optimizer,T_max=t_max)
  comment=f'learning_rate={learning_rate},weight_decay ={weight_decay},run= {run}'
  writer=SummaryWriter(comment=comment)
  start=time.time()

  try:
    for epoch in range(epochs):
      # Learning rate in effect for this epoch (the scheduler steps at the end of the epoch).
      current_lr=scheduler.get_last_lr()[0]

      # ---- training pass ----
      net.train()
      loss_sum,correct,n=0.0,0,0
      for img,label in train_loader:
        img,label=img.to(device),label.to(device)
        optimizer.zero_grad()
        label_preds=net(img)
        loss=loss_func(label_preds,label)
        loss.backward()
        optimizer.step()

        # .item() detaches the values from the graph. The batch-mean loss is weighted by
        # the batch size so that dividing by the sample count gives a per-sample average.
        batch_size=label.shape[0]
        loss_sum+=loss.item()*batch_size
        correct+=(torch.argmax(label_preds,dim=1)==label).sum().item()
        n+=batch_size

      train_loss=loss_sum/n if n else float("nan")
      train_acc=correct/n if n else float("nan")

      # ---- validation pass (optional) ----
      valid_loss,valid_acc=float("nan"),float("nan")
      if valid:
        net.eval()
        loss_sum,correct,n=0.0,0,0
        # no_grad: no autograd graph is built or kept alive while evaluating.
        with torch.no_grad():
          for img,label in valid_loader:
            img,label=img.to(device),label.to(device)
            label_preds=net(img)
            batch_size=label.shape[0]
            loss_sum+=loss_func(label_preds,label).item()*batch_size
            correct+=(torch.argmax(label_preds,dim=1)==label).sum().item()
            n+=batch_size
        if n:
          valid_loss=loss_sum/n
          valid_acc=correct/n

      # Validation curves are logged only when validation actually ran.
      loss_scalars={"Train loss":train_loss}
      acc_scalars={"Train accuracy":train_acc}
      if valid:
        loss_scalars["Valid loss"]=valid_loss
        acc_scalars["Valid accuracy"]=valid_acc
      writer.add_scalars(main_tag="Loss",tag_scalar_dict=loss_scalars,global_step=epoch)
      writer.add_scalars(main_tag="Accuracy",tag_scalar_dict=acc_scalars,global_step=epoch)
      writer.add_scalar("Learning rate",current_lr,epoch)

      print("Epoch %d: Learning rate is: %f, Train Accuracy is: %f, Train Loss is: %f, Valid Accuracy is %f, Valid Loss is %f" %(epoch+1,current_lr,train_acc,train_loss,valid_acc,valid_loss))
      scheduler.step()

    end=time.time()
    Runtime=end-start
    print("Runtime is: %f" % (Runtime))
  finally:
    # Flush and close the event file even if training is interrupted.
    writer.close()

In [None]:
#Train function where learning rate decreases by a factor of 10 every 5 epochs
def train(net,valid,loss_func,epochs,learning_rate,weight_decay,device,run):
  """Train ``net`` with Adam, dividing the learning rate by 10 every 5 epochs.

  The decay is applied at the start of every epoch whose 1-based number is a
  multiple of 5. Batches are read from the notebook-level ``train_loader`` and
  ``valid_loader``. Per-epoch loss, accuracy and learning rate are written to
  TensorBoard and printed together with the elapsed time.

  Args:
    net: model to optimise; it is moved to ``device``.
    valid: when truthy, evaluate on ``valid_loader`` after every epoch.
    loss_func: callable ``loss_func(logits, labels)`` returning a scalar loss tensor
      (assumed to be a per-batch mean, as with the default ``nn.CrossEntropyLoss``).
    epochs: number of passes over ``train_loader``.
    learning_rate: initial Adam learning rate.
    weight_decay: Adam weight decay.
    device: device the model and every batch are moved to.
    run: identifier that is only embedded in the TensorBoard run name.

  Returns:
    None.
  """
  print("Training the model...")
  net.to(device)
  optimizer=optim.Adam(net.parameters(),lr=learning_rate,weight_decay=weight_decay)
  comment=f'learning_rate={learning_rate},weight_decay ={weight_decay},run= {run}'
  writer=SummaryWriter(comment=comment)
  start=time.time()

  try:
    for epoch in range(epochs):
      if (epoch+1)%5==0:
        learning_rate=learning_rate/10
        # Change the rate in place: building a fresh optimizer here would throw away
        # Adam's running moment estimates at every decay step.
        for group in optimizer.param_groups:
          group["lr"]=learning_rate

      # ---- training pass ----
      net.train()
      loss_sum,correct,n=0.0,0,0
      for img,label in train_loader:
        img,label=img.to(device),label.to(device)
        optimizer.zero_grad()
        label_preds=net(img)
        loss=loss_func(label_preds,label)
        loss.backward()
        optimizer.step()

        # .item() detaches the values from the graph. The batch-mean loss is weighted by
        # the batch size so that dividing by the sample count gives a per-sample average.
        batch_size=label.shape[0]
        loss_sum+=loss.item()*batch_size
        correct+=(torch.argmax(label_preds,dim=1)==label).sum().item()
        n+=batch_size

      train_loss=loss_sum/n if n else float("nan")
      train_acc=correct/n if n else float("nan")

      # ---- validation pass (optional) ----
      valid_loss,valid_acc=float("nan"),float("nan")
      if valid:
        net.eval()
        loss_sum,correct,n=0.0,0,0
        with torch.no_grad():
          for img,label in valid_loader:
            img,label=img.to(device),label.to(device)
            label_preds=net(img)
            batch_size=label.shape[0]
            loss_sum+=loss_func(label_preds,label).item()*batch_size
            correct+=(torch.argmax(label_preds,dim=1)==label).sum().item()
            n+=batch_size
        if n:
          valid_loss=loss_sum/n
          valid_acc=correct/n

      # Validation curves are logged only when validation actually ran.
      loss_scalars={"Train loss":train_loss}
      acc_scalars={"Train accuracy":train_acc}
      if valid:
        loss_scalars["Valid loss"]=valid_loss
        acc_scalars["Valid accuracy"]=valid_acc
      writer.add_scalars(main_tag="Loss",tag_scalar_dict=loss_scalars,global_step=epoch)
      writer.add_scalars(main_tag="Accuracy",tag_scalar_dict=acc_scalars,global_step=epoch)
      writer.add_scalar("Learning rate",learning_rate,epoch)
      end=time.time()
      Runtime=end-start

      print("Epoch %d:, learning rate is: %f, Train Accuracy is: %f, Train Loss is: %f, Valid Accuracy is %f, Valid Loss is %f" %(epoch+1,learning_rate,train_acc,train_loss,valid_acc,valid_loss))
      print("Run time is :%f"%(Runtime))
  finally:
    # Flush and close the event file even if training is interrupted.
    writer.close()

In [None]:
# Xavier-uniform initialisation of the weight of the new Linear head (position 11 of the
# Sequential); its bias is left untouched. The trailing ';' suppresses the cell's output.
nn.init.xavier_uniform_(myresnet50[11].weight);

In [None]:
# Step-decay training: validation enabled, 30 epochs, initial learning rate 0.01,
# weight decay 1e-5, logged to TensorBoard as run 3.
train(myresnet50,True,loss_func,30,0.01,0.00001,device,3)

# Results

In [None]:
#Function to get test accuracy, and list of wrongly classified images and labels             
def test_accuracy():
  myresnet50.eval()
  test_acc,n=0.0,0
  with torch.no_grad():
    for i,data in enumerate(test_loader):
      img,label=data
      img,label=img.to(torch.device("cuda:0")),label.to(torch.device("cuda:0"))
      label_pred=myresnet50(img)
      test_acc+=torch.sum(torch.argmax(label_pred,dim=1)==label).float()
      n+=label.shape[0]
      
      #To get wrongly classified images:
      labels=torch.argmax(label_pred,dim=1)
      for i in range(label.shape[0]):
        if labels[i]!=label[i]:
          wrongly_classified.append([img[i],labels[i],label[i]])
          class_count[int(labels[i])]+=1

  test_acc=test_acc/n
  print("Test Accuracy is: %f"% (test_acc))

In [None]:
# Fresh accumulators that test_accuracy() fills in: the misclassified samples and,
# for each of the 12 class indices, how many times it was the wrong prediction.
wrongly_classified=[]
class_count=dict.fromkeys(range(12),0)

test_accuracy()

In [None]:
# Per predicted class index: number of test images that were wrongly assigned to it.
class_count

In [None]:
# Inverse of class_dictionary: class index -> class name, used for plot titles.
# The apostrophe in "Shepherd’s Purse" is U+2019 (right single quotation mark); it is written
# as an escape so the label cannot be corrupted by an encoding round-trip of the notebook.
index_to_labels={0:'Maize',
                 1:'Cleavers',
                 2:'Sugar beet',
                 3:'Common Chickweed',
                 4: 'Black-grass',
                 5:'Scentless Mayweed',
                 6: 'Small-flowered Cranesbill',
                 7:'Shepherd\u2019s Purse',
                 8:'Loose Silky-bent',
                 9:'Common wheat',
                 10:'Charlock',
                 11:'Fat Hen'}

In [None]:
#To unnormalize image for plotting
# Normalize(m, s) maps x to (x - m) / s. Applying Normalize again with mean = -m/s and
# std = 1/s maps y to y*s + m, i.e. it undoes the first step. The values are derived from
# the same `mean`/`std` lists the datasets were built with, so the two cannot drift apart.
unnormalize= transforms.Normalize(
   mean=[-channel_mean/channel_std for channel_mean,channel_std in zip(mean,std)],
   std=[1/channel_std for channel_std in std]
)

In [None]:
#Plot out images that are wrongly classified as well as their predicted and actual label
# The grid grows with the number of misclassified images (5 per row) instead of assuming
# at most 20, and panels that stay unused are hidden.
grid_cols=5
num_wrong=len(wrongly_classified)
grid_rows=max(1,math.ceil(num_wrong/grid_cols))
# 3.75 in of height per row keeps the 22x15 figure size when there are 4 rows.
fig,axis=plt.subplots(nrows=grid_rows,ncols=grid_cols,figsize=(22,3.75*grid_rows),squeeze=False)
panels=axis.ravel()

for panel,(img_tensor,index,actual_index) in zip(panels,wrongly_classified):
  img=np.asarray(unnormalize(img_tensor).to(torch.device("cpu")))
  # CHW -> HWC for matplotlib; clip because un-normalised floats can fall slightly outside [0, 1].
  img=np.clip(np.transpose(img,[1,2,0]),0,1)
  label=index_to_labels[int(index)]
  actual_label=index_to_labels[int(actual_index)]
  panel.imshow(img)
  panel.set_title("Predicted: "+label+"\n"+"Actual "+actual_label)
  panel.set_xticks([])
  panel.set_yticks([])

for panel in panels[num_wrong:]:
  panel.axis("off")


In [None]:
# Load (or reload) the TensorBoard notebook extension and open it on ./runs.
# NOTE(review): ./runs is SummaryWriter's default log root when no log_dir is passed -- confirm
# the training runs above were written there.
%reload_ext tensorboard
%tensorboard --logdir=runs

In [None]:
# Save only the model weights (state_dict) to the mounted Google Drive.
# The target folder must already exist and Drive must be mounted.
torch.save(myresnet50.state_dict(),"/content/drive/My Drive/plant_seedling/paths/37run.pth")

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import precision_score

In [None]:
#Function to get 2 tensors of actual and predicted classes of all images in the test set
def actual_predicted():
  """Collect ground-truth and predicted class indices over ``test_loader``.

  Uses the notebook-level ``myresnet50`` (switched to eval mode) and ``test_loader``.

  Returns:
    ``(actual, predicted)``: two 1-D CPU tensors of equal length, in loader order.
    Both are empty ``int64`` tensors when the loader yields no batches.
  """
  myresnet50.eval()
  # Run on whichever device the model already lives on.
  device=next(myresnet50.parameters()).device
  actual_batches,predicted_batches=[],[]
  with torch.no_grad():
    for img,label in test_loader:
      label_pred=torch.argmax(myresnet50(img.to(device)),dim=1)
      # Gather per-batch results and concatenate once at the end instead of
      # re-allocating a growing tensor on every batch.
      actual_batches.append(label.cpu())
      predicted_batches.append(label_pred.cpu())

  if not actual_batches:
    return torch.empty(0,dtype=torch.long),torch.empty(0,dtype=torch.long)
  return torch.cat(actual_batches),torch.cat(predicted_batches)

In [None]:
# Ground-truth and predicted class indices for the whole test set, as CPU tensors.
actual,predicted=actual_predicted()

In [None]:
# cm[i, j] = number of test images whose actual class is i and predicted class is j.
# Passing every class index explicitly keeps the matrix 12x12 even if some class never
# occurs in `actual` or `predicted`, which the plot further down relies on.
cm=confusion_matrix(actual,predicted,labels=sorted(class_dictionary.values())) # np array
print(cm)

In [None]:
# Macro-averaged precision: per-class precision averaged with equal weight for every class.
precision_score(actual,
                predicted,
                average="macro")

In [None]:
# Confusion Matrix for predicted and actual classes for images in the test set
# sklearn's layout is cm[row, col] = count with actual class `row` and predicted class `col`.
# imshow draws rows along the y-axis and columns along the x-axis, so y is "Actual" and
# x is "Predicted", and the count for cm[row, col] is written at x=col, y=row.
figure,axis=plt.subplots(figsize=(10,10))
mat=axis.imshow(cm)

class_names=list(class_dictionary.keys())
axis.set_xticks(np.arange(len(class_names)))
axis.set_yticks(np.arange(len(class_names)))

axis.set_xticklabels(class_names,
                     rotation=90)
axis.set_yticklabels(class_names)

num_rows,num_cols=cm.shape
for row in range(num_rows):
  for col in range(num_cols):
    axis.text(col,
              row,
              str(cm[row,col]),
              ha="center",
              va="center",
              color="crimson",
              fontsize=15)

axis.set_title("Confusion matrix")
axis.set_ylabel("Actual")
axis.set_xlabel("Predicted")