# RL

In [None]:
import numpy as np

def max_entropy_policy_optimization(states, actions, rewards, transitions, temperature=1.0, gamma=0.99, n_iterations=1000, tolerance=1e-6):
    """
    Find optimal policy using Maximum Entropy Reinforcement Learning (Soft Q-Learning).

    Args:
        states: list/array of state indices [0, 1, 2, ..., n_states-1]
        actions: list/array of action indices [0, 1, 2, ..., n_actions-1]
        rewards: dict or 2D array, rewards[state][action] = reward value
        transitions: dict or 3D array, transitions[state][action] = next_state distribution
        temperature: float, controls exploration (higher = more exploration)
        gamma: float, discount factor for future rewards
        n_iterations: int, maximum number of value iteration steps
        tolerance: float, convergence threshold

    Returns:
        policy: 2D array of shape (n_states, n_actions) with action probabilities
        q_values: 2D array of shape (n_states, n_actions) with soft Q-values
    """
    # Step 1: Initialize dimensions
    n_states = len(states)  # Total number of states in the MDP
    n_actions = len(actions)  # Total number of actions available

    # Step 2: Initialize Q-values (soft Q-function) to zeros
    q_values = np.zeros((n_states, n_actions))                                                                  # Q[s, a] represents expected return from taking action a in state s
    # Step 3: Value iteration loop to learn optimal Q-values
    for iteration in range(n_iterations):

        q_old = q_values.copy()                                                                                 # Store previous Q-values to check convergence
        # Step 4: Update Q-value for each state-action pair
        for s in states:
            for a in actions:
                # Step 4a: Get immediate reward for this state-action pair
                r = rewards[s][a] if isinstance(rewards, dict) else rewards[s, a]
                # Step 4b: Get next state distribution (could be deterministic or stochastic)
                next_state_dist = transitions[s][a] if isinstance(transitions, dict) else transitions[s, a]
                # Step 4c: Calculate expected soft value of next state
                expected_next_value = 0                                                                         # This is where maximum entropy comes in

                # Handle both deterministic (single next state) and stochastic transitions
                if isinstance(next_state_dist, (int, np.integer)):
                    s_next = next_state_dist                                                                    # Deterministic transition to single next state
                    # Step 4d: Compute soft value (max-ent value function)
                    soft_value = temperature * np.log(np.sum(np.exp(q_old[s_next] / temperature)))              # V(s) = temperature * log(sum(exp(Q(s,a)/temperature)))
                    expected_next_value = soft_value
                else:
                    # Stochastic transition over multiple next states
                    for s_next, prob in enumerate(next_state_dist):
                        if prob > 0:
                            soft_value = temperature * np.log(np.sum(np.exp(q_old[s_next] / temperature)))      # Compute soft value for this next state
                            expected_next_value += prob * soft_value                                            # Weight by transition probability

                # Step 4e: Bellman backup with soft value function
                # Q(s,a) = r(s,a) + gamma * E[V(s')]
                q_values[s, a] = r + gamma * expected_next_value

        # Step 5: Check for convergence
        # If Q-values haven't changed much, we've found the optimal Q-function
        max_change = np.max(np.abs(q_values - q_old))
        if max_change < tolerance:
            print(f"Converged after {iteration + 1} iterations")
            break

    # Step 6: Extract optimal policy from Q-values using Boltzmann distribution
    # This gives us a stochastic policy that trades off reward and entropy
    # pi(a|s) = exp(Q(s,a)/temperature) / sum(exp(Q(s,a')/temperature))
    policy = np.zeros((n_states, n_actions))
    for s in states:
        # Step 6a: Compute unnormalized probabilities using softmax
        exp_q = np.exp(q_values[s] / temperature)
        # Step 6b: Normalize to get valid probability distribution
        policy[s] = exp_q / np.sum(exp_q)

    # Step 7: Return the optimal maximum entropy policy and Q-values
    return policy, q_values


# Example usage:
if __name__ == "__main__":
    # Define a simple 3-state, 2-action MDP
    states = [0, 1, 2]
    actions = [0, 1]

    # Reward function: rewards[state][action]
    rewards = {
        0: [1.0, 0.5],   # State 0: action 0 gives reward 1.0, action 1 gives 0.5
        1: [0.0, 2.0],   # State 1: action 0 gives reward 0.0, action 1 gives 2.0
        2: [0.5, 0.5]    # State 2: both actions give reward 0.5
    }

    # Transition function: transitions[state][action] = next_state
    # (deterministic transitions for simplicity)
    transitions = {
        0: [1, 2],  # State 0: action 0 → state 1, action 1 → state 2
        1: [2, 0],  # State 1: action 0 → state 2, action 1 → state 0
        2: [0, 1]   # State 2: action 0 → state 0, action 1 → state 1
    }

    # Find optimal policy
    policy, q_values = max_entropy_policy_optimization(
        states, actions, rewards, transitions,
        temperature=1.0,  # Controls exploration level
        gamma=0.95,       # Discount factor
        n_iterations=100
    )

    print("\nOptimal Policy (probability of each action in each state):")
    print(policy)
    print("\nQ-Values:")
    print(q_values)


Optimal Policy (probability of each action in each state):
[[0.75980863 0.24019137]
 [0.09539876 0.90460124]
 [0.40078342 0.59921658]]

Q-Values:
[[33.0680211  31.9163905 ]
 [31.4163905  33.66581918]
 [32.16581918 32.5680211 ]]


# IRL

In [1]:
import numpy as np

def max_entropy_inverse_rl(states, actions, demonstrations, transitions,
                           feature_matrix, temperature=1.0, gamma=0.99,
                           n_iterations=100, learning_rate=0.01):
    """
    Find reward function from expert demonstrations using Maximum Entropy Inverse RL.

    Args:
        states: list/array of state indices [0, 1, 2, ..., n_states-1]
        actions: list/array of action indices [0, 1, 2, ..., n_actions-1]
        demonstrations: list of trajectories, each trajectory is list of (state, action) tuples
        transitions: dict or 3D array, transitions[state][action] = next_state
        feature_matrix: 2D array of shape (n_states, n_features), features for each state
        temperature: float, controls exploration assumption in expert policy
        gamma: float, discount factor for future rewards
        n_iterations: int, number of gradient descent iterations
        learning_rate: float, step size for gradient updates

    Returns:
        reward_weights: 1D array of learned reward function weights
        recovered_rewards: 1D array of recovered reward for each state
    """
    # Step 1: Initialize dimensions
    n_states = len(states)  # Total number of states in the MDP
    n_actions = len(actions)  # Total number of actions available
    n_features = feature_matrix.shape[1]  # Number of features describing each state

    # Step 2: Initialize reward weights randomly
    # The reward function is parameterized as: r(s) = w^T * φ(s)
    # where w are weights and φ(s) are features of state s
    reward_weights = np.random.randn(n_features) * 0.01

    # Step 3: Compute empirical feature expectations from expert demonstrations
    # This represents what features the expert actually visits
    # μ_E = (1/N) * Σ_trajectories Σ_t γ^t * φ(s_t)
    empirical_feature_expectations = np.zeros(n_features)
    total_steps = 0  # Count total timesteps across all demonstrations

    # Step 3a: Iterate through each expert trajectory
    for trajectory in demonstrations:
        # Step 3b: Accumulate discounted features along the trajectory
        for t, (state, action) in enumerate(trajectory):
            # Discount factor applied based on timestep
            discount = gamma ** t
            # Add discounted features of this state to expectation
            empirical_feature_expectations += discount * feature_matrix[state]
            total_steps += 1

    # Step 3c: Normalize by number of timesteps to get average
    empirical_feature_expectations /= len(demonstrations)

    # Step 4: Gradient descent loop to learn reward weights
    for iteration in range(n_iterations):
        # Step 4a: Compute rewards from current weight estimate
        # r(s) = w^T * φ(s) for each state
        rewards = feature_matrix @ reward_weights  # Matrix multiplication: (n_states, n_features) @ (n_features,) = (n_states,)

        # Step 4b: Expand rewards to state-action pairs
        # Since reward is typically state-based, same reward for all actions in a state
        rewards_sa = np.tile(rewards, (n_actions, 1)).T  # Shape: (n_states, n_actions)

        # Step 4c: Run forward RL to get policy under current reward estimate
        # This uses soft Q-learning (maximum entropy RL) to find what policy
        # would be optimal if the current reward estimate were correct
        q_values = np.zeros((n_states, n_actions))

        # Step 4d: Soft value iteration to compute Q-values
        for _ in range(50):  # Inner loop for value iteration convergence
            q_old = q_values.copy()

            # Update Q-value for each state-action pair
            for s in states:
                for a in actions:
                    # Get immediate reward for this state-action pair
                    r = rewards_sa[s, a]

                    # Get next state from transition function
                    next_state = transitions[s][a] if isinstance(transitions, dict) else transitions[s, a]

                    # Handle deterministic transitions
                    if isinstance(next_state, (int, np.integer)):
                        s_next = next_state
                        # Compute soft value: V(s) = temperature * log(Σ exp(Q(s,a)/temperature))
                        soft_value = temperature * np.log(np.sum(np.exp(q_old[s_next] / temperature)))
                        expected_next_value = soft_value
                    else:
                        # Handle stochastic transitions
                        expected_next_value = 0
                        for s_next, prob in enumerate(next_state):
                            if prob > 0:
                                soft_value = temperature * np.log(np.sum(np.exp(q_old[s_next] / temperature)))
                                expected_next_value += prob * soft_value

                    # Bellman backup: Q(s,a) = r(s,a) + γ * E[V(s')]
                    q_values[s, a] = r + gamma * expected_next_value

        # Step 4e: Extract policy from Q-values using Boltzmann distribution
        # π(a|s) = exp(Q(s,a)/temperature) / Σ_a' exp(Q(s,a')/temperature)
        policy = np.zeros((n_states, n_actions))
        for s in states:
            exp_q = np.exp(q_values[s] / temperature)
            policy[s] = exp_q / np.sum(exp_q)

        # Step 4f: Compute expected feature counts under current policy
        # This represents what features our current policy would visit
        # We need to compute state visitation frequencies first

        # Step 4g: Compute state visitation frequencies
        # This tells us how often each state is visited under current policy
        # Using a simplified approach: iterative computation of discounted state frequencies
        state_freq = np.zeros(n_states)

        # Initialize with starting states from demonstrations
        for trajectory in demonstrations:
            if len(trajectory) > 0:
                start_state = trajectory[0][0]
                state_freq[start_state] += 1.0
        state_freq /= len(demonstrations)

        # Propagate frequencies forward through state transitions
        for _ in range(20):  # Iterate to convergence
            new_freq = state_freq.copy()
            for s in states:
                for a in actions:
                    # Get transition to next state
                    next_state = transitions[s][a] if isinstance(transitions, dict) else transitions[s, a]

                    # Handle deterministic transitions
                    if isinstance(next_state, (int, np.integer)):
                        s_next = next_state
                        # Accumulate frequency: how often we reach s_next from s via action a
                        new_freq[s_next] += gamma * state_freq[s] * policy[s, a]
                    else:
                        # Handle stochastic transitions
                        for s_next, prob in enumerate(next_state):
                            if prob > 0:
                                new_freq[s_next] += gamma * state_freq[s] * policy[s, a] * prob

            state_freq = new_freq / np.sum(new_freq)  # Normalize

        # Step 4h: Compute expected feature counts from state visitation frequencies
        # μ(π) = Σ_s P(s|π) * φ(s)
        # This is what features we expect to see under our current policy
        expected_feature_counts = state_freq @ feature_matrix  # Shape: (n_features,)

        # Step 4i: Compute gradient of the objective
        # The gradient of maximum entropy IRL objective is:
        # ∇_w L = μ_E - μ(π_w)
        # We want to match expert feature expectations with policy feature expectations
        gradient = empirical_feature_expectations - expected_feature_counts

        # Step 4j: Update reward weights using gradient ascent
        # Move weights in direction that makes policy match expert demonstrations better
        reward_weights += learning_rate * gradient

        # Step 4k: Print progress every 10 iterations
        if (iteration + 1) % 10 == 0:
            # Measure how close we are to matching expert behavior
            feature_diff = np.linalg.norm(empirical_feature_expectations - expected_feature_counts)
            print(f"Iteration {iteration + 1}: Feature expectation difference = {feature_diff:.6f}")

    # Step 5: Compute final recovered rewards for each state
    # r(s) = w^T * φ(s)
    recovered_rewards = feature_matrix @ reward_weights

    # Step 6: Return learned reward weights and recovered reward function
    return reward_weights, recovered_rewards


# Example usage:
if __name__ == "__main__":
    # Define a simple 4-state, 2-action MDP
    states = [0, 1, 2, 3]
    actions = [0, 1]

    # Transition function: transitions[state][action] = next_state
    transitions = {
        0: [1, 2],  # State 0: action 0 → state 1, action 1 → state 2
        1: [2, 3],  # State 1: action 0 → state 2, action 1 → state 3
        2: [3, 0],  # State 2: action 0 → state 3, action 1 → state 0
        3: [0, 1]   # State 3: action 0 → state 0, action 1 → state 1
    }

    # Feature matrix: each state described by 2 features
    # For example: [distance_to_goal, safety_level]
    feature_matrix = np.array([
        [0.0, 0.5],  # State 0 features
        [0.3, 0.7],  # State 1 features
        [0.7, 0.3],  # State 2 features
        [1.0, 1.0]   # State 3 features (goal state with high values)
    ])

    # Expert demonstrations: list of trajectories
    # Each trajectory is a sequence of (state, action) pairs
    # Expert seems to prefer reaching state 3
    demonstrations = [
        [(0, 0), (1, 1), (3, 1)],  # Trajectory 1: 0→1→3→1
        [(0, 1), (2, 0), (3, 0)],  # Trajectory 2: 0→2→3→0
        [(1, 1), (3, 1), (1, 1)],  # Trajectory 3: 1→3→1→3
    ]

    # Run inverse RL to recover reward function
    reward_weights, recovered_rewards = max_entropy_inverse_rl(
        states, actions, demonstrations, transitions, feature_matrix,
        temperature=1.0,
        gamma=0.9,
        n_iterations=50,
        learning_rate=0.1
    )

    print("\nLearned Reward Weights:")
    print(reward_weights)
    print("\nRecovered Rewards for each state:")
    for s in states:
        print(f"State {s}: {recovered_rewards[s]:.3f}")

Iteration 10: Feature expectation difference = 1.421723
Iteration 20: Feature expectation difference = 1.352766
Iteration 30: Feature expectation difference = 1.309204
Iteration 40: Feature expectation difference = 1.281922
Iteration 50: Feature expectation difference = 1.265306

Learned Reward Weights:
[3.59460848 5.69581094]

Recovered Rewards for each state:
State 0: 2.848
State 1: 5.065
State 2: 4.225
State 3: 9.290
