In [1]:
!gdown --id 1zre8_TMtUeHZkQKcOTTueNcAqY_aXJFU
!mkdir training
!tar -xzf resized_dataset.tar.gz -C training

Downloading...
From: https://drive.google.com/uc?id=1zre8_TMtUeHZkQKcOTTueNcAqY_aXJFU
To: /content/resized_dataset.tar.gz
100% 274M/274M [00:03<00:00, 77.1MB/s]


In [2]:
# change num_countries to the number you want (maximum is 19)
num_countries = 19

# dictionaries to be populated with pairing from numbers to the label they correspond to i.e 0: "Canada", 1: "Russia", etc.
label_to_index = {}
index_to_label = {}

In [4]:
#load in dataset
# from google.colab import drive
# drive.mount('/content/drive')
base_dir = '/content/training'

import os
import glob
from PIL import Image
from PIL import ImageFile
ImageFile.LOAD_TRUNCATED_IMAGES = True

# Function to load images and labels from your folder structure
def load_images_and_labels(base_dir, num_countries, max_per_country = 1500):
    images = []
    labels = []
    
    # get all the folders from the base directory or small subset if use_small is true
    folders = os.listdir(base_dir)[:num_countries]


    # Iterate over the folders 
    for index, folder in enumerate(folders):
        folder_path = os.path.join(base_dir, folder)
        
        # Check if it's a directory and has the expected structure
        if os.path.isdir(folder_path):
            image_files = glob.glob(os.path.join(folder_path, 'canvas_*.jpg'))
            print(f"loading { min(len(image_files), max_per_country) } images from {folder}")

            label = folder
            index_to_label[index] = label
            label_to_index[label] = index
            
            # Iterate over the image files inside the folder
            for i, image_file in enumerate(image_files):
                if i >= max_per_country:
                  break

                # Read the image using PIL
                image = Image.open(image_file)
                
                # Append the image and label to the corresponding lists
                #image = image.resize((256,256)) # already 256x256
                images.append(image) 
                labels.append(label)
    
    return images, labels

images_1500, labels_1500 = load_images_and_labels(base_dir, num_countries, max_per_country=1500)
images_500, labels_500 = load_images_and_labels(base_dir, num_countries, max_per_country=500)



loading 1500 images from Australia
loading 944 images from Thailand
loading 1500 images from France
loading 726 images from Sweden
loading 1500 images from Brazil
loading 863 images from Poland
loading 901 images from Mexico
loading 689 images from Argentina
loading 698 images from Germany
loading 1183 images from South Africa
loading 1500 images from Japan
loading 1500 images from Russia
loading 1049 images from Finland
loading 789 images from Italy
loading 1500 images from United States
loading 707 images from Singapore
loading 1500 images from United Kingdom
loading 1382 images from Canada
loading 1075 images from Spain
loading 500 images from Australia
loading 500 images from Thailand
loading 500 images from France
loading 500 images from Sweden
loading 500 images from Brazil
loading 500 images from Poland
loading 500 images from Mexico
loading 500 images from Argentina
loading 500 images from Germany
loading 500 images from South Africa
loading 500 images from Japan
loading 500 im

In [None]:
#import libraries
import os
import glob
import numpy as np
from PIL import Image
from tensorflow.keras.utils import to_categorical
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator, img_to_array
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import TimeDistributed, Conv2D, MaxPooling2D, Flatten, GRU, Dense
from typing import *
from collections import defaultdict
from PIL import Image
from IPython.display import display
from glob import glob
import re
import os
from random import shuffle
import time
from matplotlib import pyplot as plt
from matplotlib.collections import LineCollection
import xml.etree.ElementTree as ET
import numpy as np
from torch.nn import functional as F
 
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import ToTensor
from torchvision import transforms, models
import torch.optim as optim


In [None]:
# wrap the data in Dataset class for ease of use later
class ImageDataset(Dataset):
  def __init__(self, images, labels, transform=None, target_transform=None):
        self.images = images
        self.labels = labels
        self.transform = transform
        self.target_transform = None
       

  def __len__(self):
        return len(self.labels)

  def __getitem__(self, idx):
        label = self.labels[idx]
        image = self.images[idx]

        if self.transform:
          image = self.transform(image)
        if self.target_transform:
          label = self.target_transform(label)

        return image, label_to_index[label] # convert label to index 


# need this transform to make images the expected size 
  
transform = transforms.Compose([
                    transforms.Resize((224, 224)),
                    transforms.ToTensor(),
                    ])


dataset_1500 = ImageDataset(images_1500, labels_1500, transform=transform)
dataset_500 = ImageDataset(images_500, labels_500, transform=transform)


In [None]:
def make_data_loaders(dataset, batch_size=16, train_frac=0.8):

  length = len(dataset)

  # remainder of data split equally into test and validation sets
  test_frac = (1 - train_frac) / 2

  train_len = int(train_frac * length)
  test_len = int(test_frac * length)
  val_len = length - train_len - test_len

  train_set, val_set, test_set = torch.utils.data.random_split(dataset, [train_len, val_len, test_len])

  train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=2, prefetch_factor=8)
  val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=True, num_workers=2, prefetch_factor=8)
  test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=True, num_workers=2, prefetch_factor=8)
  
  return train_loader, val_loader, test_loader

train_loader_1500, val_loader_1500, test_loader_1500 = make_data_loaders(dataset_1500)
train_loader_500, val_loader_500, test_loader_500 = make_data_loaders(dataset_500)

In [None]:
# Load a pre-trained model (ResNet18)
#pretrained_cnn = models.resnet18(pretrained=True)

# Remove the fully connected layer from the pre-trained model
#pretrained_cnn = nn.Sequential(*list(pretrained_cnn.children())[:-1])

In [None]:
#Create the CNN+RNN model
from torchvision.models import mobilenet_v2

class CNN_RNN_Model(nn.Module):
    def __init__(self, num_classes):
        super(CNN_RNN_Model, self).__init__()
        self.cnn = mobilenet_v2(pretrained=True).features
        self.rnn = nn.GRU(1280, 64, batch_first=True)  # Note the change in input size to 1280
        self.fc = nn.Linear(64, num_classes)
        
    def forward(self, x):
        batch_size, c, h, w = x.size()
        x = self.cnn(x)
        x = F.adaptive_avg_pool2d(x, (1, 1)).view(batch_size, 1, -1)  # Add adaptive average pooling and reshape
        _, x = self.rnn(x)
        x = self.fc(x.squeeze(1))
        return x.view(batch_size, -1)

In [None]:
def get_model(num_classes):
    model = CNN_RNN_Model(num_classes=num_classes)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    # Freeze pre-trained layers to prevent updating the weights during training
    for param in model.cnn.parameters():
        param.requires_grad = False

    # Compile the model
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(list(model.rnn.parameters()) + list(model.fc.parameters()))

    return model, criterion, optimizer

In [None]:
from torch.cuda.amp import GradScaler, autocast
import time
import matplotlib.pyplot as plt

def train_and_evaluate(train_loader, val_loader, test_loader,num_classes):
    num_epochs = 1
    accuracies = []
    val_accuracies = []
    model, criterion, optimizer = get_model(num_classes)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)
    scaler = GradScaler()

    for epoch in range(num_epochs):
        # Training
        model.train()
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()
            
            with autocast():
                outputs = model(images)
                loss = criterion(outputs, labels)
            
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

        # Validation
        model.eval()
        correct = 0
        total = 0
        batch_count = 0
        batch_accuracies = []
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)

                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

                batch_accuracy = 100 * correct / total
                batch_accuracies.append(batch_accuracy)

        val_accuracies.extend(batch_accuracies)
        val_accuracy = sum(batch_accuracies) / len(batch_accuracies)
        accuracies.append(val_accuracy)

            # Testing
    test_correct = 0
    test_total = 0
    test_accuracies = []
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)

            _, predicted = torch.max(outputs.data, 1)
            test_total += labels.size(0)
            test_correct += (predicted == labels).sum().item()

            test_accuracy = 100 * test_correct / test_total
            test_accuracies.append(test_accuracy)
    
    return val_accuracies, test_accuracies






In [None]:
# Modify the function calls to include test_loader
val_accuracies_1500, test_accuracies_1500 = train_and_evaluate(train_loader_1500, val_loader_1500, test_loader_1500, num_classes=len(index_to_label))
val_accuracies_500, test_accuracies_500 = train_and_evaluate(train_loader_500, val_loader_500, test_loader_500, num_classes=len(index_to_label))


    

Downloading: "https://download.pytorch.org/models/mobilenet_v2-b0353104.pth" to /root/.cache/torch/hub/checkpoints/mobilenet_v2-b0353104.pth
100%|██████████| 13.6M/13.6M [00:00<00:00, 157MB/s]


In [None]:
# Plot the test accuracy for each dataset
plt.plot(range(len(test_accuracies_1500)), test_accuracies_1500, label="Max images = 1500")
plt.plot(range(len(test_accuracies_500)), test_accuracies_500, label="Max images = 500")
plt.xlabel("Batch")
plt.ylabel("Test Accuracy")
plt.title("Test Accuracy vs Batch for batch size of 16")
plt.legend()
plt.show()

AttributeError: ignored