# Tabular Learning

In [None]:
import gymnasium as gym
from gymnasium.wrappers import NumpyToTorch
import collections
import wandb


In [2]:
from velora.utils import load_config

config = load_config("config/tl_fl.yaml")

## Value Iteration

In [3]:
from velora import Config


class Agent:
    def __init__(self, config: Config, map_size: str) -> None:
        self.env: gym.Env = gym.make(config.env.name, map_name=map_size)
        self.state, _ = self.env.reset()

        self.rewards = collections.defaultdict(float)
        self.transits = collections.defaultdict(collections.Counter)
        self.values = collections.defaultdict(float)

    def play_n_random_steps(self, count: int) -> None:
        for _ in range(count):
            action = self.env.action_space.sample()
            next_state, reward, terminated, truncated, _ = self.env.step(action)

            self.rewards[(self.state, action, next_state)] = reward
            self.transits[(self.state, action)][next_state] += 1

            if truncated or terminated:
                self.state, _ = self.env.reset()
            else: 
                self.state = next_state
    
    def calc_action_value(self, state: int, action: int) -> float:
        target_counts = self.transits[(state, action)]
        total = sum(target_counts.values())
        action_value = 0.0
        
        for tgt_state, count in target_counts.items():
            reward = self.rewards[(state, action, tgt_state)]
            val = reward + config.model.gamma * self.values[tgt_state]
            action_value += (count / total) * val
        
        return action_value

    def select_action(self, state: int) -> int:
        best_action, best_value = None, None

        for action in range(self.env.action_space.n):
            action_value = self.calc_action_value(state, action)

            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        
        return best_action

    def play_episode(self, env: gym.Env) -> float:
        total_reward = 0.
        state, _ = env.reset()

        while True:
            action = self.select_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            self.rewards[(state, action, next_state)] = reward
            self.transits[(state, action)][next_state] += 1
            total_reward += reward

            if terminated or truncated:
                break

            state = next_state
        
        return total_reward

    def value_iteration(self) -> None:
        for state in range(self.env.observation_space.n):
            state_values = [
                self.calc_action_value(state, action)
                for action in range(self.env.action_space.n)
            ]
            self.values[state] = max(state_values)


In [4]:
# wandb.login()

In [11]:
def train(agent: Agent, config: Config, map_size: str) -> None:
    test_env: gym.Env = gym.make(config.env.name, map_name=map_size)
    iter_no = 0
    best_reward = 0.

    while True:
        iter_no += 1
        reward = 0.

        agent.play_n_random_steps(100)
        agent.value_iteration()
        
        for _ in range(config.env.episodes):
            reward += agent.play_episode(test_env)
        
        reward /= config.env.episodes

        if reward > best_reward:
            print(f"Best reward updated {best_reward:.3f} -> {reward:.3f}")
            best_reward = reward
        
        if reward > config.other.solve_threshold:
            print(f"Solved in {iter_no} iterations!")
            break

In [None]:
map_size="4x4"
agent = Agent(config=config, map_size=map_size)
train(agent, config, map_size)

Best reward updated 0.000 -> 0.150
Best reward updated 0.150 -> 0.200
Best reward updated 0.200 -> 0.250
Best reward updated 0.250 -> 0.350
Best reward updated 0.350 -> 0.550
Best reward updated 0.550 -> 0.650
Best reward updated 0.650 -> 0.750
Best reward updated 0.750 -> 0.800
Best reward updated 0.800 -> 0.850
Solved in 87 iterations!


# Q Learning

In [8]:
from velora import Config


class Agent_Q:
    def __init__(self, config: Config, map_size: str) -> None:
        self.env: gym.Env = gym.make(config.env.name, map_name=map_size)
        self.state, _ = self.env.reset()

        self.rewards = collections.defaultdict(float)
        self.transits = collections.defaultdict(collections.Counter)
        self.values = collections.defaultdict(float)

    def play_n_random_steps(self, count: int) -> None:
        for _ in range(count):
            action = self.env.action_space.sample()
            next_state, reward, terminated, truncated, _ = self.env.step(action)

            self.rewards[(self.state, action, next_state)] = reward
            self.transits[(self.state, action)][next_state] += 1

            if truncated or terminated:
                self.state, _ = self.env.reset()
            else: 
                self.state = next_state

    def select_action(self, state: int) -> int:
        best_action, best_value = None, None

        for action in range(self.env.action_space.n):
            action_value = self.values[(state, action)]

            if best_value is None or best_value < action_value:
                best_value = action_value
                best_action = action
        
        return best_action

    def play_episode(self, env: gym.Env) -> float:
        total_reward = 0.
        state, _ = env.reset()

        while True:
            action = self.select_action(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            self.rewards[(state, action, next_state)] = reward
            self.transits[(state, action)][next_state] += 1
            total_reward += reward

            if terminated or truncated:
                break

            state = next_state
        
        return total_reward

    def value_iteration(self) -> None:
        for state in range(self.env.observation_space.n):
            for action in range(self.env.action_space.n):
                action_value = 0.
                target_counts = self.transits[(state, action)]
                total = sum(target_counts.values())

                for tgt_state, count in target_counts.items():
                    key = (state, action, tgt_state)
                    reward = self.rewards[key]
                    best_action = self.select_action(tgt_state)

                    val = reward + config.model.gamma * self.values[(tgt_state, best_action)]
                    action_value += (count / total) * val
                
                self.values[(state, action)] = action_value

In [13]:
map_size="4x4"
agent = Agent_Q(config=config, map_size=map_size)
train(agent, config, map_size)

Best reward updated 0.000 -> 0.300
Best reward updated 0.300 -> 0.350
Best reward updated 0.350 -> 0.500
Best reward updated 0.500 -> 0.850
Solved in 18 iterations!
