### OpenAI Taxi-v3 (Monte Carlo methods and TD methods)

In [None]:
# import libraries
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import random
import sys

In [None]:
# Gymnasium environment id used by every experiment in this notebook
environment_name = "Taxi-v3"
# Render mode handed to gym.make when the environment is created
render_mode = "rgb_array"

### 1. Monte Carlo Methods

#### On-Policy

#### Off-Policy

### 2. Temporal-Difference Learning Methods

In [None]:
class TDAgent:
    """Tabular agent holding a Q-table and an epsilon-greedy behaviour policy.

    Args:
        actionSpace: Number of discrete actions (columns of the Q-table).
        stateSpace: Number of discrete states (rows of the Q-table).
        epsilon: Probability of picking a uniformly random action.
        discount_factor: Discount factor (gamma) read by the learning loop.
        learning_rate: Step size (alpha) read by the learning loop.

    Attributes:
        e: The exploration probability (epsilon).
        Q_values: float32 array of shape (stateSpace, actionSpace), all zeros
            at construction.
    """

    def __init__(self, actionSpace=4, stateSpace=48, epsilon=0.1,
                 discount_factor=0.95, learning_rate=0.8) -> None:
        # parameter space for the agent
        self.actionSpace = actionSpace          # number of actions
        self.stateSpace = stateSpace            # number of states
        self.e = epsilon                        # epsilon
        self.discount_factor = discount_factor  # gamma
        self.learning_rate = learning_rate      # alpha
        self.Q_values = np.zeros((stateSpace, actionSpace), dtype=np.float32)  # Q values

    def epsilon_greedy(self, s):
        """Return an action index for state ``s`` under the epsilon-greedy policy.

        With probability ``self.e`` a uniformly random action is returned;
        otherwise the action with the highest Q value in row ``s`` is returned
        (``np.argmax`` resolves ties in favour of the lowest index).
        """
        # draw in [0, 1); exploring only when the draw falls below the
        # threshold means epsilon == 0 is purely greedy
        draw = random.random()
        if draw < self.e:
            return np.random.choice(self.actionSpace)
        return np.argmax(self.Q_values[s])

In [None]:
class TDLearning:
    """Tabular temporal-difference control experiment (Q-Learning or SARSA).

    Args:
        num_episodes: Number of training episodes to run.
        method: ``'Q-Learning'`` (off-policy, greedy bootstrap) or ``'SARSA'``
            (on-policy, bootstraps from the action that is actually taken next).
        episode_length: Maximum number of steps per episode.

    Raises:
        ValueError: If ``method`` is not one of the supported names.
    """

    METHODS = ('Q-Learning', 'SARSA')

    def __init__(self, num_episodes=500, method='Q-Learning', episode_length=500) -> None:
        # An unknown method used to fall through silently and learn from the
        # immediate reward only; fail loudly instead.
        if method not in self.METHODS:
            raise ValueError(
                "Unknown TD method {!r}; expected one of {}".format(method, self.METHODS)
            )
        self.num_episodes = num_episodes      # number of episodes
        self.episode_length = episode_length  # maximum length of an episode
        self.method = method                  # method to use for TD learning

    def _print_progress(self, episode, cumulative_reward):
        """Print the episode index and the reward accumulated in that episode."""
        print("Episode: {}/{}".format(episode, self.num_episodes), end=" ")
        print("Cumulative Reward: {}".format(cumulative_reward), end='\n')

    def run(self, env, agent):
        """Train ``agent`` on ``env`` and return per-episode statistics.

        Args:
            env: Environment whose ``reset()`` returns ``(observation, info)``
                and whose ``step(action)`` returns
                ``(observation, reward, terminated, truncated, info)``.
            agent: Object exposing ``Q_values`` (2-D array indexed by
                ``[state, action]``), ``discount_factor``, ``learning_rate``
                and ``epsilon_greedy(state)``. Its ``Q_values`` are updated
                in place.

        Returns:
            Tuple ``(stats, episode_lengths)`` of 1-D arrays of length
            ``num_episodes``: accumulated reward and number of steps taken in
            each episode.
        """
        # stats store the accumulated reward for each episode
        stats = np.zeros(self.num_episodes)
        # episode_lengths store the number of steps taken in each episode
        episode_lengths = np.zeros(self.num_episodes)
        on_policy = self.method == 'SARSA'

        for episode in range(self.num_episodes):
            s, _ = env.reset()
            # For SARSA the action used in the TD target must be the one that
            # is executed next, so it is carried over to the next iteration.
            pending_action = None

            for t in range(self.episode_length):
                if pending_action is None:
                    a = agent.epsilon_greedy(s)
                else:
                    a = pending_action
                pending_action = None

                next_state, reward, terminated, truncated, _ = env.step(a)
                stats[episode] += reward
                episode_lengths[episode] = t + 1  # steps taken so far

                if terminated:
                    # no future return after a terminal transition
                    bootstrap = 0.0
                elif on_policy:
                    pending_action = agent.epsilon_greedy(next_state)
                    bootstrap = agent.Q_values[next_state, pending_action]
                else:
                    bootstrap = np.max(agent.Q_values[next_state])

                td_error = (reward + agent.discount_factor * bootstrap
                            - agent.Q_values[s, a])
                agent.Q_values[s, a] += agent.learning_rate * td_error
                s = next_state

                # stop on a terminal state or when the environment cuts the
                # episode short (truncation, e.g. by a step limit)
                if terminated or truncated:
                    break

            # report progress every 100 episodes
            if episode % 100 == 0:
                self._print_progress(episode, stats[episode])

        return stats, episode_lengths

#### Q-Learning

In [None]:
# Create the environment and read the sizes of its action / observation spaces
env = gym.make(environment_name, render_mode=render_mode)
env.reset()
nS = env.observation_space.n
nA = env.action_space.n

# Train a fresh tabular agent with the Q-Learning update rule
qvalue = TDLearning(method='Q-Learning')
agent = TDAgent(stateSpace=nS, actionSpace=nA)
ql_stats, ql_episode_lengths = qvalue.run(env, agent)


#### SARSA

In [None]:
# Train a second, independent agent on the same environment using SARSA
env.reset()
SARSA = TDLearning(method='SARSA')
agent2 = TDAgent(
    stateSpace=env.observation_space.n,
    actionSpace=env.action_space.n,
)
sarsa_stats, sarsa_episode_lengths = SARSA.run(env, agent2)


In [None]:
# plot the Q-Learning training curve
# The default figure size has to be set before the first plotting call:
# plt.plot creates the figure, so changing rcParams afterwards would only
# affect figures created later.
plt.rcParams['figure.figsize'] = [10, 5]
plt.plot(ql_stats, label='Q-Learning')
plt.xlabel('Episodes')
plt.ylabel('Accumulated Reward')
plt.legend()
plt.show()


In [None]:
# plot the SARSA training curve (accumulated reward per training episode)
plt.plot(sarsa_stats, label='SARSA')
plt.xlabel('Episodes')
plt.ylabel('Accumulated Reward')
plt.legend()
plt.show()


In [None]:
# perform the actions in the environment
# according to the optimal Q values by the agents


def TestAgentPolicy(env, Q_values, num_episodes=100, episode_len=1000):
    """Roll out the greedy policy induced by ``Q_values`` and collect statistics.

    Args:
        env: Environment whose ``reset()`` returns ``(observation, info)`` and
            whose ``step(action)`` returns
            ``(observation, reward, terminated, truncated, info)``.
        Q_values: 2-D array indexed by ``[state, action]``; the action with the
            highest value in the current state's row is always taken.
        num_episodes: Number of evaluation episodes.
        episode_len: Maximum number of steps per episode.

    Returns:
        Tuple ``(stats, steps)`` of 1-D arrays of length ``num_episodes``:
        accumulated reward and number of steps taken in each episode.
    """
    stats = np.zeros(num_episodes)
    steps = np.zeros(num_episodes)
    for episode in range(num_episodes):
        s, _ = env.reset()

        for t in range(episode_len):
            a = np.argmax(Q_values[s])
            next_state, reward, terminated, truncated, _ = env.step(a)
            stats[episode] += reward
            steps[episode] = t + 1  # steps taken so far
            s = next_state
            # A greedy policy may never reach the goal; stop as soon as the
            # environment reports termination or truncation instead of
            # stepping an episode that has already ended.
            if terminated or truncated:
                break
    return stats, steps




In [None]:
# Evaluate the greedy policy of each trained agent, then compare the
# accumulated reward per evaluation episode.
qst, tst = TestAgentPolicy(env, agent.Q_values)
sst, tst2 = TestAgentPolicy(env, agent2.Q_values)

plt.plot(qst, label='Q-Learning')
plt.plot(sst, label='SARSA')
plt.xlabel('Episodes')
plt.ylabel('Accumulated Reward')
plt.legend()

In [None]:
# Compare the per-episode step statistics (tst, tst2) returned by the
# greedy evaluation of the two agents.
plt.plot(tst, label='Q-Learning')
plt.plot(tst2, label='SARSA')
plt.xlabel('Episodes')
plt.ylabel('Timesteps')
# plt.yscale('log')
plt.legend()