**IMPORTS**


In [None]:
import datetime
import logging
import pandas as pd
from typing import Any, List, Literal, Tuple, TypeAlias, cast
from sklearn.multioutput import RegressorChain
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from xgboost import XGBRegressor
from gsp.model.utils import make_mw_in_groups, make_shift_in_groups, ColumnTransformerWrapper, show, get_nth_previous_working_date
from data import SCRAPED_STOCK_FILE_PATH


**FUNCTIONS**

In [None]:
def load_data() -> pd.DataFrame:
    """Read the scraped stock CSV into a DataFrame with explicit column dtypes.

    The file location comes from ``SCRAPED_STOCK_FILE_PATH``.

    Returns:
        pd.DataFrame: The raw stock table, with ``Date`` parsed as a daily
        period, the four price columns as floats, ``Volume`` as an integer
        and ``Area`` / ``Name`` as categoricals.
    """
    price_columns = ("Open", "High", "Low", "Close")
    column_dtypes = {
        "Date": "period[D]",
        **dict.fromkeys(price_columns, "float"),
        "Volume": "int",
        "Area": "category",
        "Name": "category",
    }
    raw = pd.read_csv(SCRAPED_STOCK_FILE_PATH, dtype=column_dtypes)
    return raw


def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy(deep=True)
    periods = pd.date_range(
        start=df["Date"].min().to_timestamp().date(), end=df["Date"].max().to_timestamp().date(), freq="B"
    )
    periods_df = pd.DataFrame({"Date": periods}, dtype="period[D]")

    clean_data = (
        cast(
            pd.DataFrame,
            periods_df.set_index("Date")
            .join(df.set_index("Date"))
            .set_index("Name", append=True)
            .sort_index()
            .unstack("Name")
            .ffill()
            .stack("Name", future_stack=True),  # type: ignore
        )
        .reset_index()
        .dropna(subset=["Name"])
        .set_index(["Date", "Name"])
    )

    return clean_data


def engineer_features(
    df: pd.DataFrame,
    categorical_features: List[str],
    shift_list: List[int],
    mwm_list: List[int],
) -> pd.DataFrame:
    """Add calendar, area and per-stock lag features to the cleaned data.

    Args:
        df (pd.DataFrame): Data indexed by ``(Date, Name)`` with at least a
            ``Close`` column (and ``Area`` when ``"AreaCat"`` is requested).
        categorical_features (List[str]): Names of the optional features to
            create. Recognised names are ``DayOfWeek``, ``Month``, ``Year``,
            ``WeekOfYear``, ``DayOfMonth``, ``Quarter`` and ``AreaCat``;
            any other name is ignored here.
        shift_list (List[int]): Shifts forwarded to ``make_shift_in_groups``.
        mwm_list (List[int]): Window sizes forwarded to ``make_mw_in_groups``.

    Returns:
        pd.DataFrame: A copy of ``df`` with the requested feature columns,
        left-joined with the outputs of both helper calls.
    """
    enriched = df.copy()

    # Feature name -> PeriodIndex attribute it is read from. The dict order
    # fixes the order in which the columns are appended.
    calendar_attributes = {
        "DayOfWeek": "dayofweek",
        "Month": "month",
        "Year": "year",
        "WeekOfYear": "week",
        "DayOfMonth": "day",
        "Quarter": "quarter",
    }
    for feature, attribute in calendar_attributes.items():
        if feature not in categorical_features:
            continue
        dates = cast(pd.PeriodIndex, enriched.index.get_level_values("Date"))
        enriched[feature] = getattr(dates, attribute)

    if "AreaCat" in categorical_features:
        enriched["AreaCat"] = enriched["Area"]

    # Both helpers work on "Close" grouped by stock name; presumably they
    # return lagged values and moving-window aggregates -- confirm in
    # gsp.model.utils.
    shifted = make_shift_in_groups(enriched, groupby=["Name"], column="Close", shift=shift_list)
    windowed = make_mw_in_groups(enriched, groupby=["Name"], column="Close", window=mwm_list)
    lag_frames: List[pd.DataFrame | pd.Series] = [shifted, windowed]

    return enriched.join(lag_frames, how="left")


def process_data(
    df: pd.DataFrame,
    n_steps: int,
    categorical_features: List[str],
    starting_date_to_consider: datetime.date | None = None
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Build the feature matrix ``X`` and the multi-step target ``y``.

    Args:
        df (pd.DataFrame): Engineered data indexed by ``(Date, Name)``.
        n_steps (int): Number of forecasting steps; one target is requested
            per step via shifts ``-1 .. -n_steps`` of ``Close``.
        categorical_features (List[str]): Columns to one-hot encode.
        starting_date_to_consider (datetime.date | None): Optional first date
            to keep. It is ignored (with a warning) when it precedes the
            first date on which no column of any stock is missing.

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame]: ``X`` and ``y``, both restricted
        to dates on or after the effective starting date so that they cover
        the same date range.
    """
    df = df.copy()

    # First date where all stocks have data (no NaN in any column).
    first_all_valid_date: datetime.date = (
        cast(pd.Period, df.unstack("Name").dropna().first_valid_index()).to_timestamp().date()
    )

    earliest_date: datetime.date = first_all_valid_date

    if starting_date_to_consider is not None:
        if starting_date_to_consider < first_all_valid_date:
            show(
                f"WARNING: starting_date_to_consider ({starting_date_to_consider}) is before the first date where all stocks have data. Using {first_all_valid_date} instead."
            )
        else:
            earliest_date = starting_date_to_consider

    start_label = earliest_date.isoformat()

    ctw = ColumnTransformerWrapper(
        transformers=[
            ("onehot", OneHotEncoder(), categorical_features),
        ],
        remainder="passthrough",
    )

    # The encoder is fitted on the full history, then the result is cut to
    # the effective starting date.
    X = ctw.fit_transform(df.drop(columns=["Adj Close", "Volume", "High", "Low", "Open", "Area"])).loc[
        start_label:
    ]

    # The targets get the same cut as X; previously y kept the rows before
    # the starting date, so X and y did not share an index.
    y = make_shift_in_groups(
        df, groupby=["Name"], column="Close", shift=[-i for i in range(1, n_steps + 1)]
    ).loc[start_label:]

    return X, y


def split_data(
    X: pd.DataFrame,
    y: pd.DataFrame,
    n_steps: int,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split the time series chronologically into train and test sets.

    The boundaries are derived from the latest date in ``X`` through
    ``get_nth_previous_working_date`` (presumably the date ``n`` working days
    before ``date`` -- confirm in gsp.model.utils):

    * train: everything up to and including the ``2 * n_steps``-th previous
      working date;
    * test: from the ``(2 * n_steps - 1)``-th to the ``n_steps``-th previous
      working date, both included.

    Dates after the test end are left out of both sets.

    Args:
        X (pd.DataFrame): Features dataframe
        y (pd.DataFrame): Target dataframe
        n_steps (int): Number of steps of the forecasting task

    Returns:
        Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame, pd.DataFrame]: X_train, y_train, X_test, y_test
    """
    latest_date = cast(pd.Period, X.index.get_level_values("Date").max()).to_timestamp().date()
    test_end_date = get_nth_previous_working_date(n=n_steps, date=latest_date)
    test_start_date = get_nth_previous_working_date(n=2 * n_steps - 1, date=latest_date)
    train_end_date = get_nth_previous_working_date(n=2 * n_steps, date=latest_date)

    train_end = train_end_date.isoformat()
    test_start = test_start_date.isoformat()
    test_end = test_end_date.isoformat()

    X_train = X.loc[:train_end]
    # Targets must come from y; slicing X here handed the feature matrix to
    # the model as its training target.
    y_train = y.loc[:train_end]
    X_test = X.loc[test_start:test_end]
    y_test = y.loc[test_start:test_end]

    return X_train, y_train, X_test, y_test


# Run the data pipeline end to end: load -> clean -> engineer -> process -> split.

# Number of forecasting steps; passed as n_steps to process_data and split_data.
n_forecasting_problem : int = 3
# Features created by engineer_features and one-hot encoded in process_data.
categorical_features : List[str] = ["DayOfWeek", "AreaCat"]
stocks = load_data()
clean = clean_data(stocks)
# shift_list / mwm_list are forwarded to the grouped shift and moving-window helpers.
features = engineer_features(clean, categorical_features, shift_list=[1], mwm_list=[5])
X, y = process_data(features, n_forecasting_problem, categorical_features)
X_train, y_train, X_test, y_test = split_data(X, y, n_forecasting_problem)

show("DONE")

**EXECUTION**