
# Price Prediction & Optimization — **shoes**

Эта тетрадка реализует полный конвейер **прогноза спроса** и **рекомендаций цен** для интернет‑магазина обуви.
Проектные изменения учтены: **схема и таблицы — `shoes`**.

**Что внутри**
- Загрузка данных из БД (`SHOES_DB_URL`): `shoes.products`, `shoes.price_history`, `shoes.sales_daily`, `shoes.costs` (опционально).
- Фичи: лаги продаж, окна, скидки, разница с конкурентами, сезонность, сток.
- Модель: `HistGradientBoostingRegressor` (scikit‑learn).
- GroupKFold по товарам.
- Подбор цен сеткой кандидатов + ограничения.
- Запись рекомендаций в `shoes.price_recommendations`.


In [None]:

# === Конфигурация окружения ===
import os
from pathlib import Path

SHOES_DB_URL = os.getenv("SHOES_DB_URL", "postgresql+psycopg2://postgres:postgres@localhost:5432/shoes")

ARTIFACT_DIR = Path(os.getenv("ARTIFACT_DIR", "./artifacts"))
ARTIFACT_DIR.mkdir(parents=True, exist_ok=True)
MODEL_PATH = ARTIFACT_DIR / os.getenv("MODEL_FILENAME", "price_model_shoes.joblib")

RANDOM_STATE = int(os.getenv("RANDOM_STATE", "42"))
CV_SPLITS = int(os.getenv("CV_SPLITS", "5"))
MAX_TRAIN_DATE = os.getenv("MAX_TRAIN_DATE")
MIN_HISTORY_DAYS = int(os.getenv("MIN_HISTORY_DAYS", "90"))

DEFAULT_PRICE_STEP = float(os.getenv("DEFAULT_PRICE_STEP", "100.0"))
DEFAULT_PRICE_MIN_MULT = float(os.getenv("DEFAULT_PRICE_MIN_MULT", "0.7"))
DEFAULT_PRICE_MAX_MULT = float(os.getenv("DEFAULT_PRICE_MAX_MULT", "1.3"))
DEFAULT_OPTIMIZE_FOR = os.getenv("DEFAULT_OPTIMIZE_FOR", "revenue")  # 'revenue'/'profit'
DEFAULT_MIN_MARGIN_PCT = float(os.getenv("DEFAULT_MIN_MARGIN_PCT", "0.0"))

print("DB URL:", SHOES_DB_URL)
print("MODEL_PATH:", MODEL_PATH)


In [None]:

# === Импорты ===
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
from datetime import datetime, timedelta

from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine

from sklearn.model_selection import GroupKFold
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percentage_error
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn.experimental import enable_hist_gradient_boosting  # noqa: F401
from sklearn.ensemble import HistGradientBoostingRegressor

import joblib

pd.set_option("display.max_columns", 120)
pd.set_option("display.width", 160)


In [None]:

# === Подключение к БД ===
def get_engine(url: str = None) -> Engine:
    url = url or SHOES_DB_URL
    engine = create_engine(url, pool_pre_ping=True, future=True)
    return engine

def run_sql(engine: Engine, sql: str, params: dict | None = None):
    with engine.begin() as conn:
        return conn.execute(text(sql), params or {})


In [None]:

# === Загрузка данных из схемы shoes ===
LOAD_SQL = '''
SELECT
    ph.product_id::bigint                    AS product_id,
    ph.dt::date                              AS dt,
    ph.price::numeric                        AS base_price,
    ph.promo_price::numeric                  AS promo_price,
    COALESCE(ph.promo_price, ph.price)       AS effective_price,
    ph.competitor_min_price::numeric         AS competitor_min_price,
    ph.stock::int                            AS stock,
    sd.units_sold::int                       AS units_sold,
    sd.revenue::numeric                      AS revenue,
    p.brand_id::bigint                       AS brand_id,
    p.category_id::bigint                    AS category_id,
    p.gender::text                           AS gender,
    p.season::text                           AS season,
    p.material_upper::text                   AS material_upper,
    p.material_sole::text                    AS material_sole,
    p.color::text                            AS color,
    c.cost::numeric                          AS unit_cost
FROM shoes.price_history ph
JOIN shoes.sales_daily sd
  ON sd.product_id = ph.product_id AND sd.date = ph.dt
JOIN shoes.products p
  ON p.id = ph.product_id
LEFT JOIN shoes.costs c
  ON c.product_id = ph.product_id
WHERE 1=1
  AND (:max_train_date::date IS NULL OR ph.dt <= :max_train_date::date)
ORDER BY ph.product_id, ph.dt
'''

def load_data(engine: Engine, max_train_date: str | None = MAX_TRAIN_DATE) -> pd.DataFrame:
    df = pd.read_sql(text(LOAD_SQL), engine, params={"max_train_date": max_train_date})
    df["dt"] = pd.to_datetime(df["dt"]).dt.date
    if "unit_cost" not in df.columns:
        df["unit_cost"] = np.nan
    return df

engine = get_engine()
raw_df = load_data(engine)
print("Загружено строк:", len(raw_df))
raw_df.head(3)


In [None]:

# === Генерация признаков ===
def add_group_lags(df: pd.DataFrame, by: str, on: str, cols: list, lags=(1, 7, 14)) -> pd.DataFrame:
    df = df.sort_values([by, on]).copy()
    for col in cols:
        for L in lags:
            df[f"{col}_lag{L}"] = df.groupby(by)[col].shift(L)
    return df

def add_group_rolls(df: pd.DataFrame, by: str, on: str, cols: list, windows=(7, 14, 28)) -> pd.DataFrame:
    df = df.sort_values([by, on]).copy()
    for col in cols:
        for W in windows:
            df[f"{col}_roll{W}"] = (
                df.groupby(by)[col]
                  .rolling(W, min_periods=max(2, W//2))
                  .mean()
                  .reset_index(level=0, drop=True)
            )
    return df

def build_features(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df["dt"] = pd.to_datetime(df["dt"])
    df["discount"] = (df["base_price"] - df["effective_price"]).fillna(0.0)
    df["discount_pct"] = (df["discount"] / df["base_price"]).replace([np.inf, -np.inf], 0.0).fillna(0.0)
    df["comp_gap"] = (df["effective_price"] - df["competitor_min_price"]).astype(float)
    df["dow"] = df["dt"].dt.dayofweek
    df["dom"] = df["dt"].dt.day
    df["month"] = df["dt"].dt.month
    df["weekofyear"] = df["dt"].dt.isocalendar().week.astype(int)
    df = add_group_lags(df, "product_id", "dt", cols=["units_sold", "effective_price"], lags=(1, 7, 14))
    df = add_group_rolls(df, "product_id", "dt", cols=["units_sold"], windows=(7, 14, 28))
    df["in_stock"] = (df["stock"] > 0).astype(int)
    df = df.groupby("product_id").apply(lambda g: g.iloc[28:]).reset_index(drop=True)
    return df

feat_df = build_features(raw_df)
print("После генерации фичей:", feat_df.shape)
feat_df.head(3)


In [None]:

# === Обучение и CV ===
TARGET = "units_sold"
GROUP_COL = "product_id"
DATE_COL = "dt"

categorical_cols = ["gender", "season", "material_upper", "material_sole", "color"]
numeric_cols = [c for c in feat_df.columns if c not in [TARGET, GROUP_COL, DATE_COL] + categorical_cols]

preprocess = ColumnTransformer(
    transformers=[
        ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), categorical_cols),
        ("num", "passthrough", numeric_cols),
    ],
    remainder="drop",
)

reg = HistGradientBoostingRegressor(
    loss="squared_error",
    max_depth=None,
    max_iter=400,
    learning_rate=0.05,
    l2_regularization=0.0,
    random_state=RANDOM_STATE,
)

pipe = Pipeline(steps=[("prep", preprocess), ("model", reg)])

gkf = GroupKFold(n_splits=CV_SPLITS)

X = feat_df.drop(columns=[TARGET])
y = feat_df[TARGET].astype(float)
groups = feat_df[GROUP_COL]

rmse_scores, mape_scores, r2_scores = [], [], []

for fold, (tr_idx, va_idx) in enumerate(gkf.split(X, y, groups), start=1):
    Xtr, Xva = X.iloc[tr_idx], X.iloc[va_idx]
    ytr, yva = y.iloc[tr_idx], y.iloc[va_idx]
    pipe.fit(Xtr, ytr)
    p = pipe.predict(Xva)
    rmse = mean_squared_error(yva, p, squared=False)
    yva_safe = yva.mask(yva == 0, np.nan)
    mape = mean_absolute_percentage_error(yva_safe.dropna(), pd.Series(p, index=yva_safe.index).dropna())
    r2 = r2_score(yva, p)
    rmse_scores.append(rmse); mape_scores.append(mape); r2_scores.append(r2)
    print(f"Fold {fold}: RMSE={rmse:.3f}, MAPE={mape:.3f}, R2={r2:.3f}")

print("\nCV summary:")
print("RMSE:", np.round(np.mean(rmse_scores), 3), "+/-", np.round(np.std(rmse_scores), 3))
print("MAPE:", np.round(np.mean(mape_scores), 3), "+/-", np.round(np.std(mape_scores), 3))
print("R2  :", np.round(np.mean(r2_scores), 3), "+/-", np.round(np.std(r2_scores), 3))


In [None]:

# === Обучение на всех данных и сохранение модели ===
pipe.fit(X, y)
joblib.dump(pipe, MODEL_PATH)
print("Модель сохранена в:", MODEL_PATH)


In [None]:

# === Инференс и подбор цен ===
def load_model(path=MODEL_PATH):
    return joblib.load(path)

def latest_snapshot_for_product(fe_df: pd.DataFrame, product_id: int) -> pd.Series | None:
    g = fe_df[fe_df["product_id"] == product_id]
    if g.empty:
        return None
    return g.sort_values("dt").iloc[-1]

def candidate_price_grid(current_price: float,
                         step: float = DEFAULT_PRICE_STEP,
                         min_mult: float = DEFAULT_PRICE_MIN_MULT,
                         max_mult: float = DEFAULT_PRICE_MAX_MULT,
                         pmin: float | None = None,
                         pmax: float | None = None) -> np.ndarray:
    if current_price is None or np.isnan(current_price) or current_price <= 0:
        current_price = 1000.0
    lo = pmin if pmin is not None else current_price * min_mult
    hi = pmax if pmax is not None else current_price * max_mult
    if hi <= lo:
        hi = lo + step * 5
    start = max(step, (lo // step) * step)
    grid = np.arange(start, hi + step, step, dtype=float)
    return np.unique(grid)

def build_candidate_frame(latest_row: pd.Series,
                          prices: np.ndarray) -> pd.DataFrame:
    cols = latest_row.index.tolist()
    dfc = pd.DataFrame([latest_row.values] * len(prices), columns=cols)
    dfc["effective_price"] = prices
    if "base_price" in dfc.columns:
        dfc["discount"] = (dfc["base_price"] - dfc["effective_price"]).fillna(0.0)
        dfc["discount_pct"] = (dfc["discount"] / dfc["base_price"]).replace([np.inf, -np.inf], 0.0).fillna(0.0)
    if "competitor_min_price" in dfc.columns:
        dfc["comp_gap"] = (dfc["effective_price"] - dfc["competitor_min_price"]).astype(float)
    return dfc

def choose_best_price(model,
                      latest_row: pd.Series,
                      prices: np.ndarray,
                      optimize_for: str = DEFAULT_OPTIMIZE_FOR,
                      min_margin_pct: float = DEFAULT_MIN_MARGIN_PCT) -> dict:
    dfc = build_candidate_frame(latest_row, prices)
    Xc = dfc.drop(columns=["units_sold"], errors="ignore")
    y_pred = model.predict(Xc).clip(min=0.0)
    revenue = prices * y_pred
    unit_cost = latest_row.get("unit_cost", np.nan)
    cost_val = unit_cost if unit_cost == unit_cost else 0.0
    profit = (prices - cost_val) * y_pred
    if min_margin_pct and unit_cost == unit_cost:
        per_unit_margin_pct = np.where(prices > 0, (prices - unit_cost) / prices, -np.inf)
        mask = per_unit_margin_pct >= float(min_margin_pct)
    else:
        mask = np.ones_like(prices, dtype=bool)
    if not mask.any():
        mask = np.ones_like(prices, dtype=bool)
    idx = np.argmax(profit[mask]) if optimize_for == "profit" else np.argmax(revenue[mask])
    valid_indices = np.where(mask)[0]
    best_i = valid_indices[idx]
    return {
        "best_price": float(prices[best_i]),
        "expected_units": float(y_pred[best_i]),
        "expected_revenue": float(revenue[best_i]),
        "expected_profit": float(profit[best_i]),
        "grid": prices.tolist(),
        "y_pred": y_pred.tolist(),
        "revenue": revenue.tolist(),
        "profit": profit.tolist(),
    }


In [None]:

# === Массовые рекомендации и запись в БД ===
CREATE_RECS_SQL = '''
CREATE TABLE IF NOT EXISTS shoes.price_recommendations (
    product_id          bigint      NOT NULL,
    as_of_date          date        NOT NULL,
    recommended_price   numeric     NOT NULL,
    expected_units      numeric,
    expected_revenue    numeric,
    expected_profit     numeric,
    current_price       numeric,
    current_promo_price numeric,
    unit_cost           numeric,
    method              text        NOT NULL,
    created_at          timestamp   NOT NULL DEFAULT now()
);
'''

def recommend_prices_batch(engine: Engine,
                           fe_df: pd.DataFrame,
                           product_ids: list[int] | None = None,
                           price_step: float = DEFAULT_PRICE_STEP,
                           pmin: float | None = None,
                           pmax: float | None = None,
                           optimize_for: str = DEFAULT_OPTIMIZE_FOR,
                           min_margin_pct: float = DEFAULT_MIN_MARGIN_PCT,
                           persist: bool = True) -> pd.DataFrame:
    model = load_model(MODEL_PATH)
    if product_ids is None:
        product_ids = sorted(fe_df["product_id"].unique().tolist())

    run_sql(engine, CREATE_RECS_SQL)

    rows = []
    for pid in product_ids:
        snap = latest_snapshot_for_product(fe_df, pid)
        if snap is None:
            continue
        cur_price = snap.get("base_price", np.nan)
        prices = candidate_price_grid(
            current_price=cur_price,
            step=price_step,
            min_mult=DEFAULT_PRICE_MIN_MULT if pmin is None else 1.0,
            max_mult=DEFAULT_PRICE_MAX_MULT if pmax is None else 1.0,
            pmin=pmin, pmax=pmax
        )
        best = choose_best_price(
            model, snap, prices,
            optimize_for=optimize_for,
            min_margin_pct=min_margin_pct
        )
        rows.append({
            "product_id": int(pid),
            "as_of_date": pd.to_datetime(snap["dt"]).date(),
            "recommended_price": best["best_price"],
            "expected_units": best["expected_units"],
            "expected_revenue": best["expected_revenue"],
            "expected_profit": best["expected_profit"],
            "current_price": float(snap.get("base_price", np.nan)),
            "current_promo_price": float(snap.get("promo_price", np.nan)) if snap.get("promo_price", np.nan) == snap.get("promo_price", np.nan) else None,
            "unit_cost": float(snap.get("unit_cost", np.nan)) if snap.get("unit_cost", np.nan) == snap.get("unit_cost", np.nan) else None,
            "method": f"hgb_regressor/{optimize_for}",
        })

    recs_df = pd.DataFrame(rows)
    if persist and not recs_df.empty:
        recs_df.to_sql(
            "price_recommendations",
            con=engine,
            schema="shoes",
            if_exists="append",
            index=False
        )
    return recs_df

print("Функции рекомендаций готовы.")



## Пример запуска

1. Обновите `SHOES_DB_URL` при необходимости.
2. Запустите загрузку данных и генерацию признаков.
3. Обучите и сохраните модель.
4. Сформируйте рекомендации и (опционально) запишите их в БД.


In [None]:

# пример (раскомментируйте после обучения модели):
# engine = get_engine()
# recs = recommend_prices_batch(
#     engine,
#     fe_df=feat_df,
#     product_ids=None,
#     price_step=100.0,
#     pmin=None, pmax=None,
#     optimize_for="revenue",   # или "profit"
#     min_margin_pct=0.05,
#     persist=True
# )
# recs.head()
