In [None]:
import gymnasium as gym
import logging
import numpy as np
import sys

In [None]:
# Online Monte Carlo control; only works for discrete observation and action spaces (the tabular case).
# The learned policy is deterministic (greedy w.r.t. Q); episodes are generated epsilon-greedily.
class MonteCarloControl:
    """Tabular Monte Carlo control with an epsilon-greedy behaviour policy.

    Action values are estimated as the average of the sampled returns of
    each (state, action) pair. After every episode the policy is made
    greedy with respect to those estimates.

    Args:
        env: Environment exposing ``observation_space.n`` and
            ``action_space.n`` (used as the table dimensions),
            ``action_space.sample()``, ``reset() -> (obs, info)`` and
            ``step(action) -> (obs, reward, terminated, truncated, info)``.
            Observations are used directly as row indices of the tables.
        logger: Object with an ``info(message)`` method.
        gamma: Discount factor applied to future rewards.
        first_visit: If truthy, only the first occurrence of a
            (state, action) pair within an episode contributes a return;
            otherwise every occurrence contributes.
    """

    def __init__(self, env, logger, gamma=1.0, first_visit=False):
        self.env = env
        self.n = env.observation_space.n
        self.action = env.action_space.n
        self.first = first_visit
        self.gamma = gamma
        self.logger = logger
        self.epsilon = 1.0
        # Build the tables immediately so generate_episode() and the other
        # methods can be called on a fresh instance without an AttributeError.
        self.initialize()

    def initialize(self):
        """Reset the value tables, visit counts and policy.

        ``epsilon`` is left untouched; while ``pi`` is ``None`` every action
        is sampled at random anyway.
        """
        self.q_value = np.zeros((self.n, self.action))
        # Sum of all returns observed for each (state, action).
        self.returns = np.zeros((self.n, self.action))
        # Number of returns recorded for each (state, action).
        self.nums = np.zeros((self.n, self.action), dtype=np.int32)
        self.pi = None

    def generate_episode(self):
        """Roll out one episode with the epsilon-greedy behaviour policy.

        Returns:
            dict with keys ``"states"``, ``"actions"`` and ``"rewards"``.
            ``"states"`` holds one more entry than the other two lists
            because it also contains the final observation.
        """
        episode = {"states": [], "actions": [], "rewards": []}
        obs, _ = self.env.reset()
        self.starting = obs
        episode["states"].append(obs)
        done = False
        while not done:
            # No policy yet, or exploring with probability epsilon: random action.
            explore = self.pi is None or np.random.uniform() <= self.epsilon
            action = self.env.action_space.sample() if explore else self.pi[obs]
            obs, reward, terminated, truncated, _ = self.env.step(action)
            episode["states"].append(obs)
            episode["rewards"].append(reward)
            episode["actions"].append(action)
            done = terminated or truncated
        return episode

    def evaluate(self, n_iter):
        """Run one rollout of the current greedy policy and log its return.

        The logged value is the undiscounted sum of rewards of that single
        rollout. Requires ``pi`` to have been set by ``run``.

        Args:
            n_iter: Iteration index included in the log message.
        """
        obs, _ = self.env.reset()
        returns = 0
        done = False
        while not done:
            obs, reward, terminated, truncated, _ = self.env.step(self.pi[obs])
            returns += reward
            done = terminated or truncated
        self.logger.info("[ITERATION {}]: The expected return is {}".format(n_iter, returns))

    def run(self, num_eps=1000):
        """Improve the policy over ``num_eps`` episodes.

        Calling ``run`` again continues from the existing tables; the epsilon
        schedule ``1 / (k + 2)`` restarts from ``k = 0`` on each call. One
        evaluation rollout is logged after every episode.

        Args:
            num_eps: Number of training episodes.

        Returns:
            Tuple ``(pi, q_value)``: the greedy action per state and the
            table of action-value estimates.
        """
        # Only reachable for instances that were created without __init__.
        if not hasattr(self, "pi"):
            self.initialize()
        for k in range(num_eps):
            episode = self.generate_episode()
            episodic_returns = np.zeros((self.n, self.action))
            episodic_nums = np.zeros((self.n, self.action), dtype=np.int32)
            states, rewards, actions = episode["states"], episode["rewards"], episode["actions"]
            discounted = 0
            # Walk the episode backwards so the return can be accumulated
            # incrementally; states[:-1] drops the final observation, which
            # has no action attached.
            for state, reward, action in zip(reversed(states[:-1]), reversed(rewards), reversed(actions)):
                discounted = discounted * self.gamma + reward
                if self.first:
                    # Overwriting while going backwards leaves the return of
                    # the earliest occurrence in the episode.
                    episodic_returns[state, action] = discounted
                    episodic_nums[state, action] = 1
                else:
                    episodic_returns[state, action] += discounted
                    episodic_nums[state, action] += 1
            self.returns += episodic_returns
            self.nums += episodic_nums
            # Never-visited pairs keep a value of 0 instead of dividing by zero.
            self.q_value = self.returns / np.maximum(self.nums, 1)

            self.epsilon = 1 / (k + 2)
            self.pi = np.argmax(self.q_value, axis=1)
            self.evaluate(k)
        return self.pi, self.q_value

In [None]:
def get_logger():
    """Return the shared 'MCLogger' logger, configuring it on first use.

    On the first call the logger gets two handlers that emit the bare
    message text: one writing to 'mc.log' (truncated at that point) and one
    writing to ``sys.stdout``. Later calls return the same logger without
    creating new handlers, so the log file is not reopened or truncated.

    Returns:
        logging.Logger: the configured logger, at level INFO.
    """
    logger = logging.getLogger('MCLogger')
    logger.setLevel(logging.INFO)
    # Look at this logger's own handlers only: hasHandlers() also reports
    # handlers of ancestor loggers (e.g. a pre-configured root logger), which
    # would skip the setup below and leave mc.log empty.
    if not logger.handlers:
        formatter = logging.Formatter('%(message)s')
        # Handlers are created only here because FileHandler opens (and with
        # mode='w' truncates) the file as soon as it is constructed.
        file_handler = logging.FileHandler('mc.log', mode='w')
        std_handler = logging.StreamHandler(stream=sys.stdout)
        for handler in (file_handler, std_handler):
            handler.setFormatter(formatter)
            logger.addHandler(handler)
        # The logger now has its own outputs; do not repeat every message
        # through handlers installed on the root logger.
        logger.propagate = False
    return logger

In [None]:
# Create the two environments by their registered ids.
# MonteCarloControl reads observation_space.n and action_space.n, so both
# spaces must be discrete — presumably true for these ids; verify.
cliff_env = gym.make('CliffWalking-v0')
lake_env = gym.make('FrozenLake-v1')

In [None]:
# Wrap both environments with a 2000-step limit. generate_episode() and
# evaluate() loop until `terminated or truncated`, so a rollout that never
# reaches a terminal state would otherwise not end.
cliff_env = gym.wrappers.TimeLimit(cliff_env,max_episode_steps=2000)
lake_env = gym.wrappers.TimeLimit(lake_env,max_episode_steps=2000)

In [None]:
# Agent for the cliff environment with first_visit enabled; gamma keeps its
# default of 1.0. lake_env is not used in the cells shown here.
mcc = MonteCarloControl(env=cliff_env,logger=get_logger(),first_visit=True)

In [None]:
# Train for 6000 episodes. run() logs one evaluation rollout per episode and
# returns the greedy policy (pi) together with the Q-value table (q).
pi,q = mcc.run(num_eps=6000)