In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
from tqdm import tqdm
from PIL import Image
import matplotlib.pyplot as plt
import numpy as np
import shutil as sh
import os
import time
import random

print(f"Cuda available: {torch.cuda.is_available()}")

Configuration of the hyperparameters and other variables that are used throughout the rest of the notebook

In [None]:
# Side length (in pixels) that every image is resized to during preprocessing
img_size = 224

# Autoencoder stuff
batch_size = 128                    # batch size of the autoencoder dataloaders
learning_rate = 1e-3                # learning rate of the autoencoder optimizer
num_epochs = 10                     # number of autoencoder training epochs
dropout = 0.3                       # drop probability of every Dropout2d layer of the autoencoder
convolutional_kernel = 4            # kernel size shared by all (transposed) convolutions
convolutional_stride = 2            # stride shared by all (transposed) convolutions
convolutional_padding = 1           # padding shared by all (transposed) convolutions
amount_of_pictures_to_show = 10     # random reconstructions displayed after each epoch
encoder_name = "models/encoder_V1.pth"
decoder_name = "models/decoder_V1.pth"
autoencoder_name = "models/autoencoder_V1.pth"

# Classificator stuff
classification_batch_size = 128
classification_learning_rate = 1e-3
classification_epochs = 20
encoder_name_to_load = encoder_name  # encoder checkpoint the classifier starts from
classifier_name = "models/classifier_V1.pth"



device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device {device}")

# Leave two cores free for the rest of the system. os.cpu_count() can return None,
# and on machines with two or fewer cores the subtraction would go negative, which
# DataLoader rejects; fall back to 0 (load in the main process) in those cases.
workers = max((os.cpu_count() or 1) - 2, 0)
print(f"Using {workers} workers for loading the datasets")

### Fixing the food-101 dataset structure

In [None]:
def _moveFood101Split(path: str, split: str):
    """Move every image listed in ``meta/<split>.txt`` to ``images/<split>/<class>/``."""
    with open(path + "/meta/" + split + ".txt") as listing:
        # strip() instead of slicing off the last character: the final line of the
        # listing may not end with a newline
        entries = [line.strip() for line in listing]

    for entry in entries:
        if not entry:
            continue  # tolerate blank lines in the listing

        src = path + "/images/" + entry + ".jpg"
        dst = path + "/images/" + split + "/" + entry + ".jpg"
        if os.path.exists(dst) and not os.path.exists(src):
            continue  # already moved by a previous (interrupted) run

        os.makedirs(os.path.dirname(dst), exist_ok=True)
        sh.move(src, dst)


def fixFood101(path: str):
    """Reorganize a food-101 download into ``images/train`` and ``images/test`` folders.

    The entries of ``meta/train.txt`` and ``meta/test.txt`` (``class_name/image_id``,
    one per line) are moved from ``images/<class_name>/`` to
    ``images/train/<class_name>/`` and ``images/test/<class_name>/`` respectively.
    The class folders left empty afterwards are removed.

    Nothing is done when ``images/test/`` already exists, and a message is printed
    when ``path`` itself does not exist.

    Args:
        path: Root folder of the food-101 dataset (the one holding ``images`` and ``meta``).
    """
    if not os.path.exists(path):
        print("Couldn't find food-101")
        return

    if os.path.exists(path + "/images/test/"):
        print("The structure is already fixed!")
        return

    _moveFood101Split(path, "train")
    _moveFood101Split(path, "test")

    # Bottom-up walk so the directories emptied by the moves can be removed;
    # rmdir refuses to delete the ones that still hold files.
    for dirPath, _, _ in os.walk(path + "/images/", topdown=False):
        try:
            os.rmdir(dirPath)
        except OSError:
            pass  # directory is not empty: it belongs to the new structure
fixFood101("./Datasets/food-101")

### Preprocessing function

In [5]:
def preprocess(org_path: str, train: bool):
    """Resize one image to ``img_size`` x ``img_size`` and save it as an RGB JPEG.

    The output path mirrors the last two components of ``org_path``
    (``class_name/file_name``) below ``<...>/Datasets/Cleaned_V1/train/`` or
    ``<...>/Datasets/Cleaned_V1/validation/``. Any extension other than ``.jpg`` /
    ``.jpeg`` is replaced by ``.jpg``. If the output file already exists, a message
    is printed and the image is left untouched.

    Args:
        org_path: "/"-separated path of the source image. One of its components
            must be exactly ``Datasets``.
        train: True to store the result in the train split, False for validation.

    Raises:
        ValueError: If ``org_path`` has no ``Datasets`` component.
    """
    parts = org_path.split("/")
    dst_start = "/".join(parts[:parts.index("Datasets") + 1]) # Path up to (and including) the Datasets folder
    split_name = "train" if train else "validation"
    dst = dst_start + "/Cleaned_V1/" + split_name + "/" + "/".join(parts[-2:]) # ./Datasets/Cleaned_V1/x/class_name/file_name.extension

    if not (dst.endswith(".jpg") or dst.endswith(".jpeg")):
        # splitext only looks at the file name, so dots elsewhere in the path
        # (such as a leading "./") can't be mistaken for the extension
        dst = os.path.splitext(dst)[0] + ".jpg" # ./Datasets/Cleaned_V1/x/class_name/file_name.jpg

    if os.path.exists(dst):
        print("File already preprocessed")
        return

    with Image.open(org_path) as source: # The context manager closes the source file once the pixels are read
        working = source
        if working.mode == "P":
            # Expand palette images first so the resize can interpolate real colors
            working = working.convert("RGBA" if "transparency" in working.info else "RGB")
        image = working.resize((img_size, img_size), Image.Resampling.HAMMING) # Hamming is a resampling filter that produces good quality outputs

    if image.mode in ("RGBA", "LA"): # If the image has transparency, get rid of it
        rgba = image.convert("RGBA") # "LA" only has two bands, so normalize before reading the alpha band
        background = Image.new("RGB", rgba.size, (255, 255, 255)) # Create a white image to act as the background
        background.paste(rgba, mask=rgba.getchannel("A")) # Apply this background where there is transparency on the image
        image = background
    elif image.mode != "RGB":
        image = image.convert("RGB") # Grayscale, CMYK, ... are stored as plain RGB

    os.makedirs(os.path.dirname(dst), exist_ok=True) # os.path.dirname(dst) = ./Datasets/Cleaned_V1/x/class_name
                                                     # exist_ok=True means that no exception is raised if the directory already exists
    image.save(dst, "JPEG")

In [6]:
# (source folder, belongs to the train split) for every split that has to be cleaned
split_sources = (
    ("./Datasets/food-101/images/train/", True),
    ("./Datasets/food-101/images/test/", False),
)

try:
    for split_dir, is_train in split_sources:
        # os.walk yields nothing for a folder that doesn't exist, so a missing
        # dataset has to be reported explicitly
        if not os.path.isdir(split_dir):
            print(f"Couldn't find {split_dir}. Probably the dataset doesn't exist or its path is wrong")
            continue

        for dirPath, _, files in os.walk(split_dir):
            for file in files:
                preprocess(dirPath+"/"+file, is_train)
except FileNotFoundError:
    print("Couldn't find an image. Problably the dataset doesn't exist or its path is wrong")

# Autoencoder

The autoencoder first encodes each image and then decodes it again, so it learns the most important patterns needed to recreate the image

In [7]:
class Autoencoder(nn.Module):
    """Convolutional autoencoder made of four strided stages in each direction.

    ``encoder`` stacks four ``Conv2d -> LazyBatchNorm2d -> ReLU -> Dropout2d`` stages
    (3 -> 64 -> 128 -> 256 -> 512 channels). ``decoder`` mirrors it with
    ``ConvTranspose2d`` stages (512 -> 256 -> 128 -> 64 -> 3 channels); its last
    stage consists only of the transposed convolution followed by batch norm.
    Kernel size, stride, padding and dropout probability come from the notebook
    configuration.
    """

    def __init__(self):
        super().__init__()

        # Geometry shared by every (transposed) convolution of the network
        conv_kwargs = dict(
            kernel_size=convolutional_kernel,
            stride=convolutional_stride,
            padding=convolutional_padding,
        )

        def stage(conv):
            # LazyBatchNorm2d infers its number of channels on the first forward pass
            return [conv, nn.LazyBatchNorm2d(), nn.ReLU(), nn.Dropout2d(p=dropout)]

        # Encoder
        encoder_layers = []
        for in_channels, out_channels in ((3, 64), (64, 128), (128, 256), (256, 512)):
            encoder_layers += stage(nn.Conv2d(in_channels, out_channels, **conv_kwargs))
        self.encoder = nn.Sequential(*encoder_layers)

        # Decoder
        decoder_layers = []
        for in_channels, out_channels in ((512, 256), (256, 128), (128, 64)):
            decoder_layers += stage(nn.ConvTranspose2d(in_channels, out_channels, **conv_kwargs))
        # Output stage: no activation and no dropout after the batch norm
        decoder_layers += [nn.ConvTranspose2d(64, 3, **conv_kwargs), nn.LazyBatchNorm2d()]
        self.decoder = nn.Sequential(*decoder_layers)

    def forward(self, x):
        """Return ``(encoded, decoded)``: the encoder output and its reconstruction."""
        latent = self.encoder(x)
        return latent, self.decoder(latent)

# Classifier

Uses the encoder from the autoencoder to extract the important patterns of each image, and uses those patterns to classify it

In [8]:
class Classifier(nn.Module):
    """Classification head on top of a (mostly frozen) pretrained encoder.

    All encoder parameters are frozen except those of the layers at indices
    8, 9, 12 and 13, which stay trainable so they get fine tuned. The encoder
    output is flattened and mapped to class scores by a single linear layer.

    Args:
        encoder: Indexable module (such as ``nn.Sequential``) with at least 14 layers.
        num_classes: Number of output classes. Defaults to 101.
        in_features: Size of the flattened encoder output. The default, 512*14*14,
            is the value this notebook uses for its encoder.
    """

    def __init__(self, encoder, num_classes=101, in_features=512 * 14 * 14):
        super(Classifier, self).__init__()
        self.encoder = encoder
        for param in self.encoder.parameters():
            param.requires_grad = False # Freezes all layers of the encoder so they don't get retrained (fine tuned)

        # Then some of the last layers get unfrozen so they get fine tuned:
        #   8 -> second to last convolution layer,  9 -> second to last batch normalization
        #  12 -> last convolution layer,           13 -> last batch normalization
        # NOTE(review): requires_grad only controls gradient updates; confirm whether the
        # frozen BatchNorm/Dropout layers should also be kept in eval mode during training.
        for layer_index in (8, 9, 12, 13):
            for param in self.encoder[layer_index].parameters():
                param.requires_grad = True

        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(in_features, num_classes)
            # in_features is the size of the tensor that nn.Flatten gives, and num_classes the amount of classes
        )

    def forward(self, x):
        """Return the class scores for the image batch ``x``."""
        x = self.encoder(x)
        x = self.fc(x)
        return x
            

# Loading of the datasets

In [9]:
# Validation pipeline: only tensor conversion and normalization
transform_validation = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# Training pipeline: random augmentations followed by the same conversion and normalization
transform_train = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(30),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2),
    transforms.ToTensor(),
    # Subtracting 0.5 and dividing by 0.5 maps ToTensor's [0, 1] values to the [-1, 1] range
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
])

# One ImageFolder per split of the preprocessed dataset
train_dataset = ImageFolder('./Datasets/Cleaned_V1/train', transform=transform_train)
validation_dataset = ImageFolder('./Datasets/Cleaned_V1/validation', transform=transform_validation)

# Only the training data is shuffled
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=workers,
)
test_loader = DataLoader(
    validation_dataset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=workers,
)
dataloaders = dict(train=train_loader, validation=test_loader)

In [None]:
# Initialization of the model, loss and optimizer
model = Autoencoder()
model = model.to(device)

# The LazyBatchNorm2d layers only create their parameters on the first forward pass.
# Do one dry run (eval mode and no_grad, so no statistics or gradients are touched)
# before the optimizer receives the parameters.
model.eval()
with torch.no_grad():
    model(torch.zeros(1, 3, img_size, img_size, device=device))
model.train()

criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
print(f"Device for model: {next(model.parameters()).device}")

In [14]:
def show_image(original, reconstructed, epoch):
    """Plot an image and its reconstruction side by side.

    Args:
        original: Image tensor in (C, H, W) layout with values in the [-1, 1] range.
        reconstructed: Autoencoder output for ``original``, same layout and range.
        epoch: Zero-based epoch index; the title shows ``epoch + 1``.
    """
    def to_displayable(tensor):
        # detach() lets tensors that still track gradients be converted to numpy
        array = tensor.detach().cpu().numpy().transpose(1, 2, 0)  # Convert the format of the image (H, W, C)
        # Denormalize so it can be plotted ([-1, 1] range to [0, 1]) and clip any overshoot
        return np.clip((array + 1) / 2, 0, 1)

    original = to_displayable(original)
    reconstructed = to_displayable(reconstructed)

    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.imshow(original)
    plt.title("Original Image")
    plt.axis('off')

    plt.subplot(1, 2, 2)
    plt.imshow(reconstructed)
    plt.title(f"Reconstructed Image (Epoch {epoch+1})")
    plt.axis('off')

    plt.show()

def get_selected_image(loader=None):
    """Return the images of one randomly chosen batch.

    Args:
        loader: Sized iterable of ``(images, labels)`` batches. Defaults to
            ``dataloaders["validation"]``.

    Returns:
        The ``images`` element of the chosen batch (the whole batch, not a single image).

    Raises:
        ValueError: If the loader has no batches.
    """
    if loader is None:
        loader = dataloaders["validation"]

    num_batches = len(loader)
    if num_batches == 0:
        raise ValueError("Cannot select an image from an empty loader")

    iterator = iter(loader)
    # Skip a random amount of batches. randrange(num_batches) also works for a
    # single-batch loader and lets the last batch be chosen.
    for _ in range(random.randrange(num_batches)):
        next(iterator)
    images, _ = next(iterator)
    return images

# Batch of validation images picked once; the training cell below reuses its first image every epoch
selected_image = get_selected_image() 

# Autoencoder training

In [None]:
# Reference image that is reconstructed after every epoch so progress can be compared visually
fixed_image = selected_image[0].unsqueeze(0).to(device)
show_image(fixed_image[0], fixed_image[0], -2)

# Random previews are read straight from the validation dataset (with its transform
# applied) instead of iterating the dataloader up to a random batch for every picture
preview_dataset = dataloaders["validation"].dataset

since = time.time()
for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")
    for phase in ['train', 'validation']:
        if phase == 'train':
            model.train()  # Training mode
        else:
            model.eval()  # Evaluating mode

        running_loss = 0.0
        total_samples = 0

        with tqdm(total=len(dataloaders[phase]), desc=f"{phase} phase") as pbar:
            for inputs, _ in dataloaders[phase]:  # Labels are not needed: the target is the input itself
                inputs = inputs.to(device)

                # Forward pass
                with torch.set_grad_enabled(phase == 'train'):
                    _, outputs = model(inputs)
                    loss = criterion(outputs, inputs)

                    # Backward pass and optimization only in training phase
                    if phase == 'train':
                        optimizer.zero_grad()
                        loss.backward()
                        optimizer.step()

                # The loss is a batch mean, so weight it by the batch size
                running_loss += loss.item() * inputs.size(0)
                total_samples += inputs.size(0)

                pbar.set_postfix({"Loss": f"{running_loss / total_samples:.4f}"})
                pbar.update(1)

        epoch_loss = running_loss / len(dataloaders[phase].dataset)
        print(f"{phase} Loss: {epoch_loss:.4f}")

    # Show images passed through the autoencoder for visual evaluation.
    # The model is still in eval mode here because validation is the last phase.
    with torch.no_grad():
        # First show always the same image for a continuous evaluation
        _, reconstructed_image = model(fixed_image)
        show_image(fixed_image[0], reconstructed_image[0], epoch)

        # Then some random images to see how other classes are doing
        for _ in range(amount_of_pictures_to_show):
            sample = preview_dataset[random.randrange(len(preview_dataset))][0]
            sample = sample.unsqueeze(0).to(device)
            _, reconstructed_image = model(sample)
            show_image(sample[0], reconstructed_image[0], epoch)

    time_elapsed = time.time() - since
    print(f'Epoch finished at {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')

# Saving the model
# Make sure the target folders exist so the trained weights aren't lost on a missing directory
for checkpoint_path in (encoder_name, decoder_name, autoencoder_name):
    checkpoint_dir = os.path.dirname(checkpoint_path)
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)

torch.save(model.encoder.state_dict(), encoder_name)
torch.save(model.decoder.state_dict(), decoder_name)
torch.save(model.state_dict(), autoencoder_name)

# Training of the classification model

In [None]:
# Rebuild the encoder architecture and load the weights saved after the autoencoder training.
# map_location lets a checkpoint saved on the GPU be loaded on a CPU-only machine too.
pretrained_encoder = Autoencoder().encoder
pretrained_encoder.load_state_dict(torch.load(encoder_name_to_load, map_location=device))
pretrained_encoder = pretrained_encoder.to(device)

model = Classifier(pretrained_encoder).to(device)

# Same datasets as before, but batched with the classification batch size
train_loader = DataLoader(train_dataset, batch_size=classification_batch_size, shuffle=True, num_workers=workers)
test_loader = DataLoader(validation_dataset, batch_size=classification_batch_size, shuffle=False, num_workers=workers)
dataloaders = {"train": train_loader, "validation": test_loader}


In [14]:
def train_model(model, criterion, optimizer, scheduler, num_epochs):
    """Train and validate a classifier, save its weights and plot the learning curves.

    Every epoch runs a training phase and a validation phase over the global
    ``dataloaders`` dict. After the last epoch the state dict is saved to
    ``classifier_name`` and the accuracy and loss of both phases are plotted.

    Args:
        model: Model to train. It is expected to already be on ``device``.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped after every training batch.
        scheduler: Learning rate scheduler stepped once after every training phase.
        num_epochs: Number of epochs to run.

    Returns:
        The same ``model`` instance after training.
    """
    since = time.time()
    train_acc = []
    train_loss = []
    validation_acc = []
    validation_loss = []
    for epoch in range(num_epochs):
        print(f'Epoch {epoch}/{num_epochs - 1}')
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'validation']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0
            seen_samples = 0

            # Iterate over data.
            with tqdm(total=len(dataloaders[phase]), desc=f"{phase} phase") as pbar:
                for inputs, labels in dataloaders[phase]:
                    inputs = inputs.to(device)
                    labels = labels.to(device)

                    # forward
                    # track history if only in train
                    with torch.set_grad_enabled(phase == 'train'):
                        outputs = model(inputs)
                        _, preds = torch.max(outputs, 1)
                        loss = criterion(outputs, labels)

                        # backward + optimize only if in training phase
                        if phase == 'train':
                            # zero the parameter gradients
                            optimizer.zero_grad()
                            loss.backward()
                            optimizer.step()

                    # statistics (kept as plain Python numbers)
                    samples_in_batch = inputs.size(0)
                    batch_corrects = torch.sum(preds == labels).item()
                    running_loss += loss.item() * samples_in_batch
                    running_corrects += batch_corrects
                    seen_samples += samples_in_batch

                    # Show the average loss so far instead of the ever-growing sum
                    batch_acc = (batch_corrects / samples_in_batch) * 100
                    pbar.set_postfix({"Loss": f"{running_loss / seen_samples:.4f}", "Accuracy": f"{batch_acc:.2f}%"})
                    pbar.update(1)

            if phase == 'train':
                scheduler.step()

            dataset_size = len(dataloaders[phase].dataset)
            epoch_loss = running_loss / dataset_size
            epoch_acc = running_corrects / dataset_size

            if phase == "train":
                train_acc.append(float(epoch_acc))
                train_loss.append(float(epoch_loss))
            else:
                validation_acc.append(float(epoch_acc))
                validation_loss.append(float(epoch_loss))

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

        print()

        time_elapsed = time.time() - since
        print(f'Epoch finished at {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')

    # Make sure the target folder exists so the trained weights aren't lost
    checkpoint_dir = os.path.dirname(classifier_name)
    if checkpoint_dir:
        os.makedirs(checkpoint_dir, exist_ok=True)
    torch.save(model.state_dict(), classifier_name)

    plt.subplot(1, 2, 1)
    plt.plot(range(len(train_acc)), train_acc, label="Train")
    plt.plot(range(len(validation_acc)), validation_acc, label="Validation")
    plt.legend(loc="lower right")
    plt.title("Accuracy")

    plt.subplot(1, 2, 2)
    plt.plot(range(len(train_loss)), train_loss, label="Train")
    plt.plot(range(len(validation_loss)), validation_loss, label="Validation")
    # loc must be passed by keyword: a positional string is interpreted as the legend labels
    plt.legend(loc="lower right")
    plt.title("Loss")

    plt.show()

    return model

In [15]:
# Loss, optimizer and learning rate schedule used to train the classifier
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(
    params=model.parameters(),
    lr=classification_learning_rate,
)
# Multiply the learning rate by 0.1 every 7 scheduler steps
scheduler = optim.lr_scheduler.StepLR(
    optimizer=optimizer,
    step_size=7,
    gamma=0.1,
)

In [None]:
# Train the classifier for `classification_epochs` epochs; train_model saves the weights to `classifier_name` and returns the model
model = train_model(model, criterion, optimizer, scheduler, classification_epochs)

The image predictor class is in the Flask app