In [43]:
import copy
import easydict

import torch
import torch.nn as nn
from torch.utils.data import Dataset, TensorDataset, Subset
import torchvision
import torchvision.transforms as transforms
import torchvision.models as models
import torch.optim as optim

from skimage.exposure import exposure
from skimage.feature import hog

import numpy as np
import matplotlib.pyplot as plt


# PreParing Data

In [44]:
# Data transforms
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.255]

data_transform = {
    'train': transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ]),
    'val': transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize(mean, std),
    ])
}

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print(f'Operations will be on your: {device}')

Operations will be on your: cuda:0


In [45]:
# Loading Datasets
datasets = {
    'train': torchvision.datasets.CIFAR10(root='./data', train=True, transform=data_transform['train'],
                                          download=True),
    'val': torchvision.datasets.CIFAR10(root='./data', train=False, transform=data_transform['val'],
                                        download=True)
}

# Defining class names
class_names = datasets['train'].classes
print(f'Class names are {class_names}')

# Creating DataLoaders
batch_size = 1
dataloaders = {x: torch.utils.data.DataLoader(dataset=datasets[x], batch_size=batch_size, shuffle=False, num_workers=2) for x in
               ['train', 'val']}

print('DataLoaders Are Ready.')

Files already downloaded and verified
Files already downloaded and verified
Class names are ['airplane', 'automobile', 'bird', 'cat', 'deer', 'dog', 'frog', 'horse', 'ship', 'truck']
DataLoaders Are Ready.


# Extracting HOG Features and Making New Datasets

In [None]:
import datetime

orientations = 8
pixels_per_cell = (8, 8)
cells_per_block = (1, 1)
block_norm = 'L2-Hys'

# Make HOG features dataset 
def extract_hog_features(image):
    hog_channels = []
    for channel in range(image.shape[0]):  
        # Extract the single channel
        single_channel = image[channel, :, :].cpu().numpy()

        # Extract HOG features for the single channel and rescale features 
        fd, hog_channel = hog(single_channel,
                              orientations=orientations,
                              pixels_per_cell=pixels_per_cell,
                              cells_per_block=cells_per_block,
                              visualize=True)
        hog_channel = exposure.rescale_intensity(hog_channel, in_range=(0, 10))
        hog_channels.append(hog_channel)

    # Stack the HOG images to form a 3-channel image
    hog_image = np.stack(hog_channels, axis=0)
    return torch.tensor(hog_image, dtype=torch.float32)

# Define HOG extracted datasets
hog_datasets = {
    'train': None,
    'val': None
}

# Set number of new dataset images to Subset from original dataset
datasets_length = 25000

# Iterating over two datasets
for phase in ['train', 'val']:
    # Set timer for extracting HOG
    s0 = datetime.datetime.now()
    print(f'start HOG extracting form {phase} images in: {s0}')

    # Define lists for hog features 
    hog_images = []
    labels = []
    
    # define counter for datasets length
    counter = 0
    
    # Iterating over batches and data for HOG feature extraction
    for image, label in dataloaders[phase]:
        # Extract HOG features for each image
        hog_image = extract_hog_features(image.squeeze())  # Squeeze in case the batch size is 1
        hog_images.append(hog_image)
        labels.append(label)
        
        # check counter for length
        counter += 1
        if counter > datasets_length:
            break

    # Convert lists of tensors to a single tensor   
    hog_dataset = torch.stack(hog_images)
    labels = torch.cat(labels)
        
    # Create a new dataset with HOG images and labels
    hog_datasets[phase] = TensorDataset(hog_dataset, labels)
    print(f'Extracting Done in: {datetime.datetime.now() - s0}')


print(len(hog_datasets['train']))
print(len(hog_datasets['val']))

start HOG extracting form train images in: 2024-02-02 12:42:52.100036


Exception ignored in: <function _MultiProcessingDataLoaderIter.__del__ at 0x00000228A839B5B0>
Traceback (most recent call last):
  File "C:\Users\Mahdiar\AppData\Local\Programs\Python\Python310\lib\site-packages\torch\utils\data\dataloader.py", line 1478, in __del__
    self._shutdown_workers()
  File "C:\Users\Mahdiar\AppData\Local\Programs\Python\Python310\lib\site-packages\torch\utils\data\dataloader.py", line 1436, in _shutdown_workers
    if self._persistent_workers or self._workers_status[worker_id]:
AttributeError: '_MultiProcessingDataLoaderIter' object has no attribute '_workers_status'


In [None]:
# Make new small datasets for comparing HOG vs Original images
indices = list(range(datasets_length))
for dataset in [datasets, hog_datasets]:
    for phase in ['train', 'val']:
        dataset[phase] = Subset(dataset[phase], indices)
    
# Create new small Dataloaders
batch_size = 32
original_dataloaders = {x: torch.utils.data.DataLoader(dataset=datasets[x], batch_size=batch_size, shuffle=True, num_workers=2) for x in
               ['train', 'val']}

hog_dataloaders = {x: torch.utils.data.DataLoader(dataset=hog_datasets[x], batch_size=batch_size, shuffle=True, num_workers=2) for x in
               ['train', 'val']}

In [None]:
print(len(datasets['train']))
print(len(datasets['val']))

print(len(hog_datasets['train']))
print(len(hog_datasets['val']))

# Setup pretrained model

In [None]:
# Load pretrained model
pretrained_model = models.resnet18(pretrained=True)

# Freeze all trainable layers
for param in pretrained_model.parameters():
    param.requires_grad = False

# Modifying last classification layer for our dataset
num_features = pretrained_model.fc.in_features
pretrained_model.fc = nn.Linear(num_features, 10)

# print(pretrained_model)

In [None]:
# Defining loss function and optimizer 
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(pretrained_model.fc.parameters(), lr=0.001)

# Train Function 

In [None]:
# train function 
def train_model(model, criterion, optimizer, model_dataloaders, model_datasets,  epoch_num=25):
    acc_list = easydict.EasyDict({'train': [], 'val': []})
    loss_list = easydict.EasyDict({'train': [], 'val': []})
    
    # Copy the best model weights for loading at the End
    best_model_wts = copy.deepcopy(model.state_dict())
    best_accuracy = 0.0

    # Iterating over epochs
    for epoch in range(1, epoch_num + 1):
        print(f'Epoch {epoch}/{epoch_num}:')

        # Each epoch has two phase Train and Validation
        for phase in ['train', 'val']:
            # Define starting time
            s0 = datetime.datetime.now()
            
            if phase == 'train':
                model.train()
            else:
                model.eval()

            # For calculating Loss and Accuracy at the end of epoch
            running_loss = 0.0
            running_corrects = 0.0

            # Iterating over batches and data for training and validation
            for idx, batch in enumerate(model_dataloaders[phase], 0):
                inputs, labels = batch

                # Transfer data and labels to CUDA if is available
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                optimizer.zero_grad()

                # Forward Pass
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    
                    _, predictions = torch.max(outputs, 1)
                    
                    # Back Propagation and updating weights
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(predictions == labels.data)
            
            # Calculating Accuracy and Loss per phase
            epoch_loss = running_loss / len(model_datasets[phase])
            epoch_accuracy = running_corrects.double() / len(model_datasets[phase])
            
            # Show epoch details
            print(f'{phase.capitalize()} Accuracy: {epoch_accuracy:.4f} ||| Loss: {epoch_loss:.4f}', end=' ')
            
            # Calculate each phase duration time
            s1 = datetime.datetime.now()
            delta_time = s1 - s0
            print(f' --> duration: {delta_time}')
            
            # Copy the model weights if its better
            if phase == 'val' and epoch_accuracy > best_accuracy:
                best_accuracy = epoch_accuracy
                best_model_wts = copy.deepcopy(model.state_dict())  
                print('Best model weights updated!')
                
            # Save Loss and accuracy
            acc_list[phase].append(epoch_accuracy)
            loss_list[phase].append(epoch_loss)        
            
        print()
        
    print(f'*** Best Accuracy: {best_accuracy:.4f} ***')
    
    # Loading best model weights 
    model.load_state_dict(best_model_wts)
    return model, acc_list, loss_list

In [None]:
model = pretrained_model.to(device)

# Train model
s0 = datetime.datetime.now()
model, acc_list, loss_list = train_model(model, criterion, optimizer, original_dataloaders, datasets, epoch_num=100)
delta_time = datetime.datetime.now() - s0
print(f'Total training duration: {delta_time}')

In [None]:
# plot accuracy
plt.plot([a.cpu() for a in acc_list.train], label='train')
plt.plot([a.cpu() for a in acc_list.val], label='val')
plt.title('Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy Percent')
plt.legend()
plt.show()

In [None]:
# plot loss
plt.plot([a for a in loss_list.train], label='train')
plt.plot([a for a in loss_list.val], label='val')
plt.title('Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Percent')
plt.legend()
plt.show()

# Plotting Confusion Matrix

In [None]:
from sklearn.metrics import confusion_matrix
import pandas as pd
import seaborn as sns


def plot_cm(model):
    y_true, y_pred = [], []
    model.eval()
    with torch.no_grad():
        for inputs, labels in original_dataloaders['val']:
            inputs = inputs.to(device)
            outputs = model(inputs)

            outputs = (torch.max(torch.exp(outputs), 1)[1]).data.cpu().numpy()
            y_pred.extend(outputs)

            labels = labels.data.cpu().numpy()
            y_true.extend(labels)

    cm = confusion_matrix(y_true, y_pred)
    df_cm = pd.DataFrame(
        cm / np.sum(cm, axis=1)[:, None],
        index=[i for i in class_names],
        columns=[i for i in class_names]
    )

    plt.figure(figsize=(15, 10))
    sns.heatmap(df_cm, annot=True, cbar=False)
    plt.show()

In [None]:
plot_cm(model)

# Visualizing The Predictions

In [None]:
def visualize_model(model):
    model.eval()
    nrows, ncols = 4, 4
    fig, axes = plt.subplots(nrows=nrows, ncols=ncols, figsize=(20, 10))

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(original_dataloaders['val']):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, predictions = torch.max(outputs, 1)

            for j in range(inputs.size()[0]):
                img = inputs.cpu().data[j]
                img = img.numpy().transpose((1, 2, 0))
                img = std * img + mean
                img = np.clip(img, 0, 1)
                axes[i][j].axis('off')
                axes[i][j].set_title(
                    f'predictions: {class_names[predictions[j]]}, label: {class_names[labels[j]]}'
                )
                axes[i][j].imshow(img)
                if j == ncols - 1:
                    break
            if i == nrows - 1:
                break

In [None]:
visualize_model(model)

# Do Everything For HOG Dataset

In [None]:
# Load pretrained model
pretrained_model = models.resnet18(pretrained=True)

# Freeze all trainable layers
for param in pretrained_model.parameters():
    param.requires_grad = False

# Modifying last classification layer for our dataset
num_features = pretrained_model.fc.in_features
pretrained_model.fc = nn.Linear(num_features, 10)
# print(pretrained_model)

model2 = pretrained_model.to(device)

# Train model
model2, acc_list2, loss_list2 = train_model(model2, criterion, optimizer, hog_dataloaders, hog_datasets, epoch_num=100)

In [None]:
plt.plot([a.cpu() for a in acc_list2.train], label='train')
plt.plot([a.cpu() for a in acc_list2.val], label='val')
plt.title('Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy Percent')
plt.legend()
plt.show()

In [None]:
plt.plot([a for a in loss_list2.train], label='train')
plt.plot([a for a in loss_list2.val], label='val')
plt.title('Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss Percent')
plt.legend()
plt.show()

In [None]:
plot_cm(model2)

In [None]:
visualize_model(model2)