In [6]:
import polars as pl
import numpy as np
import matplotlib.pyplot as plt
import os
# Install psutil if not available
try:
    import psutil
except ImportError:
    !pip install psutil
    import psutil

In [7]:
# Root folder of the daily star-schema Parquet export (relative to the notebook).
DATA_DIR = "./Data/Parquet/star_schema_daily"
# One Parquet file per table; each key doubles as the file's base name.
PATHS = {
    table: os.path.join(DATA_DIR, f"{table}.parquet")
    for table in ("dim_item", "dim_store", "dim_state", "dim_calendar", "fact_sales")
}

In [23]:
# Build lazy scans over each star-schema table.
# date_id is cast to pl.Date on both the calendar and the fact table so the
# later calendar join compares keys of the same dtype.
fact = pl.scan_parquet(PATHS["fact_sales"]).with_columns(pl.col("date_id").cast(pl.Date))
dim_cal = pl.scan_parquet(PATHS["dim_calendar"]).with_columns(pl.col("date_id").cast(pl.Date))
dim_item, dim_store, dim_state = (
    pl.scan_parquet(PATHS[key]) for key in ("dim_item", "dim_store", "dim_state")
)

In [None]:
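# Ensure date_id is a Date — the cast is now applied when the tables are loaded (and again after the join); the lines below are kept for reference only.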
# dim_cal = dim_cal.with_columns(pl.col("date_id").cast(pl.Date))
# fact = fact.with_columns(pl.col("date_id").cast(pl.Date))

# ----------- Thứ 2 (03/10/2025) -----------

In [24]:
# --- Enrich the fact table with item, store and calendar attributes ---
# All three are left joins, so every fact row is kept; unmatched keys simply
# get nulls in the added columns. price is cast to Float64 before joining.
fact = (
    fact
    .join(
        dim_item
        .select(["item_id", "dept_id", "cat_id", "price"])
        .with_columns(pl.col("price").cast(pl.Float64)),
        on="item_id",
        how="left",
    )
    .join(
        dim_store.select(["store_id", "state_id", "type", "size"]),
        on="store_id",
        how="left",
    )
    .join(
        dim_cal.select([
            "date_id", "weekday", "day_of_month", "month", "year", "week_id",
            "event_name", "event_type", "snap_CA", "snap_TX", "snap_WI",
        ]),
        on="date_id",
        how="left",
    )
)

In [25]:
# Time features, lags, rolling
# Make sure date_id is a Date after the joins (a no-op if it already is —
# the load cell casts it too).
fact = fact.with_columns(date_id=pl.col("date_id").cast(pl.Date))

def create_time_features(df: pl.LazyFrame) -> pl.LazyFrame:
    """Add calendar / event features to the joined fact frame.

    Expects a Date-typed ``date_id`` plus the calendar columns ``event_name``,
    ``event_type``, ``snap_CA``, ``snap_TX`` and ``snap_WI`` (weekday, month
    and year are expected to be present already and are left untouched).

    Adds:
      - ``week_of_year``: ISO week number of ``date_id`` (``dt.week()``).
      - ``is_holiday_event``: ``event_name`` is non-null and non-empty.
        NOTE(review): this flags any named event, not only holidays.
      - ``is_event``: ``event_type`` is non-null and non-empty.
    and null-fills the three SNAP columns with ``False``.

    Returns the frame with those columns added/replaced.
    """
    df = df.with_columns(pl.col("date_id").dt.week().alias("week_of_year"))
    df = df.with_columns([
        # An empty string is treated the same as a missing event.
        (pl.col("event_name").is_not_null() & (pl.col("event_name") != "")).alias("is_holiday_event"),
        (pl.col("event_type").is_not_null() & (pl.col("event_type") != "")).alias("is_event"),
        # NOTE(review): fill_null(False) assumes boolean SNAP flags — confirm the dtype.
        pl.col("snap_CA").fill_null(False),
        pl.col("snap_TX").fill_null(False),
        pl.col("snap_WI").fill_null(False),
    ])
    return df

# Apply the time features, then keep only the columns used downstream to
# reduce memory.
fact = create_time_features(fact).select(
    "item_id", "store_id", "date_id", "units_sold", "revenue",
    "dept_id", "cat_id", "price", "state_id", "type", "size",
    "weekday", "day_of_month", "month", "year", "week_id", "week_of_year",
    "event_name", "event_type", "is_holiday_event", "is_event",
    "snap_CA", "snap_TX", "snap_WI",
)

In [14]:
# Realised unit price (revenue / units_sold), used for promotion detection.
# A when() without otherwise() yields null, so rows with no units sold get a
# null unit_price rather than a revenue / 0 value.
fact = fact.with_columns(
    unit_price=pl.when(pl.col("units_sold") > 0).then(
        pl.col("revenue") / pl.col("units_sold")
    )
)

In [15]:
# Lag features: the value from 1, 7 and 28 rows earlier within each
# (item_id, store_id) series, for units_sold, revenue and unit_price.
# shift().over() works on the rows in their current order, so the frame is
# sorted chronologically within each series first; without this the "lag"
# could point at an arbitrary earlier/later row.
# NOTE(review): a lag of N rows equals N days only if every item-store has one
# row per calendar day with no gaps — confirm against fact_sales.
LAGS = [1, 7, 28]
fact = fact.sort(["item_id", "store_id", "date_id"])
fact = fact.with_columns([
    pl.col(src).shift(lag).over(["item_id", "store_id"]).alias(f"{prefix}_lag_{lag}")
    for lag in LAGS
    for src, prefix in (
        ("units_sold", "units"),
        ("revenue", "revenue"),
        ("unit_price", "unit_price"),
    )
])

In [28]:
# Rolling means over 7, 28 and 90 rows of each (item_id, store_id) series.
# The series is shifted by one row before averaging so the window ends at t-1:
# the feature on day t never includes day t's own units/revenue (otherwise the
# target would leak into its own feature).
# NOTE(review): rows equal days only if each item-store has one row per
# calendar day with no gaps — confirm against fact_sales.
ROLLS = [7, 28, 90]
# .over() evaluates each expression per group in the current row order, so
# sort chronologically once (not once per window size).
fact = fact.sort(["item_id", "store_id", "date_id"])
fact = fact.with_columns([
    pl.col(src)
    .shift(1)
    .rolling_mean(window_size=w)
    .over(["item_id", "store_id"])
    .alias(f"{prefix}_roll_mean_{w}")
    for w in ROLLS
    for src, prefix in (("units_sold", "units"), ("revenue", "revenue"))
])

# Simple moving averages are used here; an exponentially weighted variant is
# available natively via pl.Expr.ewm_mean if needed.

In [26]:
# Materialise the lazy pipeline (joins, time features, lags, rolling means)
# into an in-memory DataFrame. The isinstance guard makes the cell safe to
# re-run: once `fact` is a DataFrame there is nothing left to collect.
if isinstance(fact, pl.LazyFrame):
    fact = fact.collect()

In [27]:
# Report the kernel process' resident memory (RSS, in MiB) and the frame's shape.
import os
import psutil

process = psutil.Process(os.getpid())
rss_mb = process.memory_info().rss / 2**20
print(f"Memory usage: {rss_mb:.2f} MB")
print(f"DataFrame shape: {fact.shape}")

Memory usage: 2250.29 MB
DataFrame shape: (58327370, 24)


In [29]:
# Save the processed feature table (creating the output folder if needed)
# and report the written file size in GiB.
output_path = "./Data/Processed/fact_features.parquet"
output_dir = os.path.dirname(output_path)
os.makedirs(output_dir, exist_ok=True)
fact.write_parquet(output_path)
size_gb = os.path.getsize(output_path) / 1024**3
print(f"Đã lưu dữ liệu vào {output_path}")
print(f"Kích thước file: {size_gb:.2f} GB")

Đã lưu dữ liệu vào ./Data/Processed/fact_features.parquet
Kích thước file: 0.31 GB


# ----------- Thứ 3 (04/10/2025) -----------

In [None]:
# Promotion heuristic: a row counts as promotional when its realised unit price
# is more than PROMO_DISCOUNT_THRESHOLD (2%) below the item's price from dim_item.
# NOTE(review): assumes dim_item.price is a comparable reference/list price — confirm.
# unit_price (or price) can be null, e.g. on zero-sale rows, which makes the
# comparison null; those rows are treated as non-promo.
PROMO_DISCOUNT_THRESHOLD = 0.02
fact = fact.with_columns(
    (pl.col("unit_price") < pl.col("price") * (1 - PROMO_DISCOUNT_THRESHOLD))
    .fill_null(False)
    .alias("is_promo_price")
)

In [None]:
# Rows where the promo comparison is undefined (null) are counted as non-promo.
fact = fact.with_columns(is_promo_price=pl.col("is_promo_price").fill_null(False))

In [None]:
# Promotion summary per (item_id, store_id): mean units/revenue, number of
# promo rows, total rows and the share of rows flagged as promo.
# NOTE(review): "days" are row counts — equal to days only with one row per day.
promo_summary = (
    fact.group_by(["item_id", "store_id"])
    .agg([
        pl.mean("units_sold").alias("avg_units"),
        pl.mean("revenue").alias("avg_revenue"),
        pl.sum("is_promo_price").alias("promo_days"),
        pl.len().alias("total_days"),
    ])
    # promo_ratio must be derived after the aggregation: the aliases above do
    # not exist as columns inside the same .agg() call.
    .with_columns((pl.col("promo_days") / pl.col("total_days")).alias("promo_ratio"))
)

In [None]:
# Effect of promo: mean units and revenue on promo vs non-promo rows (global),
# plus the number of rows in each group.
promo_effect = (
    fact.group_by("is_promo_price")
    .agg([
        pl.mean("units_sold").alias("mean_units"),
        pl.mean("revenue").alias("mean_revenue"),
        pl.len().alias("days"),
    ])
    .sort("is_promo_price")
)

In [None]:
# Event effect: mean units and revenue on rows with vs without a named event,
# plus the number of rows in each group.
event_effect = (
    fact.group_by("is_holiday_event")
    .agg([
        pl.mean("units_sold").alias("mean_units"),
        pl.mean("revenue").alias("mean_revenue"),
        pl.len().alias("days"),
    ])
    .sort("is_holiday_event")
)

In [None]:
# Price elasticity, rough version: per-item correlation between unit_price and
# units_sold, computed only for items with enough priced observations.
def price_units_corr(df, min_obs: int = 30):
    """Per-item Pearson correlation between ``unit_price`` and ``units_sold``.

    A negative value suggests the item sells more units when its realised
    unit price is lower.

    Parameters
    ----------
    df : polars DataFrame (anything exposing ``.select(cols).to_pandas()``)
        Must contain ``item_id``, ``unit_price`` and ``units_sold``.
    min_obs : int, default 30
        An item is kept only if it has strictly more than ``min_obs`` non-null
        ``unit_price`` values and a non-zero ``unit_price`` standard deviation
        (a constant price makes the correlation undefined).

    Returns
    -------
    pandas.DataFrame
        Columns ``item_id`` and ``price_units_corr``, one row per qualifying
        item, ordered by ``item_id``.

    Notes
    -----
    pandas ``corr`` skips rows where either value is missing, so if
    ``unit_price`` is null on zero-sale days those days do not contribute.
    """
    import pandas as pd

    pdf = df.select(["item_id", "unit_price", "units_sold"]).to_pandas()
    rows = []
    # observed=True: if item_id arrives as a pandas categorical, skip unused
    # categories instead of iterating over empty groups.
    for item, g in pdf.groupby("item_id", observed=True):
        prices = g["unit_price"]
        if prices.count() > min_obs and prices.std() > 0:
            rows.append((item, prices.corr(g["units_sold"])))
    return pd.DataFrame(rows, columns=["item_id", "price_units_corr"])

# ----------- Thứ 4 (05/10/2025) -----------

In [None]:
def plot_time_series_total_units(df: pl.DataFrame, save_path=None, rolling_window: int = 30):
    """Plot total units sold per date (all items/stores) with a moving average.

    Parameters
    ----------
    df : pl.DataFrame
        Needs ``date_id`` and ``units_sold`` columns.
    save_path : str or None
        If given, the figure is saved there instead of being shown.
    rolling_window : int, default 30
        Number of dates in the trailing moving average overlaid on the totals.
    """
    daily = (
        df.group_by("date_id")
        .agg(pl.sum("units_sold").alias("total_units"))
        .sort("date_id")
        .to_pandas()
    )
    ma_label = f"rolling_{rolling_window}"
    # min_periods=1 so the average starts at the first date instead of after
    # rolling_window - 1 missing points.
    daily[ma_label] = daily["total_units"].rolling(rolling_window, min_periods=1).mean()

    fig, ax = plt.subplots(figsize=(14, 5))
    ax.plot(daily["date_id"], daily["total_units"], label="total_units")
    ax.plot(daily["date_id"], daily[ma_label], label=ma_label)
    ax.set_title("Total Units Sold Over Time")
    ax.set_xlabel("Date")
    ax.set_ylabel("Units")
    ax.legend()
    fig.tight_layout()
    if save_path:
        fig.savefig(save_path)
    else:
        plt.show()
    # Release the figure so repeated calls do not accumulate open figures.
    plt.close(fig)

In [None]:
def plot_category_seasonality(df: pl.DataFrame, cat_id, freq="M", save_path=None, ma_window: int = 3):
    """Plot units sold per period for one category, with a moving average.

    Parameters
    ----------
    df : pl.DataFrame
        Needs ``cat_id``, ``date_id`` (Date) and ``units_sold``.
    cat_id
        Category to plot.
    freq : str, default "M"
        Aggregation period. Pandas-style aliases "D", "W", "M", "Q", "Y"/"A"
        are mapped to polars intervals; any other value is passed to
        ``dt.truncate`` unchanged (e.g. "2w", "1mo").
    save_path : str or None
        If given, the figure is saved there instead of being shown.
    ma_window : int, default 3
        Number of periods in the trailing moving average.
    """
    # pandas-style alias -> (polars truncate interval, unit name for labels)
    aliases = {
        "D": ("1d", "day"),
        "W": ("1w", "week"),
        "M": ("1mo", "month"),
        "Q": ("1q", "quarter"),
        "Y": ("1y", "year"),
        "A": ("1y", "year"),
    }
    every, unit = aliases.get(freq, (freq, "period"))

    per_period = (
        df.filter(pl.col("cat_id") == cat_id)
        .with_columns(pl.col("date_id").dt.truncate(every).alias("period"))
        .group_by("period")
        .agg(pl.sum("units_sold").alias("units"))
        .sort("period")
        .to_pandas()
    )
    per_period["ma"] = per_period["units"].rolling(ma_window, min_periods=1).mean()

    fig, ax = plt.subplots(figsize=(12, 4))
    ax.plot(per_period["period"], per_period["units"], label=f"cat_{cat_id}")
    ax.plot(per_period["period"], per_period["ma"], label=f"{ma_window}-{unit} MA")
    ax.set_title(f"Seasonality - Category {cat_id}")
    ax.set_xlabel(unit.capitalize())
    ax.set_ylabel("Units")
    ax.legend()
    fig.tight_layout()
    if save_path:
        fig.savefig(save_path)
    else:
        plt.show()
    plt.close(fig)


In [None]:
def plot_category_correlation(df: pl.DataFrame, start_date=None, end_date=None, save_path=None):
    """Heatmap of the correlation between categories' monthly unit sales.

    Units are summed per (calendar month, cat_id). The resulting month x
    category table is correlated column-wise (pandas ``DataFrame.corr``,
    Pearson) and drawn as a heatmap.

    Parameters
    ----------
    df : pl.DataFrame
        Needs ``date_id`` (Date), ``cat_id`` and ``units_sold``.
    start_date, end_date : optional
        Inclusive bounds applied to ``date_id`` before aggregating; ``None``
        leaves that side open. Pass values polars can compare with a Date
        column, e.g. ``datetime.date`` objects.
    save_path : str or None
        If given, the figure is saved there instead of being shown.
    """
    window = df
    if start_date is not None:
        window = window.filter(pl.col("date_id") >= start_date)
    if end_date is not None:
        window = window.filter(pl.col("date_id") <= end_date)

    monthly = (
        window.with_columns(pl.col("date_id").dt.truncate("1mo").alias("month"))
        .group_by(["month", "cat_id"])
        .agg(pl.sum("units_sold").alias("units"))
        .to_pandas()
    )
    # month x category table; a category with no sales in a month counts as 0.
    # observed=True keeps only categories that occur (relevant if cat_id is a
    # pandas categorical).
    table = monthly.pivot_table(
        index="month", columns="cat_id", values="units",
        aggfunc="sum", fill_value=0, observed=True,
    ).sort_index()
    corr = table.corr()

    fig, ax = plt.subplots(figsize=(8, 6))
    # Fixed [-1, 1] scale with a diverging colormap so colours mean the same
    # correlation in every plot.
    im = ax.imshow(corr.values, aspect="auto", vmin=-1, vmax=1, cmap="coolwarm")
    ax.set_xticks(range(len(corr.columns)))
    ax.set_xticklabels(corr.columns, rotation=90)
    ax.set_yticks(range(len(corr.index)))
    ax.set_yticklabels(corr.index)
    fig.colorbar(im, ax=ax)
    ax.set_title("Correlation matrix between categories (monthly aggregated units)")
    fig.tight_layout()
    if save_path:
        fig.savefig(save_path)
    else:
        plt.show()
    plt.close(fig)

# ----------- Thứ 5 (06/10/2025) -----------

In [None]:
def generate_basic_insights(fact_df: pl.DataFrame, top_n: int = 10):
    """Compute a few headline insights from the fact table.

    Parameters
    ----------
    fact_df : pl.DataFrame
        Needs ``cat_id``, ``state_id``, ``date_id`` (Date), ``units_sold``
        and ``revenue``.
    top_n : int, default 10
        How many categories / states to keep in the rankings.

    Returns
    -------
    list of (name, value) tuples:
      - ``"top_cats"``: top ``top_n`` cat_id by total units_sold (pl.DataFrame)
      - ``"top_states"``: top ``top_n`` state_id by total revenue (pl.DataFrame)
      - ``"seasonality_coef"``: coefficient of variation (sample std / mean)
        of monthly total units; larger means bigger month-to-month swings
    """
    insights = []

    top_cats = (
        fact_df.group_by("cat_id")
        .agg(pl.sum("units_sold").alias("total_units"))
        .sort("total_units", descending=True)
        .head(top_n)
    )
    insights.append(("top_cats", top_cats))

    top_states = (
        fact_df.group_by("state_id")
        .agg(pl.sum("revenue").alias("total_revenue"))
        .sort("total_revenue", descending=True)
        .head(top_n)
    )
    insights.append(("top_states", top_states))

    monthly = (
        fact_df.with_columns(pl.col("date_id").dt.truncate("1mo").alias("month"))
        .group_by("month")
        .agg(pl.sum("units_sold").alias("units"))
    )
    # NOTE(review): the first/last month of the data may be partial, which
    # inflates this coefficient — consider excluding them.
    units = monthly.to_pandas()["units"]
    # The 1e-9 term avoids division by zero when the mean is 0.
    seasonality_coef = units.std() / (units.mean() + 1e-9)
    insights.append(("seasonality_coef", seasonality_coef))
    return insights