In [59]:
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision import datasets
from torchvision.transforms import ToTensor
from torchvision.io import read_image
import torchvision.transforms as transforms

In [60]:
# Preprocessing shared by the training and test splits, so both are guaranteed
# to go through exactly the same pipeline:
#   1. ToTensor   - PIL image -> float tensor scaled to [0, 1]
#   2. Normalize  - standardise with the constants below (the values commonly
#                   used as the MNIST training-set mean / standard deviation).
MNIST_MEAN = (0.1307,)
MNIST_STD = (0.3081,)
mnist_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(MNIST_MEAN, MNIST_STD),
])

# Download training data from open datasets.
training_data = datasets.MNIST(
    root="data",
    train=True,
    download=True,
    transform=mnist_transform,
)

# Download test data from open datasets.
test_data = datasets.MNIST(
    root="data",
    train=False,
    download=True,
    transform=mnist_transform,
)

In [61]:
batch_size = 64

# Create data loaders.
# Training batches are reshuffled every epoch. The evaluation loader keeps the
# dataset order: accuracy and average loss do not depend on sample order, and
# not shuffling avoids drawing from the global RNG during evaluation.
train_dataloader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

# Peek at the first training batch only, to confirm tensor layout and dtypes.
for X, y in train_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

Shape of X [N, C, H, W]: torch.Size([64, 1, 28, 28])
Shape of y: torch.Size([64]) torch.int64


In [62]:
# Get cpu, gpu or mps device for training.
# Preference order: CUDA if available, otherwise the MPS backend if available,
# otherwise fall back to the CPU. `device` is a plain string that is later
# passed to `.to(...)`.
device = (
    "cuda"
    if torch.cuda.is_available()
    else "mps"
    if torch.backends.mps.is_available()
    else "cpu"
)
print(f"Using {device} device")

# Define model
class NeuralNetwork(nn.Module):
    """LeNet-style convolutional classifier for single-channel 28x28 images.

    Two ``conv -> batch-norm -> tanh -> max-pool`` stages extract features,
    which are flattened and passed through three fully connected stages. The
    final stage applies ``LogSoftmax`` over dimension 1, so ``forward`` returns
    log-probabilities for 10 classes (not raw logits).

    An earlier, now-disabled fully connected design
    (784-512-512-256-128-10) preceded this convolutional stack.
    """

    def __init__(self):
        super().__init__()

        # [N, 1, 28, 28] -> conv 3x3, padding 1 -> [N, 6, 28, 28] -> pool -> [N, 6, 14, 14]
        self.layer1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=6, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(num_features=6),
            nn.Tanh(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # [N, 6, 14, 14] -> conv 5x5, no padding -> [N, 16, 10, 10] -> pool -> [N, 16, 5, 5]
        self.layer2 = nn.Sequential(
            nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5, stride=1, padding=0),
            nn.BatchNorm2d(num_features=16),
            nn.Tanh(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # Flattened feature map of layer2: 5 * 5 spatial positions x 16 channels.
        self.fc1 = nn.Sequential(
            nn.Linear(in_features=5 * 5 * 16, out_features=120),
            nn.BatchNorm1d(num_features=120),
            nn.Tanh(),
            nn.Dropout(p=0.25),
        )

        self.fc2 = nn.Sequential(
            nn.Linear(in_features=120, out_features=84),
            nn.BatchNorm1d(num_features=84),
            nn.Tanh(),
            nn.Dropout(p=0.25),
        )

        # Classification head: 10 class scores turned into log-probabilities.
        self.fc3 = nn.Sequential(
            nn.Linear(in_features=84, out_features=10),
            nn.LogSoftmax(dim=1),
        )

    def forward(self, x):
        """Map a batch of images ``x`` to per-class log-probabilities.

        Args:
            x: Tensor of shape ``[N, 1, 28, 28]``.

        Returns:
            Tensor of shape ``[N, 10]`` holding log-probabilities.
        """
        feature_maps = self.layer2(self.layer1(x))
        # Collapse channel and spatial dimensions, keeping the batch dimension.
        flat_features = feature_maps.reshape(feature_maps.size(0), -1)
        hidden = self.fc2(self.fc1(flat_features))
        return self.fc3(hidden)

# Build the network, move its parameters and buffers to the selected device,
# then show the layer layout and the device in use.
model = NeuralNetwork()
model = model.to(device)
print(model)
print(f"Using {device} device")


Using cuda device
NeuralNetwork(
  (layer1): Sequential(
    (0): Conv2d(1, 6, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(6, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): Tanh()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (layer2): Sequential(
    (0): Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1))
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): Tanh()
    (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (fc1): Sequential(
    (0): Linear(in_features=400, out_features=120, bias=True)
    (1): BatchNorm1d(120, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): Tanh()
    (3): Dropout(p=0.25, inplace=False)
  )
  (fc2): Sequential(
    (0): Linear(in_features=120, out_features=84, bias=True)
    (1): BatchNorm1d(84, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)

In [63]:
# The model's last layer is LogSoftmax, i.e. it already outputs
# log-probabilities. NLLLoss is the criterion that consumes log-probabilities;
# CrossEntropyLoss expects raw logits and would apply log-softmax a second
# time. The resulting loss value is the same, the redundant pass is not needed.
loss_fn = nn.NLLLoss()

# RMSprop over all model parameters with a learning rate of 1e-3.
optimizer = torch.optim.RMSprop(model.parameters(), lr=1e-3)

In [64]:
def train(dataloader, model, loss_fn, optimizer, device=None):
    """Run one training epoch over ``dataloader``.

    Args:
        dataloader: Iterable yielding ``(X, y)`` batches. ``dataloader.dataset``
            must support ``len()``; it is only used for the progress display.
        model: ``nn.Module`` to optimise. It is switched to training mode.
        loss_fn: Callable ``loss_fn(pred, y)`` returning a scalar loss tensor.
        optimizer: Optimiser that updates ``model``'s parameters.
        device: Device every batch is moved to before the forward pass. When
            omitted, the device of the model's first parameter is used (CPU if
            the model has no parameters), so the function does not depend on a
            notebook-level ``device`` variable.

    Returns:
        None. The loss of every 100th batch is printed.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    size = len(dataloader.dataset)
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        X, y = X.to(device), y.to(device)

        # Clear gradients *before* the backward pass so that anything left on
        # the parameters (e.g. by an interrupted earlier run) cannot be
        # accumulated into this step.
        optimizer.zero_grad()

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            # `current` approximates the number of samples seen so far.
            loss_value, current = loss.item(), (batch + 1) * len(X)
            print(f"loss: {loss_value:>7f}  [{current:>5d}/{size:>5d}]")

In [65]:
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [99]:
epochs = 2

# (Re)initialise the linear layer of every fully connected stage:
# Xavier-uniform weights first, then all biases set to zero.
# NOTE(review): this runs every time the cell is executed and touches only the
# FC layers - conv / batch-norm parameters and the optimizer state are kept.
# Confirm that is intended when re-running this cell.
with torch.no_grad():
    dense_layers = (model.fc1[0], model.fc2[0], model.fc3[0])
    for dense in dense_layers:
        nn.init.xavier_uniform_(dense.weight)
    for dense in dense_layers:
        nn.init.zeros_(dense.bias)

# One pass over the training set followed by an evaluation on the test set per epoch.
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
print("Done!")

# Final evaluation on the *training* set (the summary is still printed under
# the "Test Error" heading used by test()).
test(train_dataloader, model, loss_fn)

Epoch 1
-------------------------------
loss: 2.675250  [   64/60000]
loss: 0.121971  [ 6464/60000]
loss: 0.089904  [12864/60000]
loss: 0.062588  [19264/60000]
loss: 0.026537  [25664/60000]
loss: 0.019687  [32064/60000]
loss: 0.028218  [38464/60000]
loss: 0.034390  [44864/60000]
loss: 0.139679  [51264/60000]
loss: 0.094347  [57664/60000]
Test Error: 
 Accuracy: 98.5%, Avg loss: 0.044663 

Epoch 2
-------------------------------
loss: 0.054879  [   64/60000]
loss: 0.013962  [ 6464/60000]
loss: 0.013133  [12864/60000]
loss: 0.197255  [19264/60000]
loss: 0.049450  [25664/60000]
loss: 0.092796  [32064/60000]
loss: 0.015319  [38464/60000]
loss: 0.030585  [44864/60000]
loss: 0.033125  [51264/60000]
loss: 0.030715  [57664/60000]
Test Error: 
 Accuracy: 98.7%, Avg loss: 0.038984 

Done!
Test Error: 
 Accuracy: 99.3%, Avg loss: 0.021616 



In [67]:
# model.eval()
# x, y = test_data[42][0], test_data[42][1]
# with torch.no_grad():
#     x = x.to(device)
#     pred = model(x)
#     predicted, actual = torch.argmax(pred[0]), y
#     print(f'Predicted: "{predicted}", Actual: "{actual}"')

In [107]:
# Classify a single image loaded from disk.
image = read_image('1_2.jpg')
transform = transforms.Grayscale()
data = transform(image)  # single-channel image tensor, shape [1, H, W]

# Reproduce the preprocessing the model was trained with: ToTensor scales
# 8-bit pixel values by 1/255, then Normalize((0.1307,), (0.3081,)) is applied.
# Feeding un-normalised pixels would put the input on a different scale from
# anything the network saw during training.
# NOTE(review): the network was trained on 28x28 MNIST digits (light strokes on
# a dark background) - confirm the image has that size and polarity.
normalize = transforms.Normalize((0.1307,), (0.3081,))
images = normalize(data.float() / 255).unsqueeze(0)  # batch of one: [1, 1, H, W]

# Each dataset item is one [1, H, W] image, so batches come out as [B, 1, H, W].
validation_dataloader = DataLoader(images, batch_size=64)

with torch.no_grad():
  model.eval()
  for batch, X in enumerate(validation_dataloader):
    pred = model(X.to(device))
    # Predicted class of the first (here: only) image in the batch.
    print(torch.argmax(pred[0]))


tensor(8, device='cuda:0')
