In [1]:
# Importing necessary torch libraries
import torch
import torch.nn.functional as F
import torch.nn as nn
import torch.utils.data
import torch.optim as optim

# numpy for numerical operations and matplotlib for plotting the image
import matplotlib.pyplot as plt
import numpy as np

# libraries for handling datasets, image transformations, and visualizing data.
from torchvision import datasets
from torchvision import transforms
from torchvision.utils import make_grid

In [2]:
class BM(nn.Module):
    """Boltzmann machine with one visible and one hidden layer of binary units.

    Learnable parameters:
        W         -- (n_visible, n_hidden) weights between the two layers.
        Wvv       -- (n_visible, n_visible) visible-visible weights.
        Whh       -- (n_hidden, n_hidden) hidden-hidden weights.
        b_visible -- (n_visible,) visible biases.
        b_hidden  -- (n_hidden,) hidden biases.

    Note: ``sample_hidden`` and ``sample_visible`` condition only on the
    opposite layer through ``W`` and the biases; ``Wvv`` and ``Whh`` enter
    the model only through ``energy``.
    """

    def __init__(self, n_visible, n_hidden):
        """Create the parameters for ``n_visible`` visible and ``n_hidden`` hidden units."""
        super().__init__()
        self.n_visible = n_visible
        self.n_hidden = n_hidden

        # Weights start as small Gaussian noise (std 0.1), biases at zero.
        # The creation order W -> Wvv -> Whh is kept so that a seeded
        # initialisation draws the same values as before.
        self.W = nn.Parameter(torch.randn(n_visible, n_hidden) * 0.1)  # inter-layer weights
        self.Wvv = nn.Parameter(torch.randn(n_visible, n_visible) * 0.1)  # visible-visible weights
        self.Whh = nn.Parameter(torch.randn(n_hidden, n_hidden) * 0.1)  # hidden-hidden weights
        self.b_visible = nn.Parameter(torch.zeros(n_visible))  # visible biases
        self.b_hidden = nn.Parameter(torch.zeros(n_hidden))  # hidden biases

    def sample_hidden(self, visible):
        """Draw a binary hidden state given a visible state.

        Args:
            visible: tensor whose last dimension is ``n_visible``.

        Returns:
            Tensor of 0/1 values with last dimension ``n_hidden``, sampled
            from Bernoulli(sigmoid(visible @ W + b_hidden)).
        """
        activation = torch.matmul(visible, self.W) + self.b_hidden
        p_hidden = torch.sigmoid(activation)
        return torch.bernoulli(p_hidden)

    def sample_visible(self, hidden):
        """Draw a binary visible state given a hidden state.

        Args:
            hidden: tensor whose last dimension is ``n_hidden``.

        Returns:
            Tensor of 0/1 values with last dimension ``n_visible``, sampled
            from Bernoulli(sigmoid(hidden @ W.T + b_visible)).
        """
        activation = torch.matmul(hidden, self.W.t()) + self.b_visible
        p_visible = torch.sigmoid(activation)
        return torch.bernoulli(p_visible)

    def energy(self, visible, hidden):
        """Return the batch-averaged energy of a (visible, hidden) configuration.

        Per sample the energy is
            -(v W h) - 0.5 (v Wvv v) - 0.5 (h Whh h) - (b_visible . v) - (b_hidden . h)

        Args:
            visible: expected to be a 2-D tensor (batch, n_visible).
            hidden: expected to be a 2-D tensor (batch, n_hidden).

        Returns:
            Scalar tensor: the mean of the per-sample energies.
        """
        # Pairwise interaction terms, each reduced over the unit dimension.
        inter_layer = torch.sum(torch.matmul(visible, self.W) * hidden, dim=1)
        visible_lateral = 0.5 * torch.sum(torch.matmul(visible, self.Wvv) * visible, dim=1)
        hidden_lateral = 0.5 * torch.sum(torch.matmul(hidden, self.Whh) * hidden, dim=1)

        # Bias terms.
        visible_bias = torch.sum(visible * self.b_visible, dim=1)
        hidden_bias = torch.sum(hidden * self.b_hidden, dim=1)

        per_sample = -inter_layer - visible_lateral - hidden_lateral - visible_bias - hidden_bias
        return per_sample.mean()  # average energy over the batch

    def forward(self, visible):
        """Run one Gibbs step: visible -> sampled hidden -> sampled visible.

        Returns:
            Tuple ``(visible, visible_gibbs)``: the unchanged input and its
            sampled reconstruction.
        """
        hidden = self.sample_hidden(visible)
        visible_gibbs = self.sample_visible(hidden)
        return visible, visible_gibbs

In [12]:
def calculate_mse(original, reconstructed):
    """Return the mean of the element-wise squared difference of the two inputs."""
    residual = original - reconstructed
    squared_error = residual ** 2
    return squared_error.mean()


def train(bm, train_loader, optimizer, epochs, device):
    """Train ``bm`` with one-step contrastive divergence and report the reconstruction MSE.

    For every mini-batch the data is flattened to ``(-1, bm.n_visible)``,
    binarised by Bernoulli sampling, and pushed through one Gibbs step
    (visible -> hidden -> visible -> hidden). The loss is the energy of the
    data configuration minus the energy of the reconstructed configuration.

    Args:
        bm: model exposing ``n_visible``, ``sample_hidden``, ``sample_visible``,
            ``energy`` and ``to``.
        train_loader: iterable yielding ``(data, label)`` pairs; labels are
            ignored. ``data`` is used as Bernoulli probabilities, so its
            values must lie in [0, 1].
        optimizer: optimizer already bound to ``bm``'s parameters.
        epochs: number of passes over ``train_loader``.
        device: device the model and every batch are moved to.

    Returns:
        None. One line with the average reconstruction MSE is printed per epoch.
    """
    bm.to(device)  # Move the model to the device (CPU/GPU)

    for epoch in range(epochs):
        total_mse = 0.0
        num_batches = 0  # counted by hand so loaders without __len__ also work
        for data, _ in train_loader:
            # Flatten to the model's visible size instead of a hard-coded 784.
            data = data.view(-1, bm.n_visible).to(device)
            data = torch.bernoulli(data)

            # The Gibbs samples are constants of the CD update (Bernoulli
            # sampling passes no gradient), so draw them without recording an
            # autograd graph. The sampling order matches the original so that
            # seeded runs consume the RNG identically.
            with torch.no_grad():
                hidden = bm.sample_hidden(data)
                visible_recon = bm.sample_visible(hidden)
                mse = calculate_mse(data, visible_recon)
                hidden_recon = bm.sample_hidden(visible_recon)
            total_mse += mse.item()
            num_batches += 1

            # Contrastive-divergence objective: lower the energy of the data
            # phase and raise the energy of the reconstruction phase.
            positive_energy = bm.energy(data, hidden)
            negative_energy = bm.energy(visible_recon, hidden_recon)
            loss = positive_energy - negative_energy

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        # Average MSE over all batches; NaN rather than ZeroDivisionError
        # when the loader produced no batches.
        average_mse = total_mse / num_batches if num_batches else float('nan')
        print(f"Epoch: {epoch+1}/{epochs}, Avg MSE: {average_mse:.2f}")

In [13]:
# Additional helper to display an image and save it to disk while executing
def show_and_save(img, file_name):
    """Display ``img`` with matplotlib and write it to ``./<file_name>.png``.

    Args:
        img: tensor laid out as (channels, height, width); it is transposed
            to (height, width, channels) before plotting.
        file_name: output path relative to the working directory, without
            the ``.png`` extension. May contain sub-directories.
    """
    import os  # local import: only needed to create the output directory

    # detach() lets callers pass tensors that still require grad.
    pic = np.transpose(img.detach().cpu().numpy(), (1, 2, 0))
    f = "./%s.png" % file_name
    # Create the target directory first so imsave does not fail on a fresh checkout.
    os.makedirs(os.path.dirname(f), exist_ok=True)
    plt.imshow(pic, cmap='gray')
    plt.imsave(f, pic)

In [14]:
# HYPER-PARAMETERS (experimented with various hyperparameters below)
batch_size = 40        # images per mini-batch
num_epochs = 5         # passes over the training set
learning_rate = 0.001  # learning rate intended for the optimizer
num_hidden = 500       # number of hidden units
num_visible = 784      # number of visible units: one per pixel of a flattened 28x28 image
seed = 42              # seed for torch's global random number generator

# Weight initialisation and every Bernoulli sampling step draw from torch's
# global RNG; seeding it here makes runs repeatable on the same setup.
torch.manual_seed(seed)

In [15]:
# Build the training DataLoader for MNIST: the dataset is downloaded into ./output
# when it is not already there, every image is converted to a tensor, and the
# samples are served in mini-batches of `batch_size`.
train_loader = torch.utils.data.DataLoader(
    dataset=datasets.MNIST(
        root='./output',
        train=True,
        download=True,
        transform=transforms.Compose([transforms.ToTensor()]),
    ),
    batch_size=batch_size,
)

In [16]:
# Build the model and its Adam optimizer from the configured hyper-parameters
# (the learning rate is read from `learning_rate` rather than repeated as a literal).
model = BM(num_visible, num_hidden)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
# Train the Boltzmann Machine on the GPU when CUDA is available (e.g. in Google Colab),
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
train(bm=model, train_loader=train_loader, optimizer=optimizer, epochs=num_epochs, device=device)

In [18]:
# Push the first mini-batch through one Gibbs step to obtain reconstructions for display.
model.to(device)
images = next(iter(train_loader))[0]
# Flatten each image to a vector of `num_visible` values and move it to the model's device.
images = images.float().view(-1, num_visible).to(device)
# Inference only: no autograd graph is needed. The module is called directly
# (not via .forward) so that any registered hooks run.
with torch.no_grad():
    v, v_gibbs = model(images)

In [None]:
# Save and show the original input batch as an image grid.
# view(-1, ...) infers the batch dimension, so a partial batch also works.
show_and_save(make_grid(v.detach().view(-1, 1, 28, 28)), 'output/original_images')

In [None]:
# Save and show the batch reconstructed by one Gibbs step of the Boltzmann machine.
# view(-1, ...) infers the batch dimension, so a partial batch also works.
show_and_save(make_grid(v_gibbs.detach().view(-1, 1, 28, 28)), 'output/generated_image')