In [3]:
import pandas as pd
import os
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from pathlib import Path

# set up the path
data_path = '/home/ubuntu/landscape-aesthetics/data/external/scenicornot/scenicornot.metadata.csv'
image_folder = Path('/home/ubuntu/landscape-aesthetics/data/external/scenicornot') 

# Check if the data path exists
if not os.path.exists(data_path):
    raise FileNotFoundError(f"Data file not found at {data_path}")

# Read the data
data = pd.read_csv(data_path)

# Check if the root directory exists
root_dir = '/home/ubuntu/landscape-aesthetics/data/external/scenicornot/'
if not os.path.exists(root_dir):
    raise FileNotFoundError(f"Root directory not found at {root_dir}")

# prepocess the dataset, remove the invalid paths
valid_files = []
missing_files = []
for idx in range(len(data)):
    img_name = data.iloc[idx]['filename']
    image_path = image_folder / Path(img_name)
    if image_path.exists():
        valid_files.append(idx)
    else:
        missing_files.append(image_path)

if missing_files:
    print(f"Missing {len(missing_files)} files:")
    for file in missing_files:
        print(file)

# only keep the valid file record
valid_data = data.iloc[valid_files].reset_index(drop=True)

# # for test 10 images
# sampled_data = valid_data.sample(n=100, random_state=42).reset_index(drop=True)
# print("Sampled filenames:")
# print(sampled_data['filename'])
        
class ScenicDataset(Dataset):
    def __init__(self, data_frame, root_dir, transform=None):
        self.data_frame = data_frame
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        img_name = os.path.join(self.root_dir, self.data_frame.iloc[idx]['filename']) # combine root address with filename
        image_path = self.root_dir / Path(img_name)
        # print(f"Loading image: {image_path}")  # tune the information and print the file path
        
        try:
            image = Image.open(image_path).convert('RGB')
        except FileNotFoundError:
            # print(f"File not found: {image_path}, skipping.")
            # return None, None
            return self.__getitem__((idx + 1) % len(self)) # return the next sample
        except Exception as e:
            # print(f"Error loading image {image_path}: {e}")
            # return None, None
            return self.__getitem__((idx + 1) % len(self)) # return the next sample
            
        rating = self.data_frame.iloc[idx]['average']

        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(rating, dtype=torch.float32)

# centered crop
data_transforms = transforms.Compose([
    # transforms.Resize((256, 256)),
    transforms.CenterCrop(256),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.2)
])

image_dataset = ScenicDataset(data_frame=data,
                              root_dir='/home/ubuntu/landscape-aesthetics/data/external/scenicornot/',
                              transform=data_transforms)

train_size = int(0.8 * len(image_dataset))
val_size = len(image_dataset) - train_size
train_dataset, val_dataset = torch.utils.data.random_split(image_dataset, [train_size, val_size])

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

print(f"Training set size: {len(train_loader.dataset)}")
print(f"Validation set size: {len(val_loader.dataset)}")

Missing 1 files:
/home/ubuntu/landscape-aesthetics/data/external/scenicornot/photos/76/41/764143_e860c8c4.jpg
Training set size: 169484
Validation set size: 42372


In [None]:
# check for validity of data loader

# for data in train_loader:
#     if data is None:
#         print("Found None in training data")
# print("Ok for training data")
# for data in val_loader:
#     if data is None:
#         print("Found None in validation data")
# print("Ok for validation data")

In [None]:
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, models, transforms

# utilise ResNet50 to extract deep features
class ResNetFeatureExtractor(nn.Module):
    def __init__(self):
        super(ResNetFeatureExtractor, self).__init__()
        self.resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        self.resnet = nn.Sequential(*list(self.resnet.children())[:-1])  # Remove the last fully connected layer

    def forward(self, x):
        with torch.no_grad():
            features = self.resnet(x)
        return features.squeeze()

feature_extractor = ResNetFeatureExtractor()
feature_extractor.eval()

# define a simple regression model
class RegressionModel(nn.Module):
    def __init__(self, input_dim):
        super(RegressionModel, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(256, 1)
        )

    def forward(self, x):
        return self.fc(x)

# a more complex one
# class RegressionModel(nn.Module):
#     def __init__(self, input_dim):
#         super(ComplexRegressionModel, self).__init__()
#         self.fc = nn.Sequential(
#             nn.Linear(input_dim, 512),
#             nn.ReLU(),
#             nn.BatchNorm1d(512),
#             nn.Dropout(0.5),
#             nn.Linear(512, 256),
#             nn.ReLU(),
#             nn.BatchNorm1d(256),
#             nn.Dropout(0.5),
#             nn.Linear(256, 128),
#             nn.ReLU(),
#             nn.BatchNorm1d(128),
#             nn.Dropout(0.5),
#             nn.Linear(128, 1)
#         )

#     def forward(self, x):
#         return self.fc(x)

In [None]:
def train_model(feature_extractor, model, criterion, optimizer, train_loader, val_loader, num_epochs=10):
    for epoch in range(num_epochs):
        model.train()
        train_losses = []
        val_losses = []

        for images, labels in train_loader:
            if images is None:  # skip invalid images
                continue
            features = feature_extractor(images)
            optimizer.zero_grad()
            outputs = model(features)
            loss = criterion(outputs.squeeze(), labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item() * images.size(0)

        epoch_loss = running_loss / len(train_loader.dataset)
        train_losses.append(epoch_loss)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}')

        # Evaluate model
        model.eval()
        val_loss = 0.0
        with torch.no_grad():
            for images, labels in val_loader:
                if images is None:  # jump invalid images
                    continue
                features = feature_extractor(images)
                outputs = model(features)
                loss = criterion(outputs.squeeze(), labels)
                val_loss += loss.item() * images.size(0)

        val_loss /= len(val_loader.dataset)
        val_losses.append(val_loss)
        print(f'Validation Loss: {val_loss}')

    return train_losses, val_losses



input_dim = 2048  # ResNet50 feature dimension
model = RegressionModel(input_dim)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# training model
train_losses, val_losses = train_model(feature_extractor, model, criterion, optimizer, train_loader, val_loader, num_epochs=10)

In [None]:
plt.figure()
plt.plot(range(10), train_losses, label='Training Loss')
plt.plot(range(10), val_losses, label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()
plt.show()

In [1]:
# evaluate
def evaluate_model(feature_extractor, model, test_loader):
    model.eval()
    test_loss = 0.0
    with torch.no_grad():
        for images, labels in test_loader:
            features = feature_extractor(images)
            outputs = model(features)
            loss = criterion(outputs.squeeze(), labels)
            test_loss += loss.item() * images.size(0)

    test_loss /= len(test_loader.dataset)
    mae = mean_absolute_error(val_labels, val_predictions)
    r2 = r2_score(val_labels, val_predictions)
    print(f'Test Loss: {test_loss}')
    print(f'Validation MAE: {mae}')
    print(f'Validation R²: {r2}')
    return test_loss, mae, r2

test_loss, mae, r2 = evaluate_model(feature_extractor, model, val_loader)

# # save the model
# torch.save(model.state_dict(), 'complex_regression_model.pth')
# print("Model saved to complex_regression_model.pth")
