In [None]:
import os
from modulefinder import Module

import torch
import torchvision.transforms as transforms # Transformations we can perform on our dataset for augmentation
from torch import nn # To inherit our neural network
from torch import optim # For optimizers like SGD, Adam, etc.
from torch.utils.data import Dataset,DataLoader # For management of the dataset (batches)
from torchvision.datasets import MNIST
from tqdm import tqdm # For nice progress bar!
class MNISTDataset(Dataset):
    """Wrapper around torchvision's ``MNIST`` with split-specific transforms.

    The training split is augmented (random rotation of up to 10 degrees and a
    random affine translate/scale) before being converted to a tensor; the
    evaluation split is only converted to a tensor. Both splits are then
    normalised with ``Normalize((0.5,), (0.5,))``.

    Args:
        root: Directory forwarded to ``MNIST(root=...)``.
        train: ``True`` builds the augmented training split, ``False`` the
            un-augmented evaluation split.
        download: Forwarded to ``MNIST(download=...)``.
    """

    def __init__(self,root,train,download):
        # Steps common to both splits; augmentation (if any) is prepended so it
        # still runs on the PIL image, before ToTensor.
        steps = [
            transforms.ToTensor(),
            transforms.Normalize((0.5,),(0.5,)),
        ]
        if train:
            steps = [
                transforms.RandomRotation(degrees=10),
                transforms.RandomAffine(degrees=0, translate=(0.1, 0.1), scale=(0.9, 1.1)),
            ] + steps
        self.dataset = MNIST(root=root, train=train, download=download,
                             transform=transforms.Compose(steps))

    def __len__(self):
        """Number of samples in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, index):
        """Return the sample at ``index`` exactly as the wrapped dataset yields it."""
        return self.dataset[index]

    def loadeData(self,batch_size=64):
        """Return a shuffling ``DataLoader`` over the wrapped dataset."""
        return DataLoader(self.dataset,batch_size=batch_size,shuffle=True)

    def loadTestData(self,batch_size=64):
        """Return a ``DataLoader`` that keeps the dataset order (no shuffling)."""
        return DataLoader(self.dataset,batch_size=batch_size,shuffle=False)


In [None]:
# Build the two MNIST splits (download=False: nothing is fetched here)
train_dataset = MNISTDataset(root='data/train', train=True, download=False)
test_dataset = MNISTDataset(root='data/test', train=False, download=False)
# proc_train_data,proc_train_label = train_dataset.loadeData(60000)
# proc_test_data,proc_test_label = test_dataset.loadeData(10000)


# Inspect a sample from the dataset


In [None]:

# Inspect a sample from the dataset
# sample_image = proc_train_data[0][0]
# sample_label = proc_train_label[0][0]
# print("Sample image shape:", sample_image.shape)
# print("Sample label:", sample_label)
# sample_image

# # Create an example 2D tensor
# tensor_2d = torch.tensor([[1, 2, 3], [4, 5, 6],[4, 5, 6]])

# # Flatten the tensor
# flattened_tensor = torch.flatten(tensor_2d)
# 
# print("Tensor original:\n", tensor_2d)
# print("Tensor aplatizat:\n", flattened_tensor)

In [None]:
#Model

class NN(nn.Module):
    """Fully connected classifier: two hidden ReLU layers and a linear output.

    Args:
        input_size: Number of features per flattened input sample.
        hidden1: Width of the first hidden layer.
        hidden2: Width of the second hidden layer.
        num_classes: Number of output scores per sample.

    Attributes:
        loader: The data loader used by ``train`` (and read by code outside the
            class). ``None`` until ``evalTraining()``/``evalTest()`` is called
            or a loader is assigned directly.
    """

    def __init__(self,input_size,hidden1,hidden2,num_classes):
        super(NN, self).__init__()

        self.fc1 = nn.Linear(input_size,hidden1)
        self.fc2 = nn.Linear(hidden1,hidden2)
        self.out = nn.Linear(hidden2,num_classes)
        self.activation_fct = nn.ReLU()
        # self.dropout = nn.Dropout(p=0.05)
        # Defined up front so a missing loader is an explicit state rather than
        # a missing attribute.
        self.loader = None

    def forward(self,x):
        """Return the raw output scores for a batch of flattened inputs."""
        x = self.activation_fct(self.fc1(x))
        x = self.activation_fct(self.fc2(x))
        return self.out(x)

    def forward_validation(self,x):
        """Same computation as ``forward``; this model has no train-only layers."""
        return self.forward(x)

    def evalTest(self):
        """Attach the loader of the module-level ``test_dataset``.

        Uses the non-shuffling loader so that batch order matches dataset
        order (needed when predictions are written out with positional IDs).
        """
        self.loader=test_dataset.loadTestData()

    def evalTraining(self):
        """Attach the shuffling loader of the module-level ``train_dataset``."""
        self.loader=train_dataset.loadeData()

    def train(self,criterion=True,optimizer=None,num_epochs=None):
        """Run the optimisation loop, or switch train/eval mode.

        ``nn.Module`` already defines ``train(mode=True)`` and ``eval()`` calls
        ``self.train(False)``. Because this method reuses that name, a boolean
        first argument (or no argument) is forwarded to ``nn.Module.train`` so
        that ``model.eval()`` and ``model.train()`` keep working.

        Args:
            criterion: Loss callable taking ``(scores, targets)``; or a bool
                mode flag when used as ``nn.Module.train``.
            optimizer: Optimizer whose ``zero_grad``/``step`` are called once
                per batch.
            num_epochs: Number of passes over ``self.loader``.

        Returns:
            ``self`` when used as a mode switch, otherwise ``None``.

        Raises:
            TypeError: If a criterion is given without optimizer/num_epochs.
            RuntimeError: If no loader has been attached yet.
        """
        if isinstance(criterion, bool):
            return super(NN, self).train(criterion)
        if optimizer is None or num_epochs is None:
            raise TypeError("train(criterion, optimizer, num_epochs) needs all three arguments")
        if self.loader is None:
            raise RuntimeError("no data loader attached; call evalTraining() or set .loader first")

        # Make sure the module is in training mode even if eval() was called before.
        super(NN, self).train(True)
        for _ in range(num_epochs):
            for data, targets in tqdm(self.loader):
                optimizer.zero_grad()
                # Flatten every sample to a vector: (batch, ...) -> (batch, features)
                data = data.reshape(data.shape[0], -1)

                # Forward
                scores = self(data) # same as self.forward(data)
                loss = criterion(scores, targets)

                # Backward
                loss.backward()

                # Optimizer step
                optimizer.step()
            # print("loss: ",loss)    
    

      

In [None]:
#Model

class NN_2(nn.Module):
    """Same layer layout as ``NN`` plus dropout after each hidden activation.

    Args:
        input_size: Number of features per flattened input sample.
        hidden1: Width of the first hidden layer.
        hidden2: Width of the second hidden layer.
        num_classes: Number of output scores per sample.

    Attributes:
        dropout: Dropout module (p=0.2 by default) applied in ``forward`` only.
        loader: The data loader used by ``train``; ``None`` until
            ``evalTraining()``/``evalTest()`` is called or one is assigned.
    """

    def __init__(self,input_size,hidden1,hidden2,num_classes):
        super(NN_2, self).__init__()

        self.fc1 = nn.Linear(input_size,hidden1)
        self.fc2 = nn.Linear(hidden1,hidden2)
        self.out = nn.Linear(hidden2,num_classes)
        self.activation_fct = nn.ReLU()
        # self.bn1 = nn.BatchNorm1d(hidden1)
        # self.bn2 = nn.BatchNorm1d(hidden2)
        self.dropout = nn.Dropout(p=0.2)
        # Defined up front so a missing loader is an explicit state rather than
        # a missing attribute.
        self.loader = None

    def _run_layers(self, x, apply_dropout):
        """Shared forward computation; dropout is inserted only when requested."""
        x = self.activation_fct(self.fc1(x))
        if apply_dropout:
            x = self.dropout(x)
        x = self.activation_fct(self.fc2(x))
        if apply_dropout:
            x = self.dropout(x)
        return self.out(x)

    def forward(self,x):
        """Return output scores, passing hidden activations through dropout."""
        return self._run_layers(x, apply_dropout=True)

    def forward_validation(self,x):
        """Return output scores without the dropout layers."""
        return self._run_layers(x, apply_dropout=False)

    def evalTest(self):
        """Attach the non-shuffling loader of the module-level ``test_dataset``."""
        self.loader=test_dataset.loadTestData()

    def evalTraining(self):
        """Attach the shuffling loader of the module-level ``train_dataset``."""
        self.loader=train_dataset.loadeData()

    def train(self,criterion=True,optimizer=None,num_epochs=None):
        """Run the optimisation loop, or switch train/eval mode.

        ``nn.Module`` already defines ``train(mode=True)`` and ``eval()`` calls
        ``self.train(False)``. Because this method reuses that name, a boolean
        first argument (or no argument) is forwarded to ``nn.Module.train`` so
        that ``model.eval()`` and ``model.train()`` keep working.

        Args:
            criterion: Loss callable taking ``(scores, targets)``; or a bool
                mode flag when used as ``nn.Module.train``.
            optimizer: Optimizer whose ``zero_grad``/``step`` are called once
                per batch.
            num_epochs: Number of passes over ``self.loader``.

        Returns:
            ``self`` when used as a mode switch, otherwise ``None``.

        Raises:
            TypeError: If a criterion is given without optimizer/num_epochs.
            RuntimeError: If no loader has been attached yet.
        """
        if isinstance(criterion, bool):
            return super(NN_2, self).train(criterion)
        if optimizer is None or num_epochs is None:
            raise TypeError("train(criterion, optimizer, num_epochs) needs all three arguments")
        if self.loader is None:
            raise RuntimeError("no data loader attached; call evalTraining() or set .loader first")

        # Training mode keeps dropout active even if eval() was called before.
        super(NN_2, self).train(True)
        for _ in range(num_epochs):
            for data, targets in tqdm(self.loader):
                optimizer.zero_grad()
                # Flatten every sample to a vector: (batch, ...) -> (batch, features)
                data = data.reshape(data.shape[0], -1)

                # Forward
                scores = self(data) # same as self.forward(data)
                loss = criterion(scores, targets)

                # Backward
                loss.backward()

                # Optimizer step
                optimizer.step()
       
    

      

In [None]:
def check_accuracy(model):
    """Print and return the accuracy of ``model`` on ``model.loader``.

    Each batch is flattened to ``(batch, -1)``, scored with
    ``model.forward_validation`` and the highest-scoring index per row is
    compared with the targets. Gradients are disabled for the whole pass.

    Args:
        model: Object exposing an iterable ``loader`` of ``(inputs, targets)``
            batches and a ``forward_validation(inputs)`` method.

    Returns:
        Fraction of correct predictions in ``[0, 1]``; ``0.0`` when the loader
        yields no samples.
    """
    num_correct = 0
    num_samples = 0

    with torch.no_grad():
        for x, y in model.loader:
            x = x.reshape(x.shape[0], -1)
            scores = model.forward_validation(x)
            predictions = scores.argmax(dim=1)
            # .item() keeps the counter a plain int so it prints as a number,
            # not as tensor(...).
            num_correct += (predictions == y).sum().item()
            num_samples += predictions.size(0)

    # Guard against an empty loader instead of dividing by zero.
    percent = 100.0 * num_correct / num_samples if num_samples else 0.0
    print(f'Got {num_correct} / {num_samples} with accuracy {percent:.2f}')
    return num_correct / num_samples if num_samples else 0.0




In [None]:

def write_predictions_to_file(model, filename='predictions.csv'):
    """Write one ``ID,Target`` CSV row per test sample.

    Calls ``model.evalTest()`` to attach the test loader, then scores every
    batch with ``model.forward_validation`` and writes the highest-scoring
    index per sample. IDs are a running 0-based counter in loader order, so
    they do not depend on the loader's batch size or on all batches having
    the same length.

    Args:
        model: Object exposing ``evalTest()``, ``loader`` and
            ``forward_validation(inputs)``.
        filename: Path of the CSV file to create or overwrite.
    """
    model.evalTest()
    next_id = 0
    with open(filename, 'w') as f, torch.no_grad():
        f.write("ID,Target\n")
        for data, _ in model.loader:
            # Flatten each sample, e.g. (64,1,28,28) -> (64,784)
            data = data.reshape(data.size(0), -1)
            predictions = model.forward_validation(data).argmax(dim=1)
            for prediction in predictions.tolist():
                f.write(f"{next_id},{prediction}\n")
                next_id += 1

# Example usage:
# The function attaches the test loader itself via model.evalTest();
# the second argument is the output CSV path, not a DataLoader.
# write_predictions_to_file(model, 'predictions.csv')

### Setup

In [None]:
# Hyperparameters
seed = 42  # seeds PyTorch's global RNG so repeated runs start from the same state
input_size = 784
num_classes = 10
learning_rate = 0.001
# NOTE(review): batch_size is not passed to the loaders built by
# evalTraining()/evalTest(); they use loadeData()/loadTestData()'s default of 64.
batch_size = 64
num_epochs = 10

# Seed before the model is created so its initial weights are repeatable.
torch.manual_seed(seed)

# Initialize Module
model = NN(input_size = input_size ,hidden1=128,hidden2=64, num_classes= num_classes,)

# Loss and Optimizer
criterion = nn.CrossEntropyLoss()
# Adam optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# optimizer = optim.Adadelta(model.parameters(), lr=learning_rate)
# SGD with momentum
# optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9)



In [None]:
# view what view is doing
# model.loader is only attached by evalTraining()/evalTest(); attach it here so
# this cell also runs on a fresh kernel.
model.evalTraining()
for x,y in model.loader:
    # Flatten per sample; no squeeze(), which would also drop the batch axis
    # when a batch holds a single sample.
    flat = x.view(x.size(0),-1)
    print(x.shape, "->", flat.shape, y.shape)
    break


In [None]:
model.evalTraining()
model.train(criterion,optimizer,num_epochs)
# Label printed right before the number it describes. The training loader
# applies random augmentation, so this is accuracy on augmented samples.
print("Accuracy on training")
check_accuracy(model)

In [None]:
# Switch to the test loader, then report accuracy on it
model.evalTest()
print("Accuracy on validation")
check_accuracy(model)

In [None]:
# Dropout variant initialised with the weights of the trained `model`
model2 = NN_2(input_size=input_size, hidden1=128, hidden2=64, num_classes=num_classes)
for layer_name in ("fc1", "fc2", "out"):
    source_layer = getattr(model, layer_name)
    getattr(model2, layer_name).load_state_dict(source_layer.state_dict())



In [None]:
# Third model: another NN_2 (dropout only; the batch-norm layers are commented
# out in NN_2), initialised with model2's weights
model3 = NN_2(input_size=input_size, hidden1=128, hidden2=64, num_classes=num_classes)
for layer_name in ("fc1", "fc2", "out"):
    source_layer = getattr(model2, layer_name)
    getattr(model3, layer_name).load_state_dict(source_layer.state_dict())

In [None]:
# Replace NN_2's default dropout (p=0.2) with a lighter one for model3
model3.dropout = nn.Dropout(0.1)

In [None]:
# Build a fresh augmented training split and hand its shuffling loader to model3
augmented_train_dataset = MNISTDataset(root='data/train', train=True, download=False)
model3.loader = augmented_train_dataset.loadeData()

In [None]:
# Fresh Adam optimizer bound to model3's parameters
optimizer3 = optim.Adam(params=model3.parameters(), lr=learning_rate)


In [None]:
model3.evalTraining()
print("Accuracy on training")
# Epoch count follows the shared num_epochs hyperparameter instead of a literal 10
model3.train(criterion,optimizer3,num_epochs)
# check_accuracy(model3)


In [None]:
# Switch model3 to the test loader, then report accuracy on it
model3.evalTest()
print("Accuracy on validation")
check_accuracy(model3)

In [None]:
# Fresh Adam optimizer bound to model2's parameters
optimizer2 = optim.Adam(params=model2.parameters(), lr=learning_rate)


In [None]:
# model2.evalTraining()
# print("Accuracy on training")
# model2.train(criterion,optimizer2,num_epochs)
# check_accuracy(model2)


In [None]:
# Switch model2 to the test loader, then report accuracy on it
model2.evalTest()
print("Accuracy on validation")
check_accuracy(model2)

In [None]:
# Write model3's test-set predictions to a CSV file
write_predictions_to_file(model3, filename='predictions9873.csv')


In [None]:
# Save the weights (state dict only)
torch.save(obj=model.state_dict(), f='model_weights.pth')
print("Model weights saved!")

In [None]:
# Rebuild the architecture, then restore the saved weights into it
model = NN(input_size=input_size, hidden1=128, hidden2=64, num_classes=num_classes)
saved_state = torch.load('model_weights.pth')
model.load_state_dict(saved_state)

In [None]:
# Pickle the whole module object (architecture + weights), not just the state dict
torch.save(obj=model, f='complete_model.pth')
print("Complete model saved!")

In [None]:
# Load the entire model
# Read the file written by the "Save the entire model" cell; 'model_weights.pth'
# holds only a state dict, which would leave `model` as a plain dict of tensors.
# weights_only=False: the file contains a pickled nn.Module, which PyTorch >= 2.6
# does not unpickle by default. Only use it on files you created yourself,
# because unpickling can execute arbitrary code.
model = torch.load('complete_model.pth', weights_only=False)

In [None]:

# Save the model, optimizer, and training state
def save_checkpoint(model, optimizer, epoch, loss, filename='data/save/checkpoint.pth.tar'):
    """Save model/optimizer state plus training progress to ``filename``.

    The checkpoint is a dict with the keys ``model_state_dict``,
    ``optimizer_state_dict``, ``epoch`` and ``loss``. When ``filename`` is a
    path, its parent directory is created first if it does not exist.

    Args:
        model: Object providing ``state_dict()``.
        optimizer: Object providing ``state_dict()``.
        epoch: Stored unchanged under ``'epoch'``.
        loss: Stored unchanged under ``'loss'``.
        filename: Destination passed to ``torch.save``.
    """
    checkpoint = {
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'epoch': epoch,
        'loss': loss
    }
    # torch.save does not create missing directories; only touch the file
    # system when the destination is a path (not an already-open file object).
    if isinstance(filename, (str, os.PathLike)):
        parent_dir = os.path.dirname(os.fspath(filename))
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
    torch.save(checkpoint, filename)
    print(f"Checkpoint saved to {filename}")

In [None]:
# Persist model2 layer by layer and as a whole (pickled module objects)
os.makedirs('data/save', exist_ok=True)  # torch.save does not create missing directories
torch.save(model2.fc1, 'data/save/fc1.pth')
torch.save(model2.fc2, 'data/save/fc2.pth')
torch.save(model2.out, 'data/save/out.pth')
torch.save(model2, 'data/save/model.pth')


In [None]:
# Copy the layers saved from model2 into `model`.
# torch.load only returns the unpickled object, and its second positional
# argument is `map_location`, not a destination module, so the loaded layer's
# parameters have to be copied over explicitly.
# weights_only=False: each file holds a pickled nn.Linear, which PyTorch >= 2.6
# does not unpickle by default (only use on files you created yourself).
for layer_name in ('fc1', 'fc2', 'out'):
    saved_layer = torch.load(f'data/save/{layer_name}.pth', weights_only=False)
    getattr(model, layer_name).load_state_dict(saved_layer.state_dict())

In [None]:
# Weight shapes of the three linear layers, then the output layer's bias
for layer in (model.fc1, model.fc2, model.out):
    print(layer.weight.shape)
print(model.out.bias)