In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim
import torch.optim as optim

import torchvision
import torchvision.transforms as transforms
from torchvision.utils import make_grid

import matplotlib.pyplot as plt
import numpy as np


In [2]:
device = 'cuda'
batch_size = 128
transform = transforms.Compose(
    [transforms.ToTensor(),
     # transforms.Resize(16)
    ])

train_set = torchvision.datasets.KMNIST(root='./data', train=True, download=True, transform=transform)
test_set = torchvision.datasets.KMNIST(root='./data', train=False, download=True, transform=transform)

train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=0)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False, num_workers=0)



Files already downloaded and verified
Files already downloaded and verified


In [3]:
# Defining Gaussian-Bernoulli RBM
class RBM(nn.Module):
    """Restricted Boltzmann Machine template."""
    
    def __init__(self, D: int, F: int, k: int):
        """Creates an instance RBM module.
            
            Args:
                D: Size of the input data.
                F: Size of the hidden variable.
                k: Number of MCMC iterations for negative sampling.
                
            The function initializes the weight (W) and biases (c & b).
        """
        super().__init__()
        self.W = nn.Parameter(torch.randn(F, D) * 1e-2) # Initialized from Normal(mean=0.0, variance=1e-4)
        self.c = nn.Parameter(torch.zeros(D)) # Initialized as 0.0
        self.b = nn.Parameter(torch.zeros(F)) # Initilaized as 0.0
        self.k = k
    
    def sample(self, p):
        """Sample from a bernoulli distribution defined by a given parameter."""
        pass
    
    
    def sample_gaussian(self, p):
        """Sample from a bernoulli distribution defined by a given parameter."""
        pass
        
    def P_h_x(self, x):
        """Returns the conditional P(h|x).""" 
        pass
    
    def P_x_h(self, h):
        """Returns the conditional P(x|h). """
        pass
        
    def free_energy(self, x):
        """Returns the Average F(x) free energy. (Slide 11)."""
        pass
        
    def forward(self, x):
        """Generates x_negative using MCMC Gibbs sampling starting from x. 
        Your CD-k algorithm should be implemented here"""
        
        #ADD SOMETHING HERE

        
        for _ in range(self.k):
            # Complete your CD-K here

            
        return x_negative, pxh_k

In [4]:
def train(model, device, train_loader, optimizer, epoch):
    
    train_loss = 0
    model.train()
    
    for batch_idx, (data, target) in enumerate(train_loader):

        # flatten and pre-process variable
        data = data.view(data.size(0),-1).to(device) 
        mean, std = data.mean(), data.std()
        data = (data - mean)/std
        optimizer.zero_grad()
        
        #################TODO#######################
        # positive and netative phase and approximating the loss: -log(p(x))
        # Note that the computing of loss_fn can occassionally be negative
        # becasue since we used a sampling approach for estimation 
        # (slide 26)
        #############################################
        
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        
        if (batch_idx+1) % (len(train_loader)//2) == 0:
            print('Train({})[{:.0f}%]: Loss: {:.4f}'.format(
                epoch, 100. * batch_idx / len(train_loader), train_loss/(batch_idx+1)))

def test(model, device, test_loader, epoch):
    
    model.eval()
    test_loss = 0
    
    with torch.no_grad():
        for data, target in test_loader:
            data = data.view(data.size(0),-1).to(device)
      
            mean, std = data.mean(), data.std()
            data = (data - mean)/std
            #####################FIXME############
            #Complete the CD-k process and estimate -log(p(x))
            ########################################

            test_loss += loss.item() # sum up batch loss
    
    test_loss = (test_loss*batch_size)/len(test_loader.dataset)
    print('Test({}): Loss: {:.4f}'.format(epoch, test_loss))

In [5]:
def show(img1, img2):
    npimg1 = img1.cpu().numpy()
    npimg2 = img2.cpu().numpy()
    
    fig, axes = plt.subplots(1,2, figsize=(20,10))
    axes[0].imshow(np.transpose(npimg1, (1,2,0)), interpolation='nearest')
    axes[1].imshow(np.transpose(npimg2, (1,2,0)), interpolation='nearest')
    fig.show()

In [None]:
# Sample Code 

seed = 42
num_epochs = 25
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
M = [16, 64, 256]

rbm = RBM(D=28 * 28 * 1, F=M[0], k=5).to(device)
#Keep the lr small to avoid overflow
optimizer = optim.Adam(rbm.parameters(), lr=1e-3)
print("M = {}:".format(M[0]))
for epoch in range(1, num_epochs + 1):
    train(rbm, device, train_loader, optimizer, epoch)
    test(rbm, device, test_loader, epoch)

    # reconstructing samples for plotting
    data, _ = next(iter(test_loader))
    data = data[:32]
    data_size = data.size()
    data = data.view(data.size(0), -1).to(device)
    mean, std = data.mean(), data.std()
    bdata = (data - mean) / std
    vh_k, pvh_k = rbm(bdata)
    vh_k, pvh_k = vh_k.detach(), pvh_k.detach()
    
    show(make_grid(data.reshape(data_size), padding=0), make_grid(pvh_k.reshape(data_size).clip(min=0,max=1), padding=0))
    plt.show()

    print('Optimizer Learning rate: {0:.4f}\n'.format(optimizer.param_groups[0]['lr']))
