In [7]:
# Use the %pip magic rather than !pip: a shell escape runs whichever pip is
# first on PATH, which may belong to a different interpreter than the kernel.
%pip install gymnasium==1.0.0 moviepy==1.0.3

Looking in indexes: https://pypi.org/simple, https://pypi.ngc.nvidia.com



[notice] A new release of pip is available: 24.2 -> 25.3
[notice] To update, run: C:\Users\dorfe\AppData\Local\Microsoft\WindowsApps\PythonSoftwareFoundation.Python.3.10_qbz5n2kfra8p0\python.exe -m pip install --upgrade pip


In [8]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from IPython.display import display, clear_output, HTML
from IPython.display import Video
from moviepy.editor import ImageSequenceClip
# from google.colab import files
import imageio
import time

# Assignment 1: Policy Iteration in the Repeated Prisoner's Dilemma
--- 
* Authors: Sara, Dor
* IDs: 
---

### Step 1: Install and Import
---

In [None]:
# Install required packages (versions pinned for reproducibility).
# %pip, unlike !pip, installs into the environment of the running kernel.
%pip install gymnasium==1.0.0 moviepy==1.0.3

In [17]:
# Imports and Setup

import gymnasium as gym
from gymnasium import spaces
import numpy as np
import random
from enum import Enum
from typing import Tuple, Dict, Any, Optional

# Single source of truth for the notebook's random seed: change it here to
# re-run everything under a different random stream.
SEED = 42

# Seed both global generators: Python's `random` module and NumPy's legacy
# global RNG.
random.seed(SEED)
np.random.seed(SEED)

### Step 2: MDP Components - Constants and Definitions
---

In [22]:
class Action(Enum):
    """
    The agent's action space: A = {COOPERATE, DEFECT}.

    Each member's value is the integer code of that move, so an integer
    action can be converted with ``Action(code)`` and back with ``.value``.
    """
    # a = 0
    COOPERATE = 0
    # a = 1
    DEFECT = 1


class OpponentStrategy(Enum):
    """
    Fixed (non-learned) policies the opponent can follow.

    From the RL point of view the opponent belongs to the ENVIRONMENT:
    its policy is what shapes the transition dynamics P(s'|s,a) that the
    agent experiences. Member values are the lowercase string identifiers.
    """
    # Always cooperate: π_opp(s) = COOPERATE for every s
    ALL_C = "all_c"
    # Always defect: π_opp(s) = DEFECT for every s
    ALL_D = "all_d"
    # Tit-for-tat: π_opp(s) = the agent's last action
    TFT = "tft"
    # Noisy tit-for-tat: the agent's last action, perturbed by noise
    IMPERFECT_TFT = "imperfect_tft"


class MemoryType(Enum):
    """
    How many past rounds a state remembers; this fixes the state space S.

    Each member's value is the number of remembered rounds:
        MEMORY_1 -> |S| = 4 states
        MEMORY_2 -> |S| = 16 states
    """
    # One remembered round
    MEMORY_1 = 1
    # Two remembered rounds
    MEMORY_2 = 2




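The reward function is formally R(s, a, s'). Because the opponent's action is the only part of s' that the agent does not choose itself, the reward can be written more simply as R(my_action, opponent_action).

The immediate reward therefore depends only on the joint action (my_action, opponent_action). The payoff matrix below is the REWARD FUNCTION of our MDP: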

| You \ Opponent | Cooperate (C) | Defect (D) |
|----------------|---------------|------------|
| Cooperate (C)  | R = 3         | S = 0      |
| Defect (D)     | T = 5         | P = 1      |

In [23]:
# Payoff table of the stage game, keyed by the joint action
# (agent action, opponent action) and giving the agent's immediate reward.
REWARD_FUNCTION = {
    # Agent cooperates
    (Action.COOPERATE, Action.COOPERATE): 3.0,  # R - reward for mutual cooperation
    (Action.COOPERATE, Action.DEFECT): 0.0,     # S - sucker's payoff
    # Agent defects
    (Action.DEFECT, Action.COOPERATE): 5.0,     # T - temptation to defect
    (Action.DEFECT, Action.DEFECT): 1.0,        # P - punishment for mutual defection
}

# ε: probability that the Imperfect TFT opponent plays the opposite of the
# move it intended.
SLIP_PROBABILITY = 0.1

### Step 3: The MDP Environment
---

Repeated Prisoner's Dilemma as a Markov Decision Process (MDP).
    
MDP Components
--------------
S (State Space):
- Memory-1: S = {(a_{t-1}, b_{t-1})} where a=agent action, b=opponent action
        |S| = 2 × 2 = 4 states
- Memory-2: S = {(a_{t-2}, a_{t-1}, b_{t-2}, b_{t-1})}
        |S| = 2 × 2 × 2 × 2 = 16 states

A (Action Space):
- A = {COOPERATE (0), DEFECT (1)}
- |A| = 2 actions

P(s'|s, a) (Transition Model):
- Depends on opponent's policy (part of environment dynamics).
- Deterministic opponents (ALL_C, ALL_D, TFT): P(s'|s,a) ∈ {0, 1}
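- Stochastic opponent (IMPERFECT_TFT): P(s'|s,a) ∈ {0, 0.1, 0.9} — the intended reply occurs with probability 1 − ε = 0.9, the slip with probability ε = 0.1, and next states inconsistent with the agent's action a have probability 0.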

R(s, a) (Reward Function):
- Expected immediate reward when taking action a in state s.
- For deterministic opponents: R(s,a) = r (the payoff)
- For stochastic opponents: R(s,a) = Σ_{s'} P(s'|s,a) × r(s,a,s'), i.e. the expectation of the payoff over the possible next states s'.

γ (Discount Factor):
- Specified when running Policy Iteration, not stored in environment.

Parameters
----------
opponent_strategy : str
- The fixed policy of the opponent: 'all_c', 'all_d', 'tft', 'imperfect_tft'
memory : int
- Determines state space size: 1 for Memory-1, 2 for Memory-2

In [None]:
class PrisonersDilemmaMDP(gym.Env):
    metadata = {"render_modes": ["human"]}

    def __init__(
        self,
        opponent_strategy: str = "tft",
        memory: int = 1
    ):
        """
        Configure the MDP: opponent policy, memory depth, and the spaces.

        Parameters
        ----------
        opponent_strategy : str or OpponentStrategy, default "tft"
            The opponent's fixed policy. Strings are matched
            case-insensitively against the enum values
            ('all_c', 'all_d', 'tft', 'imperfect_tft').
        memory : int or MemoryType, default 1
            Number of remembered rounds: 1 gives 4 states, 2 gives 16.
            Integer-like values such as ``np.int64(1)`` are accepted.

        Raises
        ------
        ValueError
            If ``opponent_strategy`` or ``memory`` does not correspond to a
            member of its enum.
        """
        super().__init__()

        # =====================================================================
        # Store MDP Configuration
        # =====================================================================

        # Always normalize through the enum constructors. They return an
        # existing member unchanged and look raw values up by equality, so
        # np.int64(1) or 1.0 map to MEMORY_1. Previously any non-`int` value
        # was stored as-is and, never comparing equal to MEMORY_1, silently
        # configured a Memory-2 environment. Invalid settings now fail here
        # instead of later (or never).
        if isinstance(opponent_strategy, str):
            opponent_strategy = opponent_strategy.lower()
        self.opponent_strategy = OpponentStrategy(opponent_strategy)
        self.memory = MemoryType(memory)

        # =====================================================================
        # Define Action Space A
        # =====================================================================
        # A = {0: COOPERATE, 1: DEFECT}
        # |A| = 2
        self.action_space = spaces.Discrete(2)
        self.num_actions = 2

        # =====================================================================
        # Define State Space S
        # =====================================================================
        if self.memory == MemoryType.MEMORY_1:
            # S = {(a_{t-1}, b_{t-1})}
            # States: (C,C), (C,D), (D,C), (D,D)
            # |S| = 4
            self.observation_space = spaces.MultiDiscrete([2, 2])
            self.num_states = 4
        else:
            # S = {(a_{t-2}, a_{t-1}, b_{t-2}, b_{t-1})}
            # |S| = 16
            self.observation_space = spaces.MultiDiscrete([2, 2, 2, 2])
            self.num_states = 16

        # =====================================================================
        # History Tracking (needed to determine current state)
        # =====================================================================
        # Both histories start empty; reset() fills them with the initial
        # "phantom" moves, so reset() must be called before step().
        self.agent_history = []
        self.opponent_history = []

        # Episode tracking
        self.timestep = 0
        self.cumulative_reward = 0.0

    # =========================================================================
    # Core MDP Methods
    # =========================================================================

    def reset(
        self,
        seed: Optional[int] = None,
        options: Optional[dict] = None
    ) -> Tuple[np.ndarray, Dict]:
        """
        Reset the MDP to initial state s_0.

        Initial State Convention:
            We assume "phantom" cooperation before t=0.
            - Memory-1: s_0 = (C, C)
            - Memory-2: s_0 = (C, C, C, C)

        Parameters
        ----------
        seed : int, optional
            Forwarded to ``gym.Env.reset``. When given, it also reseeds
            Python's global ``random`` module, which is the source of the
            IMPERFECT_TFT opponent's slips; otherwise the seed would have
            no effect on this environment's dynamics. With ``seed=None``
            the global random state is left untouched.
        options : dict, optional
            Accepted for Gymnasium API compatibility; not used.

        Returns
        -------
        state : np.ndarray
            Initial state s_0 (all entries COOPERATE, dtype int64)
        info : dict
            Additional information (always empty)
        """
        super().reset(seed=seed)

        # Make an explicit seed actually control the stochastic opponent.
        # NOTE: this touches the process-wide `random` state, because that is
        # the generator the opponent draws from.
        if seed is not None:
            random.seed(seed)

        # Reset tracking
        self.timestep = 0
        self.cumulative_reward = 0.0

        # Initialize history (phantom cooperation): one remembered round for
        # Memory-1, two otherwise. The state holds the agent's and the
        # opponent's remembered moves, hence 2 * depth entries.
        depth = 1 if self.memory == MemoryType.MEMORY_1 else 2
        self.agent_history = [Action.COOPERATE.value] * depth
        self.opponent_history = [Action.COOPERATE.value] * depth
        initial_state = np.full(2 * depth, Action.COOPERATE.value, dtype=np.int64)

        return initial_state, {}

    def step(self, action: int) -> Tuple[np.ndarray, float, bool, bool, Dict]:
        """
        Advance the MDP by one round of the game.

        The opponent's reply is obtained from its fixed policy *before* the
        agent's new move is recorded, so it is based on the history up to the
        previous round. The joint move is then scored, appended to both
        histories, and the new state s' is read off the updated histories.

        Parameters
        ----------
        action : int
            Agent's action a ∈ A = {0, 1}

        Returns
        -------
        next_state : np.ndarray
            The new state s'
        reward : float
            Immediate reward r for the joint action
        terminated : bool
            Always False (infinite-horizon MDP)
        truncated : bool
            Always False
        info : dict
            Debugging information: names of both actions, the timestep and
            the cumulative reward after this step
        """
        chosen = Action(action)

        # Environment side: opponent reply, then the payoff of the joint move.
        reply = self._get_opponent_action()
        payoff = self._get_reward(chosen, reply)

        # Record the round; the next state is derived from the new histories.
        self.agent_history.append(chosen.value)
        self.opponent_history.append(reply.value)
        observation = self._get_current_state()

        # Episode statistics.
        self.timestep += 1
        self.cumulative_reward += payoff

        diagnostics = {
            "agent_action": chosen.name,
            "opponent_action": reply.name,
            "timestep": self.timestep,
            "cumulative_reward": self.cumulative_reward
        }

        # Infinite horizon: the episode neither terminates nor is truncated.
        return observation, payoff, False, False, diagnostics

    # =========================================================================
    # Transition Model P(s'|s, a)
    # =========================================================================

    def _get_opponent_action(self) -> Action:
        """
        Produce the opponent's move for this timestep from its fixed policy.

        This is the stochastic/deterministic part of the Transition Model
        P(s'|s,a): it decides how the environment answers the agent.

        Returns
        -------
        Action
            The opponent's action this timestep

        Raises
        ------
        ValueError
            If the configured strategy is not one of the known strategies.
        """
        strategy = self.opponent_strategy

        # Unconditional strategies: P(opponent = fixed move | s, a) = 1.0
        if strategy == OpponentStrategy.ALL_C:
            return Action.COOPERATE
        if strategy == OpponentStrategy.ALL_D:
            return Action.DEFECT

        # Tit-for-tat: copy the agent's most recent recorded move,
        # P(opponent = a_{t-1} | s, a) = 1.0
        if strategy == OpponentStrategy.TFT:
            return Action(self.agent_history[-1])

        # Imperfect tit-for-tat: copy with probability 1-ε, play the opposite
        # move with probability ε (a single draw from the global `random`).
        if strategy == OpponentStrategy.IMPERFECT_TFT:
            mirrored = Action(self.agent_history[-1])
            slipped = random.random() < SLIP_PROBABILITY
            return Action(1 - mirrored.value) if slipped else mirrored

        raise ValueError(f"Unknown opponent strategy: {self.opponent_strategy}")

    # =========================================================================
    # Reward Function R(s, a)
    # =========================================================================

    def _get_reward(self, agent_action: Action, opponent_action: Action) -> float:
        """
        Look up the agent's immediate reward for one round.

        The payoff depends only on the joint action, so it is read directly
        from the REWARD_FUNCTION table.

        Parameters
        ----------
        agent_action : Action
            Agent's action a
        opponent_action : Action
            Opponent's action for the same round

        Returns
        -------
        float
            Immediate reward r
        """
        joint_action = (agent_action, opponent_action)
        return REWARD_FUNCTION[joint_action]

    # =========================================================================
    # State Representation
    # =========================================================================

    def _get_current_state(self) -> np.ndarray:
        """
        Build the current state s from the tails of the two histories.

        Returns
        -------
        np.ndarray
            int64 array: (a_{t-1}, b_{t-1}) for Memory-1, or
            (a_{t-2}, a_{t-1}, b_{t-2}, b_{t-1}) otherwise, where a is the
            agent's and b the opponent's recorded action.
        """
        agent, opponent = self.agent_history, self.opponent_history

        if self.memory != MemoryType.MEMORY_1:
            # Two remembered rounds: agent's moves first, then the opponent's.
            components = [agent[-2], agent[-1], opponent[-2], opponent[-1]]
        else:
            # One remembered round.
            components = [agent[-1], opponent[-1]]

        return np.array(components, dtype=np.int64)

    # =========================================================================
    # Helper Methods for Policy Iteration (Part III)
    # =========================================================================

    def state_to_index(self, state: np.ndarray) -> int:
        """
        Convert state array to unique index in {0, 1, ..., |S|-1}.

        Useful for tabular methods like Policy Iteration. The state's entries
        are read as binary digits, most significant first.

        Memory-1 mapping:
            (C,C)=0, (C,D)=1, (D,C)=2, (D,D)=3

        Memory-2 mapping:
            Binary encoding: 0-15

        Parameters
        ----------
        state : np.ndarray
            State as produced by reset()/step() (any indexable sequence of
            0/1 values with the right length also works).

        Returns
        -------
        int
            A built-in Python ``int``. Each entry is coerced with ``int()``
            so that a NumPy array input does not leak a NumPy scalar
            (e.g. ``np.int64``) despite the declared return type.
        """
        if self.memory == MemoryType.MEMORY_1:
            return int(state[0]) * 2 + int(state[1])
        return (
            int(state[0]) * 8
            + int(state[1]) * 4
            + int(state[2]) * 2
            + int(state[3])
        )

    def index_to_state(self, index: int) -> np.ndarray:
        """
        Decode a state index into its state array (inverse of state_to_index).

        Memory-1 yields the two base-2 digits of the index; Memory-2 yields
        its four lowest bits, most significant first. The result is an int64
        array.
        """
        if self.memory == MemoryType.MEMORY_1:
            # divmod gives (index // 2, index % 2) in one call.
            digits = divmod(index, 2)
        else:
            digits = [(index >> shift) & 1 for shift in (3, 2, 1, 0)]
        return np.array(digits, dtype=np.int64)

    def get_all_states(self) -> list:
        """
        Enumerate the whole state space S in index order.

        Returns
        -------
        list of np.ndarray
            The state for every index 0 .. num_states - 1
        """
        return list(map(self.index_to_state, range(self.num_states)))

    def render(self, mode: str = "human"):
        """
        Print a debugging summary of the episode so far.

        Shows the timestep, the last five recorded moves of each player
        (as 'C'/'D' letters) and the cumulative reward. ``mode`` is accepted
        but not used.
        """
        def as_letters(moves):
            # Action code 0 is COOPERATE; anything else is shown as defect.
            return ['C' if move == 0 else 'D' for move in moves]

        print(f"\n===== Timestep t={self.timestep} =====")
        print(f"Agent history (last 5):    {as_letters(self.agent_history[-5:])}")
        print(f"Opponent history (last 5): {as_letters(self.opponent_history[-5:])}")
        print(f"Cumulative reward: {self.cumulative_reward}")
