In [2]:
import gymnasium as gym 
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import keras
from keras import layers
from keras.models import Model
from keras.layers import Input, Dense, Lambda
from keras.layers import Concatenate
from collections import deque
import random

In [3]:
# Pendulum-v1 environment using the "human" render mode and gravity g = 9.81.
env = gym.make(
    "Pendulum-v1",
    render_mode="human",
    g=9.81,
)

In [None]:
state_size = env.observation_space.shape[0]  # cos(theta), sin(theta), angular velocity
action_size = env.action_space.shape[0]  # torque [-2, 2]
gamma = 0.89  # discount rate
learning_rate = 0.001  # learning rate

# Define the actor model: state -> deterministic action.
states_inputs = Input(shape=(state_size,))
dense = Dense(400, activation='relu')(states_inputs)
dense = Dense(300, activation='relu')(dense)
outputs = Dense(action_size, activation='tanh')(dense)
outputs = keras.layers.Lambda(lambda x: x * 2.0)(outputs)  # Scale action to [-2, 2]
actor_model = Model(inputs=states_inputs, outputs=outputs)

# Critic 1: (state, action) -> scalar Q-value.
state_input1 = Input(shape=(state_size,))
action_input1 = Input(shape=(action_size,))
concat1 = Concatenate()([state_input1, action_input1])
dense1 = Dense(400, activation='relu')(concat1)
dense1 = Dense(300, activation='relu')(dense1)
output1 = Dense(1)(dense1)
critic1 = Model([state_input1, action_input1], output1)

# Critic 2: same architecture as critic 1, independently initialised.
state_input2 = Input(shape=(state_size,))
action_input2 = Input(shape=(action_size,))
concat2 = Concatenate()([state_input2, action_input2])
dense2 = Dense(400, activation='relu')(concat2)
dense2 = Dense(300, activation='relu')(dense2)
output2 = Dense(1)(dense2)
critic2 = Model([state_input2, action_input2], output2)

# `critic_model` was referenced below but never defined in this notebook, so
# the cell only ran when a stale kernel still held that name. It is kept as an
# alias of the first critic so the cell works after Restart & Run All and any
# code still using the single-critic name keeps working.
critic_model = critic1

# Best-effort warm start: a missing or incompatible weights file is reported
# and the model keeps its fresh initialisation. Each model is loaded
# independently so one failure does not skip the others.
# NOTE(review): no weights file for `critic2` is referenced anywhere in this
# notebook, so it always starts from a fresh initialisation.
for model, weights_path in (
    (actor_model, 'actor_model.weights.h5'),
    (critic_model, 'critic_model.weights.h5'),
):
    try:
        model.load_weights(weights_path)
    except Exception as err:
        print(f"Could not load '{weights_path}' ({err!r}); using fresh weights.")

# NOTE(review): one optimizer is created for both critics. Confirm the update
# step applies both critics' gradients through it in a single call, or add a
# second optimizer.
actor_optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
critic_optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)

# Target networks start as exact copies of their online counterparts.
target_actor = keras.models.clone_model(actor_model)
target_actor.set_weights(actor_model.get_weights())
target_critic1 = keras.models.clone_model(critic1)
target_critic1.set_weights(critic1.get_weights())
target_critic2 = keras.models.clone_model(critic2)
target_critic2.set_weights(critic2.get_weights())
target_critic = target_critic1  # alias matching `critic_model`

ckpt = tf.train.Checkpoint(actor_optimizer=actor_optimizer,
                           critic_optimizer=critic_optimizer)

# Restore the latest checkpoint with optimizer states
ckpt.restore(tf.train.latest_checkpoint("optimizers_ckpt")).expect_partial()






<tensorflow.python.checkpoint.checkpoint.CheckpointLoadStatus at 0x218668b38b0>

In [5]:
class ReplayBuffer:
    """Bounded FIFO store of (state, action, reward, next_state, done) tuples."""

    def __init__(self, capacity=100000):
        # A deque with `maxlen` discards the oldest entry once it is full.
        self.buffer = deque(maxlen=capacity)

    def store(self, state, action, reward, next_state, done):
        """Append a single transition to the buffer."""
        transition = (state, action, reward, next_state, done)
        self.buffer.append(transition)

    def sample(self, batch_size):
        """Draw `batch_size` transitions at random, without replacement.

        Returns a 5-tuple of NumPy arrays (states, actions, rewards,
        next_states, dones), one array per transition field.
        """
        picked = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = (
            np.array(field) for field in zip(*picked)
        )
        return states, actions, rewards, next_states, dones

    def size(self):
        """Return how many transitions are currently stored."""
        return len(self.buffer)

class OUActionNoise:  # Ornstein-Uhlenbeck Process
    """Stateful noise generator: each call advances the process by one step.

    `mean` and `std_dev` are arrays of the same shape (one entry per noise
    dimension); `theta` and `dt` scale the pull toward `mean` per step.
    """

    def __init__(self, mean, std_dev, theta=0.15, dt=1e-2):
        self.mu = mean
        self.sigma = std_dev
        self.theta = theta
        self.dt = dt
        # The process starts at zero, with the same shape as `mean`.
        self.x_prev = np.zeros_like(self.mu)

    def __call__(self):
        """Advance one step and return the new noise sample."""
        # Pull toward the mean, proportional to the current offset.
        drift = self.theta * (self.mu - self.x_prev) * self.dt
        # Random perturbation scaled by sigma * sqrt(dt).
        diffusion = self.sigma * np.sqrt(self.dt) * np.random.normal(size=self.mu.shape)
        x = self.x_prev + drift + diffusion
        self.x_prev = x
        return x
    
def soft_update(target_weights, online_weights, tau=0.005):
    """Blend each online weight into its paired target weight, in place.

    Every target is reassigned to `target * (1 - tau) + online * tau`.
    Pairs are matched positionally; the target objects must expose `assign`.
    """
    keep = 1 - tau
    for target_var, online_var in zip(target_weights, online_weights):
        blended = target_var * keep + online_var * tau
        target_var.assign(blended)

# Module-level replay buffer (default capacity) and an OU noise process with
# zero mean and a standard deviation of 0.01 per action dimension.
replay_buffer = ReplayBuffer()
noise = OUActionNoise(
    mean=np.zeros(action_size),
    std_dev=np.full(action_size, 0.01),
)

In [None]:
epoch = 100
batch_size = 64

for e in range(epoch):
    state, _ = env.reset()
    done = False
    while not done:
        # Policy action plus Gaussian exploration noise, kept inside the torque bounds.
        action = actor_model(tf.convert_to_tensor([state], dtype=tf.float32)).numpy()[0]
        action = action + np.random.normal(0, 0.1, size=action_size)
        action = np.clip(action, -2.0, 2.0)  # clip action to [-2, 2]

        # Gymnasium's step() returns (obs, reward, terminated, truncated, info).
        # Pendulum-v1 ends episodes only through time-limit truncation, so
        # looping on `terminated` alone would never leave this while-loop.
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        # Store `terminated` rather than `done`, so a time-limit cut-off is not
        # recorded as a terminal state.
        replay_buffer.store(state, action, reward, next_state, terminated)

        state = next_state

        if replay_buffer.size() >= batch_size:
            states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
            states = tf.convert_to_tensor(states, dtype=tf.float32)
            actions = tf.convert_to_tensor(actions, dtype=tf.float32)
            # NOTE(review): rewards and dones are built from per-step scalars
            # while the critics end in Dense(1); confirm shapes are aligned
            # before combining them with critic outputs.
            rewards = tf.convert_to_tensor(rewards, dtype=tf.float32)
            next_states = tf.convert_to_tensor(next_states, dtype=tf.float32)
            dones = tf.convert_to_tensor(dones, dtype=tf.float32)

            with tf.GradientTape() as critic_tape:
                # TODO: the critic/actor update step has not been written yet.
                # `pass` keeps the cell syntactically valid; no learning
                # happens until this block is implemented.
                pass
                


        

KeyboardInterrupt: 