In [141]:
import numpy as np
import torch
import torch.utils.data
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.autograd import Variable
from torchvision import datasets, transforms
from torchvision.utils import make_grid , save_image

# Load data

In [142]:
batch_size = 64
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=True, download=False,
                   transform=transforms.Compose([
                       transforms.ToTensor()
                   ])),
    batch_size=batch_size)

test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False, transform=transforms.Compose([
                       transforms.ToTensor()
                   ])),
    batch_size=batch_size)

# Train

In [143]:
class RBM(nn.Module):
    def __init__(self,
                 n_vis=784,
                 n_hin=625,
                 k=5):
        super(RBM, self).__init__()
        self.W = nn.Parameter(torch.randn(n_hin,n_vis)*1e-2)
        self.v_bias = nn.Parameter(torch.zeros(n_vis))
        self.h_bias = nn.Parameter(torch.zeros(n_hin))
        self.k = k
    
    def sample_from_p(self,p):
        return F.relu(torch.sign(p - Variable(torch.rand(p.size()))))
    
    def v_to_h(self,v):
        p_h = F.sigmoid(F.linear(v,self.W,self.h_bias))
        sample_h = self.sample_from_p(p_h)
        return p_h,sample_h
    
    def h_to_v(self,h):
        p_v = F.sigmoid(F.linear(h,self.W.t(),self.v_bias))
        sample_v = self.sample_from_p(p_v)
        return p_v,sample_v
        
    def forward(self,v):
        pre_h1,h1 = self.v_to_h(v)
        
        h_ = h1
        for _ in range(self.k):
            pre_v_,v_ = self.h_to_v(h_)
            pre_h_,h_ = self.v_to_h(v_)
        
        return v,v_, h_
    
    def free_energy(self,v):
        vbias_term = v.mv(self.v_bias)
        wx_b = F.linear(v,self.W,self.h_bias)
        hidden_term = wx_b.exp().add(1).log().sum(1)
        return (-hidden_term - vbias_term).mean()

# Adding target at end

In [144]:
input_data=[]
target_data=[]

for _, (data,target) in enumerate(train_loader):
        data =data.view(-1,784)
        data=data.numpy()
        input_data.append(data)
        
        target=target.view(-1)
        target=target.numpy()
        target_data.append(target)

In [145]:
n=np.array(target_data[0])
x=torch.from_numpy(n)

for i in range(1,938):
    y=np.array(target_data[i])
    y=torch.from_numpy(y)
    x=torch.cat((x, y),0)
    

In [146]:
full_input=np.array(input_data[0])
full_input=torch.Tensor(full_input)

for i in range(1,938):
    intermed=np.array(input_data[i])
    intermed=torch.Tensor(intermed)
    
    full_input=torch.cat((full_input, intermed),0)

In [147]:
x=x.numpy()

target_array=np.zeros((60000,10))

count=0
for n in x:
    target_array[count][n]=1
    count+=1

target_array=torch.Tensor(target_array)

In [148]:
final_training_data=torch.cat((full_input, target_array), 1)

# Training

In [149]:
rbm = RBM(k=5, n_vis=794)
train_op = optim.SGD(rbm.parameters(),0.1)

In [150]:
batch_size_=64

for epoch in range(5):
    loss_ = []
    reconstruction_error=0
    s=0
    for n in range(0, len(final_training_data)- batch_size_, batch_size_):
        sample_data=final_training_data[n:n+batch_size_]
        sample_data=Variable(sample_data)
        sample_data = sample_data.bernoulli()
        v,v1,h1 = rbm(sample_data)
        
        loss = rbm.free_energy(v) - rbm.free_energy(v1)
        loss_.append(loss.data[0])
        train_op.zero_grad()
        loss.backward()
        train_op.step()
        reconstruction_error+=torch.mean(torch.abs(v-v1))
        s+=1
    print (' loss: ' + str(reconstruction_error/s))     
    print (np.mean(loss_))

 loss: Variable containing:
 0.1178
[torch.FloatTensor of size 1]

-2.07829307467
 loss: Variable containing:
1.00000e-02 *
  8.6326
[torch.FloatTensor of size 1]

-5.30928220311
 loss: Variable containing:
1.00000e-02 *
  7.9327
[torch.FloatTensor of size 1]

-3.39327485075
 loss: Variable containing:
1.00000e-02 *
  7.5343
[torch.FloatTensor of size 1]

-2.19898004847
 loss: Variable containing:
1.00000e-02 *
  7.2774
[torch.FloatTensor of size 1]

-1.3512001974


# Create Test Subset

In [151]:
input_data=[]

for _, (data,target) in enumerate(test_loader):
        data =data.view(-1,784)
        data=data.numpy()
        input_data.append(data)
        



In [152]:
full_input=np.array(input_data[0])
full_input=torch.Tensor(full_input)

for i in range(1,157):
    intermed=np.array(input_data[i])
    intermed=torch.Tensor(intermed)
    
    full_input=torch.cat((full_input, intermed),0)      

In [153]:
len(full_input)

10000

In [154]:
test_dummy=np.zeros((len(full_input), 10))
test_dummy=torch.Tensor(test_dummy)

In [155]:
test_set=torch.cat((full_input, test_dummy), 1)

# Compress Weights

In [180]:
weights=rbm.W
weights=weights.data.numpy()


x=weights.reshape(496250)

In [157]:
savedweights=weights

In [242]:
x=savedweights.reshape(496250)

# Prune
i = []

for w in x:
    if w < 0.1 and w > -0.1:
        w=0
        
    else:
        w=w
    i.append(w)

i=np.array(i).reshape(625, 794)

w=torch.Tensor(i)


o=torch.nn.Parameter(w)
rbm.W=o

In [243]:
# quantise

q=[]

for w in x:
    if w > 0.15:
        w=float(1)
    elif w<-0.15:
        w=float(-1)
    else:
        w=float(0)
    q.append(w)
    
q=np.array(q).reshape(625, 794)
w=torch.Tensor(q)
w=torch.nn.Parameter(w)
rbm.W=w



In [260]:
w

Parameter containing:
    0     0     0  ...      0     1    -1
    0     0     0  ...      0    -1    -1
    0     0     0  ...      1    -1    -1
       ...          ⋱          ...       
    0     0     0  ...      0     0    -1
    0     0     0  ...     -1     1    -1
    0     0     0  ...     -1    -1     0
[torch.FloatTensor of size 625x794]

# Testing it

In [244]:
output=[]


test_loss = 0
s=0
for n in range(0,len(test_set)):
    sample_data=Variable(test_set)
    sample_data = sample_data[n:n+1]

    v,v1,h1 = rbm(sample_data)
    test_loss+=torch.mean(torch.abs(v-v1))
    s+=1
    output.append(v1)
print (' loss: ' + str(test_loss/s)) 

 loss: Variable containing:
1.00000e-02 *
  8.6283
[torch.FloatTensor of size 1]



In [245]:
target_data=[]

for _, (data,target) in enumerate(test_loader):
    target=target.view(-1)
    target=target.numpy()
    target_data.append(target)

n=np.array(target_data[0])
target_1=torch.from_numpy(n)

for i in range(1,157):
    y=np.array(target_data[i])
    y=torch.from_numpy(y)
    target_1=torch.cat((target_1, y),0)

In [246]:
target_1=target_1.numpy()

In [247]:
output_array=[]

for n in range(0, 10000):
    output_n=output[n][0].data.numpy()
    output_array.append(output_n)
    
output_array=np.array(output_array)

In [248]:
digits=[]

for n in range(0, 10000):
    digit=output_array[n][784:]
    digits.append(digit)

# Get Class

In [249]:
import pandas as pd

In [250]:
def getdigit(outputarray):
    results=[]
    index=[]
    for i in range(0,len(outputarray)):
        for n in range(0, 10):
            if outputarray[i][n]==1:
                results.append(int(n))
                index.append(int(i))
        if all(outputarray[i][0:10]==0):
            results.append(int(0))
            index.append(int(i))
   # results=np.array(results)
    #index=np.array(index)
    #total=np.stack((index, results), axis=0)
    total=list(zip(index,results))
    return total

In [251]:
output_digits=getdigit(digits)
#output_digits=results

In [252]:
classification=[]
for i, x in enumerate(output_digits):
    classification.append(x)

In [253]:
classification=np.array(classification)

In [254]:
df_class=pd.DataFrame(classification)

In [255]:
final_class=df_class.drop_duplicates(subset=0, keep='first')

In [256]:
final_class=final_class.reset_index()

In [257]:
correct=0

for i in range(0, 10000):
    if target_1[i]==final_class[1][i]:
        correct+=1
        
accuracy=correct/len(target_1)

In [258]:
accuracy

0.7683

In [259]:
correct

7683