In [1]:
import tensorflow as tf
import tensorflow_probability as tfp
from tensorflow import keras
from tensorflow.keras import layers
from tensorflow.keras import regularizers
from sklearn.datasets import make_moons
import numpy as np
import matplotlib.pyplot as plt
import tensorflow_probability as tfp
import scipy

In [2]:
# class KL_Divergence(Loss):
#     def call(self, y_true, y_pred

def neighbors_sum(lattice, i, j):
    '''
    Sums the spins of the lattice points at four neighbor sites to site (i,j).
        Takes into account the size of the lattice in 
        terms of number of rows (i) and columns (j),
        thus implementing periodic boundary conditions.
    '''
    latticesum = 0
    cell = 0
    if i > 0: #Up direction
        latticesum += lattice[i-1,j]
    else:
        latticesum += lattice[lattice.shape[0] - 1, j] #Periodic Boundary Condition
    if j < lattice.shape[1] - 1: #Right direction
        latticesum = lattice[i,j+1]
    else:
        latticesum = lattice[i,0] #Periodic Boundary Condition
    if i < lattice.shape[0] - 1: #Down direction
        latticesum = lattice[i+1,j]
    else:
        latticesum = lattice[0,j] #Periodic Boundary Condition
    if j > 0: #Left Direction
        latticesum += lattice[i,j-1]
    else:
        latticesum += lattice[i,lattice.shape[1] - 1] #Periodic Boundary Condition

    return latticesum

def local_energy(lattice, J, i, j):
    '''
    Calculates the energy of a specific point (i, j) on the lattice
    by adding up its interaction with each of its nearest neighbors
    according to E = -J * sum(s_ij*s_neighbor).
    '''
    #Every term of the sum has s_ij in it, so we can factor it out
    return -1 * J * lattice[i,j] * neighbors_sum(lattice, i, j)

def total_energy(lattice, J):
    '''
    Calculates the total energy of the lattice by counting all of the
    interactions while making sure that no interactions are double counted.
    Do so by calculating the local energy of every other lattice point
    '''
    energy_sum = 0
    for i in range(lattice.shape[0]):
        for j in range(lattice.shape[1]):
            if (i + j) % 2 == 0:
                energy_sum += local_energy(lattice, J, i, j)
    return energy_sum

def energy(N,M,n):
    energies = []
    lattice = np.empty((N,M))
    for r in range(N):
        for c in range(M):
            if (n // 2**(M*r + c)) % 2:
                lattice[r,c] = 1
            else:
                lattice[r,c] = -1
    energies.append(total_energy(lattice, 1))
    return np.array(energies)

In [8]:
# length = 5
# width = 5
# data = np.random.choice(2**(length*width), size=10000, replace=False)
data = np.arange(2**4)
data = np.array(data)[np.newaxis].T
#data = tf.convert_to_tensor(data).requires_grad()
data2 = make_moons(3000, noise=0.05)[0].astype("float32")
# norm = layers.Normalization()
# norm.adapt(data)
# normalized_data = norm(data)
# print(normalized_data)
# print(data)

tf.Tensor(
[[-1.6269784 ]
 [-1.410048  ]
 [-1.1931175 ]
 [-0.97618705]
 [-0.7592566 ]
 [-0.54232615]
 [-0.32539567]
 [-0.10846523]
 [ 0.10846523]
 [ 0.32539567]
 [ 0.54232615]
 [ 0.7592566 ]
 [ 0.97618705]
 [ 1.1931175 ]
 [ 1.410048  ]
 [ 1.6269784 ]], shape=(16, 1), dtype=float32)
[[ 0]
 [ 1]
 [ 2]
 [ 3]
 [ 4]
 [ 5]
 [ 6]
 [ 7]
 [ 8]
 [ 9]
 [10]
 [11]
 [12]
 [13]
 [14]
 [15]]


In [4]:
# Creating a custom layer with keras API.
output_dim = 256
reg = 0.01

def Coupling(input_shape):
    input = keras.layers.Input(shape=(input_shape,))

    t_layer_1 = keras.layers.Dense(
        output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
    )(input)
    t_layer_2 = keras.layers.Dense(
        output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
    )(t_layer_1)
    t_layer_3 = keras.layers.Dense(
        output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
    )(t_layer_2)
    t_layer_4 = keras.layers.Dense(
        output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
    )(t_layer_3)
    t_layer_5 = keras.layers.Dense(
        input_shape, activation="linear", kernel_regularizer=regularizers.l2(reg)
    )(t_layer_4)

    s_layer_1 = keras.layers.Dense(
        output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
    )(input)
    s_layer_2 = keras.layers.Dense(
        output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
    )(s_layer_1)
    s_layer_3 = keras.layers.Dense(
        output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
    )(s_layer_2)
    s_layer_4 = keras.layers.Dense(
        output_dim, activation="relu", kernel_regularizer=regularizers.l2(reg)
    )(s_layer_3)
    s_layer_5 = keras.layers.Dense(
        input_shape, activation="tanh", kernel_regularizer=regularizers.l2(reg)
    )(s_layer_4)

    return keras.Model(inputs=input, outputs=[s_layer_5, t_layer_5])

In [5]:
class RealNVP(keras.Model):
    def __init__(self, num_coupling_layers):
        super().__init__()

        self.num_coupling_layers = num_coupling_layers

        # Distribution of the latent space.
        self.distribution = tfp.distributions.MultivariateNormalDiag(
            loc=[0.0, 0.0], scale_diag=[1.0, 1.0]
        )
        self.masks = np.array(
            [[0, 1], [1, 0]] * (num_coupling_layers // 2), dtype="float32"
        )
        self.loss_tracker = keras.metrics.Mean(name="loss")
        self.layers_list = [Coupling(2) for i in range(num_coupling_layers)]

    @property
    def metrics(self):
        """List of the model's metrics.

        We make sure the loss tracker is listed as part of `model.metrics`
        so that `fit()` and `evaluate()` are able to `reset()` the loss tracker
        at the start of each epoch and at the start of an `evaluate()` call.
        """
        return [self.loss_tracker]

    def call(self, x, training=True):
        log_det_inv = 0
        direction = 1
        if training:
            direction = -1
        for i in range(self.num_coupling_layers)[::direction]:
            x_masked = x * self.masks[i]
            reversed_mask = 1 - self.masks[i]
            s, t = self.layers_list[i](x_masked)
            s *= reversed_mask
            t *= reversed_mask
            gate = (direction - 1) / 2
            x = (
                reversed_mask
                * (x * tf.exp(direction * s) + direction * t * tf.exp(gate * s))
            )
            log_det_inv += gate * tf.reduce_sum(s, [1])

        return x, log_det_inv

    # Log likelihood of the normal distribution plus the log determinant of the jacobian.

    def log_loss(self, x):
        y, logdet = self(x)
        log_likelihood = self.distribution.log_prob(y) + logdet
        return -tf.reduce_mean(log_likelihood)

    def kl_loss(self,x):
        min = tf.math.reduce_max(x)
        max = tf.math.reduce_min(x)
        # print(min)
        # print(max)
        # print(x)
        #x_new = tf.math.add(x,1-max)
        edges = tf.range(min, max, (min-max)/5)
        y = tfp.stats.histogram(x,edges)
        total = len(x)
        sum = 0
        for i, val in enumerate(y):
            print(val)
            print((edges[i] +edges[i+1])/2)
            sum += (val/sum) * tf.math.log((val/sum)/(-1)*energy(2,2,int((edges[i] +edges[i+1])/2))/(scipy.constants.k*300))
        return sum

    def train_step(self, data):
        with tf.GradientTape() as tape:

            #loss = self.log_loss(data)
            loss = self.kl_loss(data)

        g = tape.gradient(loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(g, self.trainable_variables))
        self.loss_tracker.update_state(loss)

        return {"loss": self.loss_tracker.result()}

    def test_step(self, data):
        #loss = self.log_loss(data)
        loss = self.kl_loss(data)
        self.loss_tracker.update_state(loss)

        return {"loss": self.loss_tracker.result()}

In [6]:
model = RealNVP(num_coupling_layers=6)

model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0001), run_eagerly=True)

history = model.fit(normalized_data, batch_size=256, epochs=300, verbose=2, validation_split=0.2)

Epoch 1/300
tf.Tensor(3.0, shape=(), dtype=float32)
tf.Tensor(-1.3883549, shape=(), dtype=float32)
tf.Tensor(2.0, shape=(), dtype=float32)
tf.Tensor(-0.9111079, shape=(), dtype=float32)
tf.Tensor(2.0, shape=(), dtype=float32)
tf.Tensor(-0.4338609, shape=(), dtype=float32)
tf.Tensor(2.0, shape=(), dtype=float32)
tf.Tensor(0.0433861, shape=(), dtype=float32)


ValueError: No gradients provided for any variable.

In [None]:
plt.figure(figsize=(15, 10))
plt.plot(history.history["loss"])
plt.plot(history.history["val_loss"])
plt.title("model loss")
plt.legend(["train", "validation"], loc="upper right")
plt.ylabel("loss")
plt.xlabel("epoch")

# From data to latent space.
z, _ = model(normalized_data)

# From latent space to data.
samples = model.distribution.sample(3000)
x, _ = model.predict(samples)

f, axes = plt.subplots(2, 2)
f.set_size_inches(20, 15)

axes[0, 0].scatter(normalized_data[:, 0], normalized_data[:, 1], color="r")
axes[0, 0].set(title="Inference data space X", xlabel="x", ylabel="y")
axes[0, 1].scatter(z[:, 0], z[:, 1], color="r")
axes[0, 1].set(title="Inference latent space Z", xlabel="x", ylabel="y")
axes[0, 1].set_xlim([-3.5, 4])
axes[0, 1].set_ylim([-4, 4])
axes[1, 0].scatter(samples[:, 0], samples[:, 1], color="g")
axes[1, 0].set(title="Generated latent space Z", xlabel="x", ylabel="y")
axes[1, 1].scatter(x[:, 0], x[:, 1], color="g")
axes[1, 1].set(title="Generated data space X", label="x", ylabel="y")
axes[1, 1].set_xlim([-2, 2])
axes[1, 1].set_ylim([-2, 2])