In [1]:
import arviz as az
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import preliz as pz
import pymc as pm
import seaborn as sns

from numpy.typing import NDArray
from pydantic import BaseModel, ConfigDict, Field, field_validator, model_validator

plt.style.use("bmh")
plt.rcParams["figure.figsize"] = [12, 7]
plt.rcParams["figure.dpi"] = 100
plt.rcParams["figure.facecolor"] = "white"

%load_ext autoreload
%autoreload 2
%config InlineBackend.figure_format = "retina"

In [2]:
# Deterministic seed: the sum of the code points of the notebook's name.
seed: int = sum(ord(char) for char in "multilevel_elasticities")
# NumPy random generator seeded with the value above.
rng: np.random.Generator = np.random.default_rng(seed)

In [3]:
class Item(BaseModel):
    """A single item with paired price and sales observations.

    Attributes
    ----------
    id
        Non-negative identifier of the item.
    prices
        Non-empty array of strictly positive prices.
    sales
        Non-empty array of strictly positive sales, same size as ``prices``.

    Raises
    ------
    pydantic.ValidationError
        If any of the constraints above is violated.
    """

    # pydantic v2 configuration (replaces the deprecated inner ``class Config``).
    # NumPy arrays are not a pydantic-native type, so they must be allowed
    # explicitly.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    id: int = Field(..., ge=0)
    # ``np.float64`` instead of the ``np.float_`` alias removed in NumPy 2.0.
    prices: NDArray[np.float64]
    sales: NDArray[np.float64]

    @field_validator("prices", "sales")
    @classmethod
    def validate_gt_0(cls, value):
        """Reject arrays containing any entry that is not strictly positive."""
        # ``not (value > 0).all()`` also rejects NaN entries, which a
        # ``(value <= 0).any()`` test silently lets through.
        if not (value > 0).all():
            raise ValueError("prices and sales must be positive")
        return value

    @field_validator("prices", "sales")
    @classmethod
    def validate_size_gt_0(cls, value):
        """Reject empty arrays."""
        if value.size == 0:
            raise ValueError("prices and sales must have at least one element")
        return value

    @model_validator(mode="after")
    def validate_sizes(self) -> "Item":
        """Ensure ``prices`` and ``sales`` hold the same number of observations.

        Runs after field validation, so both attributes are guaranteed to be
        present and to be arrays; a missing or mistyped field is reported as a
        regular validation error rather than a ``KeyError``/``AttributeError``.
        """
        if self.prices.size != self.sales.size:
            raise ValueError("prices and sales must have the same size")
        return self

    def to_dataframe(self) -> pd.DataFrame:
        """Return one row per observation with columns item_id, price, sales."""
        return pd.DataFrame(
            data={"item_id": self.id, "price": self.prices, "sales": self.sales}
        )


class Store(BaseModel):
    """A store holding a non-empty list of items with unique ids.

    Attributes
    ----------
    id
        Non-negative identifier of the store.
    items
        At least one ``Item``; item ids must be unique within the store.
    """

    id: int = Field(..., ge=0)
    # ``min_length`` is the pydantic v2 spelling of the deprecated ``min_items``.
    items: list[Item] = Field(..., min_length=1)

    @field_validator("items")
    @classmethod
    def validate_item_ids(cls, value):
        """Reject item lists in which two items share an id."""
        if len({item.id for item in value}) != len(value):
            raise ValueError("items must have unique ids")
        return value

    def to_dataframe(self) -> pd.DataFrame:
        """Stack the items' frames and tag every row with this store's id."""
        df = pd.concat(
            [item.to_dataframe() for item in self.items], axis=0, ignore_index=True
        )
        df["store_id"] = self.id
        return df


class Region(BaseModel):
    """A region holding a non-empty list of stores with unique ids.

    Attributes
    ----------
    id
        Non-negative identifier of the region.
    stores
        At least one ``Store``; store ids must be unique within the region.
    median_income
        Strictly positive region-level covariate (Z_j).
    """

    id: int = Field(..., ge=0)
    # ``min_length`` is the pydantic v2 spelling of the deprecated ``min_items``.
    stores: list[Store] = Field(..., min_length=1)
    median_income: float = Field(..., gt=0)  # Z_j

    @field_validator("stores")
    @classmethod
    def validate_store_ids(cls, value):
        """Reject store lists in which two stores share an id."""
        if len({store.id for store in value}) != len(value):
            raise ValueError("stores must have unique ids")
        return value

    def to_dataframe(self) -> pd.DataFrame:
        """Stack the stores' frames and tag every row with this region's id.

        ``median_income`` is not included in the returned frame.
        """
        df = pd.concat(
            [store.to_dataframe() for store in self.stores], axis=0, ignore_index=True
        )
        df["region_id"] = self.id
        return df


class Market(BaseModel):
    """Top-level container: a non-empty list of regions with unique ids.

    Attributes
    ----------
    regions
        At least one ``Region``; region ids must be unique.
    """

    # ``min_length`` is the pydantic v2 spelling of the deprecated ``min_items``.
    regions: list[Region] = Field(..., min_length=1)

    @field_validator("regions")
    @classmethod
    def validate_region_ids(cls, value):
        """Reject region lists in which two regions share an id."""
        if len({region.id for region in value}) != len(value):
            raise ValueError("regions must have unique ids")
        return value

    def to_dataframe(self) -> pd.DataFrame:
        """Stack all regions' frames into one frame with a fresh 0..n-1 index."""
        return pd.concat(
            [region.to_dataframe() for region in self.regions],
            axis=0,
            ignore_index=True,
        )


In [11]:
n_regions = 5  # J

# All draws use the notebook-level `rng`, so a fresh-kernel run reproduces
# the same simulated market.

# Number of stores in each region.
n_stores_per_region_dist = pm.NegativeBinomial.dist(mu=20, alpha=2)
# The negative binomial has support on 0, but a Region must contain at least
# one store, so floor the draws at 1 to avoid a validation error downstream.
n_stores_per_region_draws = np.maximum(
    pm.draw(n_stores_per_region_dist, draws=n_regions, random_seed=rng), 1
)

# Median income of each region (the region-level covariate Z_j).
median_income_per_region_dist = pm.Gamma.dist(mu=10, sigma=1)
median_income_per_region_draws = pm.draw(
    median_income_per_region_dist, draws=n_regions, random_seed=rng
)

In [14]:
time_range = 20  # number of (price, sales) observations drawn per store

epsilon = 0.1  # sigma of the Normal noise on log-sales

# Intercept ~ Normal(a_alpha + b_alpha * median_income, sigma_gamma_0j)
a_alpha = 0.5
b_alpha = 0.1
sigma_gamma_0j = 0.02

# log(price) coefficient ~ Normal(a_beta + b_beta * median_income, sigma_gamma_1j)
# NOTE(review): a_beta, b_beta and median income are all positive, so the
# coefficient on log(price) is positive — confirm this sign is intended.
a_beta = 0.1
b_beta = 0.8
sigma_gamma_1j = 0.03

# Prices ~ Gamma(mu=price_mu, sigma=price_sigma)
price_mu = 1.5
price_sigma = 0.25

# The price distribution does not depend on the region or store: build it once.
prices_dist = pm.Gamma.dist(mu=price_mu, sigma=price_sigma)

regions: list[Region] = []

for j in range(n_regions):
    n_stores_per_region = n_stores_per_region_draws[j]
    median_income_per_region = median_income_per_region_draws[j]

    # These distributions depend only on the region's median income, so they
    # are built once per region rather than once per store.
    alpha_j_dist = pm.Normal.dist(
        mu=a_alpha + b_alpha * median_income_per_region, sigma=sigma_gamma_0j
    )
    beta_j_dist = pm.Normal.dist(
        mu=a_beta + b_beta * median_income_per_region, sigma=sigma_gamma_1j
    )

    stores: list[Store] = []

    for i in range(n_stores_per_region):
        # Every draw is seeded from the shared `rng` so the simulation is
        # reproducible; each call advances the generator, so stores differ.
        # NOTE(review): intercept and slope are redrawn for every observation
        # of every store, not once per region as the `_j` naming suggests —
        # confirm this is the intended data-generating process.
        alpha_j_samples = pm.draw(alpha_j_dist, draws=time_range, random_seed=rng)
        beta_j_samples = pm.draw(beta_j_dist, draws=time_range, random_seed=rng)
        prices_samples = pm.draw(prices_dist, draws=time_range, random_seed=rng)

        # Log-log model: log(sales) = alpha + beta * log(price) + noise.
        log_sales_dist = pm.Normal.dist(
            mu=alpha_j_samples + beta_j_samples * np.log(prices_samples),
            sigma=epsilon,
        )
        # `mu` is already a vector of length `time_range`, so a single draw
        # yields one sales value per price.
        log_sales_samples = pm.draw(log_sales_dist, draws=1, random_seed=rng)
        sales_samples = np.exp(log_sales_samples)

        # Each simulated store sells a single item (id 0).
        store = Store(
            id=i, items=[Item(id=0, prices=prices_samples, sales=sales_samples)]
        )
        stores.append(store)

    region = Region(id=j, stores=stores, median_income=median_income_per_region)
    regions.append(region)

market = Market(regions=regions)

In [15]:
# Flatten the nested Market -> Region -> Store -> Item hierarchy into a single
# long-format DataFrame (one row per observation) for inspection.
market.to_dataframe()


Unnamed: 0,item_id,price,sales,store_id,region_id
0,0,1.445378,139.010092,0,0
1,0,1.752577,844.154054,0,0
2,0,1.512802,201.037830,0,0
3,0,1.378522,116.006151,0,0
4,0,1.665698,590.941897,0,0
...,...,...,...,...,...
795,0,1.705799,148.827828,1,4
796,0,1.283124,24.414466,1,4
797,0,1.580569,85.117821,1,4
798,0,1.474058,57.302912,1,4
