In [1]:
import gymnasium as gym
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import numpy as np
import seaborn as sns
from tqdm import tqdm
from collections import defaultdict # allows access to undefined keys
matplotlib.use('TkAgg')  # or 'Qt5Agg' if you prefer Qt

In [2]:
class CliffWalkingAgent():
    def __init__(self,
                 learning_rate:float,
                 initial_epsilon:float,
                 epsilon_decay:float,
                 final_epsilon:float,
                 discount_factor:float = 0.95,
                 ):
        
    #Initialize the agent with empty dictionary of action/state values (q_values), a learning rate and an epsilon
    # discount_factor : Is for computing the Q-value namely gamma 
        self.q_values = defaultdict(lambda: np.zeros(env.action_space.n))
    
        self.lr = learning_rate
        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon
        self.discount_factor = discount_factor
        
        self.training_error = []
    
    def choose_action(self, obs:tuple[int,int,bool])->int:
        # Return the best action with a probability of (1- epsilon) 
        if np.random.random() < self.epsilon:
            return env.action_space.sample()
        else:
            return int(np.argmax(self.q_values[obs]))
    
    def update_q_values(self,
                        obs:tuple[int,int,bool],
                        action:int,
                        reward:float,
                        terminated:bool,
                        next_obs:tuple[int,int,bool]):
        future_q_value = (not terminated) * np.max(self.q_values[next_obs])

        temporal_diffrence = (reward + (self.discount_factor * future_q_value))- self.q_values[obs][action]
        
        self.q_values[obs][action] = (
            self.q_values[obs][action] + self.lr * temporal_diffrence
        )
        self.training_error.append(temporal_diffrence)
        
    def decay_epsilon(self):
        self.epsilon = max(self.final_epsilon, self.epsilon * self.epsilon_decay)

In [3]:
learning_rate = 0.1
n_episodes = 10_000
start_epsilon = 1
epsilon_decay = 0.99
final_epsilon = 0.08

agent = CliffWalkingAgent(
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    final_epsilon=final_epsilon,
    epsilon_decay=epsilon_decay,
    
    
)

In [4]:
env = gym.make('CliffWalking-v0', render_mode='rgb_array')
env.reward_range = (-100, 100)
def custom_reward(observation, reward, done, info):
    if observation == env.goal_state:
        reward = 100  # Set reward of 10 for reaching the goal
    return observation, reward, done, info

env.reward_wrapper = custom_reward 

In [5]:
env = gym.wrappers.RecordEpisodeStatistics(env, deque_size=n_episodes)
env = gym.wrappers.TimeLimit(env, max_episode_steps=60)

rewards = 0 
for episode in tqdm(range(n_episodes)):
    obs, info = env.reset()
    done = False
    
    # play one episode
    while not done:
        action = agent.choose_action(obs)
        next_obs, reward, terminated, truncated, info = env.step(action)
        rewards += reward
        # update the agent
        agent.update_q_values(obs, action, reward, terminated, next_obs)

        # update if the environment is done and the current obs
        done = terminated or truncated
        obs = next_obs

    agent.decay_epsilon()

100%|██████████| 10000/10000 [00:07<00:00, 1341.88it/s]


In [6]:
q_values = np.array([value for key, value in agent.q_values.items()])
print(np.argmax(q_values,axis=1))


[1 0 1 2 1 1 2 1 1 2 1 1 2 1 1 1 0 2 3 1 1 1 2 1 1 2 1 2 1 1 1 1 1 1 2 2 2
 0]


In [7]:
print(f'total rewards = {rewards}')


total rewards = -469126


In [11]:
class CustomRewardWrapper(gym.Wrapper):
    def __init__(self, env):
        super().__init__(env)
        self.goal_reward = 10  # Set desired reward for reaching the goal
        self.goal_state = 47  # The actual goal state for CliffWalking

    def step(self, action):
        observation, reward, terminated, truncated, info = self.env.step(action)
        
        # Check if the agent has reached the goal state
        if observation == self.goal_state:
            print(f'Goal reached! Observation: {observation}')
            reward = self.goal_reward
        
        return observation, reward, terminated, truncated, info

# Create and wrap the environment
env = gym.make('CliffWalking-v0',render_mode='rgb_array')
env = CustomRewardWrapper(env)

obs, info = env.reset()

plt.ion()
fig, ax = plt.subplots(figsize=(8,8))
action_text = ax.text(510, 20, '', color='white', fontsize=12, bbox=dict(facecolor='blue', alpha=0.8))
img = ax.imshow(env.render())
actions = ['Move Up','Move Right','Move Down','Move Left']
rewards = 0
num_epochs= 3
for step in range(num_epochs):
    obs, info = env.reset()
    done = False
    while not done:
        action = agent.choose_action(obs)
        next_obs, reward, terminated, truncated, info = env.step(action)
        rewards += reward
        
        print(f'step {step}:  obs = {next_obs} , reward = {reward}')
        frame = env.render()
        img.set_data(frame)
        action_text.set_text(f'Step: {actions[action] }')

        fig.canvas.draw()
        fig.canvas.flush_events()
        plt.pause(.05)
        done = terminated or truncated
        obs = next_obs

plt.ioff()  # Turn off interactive mode
plt.show()  # Keep the window open after the animation finishes
plt.close()
env.close()

step 0:  obs = 24 , reward = -1
step 0:  obs = 25 , reward = -1
step 0:  obs = 26 , reward = -1
step 0:  obs = 27 , reward = -1
step 0:  obs = 28 , reward = -1
step 0:  obs = 29 , reward = -1
step 0:  obs = 30 , reward = -1
step 0:  obs = 31 , reward = -1
step 0:  obs = 32 , reward = -1
step 0:  obs = 33 , reward = -1
step 0:  obs = 34 , reward = -1
step 0:  obs = 35 , reward = -1
Goal reached! Observation: 47
step 0:  obs = 47 , reward = 10
step 1:  obs = 24 , reward = -1
step 1:  obs = 25 , reward = -1
step 1:  obs = 26 , reward = -1
step 1:  obs = 27 , reward = -1
step 1:  obs = 28 , reward = -1
step 1:  obs = 29 , reward = -1
step 1:  obs = 36 , reward = -100
step 1:  obs = 36 , reward = -1
step 1:  obs = 24 , reward = -1
step 1:  obs = 25 , reward = -1
step 1:  obs = 26 , reward = -1
step 1:  obs = 27 , reward = -1
step 1:  obs = 28 , reward = -1
step 1:  obs = 29 , reward = -1
step 1:  obs = 30 , reward = -1
step 1:  obs = 31 , reward = -1
step 1:  obs = 32 , reward = -1
step 1: 

In [9]:
print(f'total rewards = {rewards}')

total rewards = -6


In [10]:
print(f'action space shape : {env.action_space.n}') # Number of possible actions is 4
print(f'observation space shape : {env.observation_space}') 
print(f'observation space numbers : {env.nS}') 
#-------------- obesrvation is a tupe of 3 values : --------------
#1) player cards value
#2) dealer's face up card
#3) usable ace for player, equal 1 if ace is considered an 11 without busting

print(f'reward range : {env.reward_range}') # default reward range is set to -inf +inf
# print(f'\nEnv specs : {env.spec}') 
print(f'\nEnv metadata : {env.metadata}') # render_modes adn render_fps

action space shape : 4
observation space shape : Discrete(48)
observation space numbers : 48
reward range : (-inf, inf)

Env metadata : {'render_modes': ['human', 'rgb_array', 'ansi'], 'render_fps': 4}


  logger.warn(
