In [36]:
# library imports
import os
import glob
import cv2
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torchvision
import matplotlib.pyplot as plt
from torchvision import transforms
from datetime import datetime
from torch import nn
from torch import optim
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

In [37]:
# sanity check: walk the Kaggle input tree (the per-file listing is left disabled)
for dirs, _, files in os.walk('/kaggle/input'):
    for file in files:
        # print(os.path.join(dirs, file))
        continue

In [38]:
# data upload: read the CSV and keep every column except the first one
csv_path = '../input/honey-bee-pollen/pollendataset/PollenDataset/pollen_data.csv'
beeData = pd.read_csv(csv_path).iloc[:, 1:]
print(beeData)

In [39]:
# getting a list of all the image path names
path = '../input/honey-bee-pollen/pollendataset/PollenDataset/images/'
# glob.glob returns matches in an arbitrary, file-system dependent order;
# sorting makes the sample order (and any seeded split of it) repeatable.
imageList = sorted(glob.glob(os.path.join(path, '*.jpg')))

In [40]:
# defining a function to return the dataset, i.e. a tuple of 2 numpy arrays (images, labels)
def getDataset(pathList, size=(300, 1), image_size=(28, 28)) :
    """Load the images in `pathList` and build the (images, labels) arrays.

    Each file is read with cv2.imread, resized to `image_size` and transposed
    from (height, width, channel) to (channel, height, width). A file is
    labelled 1 when its base name starts with 'P' and 0 otherwise.

    Parameters
    ----------
    pathList : iterable of str
        Image file paths; consumed in a single pass, so a generator works too.
    size : tuple
        Unused. Kept only so existing calls that pass it keep working.
    image_size : tuple of int
        (width, height) handed to cv2.resize. The default (28, 28) is the
        resolution this function previously hard-coded.

    Returns
    -------
    tuple of (np.ndarray, np.ndarray)
        Stacked images (one (channel, height, width) entry per path) and the
        matching 0/1 labels. Both arrays are empty when `pathList` is empty.

    Raises
    ------
    FileNotFoundError
        If cv2.imread returns None for a path (missing or undecodable file).
    """
    data = []
    labels = []

    for file in pathList :
        image = cv2.imread(file)
        # cv2.imread signals failure by returning None instead of raising;
        # fail here with the offending path rather than inside cv2.resize
        if image is None :
            raise FileNotFoundError(f"could not read image file: {file}")
        image = cv2.resize(image, image_size)
        data.append(np.transpose(image, (2, 0, 1)))
        labels.append(1 if os.path.basename(file).startswith('P') else 0)

    return np.array(data), np.array(labels)

In [41]:
# type converting and splitting data
N = 714  # NOTE(review): not referenced anywhere in the visible cells - confirm before removing
SEED = 42  # fixed seed so the train/test partition is the same on every run
X, Y = getDataset(imageList)
X = torch.from_numpy(X.astype(np.float32))
# nn.CrossEntropyLoss needs int64 class indices; .long() is a no-op when the
# labels are already int64 and protects platforms whose default int is 32-bit
Y = torch.from_numpy(Y).long()
# without random_state the split (and every number reported below) changes per run
x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size = 0.5, random_state = SEED)

In [42]:
# creating the datasets: indexable lists of (image, label) pairs
training_dataset = [(image, label) for image, label in zip(x_train, y_train)]
testing_dataset = [(image, label) for image, label in zip(x_test, y_test)]

In [43]:
# CNN final layer length = number of classes.
# len(training_dataset[0]) measured the (image, label) pair itself, which is
# always 2 and matched the class count only by coincidence. Derive it from the
# labels instead: class indices run from 0 to max(label).
k = int(Y.max().item()) + 1

In [44]:
# creating the dataloaders
BATCH_SIZE = 32
# training batches are reshuffled every epoch
training_dataloader = DataLoader(training_dataset, batch_size = BATCH_SIZE, shuffle = True)
# evaluation gains nothing from shuffling; a fixed order keeps the batch
# composition (and therefore the averaged per-batch test loss) stable
testing_dataloader = DataLoader(testing_dataset, batch_size = BATCH_SIZE, shuffle = False)

In [45]:
# define the model

class CNN(nn.Module):
  """Convolutional classifier: three strided 3x3 convolutions and a dense head.

  The first dense layer expects 128 * 2 * 2 flattened features, which is what
  the convolution stack yields for 3 x 28 x 28 inputs.
  """

  def __init__(self, K):
    """Assemble the layer stack; K is the output width of the last linear layer."""
    super().__init__()

    # three stride-2 convolutions, each followed by a ReLU
    feature_layers = [
      nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=2),
      nn.ReLU(),
      nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=2),
      nn.ReLU(),
      nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=2),
      nn.ReLU(),
    ]

    # flatten, then two linear layers with dropout in front of each
    head_layers = [
      nn.Flatten(),
      nn.Dropout(0.2),
      nn.Linear(128 * 2 * 2, 512),
      nn.ReLU(),
      nn.Dropout(0.2),
      nn.Linear(512, K),
    ]

    self.model = nn.Sequential(*feature_layers, *head_layers)

  def forward(self, X):
    """Run X through the stack and return the result as a (batch, -1) tensor."""
    scores = self.model(X)
    batch_size = scores.size(0)
    return scores.view(batch_size, -1)


In [46]:
# model creation: the final layer gets k outputs
model = CNN(K=k)

In [47]:
# GPU initialization: use the first CUDA device when present, else the CPU
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
model.to(device)
print(device)

In [48]:
# A function to encapsulate the training loop
def _epoch_loss(model, criterion, loader, device, optimizer=None):
  """Make one pass over `loader` and return the mean of the per-batch losses.

  With an `optimizer` the pass trains (backward + step on every batch);
  without one it only evaluates and runs with gradient tracking disabled.
  Returns NaN when the loader yields no batches.
  """
  training = optimizer is not None
  batch_losses = []

  # gradient tracking is only needed when we are going to call backward()
  with torch.set_grad_enabled(training):
    for inputs, targets in loader:
      # move data to the model's device
      inputs, targets = inputs.to(device), targets.to(device)

      if training:
        optimizer.zero_grad()

      loss = criterion(model(inputs), targets)

      if training:
        loss.backward()
        optimizer.step()

      batch_losses.append(loss.item())

  # plain average of the batch losses: a smaller final batch weighs as much as
  # a full one, so this is only approximately the per-sample mean
  return np.mean(batch_losses) if batch_losses else float('nan')


def batch_gd(model, criterion, optimizer, train_loader, test_loader, epochs):
  """Train `model` for `epochs` epochs and record train/test loss per epoch.

  Each epoch runs a training pass over `train_loader` (model in train mode)
  followed by an evaluation pass over `test_loader` (model in eval mode, no
  gradient tracking), then prints a one-line summary.

  Parameters:
    model: the nn.Module to optimise; batches are moved to the device of its
      first parameter (CPU if it has no parameters).
    criterion: callable(outputs, targets) returning a scalar loss tensor.
    optimizer: optimizer stepping the model's parameters.
    train_loader / test_loader: iterables of (inputs, targets) batches.
    epochs: number of epochs to run.

  Returns:
    (train_losses, test_losses): two numpy arrays of length `epochs` holding
    the mean batch loss of each epoch. The model is left in eval mode.
  """
  # use the model's own device rather than depending on a notebook-level global
  first_param = next(model.parameters(), None)
  device = first_param.device if first_param is not None else torch.device("cpu")

  train_losses = np.zeros(epochs)
  test_losses = np.zeros(epochs)

  for it in range(epochs):
    t0 = datetime.now()

    model.train()
    train_loss = _epoch_loss(model, criterion, train_loader, device, optimizer)

    model.eval()
    test_loss = _epoch_loss(model, criterion, test_loader, device)

    # Save losses
    train_losses[it] = train_loss
    test_losses[it] = test_loss

    dt = datetime.now() - t0
    # the run of spaces before "Test Loss" reproduces the existing log line exactly
    print(f'Epoch {it+1}/{epochs}, Train Loss: {train_loss:.4f}, '
          f'      Test Loss: {test_loss:.4f}, Duration: {dt}')

  return train_losses, test_losses

In [49]:
# Loss and optimizer: cross-entropy objective, Adam with its default settings
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters())

In [50]:
# calling training function (16 epochs)
train_losses, test_losses = batch_gd(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    train_loader=training_dataloader,
    test_loader=testing_dataloader,
    epochs=16,
)

In [51]:
# Plot the train loss and test loss per epoch (one point per epoch)
fig, ax = plt.subplots()
ax.plot(train_losses, label='train loss')
ax.plot(test_losses, label='test loss')
# label the axes so the figure can be read on its own
ax.set_xlabel('epoch')
ax.set_ylabel('loss')
ax.set_title('Train vs. test loss')
ax.legend()
plt.show()

In [52]:
# Accuracy

def compute_accuracy(model, data_loader, device):
  """Return the fraction of samples in `data_loader` the model classifies correctly.

  The model is switched to eval mode and run without gradient tracking. The
  predicted class of a sample is the index of its largest output. Returns NaN
  when the loader yields no samples.
  """
  model.eval()
  n_correct = 0
  n_total = 0

  # inference only: skip building autograd graphs
  with torch.no_grad():
    for inputs, targets in data_loader:
      # move data to the target device
      inputs, targets = inputs.to(device), targets.to(device)

      # predicted class = index of the largest output along dim 1
      predictions = model(inputs).argmax(dim=1)

      # update counts
      n_correct += (predictions == targets).sum().item()
      n_total += targets.shape[0]

  return n_correct / n_total if n_total else float('nan')


train_acc = compute_accuracy(model, training_dataloader, device)
test_acc = compute_accuracy(model, testing_dataloader, device)
print(f"Train acc: {train_acc:.4f}, Test acc: {test_acc:.4f}")