<a href="https://colab.research.google.com/github/edegp/food_cnn/blob/main/Untitled0.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# %cd ConvNeXt/
# !wget https://dl.fbaipublicfiles.com/convnext/convnext_tiny_22k_224.pth

In [2]:
# !git clone https://github.com/facebookresearch/ConvNeXt

In [3]:
import pandas as pd

import os
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import random_split
from torchvision import datasets, transforms
from torchvision.models import convnext_tiny

In [4]:
# Read the demographic, questionnaire and response CSV files (in that order)
# from the mounted Drive folder into three DataFrames.
demog, ques, resp = (
    pd.read_csv(csv_path)
    for csv_path in (
        "/content/drive/My Drive/foodReward/data/data_demographic_NCNP.csv",
        "/content/drive/My Drive/foodReward/data/data_questionnare_NCNP.csv",
        "/content/drive/My Drive/foodReward/data/data_responses_NCNP_2types.csv",
    )
)

In [5]:
# Average the "res_L" column over all response rows that share the same "img"
# value; the result is a Series indexed by "img".
res_L_mean = resp.groupby("img")["res_L"].agg("mean")

In [6]:
# Custom Dataset class
class ImageDataset(Dataset):
    """Dataset of ``.jpg`` images in one directory, paired with per-image labels.

    The ``.jpg`` file names are listed once at construction time and sorted.
    The label of the file at position ``idx`` is read as
    ``label_series[idx + 1]``.

    NOTE(review): this pairing assumes the sorted file order lines up with the
    keys 1, 2, 3, ... of ``label_series``. ``sorted`` compares names as
    strings ("10.jpg" sorts before "2.jpg"), so confirm that the file names
    are zero-padded or otherwise sort in label order.

    Args:
        image_dir: Directory that contains the ``.jpg`` files.
        label_series: Series holding one label per image.
        transform: Optional callable applied to the RGB PIL image before it
            is returned.
    """

    def __init__(self, image_dir: str, label_series: pd.Series, transform=None):
        self.image_dir = image_dir
        # Labels live on the instance so that __getitem__ does not depend on a
        # notebook-level global variable.
        self.label_series = label_series
        self.transform = transform
        self.image_files = sorted([f for f in os.listdir(image_dir) if f.endswith('.jpg')])

    def __len__(self):
        """Return the number of ``.jpg`` files found in ``image_dir``."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the file at sorted position ``idx``."""
        img_name = os.path.join(self.image_dir, self.image_files[idx])
        # convert() returns a new image, so the opened file can be released
        # as soon as the conversion is done.
        with Image.open(img_name) as raw_image:
            image = raw_image.convert('RGB')

        if self.transform:
            image = self.transform(image)

        # Positions are 0-based while the label lookup uses idx + 1.
        label = self.label_series[idx + 1]

        return image, label

# Define transformations
# transform = transforms.Compose([
#     transforms.Resize(256),
#     transforms.CenterCrop(224),
#     transforms.ToTensor(),
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
# ])

# Build the dataset over the image folder; no transform is attached yet, so
# items come back as PIL images.
image_dir = '/content/drive/My Drive/foodReward/data/Database'
dataset = ImageDataset(image_dir, res_L_mean, transform=None)

# Shuffling loader over that dataset, 32 samples per batch
data_loader = DataLoader(dataset, shuffle=True, batch_size=32)

# # Sample batch
# for images, labels in data_loader:
#     print(f'Batch of images shape: {images.shape}')
#     print(f'Batch of labels shape: {labels.shape}')
#     break

In [7]:
# Function to visualize a batch of images
def show_images(images, labels, n=8):
    """Plot up to ``n`` images in a two-row grid, each titled with its label.

    Every image is permuted with ``(1, 2, 0)`` before plotting, multiplied by
    a per-channel std, shifted by a per-channel mean (undoing a ``Normalize``
    step with those values) and clipped to ``[0, 1]``.

    Args:
        images: Indexable collection of image tensors that support
            ``permute(1, 2, 0)`` and ``numpy()``.
        labels: Indexable collection whose items support ``.item()``.
        n: Maximum number of images to show. Fewer are shown when ``images``
            holds fewer than ``n`` items.
    """
    # Never index past the end of the batch.
    count = min(n, len(images))
    # Two rows: round the column count up so odd counts (and count == 1)
    # still get enough subplot slots.
    cols = max(1, (count + 1) // 2)

    plt.figure(figsize=(15, 6))
    for i in range(count):
        plt.subplot(2, cols, i + 1)
        img = images[i].permute(1, 2, 0).numpy()
        img = img * [0.229, 0.224, 0.225] + [0.485, 0.456, 0.406]  # Unnormalize
        img = img.clip(0, 1)
        plt.imshow(img)
        plt.title(f'Label: {labels[i].item()}')
        plt.axis('off')
    plt.show()


In [8]:

# Convert augmented data into a DataLoader-friendly format
class AugmentedDataset(Dataset):
    """Dataset view over two parallel, already-materialised sequences.

    Item ``idx`` is the pair ``(images[idx], labels[idx])``; the length is
    the length of ``images``.
    """

    def __init__(self, images, labels):
        self.images, self.labels = images, labels

    def __len__(self):
        """Return how many images are stored."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return the image and label stored at position ``idx``."""
        sample = self.images[idx]
        target = self.labels[idx]
        return sample, target

# Augment the training dataset by creating additional copies with transformations
def augment_dataset(original_dataset, num_augmentations, transform=None):
    """Create ``num_augmentations`` transformed copies of every dataset item.

    Args:
        original_dataset: Iterable yielding ``(image, label)`` pairs.
        num_augmentations: Number of transformed copies to make per item.
        transform: Callable applied to each image. When omitted, the
            notebook-level ``train_transform`` is used; it is looked up at
            call time, so it only has to exist before this function is called.

    Returns:
        A tuple ``(augmented_images, augmented_labels)`` of two lists of equal
        length. The copies of one item are adjacent, and each copy keeps the
        label of the item it was made from.
    """
    if transform is None:
        transform = train_transform

    augmented_images = []
    augmented_labels = []
    for img, label in original_dataset:
        for _ in range(num_augmentations):
            augmented_images.append(transform(img))
            augmented_labels.append(label)
    return augmented_images, augmented_labels

In [9]:
# Per-channel statistics shared by both pipelines' Normalize step.
# NOTE(review): presumably the statistics the pretrained backbone expects - confirm.
_NORM_MEAN = [0.485, 0.456, 0.406]
_NORM_STD = [0.229, 0.224, 0.225]

# Training pipeline: random geometric and photometric augmentations, then
# tensor conversion and normalisation.
train_transform = transforms.Compose(
    [
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomVerticalFlip(),
        transforms.RandomRotation(20),
        transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.2),
        transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.8, 1.2), shear=15),
        transforms.RandomPerspective(distortion_scale=0.5, p=0.5),
        transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5)),
        transforms.RandomGrayscale(p=0.25),
        transforms.ToTensor(),
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ]
)

# Validation/test pipeline: no random steps, only resize, centre crop,
# tensor conversion and normalisation.
val_test_transform = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=_NORM_MEAN, std=_NORM_STD),
    ]
)

# Seed torch's global RNG and the split generator so that the
# train/validation/test partition is repeatable on a fresh kernel.
SPLIT_SEED = 42
torch.manual_seed(SPLIT_SEED)

# Define split sizes: 75% train, 12.5% validation, the remainder is the test set
train_size = int(0.75 * len(dataset))
val_size = int(0.125 * len(dataset))
test_size = len(dataset) - train_size - val_size

# Split the dataset into training, validation, and test sets
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# augment_dataset applies the training transform itself, so the base dataset
# must hand out untransformed images while the copies are generated. Resetting
# it here also keeps this cell re-runnable after the transform was set below.
dataset.transform = None

# Apply augmentations to create additional data points
num_augmentations = 4  # Number of augmented versions per original image
augmented_images, augmented_labels = augment_dataset(train_dataset, num_augmentations)

# Create an augmented training dataset
augmented_train_dataset = AugmentedDataset(augmented_images, augmented_labels)

# Combine original and augmented datasets
combined_train_dataset = torch.utils.data.ConcatDataset([train_dataset, augmented_train_dataset])

# All three subsets returned by random_split wrap the same `dataset` object,
# so a transform cannot be chosen per split by assigning to
# `<subset>.dataset.transform`: the last assignment applies to every split.
# Set the single shared transform explicitly. The un-augmented training images
# therefore use the deterministic pipeline, while random augmentation comes
# only from the pre-computed copies above.
dataset.transform = val_test_transform

# Create data loaders
train_loader = DataLoader(combined_train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# ConvNeXt-Tiny backbone initialised with pre-trained weights
model = convnext_tiny(pretrained=True)

# Swap the last classifier layer for a one-unit linear layer, so the network
# emits a single continuous value per image (regression).
head_in_features = model.classifier[2].in_features
model.classifier[2] = nn.Linear(head_in_features, 1)

# Huber loss for the regression target; AdamW over all model parameters
criterion = nn.HuberLoss()
optimizer = optim.AdamW(params=model.parameters(), lr=0.0001)

Downloading: "https://download.pytorch.org/models/convnext_tiny-983f1562.pth" to /root/.cache/torch/hub/checkpoints/convnext_tiny-983f1562.pth
100%|██████████| 109M/109M [00:00<00:00, 190MB/s] 


In [10]:
# Fine-tuning function for regression
def fine_tune_regression(model, train_loader, val_loader, criterion, optimizer, num_epochs=10):
    """Train ``model`` for ``num_epochs`` epochs, validating after each one.

    The model is moved to CUDA when available, otherwise to the CPU. After
    every epoch the mean per-batch training loss and the mean per-batch
    validation loss are printed.

    Args:
        model: Module to optimise; it is moved to the selected device.
        train_loader: Iterable of ``(inputs, labels)`` batches; must support ``len``.
        val_loader: Iterable of ``(inputs, labels)`` batches; must support ``len``.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates the parameters of ``model``.
        num_epochs: Number of passes over ``train_loader``.

    Returns:
        The same ``model`` object, left in eval mode when at least one epoch ran.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    def _to_device(batch_inputs, batch_labels):
        # Labels are cast to float and given a trailing dimension of size 1
        # (presumably to match the shape of the model output).
        return batch_inputs.to(device), batch_labels.to(device).float().unsqueeze(1)

    for epoch_number in range(1, num_epochs + 1):
        # Training pass
        model.train()
        train_loss_sum = 0.0
        for inputs, targets in train_loader:
            inputs, targets = _to_device(inputs, targets)

            optimizer.zero_grad()
            batch_loss = criterion(model(inputs), targets)
            batch_loss.backward()
            optimizer.step()

            train_loss_sum += batch_loss.item()

        print(f'Epoch {epoch_number}/{num_epochs}, Loss: {train_loss_sum/len(train_loader)}')

        # Validation pass: no gradient tracking, no parameter updates
        model.eval()
        val_loss_sum = 0.0
        with torch.no_grad():
            for inputs, targets in val_loader:
                inputs, targets = _to_device(inputs, targets)
                val_loss_sum += criterion(model(inputs), targets).item()

        print(f'Validation Loss: {val_loss_sum/len(val_loader)}')

    return model


# Test the model
def test_model(model, test_loader, criterion):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)
    model.eval()
    test_loss = 0.0
    test_corr = 0.0
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device).float().unsqueeze(1)
            outputs = model(inputs)
            # calclate correlation coefficient
            corr = torch.corrcoef(torch.cat((outputs, labels), dim=1).T)[0, 1]
            test_corr += corr.item()
            loss = criterion(outputs, labels)
            test_loss += loss.item()
    print(f'Test Loss: {test_loss/len(test_loader)}')
    print(f'Test Correlation: {test_corr/len(test_loader)}')
    return model

In [11]:
# Idea: use the correlation coefficient as the loss
# Run 20 epochs of regression fine-tuning
model = fine_tune_regression(
    model=model,
    train_loader=train_loader,
    val_loader=val_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=20,
)

# Write the state_dict of the fine-tuned model to Drive
torch.save(model.state_dict(), '/content/drive/My Drive/foodReward/model/convnext_tiny_regression.pth')

Epoch 1/20, Loss: 0.45674435467947094
Validation Loss: 0.25228212773799896
Epoch 2/20, Loss: 0.24557117706253415
Validation Loss: 0.22051187604665756
Epoch 3/20, Loss: 0.1705087797272773
Validation Loss: 0.1808598805218935
Epoch 4/20, Loss: 0.10241540170141629
Validation Loss: 0.18433713912963867
Epoch 5/20, Loss: 0.05634997845405624
Validation Loss: 0.16611464321613312
Epoch 6/20, Loss: 0.03562978090984481
Validation Loss: 0.16220567747950554
Epoch 7/20, Loss: 0.024504553251678034
Validation Loss: 0.164857754483819
Epoch 8/20, Loss: 0.019590637345044386
Validation Loss: 0.1665643583983183
Epoch 9/20, Loss: 0.018157830812214386
Validation Loss: 0.1628615316003561
Epoch 10/20, Loss: 0.017502474940071504
Validation Loss: 0.1635803859680891
Epoch 11/20, Loss: 0.015684071422687598
Validation Loss: 0.17144652642309666
Epoch 12/20, Loss: 0.014770094127882095
Validation Loss: 0.16942264139652252
Epoch 13/20, Loss: 0.013986136821941251
Validation Loss: 0.15953294932842255
Epoch 14/20, Loss: 0.

In [12]:
# Report loss and correlation on the held-out test split
model = test_model(model=model, test_loader=test_loader, criterion=criterion)

Test Loss: 0.1651686578989029
Test Correlation: 0.6612539812922478
