In [None]:
import itertools
import numpy as np
import pandas as pd
from typing import Dict, Tuple, List
import logging
from dataclasses import dataclass
import time
from data_generator import TestConfiguration, create_test_instance

In [None]:
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

In [None]:
@dataclass(frozen=True)
class DPState:
    """Immutable state representation for Dynamic Programming solution."""
    capacity: Tuple[int, ...]  # (capacity_day1, capacity_day2, capacity_day3)
    time: int

class DynamicProgramming:
    """Dynamic Programming solution for hotel revenue optimization."""
    
    def __init__(self, instance: Dict):
        """Initialize using test instance from data_generator."""
        self.params = instance['parameters']
        self.T = self.params.T
        self.N = self.params.N
        self.C = self.params.C
        self.price_min = self.params.price_min
        self.price_max = self.params.price_max
        
        self.booking_classes = instance['booking_classes']
        self.arrival_probs = instance['arrival_probabilities']
        self.price_sensitivity = instance['reservation_price_params']
        
        # Use exactly 3 price levels as specified
        self.price_levels = [self.price_min, (self.price_min + self.price_max)/2, self.price_max]
        self.price_combinations = list(product(self.price_levels, repeat=self.N))
        
        # Pre-compute stay patterns
        self.class_stays = {
            (arr, dep): set(range(arr - 1, dep)) 
            for arr, dep in self.booking_classes
        }
        self.stay_lengths = {
            (arr, dep): dep - arr + 1 
            for arr, dep in self.booking_classes
        }
        
        # Initialize value function and policy
        self.value_function: Dict[DPState, float] = {}
        self.optimal_policy: Dict[DPState, Dict[int, float]] = {}
        
    def solve(self) -> Tuple[Dict[Tuple[int, ...], Dict[int, float]], float]:
        """Solve the DP problem and return value functions and optimal policy."""
        logger.info("Starting DP solution")
        
        # Initialize boundary conditions
        self._initialize_boundary_conditions()
        
        # Backward induction
        for t in range(self.T, 0, -1):
            logger.info(f"Processing time period {t}")
            for capacity in self._generate_capacity_vectors():
                state = DPState(capacity=capacity, time=t)
                optimal_value, optimal_prices = self._compute_optimal_decision(state)
                self.value_function[state] = optimal_value
                
                # Store policy as dictionary mapping day to price
                self.optimal_policy[state] = {
                    i+1: price for i, price in enumerate(optimal_prices)
                }
        
        # Extract and format results for capacity = 5
        results = {}
        for state, prices in self.optimal_policy.items():
            if state.capacity[0] == 5:  # Only for first day capacity = 5
                results[state.capacity] = prices
        
        # Get optimal value for initial state
        initial_state = DPState(
            capacity=tuple(self.C for _ in range(self.N)),
            time=1
        )
        optimal_value = self.value_function[initial_state]
        
        return results, optimal_value
    
    def _initialize_boundary_conditions(self):
        """Initialize boundary conditions."""
        for capacity in self._generate_capacity_vectors():
            # Terminal period conditions
            terminal_state = DPState(capacity=capacity, time=self.T + 1)
            self.value_function[terminal_state] = 0.0
            
            # Zero capacity conditions for all periods
            if sum(capacity) == 0:
                for t in range(1, self.T + 2):
                    state = DPState(capacity=capacity, time=t)
                    self.value_function[state] = 0.0
    
    def _compute_optimal_decision(self, state: DPState) -> Tuple[float, Tuple[float, ...]]:
        """Compute optimal value and prices for a state."""
        max_value = float('-inf')
        optimal_prices = self.price_levels[0:self.N]  # Default to minimum prices
        
        for prices in self.price_combinations:
            value = self._compute_expected_value(state, prices)
            if value > max_value:
                max_value = value
                optimal_prices = prices
        
        return max_value, optimal_prices
    
    def _compute_expected_value(self, state: DPState, prices: Tuple[float, ...]) -> float:
        """Compute expected value for state-prices pair."""
        value = 0.0
        current_probs = self.arrival_probs[state.time]
        
        # No arrival case
        no_arrival_prob = 1.0 - sum(current_probs.values())
        if no_arrival_prob > 0:
            next_state = DPState(capacity=state.capacity, time=state.time + 1)
            value += no_arrival_prob * self.value_function[next_state]
        
        # For each possible booking request
        for (arrival, departure), arrival_prob in current_probs.items():
            if arrival_prob <= 0:
                continue
                
            stay_nights = self.class_stays[(arrival, departure)]
            has_capacity = all(state.capacity[day] > 0 for day in stay_nights)
            
            if has_capacity:
                # Calculate average price and acceptance probability
                stay_prices = [prices[day] for day in stay_nights]
                avg_price = sum(stay_prices) / self.stay_lengths[(arrival, departure)]
                
                eps = self.price_sensitivity[(arrival, departure)]
                accept_prob = max(0, 1 - eps * avg_price)
                
                if accept_prob > 0:
                    # Acceptance case
                    next_capacity = list(state.capacity)
                    for day in stay_nights:
                        next_capacity[day] -= 1
                    
                    next_state = DPState(capacity=tuple(next_capacity), time=state.time + 1)
                    immediate_revenue = sum(stay_prices)
                    future_value = self.value_function[next_state]
                    
                    value += arrival_prob * accept_prob * (immediate_revenue + future_value)
                
                # Rejection case
                if accept_prob < 1:
                    reject_prob = 1 - accept_prob
                    next_state = DPState(capacity=state.capacity, time=state.time + 1)
                    value += arrival_prob * reject_prob * self.value_function[next_state]
            else:
                # No capacity case
                next_state = DPState(capacity=state.capacity, time=state.time + 1)
                value += arrival_prob * self.value_function[next_state]
        
        return value
    
    def _generate_capacity_vectors(self) -> List[Tuple[int, ...]]:
        """Generate all possible capacity vectors."""
        return [tuple(cap) for cap in product(range(self.C + 1), repeat=self.N)]

class StochasticApproximation:
    """
    Implementation of the Stochastic Approximation Algorithm for hotel dynamic pricing.
    
    This implementation follows the exact methodology described in the theoretical framework,
    incorporating smoothed decision functions and proper gradient calculations.
    """
    
    def __init__(self, instance: Dict, learning_params: Dict = None):
        """
        Initialize the SAA algorithm with problem instance and learning parameters.
        
        Args:
            instance: Dictionary containing problem parameters and data
            learning_params: Dictionary containing learning rate parameters:
                - eta_0: Initial learning rate
                - gamma: Learning rate decay parameter
                - eta_min: Minimum learning rate
                - max_epochs: Maximum number of training epochs
                - batch_size: Mini-batch size for gradient computation
        """
        # Extract instance parameters
        self.params = instance['parameters']
        self.booking_classes = instance['booking_classes']
        self.arrival_probs = instance['arrival_probabilities']
        self.epsilon = instance['reservation_price_params']
        
        # Set learning parameters
        default_learning_params = {
            'eta_0': 0.1,
            'gamma': 0.1,
            'eta_min': 0.001,
            'max_epochs': 1000,
            'batch_size': 32
        }
        self.learning_params = {**default_learning_params, **(learning_params or {})}
        
        # Initialize smoothing parameters from instance
        self.alpha = self.params.alpha  # Price acceptance smoothing
        self.beta = self.params.beta    # Capacity smoothing
        
        # Initialize prices and precompute class information
        self.prices = self._initialize_prices()
        self._precompute_class_info()
        
        logger.info(f"Initialized SAA with {len(self.booking_classes)} booking classes")
    
    def _initialize_prices(self) -> Dict[int, np.ndarray]:
        """Initialize price vectors for each booking period."""
        return {t: np.full(self.params.N, (self.params.price_min + self.params.price_max) / 2)
                for t in range(1, self.params.T + 1)}
    
    def _precompute_class_info(self):
        """Precompute booking class information for efficient computation."""
        self.class_stays = {}
        self.stay_lengths = {}
        for arrival, departure in self.booking_classes:
            self.class_stays[(arrival, departure)] = list(range(arrival - 1, departure))
            self.stay_lengths[(arrival, departure)] = departure - arrival + 1
    
    def _compute_smoothed_decision(self, 
                                 price_bt: float,
                                 qt: float,
                                 xt: np.ndarray,
                                 stay_nights: List[int]) -> float:
        """
        Compute smoothed decision function value.
        
        Args:
            price_bt: Average price for booking class b at time t
            qt: Customer's reservation price
            xt: Current capacity vector
            stay_nights: List of nights required for the stay
            
        Returns:
            Smoothed decision function value
        """
        # Price acceptance smoothing
        sp = 1 / (1 + np.exp(-self.alpha * (qt - price_bt)))
        
        # Capacity smoothing for each required night
        sx = np.prod([1 / (1 + np.exp(-self.beta * (xt[i] - 1))) for i in stay_nights])
        
        return sp * sx
    
    def _compute_decision_gradients(self,
                                  price_bt: float,
                                  qt: float,
                                  xt: np.ndarray,
                                  stay_nights: List[int],
                                  Lb: int) -> Tuple[np.ndarray, np.ndarray]:
        """
        Compute gradients of the smoothed decision function.
        
        Args:
            price_bt: Average price for booking class b at time t
            qt: Customer's reservation price
            xt: Current capacity vector
            stay_nights: List of nights required for the stay
            Lb: Length of stay
            
        Returns:
            Tuple of gradients with respect to prices and capacities
        """
        # Initialize gradients
        price_grad = np.zeros(self.params.N)
        capacity_grad = np.zeros(self.params.N)
        
        # Compute common terms
        sp = 1 / (1 + np.exp(-self.alpha * (qt - price_bt)))
        sp_grad = -self.alpha * sp * (1 - sp)
        
        sx_values = [1 / (1 + np.exp(-self.beta * (xt[i] - 1))) for i in stay_nights]
        sx_prod = np.prod(sx_values)
        
        # Compute price gradients
        for i in stay_nights:
            price_grad[i] = sp_grad * (1/Lb) * sx_prod
            
        # Compute capacity gradients
        for i in stay_nights:
            sx_i = 1 / (1 + np.exp(-self.beta * (xt[i] - 1)))
            capacity_grad[i] = sp * sx_prod * self.beta * (1 - sx_i) / sx_i
            
        return price_grad, capacity_grad
    
    def _compute_immediate_revenue(self,
                                 bt: Tuple[int, int],
                                 price_bt: float,
                                 ut: float) -> float:
        """
        Compute immediate revenue for an accepted booking.
        
        Args:
            bt: Booking class tuple (arrival, departure)
            price_bt: Average price for the stay
            ut: Decision function value
            
        Returns:
            Immediate revenue
        """
        Lb = self.stay_lengths[bt]
        return Lb * price_bt * ut
    
    def _generate_sample_path(self) -> List[Tuple]:
        """Generate a sample path of customer arrivals and reservation prices."""
        path = []
        for t in range(1, self.params.T + 1):
            # Generate arrival
            probs = self.arrival_probs[t]
            arrival_prob = np.random.random()
            
            if arrival_prob < sum(probs.values()):
                # Select booking class
                classes = list(probs.keys())
                probabilities = [probs[c] for c in classes]
                bt = classes[np.random.choice(len(classes), p=np.array(probabilities)/sum(probabilities))]
                
                # Generate reservation price
                eps = self.epsilon[bt]
                max_price = 1/eps  # Price where acceptance probability becomes 0
                qt = np.random.uniform(self.params.price_min, max_price)
                
                path.append((t, bt, qt))
            else:
                path.append((t, None, None))
        
        return path
    
    def _forward_pass(self, sample_path: List[Tuple]) -> Tuple[float, Dict, Dict]:
        """
        Perform forward pass through the sample path and compute gradients.
        
        Args:
            sample_path: List of (time, booking_class, reservation_price) tuples
            
        Returns:
            Tuple of (total_revenue, price_gradients, capacity_gradients)
        """
        total_revenue = 0
        price_gradients = {t: np.zeros(self.params.N) for t in range(1, self.params.T + 1)}
        capacity_gradients = {t: np.zeros(self.params.N) for t in range(1, self.params.T + 1)}
        
        # Initialize capacity
        xt = np.full(self.params.N, self.params.C)
        
        for t, bt, qt in sample_path:
            if bt is not None:
                # Extract booking class information
                stay_nights = self.class_stays[bt]
                Lb = self.stay_lengths[bt]
                
                # Compute average price for the stay
                stay_prices = self.prices[t][stay_nights]
                price_bt = np.mean(stay_prices)
                
                # Compute decision function value
                ut = self._compute_smoothed_decision(price_bt, qt, xt, stay_nights)
                
                if ut > 0:
                    # Compute revenue
                    revenue = self._compute_immediate_revenue(bt, price_bt, ut)
                    total_revenue += revenue
                    
                    # Compute gradients
                    price_grad, capacity_grad = self._compute_decision_gradients(
                        price_bt, qt, xt, stay_nights, Lb)
                    
                    price_gradients[t] += price_grad
                    capacity_gradients[t] += capacity_grad
                    
                    # Update capacity
                    xt[stay_nights] -= ut
        
        return total_revenue, price_gradients, capacity_gradients
    
    def _update_prices(self, gradients: Dict[int, np.ndarray], learning_rate: float):
        """Update prices using computed gradients."""
        for t in range(1, self.params.T + 1):
            self.prices[t] += learning_rate * gradients[t]
            # Project prices to feasible range
            self.prices[t] = np.clip(self.prices[t], 
                                   self.params.price_min, 
                                   self.params.price_max)
    
    def solve(self) -> Tuple[Dict[int, np.ndarray], float, float]:
        """
        Execute the SAA algorithm and return optimized prices and performance metrics.
        
        Returns:
            Tuple of (optimal_prices, expected_revenue, solve_time)
        """
        start_time = time.time()
        best_revenue = float('-inf')
        best_prices = None
        
        for epoch in range(self.learning_params['max_epochs']):
            epoch_revenue = 0
            epoch_price_gradients = {t: np.zeros(self.params.N) 
                                   for t in range(1, self.params.T + 1)}
            
            # Compute learning rate for current epoch
            learning_rate = max(
                self.learning_params['eta_min'],
                self.learning_params['eta_0'] / (1 + self.learning_params['gamma'] * epoch)
            )
            
            # Process mini-batches
            for _ in range(self.learning_params['batch_size']):
                # Generate sample path and compute gradients
                sample_path = self._generate_sample_path()
                revenue, price_grads, _ = self._forward_pass(sample_path)
                
                epoch_revenue += revenue
                for t in range(1, self.params.T + 1):
                    epoch_price_gradients[t] += price_grads[t]
            
            # Average gradients over mini-batch
            for t in range(1, self.params.T + 1):
                epoch_price_gradients[t] /= self.learning_params['batch_size']
            
            # Update prices using averaged gradients
            self._update_prices(epoch_price_gradients, learning_rate)
            
            # Track best solution
            avg_revenue = epoch_revenue / self.learning_params['batch_size']
            if avg_revenue > best_revenue:
                best_revenue = avg_revenue
                best_prices = {t: np.copy(self.prices[t]) 
                             for t in range(1, self.params.T + 1)}
            
            if epoch % 100 == 0:
                logger.info(f"Epoch {epoch}: Revenue = {avg_revenue:.2f}, "
                          f"Learning Rate = {learning_rate:.6f}")
        
        solve_time = time.time() - start_time
        return best_prices, best_revenue, solve_time
    
    def evaluate(self, prices: Dict[int, np.ndarray], num_samples: int = 1000) -> float:
        """
        Evaluate a pricing policy using Monte Carlo simulation.
        
        Args:
            prices: Dictionary mapping time periods to price vectors
            num_samples: Number of sample paths to evaluate
            
        Returns:
            Expected revenue
        """
        total_revenue = 0
        original_prices = self.prices
        self.prices = prices
        
        for _ in range(num_samples):
            sample_path = self._generate_sample_path()
            revenue, _, _ = self._forward_pass(sample_path)
            total_revenue += revenue
        
        self.prices = original_prices
        return total_revenue / num_samples

In [None]:
def run_experiment1():
    """Run Experiment 1: Solution Quality Assessment."""
    logger.info("Starting Experiment 1: Solution Quality Assessment")
    
    # Create test instance
    config = TestConfiguration()
    test_params = config.get_config(
        test_type='minimal',
        market_condition='standard',
        discretization='coarse'
    )
    
    # Override parameters for small instance
    test_params.update({
        'T': 10,  # Small booking horizon
        'N': 5,   # Small service horizon
        'C': 5    # Small capacity
    })
    
    instance = create_test_instance(
        demand_scenario='base',
        market_condition='standard',
        test_configuration=test_params,
        seed=42
    )
    
    # Solve using Dynamic Programming
    dp = DynamicProgramming(instance)
    dp_revenue, dp_time = dp.solve()
    
    # Solve using Stochastic Approximation
    saa = StochasticApproximation(instance)
    saa_revenue, saa_time = saa.solve()
    
    # Calculate gap
    gap_percentage = ((dp_revenue - saa_revenue) / dp_revenue) * 100
    
    # Print results
    logger.info("\nExperiment 1 Results:")
    logger.info(f"Dynamic Programming Revenue: ${dp_revenue:.2f}")
    logger.info(f"SAA Revenue: ${saa_revenue:.2f}")
    logger.info(f"Optimality Gap: {gap_percentage:.2f}%")
    logger.info(f"DP Solution Time: {dp_time:.2f} seconds")
    logger.info(f"SAA Solution Time: {saa_time:.2f} seconds")
    
    return {
        'dp_revenue': dp_revenue,
        'saa_revenue': saa_revenue,
        'gap_percentage': gap_percentage,
        'dp_time': dp_time,
        'saa_time': saa_time
    }

In [None]:
if __name__ == "__main__":
    results = run_experiment1()