In [5]:
import gymnasium as gym
import numpy as np
import tensorflow as tf
import random

In [6]:
# Define the Q-network (a small fully connected network, not a CNN)
def create_model():
    """Build the Q-value network: two hidden ReLU layers and a linear head.

    Returns:
        An unbuilt ``tf.keras.Sequential`` whose final layer has 4 units,
        one estimated Q-value per action.
    """
    model = tf.keras.models.Sequential([
        tf.keras.layers.Dense(8, activation='relu'),
        tf.keras.layers.Dense(32, activation='relu'),
        # Linear output: Q-values are unbounded return estimates that are
        # regressed with MSE. A softmax here would squash them into (0, 1)
        # and force them to sum to 1, so the targets could never be fitted.
        tf.keras.layers.Dense(4, activation=None)
    ])
    return model

# Create the online Q-network; this is the model that gets trained.
Q_theta = create_model()
# Build the model to initialize the weights.
# The feature dimension is 8 — presumably the LunarLander observation size
# (the last cell of this notebook prints an observation of shape (1, 8)).
Q_theta.build(input_shape=(None, 8))

# Target network: same architecture as Q_theta. clone_model gives the copy
# its own weights, so they are overwritten with Q_theta's
# weights below to make both networks start out identical.
Q_target = tf.keras.models.clone_model(Q_theta)
Q_target.build(input_shape=(None, 8))
Q_target.set_weights(Q_theta.get_weights())

# Optimizer used for the gradient steps on Q_theta.
# NOTE(review): learning_rate=0.1 is two orders of magnitude above Adam's
# documented default (0.001) — confirm this is intentional.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.1)





In [7]:
class Environment:
    """A batch of LunarLander environments stepped together with an epsilon-greedy policy."""

    def __init__(self, number_environments, epsilon=0.2):
        """Create the vectorized environments and reset them.

        Args:
            number_environments: how many environment copies are stepped in parallel.
            epsilon: probability with which each greedy action is replaced
                by a uniformly random one.
        """
        self.num_envs = number_environments
        self.envs = gym.vector.make(
            'LunarLander-v2',
            num_envs=number_environments,
            render_mode='human',
        )
        # reset() returns (observations, info); only the observations are kept.
        initial_observations, _ = self.envs.reset()
        self.current_states = initial_observations
        self.epsilon = epsilon

    def sample(self, model):
        """Advance every environment by one epsilon-greedy step.

        Args:
            model: callable mapping the current batch of states to one row of
                action values per environment.

        Returns:
            ``(old_states, actions, rewards, new_states)`` where ``actions`` is
            a Python list with one entry per environment.
        """
        states_before = self.current_states
        # Greedy choice: the action with the highest value in each row.
        greedy_actions = np.argmax(model(states_before), axis=1)

        # Exploration: each greedy action is independently replaced by a
        # random action (one of 4) with probability epsilon.
        actions = []
        for greedy_action in greedy_actions:
            if np.random.rand() < self.epsilon:
                actions.append(np.random.choice(4))
            else:
                actions.append(greedy_action)

        # The terminated/truncated flags and the info dict returned by step()
        # are discarded, so callers cannot tell when an episode has ended.
        new_states, rewards, _terminated, _truncated, _info = self.envs.step(actions)
        self.current_states = new_states
        return states_before, actions, rewards, new_states
    


class Buffer:
    """Bounded FIFO replay buffer of (state, action, reward, new_state) transitions."""

    def __init__(self, buffer_size):
        """Create an empty buffer.

        Args:
            buffer_size: maximum number of transitions kept; the oldest ones
                are dropped first once the limit is exceeded.
        """
        self.buffer_size = buffer_size
        self.buffer = []

    def add_to_buffer(self, samples):
        """Append one transition per environment and trim to ``buffer_size``.

        Args:
            samples: indexable of at least four parallel sequences
                ``(states, actions, rewards, new_states)``; element ``i`` of
                each sequence belongs to the same transition.
        """
        states, actions, rewards, new_states = (
            samples[0], samples[1], samples[2], samples[3]
        )
        for i in range(len(states)):
            # Fill an object array slot by slot. np.array([...]) on this mix
            # of vectors and scalars is a ragged construction, which raises
            # ValueError on NumPy >= 1.24.
            transition = np.empty(4, dtype=object)
            transition[0] = states[i]
            transition[1] = actions[i]
            transition[2] = rewards[i]
            transition[3] = new_states[i]
            self.buffer.append(transition)

        overflow = len(self.buffer) - self.buffer_size
        if overflow > 0:
            # Drop the oldest transitions in place.
            del self.buffer[:overflow]

    def sample_minibatch(self, batch_size):
        """Draw transitions uniformly at random without replacement.

        Args:
            batch_size: requested number of transitions; capped at the number
                currently stored.

        Returns:
            Object array of shape ``(n, 4)`` with columns
            ``state, action, reward, new_state``; ``n`` is 0 when the buffer
            is empty, so column slicing still works.
        """
        batch_size = min(batch_size, len(self.buffer))
        chosen = random.sample(self.buffer, batch_size)
        minibatch = np.empty((len(chosen), 4), dtype=object)
        for row, transition in enumerate(chosen):
            for col in range(4):
                minibatch[row, col] = transition[col]
        return minibatch

In [8]:
# --- Training configuration -------------------------------------------------
NUM_ENVS = 5
envs = Environment(NUM_ENVS)

BUFFER_SIZE = 100000
buffer = Buffer(BUFFER_SIZE)

MAX_STEPS = 1_000_000  # also used as the countdown of remaining outer iterations
converged = False      # never set to True below; the loop ends via MAX_STEPS

TAU = 0.5            # interpolation factor of the soft target-network update
N = 5                # environment steps per outer iteration
K = 3                # gradient steps per environment step
MINIBATCH_SIZE = 10
GAMMA = 0.3          # discount factor applied to the bootstrapped target value
mse = 0
losses = []

# --- Training loop ----------------------------------------------------------
while not converged and MAX_STEPS > 0:
    # Soft (Polyak) update of the target network. The weights are a list of
    # differently shaped arrays, so they are blended tensor by tensor;
    # wrapping the list in np.array() is a ragged construction that fails on
    # NumPy >= 1.24.
    Q_target.set_weights([
        (1 - TAU) * target_w + TAU * online_w
        for target_w, online_w in zip(Q_target.get_weights(), Q_theta.get_weights())
    ])
    if len(losses) > 1000:
        print("Loss mean:", np.mean(losses))
        losses = []
    for n in range(N):

        # sample one epsilon-greedy step from every environment
        samples = envs.sample(Q_theta)
        # add to buffer
        buffer.add_to_buffer(samples)
        for k in range(K):
            # sample minibatch
            minibatch = buffer.sample_minibatch(MINIBATCH_SIZE)
            # unpack the object-dtype columns into numeric arrays so that
            # TensorFlow can consume them directly
            old_states = np.array(list(minibatch[:, 0]), dtype=np.float32)
            actions = np.array(list(minibatch[:, 1]), dtype=np.int64)
            rewards = np.array(list(minibatch[:, 2]), dtype=np.float32)
            new_states = np.array(list(minibatch[:, 3]), dtype=np.float32)

            # Bootstrapped target, computed outside the tape so that no
            # gradient flows through the target network.
            # NOTE(review): episode-end flags are not stored in the buffer,
            # so terminal transitions are bootstrapped like any other.
            Q_target_values = Q_target(new_states)
            max_Q_target_values = tf.reduce_max(Q_target_values, axis=1)
            target_values = GAMMA * max_Q_target_values + rewards

            with tf.GradientTape() as tape:
                predictions = Q_theta(old_states)
                # pick, for every row, the Q-value of the action actually taken
                selected_q_values = tf.gather(predictions, actions, batch_dims=1)
                mse = tf.reduce_mean(tf.square(target_values - selected_q_values))

            # Track the training loss; this list is what gets printed as
            # "Loss mean" (previously the mean reward was appended here).
            losses.append(float(mse))

            gradients = tape.gradient(mse, Q_theta.trainable_variables)
            optimizer.apply_gradients(zip(gradients, Q_theta.trainable_variables))

    print(MAX_STEPS)
    MAX_STEPS -= 1

KeyboardInterrupt: 

In [None]:
import numpy as np
import random

# NOTE(review): this cell looks like an earlier draft of the training setup.
# It rebinds N, K, GAMMA, TAU, converged and buffer, which the training cell
# above also defines, so running it changes the state that cell relies on.
max_steps = 1_000_000
N = 10
K = 2
EPSILON = 0.2
MINI_BATCH_SIZE = 10
GAMMA = 0.5
TAU = 0.9
converged = False

# Create the replay buffer
buffer_size = 10000  # Define the size of the replay buffer
# Plain list for now; a bounded deque was considered: deque(maxlen=buffer_size)
buffer = []#deque(maxlen=buffer_size)

# NOTE(review): in this notebook `envs` is bound to Environment(NUM_ENVS),
# and that class defines only __init__ and sample — no reset(). The recorded
# output "(1, 8)" therefore presumably comes from a kernel where `envs` was a
# single-environment gym vector env; on a fresh Run All this line would raise
# AttributeError — confirm and remove or update.
observation, info = envs.reset()
print(observation.shape)

2023-06-12 16:57:47.533 Python[64693:4478232] ApplePersistenceIgnoreState: Existing state will not be touched. New state will be written to /var/folders/mt/0n8phlt50rn7h35xhgh97mw80000gn/T/org.python.python.savedState


(1, 8)


In [None]:
con

3