Setup

In [1]:
import numpy as np, pandas as pd, joblib, math, os
from sklearn.linear_model import Ridge
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.metrics import r2_score
np.set_printoptions(suppress=True, floatmode="fixed")

DATA_DIR = "/content/data"

Load & prepare

In [5]:
!mkdir -p ~/.kaggle
!cp /content/kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

!pip -q install kaggle
!mkdir -p /content/data

# Download the single ZIP for the competition
!kaggle competitions download -c m5-forecasting-accuracy -p /content/data

# Unzip the actual archive we just downloaded
%cd /content/data
!unzip -o m5-forecasting-accuracy.zip calendar.csv sell_prices.csv sales_train_validation.csv || unzip -o m5-forecasting-accuracy.zip
%cd /content

# sanity check
import os, pandas as pd
print(os.listdir("/content/data"))


m5-forecasting-accuracy.zip: Skipping, found more recently modified local copy (use --force to force download)
/content/data
Archive:  m5-forecasting-accuracy.zip
  inflating: calendar.csv            
  inflating: sales_train_validation.csv  
  inflating: sell_prices.csv         
/content
['m5-forecasting-accuracy.zip', 'sales_train_validation.csv', 'sell_prices.csv', 'calendar.csv']


In [6]:
cal = pd.read_csv(f"{DATA_DIR}/calendar.csv")
prices = pd.read_csv(f"{DATA_DIR}/sell_prices.csv")
sales = pd.read_csv(f"{DATA_DIR}/sales_train_validation.csv")

# pick one store and one category for a focused example
store_id = "CA_1"
cat_id = "FOODS"

meta = sales[['id','item_id','dept_id','cat_id','store_id','state_id']]
keep_ids = meta[(meta['store_id']==store_id) & (meta['cat_id'].str.startswith(cat_id))]['id']

sales_small = sales[sales['id'].isin(keep_ids)].copy()
# Melt to long format (daily demand)
value_cols = [c for c in sales_small.columns if c.startswith('d_')]
df = sales_small[['id','item_id','dept_id','cat_id','store_id','state_id']+value_cols] \
        .melt(id_vars=['id','item_id','dept_id','cat_id','store_id','state_id'],
              var_name='d', value_name='qty')

# Map day -> calendar
cal_small = cal[['d','date','wm_yr_wk','weekday','wday','month','event_name_1','event_type_1']].copy()
df = df.merge(cal_small, on='d', how='left')

# Map (store, item, week) -> sell_price
pmap = prices[prices['store_id']==store_id][['store_id','item_id','wm_yr_wk','sell_price']]
df = df.merge(pmap, on=['store_id','item_id','wm_yr_wk'], how='left')

# Basic cleaning
df['sell_price'] = df['sell_price'].fillna(method='ffill').fillna(method='bfill')
df = df.dropna(subset=['sell_price'])
df['log_price'] = np.log(df['sell_price'])
df['log_qty']   = np.log1p(df['qty'])   # log(1+q) for zeros
df['is_event']  = (~df['event_name_1'].isna()).astype(int)

# Keep last ~18 months to match price coverage
df['date'] = pd.to_datetime(df['date'])
df = df[df['date'] >= (df['date'].max() - pd.Timedelta(days=540))].copy()
df.head(), df.shape

  df['sell_price'] = df['sell_price'].fillna(method='ffill').fillna(method='bfill')


(                                  id      item_id  dept_id cat_id store_id  \
 1971564  FOODS_1_001_CA_1_validation  FOODS_1_001  FOODS_1  FOODS     CA_1   
 1971565  FOODS_1_002_CA_1_validation  FOODS_1_002  FOODS_1  FOODS     CA_1   
 1971566  FOODS_1_003_CA_1_validation  FOODS_1_003  FOODS_1  FOODS     CA_1   
 1971567  FOODS_1_004_CA_1_validation  FOODS_1_004  FOODS_1  FOODS     CA_1   
 1971568  FOODS_1_005_CA_1_validation  FOODS_1_005  FOODS_1  FOODS     CA_1   
 
         state_id       d  qty       date  wm_yr_wk   weekday  wday  month  \
 1971564       CA  d_1373    0 2014-11-01     11440  Saturday     1     11   
 1971565       CA  d_1373    1 2014-11-01     11440  Saturday     1     11   
 1971566       CA  d_1373    1 2014-11-01     11440  Saturday     1     11   
 1971567       CA  d_1373   10 2014-11-01     11440  Saturday     1     11   
 1971568       CA  d_1373    0 2014-11-01     11440  Saturday     1     11   
 
         event_name_1 event_type_1  sell_price  log_pr

Fit a log-linear demand model

In [7]:
feat_cols = ['log_price','weekday','month','is_event','item_id']
X = df[feat_cols].copy()
y = df['log_qty'].values

preproc = ColumnTransformer([
    ('passthrough', 'passthrough', ['log_price']),
    ('cat', OneHotEncoder(handle_unknown='ignore'),
     ['weekday','month','is_event','item_id'])
])

model = Pipeline([
    ('prep', preproc),
    ('ridge', Ridge(alpha=1.0))
])

model.fit(X, y)
y_hat = model.predict(X)
print("In-sample R^2:", round(r2_score(y, y_hat), 3))
elasticity = float(model.named_steps['ridge'].coef_[0])
print("Approx. price elasticity (should be negative):", round(elasticity, 3))

In-sample R^2: 0.428
Approx. price elasticity (should be negative): -0.261


Create a discrete price grid and revenue scorer

In [8]:
# build per-item price grids from historical min/max
bounds = df.groupby('item_id')['sell_price'].agg(['min','max']).reset_index()
def make_grid(mn, mx, steps=6):
    if pd.isna(mn) or pd.isna(mx) or mn <= 0: return []
    if mx <= mn: mx = mn * 1.1
    return list(np.round(np.linspace(mn*0.9, mx*1.1, steps), 2))

price_grid = {r.item_id: make_grid(r['min'], r['max']) for _, r in bounds.iterrows()}

def expected_qty(context_row, trial_price):
    row = context_row.copy()
    row['log_price'] = math.log(trial_price)
    # predict log(1+qty) then invert
    mu = float(model.predict(row[feat_cols].to_frame().T)[0])
    return max(0.0, math.exp(mu) - 1.0)

def expected_revenue(context_row, trial_price):
    return trial_price * expected_qty(context_row, trial_price)

Contextual LinUCB on real covariates

In [9]:
# Build a pool of contexts (one per (date,item) row)
pool = df[['item_id','weekday','month','is_event']].copy()
pool['log_price'] = 0.0  # placeholder; price varies per arm

# Numerical encoding for LinUCB context
ctx_cols = ['weekday','month','is_event']
enc = OneHotEncoder(handle_unknown='ignore', sparse_output=False)
Xctx = enc.fit_transform(pool[ctx_cols])
d = Xctx.shape[1]

class LinUCB:
    def __init__(self, d, alpha=0.8):
        self.alpha = alpha
        self.A = np.eye(d)
        self.b = np.zeros(d)
    def select(self, x, prices, proto_row):
        scores = []
        A_inv = np.linalg.inv(self.A); theta = A_inv @ self.b
        for p in prices:
            mu = expected_revenue(proto_row, p)  # context-aware expected revenue
            # use same A_inv for all arms here (shared model); you can make per-arm matrices too
            sigma = math.sqrt(x @ A_inv @ x)
            scores.append(mu + self.alpha * sigma)
        idx = int(np.argmax(scores))
        return idx, scores
    def update(self, x, reward):
        self.A += np.outer(x, x)
        self.b += reward * x

agent = LinUCB(d=d, alpha=0.8)
rng = np.random.default_rng(7)

T = 5000
cum_rev = 0.0
taken = []

for t in range(T):
    j = rng.integers(0, len(pool))
    row = df.iloc[j]
    proto = row[feat_cols].copy()
    item = row['item_id']
    grid = price_grid.get(item, [])
    if len(grid) < 2:
        continue

    x = Xctx[j]
    arm, scores = agent.select(x, grid, proto)
    price = grid[arm]

    # Sample realized demand around expected qty (Poisson)
    lam = expected_qty(proto, price)
    qty = rng.poisson(max(1e-6, lam))
    reward = price * qty

    agent.update(x, reward)
    cum_rev += reward
    taken.append(reward)

print("Steps:", len(taken), " Avg revenue/impression:", round(cum_rev/len(taken), 3))

Steps: 5000  Avg revenue/impression: 4.08


Export artifact for the API

In [10]:
ARTIFACT_DIR = "artifacts_m5"
os.makedirs(ARTIFACT_DIR, exist_ok=True)
joblib.dump({
    "onehot": enc,
    "feature_cols": feat_cols,
    "ctx_cols": ctx_cols,
    "price_grid": price_grid,
    "model_pipe": model  # demand predictor
}, f"{ARTIFACT_DIR}/m5_price_artifacts.pkl")

print("Saved:", f"{ARTIFACT_DIR}/m5_price_artifacts.pkl")
print("Items with grids:", sum(len(v)>=2 for v in price_grid.values()))

Saved: artifacts_m5/m5_price_artifacts.pkl
Items with grids: 1437


In [11]:
from google.colab import files
files.download('/content/artifacts_m5/m5_price_artifacts.pkl')

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>