# Final Project

### connect my google drive and set dataset path

In [1]:
# connect google drive
from google.colab import drive
drive.mount("/content/gdrive")

Mounted at /content/gdrive


In [4]:
!ls "/content/gdrive/MyDrive/資料科學概論"

 crops_image  'Final Project.ipynb'


In [5]:
# dataset path
folder_path = "/content/gdrive/MyDrive/資料科學概論/crops_image/"

### import package

In [6]:
import pandas as pd
import numpy as np
import torch
from torch import nn, optim
from torchvision import datasets
from torchvision.datasets import ImageFolder
import torchvision.transforms as transforms
import torch.nn.functional as F
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.sampler import SubsetRandomSampler
from tqdm.auto import tqdm
import time

In [9]:
# skip plt.show()
%matplotlib inline

### preprocess the data

In [7]:
# Read dataset
data_transform = transforms.Compose([transforms.Resize((256, 256)),
                                     transforms.ToTensor(),
                                     transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])
dataset = ImageFolder(folder_path, transform=data_transform, target_transform=None)
print(dataset.class_to_idx)

{'atemoya': 0, 'banana': 1, 'bareland': 2, 'cabbage': 3, 'carrot': 4, 'grapes': 5, 'guava': 6, 'mango': 7, 'papaya': 8, 'pineapple': 9, 'pumpkin': 10}


In [8]:
# define the hyper parameter
batch_size = 32

# set the ratio of train data, valid data, and test data
train_ratio = 0.7
valid_ratio = 0.2
test_ratio = 0.1

In [10]:
class CustomDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __getitem__(self, index):
        # get data by index
        return self.data[index]

    def __len__(self):
        # return the length of dataset
        return len(self.data)

In [11]:
# get the size of dataset
dataset_size = len(dataset)
indices = list(range(dataset_size))
train_size = int(train_ratio * dataset_size)
valid_size = int(valid_ratio * dataset_size)
test_size = dataset_size - train_size - valid_size

# randomize the index
np.random.seed(24)
np.random.shuffle(indices)

# separate the dataset with given ratio
train_indices = indices[:train_size]
valid_indices = indices[train_size:train_size+valid_size]
test_indices = indices[train_size+valid_size:]

# define SubsetRandomSampler and DataLoader
train_sampler = SubsetRandomSampler(train_indices)
valid_sampler = SubsetRandomSampler(valid_indices)
test_sampler = SubsetRandomSampler(test_indices)

train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
valid_loader = DataLoader(dataset, batch_size=batch_size, sampler=valid_sampler)
test_loader = DataLoader(dataset, batch_size=batch_size, sampler=test_sampler)

In [12]:
print("Information of Train Loader :")
print("batch size of each epoch :", train_loader.batch_size)
print("number of batches :", len(train_loader))
print("dataset size :", len(train_loader.sampler))

Information of Train Loader :
batch size of each epoch : 32
number of batches : 73
dataset size : 2310


In [13]:
for imgs, batchsize in train_loader:
     print("image size of train_loader :", imgs.shape)
     print("batch size of train_loader :", batchsize.shape)
     break

image size of train_loader : torch.Size([32, 3, 256, 256])
batch size of train_loader : torch.Size([32])


In [14]:
total_samples = len(test_loader.dataset)

print("Total samples in the test dataset are ", total_samples)

Total samples in the test dataset are  3300


### setup the model

In [15]:
class Classifier(nn.Module):
    def __init__(self):
      super().__init__()
      self.cnn_layers = nn.Sequential(
        # Input shape[3, 256, 256]
        # CNN layers
        nn.Conv2d(3, 64, 3, padding=1), # output shape(64, 256, 256)
        nn.BatchNorm2d(64),
        nn.ReLU(),
        nn.MaxPool2d(2, 2), # output shape(64, 128, 128)

        nn.Conv2d(64, 128, 3, padding=1), # output shape(128, 128, 128)
        nn.BatchNorm2d(128),
        nn.ReLU(),
        nn.MaxPool2d(2, 2), # output shape(128, 64, 64)

        nn.Conv2d(128, 256, 3, padding=1), # output shape(256, 64, 64)
        nn.BatchNorm2d(256),
        nn.ReLU(),
        nn.MaxPool2d(2, 2), # output shape(256, 32, 32)

        nn.Conv2d(256, 256, 3, padding=1), # output shape(256, 32, 32)
        nn.BatchNorm2d(256),
        nn.ReLU(),
        nn.MaxPool2d(2, 2), # output shape(256, 16, 16)

        nn.Conv2d(256, 512, 3, padding=1), # output shape(512, 16, 16)
        nn.BatchNorm2d(512),
        nn.ReLU(),
        nn.MaxPool2d(2, 2), # output shape(512, 8, 8)
      )
      self.fc_layers = nn.Sequential(
      # Fully connected layer
        nn.Linear(512 * 8 * 8, 256),
        nn.Dropout(0.4),
        nn.ReLU(),
        nn.Linear(256, 128),
        nn.ReLU(),
        nn.Linear(128, 64),
        nn.ReLU(),
        nn.Linear(64, 32),
        nn.ReLU(),
        nn.Linear(32, 11),
      )

    def forward(self, x):
      x = self.cnn_layers(x)

      x = torch.flatten(x, 1)

      x = self.fc_layers(x)
      return x

In [16]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print(f'the device to train the model is {device}')

the device to train the model is cpu


### train the model

In [17]:
lr = 0.0003
epochs = 20
decays = 0.0001

In [18]:
model = Classifier().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=decays) # 可換成其他opt
scheduler = optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.97)

In [19]:
# random seed
seed = 0
torch.manual_seed(seed)
if(torch.cuda.is_available()):
  torch.cuda.manual_seed(seed)
  torch.cuda.manual_seed_all(seed)
np.random.seed(seed)

In [None]:
valid_acc = 0.0
train_acc_history = []
train_loss_history = []
valid_acc_history = []
valid_loss_history = []
for epoch in range(epochs):
  model.train()

  train_loss = []
  train_accs = []

  for batch in tqdm(train_loader):

    imgs, labels = batch


    logits = model(imgs.to(device))


    loss = criterion(logits, labels.to(device))


    optimizer.zero_grad()


    loss.backward()

    grad_norm = nn.utils.clip_grad_norm_(model.parameters(), max_norm=10)

    optimizer.step()

    acc = (logits.argmax(dim=-1) == labels.to(device)).float().mean()

    train_loss.append(loss.item())
    train_accs.append(acc.item())

  scheduler.step()
  train_loss = sum(train_loss) / len(train_loss)
  train_accs = sum(train_accs) / len(train_accs)
  train_acc_history.append(train_accs)
  train_loss_history.append(train_loss)

  print(f"[ Train | {epoch + 1:03d}/{epochs:03d} ] loss = {train_loss:.5f}, acc = {acc:.5f}")

  # ---------- Validation ----------

  model.eval()

  valid_loss = []
  valid_accs = []
  y_pred = []
  y_test = []
  def validate(valid_loader, model, criterion, device, y_pred, y_test):
    model.eval()

    with torch.no_grad():
        end = time.time()
        for i, (images, labels) in enumerate(valid_loader):

            images, labels = images.cuda(), labels.cuda()
            output = model(images)
            _, preds = torch.max(output, 1)
            loss = criterion(output, labels)
            acc = (output.argmax(dim=-1) == labels.to(device)).float().mean()

            y_pred.extend(preds.view(-1).detach().cpu().numpy())
            y_test.extend(labels.view(-1).detach().cpu().numpy())
            valid_loss.append(loss.item())
            valid_accs.append(acc.item())


    return y_pred, y_test ,valid_loss, valid_accs


  y_pred, y_test, valid_loss, valid_accs = validate(valid_loader, model, criterion, device,  y_pred, y_test)


  valid_loss = sum(valid_loss) / len(valid_loss)
  valid_acc = sum(valid_accs) / len(valid_accs)
  valid_acc_history.append(valid_acc)
  valid_loss_history.append(valid_loss)

  # Print the information.
  print(f"[ Valid | {epoch + 1:03d}/{epochs:03d} ] loss = {valid_loss:.5f}, acc = {valid_acc:.5f}")

  0%|          | 0/73 [00:00<?, ?it/s]

### print the loss and accuracy plot

In [None]:
def show_train_history(train_acc, valid_acc, ylabel):
    epoch = np.linspace(1, 20, 20)
    plt.plot(epoch, train_acc)
    plt.plot(epoch, valid_acc)
    plt.title(f'Train History of {ylabel}')
    plt.ylabel(ylabel)
    plt.xlabel('Epoch')
    plt.legend(['train', 'test'], loc='upper left')
    plt.show()

In [None]:
show_train_history(train_acc_history, valid_acc_history, 'Accuracy')

In [None]:
show_train_history(train_loss_history, valid_loss_history, 'Loss')

### show the accuracy of each crop

In [None]:
label_dict={0:"atemoya", 1:"banana", 2:"bareland", 3:"cabbage", 4:"carrot", 5:"grapes",
            6:"guava", 7:"mango", 8:"papaya", 9:"pineapple", 10:"pumpkin"}

In [None]:
test_loss = 0.0
class_correct = [0.0] * 11
class_total = [0.0] * 11

model.eval()
with torch.no_grad():
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        loss = criterion(output, target)
        test_loss += loss.item() * data.size(0)
        _, pred = torch.max(output, 1)
        correct_tensor = pred.eq(target.data.view_as(pred))
        correct = correct_tensor.cpu().numpy()

        for t, c in zip(target, correct):
            label = t.item()
            class_correct[label] += c.item()
            class_total[label] += 1

test_loss = test_loss / len(test_loader.dataset)
print(f'Test Loss: {test_loss:.3f}\n')

for i in range(11):
    if class_total[i] > 0:
        print(f'Test Accuracy of {label_dict[i]} : {100 * class_correct[i] / class_total[i]:.2f}% ({int(class_correct[i])}/{int(class_total[i])})')
    else:
        print(f'Test Accuracy of {i} : N/A (no training examples)')

overall_correct = sum(class_correct)
overall_total = sum(class_total)
print(f'\nTest Accuracy (Overall): {100. * overall_correct / overall_total:.2f}% ({int(overall_correct)}/{int(overall_total)})')

### print confusion matrix

In [None]:
import itertools
def plot_confusion_matrix(cm, classes, normalize=False, title='Confusion matrix', cmap=plt.cm.Blues):
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    else:
        pass

    plt.figure(figsize=(6, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)

    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes, rotation=45)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')
    plt.tight_layout()

In [None]:
classes = dataset.classes
print(classes)

In [None]:
from sklearn.metrics import confusion_matrix

model.eval()
y_true = []
y_pred = []

with torch.no_grad():
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        _, pred = torch.max(output, 1)

        y_true.extend(target.cpu().numpy())
        y_pred.extend(pred.cpu().numpy())

y_true = np.array(y_true)
y_pred = np.array(y_pred)

matrix = confusion_matrix(y_true, y_pred)
plot_confusion_matrix(matrix, classes=classes, normalize=True)

#### save the model

In [None]:
torch.save(model, "/content/gdrive/MyDrive/資料科學概論/mymodel.pth")