In [2]:
import torch
import numpy as np
import pandas as pd
import torch.nn.functional as F
import matplotlib.pyplot as plt
import torchvision.transforms as transforms

from PIL import Image
from tqdm import tqdm
from pathlib import Path
from matplotlib import cm
from torch import nn, optim
from __future__ import annotations
from torch.utils.data import DataLoader

from IPython.display import clear_output

import warnings

In [3]:
import kagglehub

path = kagglehub.dataset_download("bloodlaac/products-dataset")

print("Path to dataset files:", path)

  from .autonotebook import tqdm as notebook_tqdm


Path to dataset files: C:\Users\Юля\.cache\kagglehub\datasets\bloodlaac\products-dataset\versions\1


In [4]:
warnings.filterwarnings("ignore")

In [5]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cuda


In [6]:
food_dir = f"{path}\products_dataset"

FOOD = [
    'FreshApple', 'FreshBanana', 'FreshMango', 'FreshOrange', 'FreshStrawberry',
    'RottenApple', 'RottenBanana', 'RottenMango', 'RottenOrange', 'RottenStrawberry',
    'FreshBellpepper', 'FreshCarrot', 'FreshCucumber', 'FreshPotato', 'FreshTomato',
    'RottenBellpepper', 'RottenCarrot', 'RottenCucumber', 'RottenPotato', 'RottenTomato'
]

In [7]:
path = Path(food_dir)

In [8]:
class LabeledDataset():
    def __init__(self, food_dir: Path, food_classes: list[str], transform=None) -> LabeledDataset:
        self.food_dir = food_dir
        self.food_classes = food_classes
        self.transform = transform
        self.images_paths = []
        self.labels = []

        for cls_name in food_classes:
            class_path = Path(food_dir)
            class_path /= cls_name

            for image_name in class_path.iterdir():
                image_path = class_path / image_name
                self.images_paths.append(image_path)
                self.labels.append(food_classes.index(cls_name))
        
    def __len__(self) -> int:
        return len(self.images_paths)
    
    def __getitem__(self, index: int):
        image = Image.open(self.images_paths[index]).convert("RGB")
        label = self.labels[index]

        if self.transform:
            image = self.transform(image)

        return image, label

In [9]:
data_transforms = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomCrop([200, 200]),
    transforms.RandomVerticalFlip(p=0.5),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(degrees=90),
    #transforms.ColorJitter(),
    #transforms.RandomAffine(degrees=0, translate=(0.3, 0.3)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.485, 0.456, 0.406]
    )
])

In [10]:
food_dataset = LabeledDataset(food_dir, FOOD, transform=data_transforms)

In [11]:
total = len(food_dataset)
n_train = int(0.6 * total)
n_val   = int(0.2 * total)
n_test  = total - n_train - n_val

In [12]:
train_dataset, val_dataset, test_dataset = torch.utils.data.random_split(food_dataset, [n_train, n_val, n_test])

In [13]:
train_dataloaders = []

for batch_size in (16, 32, 64):
    train_dataloaders.append(
        DataLoader(
            train_dataset,
            batch_size=batch_size,
            shuffle=True,
            num_workers=7,
            pin_memory=True,  # TODO: fix
        )
    )

In [14]:
val_dataloader = DataLoader(val_dataset, batch_size=64, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

In [15]:
class Block(nn.Module):
    """
    Create basic unit of ResNet.
    Consists of two convolutional layers.
    """

    def __init__(
            self,
            in_channels: int,
            out_channels: int,
            stride: int = 1,
            downsampling=None
        ) -> Block:

        super().__init__()
        
        self.conv1 = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=3,
            stride=stride,
            padding=1
        )
        self.batch_norm = nn.BatchNorm2d(num_features=out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(
            in_channels=out_channels,
            out_channels=out_channels,
            kernel_size=3,
            stride=1,  # TODO: Replace with padding="same"
            padding=1
        )
        self.downsampling = downsampling

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        input = x

        pred = self.batch_norm(self.conv1(x))
        pred = self.relu(pred)
        pred = self.batch_norm(self.conv2(pred))
        
        if self.downsampling is not None:
            input = self.downsampling(x)
        
        pred += input
        pred = self.relu(pred)

        return pred

In [16]:
class ResNet(nn.Module):
    """
    Build model ResNet and return prediction
    """

    def __init__(self, blocks_num_list: list[int]) -> ResNet:
        """
        ResNet init.

        Arguments:
        blocks_num_list -- number of basic blocks for each layer
        """
        super().__init__()

        self.in_channels = 64  # Default number of channels for first layer. Mutable!

        # Reduce resolution of picture by 2
        # 224 -> 112
        self.conv1 = nn.Conv2d(
            in_channels=3,
            out_channels=64,
            kernel_size=7,
            stride=2,
            padding=3
        )
        self.batch_norm = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.pooling = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)  # 112 -> 56

        self.layer1 = self.create_layer(  # Default stride. No resolution reduction.
            out_channels=64,
            num_blocks=blocks_num_list[0]
        )
        self.layer2 = self.create_layer(  # Resolution reduction. 56 -> 28
            out_channels=128,
            num_blocks=blocks_num_list[1],
            stride=2
        )
        self.layer3 = self.create_layer(  # Resolution reduction. 28 -> 14
            out_channels=256,
            num_blocks=blocks_num_list[2],
            stride=2
        )
        self.layer4 = self.create_layer(  # Resolution reduction. 14 -> 7
            out_channels=512,
            num_blocks=blocks_num_list[3],
            stride=2
        )

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512, 20)
    
    def create_layer(
            self,
            out_channels: int,
            num_blocks: int,
            stride: int = 1
        ) -> nn.Sequential:
        """
        Create ResNet layer.

        out_channels -- number of output channels per block
        num_blocks -- number of blocks per layer
        stride -- step of filter in conv layer (default 1)
        """
        downsampling = None

        if stride != 1:
            downsampling = nn.Sequential(
                nn.Conv2d(
                    in_channels=self.in_channels,
                    out_channels=out_channels,
                    kernel_size=1,
                    stride=stride
                ),
                nn.BatchNorm2d(out_channels)
            )

        blocks: list[Block] = []
        
        blocks.append(Block(
            in_channels=self.in_channels,
            out_channels=out_channels,
            stride=stride,
            downsampling=downsampling
        ))

        self.in_channels = out_channels

        for _ in range(num_blocks - 1):
            blocks.append(Block(out_channels, out_channels))

        return nn.Sequential(*blocks)
    
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        pred = self.batch_norm(self.conv1(x))
        pred = self.relu(pred)
        pred = self.pooling(pred)

        pred = self.layer1(pred)
        pred = self.layer2(pred)
        pred = self.layer3(pred)
        pred = self.layer4(pred)

        pred = self.avgpool(pred)
        pred = torch.flatten(pred, 1)
        pred = self.fc(pred)

        return pred

In [17]:
def plot_history(
        epochs: int,
        train_history: list,
        val_history: list,
        optimizer_name: str,
        label: str
    ):
    _, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 10))
    ax1.plot(np.arange(1, epochs + 1), train_history, label=label)
    ax2.plot(np.arange(1, epochs + 1), val_history, label=label)

    for ax in (ax1, ax2):
        ax.set_xlabel('Epochs')
        ax.set_ylabel('Accuracy')
        ax.legend(loc='lower right')
        ax.grid(True)

    ax1.set_title(f'{optimizer_name} Training accuracy')
    ax2.set_title(f'{optimizer_name} Validation accuracy')

    plt.tight_layout()
    plt.show()

In [18]:
def validate(model, loader):
    model.eval()

    correct, total = 0, 0

    for batch in loader:
        images, labels = batch[0], batch[1]
        images = images.to(device)
        labels = labels.to(device)

        with torch.no_grad():
            pred = model(images)
        pred = torch.argmax(pred, dim=1)

        total += len(pred)
        correct += (pred == labels).sum().item()

    return correct / total

In [19]:
def train(model, criterion, train_loader, val_loader, optimizer, epochs=10):
    train_acc, val_acc = [], []
    model.train()

    for epoch in tqdm(range(epochs), leave=False):
        correct, total = 0, 0

        for batch in train_loader:
            images, labels = batch[0], batch[1]

            images = images.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            pred = model(images)
            loss = criterion(pred, labels)
                
            loss.backward()
            optimizer.step()

            pred = torch.argmax(pred, dim=1)

            total += len(pred)
            correct += (pred == labels).sum().item()

        train_acc.append(correct / total)
        val_acc.append(validate(model, val_loader))
        # TODO: add loss log

        print(f"Epoch: [{epoch + 1}/{epochs}]")
        print(f"Train accuracy: {train_acc[-1]:.4f}")
        print(f"Val accuracy: {val_acc[-1]:.4f}\n")

    return train_acc, val_acc

In [20]:
def test(model, loader):
    model.eval()

    correct, total = 0, 0

    for batch in loader:
        images, labels = batch[0], batch[1]
        images = images.to(device)
        labels = labels.to(device)

        with torch.no_grad():
            pred = model(images)
        pred = torch.argmax(pred, dim=1)

        total += len(pred)
        correct += (pred == labels).sum().item()

    return correct / total

In [21]:
criterion = nn.CrossEntropyLoss()

In [22]:
def get_optimizer(optimizer_name: str, model: ResNet, lr=0.01):
    optimizers = {
        "SGD": optim.SGD(model.parameters(), lr=lr, momentum=0.9),
        "Adam": optim.Adam(model.parameters(), lr=lr),
        "AdamW": optim.AdamW(model.parameters(), lr=lr),
        "RMSprop": optim.RMSprop(model.parameters(), lr=lr)
    }

    return optimizers[optimizer_name]

In [23]:
optimizers_names = ["SGD", "Adam", "AdamW", "RMSprop"]

In [24]:
epochs = [10, 20, 50, 100]

blocks_num_list = [2, 2, 2, 2]

In [25]:
model = ResNet([2, 2, 2, 2]).to(device)
optimizer=get_optimizer("SGD", model, lr=0.001)
print(optimizer)

SGD (
Parameter Group 0
    dampening: 0
    differentiable: False
    foreach: None
    fused: None
    lr: 0.001
    maximize: False
    momentum: 0.9
    nesterov: False
    weight_decay: 0
)


In [None]:
print("Training ResNet18 with SGD")

_, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 10))

for epoch in epochs:
    model = ResNet(blocks_num_list).to(device)

    print(
        f"{epoch} epochs\n"
    )

    train_acc, val_acc = train(
        model,
        criterion,
        train_dataloaders[0],
        val_dataloader,
        optimizer=get_optimizer("SGD", model, lr=0.001),
        epochs=epoch,
    )

    label=f'{epoch} epochs'

    ax1.plot(np.arange(1, epoch + 1), train_acc, label=label)
    ax2.plot(np.arange(1, epoch + 1), val_acc, label=label)

    test_acc = test(model, test_dataloader)

    print(f"Test accuracy: {test_acc:.4f}\n")

for ax in (ax1, ax2):
    ax.set_xlabel('Epochs')
    ax.set_ylabel('Accuracy')
    ax.legend(loc='lower right')
    ax.grid(True)

ax1.set_title("SGD Training accuracy")
ax2.set_title("SGD Validation accuracy")

plt.tight_layout()
plt.show()

Training ResNet18 with SGD
10 epochs



  0%|          | 0/10 [00:00<?, ?it/s]