# This notebook aims to develop a neural-network based classifier for the coffee bean dataset
## The technology of choice is pytorch

In [198]:
import torch
import pandas as pd
import os
from torchvision.io import read_image
from torch.utils.data import Dataset

bean_annotations = pd.read_csv("data/beans.csv")
bean_annotations.head()
DEFECT_CLASSES = dict(
    [
        (defect, index)
        for (index, defect) in enumerate(pd.unique(bean_annotations["defect_class"]))
    ]
)
DEFECT_CLASSES

{'burnt': 0,
 'normal': 1,
 'frag': 2,
 'under': 3,
 'quaker': 4,
 'insectOrMold': 5}

In [199]:
from torchvision.transforms import v2

transforms = {
    "train": v2.Compose(
        [
            v2.Resize(size=(400, 400)),
            v2.RandomHorizontalFlip(),
            v2.RandomRotation(
                degrees=(20, 340), fill=(255, 255, 255)
            ),  # Augment the data with random rotations, setting the background to white
            v2.ToDtype(
                torch.float32, scale=True
            ),  # Use float32 datatype to enable training on Apple silicon GPU
            v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
    ),
    "test": v2.Compose(
        [
            v2.Resize(size=(400, 400)),
            v2.ToDtype(torch.float32, scale=True),
            v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ]
    ),
}

In [200]:
class RoastDefectsDataset(Dataset):
    def __init__(self, csv_file, root_dir, transform=None):
        self.bean_annotations = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.bean_annotations)

    def get_labels(self):
        return self.bean_annotations["defect_class"]

    def __getitem__(self, item):
        if torch.is_tensor(item):
            item = item.toList()

        img_name = self.bean_annotations.iloc[item, 0]
        img_dir = "-".join(img_name.split("-")[0:4])
        img_path = os.path.join(self.root_dir, img_dir, img_name)
        image = read_image(img_path)

        if self.transform:
            image = self.transform(image)

        annotations = DEFECT_CLASSES[
            self.bean_annotations.iloc[item, 1:]["defect_class"]
        ]
        return image, annotations

In [201]:
from sklearn.model_selection import train_test_split

train, test = train_test_split(bean_annotations, train_size=0.8)

In [202]:
train["defect_class"].value_counts()

defect_class
normal          1051
quaker           788
frag             231
under             82
burnt             41
insectOrMold      35
Name: count, dtype: int64

In [203]:
test["defect_class"].value_counts()

defect_class
normal          260
quaker          190
frag             65
under            22
insectOrMold     12
burnt             9
Name: count, dtype: int64

In [204]:
from torch.utils.data import DataLoader, SubsetRandomSampler
from torchsampler import ImbalancedDatasetSampler

data_train = RoastDefectsDataset(
    csv_file="data/beans.csv", root_dir="data/processed", transform=transforms["train"]
)

data_test = RoastDefectsDataset(
    csv_file="data/beans.csv", root_dir="data/processed", transform=transforms["test"]
)

train_sampler = ImbalancedDatasetSampler(
    data_train, labels=train["defect_class"], indices=list(train.index)
)
test_sampler = SubsetRandomSampler(list(test.index))


train_loader = DataLoader(data_train, sampler=train_sampler, batch_size=16)
test_loader = DataLoader(data_test, sampler=test_sampler, batch_size=64)

dataloaders = {"test": test_loader, "train": train_loader}
dataset_sizes = {"test": len(test), "train": len(train)}

In [205]:
device = "mps"  # Train on GPU

In [206]:
from sklearn.metrics import classification_report


def train_model(model, criterion, optimizer, scheduler, num_epochs=25):
    for epoch in range(num_epochs):
        print(f"Epoch {epoch}/{num_epochs - 1}")
        print("-" * 10)

        # Each epoch has a training and validation phase
        for phase in ["train", "test"]:
            if phase == "train":
                model.train()  # Set model to training mode
            else:
                model.eval()  # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            all_labels = []
            all_preds = []

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                all_labels.extend(labels.tolist())

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == "train"):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)
                    all_preds.extend(preds.tolist())

                    # backward + optimize only if in training phase
                    if phase == "train":
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            if phase == "train":
                scheduler.step()

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]
            print(f"Correct guesses in phase {phase}: {running_corrects}")
            print(f"{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")
            if phase == "test":
                print("Classification report for test dataset:")
                print(
                    classification_report(
                        all_labels,
                        all_preds,
                        target_names=DEFECT_CLASSES.keys(),
                        zero_division=0,
                    )
                )
            print()
    return model

In [207]:
from torchvision import models
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler

model_ft = models.resnet18(weights="IMAGENET1K_V1")

# for param in model_ft.parameters():
#     param.requires_grad = False

num_ftrs = model_ft.fc.in_features
model_ft.fc = nn.Linear(num_ftrs, len(DEFECT_CLASSES))

model_ft = model_ft.to(device)

criterion = nn.CrossEntropyLoss()

optimizer_ft = optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)

exp_lr_scheduler = lr_scheduler.StepLR(optimizer_ft, step_size=10, gamma=0.1)

In [208]:
model_ft = train_model(
    model_ft, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=30
)

Epoch 0/29
----------
Correct guesses in phase train: 1622
train Loss: 0.8028 Acc: 0.7280
Correct guesses in phase test: 440
test Loss: 0.6756 Acc: 0.7885
Classification report for test dataset:
              precision    recall  f1-score   support

       burnt       1.00      0.22      0.36         9
      normal       0.87      0.93      0.90       260
        frag       0.51      0.34      0.41        65
       under       0.50      0.05      0.08        22
      quaker       0.74      0.92      0.82       190
insectOrMold       0.00      0.00      0.00        12

    accuracy                           0.79       558
   macro avg       0.60      0.41      0.43       558
weighted avg       0.75      0.79      0.75       558


Epoch 1/29
----------
Correct guesses in phase train: 1839
train Loss: 0.5006 Acc: 0.8254
Correct guesses in phase test: 420
test Loss: 0.7126 Acc: 0.7527
Classification report for test dataset:
              precision    recall  f1-score   support

       burn

KeyboardInterrupt: 

In [209]:
from sklearn.metrics import accuracy_score

model_ft.eval()
for inputs, labels in dataloaders["test"]:
    print(accuracy_score(torch.max(model_ft(inputs.to("mps")), 1)[1].cpu(), labels))

0.9375
0.90625
0.90625
0.828125
0.921875
0.953125
0.921875
0.9375
0.9347826086956522


In [210]:
torch.save(model_ft.state_dict(), "model.pt")