# Reinforcement Learning for Dynamic Pricing (SAC)

This notebook implements a Soft Actor-Critic (SAC) agent to optimize prices within strict product-specific constraints.

In [1]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd
import pickle
from stable_baselines3 import SAC
from stable_baselines3.common.vec_env import DummyVecEnv
from sklearn.preprocessing import LabelEncoder

%matplotlib inline

Gym has been unmaintained since 2022 and does not support NumPy 2.0 amongst other critical functionality.
Please upgrade to Gymnasium, the maintained drop-in replacement of Gym, or contact the authors of your software and request that they upgrade.
Users of this version of Gym should be able to simply replace 'import gym' with 'import gymnasium as gym' in the vast majority of cases.
See the migration guide at https://gymnasium.farama.org/introduction/migration_guide/ for additional information.


In [2]:
# Load Data, Model, Encoders, and Constraints
df = pd.read_csv('../data/sales_data.csv')
price_constraints = pd.read_csv('price_constraints.csv').set_index('Product ID')

with open('demand_model_lgbm.pkl', 'rb') as f:
    demand_model = pickle.load(f)

with open('label_encoders.pkl', 'rb') as f:
    le_dict = pickle.load(f)

# Preprocess Data for Environment Simulation
# We need to encode the dataframe exactly as the model expects
df['Date'] = pd.to_datetime(df['Date'])
df['Month'] = df['Date'].dt.month
df['Day'] = df['Date'].dt.day
df['Weekday'] = df['Date'].dt.weekday

for col, le in le_dict.items():
    # Handle unseen labels if any (though we are training on same data for now)
    df[col] = df[col].map(lambda x: le.transform([x])[0] if x in le.classes_ else -1)

# Features needed for prediction (excluding Price which is the Action)
feature_cols = ['Store ID', 'Product ID', 'Category', 'Region', 'Inventory Level', 
                'Discount', 'Weather Condition', 'Promotion', 
                'Competitor Pricing', 'Seasonality', 'Epidemic', 'Month', 'Day', 'Weekday']

# --- Safe prediction wrapper to avoid degenerate zero outputs ---
import numpy as _np
def demand_predict_safe(input_df, min_floor=0.1):
    """Wrapper around the LightGBM predictor that enforces a minimum floor and returns a numpy array."""
    preds = demand_model.predict(input_df)
    preds = _np.array(preds, dtype=float)
    preds = _np.maximum(preds, min_floor)
    return preds

# Small diagnostic: sweep a price grid for a representative product and report how many price points return near-zero demand
try:
    # pick a representative row (first product we have)
    rep = df.iloc[0].copy()
    prices = _np.linspace(float(rep.get('Price', 10))*0.5, float(rep.get('Price', 10))*1.5, 21)
    zero_like = 0
    for p in prices:
        input_row = rep.copy()
        input_row['Price'] = p
        # build input as model expects (feature_cols + Price)
        in_dict = input_row[feature_cols].to_dict()
        in_dict['Price'] = p
        pred = demand_predict_safe(pd.DataFrame([in_dict]), min_floor=0.1)[0]
        if pred <= 0.1:
            zero_like += 1
    print(f"Diagnostic: {zero_like}/{len(prices)} price points produce near-zero predictions (floor=0.1)")
except Exception as e:
    print('Diagnostic sweep failed:', e)

Diagnostic: 21/21 price points produce near-zero predictions (floor=0.1)


## Define Custom Environment with Constraints

In [9]:
class PricingEnv(gym.Env):
    def __init__(self, df, model, constraints, feature_cols, episode_length=30,
                 zero_demand_penalty=10.0, price_change_penalty=0.1, reward_scale=100.0):
        super(PricingEnv, self).__init__()
        self.df = df.sort_values('Date')  # Ensure chronological order
        self.model = model
        self.constraints = constraints
        self.feature_cols = feature_cols
        self.episode_length = episode_length
        self.zero_demand_penalty = zero_demand_penalty
        self.price_change_penalty = price_change_penalty
        self.reward_scale = reward_scale

        # Group data by Store and Product to form episodes
        self.groups = [group for _, group in self.df.groupby(['Store ID', 'Product ID']) if len(group) >= episode_length]

        # Action Space: Continuous Price normalized to [-1, 1] for stable learning
        self.action_space = spaces.Box(low=-1, high=1, shape=(1,), dtype=np.float32)

        # Observation Space: Features + previous price (to discourage wild swings)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(len(feature_cols) + 1,), dtype=np.float32)

    def reset(self, *, seed=None, return_info=True, options=None):
        # Pick a random product/store sequence
        if not self.groups:
            # Fallback if no groups large enough (shouldn't happen with full data)
            self.current_group = self.df.iloc[:self.episode_length].reset_index(drop=True)
        else:
            group_idx = np.random.randint(0, len(self.groups))
            self.current_group = self.groups[group_idx].reset_index(drop=True)
            
        self.current_step = 0
        self.state_row = self.current_group.iloc[self.current_step]
        self.product_id = self.state_row['Product ID']

        # Get constraints (map back to raw product id if encoders used)
        try:
            raw_product_id = le_dict['Product ID'].inverse_transform([int(self.product_id)])[0]
        except Exception:
            raw_product_id = self.product_id

        try:
            self.min_price = self.constraints.loc[raw_product_id, 'min_price']
            self.max_price = self.constraints.loc[raw_product_id, 'max_price']
        except KeyError:
            # Reasonable defaults if constraints missing
            self.min_price, self.max_price = max(0.01, float(self.state_row.get('Price', 10)) * 0.5), float(self.state_row.get('Price', 10)) * 1.5

        # Previous price init (use current observed price)
        self.prev_price = float(self.state_row.get('Price', (self.min_price + self.max_price) / 2))

        # Compute a model-based demand cap for this product to prevent unrealistic spikes.
        # We predict demand over the historical group prices (or the current prev_price if no Price present)
        try:
            group_input = self.current_group[self.feature_cols].copy()
            # ensure Price column exists in group_input for model input
            if 'Price' in self.current_group.columns:
                group_input['Price'] = self.current_group['Price'].values
            else:
                group_input['Price'] = self.prev_price
            # Use the same safe predictor; fallback to raw predict if needed
            preds = demand_predict_safe(group_input, min_floor=0.1)
            # Use 99th percentile as a conservative cap
            self.product_demand_cap = float(np.percentile(preds, 99)) if len(preds) > 0 else np.inf
            # If percentile is extremely small or NaN, fall back to no cap
            if not np.isfinite(self.product_demand_cap) or self.product_demand_cap <= 0.0:
                self.product_demand_cap = np.inf
        except Exception:
            self.product_demand_cap = np.inf

        obs = list(self.state_row[self.feature_cols].values.astype(np.float32)) + [self.prev_price]
        obs = np.array(obs, dtype=np.float32)
        # gymnasium reset should return (obs, info) when return_info=True
        if return_info:
            return obs, {'product_demand_cap': self.product_demand_cap}
        return obs

    def step(self, action):
        # Scale action [-1, 1] to [MinPrice, MaxPrice]
        scaled_action = float(action[0])
        price = float(self.min_price + (self.max_price - self.min_price) * (scaled_action + 1) / 2)

        # Prepare input for Demand Model
        input_data = self.state_row[self.feature_cols].to_dict()
        input_data['Price'] = price
        input_df = pd.DataFrame([input_data])

        # Predict Demand using safe wrapper to enforce floors and avoid degenerate zeros
        try:
            predicted_demand = float(demand_predict_safe(input_df, min_floor=0.1)[0])
        except Exception:
            # Fallback to model.predict if wrapper fails
            predicted_demand = float(max(0.0, self.model.predict(input_df)[0]))

        # Enforce non-negativity and cap by inventory to reflect stockouts
        predicted_demand = max(0.0, predicted_demand)
        inventory = float(self.state_row.get('Inventory Level', np.inf)) if not pd.isna(self.state_row.get('Inventory Level', np.nan)) else np.inf
        if inventory > 0 and not np.isinf(inventory):
            predicted_demand = min(predicted_demand, inventory)

        # Apply product-level demand cap computed at reset to avoid model spikes
        if hasattr(self, 'product_demand_cap') and np.isfinite(self.product_demand_cap):
            # allow a small multiplier (>1) so the agent can explore slightly beyond historical extremes if model supports it
            allowed_cap = float(self.product_demand_cap * 1.1)
            if predicted_demand > allowed_cap:
                predicted_demand = allowed_cap
                capped_flag = True
            else:
                capped_flag = False
        else:
            capped_flag = False

        # Calculate Revenue
        revenue = price * predicted_demand

        # Penalties to avoid degenerate policies that exploit zero-demand
        zero_pen = self.zero_demand_penalty if predicted_demand < 1e-3 else 0.0
        # Penalize large price jumps to stabilize learning
        price_change_pen = self.price_change_penalty * abs(price - self.prev_price) / max(1.0, self.prev_price)

        # Raw reward: revenue minus penalties
        raw_reward = revenue - zero_pen - price_change_pen
        # Scale reward to keep learning stable
        scaled_reward = raw_reward / self.reward_scale

        # Step
        self.current_step += 1
        done = self.current_step >= self.episode_length
        terminated = bool(done)
        truncated = False

        if not done:
            self.state_row = self.current_group.iloc[self.current_step]
            next_obs = list(self.state_row[self.feature_cols].values.astype(np.float32)) + [price]
            next_obs = np.array(next_obs, dtype=np.float32)
        else:
            next_obs = np.zeros(self.observation_space.shape, dtype=np.float32)

        # Update prev_price for next step
        self.prev_price = price

        info = {
            'revenue': revenue, 
            'price': price, 
            'demand': predicted_demand, 
            'min_price': self.min_price, 
            'max_price': self.max_price,
            'product_demand_cap': getattr(self, 'product_demand_cap', np.inf),
            'demand_capped': bool(capped_flag)
        }

        # gymnasium.step returns (obs, reward, terminated, truncated, info)
        return next_obs, float(scaled_reward), terminated, truncated, info

## Train SAC Agent

In [14]:
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize
from stable_baselines3.common.monitor import Monitor
from stable_baselines3.common.callbacks import EvalCallback, StopTrainingOnNoModelImprovement
import os
import traceback

def make_env():
    # Increase penalties and provide reward scaling to avoid degenerate zero-demand optima
    env = PricingEnv(df, demand_model, price_constraints, feature_cols,
                     episode_length=30, zero_demand_penalty=50.0, price_change_penalty=0.05, reward_scale=100.0)
    return Monitor(env)

env = DummyVecEnv([make_env])
# Normalize observations and rewards to stabilize learning (important when rewards have wide dynamic range)
env = VecNormalize(env, norm_obs=True, norm_reward=True, clip_obs=10.)

# Check for TensorBoard availability and set tensorboard_log accordingly
try:
    import tensorboard  # type: ignore
    tb_log = './logs/sac_pricing/'
except Exception:
    print('TensorBoard not installed; disabling tensorboard logging. To enable, run: pip install tensorboard')
    tb_log = None

# SAC Agent with tuned hyperparams for stability
model = SAC(
    "MlpPolicy",
    env,
    verbose=1,
    buffer_size=500_000,
    batch_size=256,
    ent_coef=0.01,  # reduce exploration compared with auto for smoke runs
    learning_rate=3e-4,
    tau=0.005,
    tensorboard_log=tb_log,
    )

# Setup evaluation environment and callback to save the best model and stop on no improvement
eval_env = DummyVecEnv([make_env])
# Note: do not normalize rewards on eval env (use same obs normalization if needed)
eval_env = VecNormalize(eval_env, norm_obs=True, norm_reward=False, clip_obs=10.)
os.makedirs('./logs/sac_pricing/best_model', exist_ok=True)
stop_cb = StopTrainingOnNoModelImprovement(max_no_improvement_evals=5, min_evals=5, verbose=1)
eval_callback = EvalCallback(eval_env, best_model_save_path='./logs/sac_pricing/best_model', 
                             log_path='./logs/sac_pricing/eval', eval_freq=5000, n_eval_episodes=5, deterministic=True, callback_on_new_best=stop_cb)

# Smoke training: 50k steps for quick validation with eval callback
try:
    model.learn(total_timesteps=300_000, callback=eval_callback)
    model.save('sac_pricing_model')
except ImportError as ie:
    # Specific helpful message if tensorboard import is the cause
    print('ImportError during training:')
    print(ie)
    print('\nIf this is due to TensorBoard, install it with: pip install tensorboard')
except Exception as e:
    print('Error during model.learn():')
    traceback.print_exc()

TensorBoard not installed; disabling tensorboard logging. To enable, run: pip install tensorboard
Using cpu device
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 30       |
|    ep_rew_mean     | 63.6     |
| time/              |          |
|    episodes        | 4        |
|    fps             | 340      |
|    time_elapsed    | 0        |
|    total_timesteps | 120      |
| train/             |          |
|    actor_loss      | 0.0297   |
|    critic_loss     | 0.00965  |
|    ent_coef        | 0.01     |
|    learning_rate   | 0.0003   |
|    n_updates       | 19       |
---------------------------------
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 30       |
|    ep_rew_mean     | 63.6     |
| time/              |          |
|    episodes        | 4        |
|    fps             | 340      |
|    time_elapsed    | 0        |
|    total_timesteps | 120      |
| train/             |          |
|

## Evaluation

In [15]:
# Evaluation (vectorized env) - print debug fields added
obs = env.reset()
for _ in range(10):
    action, _states = model.predict(obs)
    obs, rewards, dones, infos = env.step(action)
    info0 = infos[0] if isinstance(infos, (list, tuple)) else infos
    print(f"Price: {info0['price']:.2f} (Bounds: {info0['min_price']:.2f}-{info0['max_price']:.2f}), Demand: {info0['demand']:.2f}, Revenue: {info0['revenue']:.2f}, Capped: {info0.get('demand_capped', False)}, Cap: {info0.get('product_demand_cap', float('inf')):.2f}, Floor: {info0.get('product_floor', 1e-6):.6f}")

Price: 95.20 (Bounds: 11.59-99.74), Demand: 5.25, Revenue: 500.24, Capped: False, Cap: 33.14, Floor: 0.000001
Price: 75.91 (Bounds: 11.59-99.74), Demand: 7.40, Revenue: 562.00, Capped: False, Cap: 33.14, Floor: 0.000001
Price: 98.60 (Bounds: 11.59-99.74), Demand: 10.08, Revenue: 993.73, Capped: False, Cap: 33.14, Floor: 0.000001
Price: 95.99 (Bounds: 11.59-99.74), Demand: 0.10, Revenue: 9.60, Capped: False, Cap: 33.14, Floor: 0.000001
Price: 47.40 (Bounds: 11.59-99.74), Demand: 0.10, Revenue: 4.74, Capped: False, Cap: 33.14, Floor: 0.000001
Price: 96.10 (Bounds: 11.59-99.74), Demand: 27.91, Revenue: 2681.90, Capped: False, Cap: 33.14, Floor: 0.000001
Price: 45.22 (Bounds: 11.59-99.74), Demand: 4.49, Revenue: 202.88, Capped: False, Cap: 33.14, Floor: 0.000001
Price: 36.52 (Bounds: 11.59-99.74), Demand: 1.10, Revenue: 40.00, Capped: False, Cap: 33.14, Floor: 0.000001
Price: 77.43 (Bounds: 11.59-99.74), Demand: 2.07, Revenue: 159.97, Capped: False, Cap: 33.14, Floor: 0.000001
Price: 97.53