In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision

In [None]:
import numpy as np

In [None]:
# data loader for batches

In [None]:
from torchvision import transforms
# Preprocessing pipeline applied to every MNIST image: convert it to a tensor.
transform = transforms.Compose([transforms.ToTensor()])

In [None]:
# Training split of MNIST; files are downloaded (if missing) relative to the
# current working directory because the root path is empty.
mnist = torchvision.datasets.MNIST(
    root='',
    train=True,
    download=True,
    transform=transform,
)

In [None]:
mnist[0][0]

In [None]:
# Shuffled mini-batch iterator over the training split.
data_loader = torch.utils.data.DataLoader(
    mnist, batch_size=64, shuffle=True, num_workers=0
)

In [None]:
# Held-out test split (train=False) wrapped in the same kind of batched loader.
mnist_test = torchvision.datasets.MNIST('', train=False, transform=transform)
test_loader = torch.utils.data.DataLoader(
    mnist_test, batch_size=64, shuffle=True, num_workers=0
)

In [None]:
# Peek at a single batch to check the tensor layout.
for b in data_loader:
  print(b[0].shape)
  break # one batch is enough; it holds batch_size images (b[0]) and their labels (b[1])
  # b[0].shape is [batch, 1, 28, 28]; an RGB image would have 3 channels, but MNIST is grayscale so there is a single intensity channel
  # NOTE(review): data_loader above uses batch_size=64, so the recorded [4, 1, 28, 28] output looks stale

torch.Size([4, 1, 28, 28])


In [None]:
import torch.nn as nn
import torch.nn.functional as F

In [None]:
class MLP(nn.Module):
    """Fully connected classifier: input -> 500 -> 300 -> 100 -> output.

    Args:
        input_dim: Number of features per sample once the input is flattened
            (e.g. 28 * 28 for an MNIST image).
        output_dim: Number of classes, i.e. the size of the logit vector.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()

        self.input_fc = nn.Linear(input_dim, 500)
        self.hidden_fc = nn.Linear(500, 300)
        self.hidden_fc1 = nn.Linear(300, 100)
        self.output_fc = nn.Linear(100, output_dim)

    def forward(self, x):
        """Return unnormalised class scores (logits) of shape [batch, output_dim].

        Everything after the first (batch) dimension of ``x`` is flattened into
        a single feature vector, so [batch, 1, 28, 28] and [batch, 28, 28]
        inputs are both accepted.
        """
        batch_size = x.shape[0]

        # reshape (unlike view) also works when x is not contiguous in memory,
        # e.g. after a transpose/permute; -1 lets PyTorch infer the feature size.
        x = x.reshape(batch_size, -1)  # [batch, input_dim]

        h_1 = F.relu(self.input_fc(x))     # [batch, 500]
        h_2 = F.relu(self.hidden_fc(h_1))  # [batch, 300]
        h_3 = F.relu(self.hidden_fc1(h_2))  # [batch, 100]

        y_pred = self.output_fc(h_3)       # [batch, output_dim]

        return y_pred

In [None]:
data, labels = next(iter(data_loader)) # like printing b after for b in data_loader, this is a batch

In [None]:
print(data.shape)

torch.Size([4, 1, 28, 28])


In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU so the
# notebook still runs on machines without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
# MLP for flattened 28x28 images and 10 digit classes, placed on the chosen device.
m = MLP(28*28, 10).to(device)

In [None]:
# The model lives on `device`, so the input batch has to be moved there too.
logits = m(data.to(device))

In [None]:
labels

tensor([4, 8, 4, 8])

In [None]:
F.softmax(logits, dim = -1).sum(axis = 1) # check if each 10 sums to 1

tensor([1.0000, 1.0000, 1.0000, 1.0000], grad_fn=<SumBackward1>)

In [None]:
soft = F.softmax(logits, dim = -1)

In [None]:
soft

tensor([[0.0977, 0.1010, 0.0986, 0.1067, 0.0911, 0.1055, 0.1074, 0.1003, 0.1007,
         0.0910],
        [0.0971, 0.1010, 0.0984, 0.1083, 0.0907, 0.1069, 0.1063, 0.1000, 0.0989,
         0.0925],
        [0.0965, 0.1011, 0.0974, 0.1082, 0.0910, 0.1071, 0.1070, 0.1002, 0.0995,
         0.0919],
        [0.0971, 0.1006, 0.0977, 0.1067, 0.0916, 0.1058, 0.1070, 0.1011, 0.1007,
         0.0916]], grad_fn=<SoftmaxBackward0>)

In [None]:
# Manual cross-entropy: mean negative log-probability assigned to the true class.
# Row indices follow the actual batch size instead of a hard-coded 4.
-soft[torch.arange(len(labels)), labels].log().mean() # loss

tensor(2.3503, grad_fn=<NegBackward0>)

In [None]:
print(labels)

tensor([4, 8, 4, 8])


In [None]:
# 1 iteration

optimizer = optim.AdamW(m.parameters())

ce = nn.CrossEntropyLoss()

# Inputs and targets must be on the same device as the model's parameters.
data = data.to(device)
labels = labels.to(device)

m.train()
optimizer.zero_grad()
y_pred = m(data)           # logits, one row per image
loss = ce(y_pred, labels)  # cross-entropy between logits and integer labels

loss.backward()
optimizer.step()

In [None]:
loss

tensor(2.3097, device='cuda:0', grad_fn=<NllLossBackward0>)

In [None]:
# testing data
# Rebuilds the test split (train=False) and rebinds `test_loader`, this time
# with batch_size=4, replacing the batch_size=64 loader created earlier.
test_data = torchvision.datasets.MNIST('',
                           download=True,
                           train = False,
                           transform=transform)

test_loader = torch.utils.data.DataLoader(test_data,
                                          batch_size=4,
                                          shuffle=True,
                                          num_workers=0)

MLP Training and Testing

In [None]:
# Fresh MLP on the chosen device, plus its optimizer and loss function.
m = MLP(28*28, 10).to(device)
optimizer = optim.AdamW(m.parameters())
ce = nn.CrossEntropyLoss()

In [None]:
# One pass (epoch) over the training set.
m.train()  # training mode only needs to be set once, before the loop
for data, labels in data_loader:
    # Train on the batch the loop yields. Calling next(iter(data_loader)) here
    # would build a brand-new iterator every step and discard the loop's batch.
    data = data.to(device)
    labels = labels.to(device)

    optimizer.zero_grad()
    y_pred = m(data)
    loss = ce(y_pred, labels)

    loss.backward()
    optimizer.step()

In [None]:
num_samples = 0
num_correct = 0
m.eval()  # switch the module to evaluation mode
with torch.no_grad():  # gradients are not needed when only measuring accuracy
  for data, labels in test_loader:
    data = data.to(device)
    labels = labels.to(device)
    num_samples += len(labels)
    preds = torch.argmax(m(data), dim=-1)  # predicted class = largest logit
    # Tensor reduction instead of Python's built-in sum over tensor elements.
    num_correct += (preds == labels).sum()

num_correct/num_samples # testing accuracy

tensor(0.9580, device='cuda:0')

In [None]:
! pip install einops

Collecting einops
  Downloading einops-0.7.0-py3-none-any.whl (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.6/44.6 kB[0m [31m688.8 kB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: einops
Successfully installed einops-0.7.0


In [None]:
import einops

# Making Vision Transformer

In [None]:
def patchify(images, n_patches_per_row):
    """Split a batch of square images into flattened, non-overlapping patches.

    Args:
        images: Tensor of shape [n, c, h, w] with h == w.
        n_patches_per_row: Number of patches along each image side; it must
            divide h exactly, otherwise pixels would silently be dropped.

    Returns:
        Tensor of shape [n, n_patches_per_row ** 2, c * patch_size ** 2] with
        the same dtype and on the same device as ``images``. Patches are
        ordered row-major over the patch grid and each patch is flattened in
        (channel, row, column) order.

    Raises:
        AssertionError: If the images are not square or the side length is not
            divisible by ``n_patches_per_row``.
    """
    n, c, h, w = images.shape

    assert h == w, "Patchify method is implemented for square images only"
    assert h % n_patches_per_row == 0, "Image size must be divisible by the number of patches per row"

    patch_size = h // n_patches_per_row

    # Vectorised replacement for looping over every image and patch:
    # [n, c, grid_row, patch_row, grid_col, patch_col]
    grid = images.reshape(n, c, n_patches_per_row, patch_size, n_patches_per_row, patch_size)
    # -> [n, grid_row, grid_col, c, patch_row, patch_col]
    grid = grid.permute(0, 2, 4, 1, 3, 5)

    return grid.reshape(n, n_patches_per_row ** 2, c * patch_size ** 2)


In [None]:
patches = patchify(data, 14)
patches.shape

torch.Size([4, 196, 4])

In [None]:
class MLP(nn.Module):
  """Position-wise feed-forward network used inside a transformer block.

  Expands each token from embed_dim to 4 * embed_dim, applies ReLU and
  projects back to embed_dim.

  NOTE: this reuses the name of the MNIST `MLP` classifier defined earlier in
  the notebook and shadows it from this cell onwards.
  """
  def __init__(self, embed_dim=16):
    super().__init__()
    self.up = nn.Linear(embed_dim, embed_dim*4)
    self.relu = nn.ReLU()
    self.down = nn.Linear(embed_dim*4, embed_dim)

  def forward(self, x):
    """Apply up-projection, ReLU and down-projection; the shape is preserved."""
    x = self.up(x)
    x = self.relu(x)
    x = self.down(x)
    return x

class Block(nn.Module):
  """Transformer encoder block: self-attention followed by a feed-forward MLP.

  Both sub-layers are wrapped in a residual (skip) connection so that the
  token stream is refined rather than replaced by each sub-layer.

  Args:
    embed_dim: Width of every token embedding.
    num_heads: Number of attention heads; must divide embed_dim.
  """
  def __init__(self, embed_dim=16, num_heads=4):
    super().__init__()
    # Extra linear projections applied before nn.MultiheadAttention.
    self.q = nn.Linear(embed_dim, embed_dim)
    self.k = nn.Linear(embed_dim, embed_dim)
    self.v = nn.Linear(embed_dim, embed_dim)

    # batch_first=True: inputs and outputs are [batch, sequence, embed_dim].
    self.mha = nn.MultiheadAttention(embed_dim, num_heads, batch_first=True)
    self.mlp = MLP(embed_dim)

  def forward(self, x):
    """Return the updated tokens; input and output are [batch, sequence, embed_dim]."""
    q, k, v = self.q(x), self.k(x), self.v(x)
    attn_output, _ = self.mha(q, k, v)
    x = x + attn_output  # residual around attention
    # Residual around the feed-forward part as well; without it the MLP output
    # replaces the token stream instead of refining it.
    x = x + self.mlp(x)
    return x

In [None]:
class VIT(nn.Module):
    """Minimal Vision Transformer classifier for small square images.

    Pipeline: split the image into patches -> linear embedding of each patch ->
    prepend a learned class token -> add learned positional embeddings ->
    transformer blocks -> linear classification head on the class token.

    Args:
        input_dim: Image shape as (channels, height, width), e.g. (1, 28, 28).
        output_dim: Number of classes.
        n_patches: Number of patches per image side (n_patches ** 2 in total).
        hidden_d: Width of every token embedding.
        blocks: Number of transformer blocks.
        device: Stored as ``self.device`` for backward compatibility. Tensors
            created in the forward pass follow the device of the model's
            parameters, so ``model.to(...)`` is all that is needed.
    """

    def __init__(self, input_dim, output_dim, n_patches, hidden_d = 32, blocks=3, device="cuda"):
        super().__init__()

        self.chw = input_dim # (C, H, W), e.g. 1 x 28 x 28
        self.n_patches = n_patches
        self.hidden_d = hidden_d
        self.device = device

        assert self.chw[1] % n_patches == 0, "Input shape not entirely divisible by number of patches"
        assert self.chw[2] % n_patches == 0, "Input shape not entirely divisible by number of patches"
        # Integer division: patch sizes are pixel counts, not floats.
        self.patch_size = (self.chw[1] // n_patches, self.chw[2] // n_patches)

        # 1) Linear mapper from flattened patch pixels to the token width
        self.input_d = self.chw[0] * self.patch_size[0] * self.patch_size[1]
        self.linear_mapper = nn.Linear(self.input_d, self.hidden_d)

        # 2) Classification token that is learned by the model
        self.class_token = nn.Parameter(torch.rand(1, self.hidden_d))

        # 3) Learned positional embedding: one vector per patch plus the class token
        self.pos_emb = nn.Embedding((n_patches**2)+1, self.hidden_d)

        # 4) Transformer blocks (the `blocks` argument is only a count)
        self.blocks = nn.ModuleList([
            Block(self.hidden_d)
            for _ in range(blocks)
        ])

        # 5) Linear head mapping the class token to class logits
        self.classification_head = nn.Linear(self.hidden_d, output_dim)

    def patchify(self, images, n_patches_per_row):
        """Split [n, c, h, w] images into flattened patches.

        Returns a tensor of shape [n, n_patches_per_row ** 2, c * patch_size ** 2]
        placed on the device (and in the dtype) of the linear mapper's weights.
        Patches are ordered row-major over the grid and each patch is flattened
        in (channel, row, column) order.
        """
        n, c, h, w = images.shape

        assert h == w, "Patchify method is implemented for square images only"
        assert h % n_patches_per_row == 0, "Image size must be divisible by the number of patches per row"

        patch_size = h // n_patches_per_row

        # Vectorised split: [n, c, grid_row, patch_row, grid_col, patch_col]
        patches = images.reshape(n, c, n_patches_per_row, patch_size, n_patches_per_row, patch_size)
        # -> [n, grid_row, grid_col, c, patch_row, patch_col] -> flatten grid and patch
        patches = patches.permute(0, 2, 4, 1, 3, 5).reshape(
            n, n_patches_per_row ** 2, c * patch_size ** 2
        )

        # Follow the parameters rather than the stored device string, so inputs
        # on another device (e.g. a CPU batch fed to a GPU model) still work.
        weight = self.linear_mapper.weight
        return patches.to(device=weight.device, dtype=weight.dtype)

    def tokenize(self, x):
        """Turn images [N, C, H, W] into position-aware tokens [N, T + 1, hidden_d]."""
        patches = self.patchify(x, self.n_patches) # [N, T, input_d]
        N, T, _ = patches.shape

        tokens = self.linear_mapper(patches) # [N, T, hidden_d]

        # Prepend the shared class token to every sequence.
        cls = self.class_token.expand(N, 1, -1) # [N, 1, hidden_d]
        tokens = torch.cat([cls, tokens], dim=1) # [N, T + 1, hidden_d]

        positions = torch.arange(T + 1, device=tokens.device)
        pos_embed = self.pos_emb(positions) # [T + 1, hidden_d]

        # Broadcasts over the batch dimension; no in-place update needed.
        return tokens + pos_embed

    def forward(self, x):
        """Map images [N, C, H, W] to class logits [N, output_dim]."""
        x = self.tokenize(x) # [N, T + 1, hidden_d]
        for block in self.blocks:
            x = block(x)

        cls_tokens = x[:, 0, :] # class token of every sequence: [N, hidden_d]
        out = self.classification_head(cls_tokens)
        return out

In [None]:
# Use the GPU when available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Small ViT for 1x28x28 images: 7 patches per side (49 patches), 16-dim tokens, 2 blocks, 10 classes.
model = VIT(
    input_dim=(1, 28, 28), output_dim=10, n_patches=7, hidden_d=16, blocks=2, device=device
)

# Move the parameters to the selected device.
model = model.to(device)

In [None]:
device

'cuda'

In [None]:
# Smoke test: a random batch of 4 fake 1x28x28 images should yield one row of 10 logits per image.
test_input = torch.randn(4, 1, 28, 28)
model(test_input).shape

torch.Size([4, 10])

In [None]:
# NOTE(review): `tokens` is only a local variable inside VIT.tokenize; no visible cell defines it at
# notebook scope, so this would raise NameError on Restart & Run All unless it is defined elsewhere.
tokens.shape

torch.Size([4, 16])

In [None]:
# Toy example: prepend one shared embedding (like a class token) to every sequence.
N, T, D = 4, 49, 16

test_sequence = torch.randn(N, T, D)         # [N, T, D]
inps = test_sequence
test_embed = torch.randn(1, D).unsqueeze(0)  # [1, 1, D]
d_orig = test_embed
d_repeat = d_orig.repeat(N, 1, 1)            # [N, 1, D]: same vector for every sequence

a = torch.cat((d_repeat, inps), dim=1)       # [N, T + 1, D]

## MLP Part

In [None]:
import torch.optim as optim

In [None]:
# Use the GPU when available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Re-create the ViT so the training run starts from freshly initialised weights.
model = VIT(
    input_dim=(1, 28, 28), output_dim=10, n_patches=7, hidden_d=16, blocks=2, device=device
)

model = model.to(device)

In [None]:
# AdamW over all ViT parameters (learning rate 1e-3) and cross-entropy loss on the class logits.
optimizer = optim.AdamW(model.parameters(), lr=1e-3)
ce = nn.CrossEntropyLoss()

In [None]:
losses = []

model.train() # puts in training mode
for idx, (data, labels) in enumerate(data_loader):
  # Train on the batch yielded by the loop. Calling next(iter(data_loader))
  # here would create a new iterator each step and discard the loop's batch.
  data = data.to(device)
  labels = labels.to(device)

  optimizer.zero_grad()
  y_pred = model(data)
  loss = ce(y_pred, labels)
  # Keep a plain float so the autograd graph of every step can be freed.
  losses.append(loss.item())
  if idx%10==0:
    print(f'step: {idx}, avg loss: {sum(losses)/len(losses)}')

  loss.backward()
  optimizer.step()

step: 0, avg loss: 2.3102006912231445
step: 10, avg loss: 2.3148818016052246
step: 20, avg loss: 2.3085780143737793
step: 30, avg loss: 2.3091354370117188
step: 40, avg loss: 2.3076868057250977
step: 50, avg loss: 2.3079912662506104
step: 60, avg loss: 2.307399034500122
step: 70, avg loss: 2.3069472312927246
step: 80, avg loss: 2.3038196563720703
step: 90, avg loss: 2.297553062438965
step: 100, avg loss: 2.277010202407837
step: 110, avg loss: 2.2474889755249023
step: 120, avg loss: 2.2202906608581543
step: 130, avg loss: 2.186349868774414
step: 140, avg loss: 2.147282361984253
step: 150, avg loss: 2.108994960784912
step: 160, avg loss: 2.072641134262085
step: 170, avg loss: 2.037958860397339
step: 180, avg loss: 2.0023574829101562
step: 190, avg loss: 1.9655821323394775
step: 200, avg loss: 1.9387181997299194
step: 210, avg loss: 1.9058629274368286
step: 220, avg loss: 1.8763759136199951
step: 230, avg loss: 1.8485597372055054
step: 240, avg loss: 1.8223530054092407
step: 250, avg loss

In [None]:
a.shape

torch.Size([16, 197, 16])

In [None]:
a[0,0,:]

tensor([-0.9394,  1.7848, -0.3612, -0.5182,  0.6462,  0.0430,  0.6311,  0.1861,
         0.0152,  0.2347,  1.8798, -0.9534, -1.2178, -1.0636, -0.5045, -0.1788])

In [None]:
d_repeat[0,0]

tensor([-0.9394,  1.7848, -0.3612, -0.5182,  0.6462,  0.0430,  0.6311,  0.1861,
         0.0152,  0.2347,  1.8798, -0.9534, -1.2178, -1.0636, -0.5045, -0.1788])

In [None]:
d_orig[0]

tensor([[-0.9394,  1.7848, -0.3612, -0.5182,  0.6462,  0.0430,  0.6311,  0.1861,
          0.0152,  0.2347,  1.8798, -0.9534, -1.2178, -1.0636, -0.5045, -0.1788]])

In [None]:
test_sequence.shape

torch.Size([4, 49, 16])

In [None]:
import einops

In [None]:
# 4 patches per row. NOTE(review): for 28x28 single-channel inputs this gives 16 patches of 49 values,
# not the [4, 49, 16] shape recorded for `patch` further down; confirm which call produced that output.
patch = patchify(data, 4)

In [None]:
# 7 patches per row -> 49 patches; the recorded shape shows 16 values per patch
# (presumably 4x4 pixels of a 28x28 single-channel image).
patch_alyssa = patchify(data , 7)
patch_alyssa.shape

torch.Size([4, 49, 16])

In [None]:
patch.shape

torch.Size([4, 49, 16])

In [None]:
# TODO(review): this cell held an incomplete assignment (`patch =` with no right-hand side),
# which is a SyntaxError; fill in the intended expression or delete the cell.

In [None]:
patch.shape

torch.Size([4, 49, 16])

### Test Accuracy


In [None]:
preds = []
true_vals = []

# Collect the predicted and true digit for every test image exactly once.
model.eval()  # switch the module to evaluation mode
with torch.no_grad():  # no gradients needed for inference
  for data, labels in test_loader:
    # Use the batch yielded by the loop. Re-drawing with next(iter(test_loader))
    # would sample reshuffled batches and never cover the test set exactly once.
    data = data.to(device)
    labels = labels.to(device)

    true_vals.extend(labels.tolist()) # save true labels
    y_pred = model(data)  # logits, one row per image
    preds.extend(torch.argmax(y_pred, dim=1).tolist())  # class with the largest logit

In [None]:
len(true_vals)

10048

In [None]:
preds[0]

4

In [None]:
# Test accuracy: fraction of positions where the prediction equals the true label.
sum(pred == true_vals[pos] for pos, pred in enumerate(preds)) / len(preds)

0.8072253184713376

Testing accuracy is about 80.7%.

# Saving model into pickle file

In [None]:
import pickle

In [None]:
# Attempt to pickle the whole model object.
# NOTE: the recorded run of this cell ended in a PicklingError; the weights are saved with
# torch.save(model.state_dict(), ...) further down instead.
with open("model.pkl", "wb") as f:
     pickle.dump(model, f)

PicklingError: ignored

In [None]:
from joblib import dump, load

# Save the model
# NOTE: the recorded run of this cell also ended in a PicklingError.
dump(model, 'model.joblib')

PicklingError: ignored

In [None]:
# Save only the parameters (the state dict) rather than the whole model object.
torch.save(model.state_dict(), 'model.pth')

In [None]:
# Load the saved parameters back into the existing model. weights_only=True limits
# unpickling to tensors and plain containers, which is all a state dict needs.
model.load_state_dict(torch.load('model.pth', map_location=torch.device('cpu'), weights_only=True))

<All keys matched successfully>

In [None]:
# Single-image batches over the test split, so each model call scores exactly one digit.
test_loader2 = torch.utils.data.DataLoader(mnist_test,
                                          batch_size=1,
                                          shuffle=True,
                                          num_workers=0)

In [None]:
data[0][0]

tensor([[0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000],
        [0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000],
        [0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000],
        [0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000,
         0.0000, 0.0000, 0.0000, 0.0000, 0.0000, 0.0000

In [None]:
preds = []
true_vals = []

max_batches = 10  # only inspect the first few single-image batches

# Print the raw logits for a handful of test images and record prediction vs. label.
model.eval()  # switch the module to evaluation mode
with torch.no_grad():  # no gradients needed for inference
  for idx, (data, labels) in enumerate(test_loader2):
    # Use the batch yielded by the loop instead of re-drawing one with
    # next(iter(test_loader2)), which rebuilds the iterator on every step.
    data = data.to(device)
    labels = labels.to(device)

    true_vals.extend(labels.tolist()) # save true labels
    y_pred = model(data)  # logits for this image
    print(y_pred)
    preds.extend(torch.argmax(y_pred, dim=1).tolist())

    if idx + 1 == max_batches:
      break

tensor([[ 0.8287, -2.0055,  1.7437,  2.2655, -2.8121,  2.7288,  0.4494, -2.3902,
          5.8704, -0.4977]], device='cuda:0')
tensor([[ 0.2588, -0.2310,  2.6480,  6.6356, -2.9453,  4.9725, -4.4896, -0.7791,
          3.3479, -0.3488]], device='cuda:0')
tensor([[-5.2552,  9.0690,  2.1111,  2.5399, -3.4231,  0.6679, -3.9700,  3.6825,
          1.8155,  0.0938]], device='cuda:0')
tensor([[ 7.6219, -6.2419,  3.2462, -1.5795,  0.8101,  1.7498,  4.8904, -4.6103,
          1.1583, -3.5272]], device='cuda:0')
tensor([[-2.5683,  2.2977,  0.8782,  0.8913, -0.7453,  2.5208, -1.4932,  2.3574,
          1.5868, -0.2228]], device='cuda:0')
tensor([[ 2.7210, -5.1902,  2.0368, -3.5298,  2.0482, -0.8861,  5.6995, -2.2616,
          0.1933, -1.9242]], device='cuda:0')
tensor([[-0.9083, -0.6750,  6.0460, -2.5448, -1.5347, -3.3658,  5.6901,  0.1635,
          0.9237, -4.1021]], device='cuda:0')
tensor([[ 2.4621, -5.8862, -1.6482,  2.0978,  1.6325,  5.4438, -2.1264, -1.8626,
          2.0578,  2.3143]], d

In [None]:
preds

[8, 3, 1, 0, 5, 6, 2, 5, 5, 4]

In [None]:
true_vals

[8, 9, 1, 0, 3, 4, 2, 3, 5, 4]