In [2]:
!pip install gymnasium
!pip install plotly

Collecting gymnasium
  Downloading gymnasium-1.0.0-py3-none-any.whl.metadata (9.5 kB)
Collecting farama-notifications>=0.0.1 (from gymnasium)
  Downloading Farama_Notifications-0.0.4-py3-none-any.whl.metadata (558 bytes)
Downloading gymnasium-1.0.0-py3-none-any.whl (958 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m958.1/958.1 kB[0m [31m13.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading Farama_Notifications-0.0.4-py3-none-any.whl (2.5 kB)
Installing collected packages: farama-notifications, gymnasium
Successfully installed farama-notifications-0.0.4 gymnasium-1.0.0


In [34]:
import plotly.io as pio
# Make the Colab renderer the default for every Plotly figure shown below.
pio.renderers.default = 'colab'

# NOTE(review): google.colab is presumably only importable when running on
# Colab, so this cell is expected to fail elsewhere — confirm before reuse.
from google.colab import output
# Turn on Colab's custom widget manager (presumably needed for widget-based
# output such as the tqdm.auto progress bar — confirm).
output.enable_custom_widget_manager()

In [35]:
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, List, Tuple, Any

import gymnasium as gym
import numpy as np
import pandas as pd
import plotly.express as px
# `go` is the alias the plotting functions below actually use.
import plotly.graph_objects as go
# Original (accidentally fused) alias, retained in case anything outside this
# view refers to it.
import plotly.graph_objects as goplot_value_function_heatmap
import plotly.subplots as sp
import tensorflow as tf
from tqdm.auto import tqdm

class CliffWalkingEnvironment:
    """Gymnasium "CliffWalking-v0" plus a few grid-layout helpers.

    Attributes:
        env: The wrapped Gymnasium environment.
        nS: Size of the wrapped env's observation space.
        nA: Size of the wrapped env's action space.
        height, width: Grid dimensions, hard-coded here to 4 x 12 (they are
            not read from the wrapped env).
    """

    def __init__(self):
        self.height = 4
        self.width = 12
        self.env = gym.make("CliffWalking-v0")
        self.nS = self.env.observation_space.n
        self.nA = self.env.action_space.n

    def get_ascii_map(self):
        """Render the grid as newline-separated text.

        Every cell is '.', except on the last row, which reads 'S' (first
        column), 'C' for each interior column, and 'G' (last column).
        """
        cells = [['.'] * self.width for _ in range(self.height)]
        bottom = cells[self.height - 1]

        # Cliff cells first, then the two corners, mirroring the write order
        # that decides the outcome on degenerate (very narrow) grids.
        for col in range(1, self.width - 1):
            bottom[col] = 'C'
        bottom[0] = 'S'
        bottom[self.width - 1] = 'G'

        return '\n'.join(''.join(line) for line in cells)

    def state_to_coords(self, state):
        """Map a row-major state index to a (row, col) pair."""
        return divmod(state, self.width)

    def coords_to_state(self, row, col):
        """Map a (row, col) pair to its row-major state index."""
        offset = row * self.width
        return offset + col

In [None]:
class EpsilonGreedyPolicy:
    """Epsilon-greedy action selection over a lazily-built tabular Q-function.

    Args:
        env: Object exposing ``nA``, the number of actions.
        epsilon_start: Initial exploration probability.
        epsilon_end: Floor that ``decay_epsilon`` never goes below.
        epsilon_decay: Multiplicative factor applied by ``decay_epsilon``.
    """

    def __init__(self, env, epsilon_start=1.0, epsilon_end=0.01, epsilon_decay=0.995):
        self.env = env
        self.epsilon = epsilon_start
        self.epsilon_end = epsilon_end
        self.epsilon_decay = epsilon_decay
        # A state's row of action values is created (all zeros) on first lookup.
        self.q_table = defaultdict(lambda: np.zeros(env.nA))

    def select_action(self, state):
        """Pick a uniformly random action with probability epsilon, else the greedy one."""
        explore = np.random.random() < self.epsilon
        if not explore:
            return np.argmax(self.q_table[state])
        return np.random.randint(self.env.nA)

    def update(self, state, action, reward, next_state, alpha=0.1, gamma=0.99):
        """Apply one Q-learning backup to ``Q[state][action]`` in place.

        The target is ``reward + gamma * max_a Q[next_state][a]``; the entry
        moves towards it by a fraction ``alpha`` of the difference.
        """
        next_row = self.q_table[next_state]
        bootstrap = next_row[np.argmax(next_row)]
        current_row = self.q_table[state]
        delta = (reward + gamma * bootstrap) - current_row[action]
        current_row[action] += alpha * delta

    def decay_epsilon(self):
        """Multiply epsilon by the decay factor, clamped at ``epsilon_end``."""
        decayed = self.epsilon * self.epsilon_decay
        self.epsilon = decayed if decayed > self.epsilon_end else self.epsilon_end

In [None]:
class SoftmaxPolicy:
    """Boltzmann (softmax) action selection over a lazily-built tabular Q-function.

    Args:
        env: Object exposing ``nA``, the number of actions.
        temperature_start: Initial softmax temperature.
        temperature_end: Floor that ``decay_temperature`` never goes below.
        temperature_decay: Multiplicative factor applied by ``decay_temperature``.
    """

    def __init__(self, env, temperature_start=1.0, temperature_end=0.1, temperature_decay=0.995):
        self.env = env
        self.temperature = temperature_start
        self.temperature_end = temperature_end
        self.temperature_decay = temperature_decay
        # A state's row of action values is created (all zeros) on first lookup.
        self.q_table = defaultdict(lambda: np.zeros(env.nA))

    def select_action(self, state):
        """Sample an action with probability given by the softmax of its Q-values."""
        probabilities = self._softmax(self.q_table[state])
        return np.random.choice(self.env.nA, p=probabilities)

    def _softmax(self, x):
        """Return softmax(x / temperature) as a probability vector."""
        scaled = x / self.temperature
        # Shifting by the maximum leaves the result unchanged but keeps exp() from overflowing.
        weights = np.exp(scaled - np.max(scaled))
        return weights / weights.sum()

    def update(self, state, action, reward, next_state, alpha=0.1, gamma=0.99):
        """Apply one Q-learning backup to ``Q[state][action]`` in place.

        The target is ``reward + gamma * max_a Q[next_state][a]``; the entry
        moves towards it by a fraction ``alpha`` of the difference.
        """
        next_row = self.q_table[next_state]
        bootstrap = next_row[np.argmax(next_row)]
        current_row = self.q_table[state]
        delta = (reward + gamma * bootstrap) - current_row[action]
        current_row[action] += alpha * delta

    def decay_temperature(self):
        """Multiply the temperature by the decay factor, clamped at ``temperature_end``."""
        cooled = self.temperature * self.temperature_decay
        self.temperature = cooled if cooled > self.temperature_end else self.temperature_end

In [None]:
def _int_counter():
    """Fresh mapping whose missing keys start at 0."""
    return defaultdict(int)


def _list_bucket():
    """Fresh mapping whose missing keys start as an empty list."""
    return defaultdict(list)


@dataclass
class TrainingMetrics:
    """Container for everything recorded while an agent trains.

    Each instance gets its own independent containers; nothing is shared
    between instances.
    """

    # Per-episode series; TDLearner.learn appends one entry per finished episode.
    episode_rewards: List[float] = field(default_factory=list)
    episode_lengths: List[int] = field(default_factory=list)

    # Counters keyed by state and by (state, action).
    # presumably filled by the metrics collector — verify against its code.
    state_visits: Dict[int, int] = field(default_factory=_int_counter)
    action_history: Dict[Tuple[int, int], int] = field(default_factory=_int_counter)

    # Recorded TD errors (presumably one per update — verify).
    td_errors: List[float] = field(default_factory=list)
    # Series of values keyed by state.
    value_history: Dict[int, List[float]] = field(default_factory=_list_bucket)
    # Lists of states; presumably one path per episode — verify.
    path_sequences: List[List[int]] = field(default_factory=list)

    # Scalar tallies incremented by TDLearner.learn.
    near_cliff_visits: int = 0
    recovery_actions: int = 0
    success_count: int = 0

    # Series keyed by (state, action); no writer is visible in this notebook section.
    q_value_updates: Dict[Tuple[int, int], List[float]] = field(default_factory=_list_bucket)
    policy_entropy: List[float] = field(default_factory=list)
    # Phase label appended by TDLearner.learn once per episode.
    training_phase: List[str] = field(default_factory=list)

In [36]:
class TDLearner:
    """Q-learning trainer that drives a policy through the cliff-walking task.

    The learner owns the training loop and the metric collection; the policy
    owns the Q-table and the action-selection rule.
    """

    # Reward signalling a fall into the cliff. An episode or step whose reward
    # is strictly above this value is treated as "no fall".
    CLIFF_REWARD = -100

    def __init__(self, env, gamma=0.99, alpha=0.1):
        """
        Initialize TD Learning agent

        Args:
            env: CliffWalkingEnvironment instance
            gamma: Discount factor
            alpha: Learning rate
        """
        self.env = env
        self.gamma = gamma
        self.alpha = alpha
        # State -> max_a Q(state, a), refreshed every time the state is updated.
        self.value_function = defaultdict(float)
        self.metrics_collector = MetricsCollector(env)

    def learn(self, policy, episodes):
        """
        Train ``policy`` for a number of episodes while collecting metrics.

        An episode counts as a success when its total reward is strictly above
        ``CLIFF_REWARD``, the same rule ``plot_training_progress`` uses. The
        reward of the final step is not a usable signal: per the Gymnasium
        docs, a fall sends the agent back to the start and only reaching the
        goal terminates the episode, so the last reward is never the cliff
        penalty.

        Args:
            policy: Policy instance (EpsilonGreedy or Softmax). Must expose
                ``q_table`` and ``select_action``; ``decay_epsilon`` or
                ``decay_temperature`` is called once per episode if present.
            episodes: Number of training episodes

        Returns:
            dict: ``'value_function'`` (plain-dict copy of the state values),
            ``'history'`` (always an empty list; nothing in this loop fills
            it, the key is kept for callers), and ``'metrics'`` (the
            collector's metrics object).
        """
        history = []
        collector = self.metrics_collector

        with tqdm(total=episodes, desc="Training", unit="episode") as pbar:
            for episode in range(episodes):
                total_reward, steps, path = self._run_episode(policy)

                # Episode completion metrics
                collector.metrics.episode_rewards.append(total_reward)
                collector.metrics.episode_lengths.append(steps)
                collector.record_path_sequence(path)

                if total_reward > self.CLIFF_REWARD:  # Successful episode
                    collector.metrics.success_count += 1

                # Policy parameter decay (only one schedule is applied)
                if hasattr(policy, 'decay_epsilon'):
                    policy.decay_epsilon()
                elif hasattr(policy, 'decay_temperature'):
                    policy.decay_temperature()

                # Record training phase
                phase = self._determine_training_phase(episode, episodes)
                collector.metrics.training_phase.append(phase)

                # Update progress bar
                pbar.set_postfix({
                    'Reward': f'{total_reward:.2f}',
                    'Steps': steps,
                    'Success Rate': f'{collector.metrics.success_count/(episode+1):.2%}'
                })
                pbar.update(1)

        return {
            'value_function': dict(self.value_function),
            'history': history,
            'metrics': collector.metrics
        }

    def _run_episode(self, policy):
        """Play one episode to termination/truncation, learning after every step.

        Returns:
            tuple: ``(total_reward, steps, path)`` where ``path`` lists every
            state visited, starting with the reset state.
        """
        collector = self.metrics_collector
        state, _ = self.env.env.reset()
        path = [state]
        total_reward = 0
        steps = 0
        done = False

        while not done:
            # Collect pre-action metrics
            collector.update_state_visit(state)

            # Select and execute action
            action = policy.select_action(state)
            collector.update_action_history(state, action)
            next_state, reward, terminated, truncated, _ = self.env.env.step(action)
            done = terminated or truncated

            # Update policy and collect metrics
            td_error = self._update_policy(policy, state, action, reward, next_state)
            collector.record_td_error(td_error)

            # A step taken from a near-cliff state that did not fall counts as a recovery.
            if collector.check_near_cliff(state):
                collector.metrics.near_cliff_visits += 1
                if reward > self.CLIFF_REWARD:
                    collector.metrics.recovery_actions += 1

            state = next_state
            path.append(state)
            total_reward += reward
            steps += 1

        return total_reward, steps, path

    def _update_policy(self, policy, state, action, reward, next_state) -> float:
        """
        Apply one Q-learning backup to the policy's Q-table.

        Uses this learner's ``alpha`` and ``gamma`` (not the defaults of the
        policy's own ``update`` method), then refreshes the cached state value
        and reports it to the metrics collector.

        Args:
            policy: Current policy
            state: Current state
            action: Taken action
            reward: Received reward
            next_state: Resulting state

        Returns:
            float: TD error (computed before the table entry is changed)
        """
        # Standard Q-learning update
        best_next_action = np.argmax(policy.q_table[next_state])
        td_target = reward + self.gamma * policy.q_table[next_state][best_next_action]
        td_error = td_target - policy.q_table[state][action]
        policy.q_table[state][action] += self.alpha * td_error

        # Update value function and record metrics
        self.value_function[state] = np.max(policy.q_table[state])
        self.metrics_collector.update_value_history(state, self.value_function[state])

        return td_error

    def _determine_training_phase(self, episode: int, total_episodes: int) -> str:
        """
        Label an episode by how far through training it is.

        Args:
            episode: Current episode number (0-based)
            total_episodes: Total number of episodes

        Returns:
            str: 'exploration' (first 20%), 'exploitation_transition'
            (up to 50%), 'stable_performance' (up to 80%), else 'fine_tuning'
        """
        if episode < total_episodes * 0.2:
            return 'exploration'
        elif episode < total_episodes * 0.5:
            return 'exploitation_transition'
        elif episode < total_episodes * 0.8:
            return 'stable_performance'
        else:
            return 'fine_tuning'

In [None]:
def plot_value_function_heatmap(env, metrics, title="Value Function Heatmap"):
    """
    Show a heatmap of the most recent recorded value of every state.

    States with no recorded value (absent from ``metrics.value_history`` or
    present with an empty list) are drawn as 0. The y axis is reversed so that
    grid row 0 is at the top, matching ``env.get_ascii_map()``; by default a
    Plotly heatmap puts the first row of ``z`` at the bottom.

    Args:
        env: CliffWalkingEnvironment instance (uses ``height``, ``width``,
            ``nS`` and ``state_to_coords``)
        metrics: TrainingMetrics instance (uses ``value_history``)
        title: Plot title

    Returns:
        None. The figure is displayed with ``fig.show()``.
    """
    values = np.zeros((env.height, env.width))
    for state in range(env.nS):
        # .get() avoids inserting keys into a defaultdict, and the truthiness
        # check skips states whose history list exists but is still empty.
        recorded = metrics.value_history.get(state)
        if recorded:
            row, col = env.state_to_coords(state)
            values[row, col] = recorded[-1]

    fig = go.Figure(data=go.Heatmap(
        z=values,
        colorscale='Viridis',
        text=np.around(values, 2),
        texttemplate='%{text}',
        textfont={"size": 10},
        hoverongaps=False
    ))

    fig.update_layout(
        title=title,
        xaxis_title="Column",
        yaxis_title="Row",
        width=800,
        height=400
    )
    # Row 0 on top, so the cliff row appears at the bottom as in the ASCII map.
    fig.update_yaxes(autorange="reversed")

    fig.show()  # Instead of return fig

In [None]:
def plot_training_progress(metrics, title="Training Progress"):
    """
    Show a 2x2 dashboard of training curves.

    Panels: episode rewards, episode lengths, recorded TD errors, and the
    cumulative success rate. An episode counts as a success when its total
    reward is strictly greater than -100. Every panel carries axis titles so
    it can be read on its own.

    Args:
        metrics: TrainingMetrics instance (uses ``episode_rewards``,
            ``episode_lengths`` and ``td_errors``)
        title: Plot title

    Returns:
        None. The figure is displayed with ``fig.show()``.
    """
    fig = sp.make_subplots(
        rows=2, cols=2,
        subplot_titles=("Episode Rewards", "Episode Lengths",
                        "TD Errors", "Success Rate")
    )

    # Running fraction of successful episodes after each episode.
    rewards = np.asarray(metrics.episode_rewards)
    cumulative_success = np.cumsum(rewards > -100) / np.arange(1, len(rewards) + 1)

    # (trace, row, col, x-axis title, y-axis title)
    # TD errors are indexed by recorded update rather than by episode
    # (presumably one entry per environment step — verify against the collector).
    panels = [
        (go.Scatter(y=metrics.episode_rewards, name="Reward"),
         1, 1, "Episode", "Total reward"),
        (go.Scatter(y=metrics.episode_lengths, name="Steps"),
         1, 2, "Episode", "Steps"),
        (go.Scatter(y=metrics.td_errors, name="TD Error"),
         2, 1, "Update", "TD error"),
        (go.Scatter(y=cumulative_success, name="Success Rate"),
         2, 2, "Episode", "Cumulative success rate"),
    ]
    for trace, row, col, x_title, y_title in panels:
        fig.add_trace(trace, row=row, col=col)
        fig.update_xaxes(title_text=x_title, row=row, col=col)
        fig.update_yaxes(title_text=y_title, row=row, col=col)

    fig.update_layout(
        title=title,
        height=800,
        width=1000,
        showlegend=True
    )

    fig.show()  # Instead of return fig

In [None]:
SEED = 42          # seed for NumPy's global RNG, which both policies draw from
N_EPISODES = 500   # training episodes per policy

env = CliffWalkingEnvironment()
print("\nEnvironment Map:")
print(env.get_ascii_map())

# Initialize policies
epsilon_greedy = EpsilonGreedyPolicy(env)
softmax = SoftmaxPolicy(env)

# Re-seed before each run so either run can be reproduced on its own,
# independent of how many random draws the other one consumed.
print("\nStarting Epsilon-Greedy Learning...")
np.random.seed(SEED)
td_learner_eps = TDLearner(env)
eps_results = td_learner_eps.learn(epsilon_greedy, episodes=N_EPISODES)

print("\nStarting Softmax Learning...")
np.random.seed(SEED)
td_learner_softmax = TDLearner(env)
softmax_results = td_learner_softmax.learn(softmax, episodes=N_EPISODES)

# Create and display visualizations: both heatmaps first, then both dashboards.
runs = (
    ("Epsilon-Greedy", eps_results),
    ("Softmax", softmax_results),
)

for label, results in runs:
    plot_value_function_heatmap(env, results['metrics'], f"{label} Value Function")

for label, results in runs:
    plot_training_progress(results['metrics'], f"{label} Training Progress")