# chapter 9

In [1]:
import gymnasium as gymp
import numpy as np

def gradient_monte_carlo(env, num_episodes=10000, alpha=0.01, gamma=1.0):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    
    # Initialize weights for linear function approximator
    w = np.zeros(num_states)  # One weight per state
    
    # Random policy: uniform probability over actions
    def random_policy():
        return np.ones(num_actions) / num_actions
    
    # Feature vector: one-hot encoding for state s
    def phi(state):
        features = np.zeros(num_states)
        features[state] = 1.0
        return features
    
    # Value function estimate: v_hat(s, w) = w^T * phi(s)
    def v_hat(state, w):
        return np.dot(w, phi(state))
    
    # Run episodes
    for episode in range(num_episodes):
        # Initialize episode
        state, _ = env.reset()
        episode_states = []
        episode_actions = []
        episode_rewards = []
        done = False
        
        # Generate episode using random policy
        while not done:
            action_probs = random_policy()
            action = np.random.choice(num_actions, p=action_probs)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            episode_states.append(state)
            episode_actions.append(action)
            episode_rewards.append(reward)
            state = next_state
        
        # Compute returns and update weights
        G = 0
        for t in range(len(episode_states) - 1, -1, -1):
            state = episode_states[t]
            reward = episode_rewards[t]
            G = reward + gamma * G  # Compute return
            
            # Update weights: w = w + alpha * (G - v_hat(s, w)) * phi(s)
            w += alpha * (G - v_hat(state, w)) * phi(state)
    
    return w, v_hat


In [2]:
import gymnasium as gym
import numpy as np

def semi_gradient_td0(env, num_episodes=10000, alpha=0.01, gamma=1.0):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    
    # Initialize weights for linear function approximator
    w = np.zeros(num_states)  # One weight per state
    
    # Random policy: uniform probability over actions
    def random_policy():
        return np.ones(num_actions) / num_actions
    
    # Feature vector: one-hot encoding for state s
    def phi(state):
        features = np.zeros(num_states)
        features[state] = 1.0
        return features
    
    # Value function estimate: v_hat(s, w) = w^T * phi(s)
    def v_hat(state, w):
        return np.dot(w, phi(state))
    
    # Loop for each episode
    for episode in range(num_episodes):
        # Initialize Sg
        state, _ = env.reset()
        
        # Loop for each step of episode
        done = False
        while not done:
            # Choose A ~ π(·|S)
            action_probs = random_policy()
            action = np.random.choice(num_actions, p=action_probs)
            
            # Take action A, observe R, S'
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            
            # Update weights: w ← w + α[R + γv̂(S', w) − v̂(S, w)]∇v̂(S, w)
            target = reward + gamma * v_hat(next_state, w)
            error = target - v_hat(state, w)
            w += alpha * error * phi(state)
            
            # S ← S'
            state = next_state
    
    return w, v_hat



In [3]:
import gymnasium as gym
import numpy as np

def n_step_semi_gradient_td(env, num_episodes=10000, alpha=0.01, n=3, gamma=1.0):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    
    # Initialize weights
    w = np.zeros(num_states)
    
    # Random policy: uniform probability over actions
    def random_policy():
        return np.ones(num_actions) / num_actions
    
    # Feature vector: one-hot encoding for state s
    def phi(state):
        features = np.zeros(num_states)
        features[state] = 1.0
        return features
    
    # Value function estimate: v_hat(s, w) = w^T * phi(s)
    def v_hat(state, w):
        return np.dot(w, phi(state))
    
    # Loop for each episode
    for episode in range(num_episodes):
        # Initialize and store S0 != terminal
        state, _ = env.reset()
        states = [state]  # Store states: S0, S1, ...
        rewards = [0]     # Store rewards: dummy R0, R1, ...
        
        T = float('inf')  # Time of termination
        t = 0  # Current time step
        
        while True:
            # If t < T, take action and observe next state/reward
            if t < T:
                action_probs = random_policy()
                action = np.random.choice(num_actions, p=action_probs)
                next_state, reward, terminated, truncated, _ = env.step(action)
                
                # Store next state and reward
                states.append(next_state)
                rewards.append(reward)
                
                # Check if terminal
                if terminated or truncated:
                    T = t + 1
            
            # Tau is the time whose estimate is being updated
            tau = t - n + 1
            
            # If tau >= 0, update weights
            if tau >= 0:
                # Compute G: sum of rewards from tau+1 to min(tau+n, T)
                G = 0
                for i in range(tau + 1, min(tau + n, T) + 1):
                    G += (gamma ** (i - tau - 1)) * rewards[i]
                
                # If tau + n < T, add gamma^n * v_hat(S_{tau+n}, w)
                if tau + n < T:
                    G += (gamma ** n) * v_hat(states[tau + n], w)
                
                # Update weights: w ← w + α[G - v_hat(S_tau, w)]∇v_hat(S_tau, w)
                state_tau = states[tau]
                w += alpha * (G - v_hat(state_tau, w)) * phi(state_tau)
            
            t += 1
            
            # Until tau = T - 1
            if tau == T - 1:
                break
    
    return w, v_hat



In [4]:
import gymnasium as gym
import numpy as np

def n_step_semi_gradient_td_fourier(env, num_episodes=10000, alpha=0.01, n=3, gamma=1.0, fourier_order=5):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    num_features = fourier_order + 1  # Order 0 to fourier_order
    
    # Initialize weights
    w = np.zeros(num_features)
    
    # Random policy: uniform probability over actions
    def random_policy():
        return np.ones(num_actions) / num_actions
    
    # Fourier basis features: cos(pi * c * s_norm), s_norm in [0, 1]
    def phi(state):
        s_norm = state / (num_states - 1)  # Normalize state to [0, 1]
        features = np.array([np.cos(np.pi * c * s_norm) for c in range(fourier_order + 1)])
        return features
    
    # Value function estimate: v_hat(s, w) = w^T * phi(s)
    def v_hat(state, w):
        return np.dot(w, phi(state))
    
    # Loop for each episode
    for episode in range(num_episodes):
        state, _ = env.reset()
        states = [state]
        rewards = [0]
        
        T = float('inf')
        t = 0
        
        while True:
            if t < T:
                action_probs = random_policy()
                action = np.random.choice(num_actions, p=action_probs)
                next_state, reward, terminated, truncated, _ = env.step(action)
                
                states.append(next_state)
                rewards.append(reward)
                
                if terminated or truncated:
                    T = t + 1
            
            tau = t - n + 1
            if tau >= 0:
                G = 0
                for i in range(tau + 1, min(tau + n, T) + 1):
                    G += (gamma ** (i - tau - 1)) * rewards[i]
                if tau + n < T:
                    G += (gamma ** n) * v_hat(states[tau + n], w)
                
                state_tau = states[tau]
                w += alpha * (G - v_hat(state_tau, w)) * phi(state_tau)
            
            t += 1
            if tau == T - 1:
                break
    
    return w, v_hat



In [5]:
import gymnasium as gym
import numpy as np

def n_step_semi_gradient_td_coarse(env, num_episodes=10000, alpha=0.01, n=3, gamma=1.0):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    
    # Define coarse coding regions (overlapping intervals)
    num_regions = 4
    region_size = 6  # Each region covers 6 states
    regions = [(i, i + region_size) for i in range(0, num_states, 4)]  # e.g., (0,6), (4,10), (8,14), (12,18)
    
    # Initialize weights
    w = np.zeros(num_regions)
    
    # Random policy
    def random_policy():
        return np.ones(num_actions) / num_actions
    
    # Coarse coding features: 1 if state in region, 0 otherwise
    def phi(state):
        features = np.zeros(num_regions)
        for idx, (start, end) in enumerate(regions):
            if start <= state < min(end, num_states):
                features[idx] = 1.0
        return features
    
    # Value function estimate
    def v_hat(state, w):
        return np.dot(w, phi(state))
    
    # Loop for each episode
    for episode in range(num_episodes):
        state, _ = env.reset()
        states = [state]
        rewards = [0]
        
        T = float('inf')
        t = 0
        
        while True:
            if t < T:
                action_probs = random_policy()
                action = np.random.choice(num_actions, p=action_probs)
                next_state, reward, terminated, truncated, _ = env.step(action)
                
                states.append(next_state)
                rewards.append(reward)
                
                if terminated or truncated:
                    T = t + 1
            
            tau = t - n + 1
            if tau >= 0:
                G = 0
                for i in range(tau + 1, min(tau + n, T) + 1):
                    G += (gamma ** (i - tau - 1)) * rewards[i]
                if tau + n < T:
                    G += (gamma ** n) * v_hat(states[tau + n], w)
                
                state_tau = states[tau]
                w += alpha * (G - v_hat(state_tau, w)) * phi(state_tau)
            
            t += 1
            if tau == T - 1:
                break
    
    return w, v_hat


In [6]:
import gymnasium as gym
import numpy as np

def n_step_semi_gradient_td_tile(env, num_episodes=10000, alpha=0.01, n=3, gamma=1.0, num_tilings=4, tile_width=4):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    num_tiles_per_tiling = (num_states + tile_width - 1) // tile_width  # Ceiling division
    num_features = num_tilings * num_tiles_per_tiling  # Total features
    
    # Initialize weights
    w = np.zeros(num_features)
    
    # Random policy
    def random_policy():
        return np.ones(num_actions) / num_actions
    
    # Tile coding features
    def phi(state):
        features = np.zeros(num_features)
        for tiling in range(num_tilings):
            # Offset for each tiling
            offset = tiling * (tile_width / num_tilings)
            tile_idx = int((state + offset) // tile_width)
            if tile_idx < num_tiles_per_tiling:
                feature_idx = tiling * num_tiles_per_tiling + tile_idx
                features[feature_idx] = 1.0
        return features
    
    # Value function estimate
    def v_hat(state, w):
        return np.dot(w, phi(state))
    
    # Loop for each episode
    for episode in range(num_episodes):
        state, _ = env.reset()
        states = [state]
        rewards = [0]
        
        T = float('inf')
        t = 0
        
        while True:
            if t < T:
                action_probs = random_policy()
                action = np.random.choice(num_actions, p=action_probs)
                next_state, reward, terminated, truncated, _ = env.step(action)
                
                states.append(next_state)
                rewards.append(reward)
                
                if terminated or truncated:
                    T = t + 1
            
            tau = t - n + 1
            if tau >= 0:
                G = 0
                for i in range(tau + 1, min(tau + n, T) + 1):
                    G += (gamma ** (i - tau - 1)) * rewards[i]
                if tau + n < T:
                    G += (gamma ** n) * v_hat(states[tau + n], w)
                
                state_tau = states[tau]
                w += alpha * (G - v_hat(state_tau, w)) * phi(state_tau)
            
            t += 1
            if tau == T - 1:
                break
    
    return w, v_hat



In [None]:
import gymnasium as gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import random
from collections import deque

# Neural Network for Q-value approximation
class DQN(nn.Module):
    def __init__(self, input_size, output_size):
        super(DQN, self).__init__()
        # Simple network: two linear layers
        self.fc1 = nn.Linear(input_size, 64)  # Input: state features, Hidden: 64 units
        self.fc2 = nn.Linear(64, output_size)  # Output: Q-values for each action
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# DQN Agent
class DQNAgent:
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=2000)  # Experience replay buffer
        self.gamma = 1.0  # Discount factor
        self.epsilon = 1.0  # Exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001
        self.batch_size = 32
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Main and target networks
        self.model = DQN(state_size, action_size).to(self.device)
        self.target_model = DQN(state_size, action_size).to(self.device)
        self.target_model.load_state_dict(self.model.state_dict())
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.learning_rate)

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        # Epsilon-greedy policy
        if random.random() <= self.epsilon:
            return random.randrange(self.action_size)
        state = torch.FloatTensor(state).to(self.device)
        with torch.no_grad():
            q_values = self.model(state)
        return q_values.argmax().item()

    def replay(self):
        if len(self.memory) < self.batch_size:
            return
        # Sample a batch from memory
        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)

        states = torch.FloatTensor(states).to(self.device)
        actions = torch.LongTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).to(self.device)

        # Compute Q-values
        q_values = self.model(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        next_q_values = self.target_model(next_states).max(1)[0].detach()
        target = rewards + (1 - dones) * self.gamma * next_q_values

        # Compute loss and optimize
        loss = nn.MSELoss()(q_values, target)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        # Decay epsilon
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

    def update_target_network(self):
        self.target_model.load_state_dict(self.model.state_dict())

def main():
    # Environment setup
    env = gym.make("FrozenLake-v1", is_slippery=False)
    state_size = env.observation_space.n  # 16 states
    action_size = env.action_space.n  # 4 actions

    # One-hot encoding for states
    def one_hot(state, size):
        return np.eye(size)[state]

    agent = DQNAgent(state_size, action_size)
    episodes = 1000
    target_update_freq = 10  # Update target network every 10 episodes

    for episode in range(episodes):
        state, _ = env.reset()
        state = one_hot(state, state_size)
        total_reward = 0

        for t in range(100):  # Max steps per episode
            action = agent.act(state)
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated
            next_state_encoded = one_hot(next_state, state_size)

            # Modify reward for better learning
            reward = 1.0 if terminated and reward > 0 else -0.01 if not done else -1.0
            agent.remember(state, action, reward, next_state_encoded, done)

            state = next_state_encoded
            total_reward += reward

            if done:
                break

            agent.replay()

        if episode % target_update_freq == 0:
            agent.update_target_network()

        if episode % 100 == 0:
            print(f"Episode {episode}, Total Reward: {total_reward}, Epsilon: {agent.epsilon:.2f}")

    # Evaluate the learned policy
    state, _ = env.reset()
    state = one_hot(state, state_size)
    done = False
    steps = 0
    while not done and steps < 100:
        action = agent.act(state)
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        state = one_hot(next_state, state_size)
        steps += 1
        print(f"Step {steps}: State {next_state}, Action {action}, Reward {reward}")

    env.close()

if __name__ == "__main__":
    main()

In [7]:
def lstd(env,num_episodes=10000,epsilon=0.1,gamma=0.99):
    num_actions=env.action_space.n
    num_states=env.observation_space.n
    d=num_states
    A_inv=np.eye(d)/epsilon
    b=np.zeros(d)
    def random_policy():
        return np.ones(num_actions)/num_actions
    def phi(state):
        features=np.zeros(d)
        features[state]=1.0
        return features
    def v_hat(state,w):
        return np.dot(w,phi(state))

    for episode in range(num_episodes):
        states,_=env.reset()
        x=phi(state)
        while True:
            action_prob=random_policy()
            action=np.random.choice(num_actions,p=action_prob)
            next_state,reward,terminated,truncated,_=env.step(action)
            x_next=phi(next_state)
            v=np.dot(A_inv.T,(x-gamma*x_next))
            A_inv=A_inv-np.outer(np.dot(A_inv,x),v.T)/(1 + np.dot(v.T, x))
            b=b+reward*x
            w=np.dot(A_inv,b)
            state = next_state
            x = x_next
            
            if terminated or truncated:
                break
    
    return w, v_hat

In [8]:
def memory_based_nearest_neighbor(env, num_episodes=10000, gamma=1.0):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    
    # Memory to store (state, value) pairs
    memory = []  # List of tuples (state, value)
    
    # Random policy
    def random_policy():
        return np.ones(num_actions) / num_actions
    
    # Simulate episodes to collect training examples using Monte Carlo
    for episode in range(num_episodes):
        state, _ = env.reset()
        trajectory = [(state, 0)]  # (state, reward)
        
        # Generate a trajectory
        while True:
            action_probs = random_policy()
            action = np.random.choice(num_actions, p=action_probs)
            next_state, reward, terminated, truncated, _ = env.step(action)
            trajectory.append((next_state, reward))
            state = next_state
            if terminated or truncated:
                break
        
        # Compute returns (Monte Carlo) for each state in the trajectory
        G = 0
        for t in range(len(trajectory) - 2, -1, -1):
            state, reward = trajectory[t]
            G = reward + gamma * G
            # Store or update the (state, value) pair in memory
            # If state already exists, update its value (average for simplicity)
            found = False
            for i, (s, v) in enumerate(memory):
                if s == state:
                    memory[i] = (s, (v + G) / 2)  # Average the values
                    found = True
                    break
            if not found:
                memory.append((state, G))
    
    # Nearest neighbor value function estimate
    def v_hat(query_state):
        if not memory:
            return 0.0  # Default value if memory is empty
        
        # Find the nearest state in memory
        # For FrozenLake, use absolute difference as distance
        distances = [abs(query_state - s) for s, _ in memory]
        nearest_idx = np.argmin(distances)
        _, value = memory[nearest_idx]
        return value
    
    return memory, v_hat

In [None]:
import gymnasium as gym
import numpy as np

def n_step_semi_gradient_td_with_interest_emphasis(env, num_episodes=10000, alpha=0.01, n=3, gamma=1.0):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    d = num_states  # Feature dimension (one-hot encoding)
    
    # Initialize weights
    w = np.zeros(d)
    
    # Random policy
    def random_policy():
        return np.ones(num_actions) / num_actions
    
    # Feature representation: one-hot encoding
    def phi(state):
        features = np.zeros(d)
        features[state] = 1.0
        return features
    
    # Value function estimate
    def v_hat(state, w):
        return np.dot(w, phi(state))
    
    # Interest function: higher interest for states closer to the goal (state 15)
    def interest(state):
        distance_to_goal = abs(state - 15)  # Goal is state 15 in FrozenLake
        max_distance = 15  # Maximum possible distance in 4x4 grid
        return 1.0 - (distance_to_goal / max_distance)  # Linearly decreasing interest
    
    # Loop for each episode
    for episode in range(num_episodes):
        state, _ = env.reset()
        states = [state]
        rewards = [0]
        interests = [interest(state)]  # Store interest for each state
        emphases = [0] * n  # Store past n emphases for M_t computation
        
        T = float('inf')
        t = 0
        
        while True:
            if t < T:
                action_probs = random_policy()
                action = np.random.choice(num_actions, p=action_probs)
                next_state, reward, terminated, truncated, _ = env.step(action)
                
                states.append(next_state)
                rewards.append(reward)
                interests.append(interest(next_state))
                
                if terminated or truncated:
                    T = t + 1
            
            tau = t - n + 1
            if tau >= 0:
                # Compute emphasis M_t
                I_t = interests[tau]
                M_t = I_t
                if tau - n >= 0:
                    M_t += (gamma ** n) * emphases[tau % n]  # M_t = I_t + gamma^n * M_{t-n}
                emphases[tau % n] = M_t
                
                # Compute n-step return G
                G = 0
                for i in range(tau + 1, min(tau + n, T) + 1):
                    G += (gamma ** (i - tau - 1)) * rewards[i]
                if tau + n < T:
                    G += (gamma ** n) * v_hat(states[tau + n], w)
                
                # Update weights with emphasis
                state_tau = states[tau]
                grad_v = phi(state_tau)  # Gradient of v_hat is just phi for linear approximation
                w += alpha * M_t * (G - v_hat(state_tau, w)) * grad_v
            
            t += 1
            if tau == T - 1:
                break
    
    return w, v_hat



In [9]:
import gymnasium as gym
import numpy as np

def kernel_regression(env, num_episodes=10000, gamma=1.0):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    
    # Memory to store (state, value) pairs
    memory = []  # List of tuples (state, value)
    
    # Random policy
    def random_policy():
        return np.ones(num_actions) / num_actions
    
    # Kernel function: inverse distance for discrete states
    def kernel(s, s_prime):
        distance = abs(s - s_prime)
        return 1.0 / (1.0 + distance)  # Simple kernel: 1 / (1 + |s - s'|)

    # Simulate episodes to collect training examples using Monte Carlo
    for episode in range(num_episodes):
        state, _ = env.reset()
        trajectory = [(state, 0)]  # (state, reward)
        
        # Generate a trajectory
        while True:
            action_probs = random_policy()
            action = np.random.choice(num_actions, p=action_probs)
            next_state, reward, terminated, truncated, _ = env.step(action)
            trajectory.append((next_state, reward))
            state = next_state
            if terminated or truncated:
                break
        
        # Compute returns (Monte Carlo) for each state in the trajectory
        G = 0
        for t in range(len(trajectory) - 2, -1, -1):
            state, reward = trajectory[t]
            G = reward + gamma * G
            # Store or update the (state, value) pair in memory
            found = False
            for i, (s, v) in enumerate(memory):
                if s == state:
                    memory[i] = (s, (v + G) / 2)  # Average the values
                    found = True
                    break
            if not found:
                memory.append((state, G))
    
    # Kernel regression value function estimate
    def v_hat(state_to_query):  # Renamed parameter for clarity
        if not memory:
            return 0.0  # Default value if memory is empty
        
        # Compute weighted average using kernel function
        total_weight = 0.0
        weighted_sum = 0.0
        for stored_state, value in memory:
            weight = kernel(state_to_query, stored_state)
            weighted_sum += weight * value
            total_weight += weight
        
        if total_weight == 0:
            return 0.0  # Avoid division by zero
        return weighted_sum / total_weight  # Normalize by total weight
    
    return memory, v_hat



In [1]:
import gymnasium as gym
import numpy as np

def n_step_semi_gradient_td_with_interest_emphasis(env, num_episodes=10000, alpha=0.01, n=3, gamma=1.0):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    d = num_states  # Feature dimension (one-hot encoding)
    
    # Initialize weights
    w = np.zeros(d)
    
    # Random policy
    def random_policy():
        return np.ones(num_actions) / num_actions
    
    # Feature representation: one-hot encoding
    def phi(state):
        features = np.zeros(d)
        features[state] = 1.0
        return features
    
    # Value function estimate
    def v_hat(state, w):
        return np.dot(w, phi(state))
    
    # Interest function: higher interest for states closer to the goal (state 15)
    def interest(state):
        distance_to_goal = abs(state - 15)  # Goal is state 15 in FrozenLake
        max_distance = 15  # Maximum possible distance in 4x4 grid
        return 1.0 - (distance_to_goal / max_distance)  # Linearly decreasing interest
    
    # Loop for each episode
    for episode in range(num_episodes):
        state, _ = env.reset()
        states = [state]
        rewards = [0]
        interests = [interest(state)]  # Store interest for each state
        emphases = [0] * n  # Store past n emphases for M_t computation
        
        T = float('inf')
        t = 0
        
        while True:
            if t < T:
                action_probs = random_policy()
                action = np.random.choice(num_actions, p=action_probs)
                next_state, reward, terminated, truncated, _ = env.step(action)
                
                states.append(next_state)
                rewards.append(reward)
                interests.append(interest(next_state))
                
                if terminated or truncated:
                    T = t + 1
            
            tau = t - n + 1
            if tau >= 0:
                # Compute emphasis M_t
                I_t = interests[tau]
                M_t = I_t
                if tau - n >= 0:
                    M_t += (gamma ** n) * emphases[tau % n]  # M_t = I_t + gamma^n * M_{t-n}
                emphases[tau % n] = M_t
                
                # Compute n-step return G
                G = 0
                for i in range(tau + 1, min(tau + n, T) + 1):
                    G += (gamma ** (i - tau - 1)) * rewards[i]
                if tau + n < T:
                    G += (gamma ** n) * v_hat(states[tau + n], w)
                
                # Update weights with emphasis
                state_tau = states[tau]
                grad_v = phi(state_tau)  # Gradient of v_hat is just phi for linear approximation
                w += alpha * M_t * (G - v_hat(state_tau, w)) * grad_v
            
            t += 1
            if tau == T - 1:
                break
    
    return w, v_hat



# chapter 10

In [2]:
import gymnasium as gym
import numpy as np

def semi_gradient_sarsa(env, num_episodes=10000, alpha=0.01, gamma=1.0, epsilon=0.1):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    d = num_states * num_actions  # Feature dimension for state-action pairs
    
    # Initialize weights
    w = np.zeros(d)
    
    # Feature representation: one-hot encoding for state-action pair
    def phi(state, action):
        features = np.zeros(d)
        index = state * num_actions + action  # Unique index for (state, action)
        features[index] = 1.0
        return features
    
    # Action-value function estimate
    def q_hat(state, action, w):
        return np.dot(w, phi(state, action))
    
    # Epsilon-greedy policy
    def epsilon_greedy(state, w):
        if np.random.rand() < epsilon:
            return np.random.randint(num_actions)  # Explore
        else:
            q_values = [q_hat(state, a, w) for a in range(num_actions)]
            return np.argmax(q_values)  # Exploit
    
    # Loop for each episode
    for episode in range(num_episodes):
        state, _ = env.reset()
        action = epsilon_greedy(state, w)  # Initial action (ε-greedy)
        
        while True:
            # Take action, observe R, S'
            next_state, reward, terminated, truncated, _ = env.step(action)
            
            if terminated or truncated:
                # Update for terminal state: target is just the reward
                grad_q = phi(state, action)
                w += alpha * (reward - q_hat(state, action, w)) * grad_q
                break
            
            # Choose A' using ε-greedy policy
            next_action = epsilon_greedy(next_state, w)
            
            # Update weights
            target = reward + gamma * q_hat(next_state, next_action, w)
            grad_q = phi(state, action)
            w += alpha * (target - q_hat(state, action, w)) * grad_q
            
            # Update state and action
            state = next_state
            action = next_action
    
    return w, q_hat



In [3]:
import gymnasium as gym
import numpy as np

def n_step_semi_gradient_sarsa(env, num_episodes=10000, alpha=0.01, gamma=1.0, epsilon=0.1, n=3):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    d = num_states * num_actions  # Feature dimension for state-action pairs
    
    # Initialize weights
    w = np.zeros(d)
    
    # Feature representation: one-hot encoding for state-action pair
    def phi(state, action):
        features = np.zeros(d)
        index = state * num_actions + action  # Unique index for (state, action)
        features[index] = 1.0
        return features
    
    # Action-value function estimate
    def q_hat(state, action, w):
        return np.dot(w, phi(state, action))
    
    # Epsilon-greedy policy
    def epsilon_greedy(state, w):
        if np.random.rand() < epsilon:
            return np.random.randint(num_actions)  # Explore
        else:
            q_values = [q_hat(state, a, w) for a in range(num_actions)]
            return np.argmax(q_values)  # Exploit
    
    # Loop for each episode
    for episode in range(num_episodes):
        state, _ = env.reset()
        action = epsilon_greedy(state, w)  # Initial action A_0
        
        # Circular buffers for storing S_t, A_t, R_t (mod n+1)
        buffer_size = n + 1
        states = [0] * buffer_size  # S_t
        actions = [0] * buffer_size  # A_t
        rewards = [0] * buffer_size  # R_t
        states[0] = state
        actions[0] = action
        
        T = float('inf')
        t = 0
        
        while True:
            if t < T:
                # Take action A_t, observe R_{t+1}, S_{t+1}
                next_state, reward, terminated, truncated, _ = env.step(actions[t % buffer_size])
                states[(t + 1) % buffer_size] = next_state
                rewards[(t + 1) % buffer_size] = reward
                
                if terminated or truncated:
                    T = t + 1
                else:
                    # Select A_{t+1} using ε-greedy
                    next_action = epsilon_greedy(next_state, w)
                    actions[(t + 1) % buffer_size] = next_action
            
            # Time to update (τ = t - n + 1)
            tau = t - n + 1
            if tau >= 0:
                # Compute n-step return G_{τ:τ+n}
                G = 0
                for i in range(tau + 1, min(tau + n, T) + 1):
                    G += (gamma ** (i - tau - 1)) * rewards[i % buffer_size]
                if tau + n < T:
                    G += (gamma ** n) * q_hat(states[(tau + n) % buffer_size], actions[(tau + n) % buffer_size], w)
                
                # Update weights
                state_tau = states[tau % buffer_size]
                action_tau = actions[tau % buffer_size]
                grad_q = phi(state_tau, action_tau)
                w += alpha * (G - q_hat(state_tau, action_tau, w)) * grad_q
            
            t += 1
            if tau == T - 1:
                break
    
    return w, q_hat



In [4]:
import gymnasium as gym
import numpy as np

def differential_semi_gradient_sarsa(env, num_steps=100000, alpha=0.01, beta=0.01, epsilon=0.1, gamma=1.0):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    d = num_states * num_actions  # Feature dimension for state-action pairs
    
    # Initialize weights and average reward
    w = np.zeros(d)  # Value-function weights
    avg_reward = 0.0  # R̄ (average reward estimate)
    
    # Feature representation: one-hot encoding for state-action pair
    def phi(state, action):
        features = np.zeros(d)
        index = state * num_actions + action  # Unique index for (state, action)
        features[index] = 1.0
        return features
    
    # Action-value function estimate
    def q_hat(state, action, w):
        return np.dot(w, phi(state, action))
    
    # Epsilon-greedy policy
    def epsilon_greedy(state, w):
        if np.random.rand() < epsilon:
            return np.random.randint(num_actions)  # Explore
        else:
            q_values = [q_hat(state, a, w) for a in range(num_actions)]
            return np.argmax(q_values)  # Exploit
    
    # Initialize state and action
    state, _ = env.reset()
    action = epsilon_greedy(state, w)
    
    # Loop for each step
    for step in range(num_steps):
        # Take action, observe R, S'
        next_state, reward, terminated, truncated, _ = env.step(action)
        
        # If terminal, reset the environment (treat as continuing task)
        if terminated or truncated:
            next_state, _ = env.reset()
        
        # Choose A' using ε-greedy policy
        next_action = epsilon_greedy(next_state, w)
        
        # Update average reward R̄
        delta = reward - avg_reward + q_hat(next_state, next_action, w) - q_hat(state, action, w)
        avg_reward += beta * delta
        
        # Update weights
        grad_q = phi(state, action)
        w += alpha * delta * grad_q
        
        # Update state and action
        state = next_state
        action = next_action
    
    return w, q_hat, avg_reward


In [5]:
import gymnasium as gym
import numpy as np

def differential_n_step_semi_gradient_sarsa(env, num_steps=100000, alpha=0.01, beta=0.01, epsilon=0.1, gamma=1.0, n=3):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    d = num_states * num_actions  # Feature dimension for state-action pairs
    
    # Initialize weights and average reward
    w = np.zeros(d)  # Value-function weights
    avg_reward = 0.0  # R̄ (average reward estimate)
    
    # Feature representation: one-hot encoding for state-action pair
    def phi(state, action):
        features = np.zeros(d)
        index = state * num_actions + action  # Unique index for (state, action)
        features[index] = 1.0
        return features
    
    # Action-value function estimate
    def q_hat(state, action, w):
        return np.dot(w, phi(state, action))
    
    # Epsilon-greedy policy
    def epsilon_greedy(state, w):
        if np.random.rand() < epsilon:
            return np.random.randint(num_actions)  # Explore
        else:
            q_values = [q_hat(state, a, w) for a in range(num_actions)]
            return np.argmax(q_values)  # Exploit
    
    # Initialize state and action
    state, _ = env.reset()
    action = epsilon_greedy(state, w)
    
    # Circular buffers for storing S_t, A_t, R_t (mod n+1)
    buffer_size = n + 1
    states = [0] * buffer_size  # S_t
    actions = [0] * buffer_size  # A_t
    rewards = [0] * buffer_size  # R_t
    states[0] = state
    actions[0] = action
    
    # Loop for each step
    t = 0
    while t < num_steps:
        # Take action A_t, observe R_{t+1}, S_{t+1}
        next_state, reward, terminated, truncated, _ = env.step(actions[t % buffer_size])
        
        # If terminal, reset the environment (treat as continuing task)
        if terminated or truncated:
            next_state, _ = env.reset()
        
        states[(t + 1) % buffer_size] = next_state
        rewards[(t + 1) % buffer_size] = reward
        
        # Select A_{t+1} using ε-greedy
        next_action = epsilon_greedy(next_state, w)
        actions[(t + 1) % buffer_size] = next_action
        
        # Time to update (τ = t - n + 1)
        tau = t - n + 1
        if tau >= 0:
            # Compute n-step differential return
            delta = 0
            for i in range(tau + 1, tau + n + 1):
                delta += rewards[i % buffer_size] - avg_reward
            delta += q_hat(states[(tau + n) % buffer_size], actions[(tau + n) % buffer_size], w) - q_hat(states[tau % buffer_size], actions[tau % buffer_size], w)
            
            # Update average reward R̄
            avg_reward += beta * delta
            
            # Update weights
            grad_q = phi(states[tau % buffer_size], actions[tau % buffer_size])
            w += alpha * delta * grad_q
        
        t += 1
    
    return w, q_hat, avg_reward



In [6]:
import gymnasium as gym
import numpy as np

def differential_n_step_semi_gradient_sarsa(env, num_steps=200000, alpha=0.05, beta=0.05, epsilon=0.3, gamma=0.9, n=3):
    num_actions = env.action_space.n
    num_states = env.observation_space.n
    d = num_states * num_actions  # Feature dimension for state-action pairs
    
    # Initialize weights and average reward
    w = np.zeros(d)  # Value-function weights
    avg_reward = 0.0  # R̄ (average reward estimate)
    
    # Feature representation: one-hot encoding for state-action pair
    def phi(state, action):
        features = np.zeros(d)
        index = state * num_actions + action  # Unique index for (state, action)
        features[index] = 1.0
        return features
    
    # Action-value function estimate
    def q_hat(state, action, w):
        return np.dot(w, phi(state, action))
    
    # Epsilon-greedy policy
    def epsilon_greedy(state, w):
        if np.random.rand() < epsilon:
            return np.random.randint(num_actions)  # Explore
        else:
            q_values = [q_hat(state, a, w) for a in range(num_actions)]
            return np.argmax(q_values)  # Exploit
    
    # Initialize state and action
    state, _ = env.reset()
    action = epsilon_greedy(state, w)
    
    # Circular buffers for storing S_t, A_t, R_t (mod n+1)
    buffer_size = n + 1
    states = [0] * buffer_size  # S_t
    actions = [0] * buffer_size  # A_t
    rewards = [0] * buffer_size  # R_t
    terminated_flags = [False] * buffer_size  # Track termination
    states[0] = state
    actions[0] = action
    
    # Loop for each step
    t = 0
    while t < num_steps:
        # Take action A_t, observe R_{t+1}, S_{t+1}
        next_state, reward, terminated, truncated, _ = env.step(actions[t % buffer_size])
        
        # If terminal, reset the environment (treat as continuing task)
        if terminated or truncated:
            next_state, _ = env.reset()
        
        states[(t + 1) % buffer_size] = next_state
        rewards[(t + 1) % buffer_size] = reward
        terminated_flags[(t + 1) % buffer_size] = terminated or truncated
        
        # Select A_{t+1} using ε-greedy
        next_action = epsilon_greedy(next_state, w)
        actions[(t + 1) % buffer_size] = next_action
        
        # Time to update (τ = t - n + 1)
        tau = t - n + 1
        if tau >= 0:
            # Compute n-step differential return
            delta = 0
            for i in range(tau + 1, tau + n + 1):
                delta += gamma ** (i - tau - 1) * (rewards[i % buffer_size] - avg_reward)
                if terminated_flags[i % buffer_size]:  # If termination within n steps, stop
                    break
            else:
                # If no termination within n steps, add the q-value of the next state-action
                delta += (gamma ** n) * q_hat(states[(tau + n) % buffer_size], actions[(tau + n) % buffer_size], w)
            
            delta -= q_hat(states[tau % buffer_size], actions[tau % buffer_size], w)
            
            # Update average reward R̄
            avg_reward += beta * delta
            
            # Update weights
            grad_q = phi(states[tau % buffer_size], actions[tau % buffer_size])
            w += alpha * delta * grad_q
        
        t += 1
    
    return w, q_hat, avg_reward

def main():
    env = gym.make("FrozenLake-v1", is_slippery=False)
    w, q_hat, avg_reward = differential_n_step_semi_gradient_sarsa(env, num_steps=200000, alpha=0.05, beta=0.05, epsilon=0.3, gamma=0.9, n=3)
    
    print(f"Average Reward Estimate: {avg_reward:.2f}")
    print("Estimated Action-Value Function (Differential n-step SARSA, ε-greedy):")
    for i in range(4):
        for a in range(4):  # 4 actions: Left, Down, Right, Up
            row = [f"{q_hat(i*4 + j, a, w):.2f}" for j in range(4)]
            print(f"Action {a}: {' '.join(row)}")

if __name__ == "__main__":
    main()

Average Reward Estimate: 0.11
Estimated Action-Value Function (Differential n-step SARSA, ε-greedy):
Action 0: -0.36 -0.35 -0.21 -0.11
Action 1: -0.27 -0.13 0.01 -0.10
Action 2: -0.28 -0.14 -0.20 -0.13
Action 3: -0.37 -0.23 -0.14 -0.15
Action 0: -0.26 0.00 -0.09 0.00
Action 1: -0.16 0.00 0.28 0.00
Action 2: -0.13 0.00 -0.09 0.00
Action 3: -0.32 0.00 -0.02 0.00
Action 0: -0.08 -0.00 0.18 0.00
Action 1: -0.12 0.18 0.53 0.00
Action 2: 0.03 0.25 -0.07 0.00
Action 3: -0.19 -0.09 0.16 0.00
Action 0: 0.00 -0.07 0.40 0.00
Action 1: 0.00 0.31 0.63 0.00
Action 2: 0.00 0.53 0.85 0.00
Action 3: 0.00 0.19 0.42 0.00


# 11

In [7]:
import gymnasium as gym
import numpy as np

def tdc(env, num_steps=100000, alpha=0.005, beta=0.05, epsilon=0.1, gamma=0.9):
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    d = num_states  # Feature dimension (one-hot encoding for states)

    # Initialize weights and auxiliary vector
    w = np.zeros(d)  # For v(s, w)
    v = np.zeros(d)  # Auxiliary vector for gradient correction

    # Feature representation: one-hot encoding for states
    def phi(state):
        features = np.zeros(d)
        features[state] = 1.0
        return features

    # Value function estimate
    def v_hat(state, w):
        return np.dot(w, phi(state))

    # Policies
    def behavior_policy():
        return np.random.randint(num_actions)  # Random policy

    def target_policy(state, w):
        if np.random.rand() < epsilon:
            return np.random.randint(num_actions)
        q_values = [sum(v_hat(next_state, w) for next_state, prob in env.P[state][a].items()) for a in range(num_actions)]
        return np.argmax(q_values)

    # Initialize state
    state, _ = env.reset()

    # Loop for each step
    for t in range(num_steps):
        # Take action using behavior policy
        action = behavior_policy()
        next_state, reward, terminated, truncated, _ = env.step(action)

        # If terminal, reset environment
        if terminated or truncated:
            next_state, _ = env.reset()

        # Compute importance sampling ratio
        pi_a = 1.0 if action == target_policy(state, w) else 0.0  # ε-greedy target policy
        b_a = 1.0 / num_actions  # Random behavior policy
        rho = pi_a / b_a

        # Features
        x_t = phi(state)
        x_t_next = phi(next_state)

        # TD error
        delta = reward + gamma * v_hat(next_state, w) - v_hat(state, w)

        # Update auxiliary vector v (LMS rule)
        v += beta * rho * (delta - np.dot(v, x_t)) * x_t

        # Update weights w (TDC update)
        w += alpha * rho * (delta * x_t - x_t_next * np.dot(x_t, v))

        # Update state
        state = next_state

    return w, v_hat



In [8]:
import gymnasium as gym
import numpy as np

def emphatic_td(env, num_steps=100000, alpha=0.03, gamma=0.9, epsilon=0.1):
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    d = num_states  # Feature dimension (one-hot encoding for states)

    # Initialize weights
    w = np.zeros(d)  # For v(s, w)

    # Feature representation: one-hot encoding for states
    def phi(state):
        features = np.zeros(d)
        features[state] = 1.0
        return features

    # Value function estimate
    def v_hat(state, w):
        return np.dot(w, phi(state))

    # Policies
    def behavior_policy():
        return np.random.randint(num_actions)  # Random policy

    def target_policy(state, w):
        if np.random.rand() < epsilon:
            return np.random.randint(num_actions)
        q_values = [sum(v_hat(next_state, w) for next_state, prob in env.P[state][a].items()) for a in range(num_actions)]
        return np.argmax(q_values)

    # Initialize state and emphasis
    state, _ = env.reset()
    M = 0.0  # Emphasis M_t, initialized as M_{-1} = 0
    rho_prev = 1.0  # Initial rho_{t-1} for t=0

    # Loop for each step
    for t in range(num_steps):
        # Take action using behavior policy
        action = behavior_policy()
        next_state, reward, terminated, truncated, _ = env.step(action)

        # If terminal, reset environment and emphasis
        if terminated or truncated:
            next_state, _ = env.reset()
            M = 0.0  # Reset emphasis on episode boundary
            rho_prev = 1.0  # Reset rho for new episode

        # Compute importance sampling ratio
        pi_a = 1.0 if action == target_policy(state, w) else 0.0  # ε-greedy target policy
        b_a = 1.0 / num_actions  # Random behavior policy
        rho = pi_a / b_a

        # Features
        x_t = phi(state)
        x_t_next = phi(next_state)

        # TD error
        delta = reward + gamma * v_hat(next_state, w) - v_hat(state, w)

        # Interest (set to 1 for simplicity, as in the book's example)
        I_t = 1.0

        # Update emphasis
        M = gamma * rho_prev * M + I_t

        # Update weights
        w += alpha * M * rho * delta * x_t

        # Update state and rho for next iteration
        state = next_state
        rho_prev = rho

    return w, v_hat



In [10]:
import gym
import torch
import torch.nn as nn
import torch.optim as optim

# Create the environment
env = gym.make("CartPole-v1")

# Define a simple neural network for value function approximation
class ValueNetwork(nn.Module):
    def __init__(self, input_dim):
        super(ValueNetwork, self).__init__()
        self.fc = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 1)
        )

    def forward(self, x):
        return self.fc(x)

# Initialize model, optimizer, loss
obs_dim = env.observation_space.shape[0]
model = ValueNetwork(obs_dim)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()

# Training loop
episodes = 500
gamma = 0.99

for episode in range(episodes):
    state, _ = env.reset()
    state = torch.tensor(state, dtype=torch.float32)
    done = False
    total_reward = 0

    while not done:
        # Select a random action
        action = env.action_space.sample()

        # Step in the environment
        next_state, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        next_state_tensor = torch.tensor(next_state, dtype=torch.float32)

        # Compute target using TD(0)
        with torch.no_grad():
            target = reward
            if not done:
                target += gamma * model(next_state_tensor).item()
            target = torch.tensor([target], dtype=torch.float32)

        # Predict current value
        value = model(state)

        # Compute loss and update
        loss = loss_fn(value, target)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Move to next state
        state = next_state_tensor
        total_reward += reward

    if (episode + 1) % 50 == 0:
        print(f"Episode {episode+1}, Total Reward: {total_reward}")

env.close()


  if not isinstance(terminated, (bool, np.bool8)):


Episode 50, Total Reward: 10.0
Episode 100, Total Reward: 27.0
Episode 150, Total Reward: 12.0
Episode 200, Total Reward: 19.0
Episode 250, Total Reward: 12.0
Episode 300, Total Reward: 44.0
Episode 350, Total Reward: 18.0
Episode 400, Total Reward: 31.0
Episode 450, Total Reward: 27.0
Episode 500, Total Reward: 30.0


# chapter 12

In [12]:
def offline_lambda_return(env,num_episodes=10000,alpha=0.1,gamma=0.9,lambda_=0.9,epsilon=0.1):
    num_actions=env.action_space.n
    num_states=env.observation_space.n
    d=num_states
    w=np.zeros(d)

    def phi(state):
        features=np.zeros(d)
        features[state]=1
        return features
    def v_hat(state,w):
        return np.dot(w,phi(state))
    def epsilon_greedy(state,w):
        if np.random.rand()<epsilon:
            return np.random.randit(num_actions)
        q_values=[sum(v_hat(next_state,w) for next_state,p in env.P[state][a].items()) for a in range(num_actions)]
        return np.argmax(q_values)

    for episode in range(num_episodes):
        trajectory=[]
        state,_=env.reset()
        done=False
        while not done:
            action=epsilon_greedy(state,w)
            next_state,reward,terminated,trucated,_=env.step(action)
            trajectory.append((state,action,reward,next_state))
            state=next_state
            done=termminated or truncated
        T=len(trajectory)
        G_lambda=np.zeros(T)
        G=np.zeros(T)
        for t in range(T-1,-1,-1):
            _,_,reward,next_state=trajectory[t]
            if t==(T-1):
                G[t]=reward
            else:
                G[t]=reward+gamma*G[t+1]
        for t in range(t):
            sum_n_step=0.0
            for n in range(1,T-t):
                G_t_to_tn=0.0
                for i in range(t,t+n):
                    _,_,reward,_=trajectory[i]
                    G_t_to_tn += (gamma ** (i-t)) * reward
                if t+n < T:
                    _, _, _, next_state = trajectory[t+n-1]
                    G_t_to_tn += (gamma ** n) * v_hat(next_state, w)
                sum_n_step += (lambda_ ** (n-1)) * G_t_to_tn

            # Add the Monte Carlo term for n ≥ T-t
            G_lambda[t] = (1 - lambda_) * sum_n_step + (lambda_ ** (T-t-1)) * G[t]

        # Update weights for each t using the λ-return
        for t in range(T):
            state, _, _, _ = trajectory[t]
            x_t = phi(state)
            w += alpha * (G_lambda[t] - v_hat(state, w)) * x_t
            

In [14]:
def semi_gradient_td_lambda(env,num_episodes=10000,alpha=0.1,gamma=0.9,lamnda=0.1,epsilon=0.1):
    num_actions=env.observation_space.n
    num_states=env.action_space.n
    d=num_states
    w=np.zeros(d)
    def phi(state):
        features=np.zeros(d)
        features[state]=1.0
        return features
    def v_hat(state, w):
        # Ensure terminal states have value 0
        if state == 15:  # Terminal state in FrozenLake (goal state)
            return 0.0
        return np.dot(w, phi(state))
    def epsilon_greedy(state, w):
        if np.random.rand() < epsilon:
            return np.random.randint(num_actions)
        q_values = [sum(v_hat(next_state, w) for next_state, prob in env.P[state][a].items()) for a in range(num_actions)]
        return np.argmax(q_values)

    # Main loop over episodes
    for episode in range(num_action):
        state,_=env.reset()
        z=np.zeros(d)
        done=False
        while not done:
            action=epsilon_greedy(state,w)
            next_state,reward,terminated,truncated,_=env.step(action)
            done=terminated or truncated
            x_t=phi(state)
            x_t_next=phi(next_state)
            z = gamma * lambda_ * z + x_t  # For linear FA, ∇v̂(S, w) = x(S)

            # Compute TD error: δ = R + γv̂(S', w) - v̂(S, w)
            delta = reward + gamma * v_hat(next_state, w) - v_hat(state, w)

            # Update weights: w ← w + αδz
            w += alpha * delta * z

            # Move to next state
            state = next_state

    return w, v_hat

In [15]:
def true_online_td_lambda(env,num_episodes=10000,alpha=0.1,gamma=0.0,epsilon=0.1,lambda_=0.1):
    num_actions=env.observation_space.n
    num_states=env.action_space.n
    d=num_states
    w=np.zeros(d)
    def x(state):
        # Ensure terminal states have feature vector 0
        if state == 15:  # Terminal state in FrozenLake (goal state)
            return np.zeros(d)
        features = np.zeros(d)
        features[state] = 1.0
        return features
    def v_hat(state,w):
        return np.dot(w,x(state))
    def epsilon_greedy(state, w):
        if np.random.rand() < epsilon:
            return np.random.randint(num_actions)
        q_values = [sum(v_hat(next_state, w) for next_state, prob in env.P[state][a].items()) for a in range(num_actions)]
        return np.argmax(q_values)
    for episode in range(num_episodes):
        state,_=env.reset()
        x_t=x(state)
        z=np.zeros(d)
        V_old=0.0 # Temporary scalar
        done=False
        while not done:
            action= epsilon_greedy(state,w)
            next_state,reward,terminated,truncated,_=env.step(action)
            done=terminated or truncated
            x_t_next=x(next_state)
            # Compute values and TD error
            V = np.dot(w, x_t)  # V = w^T x
            V_next = np.dot(w, x_t_next)  # V' = w^T x'
            delta = reward + gamma * V_next - V  # δ = R + γV' - V

            # Update eligibility trace: z ← γλz + (1 - α z^T x) x
            z = gamma * lambda_ * z + (1 - alpha * np.dot(z, x_t)) * x_t

            # Update weights: w ← w + α(δ + V - V_old)z - α(V - V_old)x
            w += alpha * (delta + V - V_old) * z - alpha * (V - V_old) * x_t

            # Update V_old and x
            V_old = V_next
            x_t = x_t_next
            state = next_state

            # Check for terminal state (x' = 0)
            if np.all(x_t == 0):
                break

    return w, v_hat
        

In [16]:
def sarsa_lambda(env,num_episodes=10000,alpha=0.1,gamma=0.9,epsilon=0.1,lambda_=0.9):
    num_states=env.observation_space.n
    num_actions=env.oservation_space.n
    d=num_actions*num_states
    w=np.zeros(d)
    z=np.zeros(d)
    def F(s,a):
        return [s*num_actions+a]
    def q_hat(s,a,w):
        active_features=F(s,a)
        return sum(w[i] for i in active_features)
    def epsilon_greedy(state,w):
        if np.random.rand()<epsilon:
            return np.random.randit(num_actions)
        q_values=[q_hat(state,a,w) for a in range(num_actions)]
        return np.argmax(q_values)
    for episode in range(num_episodes):
        state,_=env.reset()
        action=epsilon_greedy(state,w)
        z=np.zeros(d)
        done=False
        while not done:
            next_state,reward,terminated,truncated,_=env.step(action)
            done=terminated or truncated
            q_current=q_hat(state,action,w)
            if done:
                delta=reward-q_current
                next_action=None
            else:
                next_action=epsilon_greedy(next_state,w)
                q_next=q_hat(next_state,next_action,w)
                delta=reward+gamma*q_next-q_current
            for i in F(state, action):
                z[i] = gamma * lambda_ * z[i] + 1  # Accumulating traces

            # Update weights
            w += alpha * delta * z

            if done:
                break

            # Decay eligibility trace
            z = gamma * lambda_ * z

            # Update state and action
            state = next_state
            action = next_action

    return w, q_hat

# chapter 13

In [17]:
def reinforce(env,num_episodes=1000,alpha=0.1,gamma=0.9):
    num_states=np.observation_space.n
    num_actions=np.action_space.n
    d=num_states*num_actions
    theta=np.zeros(d)
    # Feature function φ(s, a): one-hot encoding for state-action pairs
    def phi(s, a):
        features = np.zeros(d)
        features[s * num_actions + a] = 1.0
        return features
    # Softmax policy π(a|s, θ)
    def policy(s, theta):
        scores = np.array([np.dot(theta, phi(s, a)) for a in range(num_actions)])
        exp_scores = np.exp(scores - np.max(scores))  # Subtract max for numerical stability
        probs = exp_scores / np.sum(exp_scores)
        return probs
    def sample_action(s,theta):
        probs=policy(s,theta)
        return np.random.choice(num_actions,p=probs)
    # Gradient of log π(a|s, θ)
    def grad_log_pi(s, a, theta):
        # ∇_θ ln π(a|s, θ) = φ(s, a) - Σ_b π(b|s, θ) φ(s, b)
        x_sa=phi(s,a)
        probs=policy(s,theta)
        expected_phi=np.zeros(d)
        for b in range(num_actions):
            expected_phi+=probs[b]*phi(s,a)
        return x_sa-expected_phi
    for episode in range(num_episodes):
        trajectory=[]
        state,_=env.reset()
        done=False
        while not done:
            action=sample_actions(s,theta)
            next_state,reward,terminated,truncated,_=env.step(action)
            trajectory.append((state,action,reward))
            state=next_state
            done = terminated or truncated

        # Compute returns and update θ
        T = len(trajectory)
        G = 0
        for t in range(T-1, -1, -1):
            state, action, reward = trajectory[t]
            G = reward + gamma * G  # Compute return G_t
            # Update θ: θ ← θ + α G_t ∇_θ ln π(A_t|S_t, θ)
            grad = grad_log_pi(state, action, theta)
            theta += alpha * G * grad

    return theta, policy

In [18]:
def reinforce_with_baseline(env, num_episodes=5000, alpha_theta=0.01, alpha_w=0.1, gamma=0.9):
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    d_policy = num_states * num_actions  # Feature dimension for policy (state-action pairs)
    d_value = num_states  # Feature dimension for value function (states)

    # Initialize policy and value function parameters
    theta = np.zeros(d_policy)  # For π(a|s, θ)
    w = np.zeros(d_value)  # For v(s, w)

    # Feature function φ(s, a) for policy: one-hot encoding for state-action pairs
    def phi(s, a):
        features = np.zeros(d_policy)
        features[s * num_actions + a] = 1.0
        return features

    # Feature function x(s) for value function: one-hot encoding for states
    def x(s):
        features = np.zeros(d_value)
        features[s] = 1.0
        return features

    # Softmax policy π(a|s, θ)
    def policy(s, theta):
        scores = np.array([np.dot(theta, phi(s, a)) for a in range(num_actions)])
        exp_scores = np.exp(scores - np.max(scores))  # Subtract max for numerical stability
        probs = exp_scores / np.sum(exp_scores)
        return probs

    # Sample an action from the policy
    def sample_action(s, theta):
        probs = policy(s, theta)
        return np.random.choice(num_actions, p=probs)

    # Gradient of log π(a|s, θ)
    def grad_log_pi(s, a, theta):
        x_sa = phi(s, a)
        probs = policy(s, theta)
        expected_phi = np.zeros(d_policy)
        for b in range(num_actions):
            expected_phi += probs[b] * phi(s, b)
        return x_sa - expected_phi

    # State-value function v(s, w)
    def v_hat(s, w):
        return np.dot(w, x(s))

    # Main loop over episodes
    for episode in range(num_episodes):
        # Generate an episode
        trajectory = []
        state, _ = env.reset()
        done = False

        while not done:
            action = sample_action(state, theta)
            next_state, reward, terminated, truncated, _ = env.step(action)
            trajectory.append((state, action, reward))
            state = next_state
            done = terminated or truncated

        # Update parameters for each step
        T = len(trajectory)
        G = 0
        for t in range(T-1, -1, -1):
            state, action, reward = trajectory[t]
            G = reward + gamma * G  # Compute return G_t

            # Compute advantage: δ = G_t - v̂(S_t, w)
            delta = G - v_hat(state, w)

            # Update value function: w ← w + α_w δ ∇v̂(S_t, w)
            w += alpha_w * delta * x(state)

            # Update policy: θ ← θ + α_θ δ ∇ln π(A_t|S_t, θ)
            grad = grad_log_pi(state, action, theta)
            theta += alpha_theta * delta * grad

    return theta, policy, w, v_hat

In [21]:
import gymnasium as gym
import numpy as np

def one_step_actor_critic(env, num_episodes=1000, alpha_theta=0.01, alpha_w=0.1, gamma=0.9):
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    d_policy = num_states * num_actions  # Feature dimension for policy (state-action pairs)
    d_value = num_states  # Feature dimension for value function (states)

    # Initialize policy and value function parameters
    theta = np.zeros(d_policy)  # For π(a|s, θ)
    w = np.zeros(d_value)  # For v(s, w)

    # Feature function φ(s, a) for policy: one-hot encoding for state-action pairs
    def phi(s, a):
        features = np.zeros(d_policy)
        features[s * num_actions + a] = 1.0
        return features

    # Feature function x(s) for value function: one-hot encoding for states
    def x(s):
        features = np.zeros(d_value)
        features[s] = 1.0
        return features

    # Softmax policy π(a|s, θ)
    def policy(s, theta):
        scores = np.array([np.dot(theta, phi(s, a)) for a in range(num_actions)])
        exp_scores = np.exp(scores - np.max(scores))  # Subtract max for numerical stability
        probs = exp_scores / np.sum(exp_scores)
        return probs

    # Sample an action from the policy
    def sample_action(s, theta):
        probs = policy(s, theta)
        return np.random.choice(num_actions, p=probs)

    # Gradient of log π(a|s, θ)
    def grad_log_pi(s, a, theta):
        x_sa = phi(s, a)
        probs = policy(s, theta)
        expected_phi = np.zeros(d_policy)
        for b in range(num_actions):
            expected_phi += probs[b] * phi(s, b)
        return x_sa - expected_phi

    # State-value function v(s, w)
    def v_hat(s, w):
        return np.dot(w, x(s))

    # Main loop over episodes
    for episode in range(num_episodes):
        # Initialize state and I
        state, _ = env.reset()
        I = 1.0  # Discounting factor for policy updates

        # Loop until episode terminates
        done = False
        while not done:
            # Sample action
            action = sample_action(state, theta)

            # Take action, observe reward and next state
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Compute TD error: δ = R + γ v̂(S', w) - v̂(S, w)
            v_current = v_hat(state, w)
            v_next = 0.0 if done else v_hat(next_state, w)  # v̂(S', w) = 0 if terminal
            delta = reward + gamma * v_next - v_current

            # Update critic: w ← w + α_w δ ∇v̂(S, w)
            w += alpha_w * delta * x(state)

            # Update actor: θ ← θ + α_θ I δ ∇ln π(A|S, θ)
            grad = grad_log_pi(state, action, theta)
            theta += alpha_theta * I * delta * grad

            # Update I and state
            I *= gamma
            state = next_state

    return theta, policy, w, v_hat

def main():
    env = gym.make("FrozenLake-v1", is_slippery=False)
    # Unwrap the environment to access the underlying FrozenLake environment
    env = env.unwrapped
    # Fix terminal state transitions for state 15
    env.P[15] = {a: {(15, 0.0, True, False): 1.0} for a in range(4)}

    theta, policy, w, v_hat = one_step_actor_critic(env, num_episodes=100000, alpha_theta=0.01, alpha_w=0.1, gamma=0.9)

    # Print the learned policy
    print("Learned Policy (action probabilities):")
    for s in range(16):
        probs = policy(s, theta)
        print(f"State {s}: ", end="")
        for a in range(4):
            print(f"Action {a}: {probs[a]:.3f} ", end="")
        print()

    # Print the learned state-value function
    print("\nLearned State-Value Function:")
    for i in range(4):
        row = [f"{v_hat(i*4 + j, w):.2f}" for j in range(4)]
        print(f"Row {i}: {' '.join(row)}")

    # Test the policy
    total_reward = 0
    num_test_episodes = 100
    for _ in range(num_test_episodes):
        state, _ = env.reset()
        done = False
        episode_reward = 0
        while not done:
            action = np.argmax(policy(state, theta))  # Greedy action for testing
            state, reward, terminated, truncated, _ = env.step(action)
            episode_reward += reward
            done = terminated or truncated
        total_reward += episode_reward
    print(f"\nAverage reward over {num_test_episodes} test episodes: {total_reward / num_test_episodes:.2f}")

if __name__ == "__main__":
    main()

Learned Policy (action probabilities):
State 0: Action 0: 0.005 Action 1: 0.990 Action 2: 0.001 Action 3: 0.004 
State 1: Action 0: 0.390 Action 1: 0.126 Action 2: 0.271 Action 3: 0.212 
State 2: Action 0: 0.212 Action 1: 0.372 Action 2: 0.187 Action 3: 0.229 
State 3: Action 0: 0.268 Action 1: 0.236 Action 2: 0.248 Action 3: 0.248 
State 4: Action 0: 0.004 Action 1: 0.993 Action 2: 0.001 Action 3: 0.002 
State 5: Action 0: 0.250 Action 1: 0.250 Action 2: 0.250 Action 3: 0.250 
State 6: Action 0: 0.152 Action 1: 0.520 Action 2: 0.151 Action 3: 0.177 
State 7: Action 0: 0.250 Action 1: 0.250 Action 2: 0.250 Action 3: 0.250 
State 8: Action 0: 0.004 Action 1: 0.001 Action 2: 0.993 Action 3: 0.002 
State 9: Action 0: 0.003 Action 1: 0.962 Action 2: 0.034 Action 3: 0.001 
State 10: Action 0: 0.014 Action 1: 0.973 Action 2: 0.005 Action 3: 0.008 
State 11: Action 0: 0.250 Action 1: 0.250 Action 2: 0.250 Action 3: 0.250 
State 12: Action 0: 0.250 Action 1: 0.250 Action 2: 0.250 Action 3: 0.2

In [22]:
import gymnasium as gym
import numpy as np

def actor_critic_with_traces(env, num_episodes=1000, alpha_theta=0.01, alpha_w=0.1, gamma=0.9, lambda_theta=0.8, lambda_w=0.8):
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    d_policy = num_states * num_actions  # Feature dimension for policy (state-action pairs)
    d_value = num_states  # Feature dimension for value function (states)

    # Initialize policy and value function parameters
    theta = np.zeros(d_policy)  # For π(a|s, θ)
    w = np.zeros(d_value)  # For v(s, w)

    # Feature function φ(s, a) for policy: one-hot encoding for state-action pairs
    def phi(s, a):
        features = np.zeros(d_policy)
        features[s * num_actions + a] = 1.0
        return features

    # Feature function x(s) for value function: one-hot encoding for states
    def x(s):
        features = np.zeros(d_value)
        features[s] = 1.0
        return features

    # Softmax policy π(a|s, θ)
    def policy(s, theta):
        scores = np.array([np.dot(theta, phi(s, a)) for a in range(num_actions)])
        exp_scores = np.exp(scores - np.max(scores))  # Subtract max for numerical stability
        probs = exp_scores / np.sum(exp_scores)
        return probs

    # Sample an action from the policy
    def sample_action(s, theta):
        probs = policy(s, theta)
        return np.random.choice(num_actions, p=probs)

    # Gradient of log π(a|s, θ)
    def grad_log_pi(s, a, theta):
        x_sa = phi(s, a)
        probs = policy(s, theta)
        expected_phi = np.zeros(d_policy)
        for b in range(num_actions):
            expected_phi += probs[b] * phi(s, b)
        return x_sa - expected_phi

    # State-value function v(s, w)
    def v_hat(s, w):
        return np.dot(w, x(s))

    # Main loop over episodes
    for episode in range(num_episodes):
        # Initialize state and eligibility traces
        state, _ = env.reset()
        z_theta = np.zeros(d_policy)  # Eligibility trace for θ
        z_w = np.zeros(d_value)  # Eligibility trace for w

        # Loop until episode terminates
        done = False
        while not done:
            # Sample action
            action = sample_action(state, theta)

            # Take action, observe reward and next state
            next_state, reward, terminated, truncated, _ = env.step(action)
            done = terminated or truncated

            # Compute TD error: δ = R + γ v̂(S', w) - v̂(S, w)
            v_current = v_hat(state, w)
            v_next = 0.0 if done else v_hat(next_state, w)  # v̂(S', w) = 0 if terminal
            delta = reward + gamma * v_next - v_current

            # Update eligibility traces
            grad_v = x(state)  # ∇v̂(S, w) = x(s) for linear approximation
            grad_pi = grad_log_pi(state, action, theta)  # ∇ln π(A|S, θ)
            z_w = gamma * lambda_w * z_w + grad_v
            z_theta = gamma * lambda_theta * z_theta + grad_pi

            # Update critic: w ← w + α_w δ z_w
            w += alpha_w * delta * z_w

            # Update actor: θ ← θ + α_θ δ z_θ
            theta += alpha_theta * delta * z_theta

            # Update state
            state = next_state

    return theta, policy, w, v_hat



In [23]:
import gymnasium as gym
import numpy as np

def actor_critic_traces_continuing(env, num_steps=10000, alpha_theta=0.01, alpha_w=0.1, alpha_R=0.01, gamma=0.9, lambda_w=0.8, lambda_theta=0.8):
    num_states = env.observation_space.n
    num_actions = env.action_space.n
    d_policy = num_states * num_actions  # Feature dimension for policy (state-action pairs)
    d_value = num_states  # Feature dimension for value function (states)

    # Initialize policy and value function parameters
    theta = np.zeros(d_policy)  # For π(a|s, θ)
    w = np.zeros(d_value)  # For v(s, w)
    R_bar = 0.0  # Average reward estimate

    # Initialize eligibility traces
    z_theta = np.zeros(d_policy)  # Eligibility trace for θ
    z_w = np.zeros(d_value)  # Eligibility trace for w

    # Feature function φ(s, a) for policy: one-hot encoding for state-action pairs
    def phi(s, a):
        features = np.zeros(d_policy)
        features[s * num_actions + a] = 1.0
        return features

    # Feature function x(s) for value function: one-hot encoding for states
    def x(s):
        features = np.zeros(d_value)
        features[s] = 1.0
        return features

    # Softmax policy π(a|s, θ)
    def policy(s, theta):
        scores = np.array([np.dot(theta, phi(s, a)) for a in range(num_actions)])
        exp_scores = np.exp(scores - np.max(scores))  # Subtract max for numerical stability
        probs = exp_scores / np.sum(exp_scores)
        return probs

    # Sample an action from the policy
    def sample_action(s, theta):
        probs = policy(s, theta)
        return np.random.choice(num_actions, p=probs)

    # Gradient of log π(a|s, θ)
    def grad_log_pi(s, a, theta):
        x_sa = phi(s, a)
        probs = policy(s, theta)
        expected_phi = np.zeros(d_policy)
        for b in range(num_actions):
            expected_phi += probs[b] * phi(s, b)
        return x_sa - expected_phi

    # State-value function v(s, w)
    def v_hat(s, w):
        return np.dot(w, x(s))

    # Initialize state
    state, _ = env.reset()

    # Main loop over time steps
    for step in range(num_steps):
        # Sample action
        action = sample_action(state, theta)

        # Take action, observe reward and next state
        next_state, reward, terminated, truncated, _ = env.step(action)

        # Simulate continuing environment: if terminated, reset to start state
        if terminated or truncated:
            next_state, _ = env.reset()

        # Compute TD error: δ = R - R̄ + v̂(S', w) - v̂(S, w)
        v_current = v_hat(state, w)
        v_next = v_hat(next_state, w)
        delta = reward - R_bar + v_next - v_current

        # Update average reward: R̄ ← R̄ + α_R δ
        R_bar += alpha_R * delta

        # Update eligibility traces
        grad_v = x(state)  # ∇v̂(S, w) = x(s) for linear approximation
        grad_pi = grad_log_pi(state, action, theta)  # ∇ln π(A|S, θ)
        z_w = gamma * lambda_w * z_w + grad_v
        z_theta = gamma * lambda_theta * z_theta + grad_pi

        # Update critic: w ← w + α_w δ z_w
        w += alpha_w * delta * z_w

        # Update actor: θ ← θ + α_θ δ z_θ
        theta += alpha_theta * delta * z_theta

        # Update state
        state = next_state

    return theta, policy, w, v_hat, R_bar

