# Image Recognition Training
#### Practice creating and optimizing image recognition models using the CIFAR10 dataset. Training currently uses k-fold cross-validation.

In [1]:
"""
Torchvision
"""
# Basics
import torch
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
# For CNN
import torch.nn as nn
import torch.nn.functional as F
# For Loss Func. And Optimization
import torch.optim as optim
# Utility
import torchvision.datasets
from torchvision.datasets import ImageFolder as img_fold
from torchvision.utils import save_image

"""
Numpy
"""
# For plots and array operations
import numpy as np

"""
Sklearn
"""
# Validation and Cross-validation
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold


"""
Extra
"""
import matplotlib.pyplot as plt
from torch.utils.tensorboard import SummaryWriter



print("Success")

Success


In [None]:
# Training hyperparameters shared by the cells below
BATCH_SIZE = 32
EPOCHS = 100
WORKERS = 4
RANDOM_STATE = 42

NUM_CLASSES = 10

# Always define `device`: previously it was only assigned when CUDA was
# present, so every later cell raised NameError on a CPU-only machine.
if torch.cuda.is_available():
    print("Available")
    device = torch.device("cuda")
    print("")
else:
    device = torch.device("cpu")
    
# Category names; test_model indexes this tuple with the integer label
classes = (
    'airplane', 'automobile', 'bird', 'cat', 'deer',
    'dog', 'frog', 'horse', 'ship', 'truck',
)

# Preprocessing pipeline: convert to a tensor, then normalize each of the
# three channels with mean 0.5 and std 0.5
transformer = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5)),
])

In [None]:
"""
Bringing in CIFAR10 training and test data
"""
# Both splits share root, download flag and transform; only `train` differs
CIFAR_Train, CIFAR_Test = (
    torchvision.datasets.CIFAR10(
        root='./data',
        train=is_train_split,
        download=True,
        transform=transformer,
    )
    for is_train_split in (True, False)
)

"""
Bringing in custom data
"""
# Root directories of the custom training and test images
# (use the datasets in CNN_Dataset for training)
trainroot = "./CNN_Dataset/Level 1"
testroot = "./Noiseimg/test"

# ImageFolder wraps each directory and applies the shared transform
# (determine if it is necessary to transform here and in the dataloaders)
trainset, noise_testset = (
    img_fold(root=folder, transform=transformer)
    for folder in (trainroot, testroot)
)

In [None]:
# Grab the first (image, label) pair of the custom dataset.
# This cell can be skipped when training on CIFAR10 only.
img, label = trainset[0]

# Report the sample's contents, shape and types
print(
    f"Image tensor:\n\n{img}\n",
    f"Image shape: {img.shape}",
    f"Image datatype: {img.dtype}",
    f"Image label: {label}",
    f"Label datatype: {type(label)}",
    sep="\n",
)

### Use the following code cell if you want to train on EITHER the CIFAR10 or custom dataset

In [None]:
# USE THIS IF NOT MERGING CIFAR10 AND CUSTOM DATASET

# Class names of the dataset being trained on; later cells index this list
# with label values to print readable labels.
# Change CIFAR_Train to another dataset here if you train on something else.
class_names = CIFAR_Train.classes
print('Labels: ',class_names,'\n')

# Verifying length of CIFAR10 Dataset
print('CIFAR10 Trainset Length: ', len(CIFAR_Train))
print('CIFAR10 Testset Length: ', len(CIFAR_Test),'\n')

# Verifying length of custom dataset
print('Custom Trainset Length: ', len(trainset))
print('Custom Testset Length: ', len(noise_testset))

### Use the following two code cells if you want to train on BOTH the CIFAR10 and custom dataset

In [None]:
# Merging CIFAR10 and custom datasets
# ONLY USE THIS IF YOU WANT TO TRAIN ON BOTH NORMAL AND NOISY IMAGES
Merged_dataset = torch.utils.data.ConcatDataset([CIFAR_Train, trainset])

# The datasets are concatenated as-is (labels are not remapped), so both must
# assign the same index to the same class. Warn when the class lists differ.
if list(trainset.classes) != list(CIFAR_Train.classes):
    print('WARNING: custom dataset classes differ from CIFAR10 classes:', trainset.classes)

# Grabbing labels from merged dataset
class_names = Merged_dataset.datasets[0].classes
print('Labels: ', class_names, '\n')

# Inspect a sample taken from the merged dataset itself. Previously this cell
# printed `img`/`label` left over from the optional custom-dataset cell, which
# was stale at best and a NameError when that cell had been skipped.
img, label = Merged_dataset[0]

# Verifying information
print(f"Image tensor:\n\n{img}\n")
print(f"Image shape: {img.shape}")
print(f"Image datatype: {img.dtype}")
print(f"Image label: {label}")
print(f"Label datatype: {type(label)}")

In [None]:
# Grab the first (image, label) pair of the merged dataset
img, label = Merged_dataset[0]

# Report the sample's contents, shape and types
print(
    f"Image tensor:\n\n{img}\n",
    f"Image shape: {img.shape}",
    f"Image datatype: {img.dtype}",
    f"Image label: {label}",
    f"Label datatype: {type(label)}",
    sep="\n",
)

In [None]:
"""
Creating dataloaders
"""

# Modify depending on what dataset you want to use
trainloader = torch.utils.data.DataLoader(Merged_dataset, batch_size=BATCH_SIZE,
                                          shuffle=True, num_workers=WORKERS)

# TestLoader for CIFAR10 images
CIFAR10_testloader = torch.utils.data.DataLoader(CIFAR_Test, batch_size=BATCH_SIZE,
                                                 shuffle=True, num_workers=WORKERS)

# TestLoader for Custom data
Noise_testloader = torch.utils.data.DataLoader(noise_testset, batch_size=BATCH_SIZE,
                                               shuffle=True, num_workers=WORKERS)


"""
Creating validation loaders for validation training
"""

# Split dataset into train and validation sets (ONLY NECESSARY FOR val_train_model FUNCTION)
# Modify depending on what dataset you want to use
#
# Only the sample *indices* are split. Passing the dataset object itself to
# train_test_split makes scikit-learn index every sample up front and hold the
# whole transformed dataset in Python lists; Subset keeps loading lazy while
# selecting the same samples for the same random_state.
train_idx, val_idx = train_test_split(list(range(len(Merged_dataset))),
                                      test_size=0.2, random_state=RANDOM_STATE)
train_set = torch.utils.data.Subset(Merged_dataset, train_idx)
val_set = torch.utils.data.Subset(Merged_dataset, val_idx)

# NOTE: `trainloader` above iterates the full Merged_dataset, which includes
# the samples in val_set. Build a loader over `train_set` when the validation
# data must stay unseen during training.

# Dataloader for validation set
valloader = torch.utils.data.DataLoader(val_set, batch_size=4, shuffle=False, num_workers=2)

In [None]:
"""
Showing sample images from training set
"""
# Defining function to show images from training set
def imshow(img):
    """Display an image tensor with matplotlib.

    Reverses the 0.5/0.5 normalization and moves the first axis last before
    handing the array to plt.imshow (channels-first input is assumed).
    """
    unnormalized = img / 2 + 0.5
    pixels = unnormalized.numpy().transpose((1, 2, 0))
    plt.imshow(pixels)
    plt.show()
    
# Randomly select images from training set
dataiter = iter(trainloader)
images, labels = next(dataiter)

# Show images
imshow(torchvision.utils.make_grid(images))

# Print labels for each shown image. Iterate over the labels actually
# returned: a batch can hold fewer than BATCH_SIZE samples, in which case
# indexing up to BATCH_SIZE raised IndexError.
print('Actual labels: ', ' '.join(f'{class_names[label_idx]}' for label_idx in labels))


In [None]:
# Verifying information
# Report the shape of the batch (`images`), which matches the
# [batch_size, color_channels, height, width] caption; `img` is a single
# sample without a batch dimension.
print(f"Image shape: {images.shape} -> [batch_size, color_channels, height, width]")
print(f"Label shape: {labels.shape}")

In [None]:
"""
Training Functions
"""

# Train only on entire training set
def full_train_model(model, trainloader, criterion, optimizer, device, epochs):
    """
    Fit the model on every batch of the training loader (no validation).

    After each epoch the mean batch loss and the training accuracy are
    printed; "Training Complete" is printed once all epochs are done.

    Args:
        model (nn.Module): The neural network model.
        trainloader (DataLoader): DataLoader for the training dataset.
        criterion: The loss criterion.
        optimizer: The optimization algorithm.
        device: The device to which the model and data should be moved (e.g., "cuda" or "cpu").
        epochs (int): Number of epochs for training.
    """
    model.to(device)
    model.train()

    for epoch_idx in range(epochs):
        loss_sum = 0.0
        hits = 0
        seen = 0

        for batch_images, batch_labels in trainloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            # Standard optimization step
            optimizer.zero_grad()
            logits = model(batch_images)
            batch_loss = criterion(logits, batch_labels)
            batch_loss.backward()
            optimizer.step()

            # Running statistics for the epoch summary
            loss_sum += batch_loss.item()
            predicted = torch.max(logits.data, 1)[1]
            seen += batch_labels.size(0)
            hits += (predicted == batch_labels).sum().item()

        train_accuracy = hits / seen
        mean_loss = loss_sum / len(trainloader)

        print(f'Epoch {epoch_idx + 1}, Training Loss: {mean_loss:.3f}, Training Accuracy: {100 * train_accuracy:.2f}%')

    print("Training Complete")

# Train on a training and validation set
def val_train_model(model, trainloader, valloader, criterion, optimizer, device, epochs):
    """
    Run one training pass followed by one validation pass for every epoch.

    The caller must have split the data beforehand and must pass a separate,
    already initialized loader for the validation portion. Loss and accuracy
    of both passes are printed after each epoch.

    Args:
        model (nn.Module): The neural network model.
        trainloader (DataLoader): DataLoader for the training dataset.
        valloader (DataLoader): DataLoader for the validation set.
        criterion: The loss criterion.
        optimizer: The optimization algorithm.
        device: The device to which the model and data should be moved (e.g., "cuda" or "cpu").
        epochs (int): Number of epochs for training.
    """
    model.to(device)

    for epoch_idx in range(epochs):
        # ---- training pass ----
        train_loss_sum = 0.0
        train_hits = 0
        train_seen = 0

        model.train()
        for batch_images, batch_labels in trainloader:
            batch_images = batch_images.to(device)
            batch_labels = batch_labels.to(device)

            optimizer.zero_grad()
            logits = model(batch_images)
            batch_loss = criterion(logits, batch_labels)
            batch_loss.backward()
            optimizer.step()

            train_loss_sum += batch_loss.item()
            predicted = torch.max(logits.data, 1)[1]
            train_seen += batch_labels.size(0)
            train_hits += (predicted == batch_labels).sum().item()

        train_accuracy = train_hits / train_seen

        # ---- validation pass (no gradient tracking, eval-mode layers) ----
        model.eval()
        val_loss_sum = 0.0
        val_hits = 0
        val_seen = 0

        with torch.no_grad():
            for batch_images, batch_labels in valloader:
                batch_images = batch_images.to(device)
                batch_labels = batch_labels.to(device)

                logits = model(batch_images)
                val_loss_sum += criterion(logits, batch_labels).item()

                predicted = torch.max(logits.data, 1)[1]
                val_seen += batch_labels.size(0)
                val_hits += (predicted == batch_labels).sum().item()

        val_accuracy = val_hits / val_seen

        mean_train_loss = train_loss_sum / len(trainloader)
        mean_val_loss = val_loss_sum / len(valloader)

        print(f'Epoch {epoch_idx + 1}, Training Loss: {mean_train_loss:.3f}, Training Accuracy: {100 * train_accuracy:.2f}%, Validation Loss: {mean_val_loss}, Validation Accuracy: {val_accuracy:.2%}')

    print("Training Complete")
    
def kfold_train_model(model, dataset, criterion, optimizer, device, k_folds, epochs, batch_size, workers, patience):

    """
    Train the model using k-fold cross-validation with early stopping per fold.

    For every fold the dataset is split into a training and a validation
    subset, the model is trained for up to ``epochs`` epochs, and loss and
    accuracy are printed and logged to TensorBoard.

    NOTE: the same ``model`` and ``optimizer`` objects are used for every fold
    (nothing is re-initialized between folds), so weights learned in one fold
    carry over into the next.

    Args:
        model (nn.Module): The neural network model.
        dataset: dataset for training. Will be split into train and validation sets
        criterion: The loss criterion.
        optimizer: The optimization algorithm.
        device: The device to which the model and data should be moved (e.g., "cuda" or "cpu").
        k_folds (int): Number of cross-validation folds.
        epochs (int): Maximum number of epochs per fold.
        batch_size (int): Batch size for DataLoader.
        workers (int): Number of workers for DataLoader.
        patience (int): Number of epochs with no improvement after which training will be stopped.
    """
    writer = SummaryWriter()

    kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)

    model.to(device)

    # Move tensors held in the optimizer state to the target device
    # (needed for optimizers such as Adagrad that carry state tensors)
    for state in optimizer.state.values():
        for k, v in state.items():
            if torch.is_tensor(v):
                state[k] = v.to(device)

    # try/finally so the TensorBoard writer is closed even when training is
    # interrupted or raises
    try:
        for fold, (train_idx, val_idx) in enumerate(kf.split(dataset)):

            # Data splitting
            train_dataset = torch.utils.data.Subset(dataset, train_idx)
            val_dataset = torch.utils.data.Subset(dataset, val_idx)

            # Creating dataloaders. Use the batch_size argument; the global
            # BATCH_SIZE was used here before, silently ignoring the parameter.
            trainloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=workers)
            valloader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=workers)

            # Early-stopping state lives per fold and must survive across
            # epochs. It used to be reset at the top of every epoch, so the
            # patience counter could never reach its limit.
            best_val_loss = float('inf')
            no_improvement_epochs = 0

            # Training loop for each epoch
            for epoch in range(epochs):
                running_loss = 0.0
                correct_train = 0
                total_train = 0

                # Training phase
                model.train()

                for i, (images, labels) in enumerate(trainloader):

                    images, labels = images.to(device), labels.to(device)

                    optimizer.zero_grad()
                    outputs = model(images)
                    loss = criterion(outputs, labels)
                    loss.backward()
                    optimizer.step()

                    running_loss += loss.item()
                    _, predicted = torch.max(outputs.data, 1)
                    total_train += labels.size(0)
                    correct_train += (predicted == labels).sum().item()

                    # Print intermediate values
                    if i % 100 == 99:  # Print every 100 batches
                        train_accuracy_batch = correct_train / total_train
                        print(f'Fold {fold + 1}, Epoch {epoch + 1}, Batch {i + 1}, Loss: {loss.item()}, Accuracy: {100 * train_accuracy_batch:.2f}%')

                train_accuracy = correct_train / total_train

                # Validation phase
                model.eval()
                val_loss = 0.0
                val_correct_predictions = 0
                val_total_samples = 0

                with torch.no_grad():
                    for images, labels in valloader:
                        images, labels = images.to(device), labels.to(device)
                        outputs = model(images)
                        loss = criterion(outputs, labels)
                        val_loss += loss.item()

                        # calculate accuracy for validation
                        _, predicted = torch.max(outputs.data, 1)
                        val_total_samples += labels.size(0)
                        val_correct_predictions += (predicted == labels).sum().item()

                val_accuracy = val_correct_predictions / val_total_samples

                # Print loss and accuracy per epoch for both training and validation
                print(f'Fold {fold + 1}, Epoch {epoch + 1}, Training Loss: {running_loss / len(trainloader):.3f}, Training Accuracy: {100 * train_accuracy:.2f}%, Validation Loss: {val_loss / len(valloader)}, Validation Accuracy: {val_accuracy:.2%}')

                # TensorBoard logging
                writer.add_scalar(f'Loss/Train/Fold_{fold + 1}', running_loss / len(trainloader), epoch)
                writer.add_scalar(f'Loss/Validation/Fold_{fold + 1}', val_loss / len(valloader), epoch)
                writer.add_scalar(f'Accuracy/Train/Fold_{fold + 1}', 100 * train_accuracy, epoch)
                writer.add_scalar(f'Accuracy/Validation/Fold_{fold + 1}', 100 * val_accuracy, epoch)

                # Early stopping check (compares the summed validation loss
                # of this epoch with the best one seen in this fold)
                if val_loss < best_val_loss:
                    best_val_loss = val_loss
                    no_improvement_epochs = 0
                else:
                    no_improvement_epochs += 1

                if no_improvement_epochs >= patience:
                    print(f'Early stopping! No improvement in validation loss for {patience} epochs.')
                    break

                print("")
    finally:
        # Close TensorBoard writer
        writer.close()

    print("Training Complete")
        

In [None]:

# Test Function
def test_model(model, model_name, testloader, device):
    
    """
    Test the model on the entire dataset and store statistics.

    Args:
        model (nn.Module): The neural network model.
        model_name: Name of the neural network model. Used in storing statistics
        testloader (DataLoader): DataLoader for the test dataset.
        device: The device to which the model and data should be moved (e.g., "cuda" or "cpu").

    """
    model.to(device)
    model.eval()
    
    correct = 0 
    total = 0
    
    correct_pred = {classname: 0 for classname in classes}
    total_pred = {classname: 0 for classname in classes}

    with torch.no_grad():
        for data in testloader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            
            outputs = model(images)
            
            # Pick class with highest similarity score
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            
    print(f'Accuracy of the network on the test images: {100 * correct // total} %\n')
    

    with torch.no_grad():
        for data in testloader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            
            outputs = model(images)
            _, predictions = torch.max(outputs, 1)
            
            # collect the correct predictions for each class
            for label, prediction in zip(labels, predictions):
                if label == prediction:
                    correct_pred[classes[label]] += 1
                total_pred[classes[label]] += 1


    # print accuracy for each class
    for classname, correct_count in correct_pred.items():
        accuracy = 100 * float(correct_count) / total_pred[classname]
        print(f'Accuracy for class: {classname:5s} is {accuracy:.1f} %')
        
    # store statistics
    write_accuracy_to_file(f'{model_name}_accuracy.txt', model_name, correct, total, correct_pred, total_pred, EPOCHS)

# Saving statistics    
def write_accuracy_to_file(filename, model_name, correct, total, correct_pred, total_pred, epochs):

    """
    Store model statistics. Designed to be called within test_model function
    """
    # Open the specified text file in write mode
    with open(filename, 'w') as file:
        # Write overall accuracy to the file
        file.write(f'Model: {model_name}\n\n')
        file.write(f'Total Epochs: {epochs}\n\n')
        file.write(f'Accuracy of the network on the 10000 test images: {100 * correct // total} %\n\n')

        # Write accuracy for each class to the file
        for classname, correct_count in correct_pred.items():
            accuracy = 100 * float(correct_count) / total_pred[classname]
            file.write(f'Accuracy for class: {classname:5s} is {accuracy:.1f} %\n')

# Saving model
def save_model(model, name):
    """
    Save the model's weights to ./Models/<name>.

    Only the state dict is written, matching how the rest of this file
    restores models (``load_state_dict(torch.load(...))``); the whole module
    object was pickled before, which those loaders cannot consume. The target
    directory is created when missing.

    Args:
        model (nn.Module): Model whose weights are saved.
        name: File name inside the ./Models directory.
    """
    import os  # local import: `os` is not among this file's top-level imports

    os.makedirs("./Models", exist_ok=True)
    PATH = fr"./Models/{name}"
    torch.save(model.state_dict(), PATH)

In [None]:
class Net(nn.Module):
    """
    Convolutional classifier: three conv blocks followed by five linear layers.

    Each conv block is convolution -> batch norm -> ReLU -> 2x2 max pool ->
    dropout. The convolutions keep the spatial size (3x3, padding 1) and each
    pool halves it, so the first linear layer's 256 * 4 * 4 inputs correspond
    to 3-channel 32x32 images.

    Args:
        num_classes (int): Size of the output layer. Defaults to 10.
    """

    def __init__(self, num_classes=10):
        super().__init__()

        # Creating three convolutional layers and utilizing batch normalization after each convolution
        self.conv1 = nn.Conv2d(3, 64, 3, padding=1)
        self.batch_norm1 = nn.BatchNorm2d(64)
        self.conv2 = nn.Conv2d(64, 128, 3, padding=1)
        self.batch_norm2 = nn.BatchNorm2d(128)
        self.conv3 = nn.Conv2d(128, 256, 3, padding=1)
        self.batch_norm3 = nn.BatchNorm2d(256)

        # Dropout Layers
        self.dropout1 = nn.Dropout(0.5)
        self.dropout2 = nn.Dropout(0.5)
        self.dropout3 = nn.Dropout(0.5)

        # Pooling and fully connected layers
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(256 * 4 * 4, 1024)
        self.batch_norm_fc1 = nn.BatchNorm1d(1024)
        self.fc2 = nn.Linear(1024, 512)
        self.batch_norm_fc2 = nn.BatchNorm1d(512)
        self.fc3 = nn.Linear(512, 256)
        self.batch_norm_fc3 = nn.BatchNorm1d(256)
        self.fc4 = nn.Linear(256, 128)
        self.batch_norm_fc4 = nn.BatchNorm1d(128)
        self.fc5 = nn.Linear(128, num_classes)

    def forward(self, x):
        """Return the raw class scores (logits) for a batch of images."""
        x = self.pool(F.relu(self.batch_norm1(self.conv1(x))))
        x = self.dropout1(x)
        x = self.pool(F.relu(self.batch_norm2(self.conv2(x))))
        x = self.dropout2(x)
        x = self.pool(F.relu(self.batch_norm3(self.conv3(x))))
        x = self.dropout3(x)

        # Flatten everything except the batch dimension. The previous
        # view(-1, 256 * 4 * 4) silently folded extra spatial positions into
        # the batch dimension for inputs that are not 32x32; with flatten such
        # inputs fail at fc1 with a shape error instead.
        x = torch.flatten(x, 1)

        x = F.relu(self.batch_norm_fc1(self.fc1(x)))
        x = F.relu(self.batch_norm_fc2(self.fc2(x)))
        x = F.relu(self.batch_norm_fc3(self.fc3(x)))
        x = F.relu(self.batch_norm_fc4(self.fc4(x)))

        x = self.fc5(x)

        return x

In [None]:
# Fresh network instance; the optimizer, training and save cells below operate on it
test = Net()

In [None]:
"""
DO NOT USE! MODIFY BASED ON NEW MODELS!

# Creating new models

model_train_level_1 = Net()
model_train_val_level_1 = Net()
model_train_vc_level_1 = Net()

# Copying weights of old models to retrain
model_train_level_1.load_state_dict(torch.load('./Models/cifar_net.pth'))
model_train_val_level_1.load_state_dict(torch.load('./Models/cifar_net_Validation.pth'))
model_train_vc_level_1.load_state_dict(torch.load('./Models/cifar_net_kfold.pth'))
"""

In [None]:
# Loss function and optimizer for the `test` network
criterion = nn.CrossEntropyLoss()
# Alternative optimizer configurations (disabled):
# optimizer = optim.SGD(test.parameters(), lr = 0.00001, momentum = 0.9, weight_decay=.001)
# optimizer = optim.Adam(test.parameters(), lr = 0.0001, weight_decay=.001)
optimizer = optim.Adagrad(
    params=test.parameters(),
    lr=1e-3,
    weight_decay=1e-3,
)


In [None]:
# Alternative training entry points (disabled):
# full_train_model(test, trainloader, criterion, optimizer, device, EPOCHS)
# val_train_model(model_train_val_level_1, trainloader, valloader, criterion, optimizer, device, EPOCHS)

# K-fold training of `test` on the merged dataset
kfold_train_model(
    model=test,
    dataset=Merged_dataset,
    criterion=criterion,
    optimizer=optimizer,
    device=device,
    k_folds=10,
    epochs=EPOCHS,
    batch_size=BATCH_SIZE,
    workers=4,
    patience=2,
)




In [None]:
"""
Retrieve batch of images from test set
"""

dataiter = iter(CIFAR10_testloader)
images, labels = next(dataiter)

# show img from test set
imshow(torchvision.utils.make_grid(images))
# Iterate over the labels actually returned rather than range(BATCH_SIZE),
# which raised IndexError whenever the batch held fewer samples
print('Truth: ', ' '.join(f'{CIFAR_Test.classes[label_idx]:5s}' for label_idx in labels))

In [None]:
# loading saved model
net = Net()
# map_location lets the checkpoint load onto the CPU even if it was saved
# from a model living on the GPU; `net` and `images` are CPU-resident here
net.load_state_dict(torch.load('./Models/Merged_Kfold_AdaGrad_F10_E100_B32_W4.pth', map_location='cpu'))

# Switch to evaluation mode before predicting: a freshly built module is in
# training mode, where Dropout zeroes activations at random and BatchNorm
# uses the statistics of the current batch
net.eval()

# Testing against above images (no gradients needed for inference)
with torch.no_grad():
    outputs = net(images)

_, predicted = torch.max(outputs, 1)

# One entry per prediction actually produced, not a fixed BATCH_SIZE
print('Predicted: ', ' '.join(f'{CIFAR_Test.classes[class_idx]:5s}'
                              for class_idx in predicted))

In [None]:
# Evaluate the loaded model on the entire CIFAR10 test set
test_model(
    model=net,
    model_name='CIFAR10_Merged_Kfold_AdaGrad_F10_E100_B32_W4',
    testloader=CIFAR10_testloader,
    device=device,
)

In [None]:
# Write the weights (state dict) of `test` to disk
PATH = './Models/Merged_Kfold_AdaGrad_F10_E100_B32_W4.pth'
torch.save(obj=test.state_dict(), f=PATH)