## CNN Classifier for MNIST

In [69]:
import torch
import torch.nn.functional as F
import torchvision
import torchvision.transforms as transforms

class CNN_classifier(torch.nn.Module):

    def __init__(self, L, num_class):
        super(CNN_classifier, self).__init__()

        self.conv = torch.nn.Conv2d(in_channels = 1, out_channels = 5, kernel_size = 3, stride = 1, padding = 1, bias=False)
        self.pool = torch.nn.Identity()
        #self.pool = torch.nn.MaxPool2d( kernel_size = 2)

        self._dim_to_linear = None
        self._get_flattened_conv_output_size(L)

        self.fc = torch.nn.Linear(self._dim_to_linear, num_class)

    def _get_flattened_conv_output_size(self, L):
        """Pass a dummy tensor through conv layers to compute FC layer size"""
        with torch.no_grad():
            # Pass a dummy input with shape (1, 1, L, L) to match the Conv2d input requirements
            # conv layers expect (batch_size, num_channels, ...data dimension... )
            dummy_input = torch.randn(1, 1, L, L)
            dummy_output = self.pool(self.conv(dummy_input))
            self._dim_to_linear = dummy_output.numel()  # Compute feature count
    
    def forward(self, x):
        x = F.relu(self.pool(self.conv(x)))
        x = torch.flatten(x, start_dim = 1)  # Flatten from the first non-batch dimension
        x = self.fc(x)
        return F.log_softmax(x, dim=1)  

In [70]:
# Load MNIST dataset
train_data = torchvision.datasets.MNIST(root='./data', train=True, transform=transforms.ToTensor(), download=True)
test_data = torchvision.datasets.MNIST(root='./data', train=False, transform=transforms.ToTensor(), download=True)

# Create data loaders
train_loader = torch.utils.data.DataLoader(train_data, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=64, shuffle=False)

image, label = train_data[0]
L = image.shape[1]
num_class = len(set([label for image, label in train_data]))

In [71]:
model = CNN_classifier( L = L, num_class = num_class)

criterion = torch.nn.NLLLoss()
optimizer = torch.optim.Adam(model.parameters(), lr = 0.001)
num_epochs = 10

model.train()
for epoch in range(num_epochs):
    running_loss, correct, total = 0.0, 0, 0
    for images, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Calculate accuracy
        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # Print epoch results
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%")


Epoch 1/10, Loss: 0.2949, Accuracy: 92.28%
Epoch 2/10, Loss: 0.1121, Accuracy: 96.91%
Epoch 3/10, Loss: 0.0853, Accuracy: 97.53%
Epoch 4/10, Loss: 0.0720, Accuracy: 97.90%
Epoch 5/10, Loss: 0.0630, Accuracy: 98.09%
Epoch 6/10, Loss: 0.0558, Accuracy: 98.36%
Epoch 7/10, Loss: 0.0512, Accuracy: 98.48%
Epoch 8/10, Loss: 0.0465, Accuracy: 98.61%
Epoch 9/10, Loss: 0.0425, Accuracy: 98.69%
Epoch 10/10, Loss: 0.0391, Accuracy: 98.80%


In [None]:
model.eval()
num_data = len(test_loader.dataset)
num_batches = len(test_loader)

with torch.no_grad():
    test_loss, correct = 0.0, 0
    for images, labels in test_loader:
        outputs = model(images)

        test_loss += criterion(outputs, labels)
        _, predictions = torch.max(outputs, 1)
        correct += (predictions == labels).sum().item()
    
    accuracy = 100*correct/num_data
    average_loss = test_loss/num_batches

print(f"Test Loss: {average_loss:.4f}, Test Accuracy: {accuracy:.2f}%")            

Test Loss: 0.0693, Test Accuracy: 97.93%


## CNN Classifier as a thermometer for the Ising model 

In [None]:
#---- background packages
import numpy as np, pandas as pd, matplotlib.pyplot as plt, torch
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F
from time import time

#---- parallel computation packages 
from joblib import Parallel, delayed
import multiprocessing
from functools import partial
num_cores = multiprocessing.cpu_count()

def hotstart(L): 
    ''' hotstart(L)
    LxL Matrix of random spin '''
    return np.random.choice([-1,1],size=(L,L))

def Metropolis_update(Latt, beta, N_burnin, N_measure, N_separation ):
    H = 0
    J = 1
    L = len(Latt)
    m = np.sum(Latt) # the magnetization
    
    e = 0
    for i in range(L):
        for j in range(L):
            # move through lattice and compute s_i * neighbors
            # this double counts bounds, so divide by 2 afterwards
            e += -J*Latt[i, j] * (Latt[(i+1)%L, j] + Latt[(i-1)%L, j]
                                  + Latt[i, (j+1)%L]+ Latt[i, (j-1)%L])
    e = e/2 - H*m
    
    # Perform the burnin stage, don't log the observables traces
    for s in range(N_burnin):
        # choose a random spin to flip - convince youself of the dE expression, do it by hand for a small lattice
        i, j = np.random.randint(0, L, 2)
        de = -2 * (-J) * Latt[i, j] * (Latt[(i+1)%L, j] + Latt[(i-1)%L, j]
                                            + Latt[i, (j+1)%L]+ Latt[i, (j-1)%L])
        de = de + 2*H*Latt[i,j]

        if np.random.rand() < np.exp(-beta*de):
            Latt[i,j] *= -1 # flip the spin if random # < exp(-\beta de)
            e += de
            m += 2*Latt[i,j]
    
    trace = np.zeros((N_measure,2)) # trace of energy and magnetization
    trace_Latt = []
    
    # Perform the measurement stage, log observables only every N_separation steps to obtain N_measure samples
    for s in range(N_measure):
        for sep in range(N_separation):
            # choose a random spin to flip - convince youself of the dE expression, do it by hand for a small lattice
            i, j = np.random.randint(0, L, 2)
            de = -2 * (-J) * Latt[i, j] * (Latt[(i+1)%L, j] + Latt[(i-1)%L, j]
                                                + Latt[i, (j+1)%L]+ Latt[i, (j-1)%L])
            de = de + 2*H*Latt[i,j]

            if np.random.rand() < np.exp(-beta*de):
                Latt[i,j] *= -1 # flip the spin if random # < exp(-\beta de)
                e += de
                m += 2*Latt[i,j]

        trace[s] = [m,e]
        trace_Latt.append(Latt.copy())
        ## If appending just Latt, the list will contain only "Latt" which is updated every step!!!
    
    return trace, trace_Latt

#---- Ising Monte Carlo data generation
#---- generates 100 roughly decorrelated spin configurations per temperature

L = 8

N_sites = L**2
N_burnin = 100*N_sites
N_measure = 10**2
N_separation = 50*N_sites

Nt = num_cores*5
temperature_range = np.linspace(2, 2.8, Nt)
temperature_labels = []

for t in temperature_range:
    for n in range(N_measure):
        temperature_labels.append(t)
temperature_labels = np.array(temperature_labels).reshape((-1,1))

t0 = time()

innerm_ = partial(Metropolis_update,  N_burnin = N_burnin, N_measure = N_measure, N_separation = N_separation )
output = Parallel(n_jobs = num_cores)(delayed(innerm_)(hotstart(L), 1/t) for t in temperature_range)

spin_configurations = np.array([out[1] for out in output])
spin_configurations = spin_configurations.reshape((-1, 1, L, L))

t1 = time()
m, s = divmod(t1-t0, 60)
h, m = divmod(m, 60)
print(f"Computation time = {h} hours, {m} minutes, {s} seconds")

spin_configurations = torch.tensor(spin_configurations, dtype=torch.float32)
temperature_labels = torch.tensor(np.digitize(temperature_labels.flatten(), temperature_range) - 1)

train_dataset = TensorDataset(spin_configurations, temperature_labels)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

In [None]:
class CNN_thermometer(torch.nn.Module):

    def __init__(self, L, num_labels):
        super(CNN_thermometer, self).__init__()

        self.conv = torch.nn.Conv2d(in_channels = 1, out_channels = 5, kernel_size = 3, stride = 1, padding = 1, bias=False)
        #self.pool = torch.nn.MaxPool2d(2)
        self.pool = torch.nn.Identity()

        self._dim_to_linear = None
        self._get_flattened_conv_output_size(L)

        self.fc = torch.nn.Linear(self._dim_to_linear, num_labels)

    def _get_flattened_conv_output_size(self, L):
        """Pass a dummy tensor through conv layers to compute FC layer size"""
        with torch.no_grad():
            # Pass a dummy input with shape (1, 1, L, L) to match the Conv2d input requirements
            # conv layers expect (batch_size, num_channels, ...data dimension... )
            dummy_input = torch.randn(1, 1, L, L)
            dummy_output = self.pool(self.conv(dummy_input))
            self._dim_to_linear = dummy_output.numel()  # Compute feature count
    
    def forward(self, x):
        x = F.relu(self.pool(self.conv(x)))
        x = torch.flatten(x, start_dim = 1)  # Flatten from the first non-batch dimension
        x = self.fc(x)
        return F.log_softmax(x, dim=1)  # Apply softmax across the feature dimension
        #return x


model = CNN_thermometer( L = L, num_labels = Nt)

criterion = torch.nn.NLLLoss()

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)  
num_epochs = 10**4 

for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    running_loss, correct, total = 0.0, 0, 0
    
    for inputs, labels in train_loader:
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Calculate accuracy
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # Print epoch results
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct / total:.2f}%")


In [None]:
# Extract the weights of the fully connected (FC) layer
weights = model.fc.weight.data.numpy()

# Get the number of output nodes (temperature labels) and hidden neurons (y-axis)
num_labels = weights.shape[0]  # Number of output nodes (temperature labels)
num_hidden_neurons = weights.shape[1]  # Number of hidden neurons in the linear layer

# Plot the heatmap with output nodes on the x-axis and hidden neurons on the y-axis
fig, ax = plt.subplots(figsize=(12, 8))
cax = ax.imshow(np.transpose(weights), cmap='hot', aspect='auto')

# # Set axis labels
ax.set_xlabel("Output Nodes (Temperature Labels)")
ax.set_ylabel("Hidden Neurons")

# Set x-axis ticks to correspond to the output nodes (temperature labels) with an increment of 10
x_ticks = np.arange(0, num_labels, 10)
ax.set_xticks(x_ticks)
ax.set_xticklabels(x_ticks)

# Set y-axis ticks to correspond to the hidden neurons with an increment of 10
y_ticks = np.arange(0, num_hidden_neurons, num_hidden_neurons // 10)
ax.set_yticks(y_ticks)
ax.set_yticklabels(y_ticks)

# Add a color bar to show the intensity scale
fig.colorbar(cax)

plt.title("Heatmap of Weights from the Fully Connected Layer")
plt.tight_layout()
plt.show()
