In [1]:
import torch
import torchvision
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
import torchvision.datasets as datasets
import torchvision.transforms as transforms


In [2]:
# Set device
if torch.cuda.is_available():
    device = "gpu"
elif torch.has_mps:
    device = "mps"
else:
    device = "cpu"
device = torch.device(device)
print(device)


mps


# RNN

In [3]:
# Hyperparameters
input_size = 28  # Number of features for each time-step but we don't have to explicitely say this
sequence_length = 28
num_layers = 2
num_classes = 10
hidden_size = 256  # Number of nodes in each time-step
learning_rate = 0.001
batch_size = 64
num_epochs = 2


In [4]:
# Create a basic RNN
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes) -> None:
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.rnn = nn.RNN(
            input_size, hidden_size, num_layers, batch_first=True
        )  # we have batches as the first axis (0)
        # N(batch_size) x time_seq x features
        self.fc = nn.Linear(
            hidden_size * sequence_length, num_classes
        )  # We have 28 time-seuqences (time-steps) and we concatenate all of those sequence as send into linear layer. So linear layer uses information from every hidden state. We could take the absolute last hidden sates. I have it later.

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(
            device
        )  # number of layers, number of mini-batches we send in at the same time, hidden size

        # Forward prop
        out, _ = self.rnn(
            x, h0
        )  # second output is hidden state, since every example has its own hidden state, we ignore that output
        out = out.reshape(
            out.shape[0], -1
        )  # keep the batch as the first axis and concat everything else (28*256)
        out = self.fc(out)
        return out


In [5]:
# Load data
train_dataset = datasets.MNIST(
    root="dataset/", train=True, transform=transforms.ToTensor(), download=True
)
test_dataset = datasets.MNIST(
    root="dataset/", train=False, transform=transforms.ToTensor(), download=True
)
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, shuffle=True)


In [6]:
# Initialize network
model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
model


RNN(
  (rnn): RNN(28, 256, num_layers=2, batch_first=True)
  (fc): Linear(in_features=7168, out_features=10, bias=True)
)

In [7]:
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [8]:
criterion


CrossEntropyLoss()

In [9]:
optimizer


Adam (
Parameter Group 0
    amsgrad: False
    betas: (0.9, 0.999)
    capturable: False
    eps: 1e-08
    foreach: None
    lr: 0.001
    maximize: False
    weight_decay: 0
)

In [10]:
# This cell took 3 minutes for 2 epochs on MacBook Pro (16-inch, 2021) Apple M1 Pro 16 GB

# Train Network loop
for epoch in range(num_epochs):
    for batch_idx, (data, targets) in enumerate(train_loader):
        # Get the data to CUDA of MPS if possible
        data = data.to(device=device).squeeze(1)  # Nx1x28x28 -> Nx28x28
        targets = targets.to(device=device)

        # Forward
        scores = model(data)
        loss = criterion(scores, targets)

        # Backward
        optimizer.zero_grad()
        loss.backward()

        # Gardient descent or adam step
        optimizer.step()


In [11]:
# Check accuracy on training data and test to see how good our model is
def check_accuracy(loader, model):
    if loader.dataset.train:
        print("Checking accuracy on training data.")
    else:
        print("Checking accuracy on test data.")

    num_correct = 0
    num_samples = 0
    model.eval()

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device=device).squeeze(1)
            y = y.to(device=device)
            # x = x.reshape(x.shape[0], -1)

            scores = model(x)
            _, predictions = scores.max(1)
            num_correct += (predictions == y).sum()
            num_samples += predictions.size(0)

        print(
            f"Got {num_correct} / {num_samples} with accuracy \
            {float(num_correct)/float(num_samples)*100:.2f}"
        )

    model.train()


In [12]:
# This cell took 30 seconds for 10 epochs on MacBook Pro (16-inch, 2021) Apple M1 Pro 16 GB

check_accuracy(train_loader, model)
check_accuracy(test_loader, model)


Checking accuracy on training data.
Got 57981 / 60000 with accuracy             96.64
Checking accuracy on test data.
Got 9656 / 10000 with accuracy             96.56


# GRU 

In [13]:
# Hyperparameters
input_size = 28  # Number of features for each time-step but we don't have to explicitely say this
sequence_length = 28
num_layers = 2
num_classes = 10
hidden_size = 256  # Number of nodes in each time-step
learning_rate = 0.001
batch_size = 64
num_epochs = 2


In [14]:
# Create a basic GRU
class GRU(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes) -> None:
        super(GRU, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.gru = nn.RNN(
            input_size, hidden_size, num_layers, batch_first=True
        )  # we have batches as the first axis (0)
        # N(batch_size) x time_seq x features
        self.fc = nn.Linear(
            hidden_size * sequence_length, num_classes
        )  # We have 28 time-seuqences (time-steps) and we concatenate all of those sequence as send into linear layer. So linear layer uses information from every hidden state. We could take the absolute last hidden sates. I have it later.

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(
            device
        )  # number of layers, number of mini-batches we send in at the same time, hidden size

        # Forward prop
        out, _ = self.gru(
            x, h0
        )  # second output is hidden state, since every example has its own hidden state, we ignore that output
        out = out.reshape(
            out.shape[0], -1
        )  # keep the batch as the first axis and concat everything else (28*256)
        out = self.fc(out)
        return out


In [15]:
# Initialize network
model = GRU(input_size, hidden_size, num_layers, num_classes).to(device)
model


GRU(
  (gru): RNN(28, 256, num_layers=2, batch_first=True)
  (fc): Linear(in_features=7168, out_features=10, bias=True)
)

In [16]:
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [17]:
# This cell took 3 minutes for 2 epochs on MacBook Pro (16-inch, 2021) Apple M1 Pro 16 GB

# Train Network loop
for epoch in range(num_epochs):
    for batch_idx, (data, targets) in enumerate(train_loader):
        # Get the data to CUDA of MPS if possible
        data = data.to(device=device).squeeze(1)  # Nx1x28x28 -> Nx28x28
        targets = targets.to(device=device)

        # Forward
        scores = model(data)
        loss = criterion(scores, targets)

        # Backward
        optimizer.zero_grad()
        loss.backward()

        # Gardient descent or adam step
        optimizer.step()


In [18]:
# Check accuracy on training data and test to see how good our model is
def check_accuracy(loader, model):
    if loader.dataset.train:
        print("Checking accuracy on training data.")
    else:
        print("Checking accuracy on test data.")

    num_correct = 0
    num_samples = 0
    model.eval()

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device=device).squeeze(1)
            y = y.to(device=device)
            # x = x.reshape(x.shape[0], -1)

            scores = model(x)
            _, predictions = scores.max(1)
            num_correct += (predictions == y).sum()
            num_samples += predictions.size(0)

        print(
            f"Got {num_correct} / {num_samples} with accuracy \
            {float(num_correct)/float(num_samples)*100:.2f}"
        )

    model.train()


In [19]:
check_accuracy(train_loader, model)
check_accuracy(test_loader, model)


Checking accuracy on training data.
Got 58404 / 60000 with accuracy             97.34
Checking accuracy on test data.
Got 9729 / 10000 with accuracy             97.29


# LSTM

In [20]:
# Hyperparameters
input_size = 28  # Number of features for each time-step but we don't have to explicitely say this
sequence_length = 28
num_layers = 2
num_classes = 10
hidden_size = 256  # Number of nodes in each time-step
learning_rate = 0.001
batch_size = 64
num_epochs = 2


In [21]:
# Create a basic LSTM
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes) -> None:
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(
            input_size, hidden_size, num_layers, batch_first=True
        )  # we have batches as the first axis (0)
        # N(batch_size) x time_seq x features
        self.fc = nn.Linear(
            hidden_size * sequence_length, num_classes
        )  # We have 28 time-seuqences (time-steps) and we concatenate all of those sequence as send into linear layer. So linear layer uses information from every hidden state. We could take the absolute last hidden sates. I have it later.

    def forward(self, x):
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(
            device
        )  # number of layers, number of mini-batches we send in at the same time, hidden size
        # we need to have seperate cell state
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(
            device=device
        )  # cell state

        # Forward prop
        out, _ = self.lstm(
            x, (h0, c0)
        )  # second output is hidden state, since every example has its own hidden state, we ignore that output
        out = self.fc(out[:, -1,:]) # All mini batches, Last hidden-state, All features
        return out


In [22]:
# Initialize network
model = LSTM(input_size, hidden_size, num_layers, num_classes).to(device)
model


LSTM(
  (lstm): LSTM(28, 256, num_layers=2, batch_first=True)
  (fc): Linear(in_features=7168, out_features=10, bias=True)
)

In [23]:
# This cell took 3 minutes for 2 epochs on MacBook Pro (16-inch, 2021) Apple M1 Pro 16 GB

# Train Network loop
for epoch in range(num_epochs):
    for batch_idx, (data, targets) in enumerate(train_loader):
        # Get the data to CUDA of MPS if possible
        data = data.to(device=device).squeeze(1)  # Nx1x28x28 -> Nx28x28
        targets = targets.to(device=device)

        # Forward
        scores = model(data)
        loss = criterion(scores, targets)

        # Backward
        optimizer.zero_grad()
        loss.backward()

        # Gardient descent or adam step
        optimizer.step()


: 

: 

In [None]:
# Check accuracy on training data and test to see how good our model is
def check_accuracy(loader, model):
    if loader.dataset.train:
        print("Checking accuracy on training data.")
    else:
        print("Checking accuracy on test data.")

    num_correct = 0
    num_samples = 0
    model.eval()

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device=device).squeeze(1)
            y = y.to(device=device)
            # x = x.reshape(x.shape[0], -1)

            scores = model(x)
            _, predictions = scores.max(1)
            num_correct += (predictions == y).sum()
            num_samples += predictions.size(0)

        print(
            f"Got {num_correct} / {num_samples} with accuracy \
            {float(num_correct)/float(num_samples)*100:.2f}"
        )

    model.train()


In [None]:
check_accuracy(train_loader, model)
check_accuracy(test_loader, model)
