In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import cv2
import albumentations as A
from albumentations.pytorch import ToTensorV2
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import confusion_matrix
import torch.optim as optim
import pytorch_lightning as pl
import torchmetrics

In [2]:
class FoodDataset(Dataset):
    def __init__(self, data_type=None, transforms=None, size_scale=1):
        if data_type is None:
            raise Exception("You are useless")
        if size_scale <= 0 or size_scale > 1:
            raise Exception("You are useless")
        self.path = 'data/Food-5K/' + data_type + '/'
        self.images_name = os.listdir(self.path)
        self.transforms = transforms
        self.size_scale = size_scale
    
    def __len__(self):
        return int(len(self.images_name) * self.size_scale)
    
    def __getitem__(self, idx):
        data = self.images_name[idx]
        
        label = data.split('_')[0]
        label = int(label)
        label = torch.tensor(label)
        
        image = cv2.imread(self.path + data)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        
        if self.transforms:
            aug = self.transforms(image=image)
            image = aug['image']
        
        return (image, label)

In [3]:
image_transformation = A.Compose([
                            A.RandomResizedCrop(256, 256),
                            A.HorizontalFlip(),
                            A.Normalize(),
                            ToTensorV2()
                        ])
train_dataset = FoodDataset('training', image_transformation)
val_dataset = FoodDataset('validation', image_transformation)
test_dataset = FoodDataset('evaluation', image_transformation)

trainloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
validloader = DataLoader(val_dataset, batch_size=32)
testloader = DataLoader(test_dataset, batch_size=32)

In [4]:
class LitModel(pl.LightningModule):
    def __init__(self, model):
        super().__init__()
        self.model = model
        self.loss = nn.CrossEntropyLoss()
        self.accuracy = torchmetrics.Accuracy(average='macro', num_classes=6)
        self.recall = torchmetrics.Recall(average='macro', num_classes=6)
        self.f1 = torchmetrics.F1Score(average='macro', num_classes=6)
        self.precision_metrics = torchmetrics.Precision(average='macro', num_classes=6)
        
    def forward(self, x):
        y_pred = self.model(x)
        return y_pred
    
    def configure_optimizers(self):
        optimizer = optim.Adam(self.model.parameters(), lr=0.008)
        return optimizer
    
    def get_metrics(self, y_pred, y, metrics_type="train"):
        predictions = y_pred.argmax(-1)
        y = y.type(torch.IntTensor)
        
        precision = self.precision_metrics(predictions, y)
        acc = self.accuracy(predictions, y)
        rec = self.recall(predictions, y)
        f1 = self.f1(predictions, y)

        self.log('acc/test', acc)
        self.log('rec/test', rec)
        self.log('f1/test', f1)
        self.log('prec/test', precision)

    
    def training_step(self, batch, batch_idx):
        x, y = batch
        
        y_pred = self.forward(x).squeeze()
        # y_pred = torch.unsqueeze(y_pred, 0)
        loss = self.loss(y_pred, y)
        
        self.get_metrics(y_pred, y, "train")
        return loss
    
    def validation_step(self, val_batch, batch_idx):
        x, y = val_batch

        y_pred = self.forward(x).squeeze()
        # y_pred = torch.unsqueeze(y_pred, 0)
        loss = self.loss(y_pred, y)

        self.get_metrics(y_pred, y, "val")
        return loss
    
    def test_step(self, batch, batch_idx):
        x, y = batch

        y_pred = self.forward(x).squeeze()
        # y_pred = torch.unsqueeze(y_pred, 0)
        loss = self.loss(y_pred, y)

        self.get_metrics(y_pred, y, "test")
        return loss


In [14]:
def train_model(resnet_model, model_name):
    gpu = 1 if torch.cuda.is_available() else 0
    resnet_model.fc = nn.Linear(resnet_model.fc.in_features, 2)
    resnet_model = LitModel(model)
    trainer_resnet_model= pl.Trainer(max_epochs=3, gpus=gpu)
    trainer_resnet_model.fit(resnet_model, trainloader, validloader)
    torch.save(resnet_model, model_name)

In [15]:
from torchvision import models

In [16]:
model = models.resnet101(weights='IMAGENET1K_V1')
train_model(model, 'resnet101.pth')

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name              | Type             | Params
-------------------------------------------------------
0 | model             | ResNet           | 42.5 M
1 | loss              | CrossEntropyLoss | 0     
2 | accuracy          | Accuracy         | 0     
3 | recall            | Recall           | 0     
4 | f1                | F1Score          | 0     
5 | precision_metrics | Precision        | 0     
-------------------------------------------------------
42.5 M    Trainable params
0         Non-trainable params
42.5 M    Total params
170.017   Total estimated model params size (MB)


Sanity Checking: 0it [00:00, ?it/s]

Training: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

Validation: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=3` reached.


### ResNet101

In [17]:
model = torch.load('resnet101.pth', map_location=torch.device('cpu'))
model = LitModel(model)
trainer_model = pl.Trainer(max_epochs=20, accelerator='cpu')
trainer_model.test(model, testloader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs


Testing: 0it [00:00, ?it/s]

[{'acc/test': 0.3923666775226593,
  'rec/test': 0.3923666775226593,
  'f1/test': 0.44120630621910095,
  'prec/test': 0.5224000215530396}]