In [1]:
# execute duckdb query and transform the result into a parquet file
import duckdb
import polars as pl
import polars.selectors as cs
from snapshot import snapshot_query
from typing import List

pl.Config.set_tbl_rows(50)

polars.config.Config

In [2]:
# Date bounds substituted into the snapshot query template.
train_from = "2013-01-01"
train_upto = "2016-08-15"
# NOTE(review): `today` is not referenced by any visible cell — confirm it is still needed.
today = "2016-08-15"

# Render the template into a NEW name. Rebinding `snapshot_query` would replace the
# imported template with the rendered text, so re-running this cell with different
# dates would format the already-rendered string instead of the template.
train_snapshot_query = snapshot_query.format(date_from=train_from, date_upto=train_upto)

# Read-only connection: this cell only queries the database.
with duckdb.connect(database="../dbcore/data/core.db", read_only=True) as con:
    train_snapshot_df = con.execute(train_snapshot_query).pl()
train_snapshot_df.write_parquet("../../data/favorita_dataset/snapshots/train_snapshot.parquet", compression="zstd")

In [3]:
# Spot-check one series (product 1 in store 1): the current value next to its
# 1-day-ahead target column.
train_snapshot_df.filter(product_id=1, store_id=1).select(
    "c_date",
    "product_id",
    "store_id",
    "log_units_sold",
    "h1_log_units_sold",
)

c_date,product_id,store_id,log_units_sold,h1_log_units_sold
date,i32,i32,f64,f64
2013-01-02,1,1,3.555348,3.912023
2013-01-03,1,1,3.912023,3.135494
2013-01-04,1,1,3.135494,3.401197
2013-01-05,1,1,3.401197,3.332205
2013-01-06,1,1,3.332205,3.258097
2013-01-07,1,1,3.258097,2.944439
2013-01-08,1,1,2.944439,3.526361
2013-01-09,1,1,3.526361,3.433987
2013-01-10,1,1,3.433987,2.639057
2013-01-11,1,1,2.639057,2.944439


In [None]:
def extract_target_df(dataset: pl.DataFrame, target_cols: List[str], agg_level: List[str]) -> pl.DataFrame:
    """
    Build the target frame, forward-filling missing targets within each series.

    Args:
        dataset (pl.DataFrame): Frame holding the ``agg_level`` columns and every
            column listed in ``target_cols``.
        target_cols (List[str]): Names of the target columns to extract.
        agg_level (List[str]): Columns identifying one series; a null is only
            filled from an earlier row of the same group.
    Returns:
        pl.DataFrame: Only ``target_cols``, laid out group by group. Groups keep
        their input order (``maintain_order=True``).
    """
    # One forward-fill expression per target; inside ``agg`` each one yields a
    # list of values per group.
    filled_targets = [
        pl.col(col_name).fill_null(strategy="forward") for col_name in target_cols
    ]

    per_group = dataset.group_by(agg_level, maintain_order=True).agg(filled_targets)

    # Turn the per-group lists back into one row per observation.
    flattened = per_group.explode(target_cols)

    return flattened.select(target_cols)

# Detach columns from a frame and collect them into a frame of their own
def pop_columns(df: pl.DataFrame, col_names: List[str]) -> pl.DataFrame:
    """
    Remove ``col_names`` from ``df`` in place and return them as a new DataFrame.

    Args:
        df (pl.DataFrame): Frame to take the columns from; it is mutated.
        col_names (List[str]): Columns to move, in the order they should appear.

    Returns:
        pl.DataFrame: Frame built from the removed columns.
    """
    popped = []
    for col_name in col_names:
        # drop_in_place removes the column from ``df`` and hands back the Series.
        popped.append(df.drop_in_place(col_name))
    return pl.DataFrame(popped)

def drop_columns(df: pl.DataFrame, col_names: List[str]) -> pl.DataFrame:
    """
    Drop specified columns from the DataFrame, in place.

    Args:
        df (pl.DataFrame): DataFrame to drop columns from; it is mutated.
        col_names (List[str]): List of column names to drop.

    Returns:
        pl.DataFrame: The same ``df`` object, with the specified columns removed.
    """
    for col_name in col_names:
        df.drop_in_place(col_name)
    # Return the mutated frame so the declared return type holds and the call
    # can be used in an expression; callers that ignore the result are unaffected.
    return df

def feature_engineering(train_lzdf: pl.LazyFrame, target: str, horizon:int, agg_level: List[str], fill_nulls: bool=False) -> pl.LazyFrame:
    """
    Add calendar, short-window rolling and same-day-of-year features to sales data.

    Args:
        train_lzdf (pl.LazyFrame): Sales data with a ``c_date`` date column, the
            ``agg_level`` columns and the ``target`` column.
        target (str): Column the rolling statistics are computed on; it is also
            the suffix of every generated feature name.
        horizon (int): Number of forecast horizons; one ``h{h}_ewm_3y_{target}``
            column is produced for each ``h`` in ``1..horizon``.
        agg_level (List[str]): Columns identifying a single series.
        fill_nulls (bool): Accepted for interface compatibility but currently
            ignored (null filling is disabled).

    Returns:
        pl.LazyFrame: Input columns plus the engineered features, sorted by
        ``agg_level`` and then ``c_date``.
    """
    sort_keys = agg_level + ["c_date"]

    # Grouped rolling windows expect the index column to be ascending within
    # each group, so order the data before any window is computed.
    train_lzdf = train_lzdf.sort(sort_keys)

    # 1. Calendar features
    train_lzdf = train_lzdf.with_columns([
        pl.col("c_date").dt.weekday().alias("dayofweek"),
        pl.col("c_date").dt.day().alias("dayofmonth"),
        pl.col("c_date").dt.ordinal_day().alias("dayofyear"),
        pl.col("c_date").dt.week().alias("weekofyear"),
        pl.col("c_date").dt.month().alias("month"),
        pl.col("c_date").dt.year().alias("year"),
    ])

    # 2. Rolling features over the trailing 3, 7, 14, 21 and 28 days
    for window in [3, 7, 14, 21, 28]:
        # closed='right' gives the window (c_date - window, c_date], i.e. the
        # current day is part of the window.
        rolled = train_lzdf.rolling("c_date", period=f"{window}d", closed="right", group_by=agg_level).agg(
            # 2.1 Rolling mean, median, std, min, max, ewm and mean first difference
            pl.col(target).mean().alias(f"mean_{window}d_{target}"),
            pl.col(target).median().alias(f"median_{window}d_{target}"),
            pl.col(target).std().alias(f"std_{window}d_{target}"),
            pl.col(target).min().alias(f"min_{window}d_{target}"),
            pl.col(target).max().alias(f"max_{window}d_{target}"),
            pl.col(target).ewm_mean(alpha=0.9, adjust=True).last().alias(f"ewm_{window}d_{target}"),
            pl.col(target).diff().mean().alias(f"diff_mean_{window}d_{target}"),
        ).with_columns(
            # 2.2 Ratio of max to mean. Useful to identify outliers
            (pl.col(f"max_{window}d_{target}") / pl.col(f"mean_{window}d_{target}"))
                .alias(f"max_mean_ratio_{window}d_{target}")
        )
        train_lzdf = train_lzdf.join(rolled, on=sort_keys, how="left")

    # 3. Weekday rolling mean, i.e. mean of the same weekday over the past weeks.
    # TODO: Does not work properly. First, the weekly rolling is not correct
    # (check over()). Second, the day of the week is not taken into account:
    # a column has to be created for each weekday.
    # for weekday in range(1, 8):
    #     train_lzdf = train_lzdf.with_columns(
    #         pl.col("log_units_sold")
    #             .rolling_mean_by('date', window_size=f"3w", closed="right")
    #             .over(agg_level + ["weekday"])
    #             .alias(f"mean_3w_{weekday}wd_log_units_sold")
    #     )

    # 4. Yearly rolling ewm, i.e. ewm of the same day-of-year over the past 3 years
    feature_name = f"ewm_3y_{target}"

    # For every horizon h, h{h}_c_date = c_date + 1 year - h days. Joining on it
    # below gives the row dated d the value computed at d + h days - 1 year.
    # NOTE(review): offset_by("1y") may map Feb 29 and Feb 28 onto the same
    # shifted date, which would duplicate rows in the join — confirm.
    yearly = train_lzdf.rolling("c_date", period="3y", closed="right", group_by=agg_level + ["dayofyear"]).agg(
        pl.col(target).ewm_mean(alpha=0.9).last().alias(feature_name)
    ).with_columns([
        pl.col("c_date").dt.offset_by("1y").dt.offset_by(f"-{h}d").alias(f"h{h}_c_date")
        for h in range(1, horizon + 1)
    ])

    for h in range(1, horizon + 1):
        train_lzdf = train_lzdf.join(
            yearly.select(
                *agg_level,
                f"h{h}_c_date",
                pl.col(feature_name).alias(f"h{h}_{feature_name}"),
            ),
            left_on=sort_keys,
            right_on=agg_level + [f"h{h}_c_date"],
            how="left",
        )

    # Null filling is intentionally disabled for now:
    # if fill_nulls:
    #     train_lzdf = train_lzdf.fill_null(0)

    # 5. Restore a deterministic row order (series by series, chronological)
    return train_lzdf.sort(sort_keys)
    

# Get columns for horizons
def apply_horizon_shifting(train_dataset: pl.DataFrame, horizons: int, agg_level: List[str], date_col: str = "date") -> pl.DataFrame:
    """
    Attach future values of ``log_units_sold`` as ``h{k}_log_units_sold`` columns.

    Args:
        train_dataset (pl.DataFrame): Frame with the ``agg_level`` columns, the
            date column and ``log_units_sold``.
        horizons (int): Number of horizons; columns are added for ``k = 1..horizons``.
        agg_level (List[str]): Columns identifying a single series.
        date_col (str): Name of the date column. Defaults to ``"date"``; pass
            ``"c_date"`` for frames that use that name.

    Returns:
        pl.DataFrame: Input frame plus one column per horizon. A row gets null
        when its series has no observation exactly ``k`` days later.
    """
    for horizon in range(1, horizons + 1):
        # Re-date every observation ``horizon`` days earlier, so joining on the
        # date attaches the value observed ``horizon`` days after each row.
        future = train_dataset.select(
            *agg_level,
            (pl.col(date_col) - pl.duration(days=horizon)).alias(date_col),
            pl.col("log_units_sold").alias(f"h{horizon}_log_units_sold"),
        )

        train_dataset = train_dataset.join(future, on=agg_level + [date_col], how="left")
    return train_dataset


In [5]:
def save_df(df: pl.DataFrame, path: str) -> None:
    """
    Write ``df`` to ``path`` as a zstd-compressed parquet file.

    Args:
        df (pl.DataFrame): Frame to persist.
        path (str): Destination file path.
    """
    # Choosing the compression for the files consumed by the LightGBM model
    df.write_parquet(
        path,
        compression="zstd",
        # row_group_size=1000000,  # Uncomment if you need to optimize for large datasets
        # partition_by=["store_id", "product_id"],  # Uncomment if you want to optimize
    )

def train_pipeline(
    dataset: pl.DataFrame,
    target: str,
    horizon: int,
    agg_level: List[str],
    output_dir: str = "../../data/favorita_dataset/output",
):
    """
    Split a snapshot into model inputs, targets and dates, and persist all three.

    Args:
        dataset (pl.DataFrame): Snapshot with a ``c_date`` column, the
            ``agg_level`` columns, ``target`` and the ``h{h}_{target}`` columns.
            It is not modified.
        target (str): Base name of the target column.
        horizon (int): Number of horizons; targets are ``h1_..h{horizon}_{target}``.
        agg_level (List[str]): Columns identifying a single series.
        output_dir (str): Directory receiving ``train_target.parquet``,
            ``train_input.parquet`` and ``train_dates.parquet``.

    Returns:
        Tuple of ``(input_df, target_df, dates_df)``. The three frames share the
        same row order (``agg_level`` then ``c_date``), so they can be combined
        by row position.
    """
    sort_keys = agg_level + ["c_date"]
    target_cols = [f"h{h}_{target}" for h in range(1, horizon + 1)]

    # Work on a sorted copy: it fixes the row order shared by inputs and targets
    # and makes the forward fill of the targets run in chronological order.
    ordered = dataset.sort(sort_keys)

    target_df = extract_target_df(ordered, target_cols, agg_level)

    # ``drop`` returns a new frame, so the caller's ``dataset`` keeps its targets.
    input_df = feature_engineering(ordered.drop(target_cols), target, horizon, agg_level)
    # Re-sort explicitly so alignment with ``target_df`` does not depend on how
    # feature_engineering orders its output.
    input_df = input_df.sort(sort_keys)
    dates_df = pop_columns(input_df, ["c_date"])

    save_df(target_df, f"{output_dir}/train_target.parquet")
    save_df(input_df, f"{output_dir}/train_input.parquet")
    save_df(dates_df, f"{output_dir}/train_dates.parquet")

    return input_df, target_df, dates_df

In [6]:
# Reload the snapshot parquet (same path the snapshot cell writes to) and run the
# training pipeline on it.
dataset = pl.read_parquet("../../data/favorita_dataset/snapshots/train_snapshot.parquet")
# train_pipeline returns (input_df, target_df, dates_df):
# x = engineered inputs, y = horizon targets, c = the detached c_date column.
x, y, c = train_pipeline(
    dataset=dataset,
    target="log_units_sold",
    horizon=7,
    agg_level=["product_id", "store_id"],
)

In [7]:
# Show the inputs of one series next to their 1-day-ahead target.
# y is a separate frame, so its column is attached by row position: this only
# makes sense while x and y are in the same row order.
x.select(
    # pl.col("c_date"),
    pl.col("product_id"),
    pl.col("store_id"),
    pl.col("log_units_sold"),
    y.get_column("h1_log_units_sold"),
    # pl.col("h1_log_units_sold"),
).filter(product_id=7, store_id=5)

product_id,store_id,log_units_sold,h1_log_units_sold
i32,i32,f64,f64
7,5,3.295837,2.70805
7,5,2.70805,2.70805
7,5,2.70805,2.890372
7,5,2.890372,3.135494
7,5,3.135494,2.397895
7,5,2.397895,1.94591
7,5,1.94591,2.302585
7,5,2.302585,2.564949
7,5,2.564949,3.135494
7,5,3.135494,3.258097


In [51]:
# Scratch check of the yearly-ewm feature on a single series.
# NOTE(review): `train_df` is not defined in any visible cell, so this cell
# depends on leftover kernel state — confirm which frame it should read.
sample = train_df.filter(pl.col.product_id == 1, pl.col.store_id == 1)

print(sample.height)

# Calendar columns; dayofyear is needed as a grouping key below.
sample = sample.with_columns(
    pl.col.c_date.dt.weekday().alias("dayofweek"),
    pl.col.c_date.dt.year().alias("year"),
    pl.col.c_date.dt.month().alias("month"),
    pl.col.c_date.dt.ordinal_day().alias("dayofyear"),
    pl.col.c_date.dt.day().alias("dayofmonth"),
).sort(["product_id", "store_id", "c_date"])

# 3-year trailing window per (series, day-of-year), then one shifted date column
# per horizon h = 1..7: c_date + 1 year - h days.
tmp = sample.rolling("c_date", period="3y", closed="right", group_by=["product_id", "store_id", "dayofyear"]).agg(
    pl.col.log_units_sold.ewm_mean(alpha=0.9).last().alias("ewm_3y_log_units_sold")
).with_columns(
    (pl.col("c_date").dt.offset_by("1y").dt.offset_by(f"-{h}d")).alias(f"h{h}_c_date") for h in range(1,8)
)#.drop("c_date", "dayofyear")

1316


In [None]:
import polars as pl
import polars.selectors as cs

from typing import Tuple, List
from datetime import date

# def ewm(arr: pl.Series, alpha=0.9):
#     if arr.len() == 0:
#         return None
#     weights = (1 - alpha) ** np.arange(arr.len()-1, -1, -1)
#     return (arr * weights).sum() / weights.sum()

def feature_engineering(snapshot_path: str, between: Tuple[date, date], target:str, agg_level: List[str], fill_nulls: bool=False, products_path: str = "../../data/favorita_dataset/subset/products.parquet") -> pl.LazyFrame:
    """
    Load a sales snapshot and derive calendar, rolling and promotion features.

    NOTE(review): an earlier cell of this notebook defines a different function
    with the same name; whichever cell runs last wins. Consider renaming one.

    Args:
        snapshot_path (str): Parquet file with ``date``, ``units_sold``,
            ``is_on_promotion``, ``product_id`` and the ``agg_level`` columns.
        between (Tuple[date, date]): Bounds forwarded to ``is_between`` to filter
            the result on ``date``.
        target (str): Currently unused; features are always built from
            ``log_units_sold``.
        agg_level (List[str]): Columns identifying a single series.
        fill_nulls (bool): When True, nulls in the final frame are replaced by 0.
        products_path (str): Parquet file with product attributes, joined on
            ``product_id``.

    Returns:
        pl.LazyFrame: Lazy query producing the feature frame restricted to ``between``.
    """

    # 1. Load, order each series chronologically and normalise dtypes
    sales_lzdf = pl.scan_parquet(
        snapshot_path,
    ).sort(agg_level + ["date"]).with_columns([
        pl.col("units_sold").log1p(),
        cs.boolean().cast(pl.Int8),  # Convert boolean to int for compatibility with LightGBM
        cs.string().cast(pl.Categorical), # Convert string categories to categorical type
    ]).rename({"units_sold": "log_units_sold"})  # The column now holds log1p values; rename to avoid confusion

    # 2. Add calendar features
    sales_lzdf = sales_lzdf.with_columns([
        pl.col("date").dt.weekday().alias("weekday"),
        pl.col("date").dt.month().alias("month"),
        pl.col("date").dt.week().alias("weekofyear"),
        pl.col("date").dt.ordinal_day().alias("dayofyear"),
    ])

    # 3. Lagged features are disabled. The approach tried was to shift the date
    #    by the lag and join on it, so that days without a registered sale stay
    #    null instead of picking up the previous row as a plain shift would.

    # 4. Add rolling features over the trailing 3, 7, 14 and 28 days
    for window in [3, 7, 14, 28]:
        # closed='right' gives the window (date - window, date], i.e. the
        # current day is part of the window.
        rolled = sales_lzdf.rolling('date', period=f'{window}d', closed='right', group_by=agg_level).agg(
            # 4.1 Rolling mean, median, std, min, max, ewm and mean first difference
            pl.col("log_units_sold").mean().alias(f"mean_{window}d_log_units_sold"),
            pl.col("log_units_sold").median().alias(f"median_{window}d_log_units_sold"),
            pl.col("log_units_sold").std().alias(f"std_{window}d_log_units_sold"),
            pl.col("log_units_sold").min().alias(f"min_{window}d_log_units_sold"),
            pl.col("log_units_sold").max().alias(f"max_{window}d_log_units_sold"),
            pl.col("log_units_sold").ewm_mean(alpha=0.9, adjust=True).last().alias(f"ewm_{window}d_log_units_sold"),
            pl.col("log_units_sold").diff().mean().alias(f"diff_mean_{window}d_log_units_sold"),
        ).with_columns(
            # 4.2 Ratio of max to mean. Useful to identify outliers
            (pl.col(f"max_{window}d_log_units_sold") / pl.col(f"mean_{window}d_log_units_sold"))
                .alias(f"max_mean_ratio_{window}d_log_units_sold")
        )
        sales_lzdf = sales_lzdf.join(rolled, on=agg_level + ["date"], how="left")

    # 5. Add weekday rolling mean, i.e. mean of the same weekday in the past 1-4 weeks.
    #    closed="left" keeps the current day out of the window.
    for weeks in [1, 2, 3, 4]:
        sales_lzdf = sales_lzdf.with_columns(
            pl.col("log_units_sold")
                .rolling_mean_by('date', window_size=f"{weeks}w", closed="left")
                .over(agg_level + ["weekday"])
                .alias(f"mean_{weeks}w_log_units_sold")
        )

    # 6. Add yearly rolling mean, i.e. mean of the same day-of-year in the past 1-4 years.
    #    The window is expressed in years: rows of one day-of-year group are about
    #    a year apart, so a window of a few weeks would never contain a past row.
    for years in [1, 2, 3, 4]:
        sales_lzdf = sales_lzdf.with_columns(
            pl.col("log_units_sold")
                .rolling_mean_by('date', window_size=f"{years}y", closed="left")
                .over(agg_level + ["dayofyear"])
                .alias(f"mean_{years}y_log_units_sold")
        )

    # 7. Promotion days in the NEXT 3, 7 and 14 days: offset="0d" starts the
    #    window at the current date, so it covers (date, date + window].
    for window in [3, 7, 14]:
        upcoming = sales_lzdf.rolling('date', period=f'{window}d', offset="0d", closed='right', group_by=agg_level).agg(
            pl.col("is_on_promotion").sum().alias(f"sum_next_{window}d_is_on_promotion")
        )
        sales_lzdf = sales_lzdf.join(upcoming, on=agg_level + ["date"], how="left")

    # 8. Join item features
    # TODO: This join should not live here; it should be a separate pipeline step.
    products_lzdf = pl.scan_parquet(
        products_path
    ).with_columns(
        cs.boolean().cast(pl.Int8),  # Convert boolean to int for compatibility with LightGBM
        cs.string().cast(pl.Categorical), # Convert string categories to categorical type
    )

    sales_lzdf = sales_lzdf.join(
        products_lzdf,
        on="product_id",
        how="left"
    )

    # 9. Optionally fill null values with 0
    if fill_nulls:
        sales_lzdf = sales_lzdf.fill_null(0)

    # 10. Filter by date range (done last so the windows can see rows outside it)
    return sales_lzdf.filter(pl.col("date").is_between(*between))
    

# Get columns for horizons
def apply_horizon_shifting(train_dataset: pl.DataFrame, horizons: int, agg_level: List[str], date_col: str = "date") -> pl.DataFrame:
    """
    Attach future values of ``log_units_sold`` as ``h{k}_log_units_sold`` columns.

    Args:
        train_dataset (pl.DataFrame): Frame with the ``agg_level`` columns, the
            date column and ``log_units_sold``.
        horizons (int): Number of horizons; columns are added for ``k = 1..horizons``.
        agg_level (List[str]): Columns identifying a single series.
        date_col (str): Name of the date column. Defaults to ``"date"``.

    Returns:
        pl.DataFrame: Input frame plus one column per horizon. A row gets null
        when its series has no observation exactly ``k`` days later.
    """
    for horizon in range(1, horizons + 1):
        # Re-date every observation ``horizon`` days earlier, so joining on the
        # date attaches the value observed ``horizon`` days after each row.
        future = train_dataset.select(
            *agg_level,
            (pl.col(date_col) - pl.duration(days=horizon)).alias(date_col),
            pl.col("log_units_sold").alias(f"h{horizon}_log_units_sold"),
        )

        train_dataset = train_dataset.join(future, on=agg_level + [date_col], how="left")
    return train_dataset

    # # 7. Flatten for single model
    # # let's expand for horizon 1–7
    # horizons = []
    # for h in range(1,2):
    #     tmp = sales_lzdf.filter(
    #         pl.col("date") <= pl.date("2017-08-15")
    #     ).with_columns([
    #         pl.lit(h).alias("horizon")
    #     ])
    #     # align y label
    #     tmp = tmp.with_columns(
    #         pl.col("units_sold")
    #         .shift(-h, by=["store_nbr", "item_nbr"])
    #         .alias("target")
    #     )
    #     horizons.append(tmp)

    # train = pl.concat(horizons)

In [6]:
# Count nulls per column of the engineered inputs to see which features have gaps
x.null_count()

product_id,store_id,log_units_sold,product_group,next_1d_event_id,next_2d_event_id,next_3d_event_id,next_4d_event_id,next_5d_event_id,next_6d_event_id,next_7d_event_id,next_1d_is_on_promo,next_2d_is_on_promo,next_3d_is_on_promo,next_4d_is_on_promo,next_5d_is_on_promo,next_6d_is_on_promo,next_7d_is_on_promo,dayofweek,dayofmonth,dayofyear,weekofyear,month,year,mean_3d_log_units_sold,median_3d_log_units_sold,std_3d_log_units_sold,min_3d_log_units_sold,max_3d_log_units_sold,ewm_3d_log_units_sold,diff_mean_3d_log_units_sold,max_mean_ratio_3d_log_units_sold,mean_7d_log_units_sold,median_7d_log_units_sold,std_7d_log_units_sold,min_7d_log_units_sold,max_7d_log_units_sold,ewm_7d_log_units_sold,diff_mean_7d_log_units_sold,max_mean_ratio_7d_log_units_sold,mean_14d_log_units_sold,median_14d_log_units_sold,std_14d_log_units_sold,min_14d_log_units_sold,max_14d_log_units_sold,ewm_14d_log_units_sold,diff_mean_14d_log_units_sold,max_mean_ratio_14d_log_units_sold,mean_21d_log_units_sold,median_21d_log_units_sold,std_21d_log_units_sold,min_21d_log_units_sold,max_21d_log_units_sold,ewm_21d_log_units_sold,diff_mean_21d_log_units_sold,max_mean_ratio_21d_log_units_sold,mean_28d_log_units_sold,median_28d_log_units_sold,std_28d_log_units_sold,min_28d_log_units_sold,max_28d_log_units_sold,ewm_28d_log_units_sold,diff_mean_28d_log_units_sold,max_mean_ratio_28d_log_units_sold,h1_ewm_3y_log_units_sold,h2_ewm_3y_log_units_sold,h3_ewm_3y_log_units_sold,h4_ewm_3y_log_units_sold,h5_ewm_3y_log_units_sold,h6_ewm_3y_log_units_sold,h7_ewm_3y_log_units_sold
u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32,u32
0,0,0,0,42130,42026,42027,42063,42064,42170,42066,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,37,0,0,0,37,0,0,0,37,0,0,0,37,0,0,0,35,0,0,0,35,0,0,0,35,0,0,0,35,0,0,0,35,0,0,0,35,0,12892,12857,12822,12787,12752,12717,12612


In [None]:
def split_stage(
    dataset: pl.LazyFrame,
    interval: Tuple[date, date],
    target_cols: List[str],
) -> Tuple[pl.DataFrame, pl.DataFrame]:
    """
    Materialise the rows of ``dataset`` whose ``date`` falls inside ``interval``.

    Args:
        dataset (pl.LazyFrame): Lazy frame with a ``date`` column.
        interval (Tuple[date, date]): Bounds forwarded to ``is_between``.
        target_cols (List[str]): Not used by the current implementation.

    Returns:
        Tuple[pl.DataFrame, pl.DataFrame]: The collected rows without their
        ``date`` column, and a frame holding only that ``date`` column.
    """
    in_interval = pl.col("date").is_between(*interval)
    x_df = dataset.filter(in_interval).collect()

    # Detach the date column; pop_columns removes it from x_df in place.
    date_df = pop_columns(x_df, ["date"])

    return x_df, date_df

In [None]:
# NOTE(review): `save_split` is not defined in any visible cell — this cell
# depends on leftover kernel state or on a definition elsewhere; confirm.
train_interval = (date(2013, 1, 1), date(2016, 12, 31))
save_split(dataset, train_interval, tag="train", horizon=7)

# NOTE(review): the validation interval starts on 2016-12-01, before the training
# interval ends on 2016-12-31, so December 2016 falls in both — confirm the
# overlap is intended.
valid_interval = (date(2016, 12, 1), date(2017, 8, 15))
save_split(dataset, valid_interval, tag="valid", horizon=7)

In [None]:
import polars as pl

# Scratch cell: toy frame for experimenting with per-series rolling operations.
# `alpha` and `window` are not referenced in this cell.
alpha = 0.9
window = 2

# Six dates for the first series (2020-01-05 is missing), then two for the second.
dates = [
    "2020-01-01",
    "2020-01-02",
    "2020-01-03",
    "2020-01-04",
    "2020-01-06",
    "2020-01-07",
    "2020-01-01",
    "2020-01-02",
]

# Two series of product 10 (store 3 and store 2), with nulls in `b`.
# NOTE(review): set_sorted() flags `date` as sorted, but the values restart at
# 2020-01-01 for the second series, so they are only ascending within each
# series — confirm nothing relies on the flag for global order.
df = pl.DataFrame({
    "store_id": [3,3,3,3,3,3,2,2],
    "product_id": [10,10,10,10,10,10,10,10],
    "date": dates,
    "b": [1,2,None,None,5,6,1,None],
}).with_columns(
    pl.col("date").str.strptime(pl.Date).set_sorted()
)

# [
#     # pl.col("b"). #.last()#.shift(-1).first().fill_null(0)
# ]
# Minimum of `b` over a 2-row window, computed per series in date order.
# The commented-out lines are earlier variants that were tried.
df.with_columns(
    pl.col("b")
        # .rolling("date", period="2d", closed="left", offset="0d")
        # .list.mean()
        .rolling_min(2)
        .over(["product_id", "store_id"], order_by="date")
        .alias("new")
        # .rolling("date", period="2d", closed="left", offset="0d")
        # .mean()
        #     pl.col("b").shift(-1)  # Shift to get the next value in the group
        # )
        # .shift(-1)  # Shift to get the next value in the group
        
        # .over(["product_id", "store_id"])
        # .ewm_mean(alpha=0.9, adjust=True)
        # .alias("new")
)

store_id,product_id,date,b,new
i64,i64,date,i64,i64
3,10,2020-01-01,1.0,
3,10,2020-01-02,2.0,1.0
3,10,2020-01-03,,
3,10,2020-01-04,,
3,10,2020-01-06,5.0,
3,10,2020-01-07,6.0,5.0
2,10,2020-01-01,1.0,
2,10,2020-01-02,,


In [None]:
# Scratch check of a 2-day trailing window per series: pl.exclude("date") keeps
# the other columns as the values inside each window, next to the window mean of `b`.
# NOTE(review): the recorded output below shows a column `c` and `b` values that
# the `df` built in the visible cell does not have — the output looks stale.
df.rolling('date', period='2d', closed='right', group_by=["product_id", "store_id"]).agg(
    pl.exclude("date"),
    pl.col("b").mean().alias("b_rolling_window"),
)

product_id,store_id,date,b,c,b_rolling_window
i64,i64,date,list[i64],list[i64],f64
10,2,2020-01-01,[1],[1],1.0
10,2,2020-01-02,"[1, 2]","[1, 1]",1.5
10,3,2020-01-01,[1],[1],1.0
10,3,2020-01-02,"[1, 2]","[1, 1]",1.5
10,3,2020-01-03,"[2, 3]","[1, 1]",2.5
10,3,2020-01-04,"[3, 4]","[1, 1]",3.5
10,3,2020-01-06,[5],[1],5.0
10,3,2020-01-07,"[5, 6]","[1, 1]",5.5


In [None]:
# Scratch check of the forward fill used for the targets: fill nulls of `b`
# inside each series, then explode the per-group lists back into rows.
# NOTE(review): maintain_order is not set here, so the order of the groups in
# the result is presumably not guaranteed — confirm.
df.group_by(["product_id", "store_id"]).agg(
    pl.col("b").fill_null(strategy="forward")#.over("date")
).explode("b")
# df.fill_null(strategy="backward").over(["product_id", "store_id"], order_by="date")

product_id,store_id,b
i64,i64,i64
10,2,1
10,2,1
10,3,1
10,3,2
10,3,2
10,3,2
10,3,5
10,3,6


In [None]:
import numpy as np

# Hand-check of the adjusted exponentially weighted mean: the value i steps back
# from the current one is weighted by (1 - alpha) ** i.
# The input is named `sample_values` rather than `x`, so this scratch cell does
# not overwrite the notebook-level `x` (the engineered inputs).
sample_values = np.array([1, 2, 3])
alpha = 0.9
decay = 1 - alpha

ewm = [
    # Weights run from the oldest value (decay ** i) to the newest (decay ** 0).
    float(np.average(sample_values[: i + 1], weights=decay ** np.arange(i, -1, -1)))
    for i in range(len(sample_values))
]
print(ewm)
# approximately [1.0, 1.9090909090909092, 2.891891891891892]

[1, 1.909090909090909, 2.8918918918918917]
