In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pickle as pkl
from utils.utils import _down_cast, data_preprocessing, diff_lists, log_status
from utils.utils import parse_columns_to_string
from utils import constants

from utils.configure_logger import configure_logger
configure_logger()
from logging import getLogger
logger = getLogger(__name__)

import warnings
warnings.simplefilter("ignore")

In [3]:
# file names and paths
DATA_BASE_PATH = constants.DATA_BASE_PATH
DATA_BASE_PATH_UNCERTAINTY = constants.DATA_BASE_PATH_UNCERTAINTY
SALES_EVALUATION = constants.SALES_EVALUATION
SALES_VALIDATION = constants.SALES_VALIDATION
CALENDAR = constants.CALENDAR
SAMPLE_SUBMISSION = constants.SAMPLE_SUBMISSION
SELL_PRICES = constants.SELL_PRICES
PRECOMPUTED_BASE_PATH = constants.PRECOMPUTED_BASE_PATH

DAYS: int = constants.DAYS #28
QUANTILES: int = constants.QUANTILES
AGG_LEVEL_COLUMNS = constants.AGG_LEVEL_COLUMNS
D_CROSS_VAL_START_LIST = constants.D_CROSS_VAL_START_LIST #[1802, 1830, 1858, 1886, 1914]

In [4]:
# read all data
sales_validation: pd.DataFrame = _down_cast(pd.read_csv(DATA_BASE_PATH + SALES_VALIDATION))
# sales_evaluation: pd.DataFrame = _down_cast(pd.read_csv(DATA_BASE_PATH + SALES_EVALUATION))
calendar: pd.DataFrame = _down_cast(pd.read_csv(DATA_BASE_PATH + CALENDAR))
sell_prices: pd.DataFrame = _down_cast(pd.read_csv(DATA_BASE_PATH + SELL_PRICES))

### Feature Engineering

#### Test Runs

In [1]:
# ~25 seconds
# 1802, 1830, 1858, 1886, 1914
def drop_days_after(df, day_threshold):
    """ 
    Drop all days after a certain threshold.
    By doing this, we ensure that every fold is truly out-of-sample,
    because we throw away all future data.
    """
    columns_keep = [c for c in df.columns if c.split('_')[0] != 'd']
    columns_keep += [
        c for c in 
            [d for d in df.columns if d.split('_')[0] == 'd'] 
            if int(c.split('_')[1]) < day_threshold
    ]
    return df[columns_keep]

In [7]:
D_START_VAL = 1914
df, submission_idx = data_preprocessing(
    drop_days_after(
        sales_validation,#.iloc[:int(1000000)],
        day_threshold = D_START_VAL
    ), 
    calendar,
    sell_prices
)
df = df[(df.wm_yr_wk > df.release)]
df['id'] = df['id'].astype(str)

In [None]:
@log_status
def compute_features(df: pd.DataFrame, group_columns: list, q: int = None, sparse_features: bool = False, agg_level: str = None):
    """
    Type of features computed:
     - auto_lagged_values
     - auto_sold_ma/ewm/std/qtile
     - price_momentum
     - price_uncond
     - price_auto_std (rolling price std.)
    """
    # drop all NaT dates
    idx = df['date'].notna()
    df = df[idx]
    
    # these columns should always be included, because they are required 
    # in future groupings during training and computing the WSPL
    feature_columns = []
    for c in ['id', 'state_id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'd', 'sold']:
        if c in df:
            feature_columns += [c]
            
    # to be sure
    df['month'] = df['month'].astype(int)
    
    # to ensure the data is in the correct order, we need d_{day} as an integer
    df["index"] = df.index
    df["d_int"] = df["d"].apply(lambda x: int(x.split("_")[-1]))
    df = df.sort_values('d_int')
    df["day"] = df.date.dt.day # the day of the month
    
    # to use groupby only once, lower runtime
    df_grouped = df.groupby(group_columns)
    
    ################################################
    ############### AUTOCORRELATION ################
    ################################################
    PREFIX = 'auto_'
    logger.info('Computing autocorrelation features')
    # DIRECT LAGGED VALUES
    old_columns: set = set(df.columns)
    LAG_SHIFT: int = 1
    for lag in [1, 2, 7, 14, 28, 56]:
        if sparse_features: continue
        df[PREFIX + f"sold_{lag}"] = df_grouped["sold"].shift(lag) # 1-day lag
    feature_columns += diff_lists(df.columns, old_columns)
    
    # ROLLING EWM/MA/STD/QUANTILE
    old_columns: set = set(df.columns)
    for i in [3,7,14,21,28,56,112,168]:
        if sparse_features and i != 28: continue
        min_periods = int(np.ceil(i ** 0.8))
        
        df[PREFIX + f'sold_ma_{i}'] = df_grouped['sold'].transform(lambda x: x.shift(LAG_SHIFT).rolling(i, min_periods).mean()).astype(np.float16)
        df[PREFIX + f'sold_std_{i}'] = df_grouped['sold'].transform(lambda x: x.shift(LAG_SHIFT).rolling(i, min_periods).std()).astype(np.float16)
        df[PREFIX + f'sold_ewm_{i}'] = df_grouped['sold']\
            .transform(lambda x: x.shift(LAG_SHIFT)
            .ewm(span=i, min_periods = min_periods)
            .mean())\
            .astype(np.float16)

        for quantile in [0.01, 0.1, 0.25, .5, 0.75, 0.9, 0.99]:#QUANTILES:
            if i * min(quantile, 1-quantile) >= 1:
                if sparse_features and i!=28:continue
                df[PREFIX + f'sold_qtile_{i}_{quantile}'] = df_grouped['sold'].transform(lambda x: x.shift(LAG_SHIFT).rolling(i, min_periods).quantile(quantile)).astype(np.float16)
    feature_columns += diff_lists(df.columns, old_columns)
    
    ###############################################
    ############ PRICE AUTOCORRELATION ############
    ###############################################
    logger.info('Computing price autocorrelation features')
    
    # SIMPLY THE CURRENT PRICE
    # 'temp_id' is used for grouping at Level1/Total
    # Unconditional features for univariate level would not make sense when using LightGBM
    if 'temp_id' not in group_columns:
        old_columns = set(df.columns)
        df_grouped_d = df.groupby(group_columns + ['d'])
        df['price_uncond_avg'] = df_grouped_d['sell_price'].transform(lambda x: x.mean()).astype(np.float32)
        df['price_uncond_std'] = df_grouped_d['sell_price'].transform(lambda x: x.std()).astype(np.float32)
        df['price_uncond_median'] = df_grouped_d['sell_price'].transform(lambda x: x.median()).astype(np.float32)
        feature_columns += diff_lists(df.columns, old_columns)
    
    # to prevent type issues when storing dataframe as .parquet file
    for c in ['wm_yr_wk', 'year', 'month']:
        df[c] = df[c].astype('int32')
        
    # PRICE DIFFERENCES BY WEEK/MONTH/YEAR (USSUALLY ON)
    PREFIX = 'price_momentum_'
    old_columns = set(df.columns)
    df[PREFIX + 'w'] = df['sell_price'] / df.groupby(group_columns + ['wm_yr_wk'])['sell_price'].transform(lambda x: x.shift(LAG_SHIFT)).astype(np.float32)
    df[PREFIX + 'm'] = df['sell_price'] / df.groupby(group_columns + ['year', 'month'])['sell_price'].transform(lambda x: x.shift(LAG_SHIFT)).astype(np.float32)
    df[PREFIX + 'y'] = df['sell_price'] / df.groupby(group_columns + ['year'])['sell_price'].transform(lambda x: x.shift(LAG_SHIFT)).astype(np.float32)
    feature_columns += diff_lists(df.columns, old_columns)
    
    # VARIATION OF PRICES
    PREFIX = 'price_auto_'
    old_columns = set(df.columns)
    for i in [28, 56, 112]:
        min_periods = int(np.ceil(i ** 0.8))
        df[PREFIX + f'std_{int(i)}'] = df_grouped['sell_price'].transform(lambda x: x.rolling(i).std()).astype(np.float16)
    feature_columns += diff_lists(df.columns, old_columns)

    ################################################
    ############## SEASONAL FEATURES ###############
    ################################################
    logger.info('Seasonal features to category')
    # set seasonal features as type 'category'
    # LightGBM will automatically 'pick them up' and encode
    PREFIX = 'seasonal_'
    df[PREFIX + 'weekday'] = df['weekday'].astype('category')
    feature_columns += [PREFIX + 'weekday']
    df[PREFIX + 'monthday'] = df['day'].astype('category')
    feature_columns += [PREFIX + 'monthday']
    df[PREFIX + 'month'] = df['month'].astype('category')
    feature_columns += [PREFIX + 'month']
    
    ################################################
    ################ OTHER FEATURES ################
    ################################################
    # THIS FEATURE IS ONLY RELEVANT AT LOWER LEVELS WHERE 0 SALES OCCUR
    if agg_level in ['Level10', 'Level11', 'Level12']:
        logger.info('Computing PCT of non-zero days')
        old_columns = set(df.columns)
        for i in [7, 14, 28, 28*2, 28*4]:
            if sparse_features and i != 28: continue
            df[f'sold_pct_nonzero_{i}'] = df_grouped['sold'].transform(lambda x: (x!=0).rolling(i).mean().shift(LAG_SHIFT)).astype(np.float16)
        feature_columns += diff_lists(df.columns, old_columns)
    
    ################################################
    ############### STATE/STORE/CAT ################
    ################################################
    # temporary memory savings
    df = _down_cast(df)
    
    logger.info('Create state_id if store_id in group columns')
    if 'store_id' in group_columns:
        df['state_id'] = df['store_id'].str.split('_').apply(lambda x: x[0])   
        feature_columns += ['state_id']
    if 'state_id' in group_columns:
        df['state_id'] = df['state_id'].astype('category')
    if 'store_id' in group_columns:
        df['store_id'] = df['store_id'].astype('category')
    
    ################################################
    ############## TARGET ENGINEERING ##############
    ################################################
    # The forecasting period is 28 days. We do not want to do recursive forecasting,
    # i.e. reusing forecasts and recomputing features to predict another day ahead and so on. 
    # Instead, we pass the days of forecasting as a feature in the model.
    # For the training data, this forecasting period is randomized so the model
    # is trained to predict various days ahead.
    # For the out-of-sample period, the features are exactly the same,
    # except of course for the number of days ahead (i.e. increasing from 1 to 28)
    # The code for this is quite ugly, but it works.
    
    mapping_dict = {int(D_START_VAL+i):i for i in range(DAYS)}
    def map_d_to_days_fwd(row):
        """ 
        This functions is applied on the dataframe.
        For the prediction period, the random integers between 1,28
        will be replaced by 1 to 28 linearly. For the other rows,
        the random number is returned.
        """
        return mapping_dict.get(row['d_int'], row['days_fwd'])

    # We assign every target a random number of days ahead.
    # Then we 'match' the row of features that belongs to it.
    # Example: say we have a sold target recorded at day 1850, 
    # and the randomly assigned days ahead is 17,
    # then we will match this target with the row of features at day 1833.
    days_forecast = np.random.randint(low=0, high=DAYS, size=len(df))
    
    # Create df with solely the target and date
    # These columns belong to the target and should not be shifted randomly
    columns_temp = ['sold', 'd']\
        + ['seasonal_weekday', 'seasonal_month', 'seasonal_monthday']\
        + ['price_momentum_'+i for i in ['w', 'm', 'y']]\
        + ['price_auto_std_'+str(i) for i in [28, 56, 112]]
    df_temp: pd.DataFrame = df[columns_temp + group_columns + ['d_int']]
    df_temp['d_int'] = df_temp['d_int'].astype(int)
    df_temp['days_fwd'] = days_forecast
    df_temp['days_fwd'] = df_temp.apply(map_d_to_days_fwd, axis=1)
    # days_fwd = pd.to_timedelta(days_forecast, unit='D')
    df_temp['d_int'] -= df_temp['days_fwd']
    df_temp['d_int'] = df_temp['d_int'].astype(int)
    df_temp['d'] = 'd_' + df_temp['d_int'].astype(str)
    feature_columns += ['days_fwd']
    
    # Match features with shifted targets
    df = df.drop(columns_temp, axis=1)
    df = pd.merge(
        df,
        df_temp,
        on = ['d_int'] + group_columns,
        how = 'right'
    )
    # add DAYS back to d_int and d
    df['d_int'] += df['days_fwd']
    df['d'] = 'd_' + df['d_int'].astype(str)
    
    # drop invalid cases of forecasting windows (i.e. negative d)
    idx = df['d_int'] > DAYS
    df = df[idx]
    ############# RETURN FINAL RESULTS ############
    
    # return final results
    df = _down_cast(df)
    return df[feature_columns]

### Trial run if feature_engineering works

In [34]:
# total ~77 seconds
D_START_VAL = 1914
level = 'Level3'

if level == 'Level1':
    df['temp_id'] = 'temp_id'
agg_columns = AGG_LEVEL_COLUMNS[level] if level != 'Level1' else ['temp_id']
agg_dict = {
    'sold': np.nansum,
    'sell_price': np.nanmean,
    'date': 'last',
    'weekday': 'last',
    'month': 'last',
    'year': 'last',
    'wm_yr_wk': 'last'
}
features = compute_features(
    df.groupby(agg_columns + ['d']).agg(agg_dict).reset_index(drop=False),
    agg_columns,
    sparse_features=False,
    agg_level=level
)
features

2023-12-04 11:33:55 - compute_features - INFO - calling
2023-12-04 11:33:55 - __main__ - INFO - Computing autocorrelation features
2023-12-04 11:33:55 - __main__ - INFO - Computing price autocorrelation features
2023-12-04 11:33:59 - __main__ - INFO - Encoding date features to dummies
2023-12-04 11:33:59 - __main__ - INFO - Create state_id if store_id in group columns


Unnamed: 0,store_id,d,sold,auto_sold_56,auto_sold_1,auto_sold_2,auto_sold_28,auto_sold_7,auto_sold_14,auto_sold_qtile_168_0.75,...,price_momentum_m,price_momentum_y,price_auto_std_28,price_auto_std_56,price_auto_std_112,seasonal_weekday,seasonal_monthday,seasonal_month,state_id,days_fwd
210,CA_2,d_29,3512.0,,1844.0,2064.0,,,,,...,0.998047,0.998047,,,,Saturday,26,2,CA,16
211,WI_2,d_29,1954.0,,,,,,,,...,1.004883,1.004883,,,,Saturday,26,2,WI,21
212,TX_3,d_29,2392.0,,1542.0,1763.0,,,,,...,1.004883,1.004883,,,,Saturday,26,2,TX,15
213,CA_3,d_29,4760.0,,3264.0,3044.0,,5168.0,5416.0,,...,1.002930,1.002930,,,,Saturday,26,2,CA,7
214,CA_4,d_29,1636.0,,1915.0,1634.0,,1501.0,,,...,1.007812,1.007812,,,,Saturday,26,2,CA,12
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
19335,WI_2,d_1941,0.0,4360.0,5128.0,5404.0,4256.0,4348.0,6228.0,5476.0,...,1.000000,1.000000,0.000990,0.001777,0.001838,Sunday,22,5,WI,27
19336,WI_3,d_1941,0.0,3252.0,4324.0,4688.0,2964.0,3556.0,4044.0,4300.0,...,1.000000,1.000000,0.002609,0.002550,0.002495,Sunday,22,5,WI,27
19337,CA_3,d_1941,0.0,6292.0,7720.0,7420.0,5488.0,6068.0,6580.0,6592.0,...,1.000000,1.000000,0.001584,0.003113,0.004791,Sunday,22,5,CA,27
19338,WI_1,d_1941,0.0,3452.0,4872.0,4772.0,3040.0,3236.0,3492.0,4340.0,...,1.000000,1.000000,0.001319,0.001660,0.002312,Sunday,22,5,WI,27


In [38]:
# verify that for the prediction period,
# days forward is not random but 0 to DAYS-1, increasing linearly
for i in range(DAYS):
    assert len(features[features['d'] == f'd_{int(D_START_VAL+i)}']['days_fwd'].unique())==1
    assert features[features['d'] == f'd_{int(D_START_VAL+i)}']['days_fwd'].unique()[0]==i

### Final Runs Feature Engineering for Validation and Evaluation

In [39]:
@log_status
def groupby_agglevel(df: pd.DataFrame, agg_columns: list, agg_dict: dict):
    return df.groupby(agg_columns).agg(agg_dict).reset_index(drop=False)

In [41]:
# params for un
SPARSE_FEATURES = False
TEST_RUN = False
MAX_AGG_LEVEL = 3 
# this makes level indicates to only 
# compute for aggregation levels for
# Level1, Level2 ... Level{12-MAX_AGG_LEVEl}
# i.e. MAX_AGG_LEVEL=3 results in;
# Level1, Level2, ..., Level9

# compute features+targets in each fold (targets are stored in the same df)
for D_START_VAL in D_CROSS_VAL_START_LIST:
    logger.info(f'D_START_VAL: {D_START_VAL}')

    # the grouped df's can be precomputed, which saves time
    # here we check if this is the case
    all_found = True
    for agg_level in AGG_LEVEL_COLUMNS:
        try:
            aaaaa = pd.read_parquet(f'../data/uncertainty/fold_{int(D_START_VAL)}/grouped/grouped_{agg_level}.parquet')
        except:
            all_found = False
            logger.info(f'grouped df not computed for level: {agg_level}')
        
    if not all_found:
        # pivot initial dataframe and compute features/targets
        df_val, submission_idx_validation = data_preprocessing(
            drop_days_after(sales_validation,
            day_threshold = D_START_VAL), 
            calendar,
            sell_prices
        )
        # drop all leading rows with leading zeros for each product
        df_val_after_release = df_val[(df_val.wm_yr_wk > df_val.release)]
        del df_val
    else:
        logger.info('grouped df computed for all levels, original dataframe is not loaded')

    # for each aggregation level
    for agg_level, agg_columns in AGG_LEVEL_COLUMNS.items(): 
        # remove index to compute all
        if agg_level in [f'Level{int(12 - i)}' for i in range(0,MAX_AGG_LEVEL)]:
            continue
            
        # group data for specific grouping columns per level
        # and compute features
        agg_dict = {
            'sold': np.nansum,
            'sell_price': np.nanmean,
            'date': 'last',
            'weekday': 'last',
            'month': 'last',
            'year': 'last',
            'wm_yr_wk': 'last'
        }

        if len(agg_columns) == 0:
            agg_columns = ['temp_id']
        
        # load grouped df if already exists
        try:
            aaaaa = pd.read_parquet(f'../data/uncertainty/fold_{int(D_START_VAL)}/grouped/grouped_{agg_level}.parquet')
            aaaaa = _down_cast(aaaaa)
        except:
            logger.info(f'not existing yet: ../data/uncertainty/fold_{int(D_START_VAL)}/grouped/grouped_{agg_level}.parquet')
            logger.info(f'computing and storing grouped dataframe for level: {agg_level}')
            # get data on aggregated level
            if len(agg_columns) == 0 or 'temp_id' in agg_columns:
                df_val_after_release['temp_id'] = 'temp_id'

            aaaaa = groupby_agglevel(df_val_after_release, agg_columns + ['d'], agg_dict)
            idx_keep = aaaaa['date'].notna()
            aaaaa = aaaaa[idx_keep]
            # to suitable type for .parquet
            for c in aaaaa.columns:
                if c not in agg_columns + ['d','date','weekday']:#['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'd', 'state_id']:
                    aaaaa[c] = aaaaa[c].astype(np.float32)
            aaaaa.to_parquet(f'../data/uncertainty/fold_{int(D_START_VAL)}/grouped/grouped_{agg_level}.parquet', index=False)
            aaaaa = _down_cast(aaaaa)
            
        # compute all features
        features = compute_features(
            aaaaa,
            agg_columns,
            sparse_features=SPARSE_FEATURES,
            agg_level=agg_level
        )
        
        # to suitable type for .parquet file storage
        for c in features.columns:
            if c not in ['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'd', 'state_id', 'seasonal_weekday', 'seasonal_monthday', 'seasonal_month']:
                features[c] = features[c].astype(np.float32)
        
        # format string and save file
        agg_string = parse_columns_to_string(agg_columns)
        if not TEST_RUN:
            features.to_parquet(f'../data/uncertainty/fold_{int(D_START_VAL)}/features/' + f'features_val_{agg_string}.parquet', index=False)
            del features
        else:
            pass
    
    # in the case of a test run,
    # we might want to verify if the computed dataframe is correct
    # else, remove the df to reduce memory usage
    if not TEST_RUN:
        try:
            del df_val_after_release
        except:
            pass

2023-12-04 11:36:58 - __main__ - INFO - D_START_VAL: 1802
2023-12-04 11:37:02 - __main__ - INFO - grouped df computed for all levels, original dataframe is not loaded
2023-12-04 11:37:02 - compute_features - INFO - calling
2023-12-04 11:37:02 - __main__ - INFO - Computing autocorrelation features
2023-12-04 11:37:02 - __main__ - INFO - Computing price autocorrelation features
2023-12-04 11:37:02 - __main__ - INFO - Encoding date features to dummies
2023-12-04 11:37:02 - __main__ - INFO - Create state_id if store_id in group columns
2023-12-04 11:37:02 - compute_features - INFO - calling
2023-12-04 11:37:02 - __main__ - INFO - Computing autocorrelation features
2023-12-04 11:37:02 - __main__ - INFO - Computing price autocorrelation features
2023-12-04 11:37:04 - __main__ - INFO - Encoding date features to dummies
2023-12-04 11:37:04 - __main__ - INFO - Create state_id if store_id in group columns
2023-12-04 11:37:04 - compute_features - INFO - calling
2023-12-04 11:37:04 - __main__ - IN

In [17]:
# # pivot initial dataframe and compute features/targets
# df_eval, submission_idx_validation = data_preprocessing(sales_evaluation, calendar, sell_prices)
# df_eval_after_release = df_eval[(df_eval.wm_yr_wk > df_eval.release)]
# del df_eval

# # set prediction values to nan values
# pred_index = df_eval_after_release['d'].isin(D_CV_OOS)
# df_eval_after_release.loc[pred_index, 'sold'] = np.nan
# del pred_index

In [43]:
# TEST_RUN = False
# for agg_level in AGG_LEVEL_COLUMNS:
#     agg_columns = AGG_LEVEL_COLUMNS[agg_level]
#     # get data on aggregated level
#     if len(agg_columns) == 0:
#         df_eval_after_release['temp_id'] = 'temp_id'
#         agg_columns = ['temp_id']
#     agg_dict = {
#         'sold': np.nansum,
#         'date': 'last',
#         'weekday': 'last',
#         'month': 'last'
#     }
#     features = compute_features(
#         groupby_agglevel(df_eval_after_release, agg_columns + ['d'], agg_dict),
#         agg_columns
#     )
    
#     # to suitable format for .parquet
#     for c in features.columns:
#         if c not in ['id', 'item_id', 'dept_id', 'cat_id', 'store_id', 'd', 'state_id']:
#             features[c] = features[c].astype(np.float32)
        
#     # format string and save file
#     agg_string = parse_columns_to_string(agg_columns)
#     if not TEST_RUN:
#         features.to_parquet(PRECOMPUTED_BASE_PATH + f'features_eval_{agg_string}.parquet', index=False)
#         del features
#     else:
#         features.to_parquet(PRECOMPUTED_BASE_PATH + f'/test/features_eval_{agg_string}.parquet', index=False)
        
# if not TEST_RUN: 
#     del df_eval_after_release