In [119]:
import torch
from torch.utils.data import Dataset, DataLoader
import os
from PIL import Image
import numpy as np
import torch.nn as nn
import matplotlib.pyplot as plt
import torchvision
from torchvision import datasets

In [120]:
# TODO: Check torch minst dataset for how it holds labels
dataset_training=torchvision.datasets.MNIST(download=True, root='./data', train=True, transform=torchvision.transforms.ToTensor())
loader_training=DataLoader(dataset_training, shuffle=True, batch_size=4)
dataset_test=torchvision.datasets.MNIST(download=True, root='./data', train=False, transform=torchvision.transforms.ToTensor())
loader_test=DataLoader(dataset_test, shuffle=True, batch_size=4)

In [121]:
def load_image (file_path):
    image=Image.open(file_path)
    background = Image.new("RGB", image.size, (255, 255, 255))
    background.paste(image, mask=image.split()[3]) # 3 is the alpha channel
    background=background.convert('1')
    arr=np.array(background)
    image.close()
    return arr

In [122]:
class CifDataset(Dataset):
    def __init__(self, training=None):
        self.data=[]
        self.label=[]
        if os.path.exists("data.npy") and os.path.exists("label.npy"):
            self.data=np.load('data.npy')
            self.label=np.load('label.npy')
        else:
            for i in range(0, 10):
                cif_path=os.path.join('dataset', str(i), str(i))
                files=os.listdir(cif_path)
                index=0
                for file in files:
                    file_path=os.path.join(cif_path, file)
                    nd_array=load_image(file_path).flatten()
                    self.data.append([nd_array])
                    self.label.append(i)
                    #if index>1000:
                    #    break
                    index+=1
            self.data=np.float32(np.array(self.data))
            self.label=np.long(np.array(self.label))
            self.data, self.label=self.unison_shuffled_copies(self.data, self.label)
            np.save('data.npy', self.data)
            np.save('label.npy', self.label)
        cut=int(0.9*len(self.label))
        if training==True:
            self.data=self.data[:cut]
            self.label=self.label[:cut]
        elif training==False:
            self.data=self.data[cut:]
            self.label=self.label[cut:]
        
    def unison_shuffled_copies(self, a, b):
        assert len(a) == len(b)
        p = np.random.RandomState(seed=69).permutation(len(a), )
        return a[p], b[p]
    
    def __len__(self):
        return self.data.shape[0]
    def __getitem__(self, idx):
        return self.data[idx], self.label[idx]
            

In [123]:
class TestDataset(Dataset):
    def __init__(self):
        self.data=[]
        self.label=[]

        for file in os.listdir("tests"):
            file_path=os.path.join("tests", file)
            nd_array=load_image(file_path).flatten()
            self.data.append([nd_array])
            self.label.append(int(file[0]))
            
        self.data=np.float32(np.array(self.data))
        self.label=np.long(np.array(self.label))
        self.data, self.label=self.unison_shuffled_copies(self.data, self.label)

    def unison_shuffled_copies(self, a, b):
        assert len(a) == len(b)
        p = np.random.RandomState(seed=69).permutation(len(a), )
        return a[p], b[p]
        
    def __len__(self):
        return self.data.shape[0]
    def __getitem__(self, idx):
        return self.data[idx], self.label[idx]
            

In [124]:
dataset_training=CifDataset(training=True) #TODO: Implement training and testing data here!
loader_training=DataLoader(dataset_training, shuffle=True, batch_size=4)

In [125]:
dataset_test=CifDataset(training=False)
#dataset_test=TestDataset()
loader_test=DataLoader(dataset_test, shuffle=True, batch_size=4)

In [126]:
print (f'Training dataset length: {len(dataset_training)}')

Training dataset length: 96957


In [127]:
print (f'Test dataset length: {len(dataset_test)}')

Test dataset length: 10773


In [128]:
device="cpu"
device="cuda" if torch.cuda.is_available() else "cpu"
print (device)

cuda


In [129]:
class CifModel(nn.Module):
    def __init__(self, n_hidden):
        super().__init__()
        self.l1=nn.Linear(28*28, n_hidden)
        self.relu=nn.LeakyReLU()
        self.l11=nn.Linear(n_hidden, n_hidden)
        self.l2=nn.Linear(n_hidden, 10)
        self.sgm=nn.Sigmoid()
    def forward(self, sample):
        sample=self.l1(sample)
        sample=self.relu(sample)
        sample=self.l11(sample)
        sample=self.relu(sample)
        sample=self.l2(sample)
        #sample=self.sgm(sample)
        return sample

In [130]:
model=CifModel(100).to(device)

In [131]:
n_epochs=2
loss_fn=nn.CrossEntropyLoss()
optimizer=torch.optim.SGD(params=model.parameters(), lr=0.01)

In [40]:
for epoch in range(n_epochs): #It works on my own dataset as well, but it fails on my samples
    index=0
    for _, (image, label) in enumerate(loader_training):
        image=image.to(device).reshape(-1, 28*28)
        label=label.to(device)
        y_predict=model(image)
        loss=loss_fn(y_predict, label)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        if index%5000==0:
            print (f'Epoch: {epoch+1}, Iter: {index}, Loss: {loss:.3f}')
        index+=1

Epoch: 1, Iter: 0, Loss: 0.000
Epoch: 1, Iter: 5000, Loss: 0.013
Epoch: 1, Iter: 10000, Loss: 0.051
Epoch: 1, Iter: 15000, Loss: 0.008
Epoch: 1, Iter: 20000, Loss: 0.000
Epoch: 2, Iter: 0, Loss: 0.004
Epoch: 2, Iter: 5000, Loss: 0.000
Epoch: 2, Iter: 10000, Loss: 0.001
Epoch: 2, Iter: 15000, Loss: 0.003
Epoch: 2, Iter: 20000, Loss: 0.000


In [135]:
with torch.no_grad():
    index=0
    num_correct=0
    num_samples=0
    for _, (image, label) in enumerate(loader_test):
        image=image.to(device).reshape(-1, 28*28)
        label=label.to(device)
        y_pred=model(image)
        _, nr=y_pred.detach().max(dim=1)
        num_correct += (nr == label).sum()
        num_samples += nr.size(0)
        if (index%100==0):
            acc=100*num_correct/num_samples
            print (f'Accuracy: {acc}%')
        index+=1
    print (f'Final Accuracy: {acc}%')

Accuracy: 100.0%
Accuracy: 99.75247192382812%
Accuracy: 99.87561798095703%
Accuracy: 99.8338851928711%
Accuracy: 99.87531280517578%
Accuracy: 99.90019989013672%
Accuracy: 99.91680145263672%
Accuracy: 99.92867279052734%
Accuracy: 99.87515258789062%
Accuracy: 99.8890151977539%
Accuracy: 99.90010070800781%
Accuracy: 99.88646697998047%
Accuracy: 99.89591979980469%
Accuracy: 99.90391540527344%
Accuracy: 99.89292907714844%
Accuracy: 99.88341522216797%
Accuracy: 99.89070129394531%
Accuracy: 99.8971176147461%
Accuracy: 99.87506866455078%
Accuracy: 99.88164520263672%
Accuracy: 99.87506866455078%
Accuracy: 99.86911010742188%
Accuracy: 99.86370086669922%
Accuracy: 99.85875701904297%
Accuracy: 99.8542251586914%
Accuracy: 99.85005950927734%
Accuracy: 99.8462142944336%
Final Accuracy: 99.8462142944336%


In [133]:
checkpoint = torch.load("model.ckpt")
model.load_state_dict(checkpoint['model_state_dict'])
optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
epoch = checkpoint['epoch']
loss = checkpoint['loss']

In [46]:
torch.save({
            'epoch': n_epochs,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss,
            }, 'model.ckpt')

In [None]:
# Final Accuracy: 97.40733337402344% (5 Epoch)
# Final Accuracy: 97.56351470947266% (2 Epoch)
# Final Accuracy: 96.02249145507812% (2 Epoch)