In [8]:
import tensorflow as tf
import tensorflow_probability as tfp
import tensorflow.math as tm
import numpy as np
import random
import time
import matplotlib.pyplot as plt

from tensorflow.keras import Model
from tensorflow.keras.layers import Dense
from sklearn.metrics import accuracy_score
from sklearn.datasets import make_moons
from sklearn.model_selection import train_test_split

In [28]:
np.random.seed(1234)
X, Y = make_moons(10, noise = 0.1)

# Split into test and training data
x_train, x_test, y_train, y_test = train_test_split(X, Y, test_size=0.2, random_state=73)
y_train = np.reshape(y_train, (y_train.shape[0], 1))
y_test = np.reshape(y_test, (y_test.shape[0], 1))

train_ds = tf.data.Dataset.from_tensor_slices((x_train, y_train)).batch(1)
test_ds = tf.data.Dataset.from_tensor_slices((x_test, y_test)).batch(1)

In [60]:
# Model with Gibbs setting

class StochasticMLP(Model):
    
    def __init__(self, hidden_layer_sizes=[100], n_outputs=10):
        super(StochasticMLP, self).__init__()
        self.hidden_layer_sizes = hidden_layer_sizes
        self.fc_layers = [Dense(layer_size) for layer_size in hidden_layer_sizes]
        self.output_layer = Dense(n_outputs)
        
    def call(self, x):
        
        network = []
        
        for i, layer in enumerate(self.fc_layers):
            
            logits = layer(x)
            x = tfp.distributions.Bernoulli(logits=logits).sample()
            network.append(x)

        final_logits = self.output_layer(x) # initial the weight of output layer
            
        return network
    
    def generate_new_layer_prob(self, in_layer, out_layer, pv, cv, nv):
        
        prob_parents = tm.sigmoid(in_layer(pv))
            
        out_layer_weights = out_layer.get_weights()[0]
            
        next_logits = out_layer(cv)
            
        # if h1 node is a 1, subtract its weight
        next_logits_if_node_is_0 = next_logits[:, tf.newaxis, :] - cv[:, :, np.newaxis] * out_layer_weights[tf.newaxis, :, :]
        
        # if h1 node is a 0, add its weight
        next_logits_if_node_is_1 = next_logits[:, tf.newaxis, :] + (1 - cv[:, :, np.newaxis]) * out_layer_weights[tf.newaxis, :, :]
             
        nv_tiled = tf.cast(np.tile(nv[:, np.newaxis, :], (1, cv.shape[-1], 1)), dtype=tf.float32)
                
        logprob_children_if_node_is_0 = tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(
            labels=nv_tiled, logits=next_logits_if_node_is_0), axis=-1)

        logprob_children_if_node_is_1  = tf.reduce_sum(tf.nn.sigmoid_cross_entropy_with_logits(
            labels=nv_tiled, logits=next_logits_if_node_is_1), axis=-1)
            
        prob_0 = (1 - prob_parents) * tm.exp(logprob_children_if_node_is_0)
        prob_1 = prob_parents * tm.exp(logprob_children_if_node_is_1)
        
        prob = prob_1 / (prob_1 + prob_0)
        
        return prob
        
    
    def gibbs_new_state(self, x, h, y):
        
        '''
        generate a new state for the whole network in real Gibbs setting
        '''
        
        h_current = h
        h_current = [tf.cast(h_i, dtype=tf.float32) for h_i in h_current]
        
        in_layers = self.fc_layers
        out_layers = self.fc_layers[1:] + [self.output_layer]
        
        prev_vals = [x] + h_current[:-1]
        curr_vals = h_current
        next_vals = h_current[1:] + [y]
        
        h_new = []
        
        for i, (layer_size, in_layer, out_layer) in enumerate(zip(self.hidden_layer_sizes, in_layers, out_layers)):
            
            prob = self.generate_new_layer_prob(in_layer, out_layer, prev_vals[i], curr_vals[i], next_vals[i])
            new_state = []
            
            for j in range(layer_size):
                
                new_node_state = tfp.distributions.Bernoulli(probs=prob[0][j]).sample()
                new_state.append(new_node_state)
                
                print(new_node_state, curr_vals[i][0][j])
                if new_node_state != curr_vals[i][0][j].numpy(): # the node is flipped
                    
                    # change prev, curr and next
                    curr_vals[i][0][j] = new_node_state.numpy()
                    
                    if i < len(curr_vals) - 1:
                        prev_vals[i + 1][0][j] = new_node_state.numpy()
                    
                    if i > 0:
                        next_vals[i - 1][0][j] = new_node_state.numpy()
                        
                    # recalculate the prob
                    prob = self.generate_new_layer_prob(i, in_layer, out_layer, prev_vals[i], curr_vals[i], next_vals[i])
            
            h_new.append(new_state)
            
        return h_new
                
    

In [61]:
model = StochasticMLP(hidden_layer_sizes = [5], n_outputs=1)
network = [model.call(x) for x, y in train_ds]

In [62]:
network

[[<tf.Tensor: shape=(1, 5), dtype=int32, numpy=array([[1, 0, 1, 1, 0]], dtype=int32)>],
 [<tf.Tensor: shape=(1, 5), dtype=int32, numpy=array([[0, 1, 1, 0, 1]], dtype=int32)>],
 [<tf.Tensor: shape=(1, 5), dtype=int32, numpy=array([[1, 1, 1, 1, 1]], dtype=int32)>],
 [<tf.Tensor: shape=(1, 5), dtype=int32, numpy=array([[0, 1, 0, 1, 0]], dtype=int32)>],
 [<tf.Tensor: shape=(1, 5), dtype=int32, numpy=array([[0, 1, 1, 0, 1]], dtype=int32)>],
 [<tf.Tensor: shape=(1, 5), dtype=int32, numpy=array([[0, 0, 0, 0, 0]], dtype=int32)>],
 [<tf.Tensor: shape=(1, 5), dtype=int32, numpy=array([[1, 1, 1, 0, 1]], dtype=int32)>],
 [<tf.Tensor: shape=(1, 5), dtype=int32, numpy=array([[1, 0, 0, 1, 0]], dtype=int32)>]]

In [63]:
network_new = [model.gibbs_new_state(images, net, labels) for (images, labels), net in zip(train_ds, network)]

tf.Tensor(0, shape=(), dtype=int32) tf.Tensor(1.0, shape=(), dtype=float32)


TypeError: 'tensorflow.python.framework.ops.EagerTensor' object does not support item assignment

In [49]:
a=[[1,2,3],[4,5,6]]
a[1][1]

5