In [1]:
import torch
import numpy as np
torch.cuda.get_device_name()

'NVIDIA GeForce RTX 3060 Laptop GPU'

In [2]:
torch.cuda.is_available()

True

# Image assigning

In [3]:
import os
import uuid
from tqdm.notebook import tqdm

DATA_DIR = 'Images'

data_class2id_map = {"Classic": 0, "Modern": 1, "Soviet": 2 }
data_id2class_map = {0: "Classic", 1: "Modern", 2: "Soviet"}

In [4]:
for label_path in os.listdir(DATA_DIR):
    if 'train' in  label_path or 'test' in label_path:
        continue
        
    print(label_path)
    label_id = data_class2id_map[label_path]
    image_type_path = os.path.join(DATA_DIR,label_path)
    for image_path in os.listdir(image_type_path):
        image_id = uuid.uuid4().int
        image_name = f"{image_id}_{label_id}.jpg"
        os.rename(os.path.join(image_type_path, image_path), os.path.join(image_type_path,image_name))
    print("_______________________")


Classic
_______________________
Modern
_______________________
Soviet
_______________________


In [5]:
os.path.join(DATA_DIR, 'train')


'Images\\train'

In [6]:
# os.makedirs(os.path.join(DATA_DIR, 'train'), exist_ok=True)
# os.makedirs(os.path.join(DATA_DIR, 'test'), exist_ok=True)

[os.makedirs(os.path.join(DATA_DIR, 'train', folder), exist_ok=True) for \
 folder in list(data_class2id_map.keys())]

[os.makedirs(os.path.join(DATA_DIR, 'test', folder), exist_ok=True) for \
 folder in list(data_class2id_map.keys())]

[None, None, None]

In [7]:
split_ratio = 0.1
testing_paths = []
training_paths = []

for label_path in os.listdir(DATA_DIR):
    if 'train' in  label_path or 'test' in label_path:
        continue
    else:
        print(label_path)
        
        sub_path = os.path.join(DATA_DIR, label_path)
        
        label_id = data_class2id_map[label_path]
        image_type_path = os.path.join(DATA_DIR,label_path)
        files =  os.listdir(image_type_path)
        num_images = len(files)
        test_images = list(np.random.choice(files, size=int(num_images*split_ratio), replace=False))
        train_images = list(set(files) - set(test_images))

        assert sorted(train_images + test_images) == sorted(files)
        
        testing_paths += [os.path.join(sub_path, t_i) for t_i in test_images]
        training_paths += [os.path.join(sub_path, t_i) for t_i in train_images]
    


Classic
Modern
Soviet


In [8]:
training_paths

['Images\\Classic\\255544330452439562989672129852919410354_0.jpg',
 'Images\\Classic\\338535707135202624929950440699785428028_0.jpg',
 'Images\\Classic\\184439146238878862816570300395195491390_0.jpg',
 'Images\\Classic\\49828480862642322331870647240833131915_0.jpg',
 'Images\\Classic\\18449471709679300563827624902644967132_0.jpg',
 'Images\\Classic\\330549538701655674635472979142397910022_0.jpg',
 'Images\\Classic\\321006798778387740544992994532564608832_0.jpg',
 'Images\\Classic\\237879402739436274199279759452709042728_0.jpg',
 'Images\\Classic\\152123482479402919544422840030829390368_0.jpg',
 'Images\\Classic\\81728332479680054921153204900040668847_0.jpg',
 'Images\\Classic\\244550273952312124856564411256054240392_0.jpg',
 'Images\\Classic\\243031332650110769401591779706839048932_0.jpg',
 'Images\\Classic\\164882053139644828348451784927518659454_0.jpg',
 'Images\\Classic\\221722630344525691119727171575240990560_0.jpg',
 'Images\\Classic\\124799694547932158787519655212524363196_0.jpg'

In [9]:
os.path.basename(training_paths[-1])

'5612298313566516686588811578598226614_2.jpg'

In [10]:
import shutil
from typing import List, NoReturn

def train_test_split_paths(data_paths: List = [], split_type:str = "train") -> NoReturn:
    for data_path in data_paths:
        output_path = os.path.join(DATA_DIR, split_type)
        data_label_folder = data_id2class_map[int(data_path.split('_')[-1][0])]
        output_path = os.path.join(output_path, data_label_folder)
        output_path = os.path.join(output_path, os.path.basename(data_path))
        
        shutil.copyfile(data_path, output_path)
        
train_test_split_paths(training_paths, 'train')
train_test_split_paths(testing_paths, 'test')

In [11]:
from collections import Counter

labels_train = [i.split('_')[-1] for i in training_paths]
labels_test = [i.split('_')[-1] for i in testing_paths]

Counter(labels_test)

Counter({'0.jpg': 8, '1.jpg': 8, '2.jpg': 8})

# Torch Data pipe 

In [12]:
import glob
import os.path as osp
import random
import numpy as np
import json
from PIL import Image
from tqdm import tqdm
import matplotlib.pyplot as plt
%matplotlib inline

import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision
from torchvision import models, transforms
import time


from __future__ import print_function
from __future__ import division
import numpy as np
from torchvision import datasets, models, transforms
import matplotlib.pyplot as plt
import time
import os
import copy
print("PyTorch Version: ",torch.__version__)
print("Torchvision Version: ",torchvision.__version__)


PyTorch Version:  1.11.0
Torchvision Version:  0.12.0


In [13]:
# Top level data directory. Here we assume the format of the directory conforms
#   to the ImageFolder structure
data_dir = DATA_DIR

# Models to choose from [resnet, alexnet, vgg, squeezenet, densenet, inception]
model_name = "squeezenet"

# Number of classes in the dataset
num_classes = 3

# Batch size for training (change depending on how much memory you have)
batch_size = 8

# Number of epochs to train for
num_epochs = 15

# Flag for feature extracting. When False, we finetune the whole model,
#   when True we only update the reshaped layer params
feature_extract = True

In [14]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs=25, is_inception=False):
    since = time.time()

    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'test']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward
                # track history if only in train
                with torch.set_grad_enabled(phase == 'train'):
                    # Get model outputs and calculate loss
                    # Special case for inception because in training it has an auxiliary output. In train
                    #   mode we calculate the loss by summing the final output and the auxiliary output
                    #   but in testing we only consider the final output.
                    if is_inception and phase == 'train':
                        # From https://discuss.pytorch.org/t/how-to-optimize-inception-model-with-auxiliary-classifiers/7958
                        outputs, aux_outputs = model(inputs)
                        loss1 = criterion(outputs, labels)
                        loss2 = criterion(aux_outputs, labels)
                        loss = loss1 + 0.4*loss2
                    else:
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # deep copy the model
            if phase == 'test' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == 'test':
                val_acc_history.append(epoch_acc)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, val_acc_history

In [15]:
def set_parameter_requires_grad(model, feature_extracting):
    if feature_extracting:
        for param in model.parameters():
            param.requires_grad = False

In [16]:
def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
    # Initialize these variables which will be set in this if statement. Each of these
    #   variables is model specific.
    model_ft = None
    input_size = 0

    if model_name == "resnet":
        """ Resnet18
        """
        model_ft = models.resnet18(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "alexnet":
        """ Alexnet
        """
        model_ft = models.alexnet(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
        input_size = 224

    elif model_name == "vgg":
        """ VGG11_bn
        """
        model_ft = models.vgg11_bn(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier[6].in_features
        model_ft.classifier[6] = nn.Linear(num_ftrs,num_classes)
        input_size = 224

    elif model_name == "squeezenet":
        """ Squeezenet
        """
        model_ft = models.squeezenet1_0(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        model_ft.classifier[1] = nn.Conv2d(512, num_classes, kernel_size=(1,1), stride=(1,1))
        model_ft.num_classes = num_classes
        input_size = 224

    elif model_name == "densenet":
        """ Densenet
        """
        model_ft = models.densenet121(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        num_ftrs = model_ft.classifier.in_features
        model_ft.classifier = nn.Linear(num_ftrs, num_classes)
        input_size = 224

    elif model_name == "inception":
        """ Inception v3
        Be careful, expects (299,299) sized images and has auxiliary output
        """
        model_ft = models.inception_v3(pretrained=use_pretrained)
        set_parameter_requires_grad(model_ft, feature_extract)
        # Handle the auxilary net
        num_ftrs = model_ft.AuxLogits.fc.in_features
        model_ft.AuxLogits.fc = nn.Linear(num_ftrs, num_classes)
        # Handle the primary net
        num_ftrs = model_ft.fc.in_features
        model_ft.fc = nn.Linear(num_ftrs,num_classes)
        input_size = 299

    else:
        print("Invalid model name, exiting...")
        exit()

    return model_ft, input_size

# Initialize the model for this run
model_ft, input_size = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True)

# Print the model we just instantiated
print(model_ft)

SqueezeNet(
  (features): Sequential(
    (0): Conv2d(3, 96, kernel_size=(7, 7), stride=(2, 2))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=True)
    (3): Fire(
      (squeeze): Conv2d(96, 16, kernel_size=(1, 1), stride=(1, 1))
      (squeeze_activation): ReLU(inplace=True)
      (expand1x1): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1))
      (expand1x1_activation): ReLU(inplace=True)
      (expand3x3): Conv2d(16, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (expand3x3_activation): ReLU(inplace=True)
    )
    (4): Fire(
      (squeeze): Conv2d(128, 16, kernel_size=(1, 1), stride=(1, 1))
      (squeeze_activation): ReLU(inplace=True)
      (expand1x1): Conv2d(16, 64, kernel_size=(1, 1), stride=(1, 1))
      (expand1x1_activation): ReLU(inplace=True)
      (expand3x3): Conv2d(16, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (expand3x3_activation): ReLU(inplace=True)
    )
    (5): Fire(
   

In [17]:
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(input_size),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize(input_size),
        transforms.CenterCrop(input_size),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

print("Initializing Datasets and Dataloaders...")

# Create training and validation datasets
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x), data_transforms[x]) for x in ['train', 'test']}
# Create training and validation dataloaders
dataloaders_dict = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True, num_workers=4) for x in ['train', 'test']}

# Detect if we have a GPU available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

Initializing Datasets and Dataloaders...


In [18]:
# Send the model to GPU
model_ft = model_ft.to(device)

# Gather the parameters to be optimized/updated in this run. If we are
#  finetuning we will be updating all parameters. However, if we are
#  doing feature extract method, we will only update the parameters
#  that we have just initialized, i.e. the parameters with requires_grad
#  is True.
params_to_update = model_ft.parameters()
print("Params to learn:")
if feature_extract:
    params_to_update = []
    for name,param in model_ft.named_parameters():
        if param.requires_grad == True:
            params_to_update.append(param)
            print("\t",name)
else:
    for name,param in model_ft.named_parameters():
        if param.requires_grad == True:
            print("\t",name)

# Observe that all parameters are being optimized
optimizer_ft = optim.Adam(params_to_update, lr=0.001)

Params to learn:
	 classifier.1.weight
	 classifier.1.bias


In [19]:
criterion = nn.CrossEntropyLoss()

# Train and evaluate
model_ft, hist = train_model(model_ft, dataloaders_dict, criterion, optimizer_ft, num_epochs=num_epochs, is_inception=(model_name=="inception"))

Epoch 0/14
----------




train Loss: 1.0011 Acc: 0.5198
test Loss: 0.8243 Acc: 0.6944

Epoch 1/14
----------
train Loss: 0.7124 Acc: 0.7048
test Loss: 0.6479 Acc: 0.7500

Epoch 2/14
----------
train Loss: 0.5731 Acc: 0.7812
test Loss: 0.5384 Acc: 0.8056

Epoch 3/14
----------
train Loss: 0.5370 Acc: 0.7974
test Loss: 0.6093 Acc: 0.7778

Epoch 4/14
----------
train Loss: 0.4457 Acc: 0.8399
test Loss: 0.4952 Acc: 0.8472

Epoch 5/14
----------
train Loss: 0.4094 Acc: 0.8370
test Loss: 0.3899 Acc: 0.8750

Epoch 6/14
----------
train Loss: 0.3872 Acc: 0.8664
test Loss: 0.4425 Acc: 0.8472

Epoch 7/14
----------
train Loss: 0.4119 Acc: 0.8341
test Loss: 0.3351 Acc: 0.8194

Epoch 8/14
----------
train Loss: 0.3958 Acc: 0.8620
test Loss: 0.2936 Acc: 0.8472

Epoch 9/14
----------
train Loss: 0.3944 Acc: 0.8443
test Loss: 0.2520 Acc: 0.9167

Epoch 10/14
----------
train Loss: 0.3445 Acc: 0.8737
test Loss: 0.2742 Acc: 0.9167

Epoch 11/14
----------
train Loss: 0.3708 Acc: 0.8634
test Loss: 0.2616 Acc: 0.9167

Epoch 12/14
