In [41]:
MAIN

In [None]:
import sys

# Asegúrate de que la versión de Python sea al menos 3.7
assert sys.version_info >= (3, 7)

# Verifica la versión de TensorFlow
from packaging import version
import tensorflow as tf
assert version.parse(tf.__version__) >= version.parse("2.8.0")

# Importa Matplotlib para trazar gráficos
import matplotlib.animation
import matplotlib.pyplot as plt

# Configura estilos para las gráficas
plt.rc('font', size=14)
plt.rc('axes', labelsize=14, titlesize=14)
plt.rc('legend', fontsize=14)
plt.rc('xtick', labelsize=10)
plt.rc('ytick', labelsize=10)
plt.rc('animation', html='jshtml')

# Importa la clase Path de pathlib para manejar rutas de archivos
from pathlib import Path

# Define la ruta donde se guardarán las imágenes
IMAGES_PATH = Path() / "images" / "rl"
IMAGES_PATH.mkdir(parents=True, exist_ok=True)

# Define una función para guardar figuras
def save_fig(fig_id, tight_layout=True, fig_extension="png", resolution=300):
    path = IMAGES_PATH / f"{fig_id}.{fig_extension}"
    if tight_layout:
        plt.tight_layout()
    plt.savefig(path, format=fig_extension, dpi=resolution)
    
    # Imprime un mensaje si no se detecta una GPU
    if not tf.config.list_physical_devices('GPU'):
        print("No GPU was detected. Neural nets can be very slow without a GPU.")
    
    # Imprime un mensaje si se está ejecutando en Google Colab
    if "google.colab" in sys.modules:
        print("Go to Runtime > Change runtime and select a GPU hardware "
              "accelerator.")
    
    # Imprime un mensaje si se está ejecutando en Kaggle
    if "kaggle_secrets" in sys.modules:
        print("Go to Settings > Accelerator and select GPU.")

# Importa la librería de OpenAI Gym y crea un entorno CartPole
import gymnasium as gym

# Crea el entorno CartPole-v1 y obtiene la observación inicial
env = gym.make("CartPole-v1", render_mode="rgb_array")
obs, info = env.reset(seed=42)


HARDCODED POLICY

In [42]:
def basic_policy(obs):
    angle = obs[2]
    return 0 if angle < 0 else 1

totals = []
for episode in range(500):
    episode_rewards = 0
    obs, info = env.reset(seed=episode)
    for step in range(200):
        action = basic_policy(obs)
        obs, reward, done, truncated, info = env.step(action)
        episode_rewards += reward
        if done or truncated:
            break

    totals.append(episode_rewards)
    
import numpy as np
np.mean(totals), np.std(totals), min(totals), max(totals)

(41.698, 8.389445512070509, 24.0, 63.0)

ANIMACION

In [43]:
# extra code – this cell displays an animation of one episode

def update_scene(num, frames, patch):
    patch.set_data(frames[num])
    return patch,

def plot_animation(frames, repeat=False, interval=40):
    fig = plt.figure()
    patch = plt.imshow(frames[0])
    plt.axis('off')
    anim = matplotlib.animation.FuncAnimation(
        fig, update_scene, fargs=(frames, patch),
        frames=len(frames), repeat=repeat, interval=interval)
    plt.close()
    return anim

def show_one_episode(policy, n_max_steps=200, seed=42):
    frames = []
    env = gym.make("CartPole-v1", render_mode="rgb_array")
    np.random.seed(seed)
    obs, info = env.reset(seed=seed)
    for step in range(n_max_steps):
        frames.append(env.render())
        action = policy(obs)
        obs, reward, done, truncated, info = env.step(action)
        if done or truncated:
            print("Steps: " + str(step))
            break
    env.close()
    return plot_animation(frames)

show_one_episode(basic_policy)

Steps: 54


NEURAL NETWORK

In [46]:
import tensorflow as tf

tf.random.set_seed(42)  # extra code – ensures reproducibility on the CPU

model = tf.keras.Sequential([
    tf.keras.layers.Dense(5, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
# extra code – a function that creates an animation for a given policy model

def pg_policy(obs):
    left_proba = model.predict(obs[np.newaxis], verbose=0)[0][0]
    #print(left_proba)
    return int(np.random.rand() > left_proba)

np.random.seed(42)
show_one_episode(pg_policy)

Steps: 24


GRADIENT

In [53]:
def play_one_step(env, obs, model, loss_fn):
    with tf.GradientTape() as tape:
        left_proba = model(obs[np.newaxis])
        action = (tf.random.uniform([1, 1]) > left_proba)
        y_target = tf.constant([[1.]]) - tf.cast(action, tf.float32)
        loss = tf.reduce_mean(loss_fn(y_target, left_proba))

    grads = tape.gradient(loss, model.trainable_variables)
    obs, reward, done, truncated, info = env.step(int(action))
    return obs, reward, done, truncated, grads

def play_multiple_episodes(env, n_episodes, n_max_steps, model, loss_fn):
    all_rewards = []
    all_grads = []
    for episode in range(n_episodes):
        current_rewards = []
        current_grads = []
        obs, info = env.reset()
        for step in range(n_max_steps):
            obs, reward, done, truncated, grads = play_one_step(
                env, obs, model, loss_fn)
            current_rewards.append(reward)
            current_grads.append(grads)
            if done or truncated:

                break

        all_rewards.append(current_rewards)
        all_grads.append(current_grads)

    return all_rewards, all_grads

def discount_rewards(rewards, discount_factor):
    discounted = np.array(rewards)
    for step in range(len(rewards) - 2, -1, -1):
        discounted[step] += discounted[step + 1] * discount_factor
    return discounted

def discount_and_normalize_rewards(all_rewards, discount_factor):
    all_discounted_rewards = [discount_rewards(rewards, discount_factor)
                              for rewards in all_rewards]
    flat_rewards = np.concatenate(all_discounted_rewards)
    reward_mean = flat_rewards.mean()
    reward_std = flat_rewards.std()
    return [(discounted_rewards - reward_mean) / reward_std
            for discounted_rewards in all_discounted_rewards]
n_iterations = 10
n_episodes_per_update = 10
n_max_steps = 200
discount_factor = 0.95

# extra code – let's create the neural net and reset the environment, for
#              reproducibility

tf.random.set_seed(42)

model = tf.keras.Sequential([
    tf.keras.layers.Dense(5, activation="relu"),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])

obs, info = env.reset(seed=42)
optimizer = tf.keras.optimizers.legacy.Nadam(learning_rate=0.01)
loss_fn = tf.keras.losses.binary_crossentropy

for iteration in range(n_iterations):
    all_rewards, all_grads = play_multiple_episodes(
        env, n_episodes_per_update, n_max_steps, model, loss_fn)

    # extra code – displays some debug info during training
    total_rewards = sum(map(sum, all_rewards))
    print(f"\rIteration: {iteration + 1}/{n_iterations},"
          f" mean rewards: {total_rewards / n_episodes_per_update:.1f}", end="")

    all_final_rewards = discount_and_normalize_rewards(all_rewards,
                                                       discount_factor)
    all_mean_grads = []
    for var_index in range(len(model.trainable_variables)):
        mean_grads = tf.reduce_mean(
            [final_reward * all_grads[episode_index][step][var_index]
             for episode_index, final_rewards in enumerate(all_final_rewards)
                 for step, final_reward in enumerate(final_rewards)], axis=0)
        all_mean_grads.append(mean_grads)

    optimizer.apply_gradients(zip(all_mean_grads, model.trainable_variables))


np.random.seed(42)
show_one_episode(pg_policy)

Iteration: 10/10, mean rewards: 28.6Steps: 31


Q-LEARNING

In [54]:
# Import libraries
import gymnasium as gym_lib
import numpy as np_lib
import time
import matplotlib.animation as animation_lib
import matplotlib.pyplot as plt_lib

#---------------#

# Create the resources of the figure used in animation
plt_lib.rc('font', size=14)
plt_lib.rc('axes', labelsize=14, titlesize=14)
plt_lib.rc('legend', fontsize=14)
plt_lib.rc('xtick', labelsize=10)
plt_lib.rc('ytick', labelsize=10)
plt_lib.rc('animation', html='jshtml')

#---------------#

from pathlib import Path

images_dir = Path() / "images" / "rl"
images_dir.mkdir(parents=True, exist_ok=True)

# Method to save the figure
def save_figure(fig_identifier, is_tight_layout=True, figure_extension="png", figure_resolution=300):
    fig_path = images_dir / f"{fig_identifier}.{figure_extension}"
    if is_tight_layout:
        plt_lib.tight_layout()
    plt_lib.savefig(fig_path, format=figure_extension, dpi=figure_resolution)
    
#---------------#

# Create the environment
environment = gym_lib.make("CartPole-v1", render_mode="rgb_array")
environment.action_space.seed(42)
np_lib.random.seed(42)
observation_initial, info_initial = environment.reset(seed=42)

# Control Panel
total_episodes = 10000
reward_log_interval = 500
learning_decay = 500
exploration_decay = 500

#---------------#

# Specify the number of bins for each state value
number_bins = (12, 12, 12, 12)
# Discretize the observation space
bounds_lower = [environment.observation_space.low[0], -0.25, 
                environment.observation_space.low[2], -np_lib.radians(50)]
bounds_upper = [environment.observation_space.high[0], 0.25, 
                environment.observation_space.high[2], np_lib.radians(50)]

#---------------#

def update_frame(frame_number, frames_list, frame_patch):
    frame_patch.set_data(frames_list[frame_number])
    return frame_patch,

def animate_frames(frames_sequence, should_repeat=False, frame_interval=40):
    figure_animation = plt_lib.figure()
    patch_animation = plt_lib.imshow(frames_sequence[0])
    plt_lib.axis('off')
    animation_obj = animation_lib.FuncAnimation(
        figure_animation, update_frame, fargs=(frames_sequence, patch_animation),
        frames=len(frames_sequence), repeat=should_repeat, interval=frame_interval)
    plt_lib.close()
    return animation_obj

#---------------#

def discretize_state(cart_pos, cart_velocity, pole_angle, pole_velocity_at_tip):
    def discretize(value, min_value, max_value, bins):
        value_clipped = max(min_value, min(value, max_value))
        bin_width = (max_value - min_value) / bins
        bin_index = int((value_clipped - min_value) / bin_width)
        return min(bin_index, bins - 1)

    cart_position_bin = discretize(cart_pos, bounds_lower[0], bounds_upper[0], number_bins[0])
    cart_velocity_bin = discretize(cart_velocity, bounds_lower[1], bounds_upper[1], number_bins[1])
    pole_angle_bin = discretize(pole_angle, bounds_lower[2], bounds_upper[2], number_bins[2])
    pole_velocity_bin = discretize(pole_velocity_at_tip, bounds_lower[3], bounds_upper[3], number_bins[3])

    return (cart_position_bin, cart_velocity_bin, pole_angle_bin, pole_velocity_bin)

#---------------#

# Policy function returns the best action given the state
def select_action_greedy(state):
    return np_lib.argmax(q_table[state])

# Exploration rate decay function
def decay_exploration_rate(episode_number, rate_minimum=0.01):
    return max(rate_minimum, min(1, 1.0 - np_lib.log10((episode_number + 1) / exploration_decay)))

# Learning rate decay function
def decay_learning_rate(episode_number, rate_minimum=0.01):
    return max(rate_minimum, min(1.0, 1.0 - np_lib.log10((episode_number + 1) / learning_decay)))

# Function to calculate the new Q value
def calculate_new_q_value(reward, new_state, discount_factor=1):
    future_optimal_value = np_lib.max(q_table[new_state])
    learned_value = reward + discount_factor * future_optimal_value
    return learned_value

#---------------#

# Q-table initialization (all zeros across all bins)
q_table = np_lib.zeros(number_bins + (environment.action_space.n,))

# Start timer to record training time (excluding video export)
total_training_time = 0

#---------------#

episode_rewards = []

# Game loop for episodes
for episode in range(total_episodes):
    # Reset environment for a new episode
    observation, info = environment.reset()
    # Discretize initial observation
    current_state, is_done, is_truncated = discretize_state(*observation), False, False
    # Tracking the last episode number
    last_episode_number = episode
    # Initialize total reward for the current episode
    total_reward_current_episode = 0
 
    while not is_done and not is_truncated:
        # Time tracking for this iteration
        start_time_iteration = time.time()
 
        # Select action based on policy
        action_selected = select_action_greedy(current_state)
        # Apply exploration
        if np_lib.random.random() < decay_exploration_rate(episode):
            action_selected = environment.action_space.sample()
        
        # Take the action in the environment
        observation_new, reward, is_done, is_truncated, info = environment.step(action_selected)
        state_new = discretize_state(*observation_new)
        # Update Q-table with new knowledge
        learning_rate_current = decay_learning_rate(episode)
        q_value_new = calculate_new_q_value(reward, state_new)
        q_value_old = q_table[current_state + (action_selected,)]
        q_table[current_state + (action_selected,)] = (1 - learning_rate_current) * q_value_old + learning_rate_current * q_value_new
 
        # Update current state to new state
        current_state = state_new
 
        # Update stats for the episode
        total_reward_current_episode += reward
 
        # Stop time for this iteration
        end_time_iteration = time.time()
        total_training_time += end_time_iteration - start_time_iteration
 
    episode_rewards.append(total_reward_current_episode)

    # Early stopping if the model is performing well enough
    if np_lib.mean(episode_rewards) >= 200:  # Consider only the last 100 episodes
        break

#---------------#

# Calculate and print training time
training_minutes, training_seconds = divmod(total_training_time, 60)
print("\nTraining summary:")
print("Last episode:", last_episode_number)
print("Average reward:", np_lib.mean(episode_rewards), "Min reward:", np_lib.min(episode_rewards), "Max reward:", np_lib.max(episode_rewards))
print(f"Total training time: {int(training_minutes)}m {int(training_seconds)}s\n")
plot_animation(frames)


Training summary:
Last episode: 5038
Average reward: 200.01091486406034 Min reward: 8.0 Max reward: 500.0
Total training time: 0m 13s

