In [None]:
# DBN AVEC TENSORFLOW

In [10]:
import numpy as np
import tensorflow as tf

def sigmoid(z):
    return 1 / (1 + np.exp(-z))

def DBN(epoch, test_data, rbm_list,  lr=0.001):
    # Compilation du DBN
    
    # Convertir les données en type variable pour faciliter le  "gradient tracking"
    RBM1_w = tf.Variable(tf.convert_to_tensor(rbm_list[0].weights))    
    # Convertir les données en type variable pour faciliter le  "gradient tracking"
    RBM1_vb = tf.Variable(tf.convert_to_tensor(rbm_list[0].bias_a))     
    # Convertir les données en type variable pour faciliter le  "gradient tracking"
    RBM1_hb = tf.Variable(tf.convert_to_tensor(rbm_list[0].bias_b))     
    
    # Convertir les données en type variable pour faciliter le  "gradient tracking"
    RBM1to2_w = tf.Variable(tf.random.normal([6, 6],dtype=tf.float64))  

    # Convertir les données en type variable pour faciliter le  "gradient tracking"
    RMB2_w = tf.Variable(tf.convert_to_tensor(rbm_list[1].weights))     
    # Convertir les données en type variable pour faciliter le  "gradient tracking"
    RBM2_vb = tf.Variable(tf.convert_to_tensor(rbm_list[1].bias_a))     
    # Convertir les données en type variable pour faciliter le  "gradient tracking"
    RBM2_hb = tf.Variable(tf.convert_to_tensor(rbm_list[1].bias_b))     

    # Convertir les données en type variable pour faciliter le  "gradient tracking"
    BP_w = tf.Variable(tf.random.normal([6,6],dtype=tf.float64))        
    # Convertir les données en type variable pour faciliter le  "gradient tracking"
    BP_b = tf.Variable(tf.random.normal([6],dtype=tf.float64))          

    test_data = tf.Variable(tf.convert_to_tensor(test_data,dtype=tf.float64))

    # Affiner le nombre des itérations
    for step in range(epoch):
        # Calculaler le "forward propagation" du batch courant
        with tf.GradientTape() as tape:
            # Forward propagation
            # Première couche : v-h
            out1 = tf.nn.sigmoid(RBM1_hb + test_data @ RBM1_w)
            # 2ème couche : h-v
            out2 = tf.nn.sigmoid(RBM2_vb + out1 @ RBM1to2_w)
            # 3ème couche : v-h
            out3 = tf.nn.sigmoid(RBM2_hb + out2 @ RMB2_w)
            out = tf.nn.relu(BP_b + out3 @ BP_w)
            # Backpropagation
            # Calculer la fonction de perte : loss function
            loss = tf.reduce_mean(tf.square(test_data - out))   # Mean square error loss function
            # Mise-à-jour des gradients (Manual gradient update parameters)
            grads = tape.gradient(loss, [RBM1_w, RBM1_vb, RBM1_hb,
                                         RBM1to2_w,
                                         RMB2_w, RBM2_vb, RBM2_hb,
                                         BP_b,BP_w])
            # Mise-à-jour des paramètres
            RBM1_w.assign_sub(lr * grads[0])
            # RBM1_vb = RBM1_vb - lr * grads[1]
            RBM1_hb.assign_sub(lr * grads[2])
            RBM1to2_w.assign_sub(lr * grads[3])
            RMB2_w.assign_sub(lr * grads[4])
            RBM2_vb.assign_sub(lr * grads[5])
            RBM2_hb.assign_sub(lr * grads[6])
            BP_b.assign_sub(lr * grads[7])
            BP_w.assign_sub(lr * grads[8])

            print(step,": loss : ", loss)

class RBM:
    def __init__(self, n_visible, n_hidden):
        self.n_visible = n_visible          # Nombre des noeuds de la couche visible
        self.n_hidden = n_hidden            # Nombre des noeuds de la couche cachée
        self.bias_a = np.zeros(self.n_visible)  # Décalage de la couche visible
        self.bias_b = np.zeros(self.n_hidden)  # Décalage de la couche cachée
        self.weights = np.random.normal(0, 0.01, size=(self.n_visible, self.n_hidden))  # Connection weight w
        self.n_sample = None

    def encode(self, v):
        # Encodage (calculer la probabilité conditionnelle : p(h=1|v))
        return sigmoid(self.bias_b + v @ self.weights)

    def decode(self, h):
        # Décodage (reconstruction): (calculer la probabilité conditionnelle : p(v=1|h)
        return sigmoid(self.bias_a + h @ self.weights.T)

    # gibbs sampling (Echantillonnage): retourner les valeurs de v et h après "max_cd" sampling
    def gibbs_sample(self, v0, max_cd):
        v = v0
        for _ in range(max_cd):
            # 1er Echantillonnage : each hidden layer neuron according to the input sample. Binomial distribution sampling to determine whether the neuron is activated
            ph = self.encode(v)
            h = np.random.binomial(1, ph, (self.n_sample, self.n_hidden))
            # Sample each visual layer neuron according to the value of the hidden layer neuron after sampling
            pv = self.decode(h)
            v = np.random.binomial(1, pv, (self.n_sample, self.n_visible))
        return v

    # According to the visible layer value obtained by Gibbs sampling (decoding or reconstruction), update the parameters
    def update(self, v0, v_cd, eta):
        ph = self.encode(v0)
        ph_cd = self.encode(v_cd)
        self.weights += eta * (v0.T @ ph - v_cd.T @ ph)  # Update the connection weight parameter
        self.bias_b += eta * np.mean(ph - ph_cd, axis=0)  # Update hidden layer offset b
        self.bias_a += eta * np.mean(v0 - v_cd, axis=0)  # Update the visible layer offset a
        return

    # Training function Update parameters using contrast divergence algorithm
    def fit(self, data, max_step, max_cd=2, eta=0.1):
        # data training data set
        # max_cd Sampling steps
        # max_step: Maximum number of iterations iter
        # eta: learning rate
        assert data.shape[1] == self.n_visible, "The input data dimension is not equal to the number of neurons in the visual layer"
        self.n_sample = data.shape[0]

        for i in range(max_step):
            v_cd = self.gibbs_sample(data, max_cd)
            self.update(data, v_cd, eta)
            error = np.sum((data - v_cd) ** 2) / self.n_sample / self.n_visible * 100
            if i == (max_step-1):  # Compare the reconstructed sample with the original sample to calculate the error
                print("Visual layer (hidden layer) state error ratio: {0}%".format(round(error, 2)))

    # Forecast
    def predict(self, v):
        # Input training data, predict hidden layer output
        ph = self.encode(v)[0]
        states = ph >= np.random.rand(len(ph))
        return states.astype(int)


if __name__ == '__main__':
    # Number of iterations
    iter = 100
    # Learning rate
    lr = 0.001
    # N represents the number of RBM layers that make up the DBN
    N = 2

    # Create multiple RBM layers
    rbm_model_list = []
    # Used for the connection weight of two RBMs, h-v
    for i in range(N):
        rbm_model_list.append(RBM(n_visible=6, n_hidden=6))

    # Training set data
    V = np.array([[1, 1, 1, 0, 0, 0], [1, 0, 1, 0, 0, 0], [1, 1, 1, 0, 0, 0],
                  [0, 0, 1, 1, 1, 0], [0, 0, 1, 1, 0, 0], [0, 0, 1, 1, 1, 0],
                  [1, 0, 0, 1, 1, 0], [0, 0, 0, 1, 0, 0], [0, 0, 1, 0, 1, 0]])

    # Train the parameters of each RBM layer separately
    for epoch in range(iter):   # Number of iterations
        for i in range(N):      # Each iteration train each RBM layer separately
            rbm_model_list[i].fit(V, max_step=iter, max_cd=1, eta=lr)

    # Connect two layers of RBM in series for prediction
    user = np.array([[1,0,0,1,0,0]])
    temp = rbm_model_list[0].predict(user)
    out = rbm_model_list[1].predict([temp])
    print(out)

    DBN(iter,V,rbm_model_list)


Visual layer (hidden layer) state error ratio: 33.33%
Visual layer (hidden layer) state error ratio: 46.3%
Visual layer (hidden layer) state error ratio: 33.33%
Visual layer (hidden layer) state error ratio: 53.7%
Visual layer (hidden layer) state error ratio: 38.89%
Visual layer (hidden layer) state error ratio: 44.44%
Visual layer (hidden layer) state error ratio: 48.15%
Visual layer (hidden layer) state error ratio: 38.89%
Visual layer (hidden layer) state error ratio: 42.59%
Visual layer (hidden layer) state error ratio: 33.33%
Visual layer (hidden layer) state error ratio: 42.59%
Visual layer (hidden layer) state error ratio: 37.04%
Visual layer (hidden layer) state error ratio: 42.59%
Visual layer (hidden layer) state error ratio: 40.74%
Visual layer (hidden layer) state error ratio: 37.04%
Visual layer (hidden layer) state error ratio: 33.33%
Visual layer (hidden layer) state error ratio: 38.89%
Visual layer (hidden layer) state error ratio: 38.89%
Visual layer (hidden layer) st

Visual layer (hidden layer) state error ratio: 22.22%
Visual layer (hidden layer) state error ratio: 24.07%
Visual layer (hidden layer) state error ratio: 18.52%
Visual layer (hidden layer) state error ratio: 11.11%
Visual layer (hidden layer) state error ratio: 14.81%
Visual layer (hidden layer) state error ratio: 12.96%
Visual layer (hidden layer) state error ratio: 24.07%
Visual layer (hidden layer) state error ratio: 24.07%
Visual layer (hidden layer) state error ratio: 25.93%
Visual layer (hidden layer) state error ratio: 18.52%
Visual layer (hidden layer) state error ratio: 16.67%
Visual layer (hidden layer) state error ratio: 16.67%
Visual layer (hidden layer) state error ratio: 18.52%
Visual layer (hidden layer) state error ratio: 14.81%
Visual layer (hidden layer) state error ratio: 20.37%
Visual layer (hidden layer) state error ratio: 14.81%
Visual layer (hidden layer) state error ratio: 16.67%
Visual layer (hidden layer) state error ratio: 16.67%
Visual layer (hidden layer) 

91 : loss :  tf.Tensor(0.800159545677734, shape=(), dtype=float64)
92 : loss :  tf.Tensor(0.7991031275668669, shape=(), dtype=float64)
93 : loss :  tf.Tensor(0.798048839353975, shape=(), dtype=float64)
94 : loss :  tf.Tensor(0.7969966756639687, shape=(), dtype=float64)
95 : loss :  tf.Tensor(0.7959466311415182, shape=(), dtype=float64)
96 : loss :  tf.Tensor(0.7948987004509539, shape=(), dtype=float64)
97 : loss :  tf.Tensor(0.7938528782761731, shape=(), dtype=float64)
98 : loss :  tf.Tensor(0.7928091593205431, shape=(), dtype=float64)
99 : loss :  tf.Tensor(0.791767538306805, shape=(), dtype=float64)
