In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
import numpy as np

import matplotlib.pyplot as plt

In [None]:
import urllib.request

training = (
    "http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz",
    "http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz"
)

testing = (
    "http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz",
    "http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz"
)

def dl_mnist(ds, prefix):
    img_url, lab_url = ds
    print(f"Downloading {prefix}")
    urllib.request.urlretrieve(img_url, f'./{prefix}-img')
    urllib.request.urlretrieve(lab_url, f'./{prefix}-lab')
    print("Done")

In [None]:
dl_mnist(training, "train")

In [None]:
dl_mnist(testing, "test")

In [None]:
from torch.utils.tensorboard import SummaryWriter

# default `log_dir` is "runs" - we'll be more specific here
writer = SummaryWriter('runs/mnist')

In [None]:
import struct
import gzip

class MnistDataset(Dataset):
    def __init__(self, imgfile, labelfile, transform=None):
        self.imgfile = imgfile
        self.labelfile = labelfile
        self.images, self.labels = self.read_dataset(imgfile, labelfile)
        self.transform = transform
    
    def __len__(self):
        return self.images.shape[0]
    
    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        sample = {'image': self.images[idx], 'label': self.labels[idx]}
        
        if self.transform:
            sample = self.transform(sample)
        return sample

    def read_dataset(self, imgname, labname):
        import struct
        import gzip
        X = []
        y = []
        with gzip.open(imgname, "rb") as img, gzip.open(labname, "rb") as labs:
            img_header = struct.unpack(">4i", img.read(16))
            lab_header = struct.unpack(">2i", labs.read(8))

            img_size = img_header[2] * img_header[3]

            for i in range(img_header[1]):
                image = struct.unpack(f"{img_size}B", img.read(img_size))
                label = struct.unpack("B", labs.read(1))
                image = np.array(image).reshape((28, 28))
                X.append([image])
                y.append(label[0])
            X = np.array(X, dtype="float32")
        return X, np.array(y)

In [None]:
class negative(object):
    """Rescale values between 0 and 1"""
    
    def __call__(self, sample):
        image, label = sample['image'], sample['label']
        
        return {'image': image, 'label': label}

class ToTensor(object):
    """Convert ndarrays in sample to Tensors."""

    def __call__(self, sample):
        image, label = sample['image'], sample['label']

        return {'image': torch.from_numpy(image),
                'label': label}
    
compose = transforms.Compose([ToTensor()])

In [None]:
train = MnistDataset("train-img", "train-lab", transform=compose)

In [None]:
test = MnistDataset("test-img", 'test-lab')

In [None]:
train_dataloader = DataLoader(train, batch_size=1000, shuffle=True)

In [None]:
test_dataloader = DataLoader(test, shuffle=True, batch_size=10)

In [None]:
# get some random training images
dataiter = iter(train_dataloader).next()
images, labels = dataiter['image'][:10], dataiter['label'][:10]

img_grid = utils.make_grid(images, nrow=5)
# write to tensorboard
writer.add_image('mnist_images', img_grid)

In [None]:
for i, data in enumerate(train_dataloader):
    images, labels = data.values()
    break
    
# get the class labels for each image
class_labels = labels
# log embeddings
features = images.view(-1, 28 * 28)

writer.add_embedding(features,
                     metadata=class_labels,
                     label_img=images)
writer.close()

In [None]:
def print_data(x, y):
    img = x.numpy()[0]
    plt.imshow(img, cmap='Greys')
    plt.xlabel(f"Label: {y.item()}")

In [None]:
print_data(images[0], labels[0])

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(28 * 28, 120)
        self.fc2 = nn.Linear(120, 10)

    def forward(self, x):
        x = x.view(-1, 28 * 28)
        x = torch.sigmoid(self.fc1(x))
        x = F.softmax(self.fc2(x), dim=1)
        return x

net = Net()

In [None]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.1, momentum=0.9)

In [None]:
def accuracy(dataloader):
    correct = 0
    total = 0
    with torch.no_grad():
        for i, data in enumerate(dataloader):
            images, labels = data['image'], data['label']
            out = net(images)
            out = torch.argmax(out, dim=1)
            total += labels.shape[0]
            correct += (out == labels).sum().item()
    return correct / total

In [None]:
epoch = 10
writer = SummaryWriter('runs/MLP')
running_loss = 0
for e in range(epoch):
    for i, data in enumerate(train_dataloader):
        images, labels = data['image'], data['label']
        optimizer.zero_grad()
        
        out = net(images)
        loss = criterion(out, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 50 == 49:
            writer.add_scalar("accuracy", accuracy(test_dataloader))
            writer.add_scalar("loss", running_loss / 50)
            running_loss = 0
    print("Epoch:", e + 1, "/", epoch)
print("Training complete")
writer.close()

In [None]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__()
        self.conv1 = nn.Conv2d(1, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 4 * 4, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        print(x.shape)
        x = self.pool(F.relu(self.conv1(x)))
        print(x.shape)
        x = self.pool(F.relu(self.conv2(x)))
        print(x.shape)
        x = x.view(-1, 16 * 4 * 4)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

        
cnn = CNN()

In [None]:
import torch.optim as optim

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.1, momentum=0.9)

In [None]:
epoch = 10
writer = SummaryWriter('runs/CNN')
running_loss = 0.0
for e in range(epoch):
    for i, data in enumerate(train_dataloader):
        images, labels = data['image'], data['label']
        optimizer.zero_grad()
        
        out = net(images)
        loss = criterion(out, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        if i % 50 == 49:
            writer.add_scalar("accuracy", accuracy(test_dataloader))
            writer.add_scalar("loss", running_loss / 50)
            running_loss = 0
    print("Epoch:", e + 1, "/", epoch)
print("Training complete")
writer.close()