In [None]:
# This is a TensorFlow implementation of the Pendulum experiment from the paper:
# "Adaptive Path-Integral Approach for Representation Learning and Planning of State Space Models"
#
# This code trains the APIAE network and saves the weights of the network.

import os
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
from matplotlib import animation, rc
from APIAE import DynNet, GenNet, APIAE
import pickle
# Enumerate GPUs in PCI bus order and expose only the first one to this process.
os.environ.update({
    "CUDA_DEVICE_ORDER": "PCI_BUS_ID",
    "CUDA_VISIBLE_DEVICES": "0",
})
# Turn off matplotlib's warning about the number of open figures.
plt.rcParams['figure.max_open_warning'] = 0

# Fix the NumPy and TensorFlow random seeds.
tf.set_random_seed(0)
np.random.seed(0)

In [None]:
# Load training data
# The pickle holds an indexable container: element 0 is bound to Xref and
# element 1 to Zref.
# NOTE(review): pickle.load executes arbitrary code embedded in the file; only
# load "pendulum_data.pkl" from a trusted source.
# The `with` block guarantees the handle is closed even if unpickling fails.
with open("pendulum_data.pkl", 'rb') as file:
    PendulumData = pickle.load(file)
Xref = PendulumData[0]
Zref = PendulumData[1]

In [None]:
# Build APIAE
# All hyperparameters live in one dict and are forwarded to the APIAE
# constructor as keyword arguments.
params = {
    'n_x': 16**2,  # dimension of x; observation
    'n_z': 2,      # dimension of z; latent space
    'n_u': 1,      # dimension of u; control

    'K': 10,       # the number of time steps
    'L': 50,       # the number of trajectory sampled
    'R': 3,        # the number of improvements

    'dt': .1,      # time interval
    'ur': .3,      # update rate
    'lr': 0.001,   # learning rate
}
apiae = APIAE(**params)

In [None]:
# Initialize the dynamics network by fitting it to a linear reference model.
# States are stacked as rows, so Zdot_ref = Zref_init.dot(A_lin) realizes
#   zdot = A_lin^T z,  with A_lin = [[0, -g], [1, -b]],
# i.e. z1_dot = z2 and z2_dot = -g*z1 - b*z2.
theta_initials = 0.5*np.pi*np.random.normal(loc=0.0, scale=1.0, size=(5000,1))
omega_initials = np.pi*np.random.normal(loc=0.0, scale=1.0, size=(5000,1))
Zref_init = np.concatenate([theta_initials, omega_initials], axis = 1)

g = 5.
b = 5.
A_lin = np.array([[0., -g], [1., -b]])
Zdot_ref = Zref_init.dot(A_lin)

apiae.dynNet.initialize(apiae.sess, Zref_init, Zdot_ref, minibatchsize=500, training_epochs=500, display_step=100)

# Show initialized network: roll the learned zdot forward with explicit Euler
# steps, starting from the sampled initial states.
T_test = 10
dt_test = 0.1
# Size the rollout buffer from Zref_init instead of repeating 5000 and 2.
Z_test = np.zeros((Zref_init.shape[0], T_test, Zref_init.shape[1]))
Z_test[:,0,:] = Zref_init
for t in range(T_test-1):
    dz = apiae.sess.run(apiae.dynNet.zdot_out,feed_dict={apiae.dynNet.z_in:Z_test[:,t,:]})*dt_test
    Z_test[:,t+1,:] = Z_test[:,t,:] + dz

plt.close()
plt.figure()
# Plot every 100th trajectory; the black dot marks its starting point.
# The loop variable is deliberately not called `b`, so the coefficient `b`
# defined above is not overwritten in the kernel namespace.
for traj in range(0,Z_test.shape[0],100):
    plt.plot(Z_test[traj,:,0],Z_test[traj,:,1])
    plt.plot(Z_test[traj,0,0],Z_test[traj,0,1],'k.')
plt.grid()
plt.xlabel('z1')
plt.ylabel('z2')
plt.show()

In [None]:
# Set parameters
NBatch = 100  # number of sequences indexed from Xref in every epoch
K = params['K']
dt = params['dt']
n_x = params['n_x']
n_z = params['n_z']
t_span = np.linspace(0,(K-1)*dt,K)
training_epochs = 20000
minibatchsize = np.minimum(250,NBatch)
total_batch = int(NBatch/minibatchsize)  # loop-invariant, so computed once

# Settings for the periodic rollout plot of the learned dynamics
K_test = 10
dt_test = 0.1

# Make sure the checkpoint directory exists before the first save.
os.makedirs('./weights', exist_ok=True)

# Training cycle
for epoch in range(training_epochs):
    avg_cost = 0.
    nperm = np.random.permutation(NBatch)  # fresh shuffle every epoch

    # Loop over all batches
    for i in range(total_batch):
        minibatch_idx = nperm[i*minibatchsize:(i+1)*minibatchsize]
        batch_xs = Xref[minibatch_idx,:,:,:,:]

        # Fit training using batch data
        cost,zseq,museq = apiae.partial_fit(batch_xs)

        # Accumulate the mean loss over the minibatches of this epoch
        avg_cost += cost / total_batch

    # Display logs per 10 epoch step
    if epoch % 10 == 0:
        print("Epoch:", '%04d' % (epoch+1),
              "cost=", "{:.9f}".format(avg_cost))

    # Display training results and save weights per 100 epoch step
    if epoch % 100 == 0:
        filename = './weights/weights_'+str(epoch)+'.pkl'
        apiae.saveWeights(filename)

        # NOTE(review): `r` and `ims_total` are not used in this cell; they are
        # kept as notebook globals in case a later cell reads them.
        r = params['R']-1
        ims_total = []

        # Roll the learned zdot forward with explicit Euler steps from the
        # initial states sampled for the dynamics-network initialization.
        Z_test = np.zeros((Zref_init.shape[0], K_test, Zref_init.shape[1]))
        Z_test[:,0,:] = Zref_init
        # Iterate over K_test, the length the buffer was allocated with.
        for t in range(K_test-1):
            dz = apiae.sess.run(apiae.dynNet.zdot_out,feed_dict={apiae.dynNet.z_in:Z_test[:,t,:]})*dt_test
            Z_test[:,t+1,:] = Z_test[:,t,:] + dz

        plt.close()
        plt.figure()
        # Plot every 10th trajectory; the black dot marks its starting point.
        for traj in range(0,Z_test.shape[0],10):
            plt.plot(Z_test[traj,:,0],Z_test[traj,:,1])
            plt.plot(Z_test[traj,0,0],Z_test[traj,0,1],'k.')
        plt.grid()
        plt.xlabel('z1')
        plt.ylabel('z2')
        plt.show()