# Food Classification with CNN - Building a Restaurant Recommendation System

This assignment focuses on developing a deep learning-based food classification system using Convolutional Neural Networks (CNNs). You will build a model that can recognize different food categories and use it to return the food preferences of a user.

## Learning Objectives
- Implement CNNs for image classification
- Work with real-world food image datasets
- Build a preference-detector system

## Background: AI-Powered Food Preference Discovery

The system's core idea is simple:

1. Users upload 10 photos of dishes they enjoy
2. Your CNN classifies these images into the 91 categories
3. Based on these categories, the system returns the user's taste profile

Your task is to develop the core computer vision component that will power this detection engine.

You are given a training ("train" folder) and a test ("test" folder) dataset which have ~45k and ~22k samples respectively. For each one of the 91 classes there is a subdirectory containing the images of the respective class.

## Assignment Requirements

### Technical Requirements
- Implement your own pytorch CNN architecture for food image classification
- Use only the provided training dataset split for training
- Train the network from scratch ; No pretrained weights can be used
- Report test-accuracy after every epoch
- Report all hyperparameters of final model
- Use a fixed seed and do not use any CUDA-features that break reproducibility
- Use Pytorch 2.6

### Deliverables
1. Jupyter Notebook with CNN implementation, training code etc.
2. README file
3. Report (max 3 pages)

Submit your report, README and all code files as a single zip file named GROUP_[number]_NC2425_PA. The names and IDs of the group components must be mentioned in the README.
Do not include the dataset in your submission.

### Grading

1. Correct CNN implementation, training runs on the uni DSLab computers according to the README.MD instructions without ANY exceptions on the DSLab machines: 3pt
2. Perfect 1:1 reproducibility on DSLab machines: 1pt
3. Very clear github-repo-style README.MD with instructions for running the code: 1pt
4. Report: 1pt
5. Model test performance on test-set: interpolated from 30-80% test-accuracy: 0-3pt
6. Pick 10 random pictures of the test set to simulate a user uploading images and report which categories occur how often in these: 1pt
7. Bonus point: use an LLM (API) to generate short description / profile of preferences of the simulated user

**If there is anything unclear about this assignment please post your question in the Brightspace discussions forum or send an email**


# Loading the datasets
The dataset is already split into a train and test set in the directories "train" and "test". 

In [16]:
### HYPER-PARAMETER SET-UP ###

# Randomization
random_seed = 42
# Model training
workers = 4
batch_size = 16
num_epochs = 50
lr = 0.01
momentum = 0.9

In [2]:
### IMPORTS AND DEVICE ###

import torch
import numpy as np
import torch.nn as nn
from time import time
import torch.nn.functional as F
from datetime import datetime as dt
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import torch.optim.lr_scheduler as lr_scheduler


# Use the GPU to speed up the training
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [3]:
### DATA LOADING ###

# Allow repeatability
np.random.seed(random_seed)
torch.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)

# Define the transformer for the data
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
])


full_dataset = datasets.ImageFolder(root='./train', transform=transform)
train_dataset, val_dataset = torch.utils.data.random_split(full_dataset, [0.9, 0.1])
test_dataset = datasets.ImageFolder(root='./test', transform=transform)

train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, num_workers=workers)
val_loader = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=False, num_workers=workers)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=False, num_workers=workers)

classes = test_dataset.classes

# CNN Implementation

In [None]:
class Block(nn.Module):
    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()
        self.layer1 = nn.Sequential(nn.Conv2d(in_channels, out_channels, 3, stride, 1),
                                    nn.BatchNorm2d(out_channels),
                                    nn.ReLU())
        
        self.layer2 = nn.Sequential(nn.Conv2d(out_channels, out_channels, 3, 1, 1),
                                    nn.BatchNorm2d(out_channels))
        
        self.proj = False
        if in_channels != out_channels or stride != 1:
            self.proj = True
            self.p_layer = nn.Conv2d(in_channels, out_channels, 1, stride)

    def forward(self, x):
        res = x
        out = self.layer1(x)
        out = self.layer2(out)
        if self.proj:
            res = self.p_layer(x)
        # print(out.shape, res.shape, x.shape)
        return F.relu(out+res)


class CNN(nn.Module):
    def __init__(self, num_blocks):
        super().__init__()
        self.layer1 = nn.Sequential(nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
                                    nn.BatchNorm2d(64),
                                    nn.ReLU())                      # 128
        
        self.max = nn.MaxPool2d(3, 2, 1)                            # 64

        self.layer2 = self._make_layer(64, 64, num_blocks[0], 1)
        self.layer3 = self._make_layer(64, 128, num_blocks[1])      # 32
        self.layer4 = self._make_layer(128, 256, num_blocks[2])     # 16
        self.layer5 = self._make_layer(256, 512, num_blocks[3])     # 8

        self.avg = nn.AvgPool2d(7, 1)                               # 2

        self.layer6 = nn.Sequential(nn.Flatten(),
                                    nn.Linear(512*2*2, 512),
                                    nn.Dropout(),
                                    nn.ReLU(),              # Deleted a FC layer
                                    nn.Linear(512, 91))
        
    def forward(self, x):
        x = self.layer1(x)
        x = self.max(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.layer5(x)
        x = self.avg(x)
        x = self.layer6(x)
        return x
    
    def _make_layer(self, in_channels, out_channels, blocks, stride=2):
        layers = [Block(in_channels, out_channels, stride)]
        for _ in range(1, blocks):
            layers.append(Block(out_channels, out_channels))
        return nn.Sequential(*layers)

In [5]:
x = torch.randn(batch_size, 3, 256, 256)
trial = CNN([1, 1, 1, 1])
print(trial(x).shape)

torch.Size([16, 91])


# Training the model
Implement your training process below. Report the test-accuracy after every epoch for the training run of the final model.

Hint: before training your model make sure to reset the seed in the training cell, as otherwise the seed may have changed due to previous training runs in the notebook

Note: If you implement automatic hyperparameter tuning, split the train set into train and validation subsets for the objective function.

In [11]:
### TRAINING FUNCTIONS ###

def train(model, dataloader, optimizer, loss_fn):
    """
    One epoch of model training.
    """
    model.train()

    running_loss = 0.0
    running_accuracy = 0.0
    for idx, data in enumerate(dataloader):
        images, labels = data[0].to(device), data[1].to(device)

        # Forward propagate
        optimizer.zero_grad()
        outputs = model(images)
        correct = sum(labels == torch.argmax(outputs, 1)).item()
        running_accuracy += correct/batch_size

        # Back-propagate and optimize
        loss = loss_fn(outputs, labels)
        running_loss += loss.item()
        loss.backward()
        optimizer.step()

        # Print a snapshot of the training
        if idx % 500 == 499:
            avg_loss_across_batches = running_loss / 500
            avg_acc_across_batches = (running_accuracy / 500) * 100
            print('Batch {}, Loss: {:.3f}, Accuracy: {:.2f}%'.format(idx+1, avg_loss_across_batches, avg_acc_across_batches))
            running_loss = 0.0
            running_accuracy = 0.0


def validate(model, dataloader, loss_fn):
    """
    One validation test.
    """
    model.eval()

    running_loss = 0.0
    running_accuracy = 0.0
    for data in iter(dataloader):
        images, labels = data[0].to(device), data[1].to(device)

        # With no gradient to increase efficiency
        with torch.no_grad():
            outputs = model(images)
            correct = sum(labels == torch.argmax(outputs, 1)).item()
            running_accuracy += correct/batch_size

            loss = loss_fn(outputs, labels)
            running_loss += loss.item()

    avg_loss_across_batches = running_loss / len(dataloader)
    avg_acc_across_batches = (running_accuracy / len(dataloader)) * 100
    
    print('Val Loss: {:.3f}, Val Accuracy: {:.2f}%'.format(avg_loss_across_batches, avg_acc_across_batches))
    return avg_loss_across_batches


def accuracy(model, dataloader):
    """
    One test accuracy test.
    """
    model.eval()

    running_accuracy = 0.0
    for data in iter(dataloader):
        images, labels = data[0].to(device), data[1].to(device)

        with torch.no_grad():
            outputs = model(images)
            correct = sum(labels == torch.argmax(outputs, 1)).item()
            running_accuracy += correct/batch_size

    avg_acc_across_batches = (running_accuracy / len(dataloader)) * 100
    print('Accuracy: {:.2f}%'.format(avg_acc_across_batches))
    return avg_acc_across_batches


def run_epochs(num_epochs, model, optimizer, loss_fn, scheduler, epoch=0, regret=10.0, grudge=0, pride=0.0):
    """
    An entire training session.
    """
    upper = epoch+num_epochs
    for epoch in range(epoch, upper):
        # Start epoch
        print(f"Epoch {epoch+1} begin: {dt.now()}.")
        start = time()

        # Train the model
        train(model, train_loader, optimizer, loss_fn)
        print('Completed training after {:.2f} seconds.'.format((time() - start)))
        check_point = time()

        # Validate the model
        val_loss = validate(model, val_loader, loss_fn)
        print('Completed validation after {:.2f} seconds.'.format((time() - check_point)))
        check_point = time()

        # Test the model
        acc = accuracy(model, test_loader)
        print('Completed testing after {:.2f} seconds.'.format((time() - check_point)))

        # Update learning rate
        scheduler.step(val_loss)

        # Output an update
        print('Total epoch time: {:.2f} seconds.'.format((time() - start)))
        print('Epoch [{}/{}]'.format(epoch+1, upper))
        check_point = time()

        # Update emotional state
        if pride < acc:
            pride = acc
            print("Check-point!")
            torch.save(model.state_dict(), "./checkpoint.pth")
        grudge = 0 if val_loss <= regret else grudge + 1
        regret = val_loss
        if 2 < grudge:
            print("Overfitting!")
            torch.save(model.state_dict(), "./endpoint.pth")
            break
        print("####################################################################################################")
    return epoch+1, regret, grudge, pride

In [None]:
### TRAINING SET-UP ###

# Allow repeatability
np.random.seed(random_seed)
torch.manual_seed(random_seed)
torch.cuda.manual_seed_all(random_seed)

# Create model object and connect it to the corresponding processing
model = CNN([3, 4, 6, 2]).to(device)  # Modified from 2, 2, 2, 2

# Define loss and optimizer
loss_fn = nn.CrossEntropyLoss() 
optimizer = torch.optim.Adam(model.parameters(), lr=lr)  #, momentum=momentum)
scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, patience=2)

# Define mental state
epoch, regret, grudge, pride = 0, 10.0, 0, 0.0

In [None]:
### ACTUAL TRAINING ###
epoch, regret, grudge, pride = run_epochs(num_epochs, model, optimizer, loss_fn, scheduler, epoch, regret, grudge, pride)

Epoch 1 begin: 2025-05-05 15:56:53.916473.
Batch 500, Loss: 4.507, Accuracy: 1.73%
Batch 1000, Loss: 4.463, Accuracy: 2.05%
Batch 1500, Loss: 4.439, Accuracy: 2.35%
Batch 2000, Loss: 4.411, Accuracy: 2.53%
Batch 2500, Loss: 4.395, Accuracy: 2.49%
Completed training after 195.62 seconds.
Val Loss: 4.325, Val Accuracy: 3.56%
Completed validation after 18.58 seconds.


# Calculating model performance
Load the best version of your model ( which should be produced and saved by previous cells ), calculate and report the test accuracy.

In [None]:
### MODEL COMPARISONS ###

# Load the best model weights
model2 = CNN().to(device)
model2.load_state_dict(torch.load("checkpoint.pth", weights_only=True))

# Calculate and present final scores
final_test_acc = accuracy(model2, test_loader)
print(f"Final Test Accuracy: {final_test_acc:.2f}%")


Accuracy: 41.82%
Final Test Accuracy: 41.82%


# Summary of hyperparameters
Report the hyperparameters ( learning rate etc ) that you used in your final model for reproducibility.

# Simulation of random user
Pick 10 random pictures of the test set to simulate a user uploading images and report which categories occur how often in these: 1pt

In [None]:
# Your code here
# Below an example showing the format of the code output

# Bonus point
Use an LLM (API) to generate a description of the food preference of a user based on 10 images that a potential user could provide. 
Please include an example of the output of your code, especially if you used an API other than the OpenAI API.

This should work well even with differing test images by setting different random seeds for the image selector.