## CNN Classifier Testscript - Vacation Images


### Imports

In [1]:
#general
import numpy as np
import splitfolders
import time
from datetime import datetime 
from time import gmtime, strftime
from tqdm import tqdm
import os

#pytorch
import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as T

#visualization
import matplotlib.pyplot as plt

### Some parameters to be set

In [2]:
#==============#
pixel = 128    # --> 128,64,32 are valid
#==============#
version = f'{pixel}x{pixel}'
#==============#
batch_size = 8
#==============#
epochs = 100
#==============#
lr = 0.001
#==============#
momentum = 0.9
#==============#
num_workers = 4
#==============#

### Script Runtime Start

In [3]:
def start_time_():    
    start_time = time.time()
    return(start_time)

def end_time_():
    end_time = time.time()
    return(end_time)

def Execution_time(start_time_,end_time_):
    return(strftime("%H:%M:%S",gmtime(int('{:.0f}'.format(float(str((end_time_-start_time_))))))))

In [4]:
script_start = start_time_()

### Create Directory for saving files

In [5]:
dirName_1 = f'graphs/{version}'
dirName_2 = f'models/{version}'
dirName_3 = f'output_txt/{version}'

directories = [dirName_1, dirName_2, dirName_3]

# Create target Directory if don't exist

for path in directories:
    if not os.path.exists(path):
        os.mkdir(path)
        print("Directory " , path ,  " Created ")
    else:    
        print("Directory " , path ,  " already exists")  

Directory  graphs/128x128  already exists
Directory  models/128x128  already exists
Directory  output_txt/128x128  already exists


### Initialize textfile for prints

In [6]:
f = open(f'output_txt/{version}/cnn_net_b{batch_size}_e{epochs}.txt', 'a')

print(f'The parameters for this network are: \n\
        - input image size: \t {version}\n\
        - batch_size dataloader: {batch_size}\n\
        - number of epcohs: \t {epochs}\n\
        - learning rate: \t\t {lr}\n\
        - momentum: \t\t\t {momentum}\n\
        - number of workers: \t {num_workers}','\n', file=f)


### Initialize GPU

In [7]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', device)

Using device: cuda


### Creating Folder Structure

The original folder containing the 2 subclass folder has to be split into subfolders for train, validation and test.

`Original:`  
* Data
    * Person
    * Scene
---
`New:`
* Data
    * train (70% of total)
        * Person
        * Scene
    * val (10% of total)
        * Person
        * Scene
    * test (20% of total)
        * Person
        * Scene

In [8]:
## Following Code does the described above

# input_folder = 'Data/'
# output_folder = 'img_data/'

# splitfolders.ratio(input_folder, output=output_folder, seed=1337, ratio=(0.7, 0.1, 0.2))

In [9]:
#paths to be used for dataloaders later on
path_to_train = './img_data/train/'
path_to_val = './img_data/val/'
path_to_test = './img_data/test/'

### Define data tranformations

In [10]:
mean = [0.4939, 0.5189, 0.5326] #calculated in seperate notebook file
std = [0.2264, 0.2398, 0.2707] #calculated in seperate notebook file


#for training a litte augmentation (random flip, grayscale and rotation) will be performed
train_transforms = T.Compose([
    T.Resize([pixel,]),
    T.CenterCrop([pixel,]),
    T.RandomHorizontalFlip(0.3), 
    T.RandomGrayscale(0.1),
    T.RandomRotation(30),
    T.ToTensor(),
    T.Normalize(torch.tensor(mean), torch.tensor(std))
])

#no augmentation needed, therefore only reszing, to tensor and normalizing
val_transforms = T.Compose([
    T.Resize([pixel,]),
    T.CenterCrop([pixel,]),
    T.ToTensor(),
    T.Normalize(torch.tensor(mean), torch.tensor(std))
])

#no augmentation needed, therefore only reszing, to tensor and normalizing
test_transforms = T.Compose([
    T.Resize([pixel,]),
    T.CenterCrop([pixel,]),
    T.ToTensor(),
    T.Normalize(torch.tensor(mean), torch.tensor(std))
])

### Initialize Datasets (Train, Validation, Test)

In [11]:
#Create datasets, labling is done with "ImageFolder"-Method
train_dataset = torchvision.datasets.ImageFolder(root=path_to_train, transform=train_transforms)
val_dataset = torchvision.datasets.ImageFolder(root=path_to_val, transform=val_transforms)
test_dataset = torchvision.datasets.ImageFolder(root=path_to_test, transform=test_transforms)

classes = ('person', 'scene')

### Initialize DataLoaders for the Datasets

In [12]:
#Define Loaders
train_loader = torch.utils.data.DataLoader(dataset = train_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
val_loader = torch.utils.data.DataLoader(dataset = val_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)
test_loader = torch.utils.data.DataLoader(dataset = test_dataset, batch_size=batch_size, shuffle=True, num_workers=num_workers)

### Unnormalize pictures for visualization later

In [13]:
class UnNormalize(object):
    def __init__(self, mean, std):
        self.mean = mean
        self.std = std

    def __call__(self, tensor):
        """
        Args:
            tensor (Tensor): Tensor image of size (C, H, W) to be normalized.
        Returns:
            Tensor: Normalized image.
        """
        for t, m, s in zip(tensor, self.mean, self.std):
            t.mul_(s).add_(m)
            # The normalize code -> t.sub_(m).div_(s)
        return tensor

### Define Image viewer

In [14]:
def imshow(img):
    de_norm = UnNormalize(mean=(0.4939, 0.5189, 0.5326), std=(0.2264, 0.2398, 0.2707))
    img = de_norm(img)      # unnormalize
    npimg = img.cpu().numpy()
    plt.figure(figsize=(3,3))
    plt.imshow(np.transpose(npimg, (1, 2, 0)))
    plt.show()

### Define CNN 

In [15]:
class CNN(nn.Module):
    
    def __init__(self, pixel):
        
        super(CNN, self).__init__()
        self.pixel = pixel
        self.dropout = nn.Dropout(0.25)
        self.pool = nn.MaxPool2d(2,2)
        
        #_________128x128__________________________
        if self.pixel == 128:
            self.conv1 = nn.Conv2d(3, 6, 5) 
            self.conv2 = nn.Conv2d(6, 16, 5, padding = 1) 
            self.conv3 = nn.Conv2d(16, 16, 5)
            
            self.fc1 = nn.Linear(16 * 13 * 13, 120) #(16 channel * 13 * 13 (image size))
            
        #_________64x64____________________________
        elif self.pixel == 64:
            self.conv1 = nn.Conv2d(3, 6, 3, padding = 1) 
            self.conv2 = nn.Conv2d(6, 16, 3, padding = 1) 
            self.conv3 = nn.Conv2d(16, 16, 3)
            
            self.fc1 = nn.Linear(16 * 7 * 7, 120) #(16 channel * 7 * 7 (image size))
            
        #_________32x32____________________________
        elif self.pixel == 32:
            self.conv1 = nn.Conv2d(3, 6, 3, padding = 1) 
            self.conv2 = nn.Conv2d(6, 16, 3, padding = 1) 
            self.conv3 = nn.Conv2d(16, 16, 3)
            
            self.fc1 = nn.Linear(16 * 3 * 3, 120) #(16 channel * 3 * 3 (image size))
            
        #________following_linear_layers___________
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 20)
        self.fc4 = nn.Linear(20, 2)
    
    def forward(self,x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = F.relu(self.fc2(x))
        x = self.dropout(x)
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

In [16]:
cnn = CNN(pixel).to(device)

### Define loss function and optimizer

In [17]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(cnn.parameters(), lr=lr, momentum=momentum)

### Train network

In [18]:
#splitted from below, to run more epochs if needed without setting loss lists to zero
train_loss = []  
validation_loss = []
min_valid_loss = np.inf  #value to distinguish if model will be saved or not. If val loss lower, model will be saved

In [None]:
#Starting Time 
train_start = start_time_()


#Training Iterations
#==========================================================================
for epoch in range(epochs):  # loop over the dataset multiple times
    
    
    #model training
    #______________________________________________________
    running_loss = 0.0
    cnn.train()
    for data, labels in tqdm(train_loader, leave=False):
        data, labels = data.to(device), labels.to(device)

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = cnn(data)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item()*data.size(0)
    
    
    #model validation
    #______________________________________________________
    valid_loss = 0.0
    val_correct = 0
    cnn.eval()
    for data, labels in tqdm(val_loader, leave=False):
        data, labels = data.to(device), labels.to(device)
        
        outputs = cnn(data)
        loss = criterion(outputs, labels)
        valid_loss += loss.item()*data.size(0)
        
        # get val_accuracy every epoch
        # the class with the highest energy is what we choose as prediction
        _, predicted = torch.max(outputs.data, 1)
        val_correct += (predicted == labels).sum().item()
    
    
    #metrics
    #______________________________________________________
    train_loss.append(running_loss / len(train_dataset))
    validation_loss.append(valid_loss / len(val_dataset))
    val_accuracy = int(100 * val_correct / len(val_dataset))

    
    #console output for tracking
    #______________________________________________________
    console = f'Epoch {epoch+1} \t Training Loss: {(running_loss / len(train_dataset)):.6f} \t Validation Loss: {(valid_loss / len(val_dataset)):.6f} \t Validation Accuracy: {val_accuracy}%'
    print(console,file=f)
    print(console)
     
    if min_valid_loss > (valid_loss / len(val_dataset)):
        saved = f'Validation Loss Decreased({min_valid_loss:.6f}--->{(valid_loss / len(val_dataset)):.6f}) \t Saving The Model'
        print(saved, file=f)
        print(saved)
        min_valid_loss = (valid_loss / len(val_dataset))
         
        # Saving State Dict
        torch.save(cnn.state_dict(), f'models/{version}/cnn_net_b{batch_size}_e{epochs}.pth') 
        
        
#==========================================================================


print('Finished Training','\n', file=f)
print('Finished Training')

#End Time of Training
train_end = end_time_()

#Time needed for training
print("Execution time of training:", Execution_time(train_start, train_end),'\n', file=f)
print("Execution time of training:", Execution_time(train_start, train_end))

  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)
  0%|                                                                                          | 0/261 [00:00<?, ?it/s]

Epoch 1 	 Training Loss: 0.677907 	 Validation Loss: 0.673712 	 Validation Accuracy: 59%
Validation Loss Decreased(inf--->0.673712) 	 Saving The Model


  0%|                                                                                          | 0/261 [00:00<?, ?it/s]

Epoch 2 	 Training Loss: 0.673788 	 Validation Loss: 0.673416 	 Validation Accuracy: 59%
Validation Loss Decreased(0.673712--->0.673416) 	 Saving The Model


### Show loss over epochs

In [None]:
plt.figure(figsize=(10,6))
plt.plot(train_loss, label='Train_Loss')
plt.plot(validation_loss, label='Validation_Loss')
plt.xlabel('Epochs', fontsize=15)
plt.ylabel('Loss', fontsize=15)
plt.legend(fontsize=13)
plt.tick_params(labelsize=12)
plt.grid()
plt.title('Loss Values', fontsize=18)
plt.savefig(f'graphs/{version}/loss_b{batch_size}_e{epochs}.png', dpi=300)

### Testing
#### Sample

In [None]:
#load a batch from the test set
dataiter = iter(test_loader)
images, labels = dataiter.next()
images, labels = images.to(device), labels.to(device)

#generate predictions of testloader samples
#deactivate droptout layer with .eval()
cnn.eval()                 
outputs = cnn(images)

#the class with the highest energy is what we choose as prediction
_, predicted = torch.max(outputs, 1)

for i in range(batch_size):
    if classes[labels[i]] == classes[predicted[i]]:
        boolean = 'Right'
    else:
        boolean = 'Wrong'
    print(f'Truth: {classes[labels[i]]} \t Prediction: {classes[predicted[i]]} \t Result: {boolean}')
    imshow(images[i])

#### Complete testset - final epoch

In [None]:
correct = 0
total = 0
# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    for data, labels in test_loader:
        data, labels = data.to(device), labels.to(device)
        
        # calculate outputs by running images through the network
        outputs = cnn(data)
        # the class with the highest energy is what we choose as prediction
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print('Accuracy of the network on the 597 test images: %d %%' % (100 * correct / total),'\n', file=f)
print('Accuracy of the network on the 597 test images: %d %%' % (100 * correct / total))

#### Complete testset - best epoch

In [None]:
#Loading the model
cnn_best = CNN(pixel).to(device)
cnn_best.load_state_dict(torch.load(f'models/{version}/cnn_net_b{batch_size}_e{epochs}.pth'))
cnn.eval()

correct = 0
total = 0
# since we're not training, we don't need to calculate the gradients for our outputs
with torch.no_grad():
    for data, labels in test_loader:
        data, labels = data.to(device), labels.to(device)
        
        # calculate outputs by running images through the network
        outputs = cnn_best(data)
        # the class with the highest energy is what we choose as prediction
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
accuracy = int(100 * correct / total)

print('Accuracy of the network on the 597 test images with best iteration: %d %%' % (100 * correct / total),'\n', file=f)
print('Accuracy of the network on the 597 test images with best iteration: %d %%' % (100 * correct / total))

### Script Runtime Result

In [None]:
#End Time of Training
script_end = end_time_()

#Time needed for script
print(f'Execution time of script is : {Execution_time(script_start, script_end)} hh:mm:ss')

### Closing output textfile

In [None]:
f.close()