In [None]:
import os
import time
from collections import deque

import gymnasium as gym
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Dense, Input, Lambda
from tensorflow.keras.losses import MeanSquaredError
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.optimizers import Adam
from tqdm import tqdm

from ParabolicShot import ParabolicShotEnv

In [None]:
from tensorflow.keras.layers import Layer
import tensorflow_probability as tfp
tfd = tfp.distributions


class ScaleLayer(Layer):
    """Multiply the incoming tensor by a fixed factor.

    Args:
        scale_factor: Scalar or array-like multiplier. It is applied with the
            `*` operator, so the usual broadcasting rules apply.
        **kwargs: Forwarded to the base `Layer` constructor.
    """

    def __init__(self, scale_factor, **kwargs):
        super().__init__(**kwargs)
        self.scale_factor = scale_factor

    def call(self, inputs):
        """Return `inputs * scale_factor`."""
        return inputs * self.scale_factor

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        # Arrays are not JSON-serializable, so store the factor as a plain
        # Python number / (nested) list.
        config["scale_factor"] = np.asarray(self.scale_factor).tolist()
        return config

class SoftplusLayer(Layer):
    """Apply softplus and add a constant offset.

    The output is `softplus(inputs) + epsilon`, which is strictly greater than
    `epsilon` for every finite input.

    Args:
        epsilon: Constant added after the softplus.
        **kwargs: Forwarded to the base `Layer` constructor.
    """

    def __init__(self, epsilon, **kwargs):
        super().__init__(**kwargs)
        self.epsilon = epsilon

    def call(self, inputs):
        """Return `softplus(inputs) + epsilon`."""
        return tf.nn.softplus(inputs) + self.epsilon

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config["epsilon"] = self.epsilon
        return config
    
    
class DistributionLayer(Layer):
    """Output head producing the mean and standard deviation of a Gaussian policy.

    The mean is a tanh-activated dense projection multiplied by
    `continuous_action_bound`. The standard deviation is a linear dense
    projection passed once through `softplus(x) + 0.0001`, so it is strictly
    positive.

    Args:
        action_dim: Number of units of each dense head (size of the action vector).
        continuous_action_bound: Scalar or array-like factor the tanh output
            is multiplied by.
        **kwargs: Forwarded to the base `Layer` constructor.
    """

    def __init__(self, action_dim, continuous_action_bound, **kwargs):
        super().__init__(**kwargs)
        self.action_dim = action_dim
        self.continuous_action_bound = continuous_action_bound

    def build(self, input_shape):
        """Create and build the sub-layers for the given input shape."""
        self.mu_layer = Dense(self.action_dim, activation='tanh')
        # Linear head: the softplus is applied exactly once, by
        # `adjust_sigma_layer`. Applying softplus here as well would bound
        # sigma below by softplus(0) = ln(2) ~= 0.693 and prevent the policy
        # from ever becoming sharper than that.
        self.sigma_layer = Dense(self.action_dim)
        self.scale_layer = ScaleLayer(self.continuous_action_bound)
        self.adjust_sigma_layer = SoftplusLayer(0.0001)

        # Build the weighted sub-layers explicitly so their variables exist as
        # soon as this layer is built (mu first, then sigma).
        self.mu_layer.build(input_shape)
        self.sigma_layer.build(input_shape)

    def call(self, inputs):
        """Return the tuple `(mu_scaled, sigma_adjusted)` for `inputs`."""
        mu = self.mu_layer(inputs)
        sigma = self.sigma_layer(inputs)
        mu_scaled = self.scale_layer(mu)
        sigma_adjusted = self.adjust_sigma_layer(sigma)
        return mu_scaled, sigma_adjusted

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created."""
        config = super().get_config()
        config.update({
            "action_dim": int(self.action_dim),
            # Arrays are not JSON-serializable; store as number / nested list.
            "continuous_action_bound": np.asarray(self.continuous_action_bound).tolist(),
        })
        return config


In [None]:
class PPOAgent:
    """Actor-critic agent with a Gaussian policy over continuous actions.

    The actor maps a state to `(mu, sigma)` through `DistributionLayer`; the
    critic maps a state to a single scalar value.

    NOTE(review): despite the class name, `train` performs one
    advantage-weighted log-likelihood step with an entropy bonus. It has no
    probability-ratio clipping and ignores `next_states` / `dones`.

    Args:
        state_dim: Size of the observation vector.
        continuous_action_dim: Size of the continuous action vector.
        continuous_action_bound: Scalar or array-like bound; actions are
            clipped to `[-bound, bound]` and the policy mean is scaled by it.
        actor_lr: Adam learning rate for the actor.
        critic_lr: Adam learning rate for the critic.
        entropy_coef: Weight of the entropy bonus in the actor loss.
        min_batch_size: Minimum number of stored transitions required before
            `train` performs an update.
    """

    def __init__(self, state_dim, continuous_action_dim, continuous_action_bound,
                 actor_lr=0.001, critic_lr=0.01, entropy_coef=0.01, min_batch_size=64):
        self.state_dim = state_dim
        self.continuous_action_dim = continuous_action_dim
        self.continuous_action_bound = continuous_action_bound
        self.entropy_coef = entropy_coef
        self.min_batch_size = min_batch_size
        self.actor_optimizer = Adam(learning_rate=actor_lr)
        self.critic_optimizer = Adam(learning_rate=critic_lr)
        self.actor = self.build_actor()
        self.critic = self.build_critic()

    def build_actor(self):
        """Build the policy network: two 64-unit ReLU layers + `DistributionLayer`."""
        inputs = Input(shape=(self.state_dim,))
        x = Dense(64, activation='relu', kernel_initializer='he_uniform')(inputs)
        x = Dense(64, activation='relu', kernel_initializer='he_uniform')(x)
        mu, sigma = DistributionLayer(self.continuous_action_dim, self.continuous_action_bound)(x)
        return Model(inputs=inputs, outputs=[mu, sigma])

    def policy(self, state):
        """Sample one action for a single `state`, clipped to the action bound.

        Args:
            state: A single observation (no batch dimension).

        Returns:
            The sampled action clipped element-wise to
            `[-continuous_action_bound, continuous_action_bound]`.
        """
        mu, sigma = self.actor(np.array([state]))
        # Index 0 selects the first (and only) element of the batch.
        dist = tfd.Normal(loc=mu[0], scale=sigma[0])
        action = dist.sample()
        return np.clip(action, -self.continuous_action_bound, self.continuous_action_bound)

    def build_critic(self):
        """Build the value network: two 64-unit ReLU layers + one linear output."""
        inputs = Input(shape=(self.state_dim,))
        x = Dense(64, activation='relu', kernel_initializer='he_uniform')(inputs)
        x = Dense(64, activation='relu', kernel_initializer='he_uniform')(x)
        outputs = Dense(1, activation='linear')(x)
        return Model(inputs=inputs, outputs=outputs)

    def train(self, replay_buffer):
        """Run one critic update and one actor update on every stored transition.

        Args:
            replay_buffer: Sequence of
                `(state, continuous_action, reward, next_state, done)` tuples.

        Returns:
            `(actor_loss, critic_loss)` as scalar tensors, or `(None, None)`
            when fewer than `min_batch_size` transitions are available.
            `actor_loss` excludes the entropy term.
        """
        if len(replay_buffer) < self.min_batch_size:
            return None, None

        # Unpack the transitions column-wise; next_states / dones are unused.
        states, continuous_actions, rewards, _next_states, _dones = zip(*replay_buffer)
        states = tf.convert_to_tensor(np.stack(states), dtype=tf.float32)
        continuous_actions = tf.convert_to_tensor(np.stack(continuous_actions), dtype=tf.float32)

        # Flatten so a reward stored as shape (1,) cannot broadcast (N, 1)
        # against the (N,) critic output below.
        rewards = np.asarray(rewards, dtype=np.float64).reshape(-1)
        rewards = (rewards - np.mean(rewards)) / (np.std(rewards) + 1e-8)
        rewards = tf.convert_to_tensor(rewards, dtype=tf.float32)

        # Critic step: regress the value onto the normalized rewards.
        with tf.GradientTape() as tape:
            critic_value = tf.squeeze(self.critic(states, training=True), axis=-1)
            critic_loss = MeanSquaredError()(rewards, critic_value)

        critic_grads = tape.gradient(critic_loss, self.critic.trainable_variables)
        self.critic_optimizer.apply_gradients(zip(critic_grads, self.critic.trainable_variables))

        # The advantage uses the freshly updated critic and is a constant with
        # respect to the actor, so it is computed outside the actor's tape.
        advantage = rewards - tf.squeeze(self.critic(states), axis=-1)
        advantage = (advantage - tf.reduce_mean(advantage)) / (tf.math.reduce_std(advantage) + 1e-8)
        advantage = tf.stop_gradient(advantage)

        with tf.GradientTape() as tape:
            mu, sigma = self.actor(states, training=True)
            dist = tfd.Normal(loc=mu, scale=sigma)
            # Sum the per-dimension log-probabilities over the action axis.
            log_probs = tf.reduce_sum(dist.log_prob(continuous_actions), axis=-1)

            actor_loss = -tf.reduce_mean(log_probs * advantage)
            # Negative sign: minimizing the total loss increases entropy.
            entropy_bonus = -self.entropy_coef * tf.reduce_mean(dist.entropy())
            total_actor_loss = actor_loss + entropy_bonus

        actor_grads = tape.gradient(total_actor_loss, self.actor.trainable_variables)
        actor_grads = [tf.clip_by_norm(g, 1.0) if g is not None else None for g in actor_grads]
        self.actor_optimizer.apply_gradients(zip(actor_grads, self.actor.trainable_variables))

        return actor_loss, critic_loss


In [None]:
def train_agent(episodes=110, save_dir='/home/jd/Documentos/CODIGO/OpenAIGym/trained',
                train_every=10, log_every=100):
    """Train a `PPOAgent` on `ParabolicShotEnv` and save actor/critic weights.

    Args:
        episodes: Number of episodes to run.
        save_dir: Directory that receives `parabolic_actor.weights.h5` and
            `parabolic_critic.weights.h5`. It is created if missing.
        train_every: A training step runs on episodes whose index is a
            multiple of this value.
        log_every: On training episodes whose index is also a multiple of this
            value, the reward and losses are printed.
    """
    env = ParabolicShotEnv()
    try:
        # The action dimension (3) is hard-coded; adjust it to match the
        # environment's actual dimensions and bounds.
        agent = PPOAgent(env.observation_space.shape[0], 3, env.action_space.spaces['continuous'].high)
        replay_buffer = deque(maxlen=1000)

        # Create the output directory up front so a missing/unwritable path
        # fails before training rather than after it.
        os.makedirs(save_dir, exist_ok=True)

        for episode in tqdm(range(episodes)):
            # NOTE(review): assumes the classic Gym API (reset() -> obs,
            # step() -> 4-tuple) — TODO confirm against ParabolicShotEnv.
            state = env.reset()
            done = False
            total_reward = 0
            while not done:
                action = agent.policy(state)
                # The discrete component is chosen uniformly at random; it is
                # not produced by the agent.
                hybrid_action = {'continuous': action, 'discrete': 1 if np.random.rand() > 0.5 else 0}
                next_state, reward, done, info = env.step(hybrid_action)
                replay_buffer.append((state.copy(), hybrid_action['continuous'], reward, next_state.copy(), done))
                state = next_state
                total_reward += reward

            if episode % train_every == 0:
                actor_loss, critic_loss = agent.train(list(replay_buffer))
                if episode % log_every == 0 and actor_loss is not None:
                    print(f'Episode: {episode+1}, Total Reward: {total_reward}, Actor Loss: {actor_loss.numpy()}, Critic Loss: {critic_loss.numpy()}')

        agent.actor.save_weights(os.path.join(save_dir, 'parabolic_actor.weights.h5'))
        agent.critic.save_weights(os.path.join(save_dir, 'parabolic_critic.weights.h5'))
    finally:
        # Release the environment even if training or saving raised.
        env.close()

In [None]:
def simulate_agent(episodes=10, weights_dir='/home/jd/Documentos/CODIGO/OpenAIGym/trained',
                   step_delay=1):
    """Load saved weights and render the agent acting in `ParabolicShotEnv`.

    Args:
        episodes: Number of episodes to play.
        weights_dir: Directory containing `parabolic_actor.weights.h5` and
            `parabolic_critic.weights.h5`.
        step_delay: Seconds to sleep after rendering each step.
    """
    env = ParabolicShotEnv()
    try:
        agent = PPOAgent(env.observation_space.shape[0], 3, env.action_space.spaces['continuous'].high)
        agent.actor.load_weights(os.path.join(weights_dir, 'parabolic_actor.weights.h5'))
        agent.critic.load_weights(os.path.join(weights_dir, 'parabolic_critic.weights.h5'))

        for _ in tqdm(range(episodes)):
            # NOTE(review): assumes the classic Gym API (reset() -> obs,
            # step() -> 4-tuple) — TODO confirm against ParabolicShotEnv.
            state = env.reset()
            done = False
            total_reward = 0

            while not done:
                # Actions are sampled from the policy distribution, so
                # playback is stochastic.
                action = agent.policy(state)
                hybrid_action = {'continuous': action, 'discrete': 1 if np.random.rand() > 0.5 else 0}
                print(hybrid_action)  # Debugging
                state, reward, done, _ = env.step(hybrid_action)
                total_reward += reward
                env.render(mode='human')
                time.sleep(step_delay)

            print(f'Total Reward: {total_reward}')
    finally:
        # Release the environment even if loading or stepping raised.
        env.close()

In [None]:
# Train for 1000 episodes; train_agent saves the actor and critic weights when it finishes.
train_agent(1000)

In [None]:
# Load the saved actor/critic weights and render 10 episodes of the agent.
simulate_agent()