In [1]:
import numpy as np
import pandas as pd

In [3]:
import mlflow
from sklearn.svm import SVR
from sklearn.metrics import r2_score, mean_absolute_percentage_error, mean_squared_error
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.preprocessing import OrdinalEncoder, RobustScaler
from sklearn.feature_selection import RFECV
from xgboost import XGBRegressor, XGBRFRegressor
from feature_engine.timeseries.forecasting import LagFeatures, WindowFeatures, ExpandingWindowFeatures
from feature_engine.datetime import DatetimeFeatures
from feature_engine.selection import SmartCorrelatedSelection
from sklearn.decomposition import PCA

In [4]:
import logging
from typing import Annotated, Tuple, Dict, List, Union

<center><b>Process Data</b></center>

In [5]:
def ingest_data(data_source: Annotated[str, 'data_source']) -> Annotated[pd.DataFrame, 'data']:
    """
    Ingests data from a given path.

    Args:
        data_source: The path to the data.

    Returns:
        The data as a string.
    """
    try:
        logging.info(f"Reading data from {data_source}")
        data = pd.read_parquet(data_source)
        logging.info(f"Data read from {data_source}")
        return data
    except Exception as e:
        logging.error(f"Error reading data from {data_source}: {e}")
        raise e

In [6]:
def clean_data(data: Annotated[pd.DataFrame, 'data']) -> Annotated[pd.DataFrame, 'cleaned_data']:
    """
    Clean the data by removing duplicates, null values, and converting columns to appropriate types.

    Args:
        data (pd.DataFrame): The input data.

    Returns:
        pd.DataFrame: The cleaned data. None if an error occurs.

    """
    try:
        logging.info("Cleaning data...")
        data.drop_duplicates(keep='last', inplace=True)
        data.dropna(inplace=True)
        data.drop(columns=['wire.1','client_id', 'CID', 'Base Size'], inplace=True)
        
        # format the date time
        data['date'] = pd.to_datetime(data.date).values

        # Sort
        data.sort_values(by='date', inplace=True)

        # renaming cols
        data.columns = [col.lower().strip().replace(' ', '_')
                        for col in data.columns]
        data.rename(
            {'area_(km)^2': 'area_km2', 'population_(approx.)': 'population',
             'literacy_rate_(%)': 'literacy_rate_perc'},
            axis=1, inplace=True)

        # optimizing for memory
        for col in data.select_dtypes('float64').columns:
            data[col] = data[col].astype('float32')

        for col in data.select_dtypes('int64').columns:
            data[col] = data[col].astype('int32')

        # literacy rate conversion
        data['literacy_rate_perc'] = data.literacy_rate_perc.astype('float32')

        # rename date -> timestamp
        data.rename({'date': 'timestamp'}, axis=1, inplace=True)

        logging.info("Data cleaned.")
        return data
    except Exception as e:
        logging.error(f"Error cleaning data: {e}")
        raise e

In [7]:
def encode_and_aggregate_data(
    data: Annotated[pd.DataFrame, 'cleaned_data']
) -> Tuple[Annotated[pd.DataFrame, 'target'], Annotated[pd.DataFrame, 'static features'], Annotated[pd.DataFrame, 'aggregated application_group table'], Annotated[pd.DataFrame, 'aggregated uses tabe'], Annotated[pd.DataFrame, 'aggregated mkt table']]:
    """
    Encode categorical features of the data.

    Args:
        data: Dataframe containing the categorical features.

    Returns:
        Dataframe containing the encoded categorical features OR None.

    """
    try:
        logging.info('Encoding categorical features...')
        
        # HASH FEATURES category
        data['category'] = data.category.apply(
            lambda cat: {'Domestic': 1, 'Power': 0}[cat])
        data['grade'] = data.grade.apply(
            lambda cat: {'Grade1': 1, 'Grade2': 2,
                         'Grade3': 3, 'Grade4': 4}[cat]
        )
        data['ecoind'] = data.ecoind.apply(
            lambda cat: {'Medium': 2, 'High': 4, 'Low': 2, 'Poor': 1}[cat]
        )

        # OneHot Encoding
        data = pd.get_dummies(data, columns=['division', 'region'])
        
        # renaming cols
        data.columns = [col.lower().strip().replace(' ', '_')
                        for col in data.columns]
        
        for column in data.select_dtypes('bool').columns:
            data[column] = data[column].astype(int)
    

        # optimize for memory
        for col in data.select_dtypes('int64').columns:
            data[col] = data[col].astype('int32')

        # Aggregate targets by outlet_id
        targets = df.pivot_table(index=['timestamp','outlet_id'], aggfunc={
                'net_price': 'mean',
                'qtym': 'mean',
            }
        ).reset_index()
        
        # Aggreate static feature by outlet_id
        static_features = df[['timestamp','outlet_id','wire', 'rm',
           'fy', 'grade', 'noc',
           'dfc', 'area_km2', 'population', 'literacy_rate_perc', 'pcx', 'excnts',
           'exach', 'trc', 'tlcolt', 'tmtm', 'ecoind', 'sf', 'sop', 'pminx',
           'tms_cr', 'mas', 'kpi', ]].groupby(by=['timestamp','outlet_id'],).mean().reset_index()
        
        # aggreated appliatin group by outlet_id
        application_group = pd.DataFrame(columns=['General', 'Moderate', 'Rich', 'Industry'])
        for outlet in df.outlet_id.value_counts().index:
            ratio = df.loc[df.outlet_id==outlet, 'application_group'].value_counts(normalize=True).to_dict()
            application_group.loc[outlet] = ratio
        application_group = application_group.fillna(0).reset_index().rename(columns={'index':'outlet_id'}).astype(np.float32)
        
        # Aggregated uses by outlet_id
        uses = pd.DataFrame(columns=[
            'House Wiring', 'Fan & Lighting Connection',
            'Air Condition & Washing Machine, Heavy Item', 'Lift & Heavy Item',
            'Earthing', 'Industry, Machineries'
            ]
        )
        for outlet in df.outlet_id.value_counts().index:
            ratio = df.loc[df.outlet_id==outlet, 'uses'].value_counts(normalize=True).to_dict()
            uses.loc[outlet] = ratio
        uses = uses.fillna(0).reset_index().rename(columns={'index':'outlet_id'}).astype(np.float32)
        
        # Aggregated mkt ratio by outlet_id
        mkt = pd.DataFrame(columns=
            ['Urban', 'Rural', 'Semi Urban', 'Others']
        )
        for outlet in df.outlet_id.value_counts().index:
            ratio = df.loc[df.outlet_id==outlet, 'mkt'].value_counts(normalize=True).to_dict()
            mkt.loc[outlet] = ratio
        mkt = mkt.fillna(0).reset_index().rename(columns={'index':'outlet_id'}).astype(np.float32)
        logging.info('Encoding categorical features completed.')
        return targets, static_features, application_group, uses, mkt
    except Exception as e:
        logging.error(f'Error encoding categorical features: {e}')
        return None

In [8]:
def AddTemporalFeatures(targets: Annotated[pd.DataFrame, 'encoded data']) -> Annotated[pd.DataFrame, 'temporal features']:
    features_to_extract = [
        "month", "quarter", "semester", "week", "day_of_week", "day_of_month",
        "day_of_year", "weekend", "month_start", "month_end", "quarter_start",
        "quarter_end", "year_start", "year_end"
    ]

    try:
        logging.info(f'==> Processing AddTemporalFeatures()')
        temporal = DatetimeFeatures(
            features_to_extract=features_to_extract).fit_transform(targets[['timestamp']])
        # for col in temporal.columns:
        #     data.loc[:, col] = temporal[col].values
        logging.info(f'==> Successfully processed AddTemporalFeatures()')
        return temporal
    except Exception as e:
        logging.error(f'==> Error in AddTemporalFeatures()')
        raise e

In [9]:
def AddLagFeatures(targets: Annotated[pd.DataFrame, 'after added temporal features']) -> Annotated[pd.DataFrame, 'Lag features']:
    """
    Add lag features to the data.
    """
    logging.info(f"Adding lag features to the data.")
    try:
        # Add Lag  Feature
        lagfeatures = LagFeatures(variables=None, periods=[3, 8, 16, 24], freq=None, sort_index=True,
                                  missing_values='raise', drop_original=False)
        lagfeatures.fit(targets[['timestamp', 'net_price', 'qtym']])
        features = lagfeatures.transform(
            targets[['timestamp', 'net_price', 'qtym']])
        # for col in list(features.columns)[3:]:
        #     data[col] = features[col].values
        logging.info(f'==> Successfully processed add_lag_features()')
        return features.drop(['timestamp', 'net_price', 'qtym'], axis=1)
    except Exception as e:
        logger.error(f'in The add_lag_features(): {e}')
        raise e

In [10]:
def AddWindowFeatures(targets: Annotated[pd.DataFrame, 'After lag features added']) -> Annotated[pd.DataFrame, 'window features']:
    """Add window features to the dataframe

    Args:
        data (Union[dd.DataFrame, pd.DataFrame]): The dataframe to add window features to.

    Returns:
        Union[dd.DataFrame, pd.DataFrame]: The dataframe with window features added.
    """
    logging.info("Adding window features to the dataframe")

    try:
        windowfeatures = WindowFeatures(variables=None, window=24, freq=None, sort_index=True,
                                        missing_values='raise', drop_original=False)
        windowfeatures.fit(
            targets[['timestamp', 'net_price', 'qtym']])
        features = windowfeatures.transform(
            targets[['timestamp', 'net_price', 'qtym']])
        # for col in list(features.columns)[3:]:
        #     data[col] = features[col].values
        logging.info(f'==> Successfully processed add_window_features()')
        return features.drop(['timestamp', 'net_price', 'qtym'], axis=1)
    except Exception as e:
        logging.error(f'in add_window_features(): {e}')
        raise e

In [11]:
def AddExpWindowFeatures(targets: Annotated[pd.DataFrame, 'after added temporal features']) -> Annotated[pd.DataFrame, 'added Expanding Window features']:
    """Add Expanding Window Features to the data.
    Args:
        data (pd.DataFrame): The input data.
    Returns:
        pd.DataFrame: The data with added expanding window features.
    """
    try:

        expwindow = ExpandingWindowFeatures(
            variables=None, min_periods=7, functions='std', 
            periods=7, freq=None, sort_index=True, 
            missing_values='raise', drop_original=False
        )
        features = expwindow.fit_transform(targets[['timestamp', 'net_price', 'qtym']])
        
        # # 
        # for col in list(features.columns)[3:]:
        #     data[col] = features[col].values
        return features.drop(['timestamp', 'net_price', 'qtym'], axis=1)
    except Exception as e:
        logging.error(f'in The add_expw_features(): {e}')
        raise e

In [12]:
def mergeAllFeatures(
    targets: Annotated[pd.DataFrame, 'targets'], 
    static_features: Annotated[pd.DataFrame, 'static_features'],
    application_group: Annotated[pd.DataFrame, 'application_group'],
    uses: Annotated[pd.DataFrame, 'uses'],
    mkt: Annotated[pd.DataFrame, 'mkt'],
    
) -> Tuple[Annotated[pd.DataFrame, 'features'], Annotated[pd.Series, 'target']]:
    """Merges All Features into One.
    Args:
        data (pd.DataFrame): The input data.
    Returns:
        pd.DataFrame: The data with added expanding window features.
    """
    try:
        logging.info(f'==> merging features...')
        
        # Generate outlet wise timeseries_features
        timeseries_features_outlet_wise = pd.DataFrame()
        for outlet_id in targets.outlet_id.value_counts().index:
            outlet_wise = targets.loc[targets.outlet_id==outlet_id]
            temporal = AddTemporalFeatures(outlet_wise)
            lag_features = AddLagFeatures(outlet_wise)
            window_features = AddWindowFeatures(outlet_wise)
            exp_window_features = AddExpWindowFeatures(outlet_wise)
            outlet_wise_features = pd.concat([outlet_wise[['timestamp','outlet_id',]], temporal, lag_features, window_features, exp_window_features], axis=1)
            timeseries_features_outlet_wise = pd.concat([timeseries_features_outlet_wise, outlet_wise_features], ignore_index=True)
        
        # Merge outlet wise timeseries_features
        targets.merge(timeseries_features_outlet_wise, on=['timestamp','outlet_id',], how='inner')

        
        # Merge application group, uses, mkt
        data = targets.merge(
            application_group, on='outlet_id', how='inner'
        ).merge(
            uses, on='outlet_id', how='inner'
        ).merge(
            mkt, on='outlet_id', how='inner'
        ).merge(
            static_features, on=['timestamp', 'outlet_id'], how='inner'
        ).merge(
            timeseries_features_outlet_wise, on=['timestamp', 'outlet_id'], how='inner')
        
        # Drops Nulls
        data.bfill(inplace=True)
        return data.drop(columns=['timestamp', 'net_price', 'qtym', 'outlet_id']), data['qtym']
    except Exception as e:
        logging.error(f'==> Error when merging features: {e}')
        raise e

In [13]:
def scale_data(features: Annotated[pd.DataFrame, 'features to scale'],) -> Annotated[pd.DataFrame, 'standardized features']:
    """Scaling step.
    Args:
        data: Input data.
    Returns:
        Normalized data.
    """
    try:
        logging.info(f'==> Processing scale_data()')
        # Assuming the data is a pandas DataFrame
        scaler = RobustScaler(
            with_centering=True,
            with_scaling=True,
            quantile_range=(25.0, 75.0),
            copy=True,
            unit_variance=False,
        )
        scaler.fit(features)
        features = pd.DataFrame(scaler.transform(features), columns=features.columns)
        scaler.fit(features)
        
#         # save Scaler model
#         joblib.dump(scaler, os.path.join(config.ARTIFACTS_DIR, 'scaler.pkl'))
#         logger.info(
#             f'Scaler model saved to {os.path.join(config.ARTIFACTS_DIR, "scaler.pkl")}')
#         features.to_parquet(config.FEATURE_STORE, index=False)
        return features
    except Exception as e:
        logging.error(f"in scale_data(): {e}")
        raise e

In [14]:
def split_data(
    features: Annotated[pd.DataFrame, 'features'],
    target: Annotated[pd.Series, 'target'],
    test_size: float = 0.25,
    random_state: int = 33
) -> Tuple[Annotated[pd.DataFrame, 'X_train'], Annotated[pd.DataFrame, 'X_test'], Annotated[pd.Series, 'y_train'], Annotated[pd.Series, 'y_test']]:
    """
    Split the data into train and test sets.

    Args:
        features (pd.DataFrame): The input data.
        target (pd.Series) : Target colun
        test_size (float): The proportion of the data to include in the test set. Default is 0.2.
        random_state (int): The seed for the random number generator. Default is 42.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: The train and test sets.
    """
    logging.info("Splitting data...")
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, test_size=test_size, random_state=random_state)
    logging.info("Data split successfully.")
    return X_train, X_test, y_train, y_test

In [15]:
df = ingest_data(data_source='/demand-forecast-SQGroup/data/data_bya_series.parquet')
df = clean_data(data=df)
targets, static_features, application_group, uses, mkt = encode_and_aggregate_data(data=df)
features, target = mergeAllFeatures(targets, static_features, application_group, uses, mkt)
std_features = scale_data(features=features)
X_train, X_test, y_train, y_test = split_data(features=std_features, target=target)

In [16]:
X_train.shape, y_train.shape

((27996, 62), (27996,))

In [17]:
X_test.shape, y_test.shape

((9333, 62), (9333,))

In [20]:
# rfe = RFECV(
#     SVR(kernel='linear'),
#     step=1,
#     min_features_to_select=int(np.sqrt(features.shape[1])),
#     cv=3,
#     scoring='r2',
#     verbose=1,
#     n_jobs=None,
#     importance_getter='coef_',
# )
# rfe.fit(X=X_train, y=y_train)

In [21]:
sm = SmartCorrelatedSelection(variables=None, method='pearson', threshold=0.8, missing_values='ignore', selection_method='missing_values', estimator=None, scoring='r2', cv=5, confirm_variables=False)
best = sm.fit_transform(X_train, y_train)

In [24]:
from sklearn.decomposition import KernelPCA, PCA

def decompose_features_kernel_pca(X, n_components=2, kernel='linear', load=False):
    """
    Decomposes features using KernelPCA.

    Args:
    - X (array-like): Input feature matrix.
    - n_components (int): Number of components (default: 2).
    - kernel (str): Kernel function to use ('linear', 'poly', 'rbf', 'sigmoid', or a callable) (default: 'linear').

    Returns:
    - X_transformed (array-like): Transformed feature matrix.
    """
    if load:
        pass
    
    # Initialize KernelPCA object
    kpca = PCA(n_components=20)
    
    # Fit and transform the input feature matrix
    X_transformed = kpca.fit_transform(X)

    return X_transformed, kpca


In [25]:
pcs_f, pca = decompose_features_kernel_pca(best, n_components=int(np.sqrt(best.shape[1])), kernel='linear', load=False)

In [29]:
pcs_f

array([[-1.7522293 ,  0.98296154,  0.51003387, ..., -0.14732248,
        -0.40549644, -0.1982893 ],
       [ 1.70618202, -1.18945608,  1.29284123, ..., -0.32820525,
        -0.1185912 ,  0.48002742],
       [ 0.0939029 ,  1.70894291, -0.08137184, ...,  1.34855327,
        -1.43384542, -0.74639628],
       ...,
       [ 1.02862341, -3.84349677,  1.06624677, ..., -0.95197557,
        -0.36398289,  0.06009521],
       [ 1.08015896, -1.13031837, -1.57221804, ..., -0.6510743 ,
         0.06035134, -0.20690283],
       [-2.37712072,  1.32586295,  1.3178424 , ...,  0.44071855,
        -0.74486799, -0.43703693]])

<center><b> Experiments </b></center>

In [26]:
def evaluate_model(
    model,
    X: pd.DataFrame,
    y: pd.DataFrame,
) -> None:
    """
    Evaluate the model
    """
    try:
        y_pred = model.predict(X).reshape(y.shape[0], 1)

        # MAPE, MSE, RMSE, R2, AIC, BIC
        mape = mean_absolute_percentage_error(y, y_pred)
        mse = mean_squared_error(y, y_pred)
        r2 = r2_score(y, y_pred)
        mlflow.log_metrics(dict(mape=mape, mse=mse, r2=r2))

    except Exception as e:
        raise e

In [27]:
# mlflow.create_experiment('Net Price Forecasting')

In [31]:
mlflow.set_experiment('Net Price Forecasting')
with mlflow.start_run():
    XGB_PARAMS_SPACE = {
        'max_depth': [3, 5, 7],
        'learning_rate': [0.1, 0.3, 0.5],
        'n_estimators': [100, 200, 300],
        'subsample': [0.5, 0.7, 0.9],
        'colsample_bytree': [0.5, 0.7, 0.9],
        'gamma': [0, 0.1, 0.2]
    }

       
    # Randomized Grid Search for XGBoost hyperparameters
    grid = RandomizedSearchCV(
        XGBRFRegressor(),
        param_distributions=XGB_PARAMS_SPACE,
        scoring='r2',
        cv=5,
        verbose=1
    )
    grid.fit(pcs_f, y_train)
    model = grid.best_estimator_
    mlflow.log_params(grid.best_params_)
    mlflow.xgboost.log_model(model, 'XGBRF-QTYM')
    ### log metrics
    evaluate_model(grid.best_estimator_, pca.transform(X_test[best.columns]), y_test)

Fitting 5 folds for each of 10 candidates, totalling 50 fits




In [32]:
np.sqrt(46)

6.782329983125268