In [29]:
import gymnasium
from gymnasium import spaces
import numpy as np

In [30]:
class CompetitiveAGIEnv(gymnasium.Env):
    """Competitive AGI Race Environment with Two Parties and State Validation"""

    def __init__(self):
        super(CompetitiveAGIEnv, self).__init__()

        # Defining collaboration modes
        self.COLLAB_MODES = {
            'INDEPENDENT': 0,   # Both parties working independently (never collaborated)
            'COLLABORATIVE': 1, # Both parties actively collaborating
            'POST_COLLAB': 2,   # Both previously collaborated but now independent
        }
        
        # Defining party statuses
        self.PARTY_STATUS = {
            'EXPLORING': 0,     # Actively exploring
            'RETREATED': 1,     # Retreated
            'FOUND_AGI': 2,     # Found AGI
        }
        
        # Collaboration mode (applies to both parties)
        self.collab_mode = self.COLLAB_MODES['INDEPENDENT']
        
        # Individual party statuses
        self.party_1_status = self.PARTY_STATUS['EXPLORING']
        self.party_2_status = self.PARTY_STATUS['EXPLORING']
        
        # Track which party has broken collaboration
        self.party_1_broke_collab = False
        self.party_2_broke_collab = False
        
        # Track resources
        self.party_1_resources = 100
        self.party_2_resources = 100
        
        # 4 actions for each party:
        # 0 = Explore independently
        # 1 = Retreat (take a break)
        # 2 = Initiate/continue collaboration
        # 3 = Break collaboration (if currently collaborating)
        # Each party chooses one action, so the combined action space is (4, 4)
        self.action_space = spaces.Tuple((spaces.Discrete(4), spaces.Discrete(4)))
        
        # The observation space includes:
        # - Collaboration mode (0, 1, 2)
        # - Party 1 status (0, 1, 2)
        # - Party 2 status (0, 1, 2)
        # - Party 1 resources (float)
        # - Party 2 resources (float)
        self.observation_space = spaces.Tuple((
            spaces.Discrete(3),  # collaboration mode
            spaces.Discrete(3),  # party_1_status
            spaces.Discrete(3),  # party_2_status
            spaces.Box(low=0, high=float('inf'), shape=(1,), dtype=np.float32),  # party_1_resources
            spaces.Box(low=0, high=float('inf'), shape=(1,), dtype=np.float32),  # party_2_resources
        ))
        
        # Base probabilities for AGI discovery
        self.independent_agi_prob = 0.01
        self.collaborative_agi_prob = 0.03
        self.post_collab_agi_prob = 0.015
        
        # Resource dynamics
        self.explore_independent_cost = 1
        self.explore_collaborative_cost = 3
        self.retreat_reward = 10
        self.collaboration_initiation_cost = 5
        self.agi_reward = 100
        
        # Competition penalty: if the opponent finds AGI first
        self.competition_penalty = -50
        
        # Max environment steps
        self.max_steps = 1000
        self.current_step = 0

        # Defining the transition probabilities and rewards for each party
        # Structure: {(collab_mode, party_status): {action: [{next_status, prob, reward, effects}, ...]}}
        self.party_1_transitions = {
            (self.PARTY_STATUS['EXPLORING'], self.COLLAB_MODES['INDEPENDENT']): {
                0: [  # Explore independently
                    {"next_status": self.PARTY_STATUS['FOUND_AGI'], "prob": self.independent_agi_prob, 
                        "reward": self.agi_reward, "resource_change": -self.explore_independent_cost,
                        "effects": [("party_2_reward", self.competition_penalty, 
                                    lambda: self.party_2_status == self.PARTY_STATUS['EXPLORING'])]},
                    {"next_status": self.PARTY_STATUS['EXPLORING'], "prob": 1 - self.independent_agi_prob, 
                        "reward": -1, "resource_change": -self.explore_independent_cost, "effects": []}                        #1. CHANGED REWARD FROM 0 TO -1
                ],
                1: [  # Retreat
                    {"next_status": self.PARTY_STATUS['RETREATED'], "prob": 1.0, 
                        "reward": self.retreat_reward, "resource_change": self.retreat_reward, "effects": []}
                ],
                2: [  # Try to initiate collaboration. COLLABORATIVE CASE ALREADY HANDLED EARLIER
                    {"next_status": self.PARTY_STATUS['EXPLORING'], "prob": 1.0, 
                        "reward": -5, "resource_change": 0, "effects": []}  # Actual collab handled separately                 #3. CHANGED REWARD FROM 0 TO -5
                ],
                # 3: [  # Invalid in this state                                                                             #2. REMOVED INVALID ACTIONS
                #     {"next_status": self.PARTY_STATUS['EXPLORING'], "prob": 1.0, 
                #      "reward": -5, "resource_change": 0, "effects": []}  # Penalty for invalid action
                # ]
            },
            (self.PARTY_STATUS['EXPLORING'], self.COLLAB_MODES['COLLABORATIVE']): {
                # 0: [  # Explore collaboratively                                                                           #2. REMOVED INVALID ACTIONS
                #     {"next_status": self.PARTY_STATUS['FOUND_AGI'], "prob": self.collaborative_agi_prob, 
                #      "reward": self.agi_reward, "resource_change": -self.explore_collaborative_cost,
                #      "effects": [("party_2_status", self.PARTY_STATUS['FOUND_AGI'], 
                #                  lambda: self.party_2_status == self.PARTY_STATUS['EXPLORING']),
                #                 ("party_2_reward", self.agi_reward, 
                #                  lambda: self.party_2_status == self.PARTY_STATUS['EXPLORING'])]},
                #     {"next_status": self.PARTY_STATUS['EXPLORING'], "prob": 1 - self.collaborative_agi_prob, 
                #      "reward": 0, "resource_change": -self.explore_collaborative_cost, "effects": []}
                # ],
                1: [  # Retreat
                    {"next_status": self.PARTY_STATUS['RETREATED'], "prob": 1.0, 
                        "reward": self.retreat_reward, "resource_change": self.retreat_reward, "effects": []}
                ],
                2: [  # Explore collaboratively
                    {"next_status": self.PARTY_STATUS['FOUND_AGI'], "prob": self.collaborative_agi_prob, 
                        "reward": self.agi_reward, "resource_change": -self.explore_collaborative_cost,
                        "effects": [("party_2_status", self.PARTY_STATUS['FOUND_AGI'], 
                                    lambda: self.party_2_status == self.PARTY_STATUS['EXPLORING']),
                                ("party_2_reward", self.agi_reward, 
                                    lambda: self.party_2_status == self.PARTY_STATUS['EXPLORING'])]},
                    {"next_status": self.PARTY_STATUS['EXPLORING'], "prob": 1 - self.collaborative_agi_prob, 
                        "reward": -5, "resource_change": -self.explore_collaborative_cost, "effects": []}                      #3. CHANGED REWARD FROM 0 TO -5
                ],
                3: [  # Break collaboration, COLLABORATIVE CASE ALREADY HANDLED EARLIER
                    {"next_status": self.PARTY_STATUS['EXPLORING'], "prob": 1.0, 
                        "reward": 0, "resource_change": 0, "effects": [("party_1_broke_collab", True, lambda: True)]}
                ]
            },
            (self.PARTY_STATUS['EXPLORING'], self.COLLAB_MODES['POST_COLLAB']): {
                0: [  # Explore after collaboration
                    {"next_status": self.PARTY_STATUS['FOUND_AGI'], "prob": self.post_collab_agi_prob, 
                        "reward": self.agi_reward, "resource_change": -self.explore_independent_cost,
                        "effects": [("party_2_reward", self.competition_penalty, 
                                    lambda: self.party_2_status == self.PARTY_STATUS['EXPLORING'])]},
                    {"next_status": self.PARTY_STATUS['EXPLORING'], "prob": 1 - self.post_collab_agi_prob, 
                        "reward": -1, "resource_change": -self.explore_independent_cost, "effects": []}                        #1. CHANGED REWARD FROM 0 TO -1
                ],
                1: [  # Retreat
                    {"next_status": self.PARTY_STATUS['RETREATED'], "prob": 1.0, 
                        "reward": self.retreat_reward, "resource_change": self.retreat_reward, "effects": []}
                ],
                # 2: [  # Invalid in this state 
                #     {"next_status": self.PARTY_STATUS['EXPLORING'], "prob": 1.0, 
                #      "reward": 0, "resource_change": 0, "effects": []}  # Actual collab handled separately
                # ],
                # 3: [  # Invalid in this state                                                                                 #2. REMOVED INVALID ACTIONS
                #     {"next_status": self.PARTY_STATUS['EXPLORING'], "prob": 1.0, 
                #      "reward": -5, "resource_change": 0, "effects": []}  # Penalty for invalid action
                # ]
            }
        }

        # Define similar transitions for party 2 (could be identical or asymmetric)
        self.party_2_transitions = self.party_1_transitions.copy()  # Deep copy if needed

        # Define joint collaboration transitions
        self.collaboration_transitions = {
            self.COLLAB_MODES['INDEPENDENT']: {
                (2, 2): {"next_mode": self.COLLAB_MODES['COLLABORATIVE'], "resource_change": -self.collaboration_initiation_cost}
            },
            self.COLLAB_MODES['COLLABORATIVE']: {
                (2, 2): {"next_mode": self.COLLAB_MODES['COLLABORATIVE'], "resource_change": -5},
                (3, 2): {"next_mode": self.COLLAB_MODES['POST_COLLAB'], "resource_change": -1},
                (2, 3): {"next_mode": self.COLLAB_MODES['POST_COLLAB'], "resource_change": -1},
                (3, 3): {"next_mode": self.COLLAB_MODES['POST_COLLAB'], "resource_change": -1},
                # # Any other action combination breaks collaboration
                # "default": {"next_mode": self.COLLAB_MODES['POST_COLLAB'], "resource_change": 0}
            },
            # self.COLLAB_MODES['POST_COLLAB']: {
            #     (2, 2): {"next_mode": self.COLLAB_MODES['COLLABORATIVE'], "resource_change": -self.collaboration_initiation_cost}
            # }
        }

    def _get_observation(self):
        """Return the current observation (state)."""
        return (
            self.collab_mode,
            self.party_1_status,
            self.party_2_status,
            np.array([self.party_1_resources], dtype=np.float32),
            np.array([self.party_2_resources], dtype=np.float32),
        )

    def _is_terminal(self):
        """Check if the episode has reached a terminal state."""
        return (self.party_1_status != self.PARTY_STATUS['EXPLORING'] or 
                self.party_2_status != self.PARTY_STATUS['EXPLORING'] or 
                self.current_step >= self.max_steps or
                self.party_1_resources <= 0 or 
                self.party_2_resources <= 0)

    def _get_terminal_info(self):
        """Gather information about how the episode ended."""
        info = {}
        
        if self.party_1_resources <= 0:
            info["bankrupt"] = "party_1"
        if self.party_2_resources <= 0:
            info["bankrupt"] = "party_2" if "bankrupt" not in info else "both"
        
        if self.current_step >= self.max_steps:
            info["timeout"] = True
            
        if self.party_1_status == self.PARTY_STATUS['FOUND_AGI'] and self.party_2_status == self.PARTY_STATUS['FOUND_AGI']:
            info["winner"] = "both"
        elif self.party_1_status == self.PARTY_STATUS['FOUND_AGI']:
            info["winner"] = "party_1"
        elif self.party_2_status == self.PARTY_STATUS['FOUND_AGI']:
            info["winner"] = "party_2"
        
        if "winner" not in info and "bankrupt" not in info and not info.get("timeout", False):
            # Someone retreated
            if self.party_1_status == self.PARTY_STATUS['RETREATED'] and self.party_2_status == self.PARTY_STATUS['RETREATED']:
                info["both_retreated"] = True
            elif self.party_1_status == self.PARTY_STATUS['RETREATED']:
                info["retreated"] = "party_1"
            elif self.party_2_status == self.PARTY_STATUS['RETREATED']:
                info["retreated"] = "party_2"
                
        return info

    # def _process_collaboration_actions(self, action_1, action_2):
    #     """Process collaboration dynamics based on both parties' actions.
    #         HERE, True and False JUST INDICATE WHETHER WE MADE SOME CHANGE BASED ON THEIR STATE, AND COLLABORATION-RELATED ACTION OR NOT. 
    #         WE COULD HAVE AVOIDED TRUE, AND FALSE ALTOGETHER"""
        
    #     # Starting collaboration requires both parties to choose action 2
    #     if (self.collab_mode != self.COLLAB_MODES['COLLABORATIVE'] and 
    #         action_1 == 2 and action_2 == 2):
            
    #         # Both parties agree to collaborate
    #         self.collab_mode = self.COLLAB_MODES['COLLABORATIVE']
    #         self.party_1_resources -= self.collaboration_initiation_cost
    #         self.party_2_resources -= self.collaboration_initiation_cost
    #         return True
            
    #     # Breaking collaboration happens if either party chooses action 3
    #     elif (self.collab_mode == self.COLLAB_MODES['COLLABORATIVE'] and 
    #           (action_1 == 3 or action_2 == 3)):
            
    #         # Collaboration is broken
    #         self.collab_mode = self.COLLAB_MODES['POST_COLLAB']
            
    #         # Record who broke collaboration
    #         if action_1 == 3:
    #             self.party_1_broke_collab = True
    #         if action_2 == 3:
    #             self.party_2_broke_collab = True
    #         return True
            
    #     # Continuing collaboration requires both to choose action 2
    #     elif (self.collab_mode == self.COLLAB_MODES['COLLABORATIVE'] and 
    #           not (action_1 == 2 and action_2 == 2)):
            
    #         # Collaboration ends (not actively broken, but not continued)
    #         self.collab_mode = self.COLLAB_MODES['POST_COLLAB']
    #         return True
            
    #     return False
    
    def _process_collaboration_actions(self, action_1, action_2):
        collab_action_key = (action_1, action_2)
        
        # Apply collaboration transition if defined for this action pair
        if self.collab_mode in self.collaboration_transitions:
            collab_state_transitions = self.collaboration_transitions[self.collab_mode]
            
            if collab_action_key in collab_state_transitions:
                transition = collab_state_transitions[collab_action_key]
                
                # Apply collaboration transition
                old_collab_mode = self.collab_mode
                self.collab_mode = transition["next_mode"]
                
                # Apply resource changes for collaboration transitions
                if transition["resource_change"] != 0:
                    self.party_1_resources += transition["resource_change"]
                    self.party_2_resources += transition["resource_change"]
                
                # Record collaboration break
                if old_collab_mode == self.COLLAB_MODES['COLLABORATIVE'] and self.collab_mode == self.COLLAB_MODES['POST_COLLAB']:
                    if action_1 == 3:
                        self.party_1_broke_collab = True
                    if action_2 == 3:
                        self.party_2_broke_collab = True

    def _get_agi_probability(self, party_id):
        """Get the probability of finding AGI based on collaboration mode."""
        if self.collab_mode == self.COLLAB_MODES['INDEPENDENT']:
            return self.independent_agi_prob
        elif self.collab_mode == self.COLLAB_MODES['COLLABORATIVE']:
            return self.collaborative_agi_prob
        else:  # POST_COLLAB
            return self.post_collab_agi_prob
        
    def _process_party1_actions(self, action_1, reward_1, reward_2):
        # Process party 1's action if not in terminal state
        if self.party_1_status == self.PARTY_STATUS['EXPLORING']:
            state_key = (self.party_1_status, self.collab_mode)
            
            if state_key in self.party_1_transitions and action_1 in self.party_1_transitions[state_key]:
                transitions = self.party_1_transitions[state_key][action_1]
                
                # Select transition based on probabilities
                probs = [t["prob"] for t in transitions]
                transition_idx = np.random.choice(len(transitions), p=probs)
                transition = transitions[transition_idx]
                
                # Apply transition
                self.party_1_status = transition["next_status"]
                reward_1 += transition["reward"]
                self.party_1_resources += transition["resource_change"]
                
                # Apply side effects
                for effect in transition["effects"]:
                    target, value, condition = effect
                    if condition():
                        if target == "party_2_reward":
                            reward_2 += value
                        elif target == "party_2_status":
                            self.party_2_status = value
                        elif target == "party_1_broke_collab":
                            self.party_1_broke_collab = value
    
    def _process_party2_actions(self, action_2, reward_1, reward_2):
        # Process party 2's action if not in terminal state and if party 1's action didn't
        # already cause party 2 to find AGI through collaboration
        if self.party_2_status == self.PARTY_STATUS['EXPLORING']:
            state_key = (self.party_2_status, self.collab_mode)
            
            if state_key in self.party_2_transitions and action_2 in self.party_2_transitions[state_key]:
                transitions = self.party_2_transitions[state_key][action_2]
                
                # Select transition based on probabilities
                probs = [t["prob"] for t in transitions]
                transition_idx = np.random.choice(len(transitions), p=probs)
                transition = transitions[transition_idx]
                
                # Apply transition
                self.party_2_status = transition["next_status"]
                reward_2 += transition["reward"]
                self.party_2_resources += transition["resource_change"]
                
                # Apply side effects
                for effect in transition["effects"]:
                    target, value, condition = effect
                    if condition():
                        if target == "party_1_reward":
                            reward_1 += value
                        elif target == "party_1_status":
                            self.party_1_status = value
                        elif target == "party_2_broke_collab":
                            self.party_2_broke_collab = value

    def step(self, action):
        """Take a step in the environment with actions from both parties."""
        """Take a step using transition matrices instead of if-else logic."""
        action_1, action_2 = action
        
        # Validate actions
        assert 0 <= action_1 < 4, f"Invalid action for party 1: {action_1}"
        assert 0 <= action_2 < 4, f"Invalid action for party 2: {action_2}"
        
        # If already in terminal state, return without changes
        if self._is_terminal():
            return self._get_observation(), (0, 0), True, False, self._get_terminal_info()
        
        # Process collaboration transitions first
        self._process_collaboration_actions(action_1, action_2)
        
        reward_1, reward_2 = 0, 0
        info = {}

        self._process_party1_actions(action_1, reward_1, reward_2)
        self._process_party2_actions(action_2, reward_1, reward_2)

        # Increment step counter
        self.current_step += 1
        
        # Determine if episode has ended
        done = self._is_terminal()
        
        # Gather additional info for terminal states
        if done:
            info.update(self._get_terminal_info())
        
        return self._get_observation(), (reward_1, reward_2), done, False, info



    def reset(self, seed=None, options=None):
        """Reset the environment to initial state."""
        if seed is not None:
            np.random.seed(seed)
            
        self.collab_mode = self.COLLAB_MODES['INDEPENDENT']
        self.party_1_status = self.PARTY_STATUS['EXPLORING']
        self.party_2_status = self.PARTY_STATUS['EXPLORING']
        self.party_1_broke_collab = False
        self.party_2_broke_collab = False
        self.party_1_resources = 100
        self.party_2_resources = 100
        self.current_step = 0
        
        return self._get_observation(), {}

    def close(self):
        """Clean up resources."""
        pass

# Testing code below

In [31]:
import numpy as np
import gymnasium as gym
from gymnasium import spaces
import matplotlib.pyplot as plt
import random
from tqdm import tqdm
import time

# First, make sure your CompetitiveAGIEnv class is defined correctly
# Then, run these tests:

def test_basic_environment():
    """Test basic environment functionality."""
    env = CompetitiveAGIEnv()
    obs, _ = env.reset()
    
    print("Initial Observation:", obs)
    print("Action Space:", env.action_space)
    print("Observation Space:", env.observation_space)
    
    # Test a single step with random actions
    action = (random.randint(0, 3), random.randint(0, 3))
    next_obs, rewards, done, _, info = env.step(action)
    
    print(f"\nTook action: {action}")
    print("Next Observation:", next_obs)
    print("Rewards:", rewards)
    print("Done:", done)
    print("Info:", info)
    
    # Reset and run a full episode with random actions
    obs, _ = env.reset()
    done = False
    total_steps = 0
    
    while not done and total_steps < 100:
        action = (random.randint(0, 3), random.randint(0, 3))
        obs, rewards, done, _, info = env.step(action)
        total_steps += 1
    
    print(f"\nEpisode ended after {total_steps} steps")
    print("Final Observation:", obs)
    print("Final Info:", info)


def test_collaboration_dynamics():
    """Test collaboration-specific dynamics."""
    env = CompetitiveAGIEnv()
    obs, _ = env.reset()
    
    # Test initiating collaboration
    print("Testing collaboration initiation...")
    action = (2, 2)  # Both parties choose to collaborate
    next_obs, rewards, done, _, info = env.step(action)
    
    print(f"Collaboration Mode: {next_obs[0]}")
    print(f"Resources P1: {next_obs[3][0]}, P2: {next_obs[4][0]}")
    
    if next_obs[0] == env.COLLAB_MODES['COLLABORATIVE']:
        print("✓ Collaboration successfully initiated")
    else:
        print("✗ Collaboration failed to initiate")
    
    # Test collaborative exploration
    if not done:
        action = (0, 0)  # Both parties explore while in collaboration
        next_obs, rewards, done, _, info = env.step(action)
        print("\nBoth parties explored while collaborating:")
        print(f"Resources P1: {next_obs[3][0]}, P2: {next_obs[4][0]}")
    
    # Test breaking collaboration
    env.reset()
    # First initiate collaboration
    env.step((2, 2))
    
    # Then break it
    print("\nTesting collaboration breaking...")
    action = (3, 0)  # Party 1 breaks collaboration
    next_obs, rewards, done, _, info = env.step(action)
    
    print(f"Collaboration Mode: {next_obs[0]}")
    if next_obs[0] == env.COLLAB_MODES['POST_COLLAB']:
        print("✓ Collaboration successfully broken")
    else:
        print("✗ Collaboration failed to break")
    
    # Test if broke_collab flag is set
    print(f"Party 1 broke collab: {env.party_1_broke_collab}")
    print(f"Party 2 broke collab: {env.party_2_broke_collab}")

# Run the tests
if __name__ == "__main__":
    print("=== Basic Environment Test ===")
    test_basic_environment()
    
    print("\n=== Collaboration Dynamics Test ===")
    test_collaboration_dynamics()

=== Basic Environment Test ===
Initial Observation: (0, 0, 0, array([100.], dtype=float32), array([100.], dtype=float32))
Action Space: Tuple(Discrete(4), Discrete(4))
Observation Space: Tuple(Discrete(3), Discrete(3), Discrete(3), Box(0.0, inf, (1,), float32), Box(0.0, inf, (1,), float32))

Took action: (2, 2)
Next Observation: (1, 0, 0, array([92.], dtype=float32), array([92.], dtype=float32))
Rewards: (0, 0)
Done: False
Info: {}

Episode ended after 6 steps
Final Observation: (0, 0, 1, array([97.], dtype=float32), array([110.], dtype=float32))
Final Info: {'retreated': 'party_2'}

=== Collaboration Dynamics Test ===
Testing collaboration initiation...
Collaboration Mode: 1
Resources P1: 92.0, P2: 92.0
✓ Collaboration successfully initiated

Both parties explored while collaborating:
Resources P1: 92.0, P2: 92.0

Testing collaboration breaking...
Collaboration Mode: 1
✗ Collaboration failed to break
Party 1 broke collab: True
Party 2 broke collab: False
