In [210]:
from gradient_descent import sklearn_optimize
from data import sample_weights
import numpy as np
# Let's see how well we do on the test set
from theoretical import predict_erm
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from data_model import CustomSpectra
import pickle

def load_mnist(path, kind='train'):
    import os
    import gzip
    import numpy as np

    """Load MNIST data from `path`"""
    labels_path = os.path.join(path,
                               '%s-labels-idx1-ubyte.gz'
                               % kind)
    images_path = os.path.join(path,
                               '%s-images-idx3-ubyte.gz'
                               % kind)

    with gzip.open(labels_path, 'rb') as lbpath:
        labels = np.frombuffer(lbpath.read(), dtype=np.uint8,
                               offset=8)

    with gzip.open(images_path, 'rb') as imgpath:
        images = np.frombuffer(imgpath.read(), dtype=np.uint8,
                               offset=16).reshape(len(labels), 784)

    return images, labels

%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [211]:
# Let's read the pickle file
with open('data/fashion_mnist.pkl', 'rb') as f:
    data = pickle.load(f)
    X_train = np.array(data['X_train'])
    y_train = np.array(data['y_train'])
    X_test = np.array(data['X_test'])
    y_test = np.array(data['y_test'])
    # replace the -1 labels with 0
    y_train[y_train == -1] = 0
    y_test[y_test == -1] = 0

In [225]:
import torch
import torch.nn as nn

class MyNeuralNetwork(nn.Module):
    def __init__(self):
        super(MyNeuralNetwork, self).__init__()
        # Input layer with 784 neurons
        self.input_layer = nn.Linear(784, 1000, bias=False)
        
        # Three hidden layers with 1000 neurons and ReLU activation
        # self.hidden1 = nn.Linear(1000, 1000)
        self.hidden3 = nn.Linear(1000, 784, bias=False)
        
        # Output layer with 2 neurons and logistic (sigmoid) activation
        self.output_layer = nn.Linear(784, 1, bias=False)
        self.sigmoid = nn.Sigmoid()
        # self.sign = nn.Softsign()
    
    def almost_forward(self, x):
        # Forward pass with ReLU activation for hidden layers
        x = torch.relu(self.input_layer(x))
        # x = torch.relu(self.hidden1(x))
        x = torch.relu(self.hidden3(x))
        
        # Output layer with sigmoid activation
        
        return x

    def forward(self, x):
        x = self.almost_forward(x)
        x = self.output_layer(x) 
        # x = self.sigmoid(x)
        
        return x

# Create an instance of the neural network
net = MyNeuralNetwork()


In [226]:
# Check that MPS is available
if not torch.backends.mps.is_available():
    if not torch.backends.mps.is_built():
        print("MPS not available because the current PyTorch install was not "
              "built with MPS enabled.")
    else:
        print("MPS not available because the current MacOS version is not 12.3+ "
              "and/or you do not have an MPS-enabled device on this machine.")
    device = torch.device("cpu")
else:
    device = torch.device("cpu")

In [227]:
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader

# Define a custom dataset class
class CustomDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels
        self.transform = None

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        data = torch.FloatTensor(np.array(self.data[idx]))
        label = torch.IntTensor(np.array([self.labels[idx]]))
        return data, label

# Create custom datasets
train_dataset = CustomDataset(X_train, y_train)
test_dataset = CustomDataset(X_test, y_test)

# Create custom data loaders
batch_size = 64  # You can adjust this batch size according to your needs
test_batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=False)


In [228]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms

# Define your neural network class (as previously defined)

# Define a function to train the network
def train(net, train_loader, test_loader, num_epochs=10, learning_rate=0.001):
    net.to(device)
    
    # Define loss and optimizer
    # criterion = nn.CrossEntropyLoss()
    # criterion = nn.BCELoss()
    criterion = nn.MSELoss()
    optimizer = optim.Adam(net.parameters(), lr=learning_rate)
    
    # Lists to store training and test losses
    train_losses = []
    test_losses = []
    
    for epoch in range(num_epochs):
        # Training phase
        net.train()
        running_loss = 0.0
        for inputs, labels in train_loader:

            # Reshape the labels to (batch_size)
            labels = labels.reshape(-1)

            # change the datatype to float64 for the labels
            labels = labels.type(torch.float32)
            inputs, labels = inputs.to(device), labels.to(device)

            # print the gradient values of inputs and labels
            # print("input grad",inputs.grad)
            # print("label grad", labels.grad)


            # Zero the parameter gradients
            optimizer.zero_grad()
            
            # Forward pass
            outputs = net(inputs)

            # print the gradient values of outputs
            # print("outptu grad", outputs.grad)

            # print the output
            # print("outputs", outputs)

            loss = criterion(outputs, labels)

            # print the gradient values of loss
            # print("loss grad",loss.grad)
            # print("loss", loss)
            
            # Backpropagation and optimization
            loss.backward()

            # print("loss grad",loss.grad)
            optimizer.step()
            
            running_loss += loss.item()
        
        # Calculate training loss for this epoch
        train_loss = running_loss / len(train_loader)
        train_losses.append(train_loss)
        
        # Testing phase
        net.eval()
        test_loss = 0.0
        
        with torch.no_grad():
            total = 0
            correct = 0
            for inputs, labels in test_loader:
                
                # inputs = inputs.reshape(-1)
                # print("input shape", inputs.shape)
                # print("label shape", labels.shape)

                # Reshape the labels to (batch_size)
                labels = labels.reshape(-1)

                # change the datatype to float32 for the labels
                labels = labels.type(torch.float32)
                inputs, labels = inputs.to(device), labels.to(device)
                

                outputs = net(inputs)
                loss = criterion(outputs, labels)
                
                _, predicted = torch.max(outputs.data, 1)
                total += labels.size(0)
                # count the number of correct predictions
                correct += (predicted == labels).sum().item()     


                test_loss += loss.item()

            accuracy = correct / total
        
        # Calculate test loss for this epoch
        test_loss /= len(test_loader)
        test_losses.append(test_loss)
        
        # Print training and test loss for this epoch
        if (epoch+1) % 1 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}] - Train Loss: {train_loss:.4f}, Test Loss: {test_loss:.4f}, Test Accuracy: {accuracy:.4f}')
    
    print('Training complete')
    
    return train_losses, test_losses


# Create an instance of your neural network
net = MyNeuralNetwork()

# Train the network and log the losses
train_losses, test_losses = train(net, train_loader, test_loader, num_epochs=40, learning_rate=0.001)


Epoch [1/40] - Train Loss: 0.2832, Test Loss: 0.2554, Test Accuracy: 0.5000
Epoch [2/40] - Train Loss: 0.2558, Test Loss: 0.2513, Test Accuracy: 0.5000
Epoch [3/40] - Train Loss: 0.2532, Test Loss: 0.2524, Test Accuracy: 0.5000
Epoch [4/40] - Train Loss: 0.2532, Test Loss: 0.2524, Test Accuracy: 0.5000
Epoch [5/40] - Train Loss: 0.2536, Test Loss: 0.2580, Test Accuracy: 0.5000
Epoch [6/40] - Train Loss: 0.2540, Test Loss: 0.2533, Test Accuracy: 0.5000
Epoch [7/40] - Train Loss: 0.2537, Test Loss: 0.2514, Test Accuracy: 0.5000
Epoch [8/40] - Train Loss: 0.2528, Test Loss: 0.2518, Test Accuracy: 0.5000
Epoch [9/40] - Train Loss: 0.2533, Test Loss: 0.2516, Test Accuracy: 0.5000
Epoch [10/40] - Train Loss: 0.2532, Test Loss: 0.2593, Test Accuracy: 0.5000
Epoch [11/40] - Train Loss: 0.2540, Test Loss: 0.2523, Test Accuracy: 0.5000
Epoch [12/40] - Train Loss: 0.2538, Test Loss: 0.2548, Test Accuracy: 0.5000
Epoch [13/40] - Train Loss: 0.2565, Test Loss: 0.2509, Test Accuracy: 0.5000
Epoch [1

In [229]:


# Get the test accuracy on the entire test set
net.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for inputs, labels in test_loader:
        labels = labels.reshape(-1)
        labels = labels.type(torch.LongTensor)

        inputs, labels = inputs.to(device), labels.to(device)
        outputs = net(inputs)
        # print max and min of outptus
        print("max", torch.max(outputs.data))
        print("min", torch.min(outputs.data))
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        # count the number of correct predictions
        correct += (predicted == labels).sum().item()       
        # print the predicted and true labels
        print("predicted", predicted)
        print("labels", labels)

        print("total", total)
        print("correct", correct)

    print('Accuracy of the network on all test images: %d %%' % (100 * correct / total))



max tensor(0.5878)
min tensor(0.4666)
predicted tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
labels tensor([1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 0, 1, 1, 1, 0, 0, 0, 1, 0, 1, 1, 1, 0, 1,
        0, 0, 0, 0, 0, 0, 0, 1, 1, 0, 1, 0, 0, 0, 1, 1, 0, 0, 1, 1, 0, 1, 1, 1,
        0, 1, 0, 1, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 1])
total 64
correct 34
max tensor(0.5652)
min tensor(0.4638)
predicted tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0])
labels tensor([1, 1, 1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 1, 0, 1, 0, 0, 0, 0, 1, 0, 0, 1, 0,
        0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 0, 1, 1, 0, 1,
        1, 0, 1, 1, 1, 0, 0, 1, 1, 0, 0, 1, 0, 1, 0, 0])
to

In [230]:
# okay, so actually the net learns something
# Let's prepare features and see if our logistic regression theory model can predict some stuff...

net.eval()

# Get the features for the training set
def get_features(data_loader):
    train_features = []
    train_labels = []
    with torch.no_grad():
        for inputs, labels in train_loader:
            labels = labels.reshape(-1)
            labels = labels.type(torch.LongTensor)
            inputs, labels = inputs.to(device), labels.to(device)
            # obtain the pre-acivation features
            pre_activation = net.almost_forward(inputs)
            # print("pre_activation", pre_activation.shape)
            train_features.append(pre_activation)
            train_labels.append(labels)
    return torch.cat(train_features).numpy(), torch.cat(train_labels).numpy()

train_features, train_labels = get_features(train_loader)
test_features, test_labels = get_features(test_loader)

In [231]:
train_features.shape

(12000, 784)

In [232]:
ntot = train_features.shape[0]
Omega = X_train.T @ X_train / ntot # student-student
rho = y_train.dot(y_train) / ntot
spec_Omega, U = np.linalg.eigh(Omega)
diagUtPhiPhitU = np.diag(1/ntot * U.T @ X_train.T @ y_train.reshape(ntot,1) @ y_train.reshape(1,ntot) @ X_train @ U)


# Creata a data json 
data = {
    "X_train": list(train_features),
    "y_train": list(train_labels),
    "X_test": list(test_features),
    "y_test": list(test_labels),
    "Omega": Omega,
    "rho": rho,
    "spec_Omega": spec_Omega,
    "diagUtPhiPhitU": diagUtPhiPhitU
}

In [233]:
# pickle data into a file called data/neural_fashion_mnist.pkl
with open('data/neural_fashion_mnist.pkl', 'wb') as f:
    pickle.dump(data, f)

In [221]:
print(net)

MyNeuralNetwork(
  (input_layer): Linear(in_features=784, out_features=1000, bias=False)
  (hidden3): Linear(in_features=1000, out_features=30, bias=False)
  (output_layer): Linear(in_features=30, out_features=1, bias=False)
  (sigmoid): Sigmoid()
)


In [222]:
net.hidden3.weight.shape

torch.Size([30, 1000])

In [223]:
net.output_layer.weight.shape

torch.Size([1, 30])

In [224]:
net.output_layer.weight

Parameter containing:
tensor([[-0.1571,  0.0267, -0.1033, -0.0523, -0.0406, -0.0783,  0.0081,  0.1119,
          0.1114,  0.1510,  0.1318,  0.0194,  0.1265, -0.0053, -0.0883,  0.0053,
         -0.1539,  0.0190, -0.0094, -0.0007, -0.0941,  0.0087,  0.1087, -0.0591,
         -0.0703, -0.0339,  0.1623,  0.0011, -0.1586, -0.1684]],
       requires_grad=True)