# Food Classification with CNN - Building a Restaurant Recommendation System

This assignment focuses on developing a deep learning-based food classification system using Convolutional Neural Networks (CNNs). You will build a model that can recognize different food categories and use it to return the food preferences of a user.

## Learning Objectives
- Implement CNNs for image classification
- Work with real-world food image datasets
- Build a preference-detector system

## Background: AI-Powered Food Preference Discovery

The system's core idea is simple:

1. Users upload 10 photos of dishes they enjoy
2. Your CNN classifies each image into one of the 91 categories
3. Based on these categories, the system returns the user's taste profile

Your task is to develop the core computer vision component that will power this detection engine.

You are given a training set ("train" folder) and a test set ("test" folder) with ~45k and ~22k samples respectively. Each of the 91 classes has its own subdirectory containing the images of that class.
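
The one-subdirectory-per-class layout can be sanity-checked before any training. The sketch below counts image files per class folder using only the standard library; the helper name `count_images_per_class` and the list of file extensions are assumptions for illustration, not part of the assignment.

```python
from pathlib import Path

# Assumed set of image extensions; adjust to whatever the dataset actually contains.
IMAGE_SUFFIXES = {".jpg", ".jpeg", ".png"}


def count_images_per_class(root):
    """Map each class subdirectory name under `root` to its number of image files."""
    counts = {}
    for class_dir in sorted(Path(root).iterdir()):
        if class_dir.is_dir():
            counts[class_dir.name] = sum(
                1 for f in class_dir.iterdir()
                if f.is_file() and f.suffix.lower() in IMAGE_SUFFIXES
            )
    return counts
```

If every class has a folder, `len(count_images_per_class("../train"))` should come out as 91, and the values give a quick view of class imbalance.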

## Assignment Requirements

### Technical Requirements
- Implement your own PyTorch CNN architecture for food image classification
- Use only the provided training dataset split for training
- Train the network from scratch; no pretrained weights may be used
- Report the test accuracy after every epoch
- Report all hyperparameters of the final model
- Use a fixed seed and do not use any CUDA features that break reproducibility
- Use PyTorch 2.6

### Deliverables
1. Jupyter Notebook with CNN implementation, training code etc.
2. README file
3. Report (max 3 pages)

Submit your report, README and all code files as a single zip file named GROUP_[number]_NC2425_PA. The names and IDs of all group members must be mentioned in the README.
Do not include the dataset in your submission.

### Grading

1. Correct CNN implementation; training runs on the university DSLab computers according to the README.MD instructions without ANY exceptions: 3pt
2. Perfect 1:1 reproducibility on DSLab machines: 1pt
3. Very clear GitHub-repo-style README.MD with instructions for running the code: 1pt
4. Report: 1pt
5. Model performance on the test set, interpolated from 30-80% test accuracy: 0-3pt
6. Pick 10 random pictures from the test set to simulate a user uploading images and report how often each category occurs among them: 1pt
7. Bonus point: use an LLM (API) to generate a short description / profile of the preferences of the simulated user
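
Item 5 can be read as a linear interpolation between 30% test accuracy (0pt) and 80% (3pt). The sketch below assumes exactly that reading, with clamping outside the range; the function name `performance_points` and the linear shape are assumptions, since the assignment only says "interpolated".

```python
def performance_points(test_accuracy, low=30.0, high=80.0, max_points=3.0):
    """Linearly map a test accuracy in percent to points, clamped to [0, max_points]."""
    fraction = (test_accuracy - low) / (high - low)
    return max_points * min(1.0, max(0.0, fraction))
```

Under this reading, a test accuracy of 55% would earn 1.5pt.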

**If anything about this assignment is unclear, please post your question in the Brightspace discussions forum or send an email.**


# Loading the datasets
The dataset is already split into a train and test set in the directories "train" and "test". 

In [58]:
### HYPER-PARAMETER SET-UP ###

# Randomization
random_seed = 13

# Model training
workers = 8
batch_size = 16
num_epochs = 50
lr = 0.01
momentum = 0.75

In [59]:
### IMPORTS AND DEVICE ###

import torch
import numpy as np
import torch.nn as nn
from time import time
import torch.nn.functional as F
from torchvision import datasets
from torch.autograd import Variable
from datetime import datetime as dt
from torchvision.transforms import v2
from torch.utils.data import DataLoader
import torch.optim.lr_scheduler as lr_scheduler


# Use the GPU to speed up the training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [60]:
### DATA LOADING ###

# Allow repeatability
np.random.seed(random_seed)
torch.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)

# Define the transformer for the data
train_transform = v2.Compose([
    v2.Resize((256, 256)),
    v2.RandomApply([v2.TrivialAugmentWide()], p=0.5),
    v2.ToImage(), 
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

test_transform = v2.Compose([
    v2.Resize((256, 256)),
    v2.ToImage(), 
    v2.ToDtype(torch.float32, scale=True),
    v2.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])

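# Collect the data and set the augmentation
# Note: random_split draws from the global torch RNG seeded above. The validation
# subset wraps the same ImageFolder, so it is also passed through train_transform
# (including the random augmentation).
train_dataset = datasets.ImageFolder(root='../train', transform=train_transform)
train_dataset, val_dataset = torch.utils.data.random_split(train_dataset, [0.9, 0.1])
test_dataset = datasets.ImageFolder(root='../test', transform=test_transform)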

# Prepare the data loaders with the batches, shuffling, and workers
train_loader = DataLoader(train_dataset, batch_size, shuffle=True, num_workers=workers)
test_loader = DataLoader(test_dataset, batch_size, shuffle=False, num_workers=workers)
val_loader = DataLoader(val_dataset, batch_size, shuffle=False, num_workers=workers)

In [61]:
len(train_loader), len(val_loader), len(test_loader)

(2573, 286, 1408)

# CNN Implementation
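
The size comments in the architectures below (for example `# 256 -> 128`) follow the standard output-size formula for convolution and pooling layers, floor((n + 2p - k) / s) + 1. A small helper, given the hypothetical name `out_size` here, makes them easy to check by hand:

```python
def out_size(n, kernel, stride=1, padding=0):
    """Spatial output size of a conv or pooling layer (dilation 1, floor rounding)."""
    return (n + 2 * padding - kernel) // stride + 1


# ResNet stem on a 256x256 input:
# 7x7 conv (stride 2, padding 3), then 3x3 max-pool (stride 2, padding 1)
after_conv = out_size(256, kernel=7, stride=2, padding=3)
after_pool = out_size(after_conv, kernel=3, stride=2, padding=1)
print(after_conv, after_pool)  # 128 64
```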

In [74]:
### ResNet ###

class Block(nn.Module):
    def __init__(self, in_channels, mid_channels, stride=1, expansion=4):
        super().__init__()
        self.layer = nn.Sequential(nn.Conv2d(in_channels, mid_channels, 1, 1, 0),
                                   nn.SELU(),
                                   nn.BatchNorm2d(mid_channels),
                                   nn.Conv2d(mid_channels, mid_channels, 3, stride, 1),
                                   nn.SELU(),
                                   nn.Conv2d(mid_channels, mid_channels*expansion, 1, 1, 0),
                                   nn.BatchNorm2d(mid_channels*expansion))
        
        self.proj = False
        if in_channels != (mid_channels*expansion) or stride != 1:
            self.proj = True
            # Project the shortcut to the block's output shape (use `expansion`, not a hard-coded 4)
            self.p_layer = nn.Conv2d(in_channels, mid_channels*expansion, 1, stride)

    def forward(self, x):
        res = x
        out = self.layer(x)
        if self.proj:
            res = self.p_layer(x)
        return F.selu(out + res)


class ResNet(nn.Module):
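    """
    Bottleneck-style residual network modelled on ResNet-50 (default block
    counts [3, 4, 6, 3]), with SELU activations. Expects 256x256 inputs.
    """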
    def __init__(self, num_blocks=None):
        super().__init__()
        if num_blocks is None:
            num_blocks = [3, 4, 6, 3]
        self.layer1 = nn.Sequential(nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
                                    nn.SELU(),
                                    nn.MaxPool2d(3, 2, 1))          # 256 -> 128 -> 64

        self.layer2 = self._make_layer(64, 64, num_blocks[0], 1)    # 64 (stride 1)
        self.layer3 = self._make_layer(256, 128, num_blocks[1])     # 64 -> 32
        self.layer4 = self._make_layer(512, 256, num_blocks[2])     # 32 -> 16
        self.layer5 = self._make_layer(1024, 512, num_blocks[3])    # 16 -> 8

        self.layer6 = nn.Sequential(nn.AvgPool2d(8),
                                    nn.Flatten(),
                                    nn.Dropout(0.25),
                                    nn.Linear(2048, 1024),
                                    nn.ReLU(),
                                    nn.Dropout(0.25),
                                    nn.Linear(1024, 91))
        
    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        x = self.layer6(x)
        return x
    
    def _make_layer(self, in_channels, mid_channels, blocks, stride=2, expansion=4):
        layers = [Block(in_channels, mid_channels, stride)]
        for _ in range(1, blocks):
            layers.append(Block(mid_channels*expansion, mid_channels))
        return nn.Sequential(*layers)

In [6]:
### GoogLeNet ####

class ConvBlock(nn.Module):
    """
    Conv2d -> BatchNorm2d -> SiLU. Named ConvBlock so that it does not shadow
    the ResNet Block defined above.
    """
    def __init__(self, in_channels, out_channels, kernel_size=1, stride=1, padding=0):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        self.bn = nn.BatchNorm2d(out_channels)

    def forward(self, x):
        return F.silu(self.bn(self.conv(x)))


class MultiStream(nn.Module):
    def __init__(self, in_channels, s1, s3, s5):
        super().__init__()
        self.stream1 = nn.Sequential(nn.AvgPool2d(3, 1, 1), ConvBlock(in_channels, s1))         # pool + 1x1
        self.stream2 = nn.Sequential(ConvBlock(in_channels, s3), ConvBlock(s3, s3, 3, 1, 1))    # 1x1 + 3x3
        self.stream3 = nn.Sequential(ConvBlock(in_channels, s5), ConvBlock(s5, s5, 5, 1, 2))    # 1x1 + 5x5
    
    def forward(self, x):
        # Concatenate the three streams along the channel dimension
        return torch.cat([self.stream1(x), self.stream2(x), self.stream3(x)], 1)


class GoogLeNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 32, 3, 1, 1), 
            nn.ReLU(),
            nn.BatchNorm2d(32), 
            nn.MaxPool2d(2, 2))  # 256 -> 128

        self.layer2 = nn.Sequential(
            nn.Conv2d(32, 64, 3, 1, 1), 
            nn.ReLU(), 
            nn.MaxPool2d(2, 2))  # 128 -> 64

        self.layer3 = nn.Sequential(
            MultiStream(64, 32, 64, 32),
            MultiStream(128, 16, 80, 32),
            nn.MaxPool2d(2, 2))  # 64 -> 32

        self.layer4 = nn.Sequential(
            MultiStream(128, 64, 128, 64),
            MultiStream(256, 32, 160, 64),
            nn.MaxPool2d(2, 2))  # 32 -> 16

        self.layer5 = nn.Sequential(
            MultiStream(256, 256, 192, 64),
            MultiStream(512, 128, 256, 128),
            MultiStream(512, 64, 320, 128),
            nn.AvgPool2d(2, 2))  # 16 -> 8

        self.layer6 = nn.Sequential(
            nn.Conv2d(512, 1024, 4, 2, 1),
            nn.ReLU(),
            nn.BatchNorm2d(1024),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(1024, 1024, 1, 1, 0),
            nn.MaxPool2d(2, 2)
        )

        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Dropout(),
            nn.Linear(1024, 1024),
            nn.Dropout(),
            nn.ReLU(),
            nn.Linear(1024, 1024),
            nn.ReLU(),
            nn.Linear(1024, 91))


    def forward(self, x):
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        x = self.layer6(x)
        x = self.fc(x)
        return x


In [34]:
### AlexNet ###

class AlexNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.blck1 = nn.Sequential(
            nn.Conv2d(3, 64, 11, 4, 4),
            nn.SELU(),
            nn.MaxPool2d(2, 2))  # 32

        self.blck2 = nn.Sequential(
            nn.Conv2d(64, 128, 7, 2, 3),
            nn.SELU(),
            nn.MaxPool2d(2, 2))  # 8

        self.blck3 = nn.Sequential(
            nn.Conv2d(128, 256, 5, 1, 2), 
            nn.SELU(),
            nn.BatchNorm2d(256),
            
            nn.Conv2d(256, 256, 3, 1, 1), 
            nn.SELU(),

            nn.Conv2d(256, 512, 3, 1, 1), 
            nn.SELU(),
            nn.BatchNorm2d(512),
            nn.MaxPool2d(2, 2))  # 4
        
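        # NOTE: blck4 is never called in forward(). Its 256 input channels would
        # also not match the 512 channels produced by blck3.
        self.blck4 = nn.Sequential(
            nn.Conv2d(256, 512, 5, 1, 2), 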
            nn.SELU(),
            nn.Conv2d(512, 512, 3, 1, 1), 
            nn.SELU(),
            nn.Conv2d(512, 1024, 3, 1, 1), 
            nn.SELU(),
            nn.BatchNorm2d(1024),
            nn.MaxPool2d(2, 2))  # 2
        
        self.fcll = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Dropout(0.25),
            nn.Linear(512, 512),
            nn.ReLU(),
            nn.Dropout(0.25),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 91)
        )

    def forward(self, x):
        x = self.blck1(x)
        x = self.blck2(x)
        x = self.blck3(x)
        x = self.fcll(x)
        return x

In [75]:
x = torch.randn(batch_size, 3, 256, 256)
trial = ResNet()
print(trial(x).shape)

torch.Size([16, 91])


# Training the model
Implement your training process below. Report the test-accuracy after every epoch for the training run of the final model.

Hint: before training your model make sure to reset the seed in the training cell, as otherwise the seed may have changed due to previous training runs in the notebook

Note: If you implement automatic hyperparameter tuning, split the train set into train and validation subsets for the objective function.
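
The hint about resetting the seed can be wrapped in a helper that is called at the top of the training cell. The following is a minimal sketch, assuming the names `seed_everything` and `seed_worker` (neither is part of the assignment template); it seeds Python, NumPy and PyTorch, asks cuDNN for deterministic kernels, and returns a seeded generator that can be passed to the data loaders.

```python
import random

import numpy as np
import torch


def seed_everything(seed):
    """Seed Python, NumPy and PyTorch; return a seeded generator for DataLoader."""
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)  # silently ignored when CUDA is unavailable
    # Prefer deterministic cuDNN kernels over auto-tuned ones
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    generator = torch.Generator()
    generator.manual_seed(seed)
    return generator


def seed_worker(worker_id):
    """Derive NumPy and Python seeds for a DataLoader worker from its torch seed."""
    worker_seed = torch.initial_seed() % 2**32
    np.random.seed(worker_seed)
    random.seed(worker_seed)
```

A data loader would then be built with `worker_init_fn=seed_worker` and `generator=seed_everything(random_seed)`, so that shuffling and per-worker augmentation draw from seeded streams. Whether this is sufficient for perfect 1:1 reproducibility on a given machine still has to be verified by running the training twice.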

In [37]:
### TRAINING FUNCTIONS ###

def train(model, dataloader, optimizer, loss_fn, updates):
    """
    One epoch of model training. Prints the mean loss and accuracy every `updates` batches.
    """
    model.train()

    running_loss = 0.0
    running_correct, running_seen = 0, 0
    for idx, data in enumerate(dataloader):
        images, labels = data[0].to(device), data[1].to(device)

        # Forward propagate
        optimizer.zero_grad()
        outputs = model(images)
        # Count per sample so that a smaller final batch does not skew the accuracy
        running_correct += (labels == torch.argmax(outputs, 1)).sum().item()
        running_seen += labels.size(0)

        # Back-propagate and optimize
        loss = loss_fn(outputs, labels)
        running_loss += loss.item()
        loss.backward()
        optimizer.step()

        # Print a snapshot of the training
        if idx % updates == (updates-1):
            avg_loss_across_batches = running_loss / updates
            avg_acc_across_batches = (running_correct / running_seen) * 100
            print('Batch {}, Loss: {:.3f}, Accuracy: {:.2f}%'.format(idx+1, avg_loss_across_batches, avg_acc_across_batches))
            running_loss = 0.0
            running_correct, running_seen = 0, 0


def validate(model, dataloader, loss_fn):
    """
    One pass over the validation set. Returns the mean loss per batch.
    """
    model.eval()

    running_loss = 0.0
    correct, seen = 0, 0
    # With no gradient to increase efficiency
    with torch.no_grad():
        for data in dataloader:
            images, labels = data[0].to(device), data[1].to(device)

            outputs = model(images)
            # Count per sample so that a smaller final batch does not skew the accuracy
            correct += (labels == torch.argmax(outputs, 1)).sum().item()
            seen += labels.size(0)

            loss = loss_fn(outputs, labels)
            running_loss += loss.item()

    avg_loss_across_batches = running_loss / len(dataloader)
    val_accuracy = (correct / seen) * 100
    
    print('Val Loss: {:.3f}, Val Accuracy: {:.2f}%'.format(avg_loss_across_batches, val_accuracy))
    return avg_loss_across_batches


def accuracy(model, dataloader):
    """
    One pass over a dataset. Returns the accuracy in percent.
    """
    model.eval()

    correct, seen = 0, 0
    with torch.no_grad():
        for data in dataloader:
            images, labels = data[0].to(device), data[1].to(device)

            outputs = model(images)
            # Count per sample so that a smaller final batch does not skew the accuracy
            correct += (labels == torch.argmax(outputs, 1)).sum().item()
            seen += labels.size(0)

    test_accuracy = (correct / seen) * 100
    print('Accuracy: {:.2f}%'.format(test_accuracy))
    return test_accuracy


def run_epochs(num_epochs, model, optimizer, loss_fn, scheduler=None, epoch=0, regret=10.0, grudge=0, pride=0.0, updates=500):
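    """
    An entire training session.

    regret: validation loss of the previous epoch.
    grudge: number of consecutive epochs in which the validation loss increased;
            training stops once this exceeds 6.
    pride:  best test accuracy so far; a checkpoint is saved whenever it improves.
    """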
    upper = epoch+num_epochs
    for epoch in range(epoch, upper):
        # Start epoch
        print(f"Epoch {epoch+1} begin: {dt.now()}.")
        start = time()

        # Train the model
        train(model, train_loader, optimizer, loss_fn, updates)
        print('Completed training after {:.2f} seconds.'.format((time() - start)))
        check_point = time()

        # Validate the model
        val_loss = validate(model, val_loader, loss_fn)
        print('Completed validation after {:.2f} seconds.'.format((time() - check_point)))
        check_point = time()

        # Test the model
        acc = accuracy(model, test_loader)
        print('Completed testing after {:.2f} seconds.'.format((time() - check_point)))

        # Update learning rate
        if scheduler is not None:
            scheduler.step(val_loss)

        # Output an update
        print('Total epoch time: {:.2f} seconds.'.format((time() - start)))
        print('Epoch [{}/{}]'.format(epoch+1, upper))
        check_point = time()

        # Update emotional state
        if pride < acc:
            pride = acc
            print("Check-point!")
            torch.save(model.state_dict(), "./checkpoint-2.pth")
        grudge = 0 if val_loss <= regret else grudge + 1
        regret = val_loss
        if 6 < grudge:
            print("Overfitting!")
            torch.save(model.state_dict(), "./endpoint-2.pth")
            break
        print("####################################################################################################")
    return epoch+1, regret, grudge, pride

In [41]:
class FocalLoss(nn.Module):
    """
    Sourced from "Focal Loss for Dense Object Detection" (2017)
    """
    def __init__(self, gamma=0, alpha=None, size_average=True):
        super().__init__()
        self.gamma = gamma
        self.alpha = alpha
        self.size_average = size_average        
        if isinstance(alpha, list):
            self.alpha = torch.Tensor(alpha).to(device)
        if isinstance(alpha, (float, int)):
            self.alpha = torch.Tensor([alpha, 1-alpha]).to(device)

    def forward(self, outputs, labels):
        if 2 < outputs.dim():
            outputs = outputs.view(outputs.size(0), outputs.size(1), -1)
            outputs = outputs.transpose(1, 2)
            outputs = outputs.contiguous().view(-1, outputs.size(2))
        labels = labels.view(-1, 1)

        logpt = F.log_softmax(outputs, dim=1)  # Log-probabilities over the class dimension
        logpt = logpt.gather(1, labels)
        logpt = logpt.view(-1)
        pt = logpt.detach().exp()  # Probability of the true class, kept out of the gradient

        if self.alpha is not None:
            if self.alpha.type() != outputs.data.type():
                self.alpha = self.alpha.type_as(outputs.data)
            at = self.alpha.gather(0, labels.data.view(-1))
            logpt = logpt * at  # Per-class weighting

        loss = -1 * (1-pt)**self.gamma * logpt
        if self.size_average:
            return loss.mean()
        return loss.sum()

In [None]:
### TRAINING ###

# Allow repeatability
np.random.seed(random_seed)
torch.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)

# Create model object and connect it to the corresponding processing
model = ResNet().to(device)

# Define loss and optimizer
loss_fn = nn.CrossEntropyLoss()  # loss_fn = FocalLoss(gamma=1.5)
optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=momentum)
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, patience=4)

# Define mental state and start
epoch, regret, grudge, pride, updates = 0, 10.0, 0, 0.0, 500
epoch, regret, grudge, pride = run_epochs(num_epochs, model, optimizer, loss_fn, scheduler, epoch, regret, grudge, pride, updates)

Epoch 1 begin: 2025-05-12 16:04:37.115669.
Batch 500, Loss: 4.522, Accuracy: 2.35%
Batch 1000, Loss: 4.430, Accuracy: 2.85%
Batch 1500, Loss: 4.359, Accuracy: 3.71%


In [None]:
model, random_seed, batch_size, lr, momentum

In [None]:
### FURTHER TRAINING ###
epoch, regret, grudge, pride = run_epochs(num_epochs, model, optimizer, loss_fn, scheduler, epoch, 10.0, 0, pride, updates)

Epoch 12 begin: 2025-05-07 19:04:35.135978.
Batch 1000, Loss: 4.501, Accuracy: 2.49%
Batch 2000, Loss: 4.500, Accuracy: 2.74%
Batch 3000, Loss: 4.501, Accuracy: 2.48%
Batch 4000, Loss: 4.500, Accuracy: 2.71%
Batch 5000, Loss: 4.500, Accuracy: 2.65%
Completed training after 187.65 seconds.
Val Loss: 4.500, Val Accuracy: 2.67%
Completed validation after 7.11 seconds.
Accuracy: 2.74%
Completed testing after 34.00 seconds.
Total epoch time: 228.76 seconds.
Epoch [12/61]
####################################################################################################
Epoch 13 begin: 2025-05-07 19:08:23.899848.
Batch 1000, Loss: 4.499, Accuracy: 2.79%
Batch 2000, Loss: 4.504, Accuracy: 2.41%
Batch 3000, Loss: 4.502, Accuracy: 2.39%
Batch 4000, Loss: 4.502, Accuracy: 2.55%


In [15]:
# Nico's net

np.random.seed(random_seed)
torch.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)

model = AlexNet().to(device)
loss_fn = nn.CrossEntropyLoss() 
optimizer = torch.optim.SGD(model.parameters(), lr=0.01, momentum=0.38)
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, patience=2)

# Train model
for epoch in range(20):
    print(f"Epoch {epoch+1} begin: {dt.now()}.")
    start = time()

    train(model, train_loader, optimizer, loss_fn, 500)
    print('Completed training after {:.2f} seconds.'.format((time() - start)))
    check_point = time()

    val_loss = validate(model, val_loader, loss_fn)
    print('Completed validation after {:.2f} seconds.'.format((time() - check_point)))
    check_point = time()

    accuracy(model, test_loader)
    print('Completed testing after {:.2f} seconds.'.format((time() - check_point)))
    scheduler.step(val_loss)

    # Output an update
    print('Total epoch time: {:.2f} seconds.'.format((time() - start)))
    print('Epoch [{}/{}]'.format(epoch+1, 20))
    print("####################################################################################################")

Epoch 1 begin: 2025-05-12 13:20:32.985275.
Batch 500, Loss: 4.511, Accuracy: 0.97%
Batch 1000, Loss: 4.511, Accuracy: 1.46%
Batch 1500, Loss: 4.511, Accuracy: 1.35%
Batch 2000, Loss: 4.511, Accuracy: 1.38%
Batch 2500, Loss: 4.511, Accuracy: 1.62%
Completed training after 31.92 seconds.
Val Loss: 4.511, Val Accuracy: 2.32%
Completed validation after 2.02 seconds.


KeyboardInterrupt: 

# Calculating model performance
Load the best version of your model (which should be produced and saved by previous cells), then calculate and report the test accuracy.

In [None]:
### MODEL COMPARISONS ###

# Load the best model weights (run_epochs saves them to "./checkpoint-2.pth")
model2 = ResNet().to(device)
model2.load_state_dict(torch.load("./checkpoint-2.pth", weights_only=True, map_location=device))

# Calculate and present final scores
final_test_acc = accuracy(model2, test_loader)
print(f"Final Test Accuracy: {final_test_acc:.2f}%")


# Summary of hyperparameters
Report the hyperparameters (learning rate, etc.) that you used in your final model for reproducibility.

# Simulation of random user
Pick 10 random pictures from the test set to simulate a user uploading images, and report how often each category occurs among them (1pt).

In [None]:
# Your code here
# Below an example showing the format of the code output
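
A minimal sketch of the simulation, assuming a hypothetical `predict` callable that maps an image path to a class name (in the notebook this would wrap the trained model and `test_transform`). It samples 10 images with a seeded `random.Random` and tallies the predicted categories with `collections.Counter`.

```python
import random
from collections import Counter


def simulate_user(image_paths, predict, n_images=10, seed=13):
    """Sample `n_images` paths reproducibly and count the predicted categories."""
    rng = random.Random(seed)  # local RNG, leaves the global random state untouched
    chosen = rng.sample(list(image_paths), n_images)
    return Counter(predict(path) for path in chosen)
```

Calling `.most_common()` on the result lists the categories from most to least frequent. The image paths could be taken from the `(path, class_index)` pairs in `test_dataset.samples`.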

# Bonus point
Use an LLM (API) to generate a description of a user's food preferences based on 10 images that a potential user could provide.
Please include an example of the output of your code, especially if you used an API other than the OpenAI API.

The code should also work for different sets of test images, obtained by setting different random seeds for the image selector.
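
Whichever LLM API is used, the request needs a text prompt built from the category counts. The sketch below only builds that prompt; the function name `build_profile_prompt` and the prompt wording are illustrative assumptions, and the API call itself is left out because it depends on the provider.

```python
def build_profile_prompt(category_counts):
    """Turn a {category: count} mapping into a prompt asking for a short taste profile."""
    # Most frequent categories first; ties are broken alphabetically
    ranked = sorted(category_counts.items(), key=lambda item: (-item[1], item[0]))
    # Assumes folder-style names such as "apple_pie"; underscores become spaces
    lines = [f"- {name.replace('_', ' ')}: {count}" for name, count in ranked]
    return (
        "A user uploaded photos of dishes they enjoy. "
        "An image classifier assigned them to these food categories:\n"
        + "\n".join(lines)
        + "\nDescribe this user's food preferences in two or three sentences."
    )
```

The returned string can be sent as the user message of a chat-style request. Changing the random seed of the image selector changes the counts and therefore the prompt.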