# Imports and Config

In [1]:
import os
from typing import Optional, Dict, Tuple
import numpy as np
import warnings

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque
import random

from scipy.optimize import minimize_scalar
import pandas as pd

SEED = 99

os.makedirs("results", exist_ok=True)
torch.cuda.is_available()

True

# `AR1_Shock`

In [2]:
class AR1_Shock:
    def __init__(self, rho, sigma_eta, seed=None):
        self.rho = rho
        self.sigma_eta = sigma_eta
        self.rng = np.random.RandomState(seed)
        self.current = 0.0

    def reset(self):
        self.current = 0.0

    def generate_next(self):
        eta = self.rng.normal(0, self.sigma_eta)
        self.current = self.rho * self.current + eta
        return self.current


# `TheoreticalBenchmarks`

In [3]:
class TheoreticalBenchmarks:
    """Calculate theoretical Nash and Monopoly prices/profits for market models"""
    
    def __init__(self, seed=None):
        self.seed = seed
        if seed is not None:
            np.random.seed(seed)
    
    def calculate_logit_benchmarks(self, shock_cfg=None):
        """
        Calculate Nash and Monopoly benchmarks for Logit model
        With shocks: Uses Monte Carlo integration
        Without shocks: Uses analytical formulas
        """
        # Model parameters
        a = 2.0
        c = 1.0
        mu = 0.25
        a0 = 0.0
        
        if shock_cfg is None or not shock_cfg.get('enabled', False):
            # No shocks - use standard benchmarks
            p_N = 1.473
            p_M = 1.925
            
            # Calculate profits at these prices
            exp_N = np.exp((a - p_N) / mu)
            exp_outside = np.exp(a0 / mu)
            q_N = exp_N / (2 * exp_N + exp_outside)
            E_pi_N = (p_N - c) * q_N
            
            exp_M = np.exp((a - p_M) / mu)
            q_M = exp_M / (2 * exp_M + exp_outside)
            E_pi_M = (p_M - c) * q_M
            
            return {
                'p_N': p_N,
                'E_pi_N': E_pi_N,
                'p_M': p_M,
                'E_pi_M': E_pi_M,
                'shock_enabled': False
            }
        
        # With shocks - use Monte Carlo
        scheme_params = {
            'A': {'rho': 0.3, 'sigma_eta': 0.5},
            'B': {'rho': 0.95, 'sigma_eta': 0.05},
            'C': {'rho': 0.9, 'sigma_eta': 0.3}
        }
        
        scheme = shock_cfg.get('scheme', 'A')
        params = scheme_params[scheme.upper()]
        rho = params['rho']
        sigma_eta = params['sigma_eta']
        
        # Unconditional variance of shocks
        sigma2 = sigma_eta**2 / (1 - rho**2)
        sigma = np.sqrt(sigma2)
        
        # Monte Carlo samples
        N = 10000
        shock_mode = shock_cfg.get('mode', 'independent')
        
        if shock_mode == 'independent':
            xi_samples = np.random.normal(0, sigma, (N, 2))
        else:  # correlated
            xi_samples = np.random.normal(0, sigma, N)
        
        # Expected profit functions
        def E_pi1(p1, p2):
            if shock_mode == 'independent':
                exps1 = np.exp((a - p1 + xi_samples[:, 0]) / mu)
                exps2 = np.exp((a - p2 + xi_samples[:, 1]) / mu)
            else:
                exps1 = np.exp((a - p1 + xi_samples) / mu)
                exps2 = np.exp((a - p2 + xi_samples) / mu)
            den = exps1 + exps2 + np.exp(a0 / mu)
            qs = exps1 / den
            return np.mean((p1 - c) * qs)
        
        def E_pi2(p1, p2):
            if shock_mode == 'independent':
                exps1 = np.exp((a - p1 + xi_samples[:, 0]) / mu)
                exps2 = np.exp((a - p2 + xi_samples[:, 1]) / mu)
            else:
                exps1 = np.exp((a - p1 + xi_samples) / mu)
                exps2 = np.exp((a - p2 + xi_samples) / mu)
            den = exps1 + exps2 + np.exp(a0 / mu)
            qs = exps2 / den
            return np.mean((p2 - c) * qs)
        
        # Nash equilibrium
        def best_response(p_j):
            def neg_E_pi(p_i):
                return -E_pi1(p_i, p_j)
            res = minimize_scalar(neg_E_pi, bounds=(c, c + 5), method='bounded')
            return res.x
        
        p_guess = 1.5
        for _ in range(50):
            p_guess = best_response(p_guess)
        p_N = p_guess
        E_pi_N = E_pi1(p_N, p_N)
        
        # Monopoly
        def neg_E_joint(p):
            return -(E_pi1(p, p) + E_pi2(p, p))
        
        res_M = minimize_scalar(neg_E_joint, bounds=(c, c + 5), method='bounded')
        p_M = res_M.x
        E_pi_M = E_pi1(p_M, p_M)
        
        return {
            'p_N': p_N,
            'E_pi_N': E_pi_N,
            'p_M': p_M,
            'E_pi_M': E_pi_M,
            'shock_enabled': True,
            'scheme': scheme,
            'mode': shock_mode,
            'sigma': sigma
        }
    
    def calculate_hotelling_benchmarks(self, shock_cfg=None):
        """
        Calculate Nash and Monopoly benchmarks for Hotelling model
        Shocks don't affect expected benchmarks (linearity in net shock)
        """
        # Model parameters
        c = 0.0
        v_bar = 1.75
        theta = 1.0
        
        # Standard benchmarks (unchanged with shocks)
        p_N = 1.00
        p_M = 1.25
        
        # Calculate profits
        # At Nash: both firms charge 1, split market equally
        q_N = 0.5
        E_pi_N = p_N * q_N
        
        # At Monopoly: both charge 1.25, split market equally
        q_M = 0.5
        E_pi_M = p_M * q_M
        
        return {
            'p_N': p_N,
            'E_pi_N': E_pi_N,
            'p_M': p_M,
            'E_pi_M': E_pi_M,
            'shock_enabled': shock_cfg is not None and shock_cfg.get('enabled', False),
            'note': 'Shocks do not affect expected benchmarks for Hotelling'
        }
    
    def calculate_linear_benchmarks(self, shock_cfg=None):
        """
        Calculate Nash and Monopoly benchmarks for Linear model
        Shocks don't affect expected benchmarks (linearity in shocks)
        """
        # Model parameters
        c = 0.0
        a_bar = 1.0
        d = 0.25
        
        # Standard benchmarks (unchanged with shocks)
        p_N = 0.4286
        p_M = 0.5
        
        # Calculate profits
        denominator = 1 - d**2
        
        # At Nash
        q_N = (a_bar - p_N - d * (a_bar - p_N)) / denominator
        E_pi_N = p_N * q_N
        
        # At Monopoly
        q_M = a_bar - p_M
        E_pi_M = p_M * q_M
        
        return {
            'p_N': p_N,
            'E_pi_N': E_pi_N,
            'p_M': p_M,
            'E_pi_M': E_pi_M,
            'shock_enabled': shock_cfg is not None and shock_cfg.get('enabled', False),
            'note': 'Shocks do not affect expected benchmarks for Linear'
        }
    
    def calculate_all_benchmarks(self, shock_cfg=None):
        """Calculate benchmarks for all three models"""
        results = {
            'logit': self.calculate_logit_benchmarks(shock_cfg),
            'hotelling': self.calculate_hotelling_benchmarks(shock_cfg),
            'linear': self.calculate_linear_benchmarks(shock_cfg)
        }
        return results
    
    def generate_benchmark_table(self, shock_configs):
        """
        Generate a comprehensive table of benchmarks for different configurations
        
        Args:
            shock_configs: List of shock configurations to test
        
        Returns:
            pandas DataFrame with benchmarks
        """
        data = []
        
        for config in shock_configs:
            config_name = config.get('name', 'No Config')
            benchmarks = self.calculate_all_benchmarks(config)
            
            for model, bench in benchmarks.items():
                data.append({
                    'Configuration': config_name,
                    'Model': model.upper(),
                    'Nash Price': round(bench['p_N'], 4),
                    'Nash Profit': round(bench['E_pi_N'], 4),
                    'Monopoly Price': round(bench['p_M'], 4),
                    'Monopoly Profit': round(bench['E_pi_M'], 4),
                    'Shock Enabled': bench['shock_enabled']
                })
        
        return pd.DataFrame(data)


# `MarketEnv` Class

In [4]:
class MarketEnv:
    """Base Market environment supporting all three models with optional shocks"""
   
    def __init__(
        self,
        market_model: str = "logit",
        shock_cfg: Optional[Dict] = None,
        n_firms: int = 2,
        horizon: int = 10000,
        seed: Optional[int] = None
    ):
        self.model = market_model.lower()
        self.n_firms = n_firms
        self.N = 15 # Number of discrete prices
        self.horizon = horizon
        self.t = 0
        self.rng = np.random.RandomState(seed)
       
        # Model-specific parameters (verified against PDFs)
        if self.model == "logit":
            self.c = 1.0
            self.a = 2.0
            self.a0 = 0.0
            self.mu = 0.25
            self.P_N = 1.473
            self.P_M = 1.925
        elif self.model == "hotelling":
            self.c = 0.0
            self.v_bar = 1.75
            self.theta = 1.0
            self.P_N = 1.00
            self.P_M = 1.25
        elif self.model == "linear":
            self.c = 0.0
            self.a_bar = 1.0
            self.d = 0.25
            
            # Nash (Cournot duopoly)
            self.P_N = self.a_bar * (1 - self.d) / (2 - self.d)  # 0.4286
            q_N = self.P_N  # By symmetry at equilibrium
            pi_N = self.P_N * q_N  # 0.1959
            
            # Monopoly (single firm, no competition)
            self.P_M = self.a_bar / 2  # 0.5
            q_M = self.a_bar - self.P_M  # 0.5 (residual demand, NOT Cournot!)
            pi_M = self.P_M * q_M  # 0.25
            
            # Store for Delta calculations
            self.pi_N_linear = pi_N  # 0.1959
            self.pi_M_linear = pi_M  # 0.25
            
            # Verify profit range is healthy
            profit_range = pi_M - pi_N  # Should be ~0.05
            if profit_range < 0.04:
                import warnings
                warnings.warn(f"Linear model profit range too small: {profit_range:.4f}")
        else:
            raise ValueError(f"Unknown model: {self.model}")
       
        # Construct price grid
        span = self.P_M - self.P_N
        self.price_grid = np.linspace(
            self.P_N - 0.15 * span,
            self.P_M + 0.15 * span,
            self.N
        )
       
        # Initialize shock configuration
        self.shock_cfg = shock_cfg or {}
        self.shock_enabled = self.shock_cfg.get("enabled", False)
       
        if self.shock_enabled:
            self.shock_mode = self.shock_cfg.get("mode", "correlated")
            scheme = self.shock_cfg.get("scheme", None)
           
            # Get AR(1) parameters (verified against PDFs)
            if scheme:
                scheme_params = {
                    'A': {'rho': 0.3, 'sigma_eta': 0.5},
                    'B': {'rho': 0.95, 'sigma_eta': 0.05},
                    'C': {'rho': 0.9, 'sigma_eta': 0.3}
                }
                if scheme.upper() in scheme_params:
                    params = scheme_params[scheme.upper()]
                    self.rho = params['rho']
                    self.sigma_eta = params['sigma_eta']
                else:
                    raise ValueError(f"Unknown scheme: {scheme}")
            else:
                self.rho = self.shock_cfg.get('rho', 0.9)
                self.sigma_eta = self.shock_cfg.get('sigma_eta', 0.15)
           
            # Initialize shock generators
            if self.shock_mode == "independent":
                self.shock_generators = [
                    AR1_Shock(self.rho, self.sigma_eta, seed=seed+i if seed else None)
                    for i in range(self.n_firms)
                ]
            else: # correlated
                self.shock_generator = AR1_Shock(self.rho, self.sigma_eta, seed=seed)
           
            self.current_shocks = np.zeros(self.n_firms)
       
        self.reset()
   
    def reset(self):
        """Reset environment - returns state as actual prices"""
        self.t = 0
       
        # Reset shocks
        if self.shock_enabled:
            if self.shock_mode == "independent":
                for gen in self.shock_generators:
                    gen.reset()
            else:
                self.shock_generator.reset()
            self.current_shocks = np.zeros(self.n_firms)
       
        # Initialize prices at middle of grid
        mid_idx = self.N // 2
        self.current_price_idx = np.array([mid_idx] * self.n_firms)
       
        return self._get_state()
   
    def _get_state(self):
        """Get current state as actual prices (numpy array)"""
        return self.price_grid[self.current_price_idx].copy()
    
    def _get_state_indices(self):
        """Get current state as price indices (for Q-learning compatibility)"""
        return tuple(self.current_price_idx)
   
    def _generate_shocks(self):
        """Generate next period shocks"""
        if not self.shock_enabled:
            return np.zeros(self.n_firms)
       
        if self.shock_mode == "independent":
            shocks = np.array([gen.generate_next() for gen in self.shock_generators])
        else:
            shock = self.shock_generator.generate_next()
            shocks = np.array([shock] * self.n_firms)
       
        return shocks
   
    def get_current_shocks(self):
        """Get current shocks (for PSO evaluation)"""
        return self.current_shocks.copy() if self.shock_enabled else np.zeros(self.n_firms)
   
    def calculate_demand_and_profit(self, prices: np.ndarray, shocks: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Calculate demand and profit given prices and shocks"""
        if self.model == "logit":
            return self._logit_demand_profit(prices, shocks)
        elif self.model == "hotelling":
            return self._hotelling_demand_profit(prices, shocks)
        elif self.model == "linear":
            return self._linear_demand_profit(prices, shocks)
   
    def _logit_demand_profit(self, prices: np.ndarray, shocks: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Logit demand - shocks affect variance but not Nash equilibrium
        
        Critical: E[demand | shocks] = demand(no shocks) since E[ε] = 0
        Nash price must remain constant regardless of shock scheme
        """
        # Base utilities WITHOUT shocks (defines Nash equilibrium)
        base_utilities = (self.a - prices) / self.mu
        
        # Add shocks OUTSIDE mu scaling to preserve E[utility]
        realized_utilities = base_utilities + shocks
        
        # Numerical stability
        max_util = np.max(realized_utilities)
        exp_utils = np.exp(realized_utilities - max_util)
        exp_outside = np.exp(self.a0/self.mu - max_util)
        
        denominator = np.sum(exp_utils) + exp_outside
        demands = exp_utils / denominator
        profits = (prices - self.c) * demands
        
        return demands, profits
   
    def _hotelling_demand_profit(self, prices: np.ndarray, shocks: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Hotelling demand with net shock affecting boundary (verified against PDF)"""
        p1, p2 = prices[0], prices[1]
        epsilon_net = shocks[0] - shocks[1] if self.n_firms == 2 else 0
       
        x_hat = 0.5 + (p2 - p1 + epsilon_net) / (2 * self.theta)
        x_hat = np.clip(x_hat, 0, 1)
       
        q1 = x_hat
        q2 = 1 - x_hat
        demands = np.array([q1, q2])
        profits = prices * demands
       
        return demands, profits
   
    def _linear_demand_profit(self, prices: np.ndarray, shocks: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Linear demand with additive shocks (verified against PDF)"""
        a_shocked = self.a_bar + shocks
        denominator = 1 - self.d**2
       
        q1 = ((a_shocked[0] - prices[0]) - self.d * (a_shocked[1] - prices[1])) / denominator
        q2 = ((a_shocked[1] - prices[1]) - self.d * (a_shocked[0] - prices[0])) / denominator
       
        q1 = max(0, q1)
        q2 = max(0, q2)
        demands = np.array([q1, q2])
        profits = prices * demands
       
        return demands, profits
   
    def step(self, action_indices):
        """Execute one step - returns (state_prices, profits, done, info)
        
        Args:
            action_indices: Array of price indices for each agent
            
        Returns:
            next_state: Numpy array of actual prices (not indices)
            profits: Numpy array of profits for each agent
            done: Episode termination flag
            info: Dictionary with prices, demands, and shocks
        """
        action_indices = np.asarray(action_indices, dtype=int)
       
        # Update price indices
        self.current_price_idx = action_indices
        prices = self.price_grid[self.current_price_idx]
       
        # Generate shocks for this period
        self.current_shocks = self._generate_shocks()
       
        # Calculate demands and profits
        demands, profits = self.calculate_demand_and_profit(prices, self.current_shocks)
       
        # Update time
        self.t += 1
       
        # Get next state (actual prices, not indices)
        next_state = self._get_state()
       
        # Episode termination
        done = False
       
        # Info dictionary
        info = {
            'prices': prices.copy(),
            'demands': demands.copy(),
            'shocks': self.current_shocks.copy(),
            'price_indices': self.current_price_idx.copy()  # Include indices for Q-learning
        }
       
        return next_state, profits, done, info

class MarketEnvContinuous(MarketEnv):
    """Market environment that accepts both discrete indices and continuous prices as actions"""
    
    def step(self, actions):
        """Execute one step with flexible action handling
        
        Args:
            actions: List/array where each element can be:
                     - int/np.integer: discrete price index
                     - float: continuous price value
                     
        Returns:
            next_state: Numpy array of actual prices
            profits: Numpy array of profits
            done: Episode termination flag
            info: Dictionary with prices, demands, shocks, and indices
        """
        prices = []
        indices = []
        
        for a in actions:
            if isinstance(a, (int, np.integer)):
                # Discrete action: use price from grid
                prices.append(self.price_grid[a])
                indices.append(a)
            else:
                # Continuous action: use actual price value
                price = float(a)
                prices.append(price)
                # Find closest grid index for tracking
                indices.append(np.argmin(np.abs(self.price_grid - price)))
        
        prices = np.array(prices)
        self.current_price_idx = np.array(indices)
        
        # Generate shocks
        self.current_shocks = self._generate_shocks()
        
        # Calculate demands and profits
        demands, profits = self.calculate_demand_and_profit(prices, self.current_shocks)
        
        # Update time
        self.t += 1
        
        # Get next state (actual prices)
        next_state = self._get_state()
        
        # Episode termination
        done = False
        
        # Info dictionary
        info = {
            'prices': prices.copy(),
            'demands': demands.copy(),
            'shocks': self.current_shocks.copy(),
            'price_indices': self.current_price_idx.copy()
        }
        
        return next_state, profits, done, info

    def get_state_prices(self):
        """Get current state as actual prices (same as _get_state)"""
        return self._get_state()


# `DQNAgent`

In [5]:
class DQNAgent:
    """
    Deep Q-Network Agent implementation following Mnih et al. (2015)
    and the dynamic pricing paper specifications.
   
    Key features:
    - Uses continuous state representation (actual prices, not indices)
    - Implements Double DQN with separate target network
    - Proper experience replay with adequate buffer size
    - Gradient clipping and Huber loss for stability
    - GPU acceleration for faster training in pricing competitions
    """
   
    def __init__(self, agent_id, state_dim=2, action_dim=15, seed=None):
        """
        Initialize DQN Agent.
       
        Args:
            agent_id: Agent identifier (0 or 1)
            state_dim: Dimension of state space (2 for price tuple)
            action_dim: Number of discrete actions (price grid size)
            seed: Random seed for reproducibility
        """
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.agent_id = agent_id
        self.state_dim = state_dim
        self.action_dim = action_dim
       
        # Hyperparameters from papers
        self.gamma = 0.99 # Discount factor
        self.epsilon = 1.0 # Initial exploration rate
        self.epsilon_min = 0.01 # Minimum exploration rate
        self.epsilon_decay = 0.995 # Decay rate per episode
        self.learning_rate = 0.0001 # Adam optimizer learning rate
        self.batch_size = 128 # Minibatch size for training
        self.memory_size = 50000 # Experience replay buffer size
        self.target_update_freq = 500 # Steps between target network updates
       
        # Set random seeds if provided
        if seed is not None:
            torch.manual_seed(seed)
            if self.device.type == 'cuda':
                torch.cuda.manual_seed(seed)
            np.random.seed(seed)
            random.seed(seed)
       
        # For deterministic behavior on GPU
        if self.device.type == 'cuda':
            torch.backends.cudnn.deterministic = True
            torch.backends.cudnn.benchmark = False
       
        # Initialize neural networks
        self.network = self._build_network().to(self.device)
        self.target_network = self._build_network().to(self.device)
       
        # Copy weights to target network
        self.target_network.load_state_dict(self.network.state_dict())
        self.target_network.eval() # Set target network to evaluation mode
       
        # Initialize optimizer
        self.optimizer = optim.Adam(self.network.parameters(), lr=self.learning_rate)
       
        # Initialize experience replay buffer
        self.memory = deque(maxlen=self.memory_size)
       
        # Step counter for target network updates
        self.steps = 0
       
    def _build_network(self):
        """
        Build the neural network architecture.
        Deeper network than original for better representation learning.
        """
        return nn.Sequential(
            nn.Linear(self.state_dim, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, 64),
            nn.ReLU(),
            nn.Linear(64, self.action_dim)
        )
   
    def remember(self, state, action, reward, next_state, done):
        """
        Store experience in replay buffer.
       
        Args:
            state: Current state (np.array of prices)
            action: Action taken (index)
            reward: Reward received
            next_state: Next state (np.array of prices)
            done: Episode termination flag
        """
        self.memory.append((state, action, reward, next_state, done))
   
    def select_action(self, state, explore=True):
        """
        Select action using epsilon-greedy policy.
       
        Args:
            state: Current state as np.array([own_price, opponent_price])
            explore: Whether to use exploration (training) or exploitation (evaluation)
       
        Returns:
            action: Selected action index
        """
        # Epsilon-greedy exploration
        if explore and np.random.random() < self.epsilon:
            return np.random.randint(self.action_dim)
       
        # Exploitation: choose best action according to Q-network
        with torch.no_grad():
            state_tensor = torch.FloatTensor(state).unsqueeze(0).to(self.device)
            q_values = self.network(state_tensor)
            return q_values.argmax().item()
   
    def replay(self):
        """
        Train the network on a minibatch sampled from experience replay.
        Implements Double DQN with target network.
        """
        # Need minimum experiences before training
        if len(self.memory) < self.batch_size:
            return
       
        # Sample minibatch from replay buffer
        batch = random.sample(self.memory, self.batch_size)
       
        # Separate batch components
        states = torch.FloatTensor([e[0] for e in batch]).to(self.device)
        actions = torch.LongTensor([e[1] for e in batch]).to(self.device)
        rewards = torch.FloatTensor([e[2] for e in batch]).to(self.device)
        next_states = torch.FloatTensor([e[3] for e in batch]).to(self.device)
        dones = torch.FloatTensor([e[4] for e in batch]).to(self.device)
       
        # Current Q values for taken actions
        current_q_values = self.network(states).gather(1, actions.unsqueeze(1))
       
        # Double DQN implementation
        # Step 1: Use main network to select best actions for next states
        with torch.no_grad():
            next_actions = self.network(next_states).argmax(1)
           
        # Step 2: Use target network to evaluate those actions
        next_q_values = self.target_network(next_states).gather(
            1, next_actions.unsqueeze(1)
        ).squeeze(1)
       
        # Compute targets (Bellman equation)
        targets = rewards + (1 - dones) * self.gamma * next_q_values
       
        # Compute loss (Huber loss for stability)
        loss = F.smooth_l1_loss(current_q_values.squeeze(), targets.detach())
       
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
       
        # Gradient clipping to prevent exploding gradients
        torch.nn.utils.clip_grad_norm_(self.network.parameters(), max_norm=1.0)
       
        self.optimizer.step()
       
        # Increment step counter
        self.steps += 1
       
        # Periodically update target network
        if self.steps % self.target_update_freq == 0:
            self.update_target_network()
   
    def update_target_network(self):
        """
        Copy weights from main network to target network.
        This stabilizes training by keeping targets fixed for multiple updates.
        """
        self.target_network.load_state_dict(self.network.state_dict())
   
    def update_epsilon(self):
        """
        Decay exploration rate.
        Called at the end of each episode or at regular intervals.
        """
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
   
    def save(self, filepath):
        """
        Save model weights.
       
        Args:
            filepath: Path to save the model
        """
        torch.save({
            'network_state_dict': self.network.state_dict(),
            'target_network_state_dict': self.target_network.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'epsilon': self.epsilon,
            'steps': self.steps
        }, filepath)
   
    def load(self, filepath):
        """
        Load model weights.
       
        Args:
            filepath: Path to load the model from
        """
        checkpoint = torch.load(filepath, map_location=self.device)
        self.network.load_state_dict(checkpoint['network_state_dict'])
        self.target_network.load_state_dict(checkpoint['target_network_state_dict'])
        self.optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
        self.epsilon = checkpoint['epsilon']
        self.steps = checkpoint['steps']


# `DDPGAgent`

In [6]:
class ReplayBuffer:
    """Experience replay buffer for DDPG"""
    def __init__(self, capacity=100000):
        self.buffer = deque(maxlen=capacity)
   
    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))
   
    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return (np.array(states), np.array(actions), np.array(rewards),
                np.array(next_states), np.array(dones))
   
    def __len__(self):
        return len(self.buffer)

class Actor(nn.Module):
    """Actor network for DDPG"""
    def __init__(self, state_dim, action_dim, hidden_dim=400):
        super(Actor, self).__init__()
        self.bn_input = nn.BatchNorm1d(state_dim)
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 300)
        self.bn2 = nn.BatchNorm1d(300)
        self.fc3 = nn.Linear(300, action_dim)
       
        # Initialize weights
        self._init_weights()
   
    def _init_weights(self):
        nn.init.uniform_(self.fc1.weight, -1/np.sqrt(self.fc1.in_features), 1/np.sqrt(self.fc1.in_features))
        nn.init.uniform_(self.fc2.weight, -1/np.sqrt(self.fc2.in_features), 1/np.sqrt(self.fc2.in_features))
        nn.init.uniform_(self.fc3.weight, -3e-3, 3e-3)
   
    def forward(self, state):
        x = self.bn_input(state)
        x = F.relu(self.bn1(self.fc1(x)))
        x = F.relu(self.bn2(self.fc2(x)))
        x = torch.tanh(self.fc3(x))
        return x

class Critic(nn.Module):
    """Critic network for DDPG"""
    def __init__(self, state_dim, action_dim, hidden_dim=400):
        super(Critic, self).__init__()
        self.bn_input = nn.BatchNorm1d(state_dim)
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.bn1 = nn.BatchNorm1d(hidden_dim)
        self.fc2 = nn.Linear(hidden_dim + action_dim, 300)
        self.fc3 = nn.Linear(300, 1)
       
        # Initialize weights
        self._init_weights()
   
    def _init_weights(self):
        nn.init.uniform_(self.fc1.weight, -1/np.sqrt(self.fc1.in_features), 1/np.sqrt(self.fc1.in_features))
        nn.init.uniform_(self.fc2.weight, -1/np.sqrt(self.fc2.in_features), 1/np.sqrt(self.fc2.in_features))
        nn.init.uniform_(self.fc3.weight, -3e-3, 3e-3)
   
    def forward(self, state, action):
        x = self.bn_input(state)
        x = F.relu(self.bn1(self.fc1(x)))
        x = torch.cat([x, action], dim=1)
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class OUNoise:
    """Ornstein-Uhlenbeck process for exploration"""
    def __init__(self, action_dim, mu=0.0, theta=0.15, sigma=0.2):
        self.action_dim = action_dim
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        self.state = np.ones(self.action_dim) * self.mu
        self.reset()
   
    def reset(self):
        self.state = np.ones(self.action_dim) * self.mu
   
    def sample(self):
        dx = self.theta * (self.mu - self.state)
        dx += self.sigma * np.random.randn(self.action_dim)
        self.state += dx
        return self.state.copy()

class DDPGAgent:
    """Deep Deterministic Policy Gradient Agent for pricing competition simulation"""
    def __init__(
        self,
        agent_id,
        state_dim,
        action_dim,
        hidden_dim=400,
        actor_lr=1e-4,
        critic_lr=1e-3,
        gamma=0.99,
        tau=0.001,
        buffer_size=1000000,
        batch_size=64,
        seed=None,
        price_min=0.0,
        price_max=2.0
    ):
        self.agent_id = agent_id
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.tau = tau
        self.batch_size = batch_size
        self.price_min = price_min
        self.price_max = price_max
        
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        if seed is not None:
            torch.manual_seed(seed)
            if self.device.type == 'cuda':
                torch.cuda.manual_seed(seed)
            np.random.seed(seed)
            random.seed(seed)
        
        # For deterministic behavior on GPU
        if self.device.type == 'cuda':
            torch.backends.cudnn.deterministic = True
            torch.backends.cudnn.benchmark = False
        
        # Actor networks
        self.actor = Actor(state_dim, action_dim, hidden_dim).to(self.device)
        self.actor_target = Actor(state_dim, action_dim, hidden_dim).to(self.device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=actor_lr)
        
        # Critic networks
        self.critic = Critic(state_dim, action_dim, hidden_dim).to(self.device)
        self.critic_target = Critic(state_dim, action_dim, hidden_dim).to(self.device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=critic_lr, weight_decay=1e-2)
        
        # Replay buffer
        self.replay_buffer = ReplayBuffer(buffer_size)
        
        # Noise process
        self.noise = OUNoise(action_dim)
        
        # Exploration parameters
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
   
    def select_action(self, state, explore=True):
        """Select action using actor network with optional exploration noise"""
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        self.actor.eval()
        with torch.no_grad():
            action = self.actor(state).cpu().numpy()[0]
        self.actor.train()
        
        if explore:
            noise = self.noise.sample() * self.epsilon
            action += noise
            action = np.clip(action, -1, 1)
        
        normalized_action = action.copy()  # For remember/replay (normalized [-1,1])
        
        # Scale to [price_min, price_max]
        scaled_price = self.price_min + (self.price_max - self.price_min) * (action[0] + 1) / 2
        
        return scaled_price, normalized_action
   
    def remember(self, state, action, reward, next_state, done):
        """Store experience in replay buffer"""
        self.replay_buffer.push(state, action, reward, next_state, done)
   
    def replay(self):
        """Train on a batch of experiences from replay buffer"""
        if len(self.replay_buffer) < self.batch_size:
            return
       
        # Sample batch
        states, actions, rewards, next_states, dones = self.replay_buffer.sample(self.batch_size)
       
        states = torch.FloatTensor(states).to(self.device)
        actions = torch.FloatTensor(actions).to(self.device)
        rewards = torch.FloatTensor(rewards).unsqueeze(1).to(self.device)
        next_states = torch.FloatTensor(next_states).to(self.device)
        dones = torch.FloatTensor(dones).unsqueeze(1).to(self.device)
       
        # Update critic
        with torch.no_grad():
            next_actions = self.actor_target(next_states)
            target_q = self.critic_target(next_states, next_actions)
            target_q = rewards + (1 - dones) * self.gamma * target_q
       
        current_q = self.critic(states, actions)
        critic_loss = F.mse_loss(current_q, target_q)
       
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
       
        # Update actor
        actor_loss = -self.critic(states, self.actor(states)).mean()
       
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
       
        # Soft update target networks
        self._soft_update(self.actor, self.actor_target)
        self._soft_update(self.critic, self.critic_target)
   
    def _soft_update(self, local_model, target_model):
        """Soft update target network parameters: θ_target = τ*θ_local + (1 - τ)*θ_target"""
        for target_param, local_param in zip(target_model.parameters(), local_model.parameters()):
            target_param.data.copy_(self.tau * local_param.data + (1.0 - self.tau) * target_param.data)
   
    def update_epsilon(self):
        """Decay exploration rate"""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)
   
    def reset_noise(self):
        """Reset the noise process"""
        self.noise.reset()


# Simulation

In [7]:
def run_simulation(model, seed, shock_cfg, benchmarks):
    """Run DDPG vs DDPG simulation"""
    np.random.seed(seed)
    
    env = MarketEnvContinuous(market_model=model, shock_cfg=shock_cfg, seed=seed)
    
    # Get price range for DDPG scaling
    price_min = env.price_grid[0]
    price_max = env.price_grid[-1]
    
    ddpg_agent1 = DDPGAgent(
        agent_id=0,
        state_dim=2,
        action_dim=1,
        seed=seed,
        price_min=price_min,
        price_max=price_max
    )
    ddpg_agent2 = DDPGAgent(
        agent_id=1,
        state_dim=2,
        action_dim=1,
        seed=seed+1000,
        price_min=price_min,
        price_max=price_max
    )
    
    shared_state = env.reset()
    profits_history = []
    prices_history = []
    
    state1 = np.array([shared_state[0], shared_state[1]], dtype=np.float32)
    state2 = np.array([shared_state[1], shared_state[0]], dtype=np.float32)
    
    for t in range(env.horizon):
        # Get continuous prices from DDPG agents
        price1, normalized_action1 = ddpg_agent1.select_action(state1, explore=True)
        price2, normalized_action2 = ddpg_agent2.select_action(state2, explore=True)
        
        # Use continuous prices directly
        actions = [price1, price2]
        shared_next_state, rewards, done, info = env.step(actions)
        
        next_state1 = np.array([shared_next_state[0], shared_next_state[1]], dtype=np.float32)
        next_state2 = np.array([shared_next_state[1], shared_next_state[0]], dtype=np.float32)
        
        # Store normalized actions for replay
        ddpg_agent1.remember(state1, normalized_action1, rewards[0], next_state1, done)
        ddpg_agent1.replay()
        ddpg_agent1.update_epsilon()
        
        ddpg_agent2.remember(state2, normalized_action2, rewards[1], next_state2, done)
        ddpg_agent2.replay()
        ddpg_agent2.update_epsilon()
        
        # Reset noise periodically for better exploration
        if t % 100 == 0:
            ddpg_agent1.reset_noise()
            ddpg_agent2.reset_noise()
        
        state1 = next_state1
        state2 = next_state2
        shared_state = shared_next_state
        
        prices_history.append(info['prices'])
        profits_history.append(rewards)
    
    # Calculate metrics from last 1000 timesteps
    last_prices = np.array(prices_history[-1000:])
    avg_price_ddpg1 = np.mean(last_prices[:, 0])
    avg_price_ddpg2 = np.mean(last_prices[:, 1])
    
    last_profits = np.array(profits_history[-1000:])
    avg_profit_ddpg1 = np.mean(last_profits[:, 0])
    avg_profit_ddpg2 = np.mean(last_profits[:, 1])
    
    p_n = benchmarks['p_N']
    p_m = benchmarks['p_M']
    pi_n = benchmarks['E_pi_N']
    pi_m = benchmarks['E_pi_M']
    
    # Calculate Delta (profit-based collusion metric)
    profit_range = pi_m - pi_n
    if profit_range > 1e-6:
        delta_ddpg1 = (avg_profit_ddpg1 - pi_n) / profit_range
        delta_ddpg2 = (avg_profit_ddpg2 - pi_n) / profit_range
    else:
        delta_ddpg1 = 0
        delta_ddpg2 = 0
    
    # Calculate RPDI (price-based collusion metric)
    price_range = p_m - p_n
    if price_range > 1e-6:
        rpdi_ddpg1 = (avg_price_ddpg1 - p_n) / price_range
        rpdi_ddpg2 = (avg_price_ddpg2 - p_n) / price_range
    else:
        rpdi_ddpg1 = 0
        rpdi_ddpg2 = 0
    
    return {
        'avg_price1': avg_price_ddpg1,
        'avg_price2': avg_price_ddpg2,
        'avg_profit1': avg_profit_ddpg1,
        'avg_profit2': avg_profit_ddpg2,
        'delta1': delta_ddpg1,
        'delta2': delta_ddpg2,
        'rpdi1': rpdi_ddpg1,
        'rpdi2': rpdi_ddpg2,
        'p_n': p_n,
        'p_m': p_m,
        'pi_n': pi_n,
        'pi_m': pi_m
    }


def main():
    # Scheme B: High persistence, low variance
    shock_cfg = {
        'enabled': True,
        'scheme': 'A',
        'rho': 0.3,
        'sigma_eta': 0.5,
        'mode': 'independent'
    }

    
    benchmark_calculator = TheoreticalBenchmarks(seed=SEED)
    
    print("=" * 80)
    print("DDPG vs DDPG - SCHEME B SHOCKS (ρ=0.95, σ_η=0.05)")
    print("=" * 80)
    
    all_benchmarks = benchmark_calculator.calculate_all_benchmarks(shock_cfg)
    
    models = ['logit', 'hotelling', 'linear']
    num_runs = 10
    results = {}
    
    # Store individual run metrics
    run_logs = {model: {'delta1': [], 'delta2': [], 'rpdi1': [], 'rpdi2': [], 
                        'profit1': [], 'profit2': []} for model in models}
    
    for model in models:
        print(f"\n{'='*60}")
        print(f"Model: {model.upper()}")
        print(f"{'='*60}")
        
        model_benchmarks = all_benchmarks[model]
        
        # Print benchmark info
        print(f"  Benchmarks: P_N={model_benchmarks['p_N']:.4f}, P_M={model_benchmarks['p_M']:.4f}")
        print(f"              π_N={model_benchmarks['E_pi_N']:.4f}, π_M={model_benchmarks['E_pi_M']:.4f}")
        print(f"              Profit Range: {model_benchmarks['E_pi_M'] - model_benchmarks['E_pi_N']:.4f}")
        
        for run in range(num_runs):
            seed = SEED + run
            result = run_simulation(model, seed, shock_cfg, model_benchmarks)
            
            # Log individual run results
            run_logs[model]['delta1'].append(result['delta1'])
            run_logs[model]['delta2'].append(result['delta2'])
            run_logs[model]['rpdi1'].append(result['rpdi1'])
            run_logs[model]['rpdi2'].append(result['rpdi2'])
            run_logs[model]['profit1'].append(result['avg_profit1'])
            run_logs[model]['profit2'].append(result['avg_profit2'])
            
            print(f"\n  Run {run + 1}:")
            print(f"    DDPG 1 -> Price: {result['avg_price1']:.4f}, Profit: {result['avg_profit1']:.4f}, "
                  f"Δ: {result['delta1']:.4f}, RPDI: {result['rpdi1']:.4f}")
            print(f"    DDPG 2 -> Price: {result['avg_price2']:.4f}, Profit: {result['avg_profit2']:.4f}, "
                  f"Δ: {result['delta2']:.4f}, RPDI: {result['rpdi2']:.4f}")
        
        # Calculate model averages
        results[model] = {
            'Avg Price DDPG1': np.mean([run_logs[model]['profit1'][i] for i in range(num_runs)]),
            'Avg Price DDPG2': np.mean([run_logs[model]['profit2'][i] for i in range(num_runs)]),
            'Theo Price': model_benchmarks['p_N'],
            'Delta DDPG1': np.mean(run_logs[model]['delta1']),
            'Delta DDPG2': np.mean(run_logs[model]['delta2']),
            'RPDI DDPG1': np.mean(run_logs[model]['rpdi1']),
            'RPDI DDPG2': np.mean(run_logs[model]['rpdi2']),
            'Std Delta1': np.std(run_logs[model]['delta1']),
            'Std Delta2': np.std(run_logs[model]['delta2'])
        }
        
        print(f"\n  Model Summary:")
        print(f"    DDPG1: Δ = {results[model]['Delta DDPG1']:.4f} ± {results[model]['Std Delta1']:.4f}")
        print(f"    DDPG2: Δ = {results[model]['Delta DDPG2']:.4f} ± {results[model]['Std Delta2']:.4f}")
    
    # Generate summary table
    print(f"\n{'='*80}")
    print("SUMMARY TABLE")
    print(f"{'='*80}\n")
    
    data = {
        'Model': [m.upper() for m in models],
        'P_N': [round(all_benchmarks[m]['p_N'], 4) for m in models],
        'P_M': [round(all_benchmarks[m]['p_M'], 4) for m in models],
        'π_N': [round(all_benchmarks[m]['E_pi_N'], 4) for m in models],
        'π_M': [round(all_benchmarks[m]['E_pi_M'], 4) for m in models],
        'Profit Range': [round(all_benchmarks[m]['E_pi_M'] - all_benchmarks[m]['E_pi_N'], 4) for m in models],
        'DDPG1 Δ': [round(results[m]['Delta DDPG1'], 4) for m in models],
        'DDPG2 Δ': [round(results[m]['Delta DDPG2'], 4) for m in models],
        'DDPG1 RPDI': [round(results[m]['RPDI DDPG1'], 4) for m in models],
        'DDPG2 RPDI': [round(results[m]['RPDI DDPG2'], 4) for m in models]
    }
    
    df = pd.DataFrame(data)
    
    # Save results
    os.makedirs("./results", exist_ok=True)
    df.to_csv("./results/ddpg_vs_ddpg_schemeB.csv", index=False)
    print(df.to_string(index=False))
    
    # Overall averages
    print(f"\n{'='*80}")
    print("OVERALL AVERAGES ACROSS ALL MODELS")
    print(f"{'='*80}\n")
    
    avg_delta1 = np.mean([results[m]['Delta DDPG1'] for m in models])
    avg_delta2 = np.mean([results[m]['Delta DDPG2'] for m in models])
    avg_rpdi1 = np.mean([results[m]['RPDI DDPG1'] for m in models])
    avg_rpdi2 = np.mean([results[m]['RPDI DDPG2'] for m in models])
    
    print(f"DDPG Agent 1:")
    print(f"  Average Delta (Δ):  {avg_delta1:.4f}")
    print(f"  Average RPDI:       {avg_rpdi1:.4f}")
    print(f"\nDDPG Agent 2:")
    print(f"  Average Delta (Δ):  {avg_delta2:.4f}")
    print(f"  Average RPDI:       {avg_rpdi2:.4f}")
    
    print(f"\n{'='*80}")
    print("[Results saved to ./results/ddpg_vs_ddpg_schemeB.csv]")
    print(f"{'='*80}\n")


if __name__ == "__main__":
    main()


DDPG vs DDPG - SCHEME B SHOCKS (ρ=0.95, σ_η=0.05)

Model: LOGIT
  Benchmarks: P_N=1.7973, P_M=2.0886
              π_N=0.3282, π_M=0.3640
              Profit Range: 0.0358

  Run 1:
    DDPG 1 -> Price: 1.5851, Profit: 0.1648, Δ: -4.5682, RPDI: -0.7283
    DDPG 2 -> Price: 1.4056, Profit: 0.2380, Δ: -2.5223, RPDI: -1.3443

  Run 2:
    DDPG 1 -> Price: 1.4056, Profit: 0.1956, Δ: -3.7076, RPDI: -1.3443
    DDPG 2 -> Price: 1.4056, Profit: 0.1923, Δ: -3.7992, RPDI: -1.3443

  Run 3:
    DDPG 1 -> Price: 1.4058, Profit: 0.1929, Δ: -3.7832, RPDI: -1.3439
    DDPG 2 -> Price: 1.4056, Profit: 0.1950, Δ: -3.7240, RPDI: -1.3445

  Run 4:
    DDPG 1 -> Price: 1.9923, Profit: 0.3426, Δ: 0.4018, RPDI: 0.6696
    DDPG 2 -> Price: 1.9923, Profit: 0.3366, Δ: 0.2345, RPDI: 0.6695

  Run 5:
    DDPG 1 -> Price: 1.9923, Profit: 0.3325, Δ: 0.1198, RPDI: 0.6696
    DDPG 2 -> Price: 1.9923, Profit: 0.3497, Δ: 0.5987, RPDI: 0.6697

  Run 6:
    DDPG 1 -> Price: 1.4057, Profit: 0.1988, Δ: -3.6170, RPDI: -1