In [None]:
# this mounts your Google Drive to the Colab VM.
from google.colab import drive
drive.mount('/content/drive', force_remount=True)


## Download Rice Dataset
from:  https://drive.google.com/file/d/1eSp5f5ih17blcqjgxJQ1IKx9a7QXTqJT/view?usp=sharing

In [None]:
!gdown  https://drive.google.com/u/0/uc?id=1eSp5f5ih17blcqjgxJQ1IKx9a7QXTqJT&export=download
!unzip ./rise.zip -d ./rice

In [None]:
# get device for training

import torch

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Using {} device'.format(device))

## Read Images

In [None]:
import os

img_dir = '/content/rice/Rice_Image_Dataset/'
img_labels = ['Arborio', 'Basmati', 'Ipsala', 'Jasmine', 'Karacadag']  # all possible classes
num_classes = len(img_labels)
img_files = [[os.path.join(img_dir, label, x) 
               for x in os.listdir(os.path.join(img_dir, label))] 
               for label in img_labels]

In [None]:
print(len(img_labels), len(img_files))

In [None]:
img_file_list = []
img_label_list = []
for i, class_name in enumerate(img_labels):
    img_file_list.extend(img_files[i])
    img_label_list.extend([i] * len(img_files[i]))

In [None]:
from PIL import Image

# find image dimensions
image_width, image_height = Image.open(img_file_list[0]).size
print("Image dimensions:", image_width, "x", image_height)

In [None]:
print(len(img_file_list), len(img_label_list))
total_img = len(img_label_list)

In [None]:
print(img_label_list)

In [None]:
import random

# Shuffle both lists at once with same order

zipped = list(zip(img_file_list, img_label_list))
random.shuffle(zipped)
img_file_list, img_label_list = zip(*zipped)

print(img_label_list)

##Train, Test, Validation Split

In [None]:
fractions = [0.6, 0.2, 0.2]  # train, test, val
length_to_split = [int(f * total_img) for f in fractions]
print(length_to_split)

In [None]:
from itertools import islice

file_input = iter(img_file_list)
file_output = [list(islice(file_input, elem)) for elem in length_to_split]
print(len(file_output))

label_input = iter(img_label_list)
label_output = [list(islice(label_input, elem)) for elem in length_to_split]
print(len(label_output))

In [None]:
import numpy as np

X_train = np.array(file_output[0])
X_test = np.array(file_output[1])
X_val = np.array(file_output[2])

y_train = np.array(label_output[0])
y_test = np.array(label_output[1])
y_val = np.array(label_output[2])

In [None]:
print(X_train)  # example

In [None]:
print(len(X_train), len(y_train))
print(len(X_test), len(y_test))
print(len(X_val), len(y_val))

## Creating a Custom Dataset for my files

In [None]:
from torch.utils.data import Dataset
from torchvision import datasets, transforms, models
from torchvision.transforms import ToTensor, Lambda
import matplotlib.pyplot as plt

In [None]:
class CustomImageDataset(Dataset):

  def __init__(self, img_files, img_labels, transforms):
    self.img_files = img_files
    self.img_labels = img_labels
    self.transforms = transforms

  def __len__(self):
    return len(self.img_labels)

  def __getitem__(self, index):
    return self.transforms(self.img_files[index]), self.img_labels[index]

## Preparing the data for training with DataLoaders

In [None]:
!pip install -q "monai-weekly[gdown, nibabel, tqdm, itk]"

In [None]:
from monai.transforms import *

class SumDimension(Transform):
    def __init__(self, dim=1):
        self.dim = dim

    def __call__(self, inputs):
        return inputs.sum(self.dim)

In [None]:
# Define transforms

transform = Compose([
    LoadImage(image_only=True),
    AddChannel(),
    ScaleIntensity(),
    SumDimension(3),
    ToTensor(),
])

act = Activations(softmax=True)
to_onehot = AsDiscrete(to_onehot=True, n_classes=num_classes)

In [None]:
training_data = CustomImageDataset(X_train, y_train, transform)
test_data = CustomImageDataset(X_test, y_test, transform)
val_data = CustomImageDataset(X_val, y_val, transform)

In [None]:
from torch.utils.data import DataLoader

train_dataloader = DataLoader(training_data, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=64)
val_dataloader = DataLoader(val_data, batch_size=64)

## Neural Network

In [None]:
from torch import nn

In [None]:
# Define the Class

class NeuralNetwork(nn.Module):
  def __init__(self):
    super(NeuralNetwork, self).__init__()
    self.flatten = nn.Flatten()
    self.linear_relu_stack = nn.Sequential(
        nn.Linear(image_width*image_height, 512),
        nn.ReLU(),
        nn.Linear(512, 512),
        nn.ReLU(),
        nn.Linear(512, len(img_labels)),
        nn.ReLU()
    )
  
  def forward(self, x):
    x = self.flatten(x)
    logits = self.linear_relu_stack(x)
    return logits

In [None]:
model = NeuralNetwork().to(device)
print(model)

In [None]:
X = torch.rand(1, image_width, image_height, device=device)
logits = model(X)
pred_probabilities = nn.Softmax(dim=1)(logits)
y_pred = pred_probabilities.argmax(1)
print(f"Predicted class: {y_pred}")

## Optimizing Model Params

In [None]:
# Loss Function

loss_fn = nn.CrossEntropyLoss()

In [None]:
# Optimizer

learning_rate = 1e-5
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Train Loop

def train_loop(dataloader, model, loss_fn, optimizer):
  size = len(dataloader.dataset)

  # print("dataloader: ", dataloader)
  for batch, (X, y) in enumerate(dataloader):
    X = X.to(device)
    y = y.to(device)
    
    # compute prediction and loss
    pred = model(X)
    loss = loss_fn(pred, y)

    # backpropagation
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if batch % 100 == 0:
      loss, current = loss.item(), batch * len(X)
      print(f"loss: {loss:>7f} [{current:>5d}/{size:>d}]")

In [None]:
# Test Loop

def test_loop(dataloader, model, loss_fn):
  size = len(dataloader.dataset)
  test_loss, correct = 0, 0

  with torch.no_grad():
    for X, y in dataloader:
      X = X.to(device)
      y = y.to(device)
      pred = model(X)
      test_loss += loss_fn(pred, y).item()
      correct += (pred.argmax(1) == y).type(torch.float).sum().item()

  test_loss /= size
  correct /= size
  print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
# Training

epochs = 10
for t in range(epochs):
  print(f"Epoch {t+1}\n------------------------------------")
  train_loop(train_dataloader, model, loss_fn, optimizer)
  test_loop(test_dataloader, model, loss_fn)
print("Done!")

## Saving and Loading Model and Model Weights


In [None]:
# Saving weights

model = models.vgg16(pretrained=True)
torch.save(model.state_dict(), 'model_weights.pth')

In [None]:
# Loading weights

model = models.vgg16()
model.load_state_dict(torch.load('model_weights.pth'))
model.eval()

In [None]:
# Saving model

torch.save(model, 'model.pth')

In [None]:
# Loadind model

model = torch.load('model.pth')
model.eval()

## Predict Example

In [None]:
FOLDERNAME = "computerVisionAssignments/assignment3"
assert FOLDERNAME is not None, "[!] Enter the foldername."

import sys
sys.path.append('/content/drive/My Drive/{}'.format(FOLDERNAME))

%cd drive/My\ Drive/$FOLDERNAME/

In [None]:
from pathlib import Path

def predict(img_path, trans):
  image = Image.open(Path(img_path))
  input = trans(image)
  output = model(input)
  prediction = int(torch.max(output.data, 1)[1].numpy())

In [None]:
# Retrieve item
index = 100
path = img_files[3][index]  # an image from class number 3 (Jasmin)
print(path)

# Generate prediction
image = transform(path)  # of type tensor
new_shape = tuple([1]) + image.shape  # shape: [N, C, W, H]
image = torch.reshape(image, new_shape)
image = image.permute(0, 3, 1, 2)
print(image.shape)
prediction = model(image)

# Predicted class value using argmax
predicted_class = np.argmax(prediction)
print(predicted_class)