In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from torch.optim.lr_scheduler import StepLR
from torch.nn import CrossEntropyLoss

import os
import pandas
import numpy as np
import idx2numpy

from typing import Tuple

import matplotlib.pyplot as plt

In [4]:
! pip install idx2numpy

Collecting idx2numpy
  Downloading idx2numpy-1.2.3.tar.gz (6.8 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: idx2numpy
  Building wheel for idx2numpy (setup.py): started
  Building wheel for idx2numpy (setup.py): finished with status 'done'
  Created wheel for idx2numpy: filename=idx2numpy-1.2.3-py3-none-any.whl size=7905 sha256=45691d4862d5f9087da2419b71fa616aa3e68c8d06f71a08ea8ac44150ade13a
  Stored in directory: c:\users\ericeckstein\appdata\local\pip\cache\wheels\34\61\53\a6a64db5e907bdf792f401b5bfb922eabfe6686d08692167f3
Successfully built idx2numpy
Installing collected packages: idx2numpy
Successfully installed idx2numpy-1.2.3


In [6]:
class NonLinearModel(torch.nn.Module):
    """Fully connected network: ``Linear -> act -> ... -> Linear``.

    With ``n_intermediate > 0`` the network consists of ``n_intermediate``
    linear layers of width ``intermediate_dim`` (the first one maps from
    ``input_dim``), each followed by an activation, and a final linear layer
    mapping to ``output_dim``. With ``n_intermediate <= 0`` the model is a
    single linear layer from ``input_dim`` to ``output_dim``.

    The forward pass returns the raw output of the last linear layer (logits);
    no softmax is applied. That is the input ``nn.CrossEntropyLoss`` expects,
    because it combines log-softmax and NLL loss internally.

    Args:
        input_dim: Number of input features per sample.
        n_intermediate: Number of hidden (width ``intermediate_dim``) layers.
        intermediate_dim: Width of the hidden layers.
        output_dim: Number of output features (e.g. number of classes).
        act_fun: Activation module used as a template. Every activation
            position receives its own deep copy, so activations that carry
            parameters or state (e.g. ``nn.PReLU``) are not silently shared
            between layers, and the caller's instance is left untouched.
    """

    def __init__(self, 
                 input_dim: int, 
                 n_intermediate: int,
                 intermediate_dim: int, 
                 output_dim: int,
                 act_fun: nn.Module) -> None:
        # Local import keeps this definition self-contained.
        import copy

        super(NonLinearModel, self).__init__()

        # Flag read by the training code to decide how inputs are reshaped.
        self.is_conv = False

        modules = []
        if n_intermediate > 0:
            # Hidden stack: input_dim -> intermediate_dim -> ... -> intermediate_dim,
            # each linear layer followed by its own activation instance.
            widths = [input_dim] + [intermediate_dim] * n_intermediate
            for fan_in, fan_out in zip(widths[:-1], widths[1:]):
                modules.append(nn.Linear(in_features=fan_in, out_features=fan_out))
                modules.append(copy.deepcopy(act_fun))

            # Output layer (no activation afterwards: the model emits logits).
            modules.append(nn.Linear(in_features=intermediate_dim,
                                     out_features=output_dim))
        else:
            # No hidden layers: plain linear model.
            modules.append(nn.Linear(in_features=input_dim,
                                     out_features=output_dim))

        # All layers/operations are stored here and executed in order.
        self.layers = torch.nn.Sequential(*modules)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply all layers in order and return the resulting logits."""
        return self.layers(x)

In [7]:
class CNN(nn.Module):
    """Two-stage convolutional classifier with a linear read-out.

    Each stage is ``Conv2d(kernel 5, stride 1, padding 2) -> ReLU ->
    MaxPool2d(2)``; the channel counts go 1 -> 16 -> 32. The read-out layer
    expects ``32 * 7 * 7`` features and produces 10 output scores.
    """

    def __init__(self):
        super(CNN, self).__init__()

        def conv_stage(c_in, c_out):
            # The convolution keeps the spatial size (5x5 kernel, padding 2);
            # the pooling step halves it.
            return nn.Sequential(
                nn.Conv2d(c_in, c_out, kernel_size=5, stride=1, padding=2),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2),
            )

        self.conv1 = conv_stage(1, 16)
        self.conv2 = conv_stage(16, 32)

        # Fully connected read-out: 10 output scores.
        self.out = nn.Linear(in_features=32 * 7 * 7, out_features=10)

        # Flag read by the training code to decide how inputs are reshaped.
        self.is_conv = True

    def forward(self, x):
        """Run both conv stages, flatten per sample, apply the read-out."""
        features = self.conv2(self.conv1(x))
        # (batch_size, 32, 7, 7) -> (batch_size, 32 * 7 * 7)
        flat = features.view(features.size(0), -1)
        return self.out(flat)

In [8]:
# Instantiate the fully connected model and display how many parameters it has:
# all parameters are flattened into one vector and the vector's shape is shown.
net = NonLinearModel(input_dim=28*28, n_intermediate=2, intermediate_dim=50, act_fun=nn.ReLU(), output_dim=10)
torch.nn.utils.parameters_to_vector(net.parameters()).shape

torch.Size([42310])

In [9]:
# Same parameter count check for the convolutional model.
cnn = CNN()
torch.nn.utils.parameters_to_vector(cnn.parameters()).shape

torch.Size([28938])

In [10]:
def load_idxfile(path: str) -> np.ndarray:
    """Read an IDX-format file into a NumPy array.

    Thin wrapper around ``idx2numpy.convert_from_file``.

    Args:
        path: Location of the IDX file to read.

    Returns:
        The array decoded by ``idx2numpy``.
    """
    # Open question from the original author: can we do that for all files in the folder?
    return idx2numpy.convert_from_file(path)

In [11]:
# Directory holding the raw IDX files. Set the MNIST_DATA_DIR environment
# variable to point somewhere else instead of editing this cell; the default is
# the location that was previously hard-coded here.
data_dir = os.environ.get('MNIST_DATA_DIR', r'/home/hubert/Lecture/data/raw')
file_path = os.path.join(data_dir, 't10k-images.idx3-ubyte')
data = load_idxfile(file_path)

FileNotFoundError: [Errno 2] No such file or directory: '/home/hubert/Lecture/data/raw/t10k-images.idx3-ubyte'

In [12]:
# Directory holding the raw IDX files. Set the MNIST_DATA_DIR environment
# variable to point somewhere else instead of editing this cell; the default is
# the location that was previously hard-coded here.
data_dir = os.environ.get('MNIST_DATA_DIR', r'/home/hubert/Lecture/data/raw')
file_path_labels = os.path.join(data_dir, 't10k-labels.idx1-ubyte')
labels = load_idxfile(file_path_labels)

FileNotFoundError: [Errno 2] No such file or directory: '/home/hubert/Lecture/data/raw/t10k-labels.idx1-ubyte'

In [None]:
# Sanity check on the loaded arrays: print the label of one sample (index 1)
# and show the corresponding image.
idx = 1
print(f'Label: {labels[idx]}')
plt.imshow(data[idx])
plt.show()

In [None]:
class MNISTDset(Dataset):
    """In-memory image dataset yielding ``(image, label, index)`` triples.

    Pixel values are divided by 255 and then standardised (mean subtracted,
    divided by the standard deviation). Labels are stored as ``int64`` because
    ``nn.CrossEntropyLoss`` requires class-index targets of that dtype, while
    label arrays read from disk are often small unsigned integers.

    Args:
        images: Array of images, indexed along the first axis.
        labels: One integer class label per image.
        mean: Value subtracted from the scaled images. ``None`` (default) uses
            the mean of this dataset.
        std: Value the centred images are divided by; must be positive.
            ``None`` (default) uses the standard deviation of this dataset.
            Pass the statistics of the training set (``train.mean``,
            ``train.std``) to normalise validation data consistently.

    Raises:
        ValueError: If ``images`` and ``labels`` differ in length.
    """

    def __init__(self, images: np.ndarray, labels: np.ndarray,
                 mean: float = None, std: float = None) -> None:
        if len(images) != len(labels):
            raise ValueError(
                f'images and labels differ in length: {len(images)} != {len(labels)}')

        # Scale to [0, 1] assuming 8-bit pixel values.
        self.images = torch.tensor(images, dtype=torch.float32) / 255.
        # int64 class indices, as required by CrossEntropyLoss / NLLLoss.
        self.labels = torch.tensor(labels, dtype=torch.long)
        self.num_samples = len(self.labels)

        # Standardise; also records the statistics used in self.mean / self.std.
        self._normalize(mean=mean, std=std)

    def set_num_samples(self, n: int = None) -> None:
        '''
        Restrict the number of samples reported by ``len()``.
        Not necessary, but sometimes useful for model testing.
        ``None`` restores the full length. Only ``__len__`` is affected;
        ``__getitem__`` still accepts any valid index of the underlying data.
        '''
        if n is None:
            self.num_samples = len(self.labels)
        else:
            assert 0 <= n <= len(self.labels), 'n must lie in [0, number of labels]'
            self.num_samples = n

    def _normalize(self, mean: float = None, std: float = None):
        '''Standardise the images in place: subtract ``mean``, divide by ``std``.

        Missing statistics are computed from the data itself; the values that
        were actually applied are stored in ``self.mean`` and ``self.std``.
        '''
        if std is not None:
            assert std > 0, 'std must be positive'

        if mean is None:
            mean = self.images.mean()
        self.images = self.images - mean

        if std is None:
            # Small epsilon guards against division by zero for constant images.
            std = self.images.std() + 1e-12
        self.images = self.images / std

        self.mean = float(mean)
        self.std = float(std)

    def __len__(self):
        return self.num_samples

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor, int]:
        '''Return the normalised image, its label and the index that was requested.'''
        return self.images[idx], self.labels[idx], idx

In [None]:
# Wrap the loaded arrays in the dataset class and report how many samples it holds.
dset = MNISTDset(images=data, labels=labels)
print(f'num samples in dataset: {len(dset)}')

In [None]:
# Fetch one item from the dataset (it returns image, label and index) and plot it.
# The image shown here has already been normalised by the dataset.
# Note: this binds the notebook-global names img, label and idx.
img, label, idx = dset[15]
print(f'Label = {label}')
plt.imshow(img)
plt.show()

In [None]:
# num_workers=0 loads batches in the main process. The dataset keeps all tensors
# in memory, so worker processes bring no benefit here, and on platforms that
# start workers via "spawn" (e.g. Windows) a Dataset class defined inside a
# notebook cannot be imported by the workers.
dloader = DataLoader(dset, batch_size=10, shuffle=True, num_workers=0)

In [None]:
# Inspect the first mini-batch only: print the image batch shape, the targets
# and the sample indices, then stop iterating.
for i, (image, targets, idx) in enumerate(dloader):
    print(image.shape)
    print(targets)
    print(idx)
    break

In [None]:
def accuracy(out, labels):
    """Per-sample correctness of the arg-max prediction.

    Args:
        out: Model scores; the predicted class is the arg-max over the last axis.
        labels: Ground-truth class indices, comparable element-wise with the
            predictions.

    Returns:
        Float tensor with 1.0 where the prediction equals the label and 0.0
        elsewhere. Take ``.mean()`` of the result for the accuracy as a scalar.
    """
    # Compare against the `labels` argument (the original referenced the
    # unrelated global name `label`).
    return (out.argmax(-1) == labels).float()

In [None]:
def train(model, train_loader, optimizer, loss_fun, device, epoch) -> float:
    """Run one optimisation pass over ``train_loader``.

    Args:
        model: Network to train. Its ``is_conv`` attribute selects the input
            layout: ``(batch, 1, 28, 28)`` if true, flattened ``(batch, -1)``
            otherwise.
        train_loader: Iterable yielding ``(image, targets, idx)`` batches.
        optimizer: Optimiser updating the model parameters.
        loss_fun: Callable ``loss_fun(out, targets)``; its result is reduced
            with ``.mean()`` before back-propagation.
        device: Device the inputs and targets are moved to.
        epoch: Index of the current epoch (currently unused; kept for the
            caller's interface).

    Returns:
        The mean training loss over all samples seen in this pass.

    Raises:
        ValueError: If ``train_loader`` yields no samples.
    """
    model.train()

    loss_sum = 0.0   # sum of per-batch mean losses, weighted by batch size
    n_seen = 0       # number of samples processed so far
    for image, targets, _ in train_loader:
        # get batch size
        bs = image.shape[0]

        # fully connected model: we need to flatten the images
        x = image.view(bs, -1) if not model.is_conv else image.view(bs, 1, 28, 28)

        # inputs AND targets must live on the same device as the model output
        x = x.to(device)
        targets = targets.to(device)
        # class-index targets must be int64 for CrossEntropyLoss/NLLLoss;
        # floating-point targets (class probabilities) are left untouched
        if not torch.is_floating_point(targets):
            targets = targets.long()

        # zero grads
        optimizer.zero_grad()

        # forward pass
        out = model(x)

        # calc loss and gradients
        loss = loss_fun(out, targets).mean()
        loss.backward()

        # update
        optimizer.step()

        loss_sum += loss.item() * bs
        n_seen += bs

    if n_seen == 0:
        raise ValueError('train_loader yielded no samples')
    return loss_sum / n_seen

In [None]:
# We split the data into a training and a validation dataset and create two
# DataLoader objects.
N_TRAIN = 8000  # the first N_TRAIN samples are used for training, the rest for validation

# training data
train_data = data[:N_TRAIN]
train_labels = labels[:N_TRAIN]

# .shape prints the array dimensions (.size would print the total element count)
print(train_data.shape)
print(train_labels.shape)
train_dset = MNISTDset(images=train_data, labels=train_labels)
# num_workers=0: the data is already in memory, and worker processes cannot
# import a notebook-defined Dataset on platforms that spawn workers (e.g. Windows).
train_loader = DataLoader(dataset=train_dset, batch_size=8, shuffle=True, num_workers=0)


# val data
val_data = data[N_TRAIN:]
val_labels = labels[N_TRAIN:]
val_dset = MNISTDset(images=val_data, labels=val_labels)
val_loader = DataLoader(dataset=val_dset, batch_size=8, shuffle=False, num_workers=0)

In [None]:
# implement main loop 
def _subset_accuracy(model, dset, device, max_samples: int = 2000) -> float:
    """Accuracy of ``model`` on the first ``max_samples`` items of ``dset``."""
    # Never request more samples than the dataset holds, so the reshape below
    # cannot fail on small datasets.
    n = min(max_samples, len(dset))
    if n == 0:
        return float('nan')

    x, label, _ = dset[:n]
    x = x.view(n, 1, 28, 28) if model.is_conv else x.view(n, -1)

    model.eval()
    # No gradients are needed for evaluation; this saves memory and time.
    with torch.no_grad():
        out = model(x.to(device))
    return (out.argmax(-1) == label.to(device)).float().mean().item()


def main(num_epochs: int = 20):
    """Train the fully connected model and plot loss and accuracy curves.

    Uses the notebook-level ``train_loader``, ``train_dset`` and ``val_dset``.
    After every epoch the accuracy is measured on (at most) the first 2000
    samples of the training and of the validation dataset.

    Args:
        num_epochs: Number of passes over the training data.

    Returns:
        The trained model (left in evaluation mode).
    """
    model = NonLinearModel(input_dim=28*28, n_intermediate=2, intermediate_dim=50, act_fun=nn.ReLU(), output_dim=10)
    # model = CNN()  # alternative: train the convolutional model instead
    print(model)
    optimizer = optim.Adam(params=model.parameters(), lr=0.001)
    ce_loss = CrossEntropyLoss()

    device = 'cpu'
    model = model.to(device)

    tr_loss = []
    tr_acc = []
    ev_acc = []
    for epoch in range(num_epochs):
        loss = train(model, train_loader, optimizer, ce_loss, device, epoch)
        tr_loss.append(loss)

        # calculate accuracy (stored as plain floats so they plot and format cleanly)
        tr_acc.append(_subset_accuracy(model, train_dset, device))
        ev_acc.append(_subset_accuracy(model, val_dset, device))

        print(f'epoch [{epoch+1}/{num_epochs}]: train loss = {loss:.5f}, train acc = {tr_acc[-1]:.5f}, val acc = {ev_acc[-1]:.5f}')

    plt.plot(tr_loss, label='train loss')
    plt.legend()
    plt.show()

    plt.plot(tr_acc, label='train accuracy')
    plt.plot(ev_acc, label='eval accuracy')
    plt.title('acc')
    plt.legend()
    plt.show()
    return model

In [None]:
# Run the training loop and keep the returned model for the cells below.
model = main()

In [None]:
# Display the model's layer structure.
model

# Saving and loading models

In [None]:
# The state dict holds the model's parameters and buffers, keyed by name.
st = model.state_dict()

In [None]:
# Write the state dict to 'model.pt' (relative path, i.e. the current working directory).
torch.save(st, 'model.pt')

In [None]:
# Read the saved state dict back from disk.
# NOTE(review): torch.load relies on pickle, so only load files from trusted
# sources; recent PyTorch versions offer weights_only=True for plain state
# dicts - confirm against the installed version before adding it.
state_dict = torch.load('model.pt')

In [None]:
# A state dict stores no architecture, so first rebuild a model with the same
# constructor arguments as the one trained in main().
net_new = NonLinearModel(input_dim=28*28, n_intermediate=2, intermediate_dim=50, act_fun=nn.ReLU(), output_dim=10)

In [None]:
# Copy the saved parameters into the freshly constructed model.
net_new.load_state_dict(state_dict)

# Playground
Visit https://playground.tensorflow.org/ to experiment interactively with small neural networks in the browser.