In [None]:
import numpy as np
import random
import matplotlib.pyplot as plt
import scipy.stats as stats
from scipy import optimize

# Environment Setup

In [None]:
class NonStationaryStochasticPricingEnvironment:
    """
    Single-product pricing environment whose customer-valuation distribution
    changes from one round to the next.

    The environment keeps one distribution per round and an internal round
    counter (``current_round``) that selects the active distribution. The
    counter starts at 0 and advances by one on every ``simulate_round`` call.
    """

    def __init__(self, valuation_distributions, demand_noise_std=0.1):
        """
        Args:
            valuation_distributions: Sequence of distribution objects, indexed
                by round number. Each one must expose ``cdf(price)`` and
                ``rvs()`` (e.g. frozen scipy.stats distributions).
            demand_noise_std: Standard deviation of the Gaussian noise added
                to the demand probability.
        """
        self.current_round = 0
        self.noise_std = demand_noise_std
        self.valuation_dist = valuation_distributions

    def _current_distribution(self):
        """Return the valuation distribution associated with the current round."""
        return self.valuation_dist[self.current_round]

    def demand_probability(self, price):
        """
        Noisy probability that a customer buys at ``price`` in the current round.

        Computed as ``1 - cdf(price)`` for the current round's distribution,
        perturbed by zero-mean Gaussian noise and clipped to [0, 1].
        Does not advance the round counter.
        """
        survival = 1 - self._current_distribution().cdf(price)
        perturbed = survival + np.random.normal(0, self.noise_std)
        # Noise can push the value outside the valid probability range
        return np.clip(perturbed, 0, 1)

    def simulate_round(self, price):
        """
        Play one pricing round at ``price`` and advance to the next round.

        A single valuation is drawn from the current round's distribution;
        the customer buys when that valuation is at least ``price``.

        Returns:
            (sale_made, revenue): ``sale_made`` is 1 or 0, and ``revenue`` is
            ``sale_made * price``.
        """
        drawn_valuation = self._current_distribution().rvs()
        sale_made = int(bool(drawn_valuation >= price))

        # Move on to the next round's distribution
        self.current_round += 1

        return sale_made, sale_made * price

Quando definisco la configurazione dell'environment, uso funzioni lambda per generare media e deviazione standard a ogni round, e con queste creo il vettore di distribuzioni da passare all'environment.
Potrebbe anche avere senso cambiare la famiglia di distribuzione (e non usare solo distribuzioni normali), per ottenere cambiamenti più bruschi.

# Agent

In [None]:
class PrimalDualAgent:
    """
    Primal-dual agent for dynamic pricing with inventory constraints in highly non-stationary environments.

    This agent implements a primal-dual based algorithm adapted for:
    1. Dynamic pricing (instead of traditional MAB rewards)
    2. Inventory constraints (limited number of products to sell)
    3. Dual optimization: maximize revenue while respecting inventory constraint

    The agent keeps exponential-moving-average (EMA) estimates of the expected
    revenue (f_est) and of the purchase probability (c_est) of every price.
    At each round it picks a price distribution maximizing the Lagrangian
    score f_est - lambda * c_est over the probability simplex, then updates
    the dual variable lambda by projected gradient descent so that the
    selling rate tracks rho = P / T.
    """

    def __init__(self, P, T, prices, eta = 0.1, ema_alpha = 0.2, lambda0 = 0.0):
        """
        Initialize the primal-dual agent for constrained dynamic pricing.

        Args:
            P: Total inventory (number of products available)
            T: Time horizon (number of rounds)
            prices: List of available prices to choose from
            eta: learning rate for dual update
            ema_alpha: Weight of the newest observation in the EMA estimates
            lambda0: Initial value of the dual variable lambda
        """
        k = len(prices)

        # Environment parameters
        self.prices = prices  # Available price options
        self.K = k           # Number of price arms
        self.T = T           # Total number of rounds
        self.t = 0           # Current round number

        # Primal-dual parameters
        self.rho = P/float(T)                         # Target selling rate
        self.eta = eta
        self.lambda_ = float(lambda0)
        self.lambda_upper = 1.0 / max(self.rho, 1e-8) # Projection upper bound

        # Inventory management
        self.inventory = P  # Initial inventory
        self.remaining_inventory = P # Current remaining inventory

        # EMA estimates (reactive to non-stationarity)
        self.ema_alpha = ema_alpha
        self.f_est = np.zeros(self.K)        # Estimated expected revenue for each price
        self.c_est = np.ones(self.K) * 1e-6  # Estimated demand probability for each price, initialized with small positive number

        self.N_pulls = np.zeros(k)           # Number of times each price was selected
        self.current_price_idx = None        # Index chosen by the latest select_price call

        # History tracking
        self.history = {
            'prices': [],     # Selected prices over time
            'rewards': [],    # Observed revenues over time
            'purchases': [],  # Purchase indicators over time
            'lambda': [],     # Lambda parameter
            'inventory': []   # Inventory levels over time
        }

    def _greedy_distribution(self, scores):
        """Return the distribution putting all mass on the best-scoring price."""
        gamma = np.zeros(self.K)
        gamma[np.argmax(scores)] = 1.0
        return gamma

    def select_price(self):
        """
        Select the next price from the Lagrangian-optimal price distribution.

        Solve LP : maximize sum_i gamma_i * (f_est_i - lambda*c_est_i)
                   subject to sum gamma = 1, gamma >= 0

        If the LP fails or raises, fall back to greedy argmax(f_est - lambda*c_est).
        Then sample the price according to gamma.

        Returns:
            Selected price, or np.nan if no inventory remaining
        """
        # No inventory left - cannot make meaningful pricing decisions
        if self.remaining_inventory < 1:
            self.current_price_idx = int(np.argmax(self.prices))  # Arbitrary selection
            return np.nan

        # Lagrangian score of each price
        scores = self.f_est - self.lambda_ * self.c_est

        try:
            # linprog minimizes, so negate the scores to maximize them
            res = optimize.linprog(
                c = -scores,
                A_eq = [np.ones(self.K)],
                b_eq = [1.0],
                bounds = [(0.0, 1.0)] * self.K,
                method = 'highs',
            )
        except Exception:
            # Best effort: a solver error (e.g. non-finite scores) must not stop the run
            gamma = self._greedy_distribution(scores)
        else:
            if res.success:
                # Remove tiny negative values from solver round-off and renormalize
                gamma = np.maximum(res.x, 0.0)
                total = gamma.sum()
                if total <= 1e-12:
                    gamma = np.ones(self.K) / self.K
                else:
                    gamma = gamma / total
            else:
                # fallback: deterministic greedy on adjusted objective
                gamma = self._greedy_distribution(scores)

        # Sample according to gamma
        idx = np.random.choice(self.K, p=gamma)
        self.current_price_idx = int(idx)
        return float(self.prices[self.current_price_idx])

    def update(self, reward, purchased):
        """
        Update agent's statistics based on observed outcome.

        Args:
            reward: Revenue obtained (price if purchased, 0 otherwise)
            purchased: Boolean indicating if purchase was made

        Raises:
            RuntimeError: If called before any call to select_price.
        """
        idx = self.current_price_idx
        if idx is None:
            # Indexing the arrays with None would silently touch every arm
            raise RuntimeError("update() called before select_price()")

        if self.remaining_inventory < 1:
            # Nothing could be sold this round: select_price returned NaN, so
            # the reported outcome carries no information about the arm (and
            # the reward may itself be NaN). Record a non-sale and leave the
            # per-price estimates untouched.
            reward = 0.0
            purchased = False
        else:
            # Update pull count
            self.N_pulls[idx] += 1

            alpha = self.ema_alpha
            # Update EMA for f_est, reward is either price, if purchased, or 0
            self.f_est[idx] = (1 - alpha) * self.f_est[idx] + alpha * reward
            # Update EMA for c_est with the purchase indicator, so that the
            # lambda * c_est penalty reflects the observed demand
            self.c_est[idx] = (1 - alpha) * self.c_est[idx] + alpha * (1.0 if purchased else 0.0)

        # Dual update: lambda <- proj_[0, 1/rho] (lambda - eta*(rho-c_t(b_t)))
        purchased_indicator = 1.0 if purchased else 0.0
        grad = self.rho - purchased_indicator
        self.lambda_ = float(np.clip(self.lambda_ - self.eta * grad, 0.0, self.lambda_upper))

        # Consume one unit of inventory on an actual sale
        if purchased:
            self.remaining_inventory -= 1

        # Record history
        self.history['prices'].append(self.prices[idx])
        self.history['rewards'].append(reward)
        self.history['purchases'].append(purchased)
        self.history['lambda'].append(self.lambda_)
        self.history['inventory'].append(self.remaining_inventory)

        # Increment time
        self.t += 1

    def get_state(self):
        """Return a snapshot of the round counter, inventory, lambda and copies of the estimates."""
        return {
            't': self.t,
            'remaining_inventory': self.remaining_inventory,
            'lambda': self.lambda_,
            'f_est': self.f_est.copy(),
            'c_est': self.c_est.copy()
        }