In [1]:
import numpy as np
import sympy as sp
import sklearn as sk
from sklearn.linear_model import HuberRegressor
import tensorflow as tf
from tensorflow.keras.layers import Dense, Input
from tensorflow.keras import Model
from tensorflow import keras
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.metrics import MeanSquaredError
from matplotlib import pyplot as plt

In [2]:
# Q learning initializing constants

# DeltaT, it will be used to turn the infinite time system into a discrete time
# system and give it steps for ease of algorithms: k, k+1, k+2

deltaT = 0.01

# Time that will be quantized by the quatization constant,
# used for online evaluation of learned network

maxQuantizedTime = int(20/deltaT)

# State and Control Effort penalizing matrices, typically constants, deploid in
# the local cost function. Tells the algorithm how to go about controlling the
# states/control input.

#Q = np.array([[1e5, 0, 0], [0, 1e5, 0], [0, 0, 1e5]])
#R = np.array([[1e3, 0, 0], [0, 1e3, 0], [0, 0, 1e3]])*0.5

# Q = deltaT * Q
# R = deltaT * R

# numberOfStates = Q.shape[0]
# numberOfControls = R.shape[1]

numberOfStates = 2
numberOfControls = 1

# Intsead of inputing a single sample to the system and looping it to make it
# learn, we add more patterns to make it learn faster so it learns how
# different samples react and adjust learning accordingly

numberOfPatterns = 1000

# NN training epochs
numberOfEpochs = 100

# Maximum magnitude of the states and control input

stateDomain = 1
controlDomain = 30

# Discount Factor, how much will the future cost impact impact the
# instantaneous cost

gamma = 0.99

# Max number of times the loop will repeat,
# could be less if diverged/error is small enough

maxLooping = 200

# Basis functions, a polynomial used, along with the weights, for
# function approximation

def phi(x1, x2, u):
    
    # Change dimension when changing the amount of basis functions
    # Type is object to apease sympy, if used for calculation purposes
    # use ".astype(float)" to change format
    
    out = np.zeros((17, np.size(x1)), dtype=object)

    x1 = np.array(x1)
    x2 = np.array(x2)
    u = np.array(u)
    
    out[0,:] = 1
    
    out[1,:] = x1
    out[2,:] = x2
    out[3,:] = u
    
    out[4,:] = x1**2
    out[5,:] = x2**2
    out[6,:] = u**2
    out[7,:] = x1 * x2
    out[8,:] = x1 * u
    out[9,:] = x2 * u
    
    out[10,:] = x1**3 
    out[11,:] = x2**3 
    # out[12,:] = x1**2 * x2 
    out[12,:] = x1**2 * u 
    out[13,:] = x2**2 * u
    # out[15,:] = x2**2 * x1 
    out[14,:] = x1 * x2 * u  
    out[15,:] = u**2 * x1 
    out[16,:] = u**2 * x2 
    # out[19,:] = u**3 
    
    # u cannot be cubic as the derivative would be squared,
    #solution could have imaginary component
    
    return out


# The drift dynamics of the system have been discretized below, F & G

def F(x):
    x1 = np.array(x[0,:])
    x2 = np.array(x[1,:])

    # outF = np.zeros((numberOfStates, np.size(x1)))
    f = np.array([
        x2, 
        ( (1-x1**2) * x2 - x1) 
    ])

    return x + deltaT*f
    
G = deltaT * np.array([[0], [1]])

def cs(x):
    return x.reshape(x.shape[0], 1)

# This are the coefficients for the basis functions used in function 
# approximation

numberOfNeurons = np.size(phi(1,1,1))
weights_LIP = np.random.randn(numberOfNeurons, 1)

# Random States and Controls used for training in their respective domain

xPatterns = stateDomain * (2 * np.random.rand(numberOfStates,numberOfPatterns) - 1)
uPatterns = controlDomain * (2 * np.random.rand(numberOfControls, numberOfPatterns) - 1)

# the G dynamics using xPatterns as inputs
uIn = G

# Derivative of the basis function with respect to u, used to update the policy
# and is solved through symbolically.

x1,x2,u = sp.symbols('x1,x2,u')
basisPhi = sp.Matrix(phi(x1,x2,u))
derivePhi = sp.diff(basisPhi, u)

# PHI is the value the basis functions output when xPatterns and 
# uPatterns are used as input

PHI = phi(xPatterns[0,:], xPatterns[1,:], uPatterns[0,:]).astype(float)

weightHistory = np.zeros((numberOfNeurons, maxLooping))

ERROR = np.zeros((1, maxLooping))

# Training a NN on tensorflow requires an optimizer (way of learning; adam is modified gradient descent) and a metric (how does it know its learning correctly)
optimizer = Adam()
acc_metric = MeanSquaredError()
loss_train = tf.keras.losses.Huber(name='train_loss')

In [3]:
# Q Learning Loop
def Q_Learn(R, Q, weights_LIP):
    for k in range(maxLooping):
        
        # uIn refers to the initial control, uIn = G(xPatterns)
        xNext = F(xPatterns) + G * uPatterns
        
        weightSym = sp.Matrix(weights_LIP)
        derEqual0 = weightSym.dot(derivePhi)
        solutionU = sp.solve(derEqual0, u)
        uNextFunction = sp.lambdify([x1,x2], solutionU, 'numpy')

        # The xNext is obtained from training in the simulation space, could be obtained by dynamics
        uNext = np.array(uNextFunction(xNext[0,:], xNext[1,:]))
        
        futureCost = (weights_LIP.T @ phi(xNext[0,:], xNext[1,:], uNext).astype(float)).reshape(1, numberOfPatterns)
        
        cost = (0.5 * np.sum((Q @ xPatterns**2 ), axis = 0)
                + 0.5 * np.sum((R @ uPatterns**2), axis = 0) + gamma * futureCost)

        oldWeights = weights_LIP
        weightHistory[:, k] = oldWeights.reshape(numberOfNeurons,)
        
        # Least squares method to find the next set of weights, Ax = B
        
        A = PHI @ PHI.T
        B = PHI @ cost.T
        weights_LIP = np.linalg.solve(A, B)
        
        ERROR[:, k] = np.linalg.norm(weights_LIP - oldWeights, 2)
        
        # print("Iteration {} | Error = {}".format(k+1, ERROR[:, k]))
        
        if np.isnan(weights_LIP).any == True:
            print("The weights have diverged")
            break

    return uNextFunction

In [4]:
# Data to train and validate
# so we need samples from an optimal policy, ideally this is a sample set from an "expert" demonstrator
# but in less than ideal scenarions we use an optimal policy from an pretrained Q-learning instance
# train =  : this is a line where the optimal policy samples are going to be generated or just loaded, ask edgar how to save values
# U_true =  : different samples with the which to test the 

train_states = np.loadtxt('Train_States.txt', dtype=float)
train_control = np.loadtxt('Train_Controls.txt', dtype=float)
U_true = train_control.reshape(1,train_control.size)
X = train_states

policy = np.concatenate((X, U_true))
train_og = tf.reshape(policy, (policy.shape))
policy = policy.reshape(1,policy.shape[0],policy.shape[1])
X.shape

(2, 1999)

In [5]:
# Gradient Descent network
# Edit the network should have the dempostrator policy as inputs, and output the R and Q
input_shape = numberOfControls + numberOfStates
output_shape = input_shape

model_in = Input(shape=(train_og.shape), name="x_in")
z = Dense(4, activation='relu', name="denseC1")(model_in)
z = Dense(5, activation='relu', name="denseC2")(z)
z = Dense(3, activation='linear', name="linearC3")(z)
out = Dense(1, name="aOut")(z)

model_GD = Model(inputs=[model_in], outputs=[out], name="Gradient_Model")
#model_GD.compile(optimizer='Adam', loss='mse')

model_GD.summary()

Model: "Gradient_Model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 x_in (InputLayer)           [(None, 3, 1999)]         0         
                                                                 
 denseC1 (Dense)             (None, 3, 4)              8000      
                                                                 
 denseC2 (Dense)             (None, 3, 5)              25        
                                                                 
 linearC3 (Dense)            (None, 3, 3)              18        
                                                                 
 aOut (Dense)                (None, 3, 1)              4         
                                                                 
Total params: 8,047
Trainable params: 8,047
Non-trainable params: 0
_________________________________________________________________


In [6]:
guess = np.array((1,2,4)).reshape(1,input_shape)
print(guess.shape)
prediction = model_GD.predict(guess)
prediction


(1, 3)


ValueError: in user code:

    File "c:\Users\uhh443\Miniconda3\envs\Py\lib\site-packages\keras\engine\training.py", line 2041, in predict_function  *
        return step_function(self, iterator)
    File "c:\Users\uhh443\Miniconda3\envs\Py\lib\site-packages\keras\engine\training.py", line 2027, in step_function  **
        outputs = model.distribute_strategy.run(run_step, args=(data,))
    File "c:\Users\uhh443\Miniconda3\envs\Py\lib\site-packages\keras\engine\training.py", line 2015, in run_step  **
        outputs = model.predict_step(data)
    File "c:\Users\uhh443\Miniconda3\envs\Py\lib\site-packages\keras\engine\training.py", line 1983, in predict_step
        return self(x, training=False)
    File "c:\Users\uhh443\Miniconda3\envs\Py\lib\site-packages\keras\utils\traceback_utils.py", line 70, in error_handler
        raise e.with_traceback(filtered_tb) from None
    File "c:\Users\uhh443\Miniconda3\envs\Py\lib\site-packages\keras\engine\input_spec.py", line 277, in assert_input_compatibility
        raise ValueError(

    ValueError: Exception encountered when calling layer "Gradient_Model" "                 f"(type Functional).
    
    Input 0 of layer "denseC1" is incompatible with the layer: expected axis -1 of input shape to have value 1999, but received input with shape (None, 3)
    
    Call arguments received by layer "Gradient_Model" "                 f"(type Functional):
      • inputs=tf.Tensor(shape=(None, 3), dtype=int32)
      • training=False
      • mask=None


In [None]:
n = np.array((1,2,3))
np.eye(n.size) * n


array([[1., 0., 0.],
       [0., 2., 0.],
       [0., 0., 3.]])

In [7]:
def separate(network_output):
    network_output = np.array(network_output).reshape(3,1)
    Qs = np.eye(numberOfStates)
    Rs = np.eye(numberOfControls)

    Qss = np.zeros((1, numberOfStates))
    Rss = np.zeros((1, numberOfControls))

    for i in range(numberOfStates):
        Qss[:,i] = network_output[i]
    for j in range(numberOfControls):
        Rss[:,j] = network_output[numberOfStates+j]

    print(Qss)

    Q = Qs * Qss
    R = Rs * Rss

    return Q, R

In [None]:

#separate(prediction.T)
tf.trainable_variables()

AttributeError: module 'tensorflow' has no attribute 'trainable_variables'

In [8]:
# Training Loop

for epoch in range(numberOfEpochs):

    with tf.GradientTape() as tape:
        # The model will predict that the reward function parameters
        
        RQ_predict = model_GD(policy)   # train variable refers to the x,u values that the network uses to predict RQ: this are the values for the 
        
        Qt, Rt = separate(RQ_predict)
        uNextFunction = Q_Learn(Rt, Qt, weights_LIP)

        # Using the simulation, add a line using this controls to see if the simulation matches expected outcome, or just have a previously simulated data set from where to compare (better idea)(u_true)
        # The X[] need to be changed to accommodate the testing
        # U_true is the true value, so the values of the "actions" that are recorded from the simulations? How would it work? U_true needs to be a pre-labeled data set 
        U_predict = np.array([uNextFunction(X[0,:], X[1,:])])
        loss = loss_train(U_true, U_predict)
    

    gradients = tape.gradient(loss, model_GD.trainable_variables)
    print(gradients)
    optimizer.apply_gradients(zip(gradients, model_GD.trainable_variables))
    acc_metric.update_state(U_true, U_predict)

    train_acc = acc_metric.result()
    print(f"Accuraty over epoch {train_acc}")
    acc_metric.reset_states()




[[-0.25915238 -0.09271409]]
[None, None, None, None, None, None, None, None]


ValueError: No gradients provided for any variable: (['denseC1/kernel:0', 'denseC1/bias:0', 'denseC2/kernel:0', 'denseC2/bias:0', 'linearC3/kernel:0', 'linearC3/bias:0', 'aOut/kernel:0', 'aOut/bias:0'],). Provided `grads_and_vars` is ((None, <tf.Variable 'denseC1/kernel:0' shape=(1999, 4) dtype=float32, numpy=
array([[ 0.0091089 , -0.04871399,  0.00050838,  0.00884101],
       [-0.03221896,  0.02565037, -0.01074677,  0.03093069],
       [ 0.04772449,  0.02825473,  0.04298745, -0.03740874],
       ...,
       [-0.04829378, -0.04537277,  0.00629076, -0.04414977],
       [-0.04932998,  0.00678224, -0.0043845 ,  0.01002862],
       [-0.02386786, -0.01392929,  0.01743731, -0.01504624]],
      dtype=float32)>), (None, <tf.Variable 'denseC1/bias:0' shape=(4,) dtype=float32, numpy=array([0., 0., 0., 0.], dtype=float32)>), (None, <tf.Variable 'denseC2/kernel:0' shape=(4, 5) dtype=float32, numpy=
array([[ 0.5807724 , -0.00480539,  0.11797154, -0.4891484 , -0.75998527],
       [ 0.6021545 , -0.48518535,  0.56726086, -0.5671906 ,  0.32049298],
       [-0.21754128,  0.59352934,  0.08948421,  0.7719984 ,  0.67068076],
       [ 0.7549732 , -0.1191321 ,  0.47170222,  0.5969547 , -0.4807818 ]],
      dtype=float32)>), (None, <tf.Variable 'denseC2/bias:0' shape=(5,) dtype=float32, numpy=array([0., 0., 0., 0., 0.], dtype=float32)>), (None, <tf.Variable 'linearC3/kernel:0' shape=(5, 3) dtype=float32, numpy=
array([[ 0.5497882 ,  0.8140982 , -0.84831095],
       [ 0.38342136, -0.48034763, -0.11172938],
       [ 0.68278164, -0.85856414, -0.4141432 ],
       [ 0.51267976,  0.4443633 ,  0.0804984 ],
       [-0.6962507 , -0.82539636, -0.31413323]], dtype=float32)>), (None, <tf.Variable 'linearC3/bias:0' shape=(3,) dtype=float32, numpy=array([0., 0., 0.], dtype=float32)>), (None, <tf.Variable 'aOut/kernel:0' shape=(3, 1) dtype=float32, numpy=
array([[-1.0690477 ],
       [-0.7712507 ],
       [-0.48756385]], dtype=float32)>), (None, <tf.Variable 'aOut/bias:0' shape=(1,) dtype=float32, numpy=array([0.], dtype=float32)>)).

In [None]:
# test would look similar to the QLIP_Single Network
# Something like this, vareiables need to be changed to fit this script

X = np.zeros((numberOfStates, maxQuantizedTime))
X[:, 0] = np.array([[1.], [1.], [1.]]).reshape(numberOfStates,)

U = np.zeros((numberOfControls, maxQuantizedTime-1))

for t in range(maxQuantizedTime-1):
    
    uOn = G(cs(X[:,t]))

    U[:, t] = np.array([u1NextFunction(X[0,t], X[1,t], X[2,t]), u2NextFunction(X[0,t], X[1,t], X[2,t]), u3NextFunction(X[0,t], X[1,t], X[2,t])]).reshape((numberOfControls,))
    
    X[:, t+1] = (F(cs(X[:,t])) + (uOn[:,0,:] * U[0,t] + uOn[:,1,:] * U[1,t] + uOn[:,2,:] * U[2,t]).T).reshape((numberOfStates,))
    

Time = np.linspace(0, maxQuantizedTime, maxQuantizedTime) * deltaT
timeControl = np.linspace(0, maxQuantizedTime, maxQuantizedTime-1) * deltaT
iterationX = np.linspace(0, maxLooping, maxLooping)

plt.subplot(3,1,1)
plt.title('States')
plt.plot(Time, X[0,:])
plt.xlabel('Time')
plt.ylabel('$x_1$')
plt.grid(visible=True)

plt.subplot(3,1,2)
plt.plot(Time, X[1,:])
plt.xlabel('Time')
plt.ylabel('$x_2$')
plt.grid(visible=True)

plt.subplot(3,1,3)
plt.plot(Time, X[2,:])
plt.xlabel('Time')
plt.ylabel('$x_3$')
plt.grid(visible=True)

plt.figure(num=2)
plt.subplot(3,1,1)
plt.title('Control')
plt.plot(timeControl, U[0,:])
plt.xlabel('Time')
plt.ylabel('[$u_1$]')
plt.grid(visible=True)


plt.subplot(3,1,2)
plt.plot(timeControl, U[1,:])
plt.xlabel('Time')
plt.ylabel('[$u_2$]')
plt.grid(visible=True)


plt.subplot(3,1,3)
plt.plot(timeControl, U[2,:])
plt.xlabel('Time')
plt.ylabel('[$u_3$]')
plt.grid(visible=True)
plt.show()

plt.figure(num=3)
plt.title('Weights')
for l in range(numberOfNeurons):
    plt.plot(iterationX, weightHistory[l,:])
plt.xlabel('Iterations')
plt.ylabel('Weights')
plt.grid(visible=True)
plt.show()

plt.figure(num=4)
plt.title('Error')
plt.plot(iterationX, ERROR[0,:])
plt.xlabel('Iteration')
plt.grid(visible=True)
plt.show()

