<a href="https://colab.research.google.com/github/Deepikadhinakaran/AI_price_optima/blob/main/fast_py.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [29]:
import os
import io
import numpy as np
import pandas as pd
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
import joblib
import logging

# Configure process-wide logging at INFO level and create the named logger
# that the loading code below writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pricing_api")

In [30]:
# Runtime configuration; each value can be overridden through the named
# environment variable and otherwise falls back to the default shown.
# DATA_CSV   - path of the baseline dataset read with pandas further down.
# MODEL_PATH - path of the serialized model loaded with joblib further down.
# PORT       - integer port; not referenced in the visible code
#              (presumably consumed by the server launcher - TODO confirm).
DATA_CSV = os.getenv("PRICING_BASELINE_CSV", "dynamic_pricing.csv")
MODEL_PATH = os.getenv("PRICING_MODEL_PATH", "gradient_boosting_model.joblib")
PORT = int(os.getenv("PORT", "8000"))
# These prints only announce the configured paths; nothing is loaded here.
print(f"Loading data from {DATA_CSV}")
print(f"Loading model from {MODEL_PATH}")

Loading data from dynamic_pricing.csv
Loading model from gradient_boosting_model.joblib


In [31]:
# Allowed CORS origins come from PRICING_CORS_ORIGINS (comma-separated list).
# The default "*" keeps the API reachable from any origin, as before.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("PRICING_CORS_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]
_cors_allow_all = "*" in _cors_origins

app = FastAPI(title="Dynamic Pricing API", version="1.0.0")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"] if _cors_allow_all else _cors_origins,
    # A wildcard origin must not be combined with credentialed requests:
    # that would let any site call the API with the user's cookies/auth.
    # Credentials are therefore only enabled for an explicit origin list.
    allow_credentials=not _cors_allow_all,
    allow_methods=["*"],
    allow_headers=["*"],
)

In [32]:
# Load the baseline dataset once at import time. Any failure is re-raised as
# RuntimeError (chained to the original error) so startup aborts with a
# message naming the offending path.
try:
    df_base_all = pd.read_csv(DATA_CSV)
    logger.info("Loaded baseline CSV rows=%d", len(df_base_all))
except Exception as e:
    raise RuntimeError(f"Could not load {DATA_CSV}: {e}") from e

# NOTE(security): joblib.load unpickles arbitrary Python objects and can run
# code embedded in the file. MODEL_PATH must point to a trusted artifact only.
try:
    best_pipe = joblib.load(MODEL_PATH)
    logger.info("Loaded model from %s", MODEL_PATH)
except Exception as e:
    raise RuntimeError(f"Could not load model {MODEL_PATH}: {e}") from e

In [33]:
def _needs_fallback(df: pd.DataFrame, column: str) -> bool:
    """Return True when *column* is absent from *df* or holds any null value."""
    return column not in df.columns or bool(df[column].isna().any())


def _fill_with_fallback(df: pd.DataFrame, column: str, fallback) -> None:
    """Create *column* from *fallback*, or fill only its null entries (in place)."""
    if column not in df.columns:
        df[column] = fallback
        return
    nulls = df[column].isna()
    if nulls.all():
        # Replace wholesale so an all-None object column takes the fallback's dtype.
        df[column] = fallback
    elif nulls.any():
        df[column] = df[column].where(~nulls, fallback)


def _engineer(df: pd.DataFrame) -> pd.DataFrame:
    """Return a feature-engineered copy of *df*; the input frame is not modified.

    Each derived column is only computed when the raw columns it needs are
    present. ``competitor_price``, ``p_complete`` and ``price`` are fallbacks:
    values supplied by the caller are kept, and only missing columns or null
    entries are filled.

    Args:
        df: Ride records with the raw columns used below (all optional).

    Returns:
        A new DataFrame containing the raw columns plus the derived ones.
    """
    df = df.copy()

    # Ratios / supply
    if "Number_of_Riders" in df.columns and "Number_of_Drivers" in df.columns:
        df["Rider_Driver_Ratio"] = df["Number_of_Riders"] / df["Number_of_Drivers"].clip(lower=1)
        df["Driver_to_Rider_Ratio"] = df["Number_of_Drivers"] / df["Number_of_Riders"].clip(lower=1)
        # A zero ratio (no drivers) is mapped to NaN instead of dividing by zero.
        df["Supply_Tightness"] = 1.0 / df["Driver_to_Rider_Ratio"].replace(0, np.nan)

    # Loyalty score (unknown statuses map to 0)
    if "Customer_Loyalty_Status" in df.columns:
        _loy_map = {"Regular": 0, "Silver": 1, "Gold": 2}
        df["Loyalty_Score"] = (
            df["Customer_Loyalty_Status"].astype(str).map(_loy_map).fillna(0).astype(int)
        )

    # Peak time flag
    if "Time_of_Booking" in df.columns:
        df["Peak"] = df["Time_of_Booking"].astype(str).isin(["Morning", "Evening"]).astype(int)

    # Cost granularity
    if "Historical_Cost_of_Ride" in df.columns and "Expected_Ride_Duration" in df.columns:
        df["Cost_per_Min"] = df["Historical_Cost_of_Ride"] / (df["Expected_Ride_Duration"].clip(lower=1))

    # Vehicle factor (unknown vehicle types map to 1.1)
    if "Vehicle_Type" in df.columns:
        _veh_map = {"Economy": 1.0, "Premium": 1.25}
        df["Vehicle_Factor"] = df["Vehicle_Type"].astype(str).map(_veh_map).fillna(1.1).astype(float)

    # Inventory health
    if {"Number_of_Drivers", "Number_of_Riders", "Supply_Tightness"}.issubset(df.columns):
        df["Inventory_Health_Index"] = (
            0.6 * (df["Number_of_Drivers"] / df["Number_of_Riders"].clip(lower=1)).clip(upper=2.0)
            + 0.4 * (1.0 - df["Supply_Tightness"].clip(upper=2.0) / 2.0)
        )

    # Baseline price: cost plus markup, adjusted by vehicle, time and location.
    def _baseline_price_row(r):
        # An unparseable or None cost is treated as 0.
        try:
            cost = float(r.get("Historical_Cost_of_Ride", 0.0))
        except Exception:
            cost = 0.0
        v_adj = {"Economy": 1.10, "Premium": 1.22}
        t_adj = {"Morning": 1.02, "Afternoon": 1.00, "Evening": 1.04, "Night": 1.01}
        l_adj = {"Urban": 1.03, "Suburban": 1.02, "Rural": 0.98}
        base = max(cost * 1.15, 0.0)
        base *= v_adj.get(str(r.get("Vehicle_Type", "Economy")), 1.12)
        base *= t_adj.get(str(r.get("Time_of_Booking", "Afternoon")), 1.00)
        base *= l_adj.get(str(r.get("Location_Category", "Urban")), 1.00)
        # Never return less than cost * 1.12, and never a negative price.
        return max(base, cost * 1.12, 0.0)

    if "Historical_Cost_of_Ride" in df.columns:
        df["baseline_price"] = df.apply(_baseline_price_row, axis=1).round(2)
    elif "baseline_price" not in df.columns:
        # Without a cost, baseline_price=0 tells downstream code it is missing.
        df["baseline_price"] = 0.0

    # Competitor price (fallback). A private RandomState seeded with 42 yields
    # the same draws as np.random.seed(42) followed by np.random.uniform, but
    # leaves the process-wide NumPy RNG untouched.
    if _needs_fallback(df, "competitor_price"):
        rng = np.random.RandomState(42)
        competitor_fallback = (
            df["baseline_price"] * rng.uniform(0.94, 1.02, size=len(df))
        ).round(2)
        _fill_with_fallback(df, "competitor_price", competitor_fallback)

    # Placeholder p_complete heuristic where no value is present
    if _needs_fallback(df, "p_complete"):
        def estimate_p_complete(row, price):
            try:
                cost = float(row.get("Historical_Cost_of_Ride", 0.0))
            except Exception:
                cost = 0.0
            # Relative markup of the price over cost.
            rel = (price / max(cost, 1e-6)) - 1.0
            ratio = float(row.get("Driver_to_Rider_Ratio", 1.0) or 1.0)
            loy = float(row.get("Loyalty_Score", 0) or 0)
            peak = float(row.get("Peak", 0) or 0)
            veh = float(row.get("Vehicle_Factor", 1.0) or 1.0)
            x = (-0.3 + 0.9 * ratio + 0.12 * loy - 0.08 * peak - 0.45 * rel - 0.02 * (veh - 1.0))
            # Clip the logit so exp() cannot overflow, then squash to (0, 1).
            x = np.clip(x, -40, 40)
            p = 1.0 / (1.0 + np.exp(-x))
            return float(np.clip(p, 0.02, 0.98))

        p_fallback = df.apply(
            lambda r: estimate_p_complete(r, r.get("baseline_price", 0.0)), axis=1
        )
        _fill_with_fallback(df, "p_complete", p_fallback)

    # Ensure a usable 'price' column exists (baseline by default)
    _fill_with_fallback(df, "price", df["baseline_price"])

    return df

# Engineer the baseline dataset once at startup; recommend() later uses the
# resulting column set as the template for incoming records.
df_base_all = _engineer(df_base_all)


In [34]:
# ===== Policy + optimizer (unchanged logic but with safe guards) =====
# STABILITY_PCT: maximum relative move away from baseline_price in either
#                direction (used by row_price_bounds).
# MIN_GM_PCT:    minimum gross-margin fraction a price must reach
#                (used by row_price_bounds and choose_price_policy).
# COMP_CAP / COMP_FLOOR: upper / lower multipliers applied to competitor_price,
#                keyed by Vehicle_Type (row_price_bounds supplies defaults for
#                other vehicle types).
# TIME_NUDGE:    relative adjustment of the search centre per Time_of_Booking
#                (used by choose_price_policy).
STABILITY_PCT = 0.15
MIN_GM_PCT = 0.12
COMP_CAP   = {"Economy": 1.05, "Premium": 1.08}
COMP_FLOOR = {"Economy": 0.90, "Premium": 0.88}
TIME_NUDGE = {"Morning": +0.03, "Afternoon": 0.00, "Evening": +0.04, "Night": +0.01}

def gm_pct(price, cost):
    """Return the gross-margin fraction ``(price - cost) / price``.

    Inputs that cannot be converted to float, and non-positive prices,
    yield 0.0.
    """
    try:
        price_f, cost_f = float(price), float(cost)
    except Exception:
        return 0.0
    return 0.0 if price_f <= 0 else (price_f - cost_f) / price_f

def inv_nudge(ratio):
    """Return the inventory price nudge for a supply ratio.

    Below 0.8 the nudge is +0.03, above 1.2 it is -0.03, otherwise 0.0.
    A ratio that cannot be converted to float is treated as 1.0.
    """
    try:
        value = float(ratio)
    except Exception:
        value = 1.0
    if value < 0.8:
        return 0.03
    return -0.03 if value > 1.2 else 0.0

def row_price_bounds(row):
    """Return the ``(lower, upper)`` price window allowed for one record.

    The window is the intersection of three constraints:
      * stability: within +/- STABILITY_PCT of ``baseline_price``;
      * margin: at least the price that reaches the larger of MIN_GM_PCT
        and the baseline's own gross margin (lower bound only);
      * competition: between COMP_FLOOR and COMP_CAP times
        ``competitor_price`` for the record's ``Vehicle_Type``.

    When the constraints conflict, the lower bound collapses onto the upper.
    *row* must offer ``.get(key, default)``.
    """
    baseline = float(row.get("baseline_price", 0.0))
    ride_cost = float(row.get("Historical_Cost_of_Ride", 0.0) or 0.0)
    vehicle = str(row.get("Vehicle_Type", "Economy"))
    # A missing or falsy competitor price falls back to the baseline.
    competitor = float(row.get("competitor_price", baseline) or baseline)

    stability_lo = baseline * (1 - STABILITY_PCT)
    stability_hi = baseline * (1 + STABILITY_PCT)

    # Price at which the required gross margin is met exactly.
    required_gm = max(MIN_GM_PCT, gm_pct(baseline, ride_cost))
    margin_lo = ride_cost / max(1 - required_gm, 1e-9)

    competitor_hi = competitor * COMP_CAP.get(vehicle, 1.06)
    competitor_lo = competitor * COMP_FLOOR.get(vehicle, 0.90)

    upper = min(stability_hi, competitor_hi)
    lower = max(stability_lo, margin_lo, competitor_lo)
    # Collapse to the upper bound when the window would otherwise be empty.
    return min(lower, upper), upper


In [35]:
# ML-based p_complete for arbitrary price
# CAT_FEATS: columns that predict_p_for_price passes to the model as strings.
# NUM_FEATS: columns that predict_p_for_price coerces to float; "price" is
#            replaced by the candidate price at prediction time.
# FEATURES:  column order of the single-row frame handed to the model.
CAT_FEATS = ["Time_of_Booking","Location_Category","Vehicle_Type","Customer_Loyalty_Status"]
NUM_FEATS = ["price","Expected_Ride_Duration","Historical_Cost_of_Ride","Number_of_Riders","Number_of_Drivers",
             "Rider_Driver_Ratio","Driver_to_Rider_Ratio","Supply_Tightness",
             "Cost_per_Min","Inventory_Health_Index","competitor_price","baseline_price"]
FEATURES = CAT_FEATS + NUM_FEATS

def _safe_get_from_row(row, key):
    """Look up *key* in a dict or pandas Series, returning None when unavailable.

    Any other kind of object, or any lookup error, also yields None.
    """
    if isinstance(row, dict):
        return row.get(key, None)
    try:
        # Series path: label lookup guarded by an index membership test.
        return row.loc[key] if key in row.index else None
    except Exception:
        return None

def predict_p_for_price(row_X, price_value: float) -> float:
    """Predict the completion probability of one record at a given price.

    A single-row DataFrame is built in FEATURES order: ``price`` is taken
    from *price_value*, categorical features are stringified (missing values
    become ``"missing"``) and numeric features are coerced to float (anything
    unparseable becomes NaN).

    Args:
        row_X: dict or pandas Series holding the feature values.
        price_value: candidate price substituted for the ``price`` feature.

    Returns:
        The model output for the row, clipped to [0.0, 1.0].

    Raises:
        RuntimeError: if the model fails to predict (chained to the cause).
    """
    def _is_missing(value) -> bool:
        # pd.isna returns an array for list-likes, whose truth value is
        # ambiguous; treat such values as present rather than crashing.
        if value is None:
            return True
        try:
            return bool(pd.isna(value))
        except (TypeError, ValueError):
            return False

    def _to_float(value) -> float:
        # None and blank strings count as missing numerics.
        if value is None or (isinstance(value, str) and value.strip() == ""):
            return np.nan
        try:
            return float(value)
        except (TypeError, ValueError, OverflowError):
            return np.nan

    data = {}
    for c in FEATURES:
        if c == "price":
            # The candidate price always overrides any price stored in the row.
            data["price"] = _to_float(price_value)
            continue

        raw = _safe_get_from_row(row_X, c)
        if c in CAT_FEATS:
            # Avoid NaN floats in categorical columns.
            data[c] = "missing" if _is_missing(raw) else str(raw)
        else:
            data[c] = _to_float(raw)

    df_row = pd.DataFrame([data], columns=FEATURES)
    try:
        p = float(best_pipe.predict(df_row)[0])
    except Exception as e:
        # Surface a uniform error type; callers decide how to fall back.
        raise RuntimeError(f"Model prediction failed: {e}") from e
    return float(np.clip(p, 0.0, 1.0))

def choose_price_policy(row_full, n_grid=15):
    """Search the allowed price window for the revenue-maximising price.

    Candidate prices form a grid between the bounds from ``row_price_bounds``,
    split around a centre price (baseline nudged by time of booking and
    supply ratio). A candidate is accepted only if it meets MIN_GM_PCT and its
    predicted completion probability is not below the baseline's. The
    baseline price itself is the starting incumbent, so it is returned when
    no candidate beats it.

    Args:
        row_full: dict or pandas Series describing one engineered record.
        n_grid: approximate number of candidate prices to evaluate.

    Returns:
        ``(price, p_complete_at_price, lower_bound, upper_bound,
        p_complete_at_baseline)`` as floats.
    """
    base = float(_safe_get_from_row(row_full, "baseline_price") or 0.0)
    cost = float(_safe_get_from_row(row_full, "Historical_Cost_of_Ride") or 0.0)

    lo, hi = row_price_bounds(row_full)

    t_n = TIME_NUDGE.get(str(_safe_get_from_row(row_full, "Time_of_Booking") or "Afternoon"), 0.0)
    i_n = inv_nudge(float(_safe_get_from_row(row_full, "Driver_to_Rider_Ratio") or 1.0))
    center = np.clip(base * (1 + t_n + i_n), lo, hi)

    # Two half-grids meeting at the centre; np.unique drops duplicates when
    # the window (or one half of it) is degenerate.
    grid_left  = np.linspace(lo, center, max(2, n_grid//2), endpoint=False)
    grid_right = np.linspace(center, hi, max(2, n_grid - len(grid_left)), endpoint=True)
    grid = np.unique(np.concatenate([grid_left, grid_right]))

    try:
        p_base = predict_p_for_price(row_full, base)
    except Exception as e:
        # Best effort: fall back to the row's own p_complete, else 0.5,
        # but leave a trace so a broken model does not go unnoticed.
        logger.warning("Baseline completion prediction failed (%s); using fallback", e)
        fallback = _safe_get_from_row(row_full, "p_complete")
        p_base = float(fallback) if (fallback is not None and not pd.isna(fallback)) else 0.5

    best_price, best_pc, best_rev = base, p_base, base * p_base
    n_failed = 0
    for candidate in grid:
        if gm_pct(candidate, cost) < MIN_GM_PCT:
            continue
        try:
            pc = predict_p_for_price(row_full, candidate)
        except Exception:
            # Skip this candidate; failures are reported once after the loop.
            n_failed += 1
            continue
        # Never trade completion probability away (small tolerance for noise).
        if pc + 1e-9 < p_base:
            continue
        rev = candidate * pc
        if rev > best_rev:
            best_price, best_pc, best_rev = candidate, pc, rev
    if n_failed:
        logger.warning(
            "Model prediction failed for %d of %d candidate prices", n_failed, len(grid)
        )
    return float(best_price), float(best_pc), float(lo), float(hi), float(p_base)

def compute_kpis(df_base: pd.DataFrame, df_scn: pd.DataFrame) -> dict:
    """Compare a pricing scenario against the baseline and return headline KPIs.

    Both frames must contain ``Number_of_Riders``, ``price``, ``p_complete``
    and ``Historical_Cost_of_Ride`` and have the same number of rows; rows
    are matched by position, not by index label.

    Args:
        df_base: baseline prices and completion probabilities.
        df_scn: scenario prices and completion probabilities.

    Returns:
        Dict of KPI name -> plain Python float rounded to 2 decimals
        (revenue, revenue lift, gross margin, conversion and cancellation
        rates, and the share of rows whose price changed).

    Raises:
        ValueError: if a required column is missing or the row counts differ.
    """
    intents_col = "Number_of_Riders"
    price_col = "price"
    pcomplete_col = "p_complete"
    cost_col = "Historical_Cost_of_Ride"

    for col in (intents_col, price_col, pcomplete_col, cost_col):
        if col not in df_base.columns or col not in df_scn.columns:
            raise ValueError(f"KPI calculation requires column '{col}' in both baseline and scenario dataframes.")
    if len(df_base) != len(df_scn):
        raise ValueError(
            "KPI calculation requires the same number of rows in baseline "
            f"({len(df_base)}) and scenario ({len(df_scn)}) dataframes."
        )

    # Rider counts are floored at 1 so each row carries at least one intent.
    intents_b = df_base[intents_col].astype(float).clip(lower=1)
    intents_s = df_scn[intents_col].astype(float).clip(lower=1)
    price_b = df_base[price_col].astype(float)
    price_s = df_scn[price_col].astype(float)

    # Expected completed rides per row.
    comp_b = intents_b * df_base[pcomplete_col].astype(float)
    comp_s = intents_s * df_scn[pcomplete_col].astype(float)

    rev_b = float((price_b * comp_b).sum())
    rev_s = float((price_s * comp_s).sum())

    cost_b = float((df_base[cost_col].astype(float) * comp_b).sum())
    cost_s = float((df_scn[cost_col].astype(float) * comp_s).sum())

    # Denominators are floored so empty or zero-revenue input cannot divide by zero.
    revenue_lift_pct = (rev_s - rev_b) / max(rev_b, 1e-9) * 100.0
    gm_b = (rev_b - cost_b) / max(rev_b, 1e-9) * 100.0
    gm_s = (rev_s - cost_s) / max(rev_s, 1e-9) * 100.0

    conv_b = float(comp_b.sum()) / max(float(intents_b.sum()), 1e-9) * 100.0
    conv_s = float(comp_s.sum()) / max(float(intents_s.sum()), 1e-9) * 100.0

    # Compare positionally: label-aligned comparison raises when the two
    # frames carry different index labels.
    changed = price_b.to_numpy() != price_s.to_numpy()
    price_change_rate = float(changed.mean()) * 100.0 if len(changed) else 0.0

    return {
        "Revenue (₹) baseline": round(rev_b, 2),
        "Revenue (₹) scenario": round(rev_s, 2),
        "Revenue Lift (%)": round(revenue_lift_pct, 2),
        "Gross Margin (baseline %)": round(gm_b, 2),
        "Gross Margin (scenario %)": round(gm_s, 2),
        "Conversion Rate (baseline %)": round(conv_b, 2),
        "Conversion Rate (scenario %)": round(conv_s, 2),
        "Cancellation Rate (baseline %)": round(100.0 - conv_b, 2),
        "Cancellation Rate (scenario %)": round(100.0 - conv_s, 2),
        "Price Change Rate (%)": round(price_change_rate, 2),
    }

In [36]:
# ====== Schemas ======
class RecommendRequest(BaseModel):
    """Request body of POST /recommend: one ride record as a JSON object."""

    record: dict = Field(..., description="One record (JSON).")


class RecommendBatchResponse(BaseModel):
    """Payload shape returned by recommend_batch.

    NOTE(review): not wired as ``response_model`` in the visible code.
    """

    kpis: dict
    n_rows: int
    # Per-row results, as returned under the "rows" key by recommend_batch.
    # Defaulted so constructing the model without it keeps working.
    rows: list = Field(default_factory=list, description="Per-row recommendations or errors.")

# ====== Endpoints ======
@app.get("/health")
def health():
    """Liveness probe: report OK plus the number of baseline rows in memory."""
    loaded_rows = int(len(df_base_all))
    return {"ok": True, "rows_loaded": loaded_rows}

@app.post("/recommend")
def recommend(req: RecommendRequest):
    """Recommend a price for a single ride record.

    The record is laid over a template built from the baseline schema,
    feature-engineered, and passed to ``choose_price_policy``.

    Returns:
        Dict with the recommended price, its predicted completion
        probability, the allowed price bounds, the baseline completion
        probability and the resulting gross margin in percent.

    Raises:
        HTTPException 400: feature engineering failed, or
            ``Historical_Cost_of_Ride`` is missing, zero or not numeric.
        HTTPException 500: the pricing policy failed.
    """
    # _engineer derives these columns only when they are absent, so they must
    # not be pre-seeded with None like the rest of the baseline schema;
    # otherwise the model would be fed a missing competitor_price.
    derived_if_absent = {"competitor_price", "p_complete", "price"}
    row = {c: None for c in df_base_all.columns if c not in derived_if_absent}
    row.update(req.record or {})

    df = pd.DataFrame([row])
    try:
        df = _engineer(df)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Feature engineering failed: {e}") from e

    # Without a usable cost no reliable price can be produced; a value that
    # cannot be parsed as a number is treated like a missing one.
    try:
        cost = float(df.loc[0, "Historical_Cost_of_Ride"])
    except (TypeError, ValueError):
        cost = float("nan")
    if pd.isna(cost) or cost == 0:
        raise HTTPException(status_code=400, detail="Historical_Cost_of_Ride is missing or zero — cannot compute recommendation reliably.")

    try:
        p_star, pc_star, lo, hi, p_base = choose_price_policy(df.iloc[0])
    except RuntimeError as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Pricing policy failed: {e}") from e

    gm = gm_pct(p_star, cost)

    return {
        "price_recommended": round(p_star, 2),
        "p_complete_recommended": round(pc_star, 4),
        "bounds": {"low": round(lo, 2), "high": round(hi, 2)},
        "p_complete_baseline": round(p_base, 4),
        "gm_pct": round(gm * 100, 2),
    }

In [37]:
@app.post("/recommend_batch")
def recommend_batch(file: UploadFile = File(...)):
    """Recommend prices for every row of an uploaded CSV and report KPIs.

    The baseline scenario prices every row at ``baseline_price`` with the
    model-predicted completion probability (falling back to the row's
    existing ``p_complete`` if the model fails). The recommended scenario
    uses ``choose_price_policy`` per row. Rows that cannot be optimised keep
    their baseline price *and* baseline completion probability, so they do
    not contribute any spurious difference to the KPIs.

    Returns:
        Dict with ``kpis`` (see ``compute_kpis``), ``n_rows`` and ``rows``
        (one entry per input row: a recommendation or an ``error`` note).

    Raises:
        HTTPException 400: unreadable CSV or feature engineering failure.
        HTTPException 500: KPI computation failure.
    """
    try:
        content = file.file.read()
        df = pd.read_csv(io.BytesIO(content))
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid CSV: {e}") from e

    try:
        df = _engineer(df)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Feature engineering failed: {e}") from e

    # Baseline scenario: baseline price, completion probability from the model.
    df_base = df.copy()
    df_base["price"] = df_base["baseline_price"]
    p_list = []
    for i in df_base.index:
        try:
            p = predict_p_for_price(df_base.loc[i], df_base.loc[i, "price"])
        except Exception:
            p = df_base.loc[i].get("p_complete", 0.5)
        p_list.append(p)
    df_base["p_complete"] = np.clip(p_list, 0.0, 1.0)

    rows_out, prices, pcomps = [], [], []

    def _keep_baseline(idx, message):
        # Record a row-level note and carry the baseline scenario over
        # unchanged for this row instead of failing the whole batch.
        rows_out.append({"index": int(idx), "error": message})
        prices.append(df_base.loc[idx, "price"])
        pcomps.append(df_base.loc[idx, "p_complete"])

    for i, r in df.iterrows():
        cost = r.get("Historical_Cost_of_Ride")
        if pd.isna(cost) or (cost in [None, 0]):
            _keep_baseline(i, "Historical_Cost_of_Ride missing or zero — skipped")
            continue

        try:
            p_star, pc_star, lo, hi, p_base = choose_price_policy(r)
        except Exception as e:
            _keep_baseline(i, f"choose_price_policy failed: {e}")
            continue

        rows_out.append({
            "index": int(i),
            "price_recommended": round(p_star, 2),
            "p_complete_recommended": round(pc_star, 4),
            "bound_low": round(lo, 2),
            "bound_high": round(hi, 2),
            "p_complete_baseline": round(p_base, 4),
            "gm_pct": round(gm_pct(p_star, float(cost)) * 100, 2),
        })
        prices.append(p_star)
        pcomps.append(pc_star)

    # Recommended scenario: same rows, recommended price and probability.
    df_scn = df.copy()
    df_scn["price"] = np.round(prices, 2)
    df_scn["p_complete"] = np.clip(pcomps, 0.0, 1.0)

    try:
        kpis = compute_kpis(df_base, df_scn)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"KPI computation failed: {e}") from e
    return {"kpis": kpis, "n_rows": int(len(df)), "rows": rows_out}


In [38]:
@app.post("/kpis")
def kpis(file_base: UploadFile = File(...), file_scn: UploadFile = File(...)):
    """Compute KPIs from two uploaded CSVs (baseline first, then scenario).

    Responds with HTTP 400 when either CSV cannot be parsed or when
    ``compute_kpis`` rejects the data.
    """
    try:
        frames = [
            pd.read_csv(io.BytesIO(upload.file.read()))
            for upload in (file_base, file_scn)
        ]
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid CSV(s): {e}")
    df_b, df_s = frames

    try:
        return compute_kpis(df_b, df_s)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"KPI computation failed: {e}")