In [49]:
# pytorch imports
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

#numpy imports
import numpy as np

# opencv imports
import cv2

# python imports
import os
from collections import OrderedDict, namedtuple
from itertools import product

# progress bar - tqdm
from tqdm import tqdm


In [8]:
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
# Download images from msft repo for cats and dogs
CATS = "PetImages\Cat"
DOGS = "PetImages\Dog"

CATEGORIES = {
    CATS : 0,
    DOGS : 1
}

OUTPUT_FILE = "cats_and_dogs.npy"
IMAGE_SIZE = 50

# Prepare dataset and store in .npy file


results = []
input_image_processing_failed = 0
cat_images = 0
dog_images = 0

for category in CATEGORIES:
    for file_name in tqdm(os.listdir(category)):
        if "jpg" in file_name:
            try:
                file_path = os.path.join(category, file_name)
                image = cv2.imread(file_path, cv2.IMREAD_GRAYSCALE)
                resize_image = cv2.resize(image, (IMAGE_SIZE, IMAGE_SIZE))
                
                # Get the category value for 1 hot encoding
                label_index = CATEGORIES[category]
                
                # [1,0] for cat and [0,1] for dog
                label  = np.eye(2)[label_index]
                
                # Each entry is (np.array(image), np.array(label)) tuple
                result = (np.array(resize_image), label)
                results.append(result)
                
                if category == CATS:
                    cat_images+= 1
                elif category == DOGS:
                    dog_images+=1
            
            except Exception as e:
                input_image_processing_failed+=1

print("Cat Images: ", cat_images, " Dog Images: ", dog_images, " Failed Images: ",  input_image_processing_failed)

np.random.shuffle(results)
np.save(OUTPUT_FILE, results, allow_pickle=True)
                
                
                

In [33]:
TEST_PERCENTAGE = 0.1

# Build tensors for training and test dataset

def get_train_test_dataset_tensors():
    data = np.load(OUTPUT_FILE, allow_pickle=True)
    # print(data.shape) # (24946, 2)


    split_index = int(0.1*len(results))
    
    
    images_tensor = torch.tensor([entry[0] for entry in data], dtype= torch.float)
    images_tensor = images_tensor/255.0
    images_tensor = images_tensor.to(device)
    
    labels_tensor = torch.tensor([entry[1] for entry in data], dtype= torch.float)
    labels_tensor = labels_tensor.to(device)
    
    train_images_tensor = images_tensor[:-split_index].reshape(-1,1,50,50)
    train_labels_tensor = labels_tensor[:-split_index]
    
    test_images_tensor = images_tensor[-split_index:].reshape(-1,1,50,50)
    test_labels_tensor = labels_tensor[-split_index:]
    
    
    return [(train_images_tensor, train_labels_tensor), (test_images_tensor, test_labels_tensor)]
    
#     # List of (image, label) tuple
#     train_dataset =  data[:-split_index]
    
#     # List of (image, label) tuple
#     test_dataset = data[-split_index:]
    
#     # Training
#     train_images_tensor = torch.tensor([entry[0] for entry in train_dataset]).reshape(-1,1,IMAGE_SIZE, IMAGE_SIZE)
#     train_images_tensor = train_images_tensor.to(device, dtype=torch.float)
    
#     train_labels_tensor = torch.tensor([entry[1] for entry in train_dataset])
#     train_labels_tensor = train_labels_tensor.to(device, dtype=torch.float)
    
#     # Test
#     test_images_tensor = torch.tensor([entry[0] for entry in test_dataset]).reshape(-1,1,IMAGE_SIZE, IMAGE_SIZE)
#     test_images_tensor = test_images_tensor.to(device, dtype=torch.float)
    
#     test_labels_tensor = torch.tensor([entry[1] for entry in test_dataset])
#     test_labels_tensor = test_labels_tensor.to(device, dtype=torch.float)
    
#     return [(train_images_tensor, train_labels_tensor), (test_images_tensor, test_labels_tensor)]

train_dataset, test_dataset = get_train_test_dataset_tensors()

In [43]:
# Sanity Verification of data and shapes
train_images, train_labels = train_dataset
#print(train_images.shape, train_labels.shape)

test_images, test_labels = test_dataset
#print(test_images.shape, test_labels.shape, test_images[0])



In [44]:
# Build Network

class CatsAndDogsNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=32, kernel_size=5)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=5)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=5)
        
        self.linear_size=None
        X = torch.rand(IMAGE_SIZE, IMAGE_SIZE).reshape(-1,1,IMAGE_SIZE,IMAGE_SIZE)
        self.convs(X)
        
        self.fc1 = nn.Linear(in_features = self.linear_size, out_features = 512)
        self.out =  nn.Linear(in_features = 512, out_features = 2)
    
    def convs(self, X):
        X =  F.relu(self.conv1(X))
        X = F.max_pool2d(X, kernel_size=2, stride=2)
        
        X = F.relu(self.conv2(X))
        X = F.max_pool2d(X, kernel_size=2, stride=2)
        
        X = F.relu(self.conv3(X))
        X = F.max_pool2d(X, kernel_size=2, stride=2)
        
        # flatten from dim1. dim0 is batch size and we dont want to flatten on that
        X = X. flatten(1,-1)
        
        if self.linear_size is None:
            self.linear_size = X.shape[1]
        
        return X
    
    def forward(self, X):
        X = self.convs(X)
        
        # FC1
        X = F.relu(self.fc1(X))
        
        X = self.out(X)
        
        return F.softmax(X, dim=1)

network = CatsAndDogsNetwork()
network.to(device)
print(network, network.linear_size)            
        
        
        
        
        
        


CatsAndDogsNetwork(
  (conv1): Conv2d(1, 32, kernel_size=(5, 5), stride=(1, 1))
  (conv2): Conv2d(32, 64, kernel_size=(5, 5), stride=(1, 1))
  (conv3): Conv2d(64, 128, kernel_size=(5, 5), stride=(1, 1))
  (fc1): Linear(in_features=512, out_features=512, bias=True)
  (out): Linear(in_features=512, out_features=2, bias=True)
) 512


In [45]:
# RunGenerator class 
# To try various hyper parameters for the network
class RunGenerator:
    @staticmethod
    def get_runs(params):
        runs = []
        hyperparams = params.keys()
        hyperparams_values = params.values()
        
        Run = namedtuple("Run", hyperparams)
        
        for combination in product(*hyperparams_values):
            run = Run(*combination)
            runs.append(run)
        
        return runs



for run in runs:
    print(run)

Run(batch_size=100, lr=0.1)
Run(batch_size=100, lr=0.001)
Run(batch_size=1000, lr=0.1)
Run(batch_size=1000, lr=0.001)


In [46]:
# Run Manager to manage Run/Epoch level operations and logging to TensorBoard
class RunManager():
    def __init__(self):
        self.run =  None
        self.network = None
        self.tb = None
        
        self.train_dataset_size = None
        self.test_dataset_size = None
        
        self.epoch_id = 0
        self.correct_predictions = 0
        self.total_loss=0.0
        
    def run_start(self, run, network, train_dataset_size, test_dataset_size):
        self.run = run
        self.tb = SummaryWriter(comment=f"-{run}")
        self.network = network
        
        self.train_dataset_size = train_dataset_size
        self.test_dataset_size = test_dataset_size
        
        self.epoch_id = 0
    
    def run_end(self):
        self.tb.close()
    
    def epoch_start(self):
        self.correct_predictions = 0
        self.total_loss=0.0
        
    
    def epoch_end(self):
        accuracy = 100*(self.correct_predictions/self.train_dataset_size)
        
        self.tb.add_scalar("Training Accuracy", accuracy,self.epoch_id)
        self.tb.add_scalar("Training Loss", self.total_loss, self.epoch_id)
        
        for name, param in self.network.named_parameters():
            self.tb.add_histogram(f"{name}.grad", param.grad, self.epoch_id)
        
        self.total_loss=0.0
        self.epoch_id+=1
        
    # This will be called per batch
    def track_correct_predictions(self, predictions, actual):
        correct_predictions = self.get_correct_predictions(predictions, actual)
        self.correct_predictions+= correct_predictions
    
    # This will be called per batch
    def track_total_loss(self,loss):
        self.total_loss+=loss
    
    def get_correct_predictions(self, predictions, actual):
        predictions = predictions.argmax(dim=1)
        actual = actual.argmax(dim=1)
        
        matches = predictions.eq(actual).sum().item()
        
        return matches
    
    def record_validation_stats(self, accuracy, loss, epoch):
        self.tb.add_scalar("Validation Accuracy: ", accuracy, epoch)
        self.tb.add_scalar("Validation Loss: ", loss, epoch)
        
rm = RunManager()
    

In [None]:
test_images_size = test_images.shape[0]

def get_random_batch_of_training_data(size=32):
    random_index = np.random.randint(size, test_images_size+1)
    
    train_images_batch = train_images[random_index-size:random_index]
    train_labels_batch = train_labels[random_index-size:random_index]
        
    return (train_images_batch, train_labels_batch)



In [None]:
# Generate various runs experimentation and hyoerparam tuning
params = OrderedDict(
    batch_size= [100],
    lr = [0.001]
)

runs = RunGenerator.get_runs(params)

# Initialize Run Manager
rm = RunManager()

train_size = train_images.shape[1]
test_size = test_images.shape[1]

# Training Loop
EPOCHS = 1

for run in runs:
    optimizer = optim.Adam(network.parameters(), run.lr)
    loss_func = nn.MSELoss()
    rm.run_start(run, network, train_size, test_size)
    
    for epoch in EPOCHS:
        rm.epoch_start()
        for i in tqdm(range(0,train_size, run.batch_size)):
            train_images_batch = train_images[i:i+run.batch_size]
            train_labels_batch = train_labels[i:i+run.batch_size]
            
            network.zero_grad()
            
            predicted = network(train_images_batch)
            loss = loss_func(predicted, train_labels_batch)
            
            rm.track_correct_predictions(predicted, train_labels_batch)
            rm.track_total_loss(loss.item())
            
            loss.backward()
            optim.step()
                
        rm.epoch_end()
        
        if epoch %2 == 0:
            # Test on a random batch of some size
            random_batch_images, random_batch_labels = get_random_batch_of_training_data()
            with torch.no_grad:
                predicted = network(random_batch_images)
                correct_predictions = rm.get_correct_predictions(predicted, random_batch_labels)
                accuracy = 100*(correct_predictions/100)
                loss = loss_func(predicted, random_batch_labels)
                rm.record_validation_stats(accuracy,loss, epoch)
    
    
    with torch.no_grad():
        # Test on entire test dataset
        predicted = network(test_images)
        correct_predictions = rm.get_correct_predictions(predicted, test_labels)
        accuracy = 100*(correct_predictions/100)
        loss = loss_func(predicted, test_labels)
        rm.record_validation_stats(accuracy,loss, epoch)

    
    rm.run_end()
            

