In [None]:
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader, random_split
from google.colab import files, drive

In [None]:
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
path = '/content/gdrive/MyDrive/Primary_Dataset_with_Label.pt'
Primary_Dataset_with_Label = torch.load(path)

In [None]:
class cornSeeds(Dataset):
    def __init__(self, data):
        self.X = data['images'].type(torch.FloatTensor)
        self.y = data['label']   
 
    # The __len__ function returns the number of samples in our dataset.
    def __len__(self):
        return self.X.size()[0]
 
    # The __getitem__ function loads and returns a sample from the dataset at the given index idx
    def __getitem__(self, idx):
        return [self.X[idx], self.y[idx]]
 
    # get indexes for train and test rows
    def get_splits(self, n_test = 0.25):
        # determine the split sizes
        test_size = round(n_test * len(self.X))
        train_size = len(self.X) - test_size
        # fix the generator for reproducible results
        return random_split(self, [train_size, test_size], generator = torch.Generator().manual_seed(123))

In [None]:
class CNN(nn.Module):
    # define model elements
    def __init__(self):
        super(CNN, self).__init__()
        self.convLayer = nn.Sequential(
            nn.Conv2d(in_channels = 3, 
                                 out_channels = 64, 
                                 kernel_size = (3, 3),
                                 stride = 1,
                                 padding = 2),

            nn.Conv2d(in_channels = 64, 
                                 out_channels = 64, 
                                 kernel_size = (3, 3),
                                 stride = 1,
                                 padding = 2), 
            
            nn.ReLU(),

            nn.MaxPool2d(kernel_size = (3,3), 
                                  stride = (2,2)),    
            
            nn.ReLU(),

            nn.Conv2d(in_channels = 64, 
                                 out_channels = 128, 
                                 kernel_size = (3, 3),
                                 stride = 1,
                                 padding = 2),

            nn.Conv2d(in_channels = 128, 
                                 out_channels = 128, 
                                 kernel_size = (3, 3),
                                 stride = 1,
                                 padding = 2),    

            nn.ReLU(),

            nn.MaxPool2d(kernel_size = (3,3), 
                                  stride = (2,2)),
            
            nn.ReLU(),

            nn.Conv2d(in_channels = 128, 
                                 out_channels = 256, 
                                 kernel_size = (3, 3),
                                 stride = 1,
                                 padding = 2),

            nn.Conv2d(in_channels = 256, 
                                 out_channels = 256, 
                                 kernel_size = (3, 3),
                                 padding = 1,
                                 stride = 1),

            nn.ReLU(),

            nn.MaxPool2d(kernel_size = (3,3), 
                                  stride = (2,2)),

            nn.ReLU(),

            nn.Conv2d(in_channels = 256, 
                                 out_channels = 512, 
                                 kernel_size = (3, 3),
                                 padding = 1,
                                 stride = 1),

            nn.Conv2d(in_channels = 512, 
                                 out_channels = 512, 
                                 kernel_size = (3, 3),
                                 padding = 1,
                                 stride = 1),
            
            nn.Conv2d(in_channels = 512, 
                                 out_channels = 512, 
                                 kernel_size = (3, 3),
                                 padding = 1,
                                 stride = 1),

            nn.ReLU(),

            nn.MaxPool2d(kernel_size = (3,3), 
                                  stride = (2,2)),

            nn.ReLU(),

            nn.Conv2d(in_channels = 512, 
                                 out_channels = 512, 
                                 kernel_size = (3, 3),
                                 padding = 1,
                                 stride = 1),
            
            nn.Conv2d(in_channels = 512, 
                                 out_channels = 512, 
                                 kernel_size = (3, 3),
                                 padding = 1,
                                 stride = 1),
            
            nn.Conv2d(in_channels = 512, 
                                 out_channels = 512, 
                                 kernel_size = (3, 3),
                                 padding = 1,
                                 stride = 1),

            nn.ReLU(),

            nn.MaxPool2d(kernel_size = (3,3), 
                                  stride = (2,2)),

            nn.ReLU(),
        )
        
        # flatten layer
        self.flatten = nn.Flatten()
        self.bnm1 = nn.BatchNorm1d(25088)
        
        # fully connected layer
        self.hidden6 = nn.Linear(25088, 100)
        self.act1 = nn.ReLU()
        self.bnm2 = nn.BatchNorm1d(100)

        # fully connected layer
        self.hidden7 = nn.Linear(100, 4)

        # output layer
        self.output = nn.Softmax(dim = 1)

    # forward propagate input
    def forward(self, X):
        X = self.convLayer(X)
        X = self.flatten(X)
        X = self.bnm1(X)
        X = self.hidden6(X)
        X = self.act1(X)
        X = self.bnm2(X)
        X = self.hidden7(X)
        X = self.output(X)
        
        return X

In [None]:
def train_model(dataloader, model, loss_fn, optimizer, num, verbose = False):  

    for epoch in range(num):
        model.train()
        running_loss = 0
        size = len(dataloader.dataset)
        print(f"Epoch {epoch+1}\n-------------------------------")
        for batch, (X, y) in enumerate(train_dataloader):
            X, y = X.to(device), y.to(device)
            # Compute prediction error
            pred = model(X)
            loss = loss_fn(pred, y)

            # Backpropagation
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            if verbose == True and batch % 100 == 99:
                current = batch * len(X)
                print(f"loss: {running_loss / 100:>7f}  [{current:>5d}/{size:>5d}]")
                running_loss = 0.0


In [None]:
# model.eval() is a kind of switch for some specific layers/parts of the model that behave differently during training 
# and inference (evaluating) time. For example, Dropouts Layers, BatchNorm Layers etc. You need to turn off them during model evaluation, 
# and .eval() will do it for you. In addition, the common practice for evaluating/validation is using torch.no_grad() in pair with model.eval() 
# to turn off gradients computation:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss = 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
    test_loss /= num_batches
    print(f"Test Loss: \n CrossEntropyLoss: {test_loss}")

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {device} device")

Using cuda device


In [None]:
BATCH_SIZE = 64
LEARNING_RATE = 0.001
NUMBER_OF_EPOCHS = 20

In [None]:
# prepare the dataset

# load the dataset
dataset = cornSeeds(Primary_Dataset_with_Label)
# calculate split
train, test = dataset.get_splits()
# prepare data loaders
train_dataloader = DataLoader(train, batch_size = BATCH_SIZE)
test_dataloader = DataLoader(test, batch_size = BATCH_SIZE)

for X, y in test_dataloader:
    print(f"Shape of X: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

Shape of X: torch.Size([64, 3, 256, 256])
Shape of y: torch.Size([64]) torch.int64


In [None]:
# define the network
PATH = '/content/gdrive/MyDrive/checkpoint.pth'
model = CNN()
model.load_state_dict(torch.load(PATH))
model.to(device)
print(model)

CNN(
  (convLayer): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (2): ReLU()
    (3): MaxPool2d(kernel_size=(3, 3), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (4): ReLU()
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (6): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (7): ReLU()
    (8): MaxPool2d(kernel_size=(3, 3), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (9): ReLU()
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(2, 2))
    (11): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (12): ReLU()
    (13): MaxPool2d(kernel_size=(3, 3), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (14): ReLU()
    (15): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (16): Conv2d(512, 512, kernel_size=(3, 3), stri

In [None]:
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr = LEARNING_RATE)
torch.cuda.empty_cache()
train_model(train_dataloader, model, loss_fn, optimizer, num = 1, verbose = True)

Epoch 1
-------------------------------
loss: 0.761395  [ 6336/13351]
loss: 0.765987  [12736/13351]


In [None]:
import gc
gc.collect()
torch.cuda.empty_cache()
pred_label = torch.empty(int(len(test) / BATCH_SIZE) + 1, BATCH_SIZE)
true_label = torch.empty(int(len(test) / BATCH_SIZE) + 1, BATCH_SIZE)
N = 0
i = 0
for X, y in test_dataloader:
    X, y = X.to(device), y.to(device)
    pred = model(X)
    pred_label_batch = torch.argmax(pred, dim=1)
    if (pred_label_batch.size(dim=0) < BATCH_SIZE):
        N = BATCH_SIZE - pred_label_batch.size(dim=0)
        temp = torch.zeros(BATCH_SIZE)
        temp[:pred_label_batch.size(dim=0)] = pred_label_batch
        pred_label_batch = temp
    if (y.size(dim=0) < BATCH_SIZE):
        temp = torch.zeros(BATCH_SIZE)
        temp[:y.size(dim=0)] = y
        y = temp
    pred_label[i] = pred_label_batch
    true_label[i] = y
    i += 1
pred_label = pred_label.flatten()[:-N]
true_label = true_label.flatten()[:-N]

RuntimeError: ignored

In [None]:
from sklearn.metrics import confusion_matrix
cm = confusion_matrix(true_label.numpy(), pred_label.numpy())
cm

In [None]:
accuracy = np.sum(np.diagonal(cm)) / np.sum(cm)
accuracy

In [None]:
torch.save(model.state_dict(), '/content/gdrive/MyDrive/checkpoint.pth')