In [None]:
#!pip install gymnasium
#!pip install gymnasium[classic_control]

In [5]:
import numpy as np
import gymnasium as gym
import matplotlib.pyplot as plt

# Import and initialize Mountain Car Environment
env = gym.make('MountainCar-v0')
env.reset()

# Define Q-learning function
def QLearning(env, learning, discount, epsilon, min_eps, episodes):
    # Determine size of discretized state space
    num_states = (env.observation_space.high - env.observation_space.low)*\
                    np.array([10, 100])
    num_states = np.round(num_states, 0).astype(int) + 1
    
    # Initialize Q table
    Q = np.random.uniform(low = -1, high = 1, 
                          size = (num_states[0], num_states[1], 
                                  env.action_space.n))
    
    # Initialize variables to track rewards
    reward_list = []
    ave_reward_list = []
    
    # Calculate episodic reduction in epsilon
    reduction = (epsilon - min_eps)/episodes
    
    # Run Q learning algorithm
    for i in range(episodes):
        # Initialize parameters
        done = False
        tot_reward, reward = 0,0
        state = env.reset()
        
        # Discretize state
        state_adj = (env.observation_space.low)*np.array([10, 100])
        state_adj = np.round(state_adj, 0).astype(int)
    
        while done != True:   
            # Render environment for last five episodes
            if i >= (episodes - 20):
                env.render()
                
            # Determine next action - epsilon greedy strategy
            if np.random.random() < 1 - epsilon:
                action = np.argmax(Q[state_adj[0], state_adj[1]]) 
            else:
                action = np.random.randint(0, env.action_space.n)
                
            # Get next state and reward
            state2, reward = env.step(action) 
            
            # Discretize state2
            state2_adj = (state2 - env.observation_space.low)*np.array([10, 100])
            state2_adj = np.round(state2_adj, 0).astype(int)
            
            #Allow for terminal states
            if done and state2[0] >= 0.5:
                Q[state_adj[0], state_adj[1], action] = reward
                
            # Adjust Q value for current state
            else:
                delta = learning*(reward + 
                                 discount*np.max(Q[state2_adj[0], 
                                                   state2_adj[1]]) - 
                                 Q[state_adj[0], state_adj[1],action])
                Q[state_adj[0], state_adj[1],action] += delta
                                     
            # Update variables
            tot_reward += reward
            state_adj = state2_adj
        
        # Decay epsilon
        if epsilon > min_eps:
            epsilon -= reduction
        
        # Track rewards
        reward_list.append(tot_reward)
        
        if (i+1) % 100 == 0:
            ave_reward = np.mean(reward_list)
            ave_reward_list.append(ave_reward)
            reward_list = []
            
        if (i+1) % 100 == 0:    
            print('Episode {} Average Reward: {}'.format(i+1, ave_reward))
            
    env.close()
    
    return ave_reward_list

# Run Q-learning algorithm
rewards = QLearning(env, 0.2, 0.9, 0.8, 0, 5000)

# Plot Rewards
plt.plot(100*(np.arange(len(rewards)) + 1), rewards)
plt.xlabel('Episodes')
plt.ylabel('Average Reward')
plt.title('Average Reward vs Episodes')
plt.savefig('rewards.jpg')     
plt.close()

ValueError: too many values to unpack (expected 2)

In [None]:
import collections 
import gymnasium as gym
import numpy as np
import statistics
import tensorflow as tf
import tensorflow.keras.layers as layers
import tqdm

# ambiente 
env = gym.make('MountainCar-v0')
env.reset()
# seed
seed= 42
tf.random.set_seed(seed)
np.random.seed(seed)

esp=np.finfo(np.float32).eps.item()
print('Espaço de ação: ', env.observation_space.low) 

In [None]:
num_actions = env.action_space.n
num_hidden_units=128

model=AtorCritico(num_actions,num_hidden_units)

In [None]:
num_states = (env.observation_space.high - env.observation_space.low)*\
                    np.array([10, 100])
num_states = np.round(num_states, 0).astype(int) + 1
num_states

Q = np.random.uniform(low = -1, high = 1, 
                          size = (num_states[0], num_states[1], 
                                  env.action_space.n))

In [None]:
reward_list = []
ave_reward_list = []
epsilon = 0.8
min_eps = 0
episodes = 1000
# Calculate episodic reduction in epsilon
reduction = (epsilon - min_eps)/episodes

In [None]:
# Run Q learning algorithm
for i in range(episodes):
    # Initialize parameters
    done = False
    tot_reward, reward = 0,0
    state = env.reset()
    
    state_adj = (state - env.observation_space.low)*np.array([10, 100])
    state_adj = np.round(state_adj, 0).astype(int)

In [None]:
print(env.observation_space.low)

In [None]:
 # Discretize state


### novo

In [None]:
class AtorCritico(tf.keras.Model):
    def __init__(
        self,
        num_actions:int,
        num_hidden_units:int):
        super().__init__()

        self.common = layers.Dense(num_hidden_units, activation='relu')
        self.ator = layers.Dense(num_actions)
        self.critico =layers.Dense(1)

    def call(self, inputs:tf.Tensor) -> tuple[tf.Tensor,tf.Tensor]:
        x= self.common(inputs)
        return self.ator(x), self.critico(x)

In [None]:
num_actions = env.action_space() #.n teste
num_hidden_units=128

model=AtorCritico(num_actions,num_hidden_units)

In [None]:
def env_steps(action: np.ndarray)-> tuple[np.ndarray,np.ndarray,np.ndarray]:
    estado, recompenca, final, truncado, info=env.step(action)
    return (estado.astype(np.float32), np.array(recompenca, np.int32), np.array(final, np.int32))

def tf_env_steps(action: tf.Tensor)-> list[tf.Tensor]:
    return tf.numpy_function(env_steps, [action],[tf.float32, tf.int32, tf.int32])

In [None]:
def rodar_ep(initial_state:tf.Tensor,
    model: tf.keras.Model,
    max_steps:int)-> tuple[tf.Tensor,tf.Tensor,tf.Tensor ]:

    acao_probs= tf.TensorArray(dtype=tf.float32,size=0, dynamic_size=True)
    valores= tf.TensorArray(dtype=tf.float32,size=0, dynamic_size=True)
    recompencas= tf.TensorArray(dtype=tf.int32,size=0, dynamic_size=True)

    initial_state_shape=initial_state.shape
    estado=initial_state

    for t in tf.range(max_steps):
        estado=tf.expand_dims(estado, 0)

        action_logists_t, value =model(estado)

        acao= tf.random.categorical(action_logists_t, 1 )[0,0]
        acao_probs_t= tf.nn.softmax(action_logists_t)

        valores =valores.write(t, tf.squeeze(value))

        acao_probs= acao_probs.write(t, acao_probs_t[0, acao])

        estado, recompenca, final =tf_env_steps(acao)
        estado.set_shape(initial_state_shape)

        recompencas=recompencas.write(t,recompenca)

        if tf.cast(final, tf.bool):
            break
    acoes_prob=acao_probs.stack()
    valores= valores.stack()
    recompencas= recompencas.stack()

    return acoes_prob,valores,recompencas

In [None]:
def receber_valor_esperado(
    recompencas: tf.Tensor,
    gamma: float,
    standardize: bool=True
)-> tf.Tensor:
    n=tf.shape(recompencas)[0]
    returns= tf.TensorArray(dtype=tf.float32, size=n)

    recompencas= tf.cast(recompencas[::-1], dtype=tf.float32)
    soma_descontada= tf.constant(0.0)
    soma_descontada_shape=soma_descontada.shape
    for i in tf.range(n):
        recompenca=recompencas[i]
        soma_descontada= recompenca+gamma*soma_descontada
        soma_descontada.set_shape(soma_descontada_shape)
        returns= returns.write(i, soma_descontada)
    returns= returns.stack()[::-1]
    if standardize:
        returns= ((returns-tf.math.reduce_mean(returns))/
        (tf.math.reduce_std(returns)+esp))
    return returns

In [None]:
huber_loss= tf.keras.losses.Huber(reduction=tf.keras.losses.Reduction.SUM)

def computar_perdas(
    action_prob:tf.Tensor,
    valores: tf.Tensor,
    returns: tf.Tensor
)-> tf.Tensor:
    vantagem= returns-valores

    action_log_prob=tf.math.log(action_prob)
    ator_loss= -tf.math.reduce_sum(action_log_prob*vantagem)

    critico_loss= huber_loss(valores,returns)

    return ator_loss+critico_loss

In [None]:
otimizador= tf.keras.optimizers.Adam(learning_rate=0.01)

@tf.function
def train_step(
    initial_state: tf.Tensor,
    model: tf.keras.Model,
    otimizador: tf.keras.optimizers.Optimizer,
    gamma:float,
    num_max_steps_por_ep:int)-> tf.Tensor:
    with tf.GradientTape() as tape:
        acoes_prob, valores,recompencas= rodar_ep(initial_state, model,num_max_steps_por_ep)

        retornos= receber_valor_esperado(recompencas, gamma)

        acoes_prob, valores, retornos=[tf.expand_dims(x,1) for x in [acoes_prob, valores, retornos]]

        perda= computar_perdas(acoes_prob, valores, retornos)
    grads= tape.gradient(perda, model.trainable_variables)

    otimizador.apply_gradients(zip(grads, model.trainable_variables))

    recompenca_do_ep=tf.math.reduce_sum(recompencas)

    return recompenca_do_ep

In [None]:
def step(self, action: int):
        assert self.action_space.contains(
            action
        ), f"{action!r} ({type(action)}) invalid"

        position, velocity = self.state
        velocity += (action - 1) * self.force + math.cos(3 * position) * (-self.gravity)
        velocity = np.clip(velocity, -self.max_speed, self.max_speed)
        position += velocity
        position = np.clip(position, self.min_position, self.max_position)
        if position == self.min_position and velocity < 0:
            velocity = 0

        terminated = bool(
            position >= self.goal_position and velocity >= self.goal_velocity
        )
        reward = -1.0

        self.state = (position, velocity)
        if self.render_mode == "human":
            self.render()
        return np.array(self.state, dtype=np.float32), reward, terminated, False, {}
min_ep=100
max_ep= 10000
max_steps_per_ep=500

limiar_de_recompenca=-100#475 - teste
recompenca_rodar=0

gamma=.99

recompancas_do_ep: collections.deque=collections.deque(maxlen=min_ep)

t= tqdm.trange(max_ep)
for i in t:
    estado_inicial, info = env.reset()
    estado_inicial= tf.constant(estado_inicial, dtype=tf.float32)
    recompanca_do_ep= int(train_step(
        estado_inicial,model, otimizador, gamma, max_steps_per_ep)
    )

    recompancas_do_ep.append(recompanca_do_ep)
    recompenca_rodar= statistics.mean(recompancas_do_ep)

    t.set_postfix(recompanca_do_ep=recompanca_do_ep, recompenca_rodar=recompenca_rodar)
    if recompenca_rodar > limiar_de_recompenca and i > min_ep:
        break
print(f'\n EP:{i} \n recompanca media {recompenca_rodar:.2f}')

In [None]:
env.reset()
for i in range(1000): 
    

In [None]:
recompancas_do_ep

In [None]:
from IPython import  display as ipythondisplay
from PIL import Image

render_env = gym.make('MountainCar-v0', render_mode='rgb_array')

def render_ep(env: gym.Env, model: tf.keras.Model, max_steps:int):
    state, info= render_env.reset()
    state - tf.constant(state, dtype= tf.float32)
    screen= render_env.render()
    images= [Image.fromarray(screen)]

    for i in range(1, max_steps+1):
        state= tf.expand_dims(state, 0)
        action_probs, _ = model(state)
        action= np.argmax(np.squeeze( action_probs))

        state, recompanca, final, truncado, info = render_env.step(action)
        state= tf.constant(state,dtype=tf.float32)

        if i%10==0:
            screen= render_env.render()
            images.append(Image.fromarray(screen))

        if final:
            break
    return images

In [None]:
images= render_ep(env, model, max_ep) 
images[0].save('MountainCar.gif',save_all=True, append_images=images[1:],loop=0, duration=1)