In [2]:
import numpy as np

In [3]:
def _calc_value_func(theta: float) -> np.ndarray:
    """Calculates the value function

    **YOU MUST IMPLEMENT THIS FUNCTION FOR Q1**

    **DO NOT ALTER THE MDP HERE**

    Useful Variables:
    1. `self.mpd` -- Gives access to the MDP.
    2. `self.mdp.R` -- 3D NumPy array with the rewards for each transition.
        E.g. the reward of transition [3] -2-> [4] (going from state 3 to state 4 with action
        2) can be accessed with `self.R[3, 2, 4]`
        R_{t+k+1}
    3. `self.mdp.P` -- 3D NumPy array with transition probabilities.
        P(s', r | s, a), from s to s', action a, reward r
        *REMEMBER*: the sum of (STATE, ACTION, :) should be 1.0 (all actions lead somewhere)
        E.g. the transition probability of transition [3] -2-> [4] (going from state 3 to
        state 4 with action 2) can be accessed with `self.P[3, 2, 4]`

    :param theta (float): theta is the stop threshold for value iteration
    :return (np.ndarray of float with dim (num of states)):
        1D NumPy array with the values of each state.
        E.g. V[3] returns the computed value for state 3
    """
    ### zero inilization
    V = np.zeros((self.state_dim))
    while True:
        delta = 0
        V_new = np.zeros_like(V)
        # for every state, for every action, for every possible subsiquent state 
        # but only legal moves
        for state in range(self.state_dim):
            state_action_values = np.zeros((self.state_dim, self.action_dim))
            for action in range(self.action_dim):
                state_act_val = 0
                for next_state in range(self.state_dim):
                    ### consider only legal moves
                    # corresponding transition probability
                    if self.mdp.P[state, action, next_state] != 0:
                        # value in the image in the PDF
                        state_act_val += self.mdp.P[state, action, next_state] * \
                        (self.mdp.R[state, action, next_state] + self.gamma *  V[next_state])  
                # store the value (in pi(action | state)?)    
                state_action_values[state, action] = state_act_val
            # get the action with the max value, for this state
            best_action_value = np.max(state_action_values[state])
            # for all states
            V_new[state]  = best_action_value
            # definition of delta, for this state
            delta = max(delta, abs(best_action_value - V[state]))
        V = V_new    
            
        if delta < theta: # stop threshold for value iteration
            ### Update V until convergence
            break
    
    return V

In [4]:
import numpy as np

def calc_value_func_matrix(self, theta: float) -> np.ndarray:
    V = np.zeros(self.state_dim)  # Initialize value function vector
    while True:
        # Compute the expected utility of taking each action in each state
        # Reshape V to use broadcasting: (state_dim, 1) -> (state_dim, 1, 1)
        V_reshaped = V.reshape(-1, 1, 1)
        # Use broadcasting to compute the sum of rewards and discounted future values
        # for all transitions, resulting in a (state_dim, action_dim, state_dim) array
        utility = self.mdp.P * (self.mdp.R + self.gamma * V_reshaped)
        
        # Sum over next states to get the expected utility for each action in each state
        # This results in a (state_dim, action_dim) array
        action_values = np.sum(utility, axis=2)
        
        # Select the best action for each state (maximum expected utility)
        V_new = np.max(action_values, axis=1)
        
        # Compute the maximum change in the value function across all states
        delta = np.max(np.abs(V_new - V))
        
        # Update the value function
        V = V_new
        
        # Check for convergence
        if delta < theta:
            break
            
    return V


In [9]:
# Test Q1 1 

import numpy as np

class MockMDP:
    def __init__(self, state_dim, action_dim, gamma):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        # Initialize random transition probabilities and rewards
        self.P = np.random.rand(state_dim, action_dim, state_dim)
        self.R = np.random.rand(state_dim, action_dim, state_dim) - 0.5  # Rewards in [-0.5, 0.5]
        # Normalize transition probabilities
        self.P /= self.P.sum(axis=2, keepdims=True)

    def _calc_value_func(self, theta: float) -> np.ndarray:
        V = np.zeros((self.state_dim))
        while True:
            delta = 0
            V_new = np.zeros_like(V)
            for state in range(self.state_dim):
                state_action_values = np.zeros((self.state_dim, self.action_dim))
                for action in range(self.action_dim):
                    state_act_val = 0
                    for next_state in range(self.state_dim):
                        if self.P[state, action, next_state] != 0:
                            state_act_val += self.P[state, action, next_state] * \
                            (self.R[state, action, next_state] + self.gamma * V[next_state])
                    state_action_values[state, action] = state_act_val
                best_action_value = np.max(state_action_values[state])
                V_new[state] = best_action_value
                delta = max(delta, abs(best_action_value - V[state]))
            V = V_new
            if delta < theta:
                break
        return V

    def calc_value_func_matrix(self, theta: float) -> np.ndarray:
        V = np.zeros(self.state_dim)
        gamma = self.gamma
        while True:
            action_values = np.sum(self.P * (self.R + gamma * V[None, None, :]), axis=2)
            V_new = np.max(action_values, axis=1)
            delta = np.max(np.abs(V_new - V))
            V = V_new
            if delta < theta:
                break
        return V

# Test parameters
state_dim = 10  # Number of states
action_dim = 4  # Number of actions
gamma = 0.95  # Discount factor
theta = 0.01  # Threshold for stopping the iteration

# Create a mock MDP instance
mock_mdp = MockMDP(state_dim, action_dim, gamma)

# Calculate value functions using both methods
v_original = mock_mdp._calc_value_func(theta)
v_matrix = mock_mdp.calc_value_func_matrix(theta)

# Test if the outputs are similar within a tolerance
np.testing.assert_array_almost_equal(v_original, v_matrix, decimal=5)

"Test passed: The original and matrix-optimized methods produce the same results."


'Test passed: The original and matrix-optimized methods produce the same results.'

In [None]:
# Test temp

import numpy as np

class MockMDP:
    def __init__(self, state_dim, action_dim, gamma):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        # Initialize random transition probabilities and rewards
        self.P = np.random.rand(state_dim, action_dim, state_dim)
        self.R = np.random.rand(state_dim, action_dim, state_dim) - 0.5  # Rewards in [-0.5, 0.5]
        # Normalize transition probabilities
        self.P /= self.P.sum(axis=2, keepdims=True)

    def _calc_value_func(self, theta: float) -> np.ndarray:
        V = np.zeros((self.state_dim))
        return V

    def calc_value_func_matrix(self, theta: float) -> np.ndarray:
        V = np.zeros(self.state_dim)
        return V

# Test parameters
state_dim = 10  # Number of states
action_dim = 4  # Number of actions
gamma = 0.95  # Discount factor
theta = 0.01  # Threshold for stopping the iteration

# Create a mock MDP instance
mock_mdp = MockMDP(state_dim, action_dim, gamma)

# Calculate value functions using both methods
v_original = mock_mdp._calc_value_func(theta)
v_matrix = mock_mdp.calc_value_func_matrix(theta)

# Test if the outputs are similar within a tolerance
np.testing.assert_array_almost_equal(v_original, v_matrix, decimal=5)

"Test passed: The original and matrix-optimized methods produce the same results."


In [11]:
# Test Q1 2

import numpy as np

class MockMDP:
    def __init__(self, state_dim, action_dim, gamma):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        # Initialize random transition probabilities and rewards
        self.P = np.random.rand(state_dim, action_dim, state_dim)
        self.R = np.random.rand(state_dim, action_dim, state_dim) - 0.5  # Rewards in [-0.5, 0.5]
        # Normalize transition probabilities
        self.P /= self.P.sum(axis=2, keepdims=True)

    def _calc_policy(self, V: np.ndarray) -> np.ndarray:
        policy = np.zeros([self.state_dim, self.action_dim])
        
        state_action_vals = np.zeros_like(policy)
        for state in range(self.state_dim):
            for action in range(self.action_dim):
                for next_state in range(self.state_dim):
                    if self.P[state, action, next_state] != 0:
                        state_action_vals[state][action] += self.P[state, action, next_state] * \
                                        (self.R[state, action, next_state] + self.gamma * V[next_state])
            greedy_action = np.argmax(state_action_vals[state])
            policy[state, greedy_action] = 1
        
        return policy

    def calc_policy_vectorized(self, V: np.ndarray) -> np.ndarray:
        action_values = np.sum(self.P * (self.R + self.gamma * V[None, None, :]), axis=2)
        best_actions = np.argmax(action_values, axis=1)
        
        policy = np.zeros([self.state_dim, self.action_dim])
        policy[np.arange(self.state_dim), best_actions] = 1.0
        
        return policy

# Mock test for value function V
def test_policy_calculation():
    state_dim = 10
    action_dim = 4
    gamma = 0.95
    theta = 0.01
    
    # Initialize mock MDP
    mock_mdp = MockMDP(state_dim, action_dim, gamma)

    # Mock value function
    V = np.random.rand(state_dim)  # This would normally be the output of a value function calculation method

    # Calculate policies using both methods
    policy_original = mock_mdp._calc_policy(V)
    policy_vectorized = mock_mdp.calc_policy_vectorized(V)

    # Test if the policy outputs are similar within a tolerance
    np.testing.assert_array_almost_equal(policy_original, policy_vectorized)
    print("Policy calculation test passed: Both methods produce the same results.")

# Run the policy calculation test
test_policy_calculation()


Policy calculation test passed: Both methods produce the same results.


Loop-based output: [0.07635597 0.20214211 0.26271381 0.1478511 ]
Vectorized output: [0.07635597 0.20214211 0.26271381 0.1478511 ]
Test passed: Loop-based and vectorized outputs are equivalent.


In [137]:
# Test Q1 3 

import numpy as np

class AAMockMDP:
    def __init__(self, state_dim, action_dim, gamma, theta, P, R):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.theta = theta
        # Initialize random transition probabilities and rewards
        self.P = P
        self.R = R
        
    
    def policy_eval_vectorized(self, policy: np.ndarray) -> np.ndarray:
        V = np.zeros(self.state_dim)
        
        while True:
            delta = 0
            V_prev = V.copy()
            
            for state in range(state_dim):
                expected_returns = policy[state, :, None] * self.P[state] * (self.R[state] + self.gamma * V[None, :])
                # (action_dim, state_dim)
                V[state] = np.sum(expected_returns)  
 
            delta = np.max(np.abs(V - V_prev))
            if delta < self.theta:
                break    
        
        return V
    
    def _policy_eval(self, policy: np.ndarray) -> np.ndarray:
        """Computes one policy evaluation step

        **YOU MUST IMPLEMENT THIS FUNCTION FOR Q1**

        :param policy (np.ndarray of float with dim (num of states, num of actions)):
            output of _calc_policy()
            A 2D NumPy array that encodes the policy.
            It is indexed as (STATE, ACTION) where policy[STATE, ACTION] has the probability of
            taking action 'ACTION' in state 'STATE'.
            REMEMBER: the sum of policy[STATE, :] should always be 1.0
            For deterministic policies the following holds for each state S:
            policy[S, BEST_ACTION] = 1.0
            policy[S, OTHER_ACTIONS] = 0
        :return (np.ndarray of float with dim (num of states)): 
            A 1D NumPy array that encodes the computed value function
            It is indexed as (State) where V[State] is the value of state 'State'
        """
        # The following is adapted from Adam Jelley's code from https://gist.github.com/AdamJelley/d1e872426b0186980a15f1b6421250c2
        V = np.zeros(self.state_dim)
        
        while True:
            delta = 0
            for state in range(self.state_dim):
                v = 0
                for action, act_prob in enumerate(policy[state]):
                    for next_state, trans_prob in enumerate(self.P[state, action]):
                        if self.P[state, action, next_state] != 0:
                            # act_prob * value in the image in the PDF 
                            v += act_prob * trans_prob * \
                                    (self.R[state, action, next_state] + self.gamma *  V[next_state])      
                # print("scalar v", v)
                delta = max(delta, abs(v - V[state]))
                V[state] = v   
            if delta < self.theta:
            # if True: 
                ### Update value function until convergence
                break    
        
        return V

# Assuming you have filled in the _policy_eval method in MockMDP class with your provided function

def test_policy_evaluation():
    state_dim = 5
    action_dim = 2
    gamma = 0.99
    theta = 0.01
    
    # P = np.random.rand(state_dim, action_dim, state_dim)
    # R = np.random.rand(state_dim, action_dim, state_dim) - 0.5  # Rewards in [-0.5, 0.5]
    # # Normalize transition probabilities
    # P /= P.sum(axis=2, keepdims=True)
    
    # Initialize mock MDP
    mock_mdp = AAMockMDP(state_dim, action_dim, gamma, theta, P, R)
    
    # Initialize a deterministic policy for testing
    policy = np.zeros((state_dim, action_dim))
    for s in range(state_dim):
        policy[s, np.random.randint(action_dim)] = 1
    
    # Evaluate the policy using both methods
    V_vectorized = mock_mdp.policy_eval_vectorized(policy)
    V_original = mock_mdp._policy_eval(policy)
    
    
    # Test if the value function outputs are similar within a tolerance
    np.testing.assert_array_almost_equal(V_original, V_vectorized, decimal=5)
    print("Policy evaluation test passed: Both methods produce the same results.")

# Run the policy evaluation test
test_policy_evaluation()


Policy evaluation test passed: Both methods produce the same results.


In [157]:
# Test Q1 4

import numpy as np

class MockMDP:
    def __init__(self, state_dim, action_dim, gamma, theta):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.theta = theta
        # Initialize random transition probabilities and rewards
        self.P = np.random.rand(state_dim, action_dim, state_dim)
        self.R = np.random.rand(state_dim, action_dim, state_dim) - 0.5  # Rewards in [-0.5, 0.5]
        # Normalize transition probabilities
        self.P /= self.P.sum(axis=2, keepdims=True)
    
    def _policy_eval(self, policy):
        # Simple policy evaluation for testing (not optimized for performance)
        V = np.zeros(self.state_dim)
        
        while True:
            delta = 0
            # for every state, for every (action and act_prob), for every possible (subsiquent state and transition probability) 
            # but only legal moves
            for state in range(self.state_dim):
                v = 0
                for action, act_prob in enumerate(policy[state]):
                    for next_state, trans_prob in enumerate(self.P[state, action]):
                        if self.P[state, action, next_state] != 0:
                            # act_prob * value in the image in the PDF 
                            v += act_prob * trans_prob * \
                                    (self.R[state, action, next_state] + self.gamma *  V[next_state])      
                # pdb.set_trace()  
                # same as before, for every state
                # but set V  
                delta = max(delta, abs(v - V[state]))
                V[state] = v      
            if delta < self.theta:
                ### Update value function until convergence
                break    
        
        return V

    # Your _policy_improvement method adapted for the MockMDP
    def _policy_improvement(self):
        policy = np.zeros([self.state_dim, self.action_dim])
        V = np.zeros([self.state_dim])
        ### PUT YOUR CODE HERE ###
        
        # The following is adapted from Adam Jelley's code from https://gist.github.com/AdamJelley/d1e872426b0186980a15f1b6421250c2
        state_action_values = policy
        greedy_policy = np.zeros_like(policy)
        while True:   
            policy_stable = True
            V = self._policy_eval(policy)
            # for every state, for every action, for every possible subsiquent state 
            # but only legal moves
            for state in range(self.state_dim):
                state_action_values = np.zeros_like(policy)
                for action in range(self.action_dim):
                    for next_state in range(self.state_dim):
                        ### Consider only legal transitions
                        if self.P[state, action, next_state] != 0:
                            # equation in PDF
                            state_action_values[state][action] += self.P[state, action, next_state] * \
                                    (self.R[state, action, next_state] + self.gamma *  V[next_state]) 
                # ???
                if np.all(policy[state] == policy[state][0]) and \
                   np.all(state_action_values[state] == state_action_values[state][0]):
                    old_action = np.random.choice(np.arange(self.action_dim))
                    new_action =  np.random.choice(np.arange(self.action_dim))
                else: 
                    old_action = np.argmax(policy[state]) 
                    new_action = np.argmax(state_action_values[state])
                # set to 1 or 0
                for action in range(self.action_dim):
                    greedy_policy[state][action] = 1 if action == new_action else 0
                    
                if old_action != new_action:
                    policy_stable = False
                
            policy = greedy_policy
            
            if policy_stable and not(np.all(V == 0)):
                ### add a condition to account that V may become [0, 0, 0] eventhough
                ### finding optimal policy. I think this is due to np.argmax() 
                ### and random.choice()
                break
                
                
        return policy, V

    # The policy_improvement_matrix method
    def policy_improvement_matrix(self):
        policy = np.zeros([self.state_dim, self.action_dim])
        while True:
            V = self._policy_eval(policy)
            act_vals = np.sum(self.P * (self.R + self.gamma * V[None, None, :]), axis=2)
            print(act_vals.shape)
            new_policy = np.eye(self.action_dim)[np.argmax(act_vals, axis=1)]
            if (new_policy == policy).all():
                break
            policy = new_policy
        return policy, V

# Test parameters
state_dim = 5
action_dim = 3
gamma = 0.95
theta = 0.01

# Create a mock MDP instance
mock_mdp = MockMDP(state_dim, action_dim, gamma, theta)

# Calculate policies and value functions using both methods
policy1, V1 = mock_mdp._policy_improvement()
policy2, V2 = mock_mdp.policy_improvement_matrix()

# Tests to ensure that the policies and value functions are similar
np.testing.assert_array_almost_equal(V1, V2, decimal=5)
np.testing.assert_array_equal(policy1, policy2)

print("Test passed: The original and matrix-optimized methods produce the same policies and value functions.")



(5, 3)
(5, 3)
Test passed: The original and matrix-optimized methods produce the same policies and value functions.


In [182]:
# test Q2 3

import numpy as np
import random
from typing import List, Dict
from collections import defaultdict

# Assuming the existence of the original learn function as `learn_original`
# and the adapted function as `mc_first_visit_learn`


def learn_original(q_table: np.ndarray, sa_counts: np.ndarray, obses: np.ndarray, actions: np.ndarray, rewards: np.ndarray, gamma: float) -> None:
        """Updates the Q-table based on agent experience

        **YOU MUST IMPLEMENT THIS FUNCTION FOR Q2**

        :param obses (List(int)): list of received observations representing environmental states
            of trajectory (in the order they were encountered)
        :param actions (List[int]): list of indices of applied actions in trajectory (in the
            order they were applied)
        :param rewards (List[float]): list of received rewards during trajectory (in the order
            they were received)
        :return (Dict): A dictionary containing the updated Q-value of all the updated state-action pairs
            indexed by the state action pair.
        """
        updated_values = defaultdict(lambda:0)
        
        ### PUT YOUR CODE HERE ###
        ### Note to myself: this is just for one episode
        
        all_state_act_rwd_triple = [(obses[i], actions[i], rewards[i]) for i in range(len(obses))]
        ## consider single (state, act) pair
        first_state_act_episode = set([(triple[0], triple[1]) for triple in all_state_act_rwd_triple])

        for (obs, act) in first_state_act_episode:
            ## calc reward from  first visit 
            first_occurence_idx = [i for i, (fst_obs, fst_act, _) in enumerate(all_state_act_rwd_triple) if (fst_obs == obs and fst_act == act)][0]
            # print("first_occurence_idx", first_occurence_idx)
            ## calc discounted reward and average
            updated_values[( obs, act )] += sum([triple[2]*(gamma**i) for i, triple in enumerate(all_state_act_rwd_triple[first_occurence_idx:])])
            
            sa_counts[( obs, act )] += 1.0
            ## update
            q_table[(obs, act)] = updated_values[( obs, act )] / \
                                       sa_counts[( obs, act )]
                                       
        return q_table
        
        
        # return self.q_table


# def mc_first_visit_learn(q_table: np.ndarray, sa_counts: np.ndarray, obses: np.ndarray, actions: np.ndarray, rewards: np.ndarray, gamma: float) -> None:
#     """
#     Updates the Q-table based on first-visit Monte Carlo method.
    
#     :param q_table: 2D NumPy array of shape (n_states, n_actions) representing the Q-values.
#     :param sa_counts: 2D NumPy array of shape (n_states, n_actions) tracking the visit counts.
#     :param obses: 1D NumPy array of observed states in the episode.
#     :param actions: 1D NumPy array of actions taken in the episode.
#     :param rewards: 1D NumPy array of received rewards in the episode.
#     :param gamma: Discount factor.
#     """
#     episode_length = len(obses)
#     G = 0  # Initialize cumulative reward
#     for i in reversed(range(episode_length)):
#         G = gamma * G + rewards[i]  # Update cumulative reward with discount
#         obs, action = obses[i], actions[i]
        
#         # Check if this is the first visit to (obs, action)
#         if (obs, action) not in zip(obses[:i], actions[:i]):
#             sa_counts[obs, action] += 1  # Update visit count
#             # Incremental update to the average
#             q_table[obs, action] += (G - q_table[obs, action]) / sa_counts[obs, action]
            
# import numpy as np


def mc_first_visit_learn(q_table: np.ndarray, sa_counts: np.ndarray, obses: np.ndarray, actions: np.ndarray, rewards: np.ndarray, gamma: float) -> None:

    
    updated_values = defaultdict(lambda:0)
    
    state_act_rwd = list(zip(obses, actions, rewards))
    state_act = {(elem[0], elem[1]) for elem in state_act_rwd}

    for (state, act) in state_act:
        idx = [i for i, (fst_obs, fst_act, _) in enumerate(state_act_rwd) if (
            fst_obs == state and fst_act == act)][0]
        
        weighted_sum = sum(triple[2] * (gamma ** i) for i, triple in enumerate(state_act_rwd[idx:]))
        
        updated_values[(state, act)] += weighted_sum
        sa_counts[( state, act )] += 1.0
        q_table[(state, act)] = updated_values[( state, act )] / sa_counts[( state, act )]
        
    return q_table


    
    # state_act_rwd = np.vstack((obses, actions, rewards)).T
    
    # state_act = set([(elem[0], elem[1]) for elem in state_act_rwd])
    
    # for (state, act) in state_act:
    #     # reward from first visit 
    #     # np.argmax to find the first True in the boolean mask
    #     idx = np.argmax(state_act_rwd[:, 0] == state) & (state_act_rwd[:, 1] == act)
    #     print("idx", idx)
    #     rewards = np.array([triple[2] for triple in state_act_rwd[idx:]])
    #     gamma_powers = gamma ** np.arange(len(rewards))
    #     updated_values[state, act] += np.sum(np.multiply(rewards, gamma_powers))
    #     sa_counts[( state, act )] += 1.0
    #     q_table[(state, act)] = updated_values[( state, act )] / sa_counts[( state, act )]
    

    # # Unique state-action pairs and their first indices
    # state_actions, first_indices = np.unique(np.stack((obses, actions), axis=-1), axis=0, return_index=True)
    
    # # Compute discounted rewards from the first occurrence of each state-action pair
    # discounted_rewards = np.array([np.sum(rewards[idx:] * (gamma ** np.arange(len(rewards) - idx))) for idx in first_indices])
    
    # # Update Q-values
    # for i, (state, action) in enumerate(state_actions):
    #     updated_values[(state, action)] += discounted_rewards[i]
    #     sa_counts[(state, action)] += 1
    #     q_table[(state, action)] = updated_values[(state, action)] / sa_counts[(state, action)]
    
    # # return self.q_table



def generate_episode(n_states, n_actions, episode_length):
    obses = np.random.randint(0, n_states, size=episode_length)
    actions = np.random.randint(0, n_actions, size=episode_length)
    rewards = np.random.randn(episode_length)  # Normal distribution for rewards
    return obses, actions, rewards

def test_mc_update_equivalence(n_states=10, n_actions=5, episode_length=20, gamma=0.95):
    obses, actions, rewards = generate_episode(n_states, n_actions, episode_length)
    
    # Initialize Q-Table and counts for original function (dictionary-based)
    q_table_original = {}
    sa_counts_original = {}
    for obs in range(n_states):
        for action in range(n_actions):
            q_table_original[(obs, action)] = 0
            sa_counts_original[(obs, action)] = 0
    
    # Initialize Q-Table and counts for adapted function (NumPy arrays)
    q_table_adapted = np.zeros((n_states, n_actions))
    sa_counts_adapted = np.zeros((n_states, n_actions))
    
    # Run original function (adapt it to process a single episode)
    # You will need to implement this part based on the `learn` function you have
    return_org = learn_original(q_table_adapted, sa_counts_adapted, obses, actions, rewards, gamma)
    
    # Run adapted function
    return_new = mc_first_visit_learn(q_table_adapted, sa_counts_adapted, obses, actions, rewards, gamma)
    
    np.testing.assert_array_almost_equal(return_org, return_new, decimal=5)
    
    
    # Convert adapted Q-Table and counts to dictionary format for comparison
    # q_table_adapted_dict = {(obs, action): q_table_adapted[obs, action] for obs in range(n_states) for action in range(n_actions)}
    # sa_counts_adapted_dict = {(obs, action): sa_counts_adapted[obs, action] for obs in range(n_states) for action in range(n_actions)}
    
    # # Compare the Q-tables and counts
    # assert q_table_original == q_table_adapted_dict, "Q-Tables do not match."
    # assert sa_counts_original == sa_counts_adapted_dict, "Counts do not match."
    
    # print("Test passed: Both implementations produce equivalent results.")

# Example usage
test_mc_update_equivalence()


In [203]:
# Correctly implemented numpy version
def numpy_discounted_rewards_correct(rewards, gamma):
    # print(np.arange(len(rewards))[::-1])
    discounted_rewards = rewards[::-1] * np.power(gamma, np.arange(len(rewards))[::-1])
    return np.flip(np.cumsum(discounted_rewards)).dtype

def original_discounted_rewards(rewards, gamma):   
    discounted_rewards = [rewards[t] * (gamma ** t) for t in range(len(rewards))][::-1]
    return np.flip(np.cumsum(discounted_rewards)).dtype

# Test case
rewards = [1, 2, 3, 4]
gamma = 0.9

# Applying both methods
original_rewards_result = original_discounted_rewards(rewards, gamma)
numpy_rewards_result = numpy_discounted_rewards_correct(rewards, gamma)

# Test to verify matching results
test_result_correct = original_rewards_result == numpy_rewards_result

test_result_correct, original_rewards_result, numpy_rewards_result


(True, dtype('float64'), dtype('float64'))

In [199]:
defaultdict(lambda:0)

defaultdict(<function __main__.<lambda>()>, {})