# Image Data Loss Exploratory Analysis

## Setup Environment

In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt # visualization
import seaborn as sns # visualization
# machine learning
import torch 
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision import transforms, datasets, models

!pip3 install progressbar
import progressbar

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the read-only Kaggle input directory and print the full path of every file found.
for dirname, _, filenames in os.walk('/kaggle/input'):
    full_paths = (os.path.join(dirname, filename) for filename in filenames)
    for full_path in full_paths:
        print(full_path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
# Use the first CUDA GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(device)

## Load MNIST Dataset

In [3]:
# MNIST train/test splits, downloaded into /kaggle/working on first use and
# served as shuffled mini-batches of tensors.
BATCH_SIZE = 250

train_loader = torch.utils.data.DataLoader(
    dataset=torchvision.datasets.MNIST(
        root='/kaggle/working',
        train=True,
        download=True,
        transform=transforms.ToTensor(),
    ),
    batch_size=BATCH_SIZE,
    shuffle=True,
)

test_loader = torch.utils.data.DataLoader(
    dataset=torchvision.datasets.MNIST(
        root='/kaggle/working',
        train=False,
        download=True,
        transform=transforms.ToTensor(),
    ),
    batch_size=BATCH_SIZE,
    shuffle=True,
)

## MNIST Classifier / Baseline Model

In [4]:
class BasicClassifier(nn.Module):
    """Convolutional classifier for single-channel images.

    ``features`` is four unpadded convolutions (kernel sizes 7, 11, 3, 5) with
    ReLU between them; ``classifier`` is a four-layer MLP ending in
    ``num_classes`` logits. The first linear layer expects ``256 * 6 * 6``
    inputs, which is what the convolution stack produces for a 28x28 image
    (28 - 6 - 10 - 2 - 4 = 6 per side).
    """

    def __init__(self, num_classes) -> None:
        super().__init__()
        self.num_classes = num_classes

        # Layers are created in the same order as they are applied.
        conv_stack = [
            nn.Conv2d(1, 64, kernel_size=7),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 128, kernel_size=11),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 182, kernel_size=3),
            nn.ReLU(inplace=True),
            nn.Conv2d(182, 256, kernel_size=5),
        ]
        self.features = nn.Sequential(*conv_stack)

        dense_stack = [
            nn.Linear(256 * 6 * 6, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, 2048),
            nn.ReLU(inplace=True),
            nn.Linear(2048, 1024),
            nn.ReLU(inplace=True),
            nn.Linear(1024, num_classes),
        ]
        self.classifier = nn.Sequential(*dense_stack)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return class logits: conv features, flattened per sample, through the MLP."""
        flat_features = torch.flatten(self.features(x), 1)
        return self.classifier(flat_features)

## Metrics Class

In [5]:
class Metrics:
    """Confusion-matrix based classification metrics plus a conv feature-map helper.

    The confusion matrix is indexed ``[predicted, actual]``: rows are the
    model's guesses and columns are the ground-truth labels.
    """

    def __init__(self, num_classes: int = None) -> None:
        """Create the tracker.

        Args:
            num_classes: If given, a zeroed matrix is allocated immediately.
                If omitted, ``reset_confusion_matrix`` must be called before
                any update or metric call.
        """
        if num_classes is not None:
            self.reset_confusion_matrix(num_classes)

    def update_confusion_matrix(self, outputs: torch.Tensor, labels: torch.Tensor) -> None:
        """Add one batch of predictions to the confusion matrix.

        Args:
            outputs: Per-class scores; the prediction is ``argmax`` over dim 1.
            labels: Ground-truth class indices, one per row of ``outputs``.
        """
        guesses = torch.as_tensor(outputs).detach().argmax(1).cpu().numpy()
        actual = torch.as_tensor(labels).detach().cpu().numpy()
        # np.add.at accumulates correctly when the same (guess, label) pair repeats.
        np.add.at(self.confusion_matrix, (guesses, actual), 1)

    def get_confusion_matrix(self, norm=False) -> np.ndarray:
        """Return the confusion matrix, optionally scaled so all cells sum to 1.

        With ``norm=True`` and an empty matrix, a zero matrix is returned
        instead of dividing by zero.
        """
        if norm:
            total = self.confusion_matrix.sum()
            if total == 0:
                return self.confusion_matrix.copy()
            return self.confusion_matrix / total
        return self.confusion_matrix

    def reset_confusion_matrix(self, num_classes: int) -> None:
        """Replace the confusion matrix with a zeroed ``num_classes`` x ``num_classes`` one."""
        self.confusion_matrix = np.zeros((num_classes, num_classes))

    def classification_metrics(self, print_=False) -> list:
        """Return ``[accuracy, precision, recall, f1, support]``; optionally print them.

        Support is the total number of samples recorded in the matrix.
        """
        acc = self.accuracy()
        prec = self.precision()
        rec = self.recall()
        f1 = self.f1_score()
        support = self.confusion_matrix.sum()
        if print_:
            print("Accuracy: {}\nPrecision: {}\nRecall: {}\nf1-score: {}\nSupport: {}".format(acc, prec, rec, f1, support))
        return [acc, prec, rec, f1, support]

    def _macro_average(self, axis: int) -> float:
        """Mean over classes of ``diagonal / marginal total`` taken along ``axis``.

        Classes that never occur as a prediction or a label are left out of the
        mean. A class whose marginal total is zero contributes 0. Returns 0.0
        for an empty matrix.
        """
        cm = self.confusion_matrix
        hits = np.diag(cm)
        totals = cm.sum(axis=axis)
        present = (cm.sum(axis=0) + cm.sum(axis=1)) > 0
        if not present.any():
            return 0.0
        ratios = np.divide(hits, totals, out=np.zeros(hits.shape, dtype=float), where=totals > 0)
        return float(ratios[present].mean())

    def accuracy(self) -> float:
        """Fraction of samples on the diagonal (0.0 for an empty matrix)."""
        total = self.confusion_matrix.sum()
        if total == 0:
            return 0.0
        return float(np.trace(self.confusion_matrix) / total)

    def precision(self) -> float:
        """Macro-averaged precision: per class, correct / all predictions of that class."""
        # Rows are predictions, so the row sum is "everything predicted as k".
        return self._macro_average(axis=1)

    def recall(self) -> float:
        """Macro-averaged recall: per class, correct / all samples labelled that class."""
        # Columns are labels, so the column sum is "everything that truly is k".
        return self._macro_average(axis=0)

    def f1_score(self) -> float:
        """Harmonic mean of ``precision()`` and ``recall()`` (0.0 when both are 0)."""
        p = self.precision()
        r = self.recall()
        if p + r == 0:
            return 0.0
        return 2.0 * (p * r) / (p + r)

    def feature_map(self, inputs: torch.Tensor, model, print_=False):
        """Run ``inputs`` through the model's Conv2d layers and return each output.

        Conv2d layers are collected from the model's direct children and from
        direct children that are ``nn.Sequential``. They are applied one after
        another with nothing in between, so any activations the model places
        between its convolutions are NOT applied here.

        Args:
            inputs: Batch to feed to the first convolution.
            model: Module whose convolution layers are visualised.
            print_: If true, show the first input image and up to 8 feature
                maps of the first sample for every convolution.

        Returns:
            List with one output tensor per convolution, in order.

        Raises:
            ValueError: If no Conv2d layer is found.
        """
        conv_layers = []
        for child in model.children():
            if isinstance(child, nn.Conv2d):
                conv_layers.append(child)
            elif isinstance(child, nn.Sequential):
                conv_layers.extend(layer for layer in child.children() if isinstance(layer, nn.Conv2d))
        if not conv_layers:
            raise ValueError("model has no Conv2d layers to visualise")

        # Run on whatever device the convolution weights live on.
        target_device = conv_layers[0].weight.device
        results = []
        with torch.no_grad():  # visualisation only; no autograd graph needed
            activation = inputs.to(target_device)
            for conv in conv_layers:
                activation = conv(activation)
                results.append(activation)

        if print_:
            plt.imshow(inputs[0, 0].to('cpu'), cmap='Greys')
            plt.show()
            for layer_output in results:
                plt.figure(figsize=(50, 10))
                # Only the first 8 channels of the first sample are drawn.
                for idx, fmap in enumerate(layer_output[0, :8].to('cpu')):
                    plt.subplot(2, 8, idx + 1)
                    plt.imshow(fmap, cmap='gray')
                    plt.axis("off")
                plt.show()
                plt.close()
        return results

    @staticmethod
    def feature_map_diff(map1, map1_loss) -> np.ndarray:
        """Placeholder for comparing two feature maps; not implemented, returns None."""
        pass
metric = Metrics()

# Data Loss Class




In [6]:
class DataLoss:
    """Simulates data loss by zeroing pixels of image batches.

    Both methods return a new tensor and leave ``inputs`` untouched. Masks are
    drawn with NumPy's global random generator.
    """

    def __init__(self):
        pass

    def random_per_pixel(self, inputs: torch.Tensor, ppp: float = 0.0) -> torch.Tensor:
        """Zero each element independently with probability ``ppp``.

        Args:
            inputs: Tensor of any shape.
            ppp: Proportion per pixel; an element is kept when its uniform
                draw from [0, 1) is greater than ``ppp``.

        Returns:
            A tensor shaped like ``inputs`` with the dropped elements set to 0.
        """
        draws = torch.as_tensor(np.random.rand(*inputs.shape))
        keep = (draws > ppp).to(inputs.device)  # mask must live on the inputs' device
        return inputs * keep

    def random_per_img(self, inputs: torch.Tensor, ppp: float = 0.0) -> torch.Tensor:
        """Zero exactly ``int(H * W * ppp)`` randomly chosen pixels in every image.

        The pixel positions are drawn separately for each image and shared by
        all channels of that image.

        Args:
            inputs: Batch indexed as (N, C, H, W).
            ppp: Proportion of pixel positions to drop; values above 1.0 are
                treated as 1.0.

        Returns:
            A new tensor shaped like ``inputs``. When fewer than one pixel
            would be dropped, an unmodified copy is returned.
        """
        batch, height, width = inputs.shape[0], inputs.shape[2], inputs.shape[3]
        pixels = height * width
        num_loss = int(pixels * min(ppp, 1.0))
        if num_loss < 1:
            # Return a copy so callers never alias the original batch.
            return torch.clone(inputs)

        # One row per image: num_loss zeros followed by ones, shuffled in place.
        mask = np.ones((batch, pixels))
        mask[:, :num_loss] = 0.0
        for i in range(batch):
            np.random.shuffle(mask[i])

        # Shape (N, 1, H, W) broadcasts the same positions over every channel.
        mask = torch.from_numpy(mask).reshape(batch, 1, height, width)
        return inputs * mask.to(device=inputs.device, dtype=inputs.dtype)
dloss = DataLoss()

In [7]:
inputs = next(iter(train_loader))[0]
# Show the first digit of the batch untouched, then under each loss scheme at 50%.
previews = [
    ('raw', inputs),
    ('random_per_pixel', dloss.random_per_pixel(inputs,0.5)),
    ('random_per_img', dloss.random_per_img(inputs,0.5)),
]
for preview_title, preview_batch in previews:
    plt.imshow(preview_batch[0][0],cmap='Greys')
    plt.title(preview_title)
    plt.show()

# Model trained on unaltered data

In [8]:
# Baseline network (10 digit classes) on the selected device, with the loss and
# optimizer used to train it.
model_unaltered = BasicClassifier(10).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model_unaltered.parameters(), lr=3e-5)

In [9]:
# Layout shared by every progress bar in this notebook:
# " [<timer>] <percentage> <bar> (<ETA>) "
widgets = [
    ' [',
    progressbar.Timer(),
    '] ',
    progressbar.Percentage(),
    ' ',
    progressbar.Bar(),
    ' (',
    progressbar.ETA(),
    ') ',
]

## Train model_unaltered on Unaltered Data

In [10]:
# training
# find best number of epochs
# NOTE: `criterion` and `optimizer` are rebound later for model_lossy_1, so this
# cell is only valid when run right after the model_unaltered setup cell.
NUM_EPOCHS = 10
steps_per_epoch = len(train_loader)
bar = progressbar.ProgressBar(NUM_EPOCHS*steps_per_epoch,widgets=widgets).start()
model_unaltered.train()  # make training mode explicit
for epoch in range(NUM_EPOCHS):
    for i, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        outputs = model_unaltered(inputs.to(device))
        loss = criterion(outputs, labels.to(device))
        loss.backward()
        optimizer.step()
        # progressbar: report completed steps so the bar can reach 100%
        bar.update(epoch*steps_per_epoch + i + 1)
bar.finish()

## Test model_unaltered on Unaltered Data

In [11]:
# testing
metric.reset_confusion_matrix(10)
bar = progressbar.ProgressBar(len(test_loader),widgets=widgets).start()
model_unaltered.eval()  # inference mode for evaluation
with torch.no_grad():
    for i, (inputs, labels) in enumerate(test_loader):
        outputs = model_unaltered(inputs.to(device))
        # statistics
        metric.update_confusion_matrix(outputs.to('cpu'),labels)
        # progressbar: report completed batches so the bar can reach 100%
        bar.update(i + 1)
bar.finish()
m = metric.get_confusion_matrix(norm=True)
sns.heatmap(m,square=True,cmap='Greys')
# The matrix is indexed [predicted, actual]: rows on the y axis, columns on the x axis.
plt.xlabel('actual label')
plt.ylabel('predicted label')
plt.show()
metric.classification_metrics(print_=True)

## Test model_unaltered on Lossy Data

### DataLoss::random_per_img()

In [13]:
# Evaluate model_unaltered on the test set at increasing levels of per-image data loss.
loss_stats = {}
percents = [x/10.0 for x in range(0,10)] + [0.95] # input parameters to DataLoss::random_per_img()
bar = progressbar.ProgressBar(len(percents),widgets=widgets).start()
model_unaltered.eval()  # inference mode for evaluation
for cc, ppp in enumerate(percents):
    metric.reset_confusion_matrix(10)
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model_unaltered(dloss.random_per_img(inputs,ppp).to(device)) # apply data loss
            # statistics
            metric.update_confusion_matrix(outputs.to('cpu'),labels)
    loss_stats[ppp] = metric.classification_metrics()
    # progressbar: report completed loss levels so the bar can reach 100%
    bar.update(cc + 1)
bar.finish()

leg = ['accuracy','precision','recall']
loss_levels = list(loss_stats.keys())
# One row per loss level; columns follow classification_metrics() order.
stats = np.array(list(loss_stats.values()),dtype=float)
for column in range(len(leg)):
    sns.lineplot(x=loss_levels,y=stats[:,column])
plt.legend(leg)
plt.xlabel('% loss')
plt.title('Baseline Model trained on unaltered data DataLoss::random_per_img()')
plt.show()

# Feature Map
Reference: https://androidkt.com/how-to-visualize-feature-maps-in-convolutional-neural-networks-using-pytorch/

## Feature Map without DataLoss

In [14]:
# (inputs,labels) = next(iter(test_loader))
# res_full = metric.feature_map(inputs[0:1],model,print_=True)
# print(model(inputs[0:1].to(device)).to('cpu').argmax())

## Feature Map with DataLoss

In [15]:
# (inputs,labels) = next(iter(test_loader))
# loss_inputs = dloss.random_per_img(inputs[0:1],.75)
# res_dloss = metric.feature_map(loss_inputs,model,print_=True)
# print(model(loss_inputs.to(device)).to('cpu').argmax())

# Model Trained on Lossy Data

In [16]:
# Second network with the same architecture, to be trained on lossy inputs.
# `criterion` and `optimizer` are rebound here and now belong to model_lossy_1.
model_lossy_1 = BasicClassifier(10).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model_lossy_1.parameters(), lr=3e-5)

## Train model_lossy_1 on Lossy Data : loss = 50%

In [17]:
# training
# find best number of epochs
NUM_EPOCHS = 10
steps_per_epoch = len(train_loader)
bar = progressbar.ProgressBar(NUM_EPOCHS*steps_per_epoch,widgets=widgets).start()
model_lossy_1.train()  # make training mode explicit
for epoch in range(NUM_EPOCHS):
    for i, (inputs, labels) in enumerate(train_loader):
        optimizer.zero_grad()
        # Every batch is degraded with a fresh 50% per-image loss mask.
        outputs = model_lossy_1(dloss.random_per_img(inputs,0.5).to(device))
        loss = criterion(outputs, labels.to(device))
        loss.backward()
        optimizer.step()
        # progressbar: report completed steps so the bar can reach 100%
        bar.update(epoch*steps_per_epoch + i + 1)
bar.finish()

## Test model_lossy_1 on Lossy Data

In [18]:
# Testing on DataLoss::random_per_img()
loss_stats = {}
percents = [x/10.0 for x in range(10)] + [0.95,0.99] # input parameters to DataLoss::random_per_img()
bar = progressbar.ProgressBar(len(percents),widgets=widgets).start()
model_lossy_1.eval()  # inference mode for evaluation
for cc, ppp in enumerate(percents):
    metric.reset_confusion_matrix(10)
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model_lossy_1(dloss.random_per_img(inputs,ppp).to(device)) # apply data loss
            # statistics
            metric.update_confusion_matrix(outputs.to('cpu'),labels)
    loss_stats[ppp] = metric.classification_metrics()
    # progressbar: report completed loss levels so the bar can reach 100%
    bar.update(cc + 1)
bar.finish()

leg = ['accuracy','precision','recall']
loss_levels = list(loss_stats.keys())
# One row per loss level; columns follow classification_metrics() order.
stats = np.array(list(loss_stats.values()),dtype=float)
for column in range(len(leg)):
    sns.lineplot(x=loss_levels,y=stats[:,column])
plt.legend(leg)
plt.xlabel('% loss')
plt.title('Baseline Model trained on lossy data DataLoss::random_per_img()')
plt.show()

## Test model_lossy_1 on Unaltered Data

In [19]:
# Testing on unaltered data (no DataLoss applied)
metric.reset_confusion_matrix(10)
bar = progressbar.ProgressBar(len(test_loader),widgets=widgets).start()
model_lossy_1.eval()  # inference mode for evaluation
with torch.no_grad():
    for i, (inputs, labels) in enumerate(test_loader):
        outputs = model_lossy_1(inputs.to(device))
        # statistics
        metric.update_confusion_matrix(outputs.to('cpu'),labels)
        # progressbar: report completed batches so the bar can reach 100%
        bar.update(i + 1)
bar.finish()
m = metric.get_confusion_matrix(norm=True)
sns.heatmap(m,square=True,cmap='Greys')
# The matrix is indexed [predicted, actual]: rows on the y axis, columns on the x axis.
plt.xlabel('actual label')
plt.ylabel('predicted label')
plt.show()
metric.classification_metrics(print_=True)