In [1]:
import random
import gym,os
import numpy as np
from collections import deque
from keras.models import Sequential
from keras.optimizers import Adam
import keras.backend as K
import tensorflow.compat.v1 as tf
from tensorflow.keras import layers

In [2]:
def Noise(x_prev, mean=0.0, theta=0.15, std_deviation=0.2, dt=1e-2):
    """Advance an Ornstein-Uhlenbeck noise process by one time step.

    Implements the discretised update

        x = x_prev + theta * (mean - x_prev) * dt
                   + std_deviation * sqrt(dt) * N(0, 1)

    Formula taken from
    https://www.wikipedia.org/wiki/Ornstein-Uhlenbeck_process.

    The function keeps no internal state: to make successive samples depend
    on each other, the caller passes the previous return value back in as
    ``x_prev``.

    Args:
        x_prev: Previous value of the process (scalar or array broadcastable
            against shape ``(1,)``).
        mean: Value the process is pulled towards.
        theta: Strength of the pull towards ``mean``.
        std_deviation: Scale of the Gaussian term.
        dt: Length of the time step.

    Returns:
        A NumPy array holding the next value of the process; shape ``(1,)``
        when ``x_prev`` is a scalar or a one-element array.
    """
    gaussian = np.random.normal(size=1)
    drift = theta * (mean - x_prev) * dt
    diffusion = std_deviation * np.sqrt(dt) * gaussian
    return x_prev + drift + diffusion

In [3]:
# Smoke test: draw one noise sample starting from 0, then feed it back in as
# the previous value to draw a second, correlated sample.
a = Noise(0)
b = Noise(a)
print(a, b)

[-0.01873419] [0.00911134]


In [4]:
def get_actor():
    """Build the actor network: a 3-feature input mapped to one bounded output.

    Two 256-unit ReLU hidden layers feed a single tanh unit. The tanh output
    is multiplied by 2, so the model's output lies in [-2, 2].

    Returns:
        A ``tf.keras.Model`` taking inputs of shape ``(batch, 3)`` and
        producing outputs of shape ``(batch, 1)``.
    """
    # Final-layer weights are initialised uniformly between -3e-3 and 3e-3.
    small_uniform = tf.random_uniform_initializer(minval=-0.003, maxval=0.003)

    state_in = layers.Input(shape=(3,))
    hidden = state_in
    for _ in range(2):
        hidden = layers.Dense(256, activation="relu")(hidden)
    squashed = layers.Dense(1, activation="tanh", kernel_initializer=small_uniform)(hidden)

    # Our upper bound is 2.0 for Pendulum.
    scaled_action = squashed * 2
    return tf.keras.Model(inputs=state_in, outputs=scaled_action)

In [5]:
def get_critic():
    """Build the critic network: a 4-feature input mapped to one scalar.

    The input passes through three 256-unit ReLU hidden layers and a final
    linear unit (no activation). Elsewhere in this notebook the 4 features
    are a 3-value state concatenated with a 1-value action.

    Returns:
        A ``tf.keras.Model`` taking inputs of shape ``(batch, 4)`` and
        producing outputs of shape ``(batch, 1)``.
    """
    state_action_in = layers.Input(shape=(4,))

    hidden = state_action_in
    for _ in range(3):
        hidden = layers.Dense(256, activation="relu")(hidden)

    q_value = layers.Dense(1)(hidden)
    return tf.keras.Model(inputs=state_action_in, outputs=q_value)

In [6]:
@tf.function
def update_target(target_weights, weights, tau):
    """Blend ``weights`` into ``target_weights`` in place (soft update).

    Variables are paired positionally; each target variable is overwritten
    with ``source * tau + target * (1 - tau)``.

    Args:
        target_weights: Iterable of variables to be updated via ``assign``.
        weights: Iterable of source variables, in the same order.
        tau: Blend factor applied to the source variables.
    """
    for target_var, source_var in zip(target_weights, weights):
        blended = source_var * tau + target_var * (1 - tau)
        target_var.assign(blended)

In [15]:
#sta=np.concatenate((np.reshape(state[0], [1, 3]),action,reward,np.reshape(next_state, [1, 3])), axis=None)


@tf.function
def learn(batch, actor_model, tactor_model, critic_model, target_model):
    """Run one critic update followed by one actor update on a batch.

    ``batch`` is a 2-D tensor whose columns are sliced as
    ``[0:3]`` state, ``[3:4]`` action, ``[4:5]`` reward, ``[5:8]`` next state.

    The critic is fitted to ``reward + 0.99 * target_model(next_state,
    tactor_model(next_state))`` with a mean-squared-error loss; the actor is
    then updated to maximise the critic's mean value for its own actions.

    Gradients are applied through the module-level ``critic_optimizer`` and
    ``actor_optimizer``, which must exist before this function is called.

    Args:
        batch: Transitions, one per row, in the column layout above.
        actor_model: Actor network being trained.
        tactor_model: Target actor network (not updated here).
        critic_model: Critic network being trained.
        target_model: Target critic network (not updated here).
    """
    # Named views onto the batch columns.
    states = batch[:, 0:3]
    states_and_actions = batch[:, 0:4]
    rewards = batch[:, 4:5]
    next_states = batch[:, 5:8]

    # --- Critic step: regress Q(s, a) onto the bootstrapped target. ---
    with tf.GradientTape() as critic_tape:
        next_actions = tactor_model(next_states, training=True)
        next_q = target_model(tf.concat([next_states, next_actions], 1), training=True)
        td_target = rewards + 0.99 * next_q
        q_taken = critic_model(states_and_actions, training=True)
        critic_loss = tf.math.reduce_mean(tf.math.square(td_target - q_taken))
    critic_vars = critic_model.trainable_variables
    critic_grads = critic_tape.gradient(critic_loss, critic_vars)
    critic_optimizer.apply_gradients(zip(critic_grads, critic_vars))

    # --- Actor step: ascend the critic's value of the actor's own actions. ---
    with tf.GradientTape() as actor_tape:
        proposed_actions = actor_model(states, training=True)
        q_proposed = critic_model(tf.concat([states, proposed_actions], 1), training=True)
        # Negated because the optimizer minimises.
        actor_loss = -tf.math.reduce_mean(q_proposed)
    actor_vars = actor_model.trainable_variables
    actor_grads = actor_tape.gradient(actor_loss, actor_vars)
    actor_optimizer.apply_gradients(zip(actor_grads, actor_vars))

In [16]:
# Online networks and their target counterparts. Creation order is
# actor, target actor, critic, target critic.
actor_model = get_actor()
tactor_model = get_actor()

critic_model = get_critic()
target_model = get_critic()

# Start each target network from exactly the same weights as its online network.
for online_net, target_net in ((actor_model, tactor_model), (critic_model, target_model)):
    target_net.set_weights(online_net.get_weights())

# Learning rates for the critic and actor optimizers.
critic_lr = 0.002
actor_lr = 0.001

critic_optimizer = tf.keras.optimizers.Adam(learning_rate=critic_lr)
actor_optimizer = tf.keras.optimizers.Adam(learning_rate=actor_lr)

In [17]:
# --- Training configuration -------------------------------------------------
NUM_EPISODES = 100
BATCH_SIZE = 64            # transitions sampled per learning step
TAU = 0.005                # soft-update rate for both target networks
BUFFER_CAPACITY = 100_000  # maximum number of transitions kept for replay

env = gym.make("Pendulum-v1", render_mode="human")

# Replay memory: one row per transition, laid out as
# [state (3), action (1), reward (1), next_state (3)] -- the column layout
# that `learn` slices. It is preallocated and used as a ring buffer, so adding
# a transition is O(1) and memory stays bounded (the oldest rows are
# overwritten once BUFFER_CAPACITY is reached).
men = np.zeros((BUFFER_CAPACITY, 8), dtype=np.float32)
stored = 0    # number of valid rows currently in `men`
write_at = 0  # index of the next row to (over)write

try:
    for i in range(NUM_EPISODES):
        print(i)
        # reset() returns (observation, info); only the observation is used.
        state, info = env.reset()
        noise_1 = 0  # previous noise sample; restarted at 0 every episode
        while True:
            # render_mode="human" makes the environment draw itself on every
            # step, so no explicit env.render() call is needed here.
            action = actor_model.predict(np.reshape(state, [1, 3]), verbose=0)[0]

            # Temporally correlated exploration noise: each sample is seeded
            # with the previous one.
            noise = Noise(noise_1)
            noise_1 = noise
            action = np.clip(action + noise, -2, 2)

            # step() returns (obs, reward, terminated, truncated, info).
            # Pendulum has no terminal state and ends episodes only through
            # time-limit truncation, so BOTH flags must be checked; testing
            # `terminated` alone leaves this loop running forever.
            next_state, reward, terminated, truncated, info = env.step(action)

            # axis=None flattens the pieces into a single 8-value row.
            men[write_at] = np.concatenate((state, action, reward, next_state), axis=None)
            write_at = (write_at + 1) % BUFFER_CAPACITY
            stored = min(stored + 1, BUFFER_CAPACITY)

            # Start learning as soon as one full batch can be sampled.
            if stored >= BATCH_SIZE:
                idx = np.random.choice(stored, size=BATCH_SIZE, replace=False)
                batch1 = tf.convert_to_tensor(men[idx], tf.float32)
                learn(batch1, actor_model, tactor_model, critic_model, target_model)
                update_target(tactor_model.variables, actor_model.variables, TAU)
                update_target(target_model.variables, critic_model.variables, TAU)

            if terminated or truncated:
                break
            state = next_state
finally:
    # Always release the render window, including when the run is interrupted.
    env.close()

0


KeyboardInterrupt: 

In [None]:
# Inspect the critic network's variables; as the cell's last expression the
# value is displayed as the cell output.
critic_model.variables

In [None]:
# Sanity check: evaluate the critic on one hand-written row of 4 values
# (get_critic declares its input with shape (4,)).
critic_model.predict([[1,2,3,4]])



array([[-30.080212]], dtype=float32)