## Cross-Entropy Method

In [6]:
import gymnasium as gym
import typing as tt
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from dataclasses import dataclass
from torch.utils.tensorboard.writer import SummaryWriter

In [7]:
# Width of the hidden layer handed to Net when the model is built.
HIDDEN_SIZE = 128
# Number of complete episodes gathered into each training batch.
BATCH_SIZE = 16
# Reward percentile given to filter_batch; episodes scoring below it are dropped.
PERCENTILE = 70

class Net(nn.Module):
    """Small fully connected policy network with one hidden ReLU layer.

    The forward pass returns one raw (un-normalised) score per action; no
    softmax is applied inside the module.
    """

    def __init__(self, obs_size: int, hidden_size: int, n_actions: int):
        super().__init__()
        # Layers are created in input-to-output order and stored under
        # ``self.net`` so parameter names stay ``net.0.*`` / ``net.2.*``.
        layers = [
            nn.Linear(obs_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, n_actions),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the action scores computed for the observation batch ``x``."""
        scores = self.net(x)
        return scores

@dataclass
class Step:
    """One decision made by the agent while an episode is being played."""
    # Observation the policy was given when it chose the action.
    observation: np.ndarray
    # Index of the action that was sampled for that observation.
    action: int

@dataclass
class Episode:
    """A finished episode: its accumulated reward and the steps that produced it."""
    # Sum of the per-step rewards collected during the episode.
    reward: float
    # Steps in the order they were played.
    steps: tt.List[Step]

def iterate_batches(env: gym.Env, net: Net, batch_size: int) -> tt.Generator[tt.List[Episode], None, None]:
    """Play episodes with the current policy forever, yielding them in batches.

    At every step the network output is converted into action probabilities
    with a softmax and an action is sampled from that distribution.  When an
    episode ends (terminated or truncated) it is stored as an ``Episode`` and
    the environment is reset.  As soon as ``batch_size`` episodes have been
    collected they are yielded as a list and a fresh batch is started.  The
    generator never finishes on its own; the consumer decides when to stop.

    Args:
        env: Environment to interact with.
        net: Policy network producing one raw score per action.
        batch_size: Number of complete episodes per yielded batch (>= 1).

    Yields:
        Lists containing exactly ``batch_size`` episodes.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1 (the loop could never
            yield in that case).
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    batch: tt.List[Episode] = []
    episode_reward = 0.0
    episode_steps: tt.List[Step] = []
    obs, _ = env.reset()
    sm = nn.Softmax(dim=1)

    while True:
        obs_v = torch.tensor(obs, dtype=torch.float32)
        # Rollouts only gather data, so no autograd graph is needed here.
        with torch.no_grad():
            act_probs_v = sm(net(obs_v.unsqueeze(0)))
        # The batch dimension added above is dropped again with [0].
        act_probs = act_probs_v.numpy()[0]
        # Stored as a plain int to match the ``Step.action`` annotation.
        action = int(np.random.choice(len(act_probs), p=act_probs))
        next_obs, reward, is_done, is_trunc, _ = env.step(action)
        episode_reward += float(reward)
        # The step keeps the observation the action was chosen FROM.
        episode_steps.append(Step(observation=obs, action=action))

        if is_done or is_trunc:
            batch.append(Episode(reward=episode_reward, steps=episode_steps))
            episode_reward = 0.0
            episode_steps = []
            next_obs, _ = env.reset()
            if len(batch) == batch_size:
                yield batch
                batch = []
        obs = next_obs

def filter_batch(batch: tt.List[Episode], percentile: float) -> tt.Tuple[torch.FloatTensor, torch.LongTensor, float, float]:
    """Keep the best episodes of a batch and turn them into training tensors.

    The reward boundary is the given percentile of the episode rewards.
    Episodes whose reward is strictly below the boundary are discarded; the
    observations and actions of the remaining episodes are concatenated in
    batch order.

    Args:
        batch: Episodes to filter; must not be empty.
        percentile: Percentile (0-100) of the rewards used as the cut-off.

    Returns:
        A tuple ``(observations, actions, reward_bound, reward_mean)`` where
        ``observations`` is a float32 tensor with one row per kept step,
        ``actions`` is an int64 tensor with the matching actions,
        ``reward_bound`` is the cut-off reward and ``reward_mean`` is the mean
        reward over the whole (unfiltered) batch.

    Raises:
        ValueError: If ``batch`` is empty.
    """
    if not batch:
        raise ValueError("filter_batch() requires at least one episode")

    rewards = [episode.reward for episode in batch]
    reward_bound = float(np.percentile(rewards, percentile))
    reward_mean = float(np.mean(rewards))

    train_obs: tt.List[np.ndarray] = []
    train_act: tt.List[int] = []
    for episode in batch:
        if episode.reward < reward_bound:
            continue
        train_obs.extend(step.observation for step in episode.steps)
        train_act.extend(step.action for step in episode.steps)

    # torch.tensor with an explicit dtype replaces the legacy typed constructors.
    train_obs_v = torch.tensor(np.vstack(train_obs), dtype=torch.float32)
    train_act_v = torch.tensor(train_act, dtype=torch.long)
    return train_obs_v, train_act_v, reward_bound, reward_mean



In [None]:
if __name__ == "__main__":
    # Train a CartPole policy with the cross-entropy method: repeatedly sample
    # a batch of episodes, keep the best ones and fit the network to the
    # actions taken in them.
    env = gym.make("CartPole-v1")
    assert env.observation_space.shape is not None
    obs_size = env.observation_space.shape[0]
    assert isinstance(env.action_space, gym.spaces.Discrete)
    n_actions = int(env.action_space.n)

    net = Net(obs_size, HIDDEN_SIZE, n_actions)
    # CrossEntropyLoss takes the raw scores from Net, not softmax probabilities.
    objective = nn.CrossEntropyLoss()
    optimizer = optim.Adam(params=net.parameters(), lr=0.01)
    writer = SummaryWriter()

    # try/finally so the writer and the environment are closed even when
    # training is interrupted or raises.
    try:
        for iter_no, batch in enumerate(iterate_batches(env, net, BATCH_SIZE)):
            obs_v, acts_v, reward_b, reward_m = filter_batch(batch, PERCENTILE)
            optimizer.zero_grad()
            action_scores_v = net(obs_v)
            loss_v = objective(action_scores_v, acts_v)
            loss_v.backward()
            optimizer.step()

            loss = loss_v.item()
            print("%d: loss=%.3f, reward_mean=%.1f, rw_bound=%.1f" % (iter_no, loss, reward_m, reward_b))
            writer.add_scalar("loss", loss, iter_no)
            writer.add_scalar("reward_mean", reward_m, iter_no)
            writer.add_scalar("reward_bound", reward_b, iter_no)

            # Stop once the mean reward of a batch exceeds 475.
            if reward_m > 475:
                print("Solved!")
                break
    finally:
        writer.close()
        env.close()


0: loss=0.697, reward_mean=26.7, rw_bound=29.0
1: loss=0.671, reward_mean=32.6, rw_bound=33.5
2: loss=0.658, reward_mean=25.6, rw_bound=27.0
3: loss=0.647, reward_mean=38.7, rw_bound=42.5
4: loss=0.630, reward_mean=32.3, rw_bound=45.0
5: loss=0.648, reward_mean=39.9, rw_bound=47.5
6: loss=0.626, reward_mean=45.8, rw_bound=52.0
7: loss=0.619, reward_mean=48.7, rw_bound=51.5
8: loss=0.610, reward_mean=42.2, rw_bound=49.0
9: loss=0.612, reward_mean=44.1, rw_bound=47.5
10: loss=0.596, reward_mean=46.4, rw_bound=55.5
11: loss=0.582, reward_mean=53.2, rw_bound=63.0
12: loss=0.597, reward_mean=59.8, rw_bound=65.5
13: loss=0.576, reward_mean=50.6, rw_bound=54.5
14: loss=0.591, reward_mean=49.5, rw_bound=56.0
15: loss=0.580, reward_mean=49.7, rw_bound=55.0
16: loss=0.569, reward_mean=57.4, rw_bound=60.5
17: loss=0.581, reward_mean=62.2, rw_bound=68.5
18: loss=0.554, reward_mean=70.6, rw_bound=72.5
19: loss=0.558, reward_mean=65.1, rw_bound=70.0
20: loss=0.556, reward_mean=66.5, rw_bound=78.0
21

# Value-Based Method

### $$ V(s) = \mathbb{E}\left[\sum_{t=0}^{\infty} \gamma^t R_t \,\middle|\, s_0 = s\right] $$

### Value Iteration Method for Frozen-Lake

In [18]:
import typing as tt
import gymnasium as gym
from collections import defaultdict, Counter
from torch.utils.tensorboard.writer import SummaryWriter

# Gymnasium environment id used for every environment created in this section.
ENV_NAME = "FrozenLake-v1"
# Discount factor applied to the value of the successor state.
GAMMA = 0.9
# Number of evaluation episodes averaged after each value-iteration sweep.
TEST_EPISODES = 20

# States and actions are plain integer indices.
State = int
Action = int
# Key of the reward table: (source state, action, target state).
RewardKey = tt.Tuple[State, Action, State]
# Key of the transition-count table: (source state, action).
TransitKey = tt.Tuple[State, Action]

class Agent:
    """Tabular value-iteration agent that learns a model from experience.

    The agent owns an exploration environment (``self.env``) and three
    tables that are filled from observed transitions:

    * ``rewards``  - last reward seen for (state, action, next_state)
    * ``transits`` - how often each next_state followed (state, action)
    * ``values``   - current value estimate of every state
    """

    def __init__(self):
        self.env = gym.make(ENV_NAME)
        # Current state of the exploration environment.
        self.state, _ = self.env.reset()
        self.rewards: tt.Dict[RewardKey, float] = defaultdict(float)
        self.transits: tt.Dict[TransitKey, Counter] = defaultdict(Counter)
        self.values: tt.Dict[State, float] = defaultdict(float)

    def _record_transition(self, state: State, action: Action, new_state: State, reward: float) -> None:
        """Store one observed transition in the reward and transition tables."""
        self.rewards[(state, action, new_state)] = float(reward)
        self.transits[(state, action)][new_state] += 1

    def play_n_random_steps(self, n: int):
        """Take ``n`` random actions in ``self.env`` and record every transition.

        The environment is reset whenever an episode terminates or is
        truncated, so exploration continues across episode boundaries.
        """
        for _ in range(n):
            action = self.env.action_space.sample()
            new_state, reward, is_done, is_trunc, _ = self.env.step(action)
            self._record_transition(self.state, action, new_state, reward)
            if is_done or is_trunc:
                self.state, _ = self.env.reset()
            else:
                self.state = new_state

    def calc_action_value(self, state: State, action: Action) -> float:
        """Estimate the value of taking ``action`` in ``state``.

        Each observed target state contributes ``reward + GAMMA * value``
        weighted by its empirical transition frequency.  Returns 0.0 when the
        (state, action) pair has never been observed.
        """
        target_counts = self.transits[(state, action)]
        total = sum(target_counts.values())
        action_value = 0.
        # With no recorded transitions the loop body never runs, so ``total``
        # is never used as a zero divisor.
        for tgt_state, count in target_counts.items():
            reward = self.rewards[(state, action, tgt_state)]
            val = reward + GAMMA * self.values[tgt_state]
            action_value += (count / total) * val
        return action_value

    def select_action(self, state: State) -> Action:
        """Return the action with the highest estimated value in ``state``.

        Ties are resolved in favour of the lowest action index.
        """
        return max(
            range(self.env.action_space.n),
            key=lambda action: self.calc_action_value(state, action),
        )

    def play_episode(self, env: gym.Env) -> float:
        """Play one greedy episode in ``env`` and return its total reward.

        The episode is played in the environment passed by the caller, not in
        ``self.env``, so the exploration environment and ``self.state`` are
        left untouched.  Transitions seen along the way are still added to
        the agent's tables.
        """
        total_reward = 0.
        state, _ = env.reset()
        while True:
            action = self.select_action(state)
            new_state, reward, is_done, is_trunc, _ = env.step(action)
            self._record_transition(state, action, new_state, reward)
            total_reward += reward
            if is_done or is_trunc:
                break
            state = new_state
        return total_reward

    def value_iteration(self):
        """Perform one sweep over all states, updating ``self.values`` in place.

        Every state's value becomes the maximum of its action values; states
        updated earlier in the sweep are already visible to later ones.
        """
        n_actions = self.env.action_space.n
        for state in range(self.env.observation_space.n):
            self.values[state] = max(
                self.calc_action_value(state, action) for action in range(n_actions)
            )

In [20]:
if __name__ == "__main__":
    # Alternate between random exploration, a value-iteration sweep and a
    # greedy evaluation until the average test reward exceeds 0.80.
    test_env = gym.make(ENV_NAME)
    agent = Agent()
    writer = SummaryWriter(comment="-v-iteration")

    iter_no = 0
    best_reward = 0.
    # try/finally so the writer and both environments are closed even when
    # the loop is interrupted or raises.
    try:
        while True:
            iter_no += 1
            agent.play_n_random_steps(100)
            agent.value_iteration()

            # Average reward over TEST_EPISODES greedy episodes.
            reward = 0.
            for _ in range(TEST_EPISODES):
                reward += agent.play_episode(test_env)
            reward /= TEST_EPISODES
            writer.add_scalar("reward", reward, iter_no)

            if reward > best_reward:
                print(f"{iter_no}: Best reward updated {best_reward:.3} -> {reward:.3}")
                best_reward = reward
            if reward > 0.80:
                print("Solved in %d iterations!" % iter_no)
                break
    finally:
        writer.close()
        test_env.close()
        agent.env.close()


6: Best reward updated 0.0 -> 0.1
8: Best reward updated 0.1 -> 0.15
9: Best reward updated 0.15 -> 0.5
13: Best reward updated 0.5 -> 0.55
14: Best reward updated 0.55 -> 0.6
16: Best reward updated 0.6 -> 0.65
18: Best reward updated 0.65 -> 0.7
21: Best reward updated 0.7 -> 0.85
Solved in 21 iterations!
