# RBM in Tensorflow 2.0

## Import tensorflow

In [1]:
import tensorflow as tf
import tensorflow_probability as tfp

## Load dataset

In [40]:
import tensorflow_datasets as tfds

# Fetch the MNIST training split together with its metadata (`info`)
dataset, info = tfds.load(
    name="mnist",
    split=tfds.Split.TRAIN,
    with_info=True,
)

# Input pipeline, one stage per statement: shuffle -> batch -> prefetch
dataset = dataset.shuffle(1024)
dataset = dataset.batch(32)
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

## Define convenient function aliases

* $\sigma$ as sigmoid
* $X \sim \text{Bernoulli}(p)$, with $P(X = k) = p^{k}(1-p)^{1-k} \text{ for } k \in \{0,1\}$

In [24]:
# Short aliases for functions used throughout the notebook
σ = tf.sigmoid
mean = tf.math.reduce_mean


def bernoulli_sample(p, samples=()):
    """Draw Bernoulli samples with success probabilities ``p``, cast to float32.

    ``samples`` is forwarded unchanged to ``Bernoulli.sample``.
    The ``.cast`` method used here is the helper attached to ``tf.Tensor`` below.
    """
    draws = tfp.distributions.Bernoulli(probs=p).sample(samples)
    return draws.cast(tf.float32)


# --- Convenience attributes attached to the TensorFlow classes -----------------

def _transpose(self):
    """Return ``tf.transpose(self)``; exposed as the ``.T`` property."""
    return tf.transpose(self)


def _reshape(self, shape):
    """Return ``tf.reshape(self, shape=shape)``; exposed as ``.reshape``."""
    return tf.reshape(self, shape=shape)


def _cast(self, dtype):
    """Return ``tf.cast(self, dtype=dtype)``; exposed as ``.cast``."""
    return tf.cast(self, dtype=dtype)


def _to_vector(self):
    """Reshape to a single column, i.e. shape ``(-1, 1)``; exposed as ``.to_vector``."""
    return self.reshape(shape=(-1, 1))


tf.Tensor.T = property(_transpose)
tf.Variable.T = property(_transpose)
#tf.Variable.__setitem__ = lambda self, x, y: self[x].assign(y)
tf.Tensor.reshape = _reshape
tf.Tensor.cast = _cast
tf.Tensor.to_vector = _to_vector

## Define model parameters

The vectors are _column vectors_

In [16]:
# Layer sizes: the visible layer size is the product of the first two image dimensions
image_shape = info.features['image'].shape

hidden_size = 100
visible_size = image_shape[0] * image_shape[1]

# Training hyper-parameters
learning_rate = 0.1
momentum = 0
regularization = 0

# Trainable parameters: W starts as small Gaussian noise, both biases start at zero.
# The biases are stored as column vectors (shape [size, 1]).
W = tf.Variable(
    initial_value=0.01 * tf.random.normal([hidden_size, visible_size]),
    dtype=tf.float32,
    name='W',
)
b_h = tf.Variable(
    initial_value=tf.zeros([hidden_size, 1]),
    dtype=tf.float32,
    name='b_h',
)
b_v = tf.Variable(
    initial_value=tf.zeros([visible_size, 1]),
    dtype=tf.float32,
    name='b_v',
)

## Energy function

$$E(\boldsymbol{v}, \boldsymbol{h}) = - \boldsymbol{h}^T\boldsymbol{W}\boldsymbol{v} - \boldsymbol{v}^T\boldsymbol{b}^v - \boldsymbol{h}^T\boldsymbol{b}^h$$

In [5]:
def E(self, v, h):
    """Energy of a joint configuration: -hᵀ W v - vᵀ b_v - hᵀ b_h.

    Args:
        self: any object exposing the attributes ``W``, ``b_v`` and ``b_h``.
        v: visible configuration; must support ``.T`` and ``@``.
        h: hidden configuration; must support ``.T`` and ``@``.

    Returns:
        The result of the three matrix products combined as in the formula above.
    """
    interaction = h.T @ self.W @ v
    visible_bias_term = v.T @ self.b_v
    hidden_bias_term = h.T @ self.b_h
    return - interaction - visible_bias_term - hidden_bias_term

## Conditional probabilities

* $P(h_i = 1|\boldsymbol{v}) = \sigma(\boldsymbol{W}_{i \cdot} \boldsymbol{v} + b^h_i)$
* $P(v_j=1|\boldsymbol{h}) = \sigma(\boldsymbol{h}^T \boldsymbol{W}_{\cdot j} + b^v_j)$

In [17]:
def P_h_given_v(v):
    """Return P(h_i = 1 | v) for every hidden unit: the sigmoid of ``W @ v + b_h``."""
    hidden_activation = W @ v + b_h
    return σ(hidden_activation)

def P_v_given_h(h):
    """Return P(v_j = 1 | h) for every visible unit.

    The activation is computed in row form (``h.T @ W + b_v.T``) and the result is
    transposed back after the sigmoid.
    """
    visible_activation_rows = h.T @ W + b_v.T
    return σ(visible_activation_rows).T

## Sampling

* $h_i \sim \text{Bernoulli}\big(P(h_i = 1|\boldsymbol{v})\big)$
* $v_j \sim \text{Bernoulli}\big(P(v_j = 1|\boldsymbol{h})\big)$

## Learning

In [48]:
def format_image(image):
    """Convert a batch of images into binarized column vectors for the visible layer.

    The input is reshaped to rows of ``visible_size`` values and transposed, so each
    column holds one flattened image. Values are divided by 255 and thresholded at
    0.5; the result is a float32 tensor of zeros and ones.
    """
    flat_rows = image.reshape((-1, visible_size))
    columns = flat_rows.T            # one flattened image per column
    scaled = columns / 255           # normalization
    is_on = scaled > 0.5             # binarization
    return is_on.cast(tf.float32)

# take(1) yields a single batch, so the loop body runs exactly once
for features in dataset.take(1):
    image = features["image"]
    label = features["label"]

# Visible configuration at the start of the Gibbs chain
v0 = format_image(image)

# Short symbols for the hyper-parameters. α and λ are only referenced by the
# commented-out momentum / weight-decay terms in this cell.
η = learning_rate
α = momentum
λ = regularization

# Number of examples in the current batch (columns of v0). Read at run time so
# that a smaller final batch is normalized correctly.
batch_size = tf.cast(tf.shape(v0)[1], tf.float32)


# One step of Gibbs sampling starting from the data: v0 -> h0 -> v1 -> h1
with tf.name_scope('gibbs_chain'):
    P_h0_given_v0 = P_h_given_v(v0)
    h0 = bernoulli_sample(p=P_h0_given_v0)

    P_v1_given_h0 = P_v_given_h(h0)
    v1 = bernoulli_sample(p=P_v1_given_h0)

    P_h1_given_v1 = P_h_given_v(v1)
    # h1 is sampled here but not used by the updates computed below
    h1 = bernoulli_sample(p=P_h1_given_v1)

with tf.name_scope('delta_W'):
    # The matrix products sum the per-example outer products over the batch.
    # Dividing by the batch size makes ΔW a batch average, on the same scale as
    # the mean-based bias updates below.
    ΔW = η * (P_h0_given_v0 @ v0.T - P_h1_given_v1 @ v1.T) / batch_size# - λ*W + α*ΔW

with tf.name_scope('delta_v_b'):
    Δb_v = η * mean(v0 - v1, axis=1).to_vector()# + α*Δb_v

with tf.name_scope('delta_h_b'):
    Δb_h = η * mean(P_h0_given_v0 - P_h1_given_v1, axis=1).to_vector()# + α*Δb_h


#self.ΔW = self.ΔW.assign(ΔW)
#self.Δb_v = self.Δb_v.assign(Δb_v)
#self.Δb_h = self.Δb_h.assign(Δb_h)

#W_update = self.W.assign(self.W + self.ΔW)
#b_v_update = self.b_v.assign(self.b_v + self.Δb_v)
#b_h_update = self.b_h.assign(self.b_h + self.Δb_h)