In [None]:
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor

import tensorflow as tf
import numpy as np
import matplotlib.pyplot as plt
import torch
import random
import torch.nn.functional as F
from sklearn.utils import shuffle
import seaborn as sns
import pandas as pd
import torch.nn as nn
from torch import optim
from torch.autograd import Variable

In [None]:
# Pick the GPU when CUDA is available, otherwise fall back to the CPU.
# The trailing bare expression displays the chosen device as the cell output.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
device

device(type='cpu')

In [None]:
# Load MNIST through Keras as NumPy arrays (train and test splits).
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()

# Flatten the test labels, one-hot encode them over 10 classes with
# TensorFlow, then convert the result to an integer torch tensor.
y_test = torch.tensor(
    np.array(tf.one_hot(y_test.flatten().astype(np.int32), depth=10)),
    dtype=int,
)

In [None]:
# Scale the test pixels to [0, 1] as float32 (dividing by 255) and wrap the
# array in a torch tensor.
x_test = torch.tensor(x_test.astype("float32") / 255.0)

In [None]:
# torchvision MNIST splits stored under ./data; ToTensor() converts each image
# to a float tensor. Only the training split requests a download.
train_data = datasets.MNIST(root='data', train=True, transform=ToTensor(), download=True)
test_data = datasets.MNIST(root='data', train=False, transform=ToTensor())

In [None]:
# loaders = {
#     'train' : torch.utils.data.DataLoader(train_data,
#                                           batch_size=100,
#                                           shuffle=True,
#                                           num_workers=1),

#     'test'  : torch.utils.data.DataLoader(test_data,
#                                           batch_size=100,
#                                           shuffle=True,
#                                           num_workers=1),
# }

In [None]:
class CNN(nn.Module):
    """Two-block convolutional classifier with an extra "defer" output.

    The final linear layer emits 11 logits: indices 0-9 are used as class
    labels and the last index (10) is the option of deferring to an expert.
    The flatten size ``32 * 7 * 7`` implies single-channel 28x28 inputs.
    """

    def __init__(self):
        super().__init__()
        # 5x5 conv (padding 2 keeps the spatial size) -> ReLU -> 2x2 max-pool.
        self.conv1 = nn.Sequential(
            nn.Conv2d(
                in_channels=1,
                out_channels=16,
                kernel_size=5,
                stride=1,
                padding=2,
            ),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2),
        )
        # Same layout as conv1, going from 16 to 32 channels.
        self.conv2 = nn.Sequential(
            nn.Conv2d(16, 32, 5, 1, 2),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )
        self.out = nn.Linear(32 * 7 * 7, 11)

        # No longer used by fit(); kept so the attribute still exists for any
        # external code that reads it.
        self.softmax_fn = torch.nn.Softmax(dim=0)

    def forward(self, x):
        """Return ``(logits, features)`` for a batch shaped (N, 1, H, W).

        ``features`` is the flattened output of the second conv block that is
        fed to the final linear layer.
        """
        x = self.conv1(x)
        x = self.conv2(x)
        x = x.view(x.size(0), -1)
        output = self.out(x)
        return output, x

    def fit(self, num_epochs, loaders, expert):
        """Train with Adam on the summed classification + deferral loss.

        Args:
            num_epochs: number of passes over ``loaders['train']``.
            loaders: mapping whose ``'train'`` entry yields ``(images, labels)``
                batches.
            expert: object with ``predict(labels)`` returning one expert answer
                per label.

        For every sample the loss adds ``-log p(true label)``; for samples the
        expert answers correctly it also adds ``-log p(defer)``, where
        ``defer`` is the last logit. Log-probabilities come from
        ``log_softmax`` so that a very small probability yields a large finite
        loss instead of ``log(0) = -inf`` and NaN gradients.
        """
        self.train()
        optimizer = optim.Adam(self.parameters(), lr=0.001)
        # Run batches on whatever device the model's weights live on.
        model_device = next(self.parameters()).device

        for epoch in range(num_epochs):
            last_loss = float('nan')  # stays NaN if the loader yields nothing
            for images, labels in loaders['train']:
                b_x = images.to(model_device)          # batch x
                b_y = labels.to(model_device).long()   # batch y
                log_probs = torch.log_softmax(self.forward(b_x)[0], dim=1)

                expert_b = torch.as_tensor(expert.predict(labels), device=model_device)
                expert_correct = expert_b == b_y

                # -log p(true class), summed over the batch
                pred_loss = -log_probs.gather(1, b_y.unsqueeze(1)).sum()
                # -log p(defer), only where the expert is right
                loss_ex = -log_probs[expert_correct, -1].sum()
                loss = pred_loss + loss_ex

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                last_loss = loss.item()

            # Report the loss of the last batch of the epoch
            print ('Epoch [{}/{}], Loss: {:.4f}'
                   .format(epoch + 1, num_epochs, last_loss))

    def predict(self, image):
        """Return the predicted class index (10 means "defer") for one image.

        Accepts an image shaped (H, W) or (C, H, W); the missing channel and
        batch dimensions are added here. A 4-D tensor is passed through
        unchanged and must hold a single sample.
        """
        self.eval()
        with torch.no_grad():
            if image.dim() == 2:
                image = image.unsqueeze(0)  # (H, W) -> (1, H, W)
            if image.dim() == 3:
                image = image.unsqueeze(0)  # (C, H, W) -> (1, C, H, W)
            image = image.to(next(self.parameters()).device)
            output, _ = self.forward(image)
            _, predicted_class = torch.max(output, 1)
        return predicted_class.item()



In [None]:
class synth_expert:
    """Synthetic expert that is always right on labels up to ``k``.

    For a label greater than ``k`` the expert answers with a uniformly random
    class index in ``[0, n_classes - 1]``.
    """

    def __init__(self, k, n_classes):
        self.k, self.n_classes = k, n_classes

    def predict(self, true_label):
        """Return a tensor holding the expert's answer for each true label."""
        answers = []
        for idx in range(len(true_label)):
            label = true_label[idx]
            # Echo the truth inside the expert's competence, guess otherwise.
            answers.append(
                label if label <= self.k else random.randint(0, self.n_classes - 1)
            )
        return torch.tensor(answers)

In [None]:
# model1 = CNN()
# expert_dumb = synth_expert(0, 10)
# model1.fit(20, loaders, expert_dumb)

In [None]:
# model2 = CNN()
# expert = synth_expert(5, 10)
# model2.fit(20, loaders, expert)

In [None]:
# model3 = CNN()
# expert_good = synth_expert(10, 10)
# model3.fit(20, loaders, expert_good)

In [None]:
# torch.save(model1, '/content/drive/My Drive/model1_lr.pt')
# torch.save(model2, '/content/drive/My Drive/model2_lr.pt')
# torch.save(model3, '/content/drive/My Drive/model3_lr.pt')

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# The checkpoints are whole pickled modules, so loading executes pickle code:
# only load files you trust.
# map_location remaps the stored tensors onto the device selected earlier, so
# a checkpoint saved on a GPU still loads on a CPU-only runtime.
# NOTE(review): recent PyTorch releases default torch.load to
# weights_only=True, which rejects fully pickled modules; if loading fails
# there, pass weights_only=False (trusted files only).
model1 = torch.load('/content/drive/My Drive/model1_lr.pt', map_location=device)
model2 = torch.load('/content/drive/My Drive/model2_lr.pt', map_location=device)
model3 = torch.load('/content/drive/My Drive/model3_lr.pt', map_location=device)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Tally model2 over the clean test set: `defer` counts samples where the model
# picks the defer class (10), `counter` counts correct non-deferred predictions.
counter, defer = 0, 0
for i, (image, label) in enumerate(test_data):
    predicted = model2.predict(image)
    if predicted == 10:
        defer += 1
        continue
    if predicted == label:
        counter += 1

In [None]:
import tensorflow_datasets as tfds

# Test splits of three MNIST-Corrupted variants, loaded as (image, label) pairs.
mnist_corrupted = tfds.load(
    'mnist_corrupted/impulse_noise',
    split='test',
    as_supervised=True,
)
mnist_shot_raw = tfds.load(
    'mnist_corrupted/shot_noise',
    split='test',
    as_supervised=True,
)
mnist_spatter_raw = tfds.load(
    'mnist_corrupted/spatter',
    split='test',
    as_supervised=True,
)

In [None]:
def dataset_to_numpy(ds):
    """
    Materialise a supervised TensorFlow dataset as two parallel Python lists.

    Returns ``(images, labels)`` holding the items yielded by
    ``tfds.as_numpy(ds)``, in iteration order.
    """
    pairs = list(tfds.as_numpy(ds))
    images = [image for image, _ in pairs]
    labels = [label for _, label in pairs]
    return images, labels

In [None]:
def torch_data(dataset):
    """Convert a supervised TensorFlow dataset into torch tensors.

    Returns ``(images, labels)``: ``images`` is a float32 tensor shaped
    (N, 28, 28) with pixel values divided by 255, and ``labels`` is a tensor
    built from the dataset's labels.
    """
    images, labels = dataset_to_numpy(dataset)
    pixels = np.array(images).astype("float32") / 255.0
    # -1 lets the sample count follow the dataset instead of assuming exactly
    # 10000 images; the per-image size stays fixed at 28x28.
    pixels = pixels.reshape(-1, 28, 28)
    return torch.from_numpy(pixels), torch.tensor(labels)

In [None]:
def testmodel(model, x, y):
    counter = 0
    def_counter = 0
    pred_counter = 0
    for i in range(len(x)):
        pred = model.predict(x[i].unsqueeze(0))
        if pred != 10:
          pred_counter += 1
        else:
          def_counter += 1
        if y[i].item() == pred:
          counter += 1
    a = counter / pred_counter
    b = def_counter / len(x)
    # print("accuracy: ", a)
    # print("defer rate: ", b)
    return a, b

In [None]:
# Convert each corrupted test split into an (images, labels) pair of torch
# tensors via torch_data.
mnist_impulse, mnist_impulse_labels = torch_data(mnist_corrupted)

mnist_shot, mnist_shot_labels = torch_data(mnist_shot_raw)

mnist_spatter, mnist_spatter_labels = torch_data(mnist_spatter_raw)


In [None]:
# Unzip the torchvision test set into parallel tuples, replacing the earlier
# Keras-derived x_test / y_test.
# ToTensor() yields (1, 28, 28) images and the dataset yields plain-int labels,
# whereas the corrupted sets hold (28, 28) images with tensor labels. Drop the
# channel axis and wrap the labels so both sources can be mixed and evaluated
# by the same code.
x_test, y_test = zip(*test_data)
x_test = tuple(image.squeeze(0) for image in x_test)
y_test = tuple(torch.tensor(label) for label in y_test)

In [None]:
# model1 on the spatter-corrupted test split; displays the pair returned by
# testmodel (accuracy on non-deferred samples, defer rate).
testmodel(model1, mnist_spatter, mnist_spatter_labels)

(0.9846730975348339, 0.067)

In [None]:
# Pair every image with its label so whole samples can be drawn at random:
# `a` holds the impulse-noise samples, `b` the samples from x_test / y_test.
a = list(zip(mnist_impulse, mnist_impulse_labels))
b = list(zip(x_test, y_test))

In [None]:
# Build a 10000-sample evaluation mix: 2000 impulse-noise samples followed by
# 8000 samples from `b`. A dedicated seeded generator makes the draw
# reproducible across kernel restarts without touching the global random state.
sample_rng = random.Random(0)
random_samples = sample_rng.sample(a, 2000)
random_samples2 = sample_rng.sample(b, 8000)
total = random_samples + random_samples2

In [None]:
# Split the mixed (image, label) pairs back into parallel tuples of images (x)
# and labels (y).
x, y = zip(*total)

In [None]:
# Sanity-check the shape of the first mixed sample instead of dumping the
# whole tensor.
x[0].shape

torch.Size([28, 28])

In [None]:
# model1 on the mixed sample built from impulse-noise and x_test / y_test
# samples.
testmodel(model1, x, y)

RuntimeError: ignored