# Boltzmann Machine

## Setup and Context

### Introduction

Boltzmann machines are stochastic, generative neural networks that use probabilistic methods to learn and represent complex patterns in data. A Boltzmann machine is an unsupervised deep learning model in which every node is connected to every other node and, unlike most other neural networks, the connections are undirected. It also has no output layer. Boltzmann machines can be viewed in two ways: as an energy-based model and as a probabilistic graphical model.

For our example, we will build a movie recommendation system using a Restricted Boltzmann Machine (RBM).

### Import Statements

In [1]:
import numpy as np
import pandas as pd

import torch

## Data Preprocessing

### Importing the Datasets

The dataset used is from [Kaggle](https://www.kaggle.com/datasets/prajitdatta/movielens-100k-dataset). It is a collection of 100,000 ratings from 943 users on 1,682 movies.

In [2]:
# getting dataset with details
cols = ['UserId','MovieId','Rating','TimeStamp']
dataset = pd.read_csv('./data/ml-100k/u.data', sep ='\t', header=None, names=cols)

dataset.head()

| | UserId | MovieId | Rating | TimeStamp |
|---|---|---|---|---|
| 0 | 196 | 242 | 3 | 881250949 |
| 1 | 186 | 302 | 3 | 891717742 |
| 2 | 22 | 377 | 1 | 878887116 |
| 3 | 244 | 51 | 2 | 880606923 |
| 4 | 166 | 346 | 1 | 886397596 |


In [3]:
# getting dataset with movies
cols = ['MovieId','Names']
# raw string so that '\|' is passed to pandas as a regex for a literal pipe
movies = pd.read_csv('./data/ml-100k/u.item', sep=r'\|', header=None, engine='python', usecols=[0, 1], names=cols, encoding='latin-1')
movies.head()

| | MovieId | Names |
|---|---|---|
| 0 | 1 | Toy Story (1995) |
| 1 | 2 | GoldenEye (1995) |
| 2 | 3 | Four Rooms (1995) |
| 3 | 4 | Get Shorty (1995) |
| 4 | 5 | Copycat (1995) |


### Importing the Training and Test Sets

The dataset ships with predefined training and test splits, so we do not split the data ourselves. We load the first split (`u1.base` for training, `u1.test` for testing) and convert both to NumPy arrays.

In [4]:
training_set = pd.read_csv('./data/ml-100k/u1.base', sep='\t', header=None)
test_set = pd.read_csv('./data/ml-100k/u1.test', sep='\t', header=None)

training_set = np.array(training_set, dtype = 'int')
test_set = np.array(test_set, dtype = 'int')

### Getting the Total Number of Users and Movies

We need to get the total number of users and movies. This is because we are going to convert the training and test sets into matrices where the rows are the users, the columns represent the movies and each cell is the rating of a movie by a user.

In [5]:
num_movies = int(max(training_set[:, 1].max(), test_set[:, 1].max()))
num_users = int(max(training_set[:, 0].max(), test_set[:, 0].max()))

In [6]:
num_movies

1682

In [7]:
num_users

943
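
These two counts fix the shape of the user-by-movie matrix built in the next step. A quick back-of-the-envelope check, assuming the 100,000 total ratings quoted for the full dataset, shows that only about 6% of its cells can hold a rating (the variable names `num_ratings`, `total_cells` and `density` are illustrative):

```python
# Illustrative arithmetic only; the counts come from the dataset
# description and the two cell outputs above.
num_users = 943
num_movies = 1682
num_ratings = 100_000  # total ratings in the full dataset (u.data)

total_cells = num_users * num_movies
density = num_ratings / total_cells

print(f"cells: {total_cells}, filled: {density:.1%}")
```

The training split alone contains fewer ratings than the full dataset, so its matrix is sparser still.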

### Converting the Data into 2D Matrices

Let us create a function that converts our training and test sets into 2D matrices. Each row holds one user's ratings for every movie, stored as a 2D NumPy array of shape (`num_users`, `num_movies`). If a user did not rate a movie, the cell is 0. User and movie IDs start at 1, so we subtract 1 to get array indices.

In [8]:
def convert_data(data):
    # Initializing a matrix with zeros to store the ratings
    new_data = np.zeros((num_users, num_movies))
    
    # Iterating through each rating in the dataset
    for i in range(len(data[:, 0])):
        # Extracting user, movie, and rating from the current row
        user = data[i, 0]
        movie = data[i, 1]
        rating = data[i, 2]
        
        # Storing the rating in the appropriate position in the matrix
        new_data[user - 1, movie - 1] = rating

    # Return the converted data matrix
    return new_data
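
The same matrix can also be built without a Python loop by using NumPy integer-array indexing. The following is a minimal sketch, not the notebook's own method: the function name `convert_data_vectorized` and the explicit `num_users` / `num_movies` parameters are assumptions introduced here, and the toy array is made up for illustration.

```python
import numpy as np

def convert_data_vectorized(data, num_users, num_movies):
    """Build a (num_users x num_movies) rating matrix without a Python loop.

    `data` is assumed to be an integer array whose first three columns are
    the 1-based user id, the 1-based movie id, and the rating.
    """
    matrix = np.zeros((num_users, num_movies))
    users = data[:, 0] - 1   # shift to 0-based row indices
    movies = data[:, 1] - 1  # shift to 0-based column indices
    matrix[users, movies] = data[:, 2]
    return matrix

# Tiny made-up example: 3 users, 4 movies, 4 ratings (last column is unused)
toy = np.array([
    [1, 1, 5, 0],
    [1, 3, 2, 0],
    [2, 4, 4, 0],
    [3, 2, 1, 0],
])
toy_matrix = convert_data_vectorized(toy, num_users=3, num_movies=4)
print(toy_matrix)
```

For data where each (user, movie) pair appears at most once, this should produce the same matrix as the loop-based `convert_data`.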

In [9]:
training_set = convert_data(training_set)
test_set = convert_data(test_set)

In [10]:
training_set

array([[5., 3., 4., ..., 0., 0., 0.],
       [4., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [5., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 5., 0., ..., 0., 0., 0.]])

In [11]:
test_set

array([[0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       ...,
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.],
       [0., 0., 0., ..., 0., 0., 0.]])

### Converting to PyTorch Tensors

Now we convert our training and test sets to **PyTorch tensors**.

In [12]:
training_set = torch.FloatTensor(training_set)
test_set = torch.FloatTensor(test_set)

### Converting Ratings into Binary Ratings

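We want to convert the 1 to 5 star ratings into binary ratings: 1 (liked) for a rating of 3 or more, and 0 (disliked) for a rating of 1 or 2. Movies the user did not rate, stored as 0 so far, are represented by -1. The unrated entries are remapped first, because 0 is about to take on the meaning "disliked".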

In [13]:
training_set[training_set == 0] = -1
training_set[training_set == 1] = 0
training_set[training_set == 2] = 0
training_set[training_set >= 3] = 1

In [14]:
test_set[test_set == 0] = -1
test_set[test_set == 1] = 0
test_set[test_set == 2] = 0
test_set[test_set >= 3] = 1
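
The in-place assignments above modify the tensors directly. As an alternative sketch, the same mapping can be written as a function that returns a new tensor and leaves its input untouched. The helper name `binarize_ratings` and the toy tensor are assumptions made for illustration.

```python
import torch

def binarize_ratings(ratings):
    """Return a new tensor: -1 for unrated (0), 0 for ratings 1-2, 1 for ratings 3-5."""
    unrated = torch.full_like(ratings, -1.0)
    liked = (ratings >= 3).to(ratings.dtype)  # 1.0 where rating >= 3, else 0.0
    # Where the original rating is 0 take -1, otherwise take the liked flag
    return torch.where(ratings == 0, unrated, liked)

# Made-up ratings covering every case from 0 (unrated) to 5
toy = torch.tensor([[0., 1., 2.], [3., 4., 5.]])
toy_binary = binarize_ratings(toy)
print(toy_binary)
```

Because every output value is computed from the original ratings, the result does not depend on the order in which the cases are handled.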

## The Neural Network

### Creating the Neural Network Architecture

We are going to create a class for our Restricted Boltzmann Machine.

In [15]:
class RBM:
    def __init__(self, nv, nh):
        # Initialize weights and biases randomly
        self.W = torch.randn(nh, nv) # Weight matrix (nh x nv)
        self.a = torch.randn(1, nh) # Bias for hidden units (1 x nh)
        self.b = torch.randn(1, nv) # Bias for visible units (1 x nv)
 
    def sample_h(self, x):
        # Calculate the probabilities of hidden units being activated given visible units
        wx = torch.mm(x, self.W.t()) # Weighted sum of inputs to hidden units
        activation = wx + self.a.expand_as(wx) # Add bias and reshape to match dimensions
        p_h_given_v = torch.sigmoid(activation) # Sigmoid activation function
        # Sample hidden units based on these probabilities
        return p_h_given_v, torch.bernoulli(p_h_given_v)
    
    def sample_v(self, y):
        # Calculate the probabilities of visible units being activated given hidden units
        wy = torch.mm(y, self.W) # Weighted sum of inputs to visible units
        activation = wy + self.b.expand_as(wy) # Add bias and reshape to match dimensions
        p_v_given_h = torch.sigmoid(activation) # Sigmoid activation function
        # Sample visible units based on these probabilities
        return p_v_given_h, torch.bernoulli(p_v_given_h)
    
    def train(self, v0, vk, ph0, phk):
        # Update weights and biases based on Contrastive Divergence algorithm
        self.W += (torch.mm(v0.t(), ph0) - torch.mm(vk.t(), phk)).t() # Weight update
        self.a += torch.sum((ph0 - phk), 0) # Hidden bias update
        self.b += torch.sum((v0 - vk), 0) # Visible bias update

**Notes:**
- In `__init__`: the weights (`W`) and biases (`a`, `b`) are initialized from a standard normal distribution.
- In `sample_h`: a sigmoid is applied to the hidden activations to get `p_h_given_v`, the probability that each hidden node is 1 given the visible nodes. The method returns these probabilities together with a Bernoulli sample drawn from them.
- In `sample_v`: a sigmoid is applied to the visible activations to get `p_v_given_h`, the probability that each visible node is 1 given the hidden nodes. The method returns these probabilities together with a Bernoulli sample drawn from them.
- In `train`: the weights and biases are updated with the Contrastive Divergence rule, which lowers the energy of the training data relative to the model's reconstructions (the RBM is an energy-based model).

where,

- `a` = bias of the hidden nodes, used in the probability of a hidden node given the visible nodes
- `b` = bias of the visible nodes, used in the probability of a visible node given the hidden nodes
- `torch.bernoulli(p_h_given_v)` = a tensor of 0s and 1s sampled from Bernoulli distributions with probabilities `p_h_given_v`
- `v0` = visible nodes at the start, i.e. the input vector
- `vk` = visible nodes after k steps of Contrastive Divergence
- `ph0` = probability of the hidden nodes given the visible nodes at the start
- `phk` = probability of the hidden nodes given the visible nodes after k steps of Contrastive Divergence
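
For reference, the quantities in these notes can be written out as follows. This is the standard binary RBM formulation, expressed with the symbols of the class above ($W$ of shape $n_h \times n_v$, hidden bias $a$, visible bias $b$). The indices $i$ (visible) and $j$ (hidden) and the superscripts $(0)$ and $(k)$ are notation introduced here, and $\sigma$ is the sigmoid function.

$$
E(v, h) = -\sum_i b_i v_i - \sum_j a_j h_j - \sum_j \sum_i h_j W_{ji} v_i
$$

$$
p(h_j = 1 \mid v) = \sigma\Big(a_j + \sum_i W_{ji} v_i\Big), \qquad
p(v_i = 1 \mid h) = \sigma\Big(b_i + \sum_j W_{ji} h_j\Big)
$$

The updates in `train`, summed over the users in a batch and applied with an implicit learning rate of 1, correspond to:

$$
\Delta W_{ji} = \sum_{\text{batch}} \Big( p(h_j = 1 \mid v^{(0)})\, v^{(0)}_i - p(h_j = 1 \mid v^{(k)})\, v^{(k)}_i \Big)
$$

$$
\Delta a_j = \sum_{\text{batch}} \Big( p(h_j = 1 \mid v^{(0)}) - p(h_j = 1 \mid v^{(k)}) \Big), \qquad
\Delta b_i = \sum_{\text{batch}} \Big( v^{(0)}_i - v^{(k)}_i \Big)
$$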

### Initializing the Model

In [16]:
nv = len(training_set[0]) # number of visible nodes = number of input nodes (i.e. one per movie)
nh = 100
batch_size = 100

In [17]:
rbm = RBM(nv, nh)

### Training the Model

In [18]:
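num_epochs = 10
# Note: train_loss and n are initialized once, outside the epoch loop, so the
# loss printed after each epoch is a running average over all batches seen so far
train_loss = 0
n = 0.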

#for loop for epochs
for epoch in range(num_epochs):
    
    # for loop to iterate users
    for i in range(0, num_users - batch_size, batch_size):
        
        #note that at start, v0 and vk are equal
        v0 = training_set[i : i + batch_size]
        vk = training_set[i : i + batch_size]
        ph0,_ = rbm.sample_h(v0)
        
        #for loop for Contrastive Divergence
        for k in range(10):
            _,hk = rbm.sample_h(vk)
            _,vk = rbm.sample_v(hk)
            vk[v0 < 0] = v0[v0 < 0] # to ignore values less than 0 (that are unrated movies with value -1)
        phk,_ = rbm.sample_h(vk)
        rbm.train(v0,vk,ph0,phk) # performing Contrastive Divergence
        
        #calculating loss
        train_loss += torch.mean(torch.abs(v0[v0 >= 0] - vk[v0 >= 0]))
        n += 1.
    
    print(f'Epoch: {epoch + 1}/{num_epochs}\tLoss: {train_loss / n}')

Epoch: 1/10	Loss: 0.3476966321468353
Epoch: 2/10	Loss: 0.2975553274154663
Epoch: 3/10	Loss: 0.28268906474113464
Epoch: 4/10	Loss: 0.27419954538345337
Epoch: 5/10	Loss: 0.26918596029281616
Epoch: 6/10	Loss: 0.26599475741386414
Epoch: 7/10	Loss: 0.2628726065158844
Epoch: 8/10	Loss: 0.2610853314399719
Epoch: 9/10	Loss: 0.2593919634819031
Epoch: 10/10	Loss: 0.2586412727832794
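
One detail of the loop above is worth noting: the batch start indices come from `range(0, num_users - batch_size, batch_size)`, so the last users never enter a batch. The sketch below, using only the values from this section, compares that range with a hypothetical alternative, `range(0, num_users, batch_size)`, which also yields a shorter final batch. The alternative is an assumption for illustration and is not what produced the losses printed above.

```python
num_users = 943
batch_size = 100

# Batch start indices as used in the training loop above
starts_used = list(range(0, num_users - batch_size, batch_size))

# Hypothetical alternative that also visits the trailing partial batch
starts_all = list(range(0, num_users, batch_size))

# Count how many users each scheme covers (the last batch may be shorter)
users_used = sum(min(s + batch_size, num_users) - s for s in starts_used)
users_all = sum(min(s + batch_size, num_users) - s for s in starts_all)

print(len(starts_used), users_used)  # 9 batches covering 900 users
print(len(starts_all), users_all)    # 10 batches covering 943 users
```

Slicing past the end of a tensor simply returns the remaining rows, so the alternative range would feed a final batch of 43 users to the same loop body.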


### Testing the Model

In [19]:
test_loss = 0  # Initialize test loss accumulator
n = 0.  # Initialize counter for the number of users with available ratings

# Iterate over each user in the dataset
for i in range(num_users):
    v = training_set[i : i + 1]  # Get the training data for the current user
    vt = test_set[i : i + 1]  # Get the test data for the current user
    
    # Check if the current user has ratings in the test set
    if len(vt[vt >= 0]) > 0:
        # Sample the hidden units based on the visible units
        _, h = rbm.sample_h(v)
        
        # Sample the visible units based on the hidden units
        _, v = rbm.sample_v(h)
        
        # Calculate the absolute difference between predicted and true ratings
        # Only consider observations where the true rating exists (vt >= 0)
        test_loss += torch.mean(torch.abs(v[vt >= 0] - vt[vt >= 0]))
        
        # Increment the counter for the number of users with available ratings
        n += 1.

# Print the average test loss
print(f'Test Loss: {(test_loss / n).item()}')

Test Loss: 0.24148474633693695
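
Because the ratings are binary, this mean absolute error is the fraction of held-out ratings that the sampled reconstruction gets wrong. To turn reconstructions into recommendations, one option is to rank the movies a user has not rated by their reconstructed "like" probability. The following is a minimal sketch under stated assumptions: the helper `top_n_unrated`, its parameters, and the toy values are hypothetical and not part of the notebook.

```python
import torch

def top_n_unrated(p_v, user_row, movie_names, n=3):
    """Rank movies the user has not rated by reconstructed 'like' probability.

    p_v         : 1-D tensor of probabilities for one user
    user_row    : 1-D tensor of that user's binary ratings (-1 = unrated)
    movie_names : list of titles aligned with the tensor positions
    """
    scores = p_v.clone()
    scores[user_row >= 0] = -1.0  # push already-rated movies below any probability
    n = min(n, int((user_row < 0).sum()))  # cannot recommend more than the unrated count
    top = torch.topk(scores, n)
    return [(movie_names[i], float(p_v[i])) for i in top.indices.tolist()]

# Made-up probabilities and ratings for five placeholder titles
p_v = torch.tensor([0.9, 0.2, 0.7, 0.4, 0.8])
user_row = torch.tensor([1., -1., -1., 0., -1.])
names = ["A", "B", "C", "D", "E"]
recs = top_n_unrated(p_v, user_row, names, n=2)
print(recs)
```

With the trained model, `p_v` could be taken from the first return value of `rbm.sample_v` for a single user, and `movie_names` from `movies['Names'].tolist()`, assuming the rows of `movies` are ordered by `MovieId` so that they line up with the matrix columns.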
