In [None]:
import torch
from torchvision import datasets, transforms
import torch.nn as nn
from PIL import Image
from sklearn.model_selection import train_test_split
import pandas as pd
import os
import torch.nn.functional as F
import numpy as np
from google.colab import drive
import matplotlib.pyplot as plt

seed = 42

# Feed the same seed to NumPy and to every torch generator (CPU, all CUDA
# devices, current CUDA device), in that order, so that a fresh
# Restart & Run All draws the same random numbers.
for _seed_rng in (
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed_all,
    torch.cuda.manual_seed,
):
    _seed_rng(seed)
del _seed_rng

# Disable cuDNN autotuning and request deterministic kernels.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [None]:
# Mount Google Drive inside the Colab VM so the project folder is reachable
# under /content/drive.
drive.mount('/content/drive')

In [None]:
# !unzip /content/drive/MyDrive/art_dataset.zip -d /content/drive/MyDrive/cs342fp

In [None]:
# Work from the project folder on Drive: the relative `parent_dir` used when
# loading the data is resolved against this directory.
%cd /content/drive/MyDrive/cs342fp

In [None]:
# Show the current working directory as a sanity check before loading data.
%pwd

In [None]:
# Every dataset path (the CSV index and each image filename listed in it) is
# relative to this folder, which is itself relative to the working directory.
parent_dir = "kaggle/working/"

# Genres kept for classification. A genre's position in this list becomes its
# integer class label, so the order must stay fixed once a model is trained.
valid_genres = [
    "Art Nouveau Modern",
    "Baroque",
    "Color Field Painting",
    "Cubism",
    "Early Renaissance",
    "Expressionism",
    "High Renaissance",
    "Impressionism",
    "Mannerism Late Renaissance",
    "Minimalism",
    "Naive Art Primitivism",
    "Northern Renaissance",
    "Realism",
    "Rococo",
    "Romanticism",
    "Ukiyo e"
]

# Load the image index, drop rows tagged with more than one genre and keep
# only the columns used below.
classes_raw = pd.read_csv(parent_dir + 'classes.csv')
df = classes_raw.loc[classes_raw['genre_count'] <= 1, ['filename', 'genre', 'subset']]

# The raw genre value appears to be the string form of a one-element list
# (e.g. "['Baroque']"); removing the first and last two characters leaves the
# bare name. The same slice is reused for the label mapping further down so the
# filter and the mapping can never disagree.
df = df[df['genre'].apply(lambda x: x[2:-2] in valid_genres)].reset_index(drop=True)
print(df['genre'].nunique())

genre_counts = df['genre'].value_counts()
print(genre_counts)
print(min(genre_counts))

# Balance the classes by down-sampling every genre to the size of the rarest
# one. The sample is pinned to `seed` so re-running this cell selects the same
# rows instead of depending on how far the global NumPy RNG has advanced.
df = (
    df.groupby('genre')
      .sample(n=int(genre_counts.min()), random_state=seed)
      .reset_index(drop=True)
)

# Replace each genre string with its index in valid_genres.
genre_to_index = {name: index for index, name in enumerate(valid_genres)}
df['genre'] = df['genre'].map(lambda x: genre_to_index[x[2:-2]])

# Keep only rows whose image file is actually present on disk. This runs after
# balancing, so classes with missing files end up slightly smaller; the counts
# printed below show the final distribution.
df = df[df['filename'].apply(lambda x: os.path.exists(parent_dir + x))]

genre_counts = df['genre'].value_counts()
print(genre_counts)
print(min(genre_counts))

In [None]:
# Side length, in pixels, of the square images fed to the model: the resize
# step scales the longer image side to this value and SquarePad pads up to it.
img_max = 512

In [None]:
class SimpleGatedAttention(nn.Module):
    """Blend two equally shaped feature maps with a learned gate.

    A 1x1 convolution followed by a sigmoid looks at both maps stacked along
    the channel axis and produces one weight in (0, 1) per channel and
    position. That weight scales the first map and its complement scales the
    second.
    """

    def __init__(self, channel_size):
        super().__init__()
        gate_conv = nn.Conv2d(2 * channel_size, channel_size, kernel_size=1)
        self.attention_weights = nn.Sequential(gate_conv, nn.Sigmoid())

    def forward(self, x1, x2):
        """Return ``x1 * g + x2 * (1 - g)`` where ``g`` is the learned gate."""
        stacked = torch.cat([x1, x2], dim=1)
        gate = self.attention_weights(stacked)
        from_first = x1 * gate
        from_second = x2 * (1 - gate)
        return from_first + from_second

class CNNArtModel(nn.Module):
    """Two-branch CNN classifier whose branches are merged by a learned gate.

    Both branches apply the same three conv + ReLU + 2x2 pooling stages to the
    input image. They differ only in the pooling operator: average pooling in
    ``path1`` (intended for colour usage) and max pooling in ``path2``
    (intended for textural qualities). The two 64-channel outputs are fused by
    ``SimpleGatedAttention``, globally average-pooled and passed to a linear
    classifier.

    Args:
        num_classes: Number of output scores per image.
    """

    def __init__(self, num_classes=16):
        super().__init__()

        # (in_channels, out_channels, kernel_size) of the conv in each stage.
        stage_specs = ((3, 16, 6), (16, 32, 3), (32, 64, 3))

        # Layers are created branch by branch and stage by stage so that they
        # are registered (and randomly initialised) in a fixed order.
        for branch, pool_cls in (("path1", nn.AvgPool2d), ("path2", nn.MaxPool2d)):
            for stage, (c_in, c_out, kernel) in enumerate(stage_specs, start=1):
                setattr(self, f"{branch}_conv{stage}",
                        nn.Conv2d(c_in, c_out, kernel_size=kernel, padding=1))
                setattr(self, f"{branch}_pool{stage}",
                        pool_cls(kernel_size=2, stride=2))

        # Gated attention to merge the two branches.
        self.gated_attention = SimpleGatedAttention(64)

        # Global average pooling and final classifier.
        self.global_avg_pool = nn.AdaptiveAvgPool2d((1, 1))
        self.classifier = nn.Linear(64, num_classes)

    def _branch(self, x, name):
        """Run ``x`` through the three conv/ReLU/pool stages of one branch."""
        for stage in (1, 2, 3):
            conv = getattr(self, f"{name}_conv{stage}")
            pool = getattr(self, f"{name}_pool{stage}")
            x = pool(F.relu(conv(x)))
        return x

    def forward(self, x):
        """Return one row of ``num_classes`` scores per input image."""
        avg_branch = self._branch(x, "path1")
        max_branch = self._branch(x, "path2")

        fused = self.gated_attention(avg_branch, max_branch)

        pooled = self.global_avg_pool(fused)
        flat = pooled.view(pooled.size(0), -1)
        return self.classifier(flat)

# Build a throwaway instance just to print the layer summary; the model that
# is actually trained is constructed separately in the training cell.
print(CNNArtModel(16))

In [None]:
# Partition the rows using the dataset's own 'subset' column instead of
# re-splitting them here.
train_df, test_df = (
    df.loc[df['subset'].eq(part)] for part in ('train', 'test')
)

class SquarePad:
    """Convert an RGB image to a float tensor and reflect-pad it to a square.

    Calling an instance on a PIL image (or an ``H x W x 3`` array) returns a
    ``torch.float`` tensor of shape ``(3, size, size)``. The pixel values are
    left unscaled, the original content is centred and the border is filled by
    mirroring the image. When the missing border is odd, the extra row or
    column goes to the bottom or right. Reflection can add at most
    ``side - 1`` pixels per edge in one step, so narrow images are padded in
    several passes.

    Args:
        size: Target side length. ``None`` (the default) uses the
            notebook-level ``img_max``, looked up at call time.

    Raises:
        ValueError: If the input is not ``H x W x 3``, has an empty side, or
            is larger than the target on either side. Padding can only grow
            an image, so such inputs could never reach the target shape and
            previously made the padding loop spin forever.
    """

    def __init__(self, size=None):
        self.size = size

    def __call__(self, image):
        target = img_max if self.size is None else self.size

        pixels = np.array(image)
        if pixels.ndim != 3 or pixels.shape[2] != 3:
            raise ValueError(
                f"expected an H x W x 3 image, got array of shape {pixels.shape}"
            )
        height, width = pixels.shape[:2]
        if not (0 < height <= target and 0 < width <= target):
            raise ValueError(
                f"cannot pad a {width}x{height} image to {target}x{target}"
            )

        padded_tensor = torch.tensor(pixels, dtype=torch.float).permute(2, 0, 1)

        # Reflection needs at least two pixels to mirror. A side that is a
        # single pixel long is filled by repeating that pixel instead.
        if width == 1 and target > 1:
            left = (target - 1) // 2
            padded_tensor = F.pad(
                padded_tensor, (left, target - 1 - left, 0, 0), mode='replicate'
            )
        if height == 1 and target > 1:
            top = (target - 1) // 2
            padded_tensor = F.pad(
                padded_tensor, (0, 0, top, target - 1 - top), mode='replicate'
            )

        # Every remaining side is at least two pixels long, so each pass adds
        # at least one pixel to every side that is still short.
        while tuple(padded_tensor.shape[1:]) != (target, target):
            height, width = padded_tensor.shape[1:]
            left, right = self._border(target - width, width)
            top, bottom = self._border(target - height, height)
            padded_tensor = F.pad(
                padded_tensor, (left, right, top, bottom), mode='reflect'
            )

        return padded_tensor

    @staticmethod
    def _border(missing, extent):
        """Split ``missing`` pixels into (before, after), capped at ``extent - 1``."""
        before = missing // 2
        after = missing - before
        limit = extent - 1
        return min(before, limit), min(after, limit)

def resize_larger_dimension(image, size):
    """Resize ``image`` so its longer side equals ``size``, keeping the aspect ratio.

    Args:
        image: Object exposing ``.size`` as ``(width, height)`` and a
            ``.resize((width, height))`` method, e.g. a PIL image.
        size: Target length in pixels for the longer side.

    Returns:
        Whatever ``image.resize`` returns for the computed dimensions. The
        shorter side is ``size`` scaled by the aspect ratio, truncated to an
        integer and never smaller than 1 pixel.
    """
    width, height = image.size
    aspect_ratio = width / height
    if width > height:
        new_width = size
        # Truncation reaches 0 for extremely elongated images; clamp to one
        # pixel so the result is never an empty image.
        new_height = max(1, int(size / aspect_ratio))
    else:
        new_width = max(1, int(size * aspect_ratio))
        new_height = size

    return image.resize((new_width, new_height))

# Preprocessing applied to every image, in order:
#   1. scale the longer side to img_max while preserving the aspect ratio;
#   2. convert to a float tensor and reflect-pad to img_max x img_max;
#   3. rescale pixel values from 0-255 to 0-1;
#   4. normalise each channel.
transform = transforms.Compose([
    transforms.Lambda(lambda x: resize_larger_dimension(x, img_max)),
    SquarePad(),
    # SquarePad returns raw 0-255 values, while the mean/std below are the
    # statistics torchvision documents for inputs scaled to [0, 1]. Without
    # this step the "normalised" values reach into the thousands.
    transforms.Lambda(lambda x: x / 255.0),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

class CustomDataset(torch.utils.data.Dataset):
    """Dataset that loads images listed in a DataFrame.

    Args:
        dataframe: Frame with a 'filename' column (path relative to the
            notebook-level ``parent_dir``) and a 'genre' column holding the
            class label. Rows are addressed by position, so the index does
            not need to be contiguous.
        transform: Optional callable applied to the RGB PIL image before it
            is returned.
    """

    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``."""
        row = self.dataframe.iloc[idx]
        filename = parent_dir + row['filename']

        # Image.open keeps the file handle open until the image is closed.
        # convert() returns an independent, fully loaded copy, so the handle
        # can be released immediately instead of piling up over an epoch.
        with Image.open(filename) as source:
            image = source.convert('RGB')
        label = row['genre']

        if self.transform:
            image = self.transform(image)

        return image, label

# Wrap each split in a dataset that applies the shared preprocessing.
train_dataset, test_dataset = (
    CustomDataset(split, transform=transform) for split in (train_df, test_df)
)

# Both splits are served in batches of 128. Only the training batches are
# reshuffled; the test split is iterated in a fixed order.
train_loader = torch.utils.data.DataLoader(
    dataset=train_dataset, batch_size=128, shuffle=True
)
test_loader = torch.utils.data.DataLoader(
    dataset=test_dataset, batch_size=128, shuffle=False
)

In [None]:
# The model instance that the training loop below optimises and checkpoints.
model = CNNArtModel(16)

def test(model):
    correct = 0
    total = 0
    true, pred = [], []
    with torch.no_grad():
        for inputs, labels  in test_loader:
            outputs = model.forward(inputs)
            predicted = torch.argmax(outputs, dim=1) # get predicted class label for each test example.
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            true.append(labels)
            pred.append(predicted)
    acc = (100 * correct / total)
    print('accuracy: %0.3f' % (acc))
    true = np.concatenate(true)
    pred = np.concatenate(pred)
    return acc, true, pred

def _model_device(model):
    """Return the device of the model's first parameter (CPU if it has none)."""
    first_param = next(model.parameters(), None)
    return first_param.device if first_param is not None else torch.device('cpu')


def _accuracy_percent(model, loader, device):
    """Return the percentage of correctly classified samples in ``loader``."""
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            predicted = torch.argmax(model(inputs), dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    return (100 * correct / total) if total else float('nan')


def train_one_epoch(model, train_loader, test_loader, optimizer, criterion):
    """Train ``model`` for one pass over ``train_loader``, then evaluate it.

    Every batch is moved to the device the model's parameters live on, so the
    same code runs on CPU and GPU. The running mean of the training loss is
    printed after each iteration, followed by the epoch's mean loss and the
    accuracy on ``test_loader``.

    Args:
        model: Module to optimise.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        test_loader: Iterable of ``(inputs, labels)`` evaluation batches.
        optimizer: Optimiser already bound to ``model``'s parameters.
        criterion: Loss taking ``(outputs, labels)``.

    Returns:
        ``(mean_train_loss, accuracy_percent)``. Either value is ``nan`` when
        the corresponding loader yields no batches.
    """
    device = _model_device(model)

    model.train()
    total_loss = 0
    count = 0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        count += 1
        print(f'Train loss after iteration {count}: {total_loss/count}')

    mean_loss = total_loss / count if count else float('nan')
    print('{:>12s} {:>7.5f}'.format('Train loss:', mean_loss))

    # Evaluate in eval mode, then return to train mode so a following epoch
    # starts in the right state.
    model.eval()
    try:
        acc = _accuracy_percent(model, test_loader, device)
    finally:
        model.train()
    print('accuracy: %0.3f' % (acc))
    print()
    return mean_loss, acc

# Training configuration and loop: train for `nepoch` epochs and evaluate on
# test_loader after every epoch (there is no separate validation loader).
nepoch = 5
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)
criterion = nn.CrossEntropyLoss()
# With step_size=1 the learning rate is multiplied by gamma after every epoch:
# 5e-4, 5e-5, 5e-6, ...
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)

for epoch in range(nepoch):
    train_one_epoch(model, train_loader, test_loader, optimizer, criterion)
    scheduler.step()
    # Checkpoint after every epoch. This saves the whole module object rather
    # than a state_dict, so loading it requires the model class definitions to
    # be available.
    torch.save(model, f'art_model_{epoch}.pt')