In [1]:
import warnings

warnings.filterwarnings("ignore")

In [2]:
from pathlib import Path

raw_data_folder: Path = Path("00_raw/")
draft_data_processing_folder: Path = Path("10_draft_processing/")
aggregare_draft_data_folder: Path = Path("20_aggregate_draft/")
prod_data_folder: Path = Path("30_prod/")
features_data_folder: Path = Path("40_features/")

In [3]:
import pandas as pd

excel_dataset_name: str = "12_Industry_Portfolios_Daily.xlsx"
equally_weighted_returns_sheet_name: str = "Average Equal Weighted Returns"

df: pd.DataFrame = pd.read_excel(
    io=raw_data_folder.joinpath(excel_dataset_name),
    sheet_name=equally_weighted_returns_sheet_name,
)
df.rename(columns={"Unnamed: 0": "Date"}, inplace=True)
df["Date"] = pd.to_datetime(df["Date"], format="%Y%m%d")

sectors: list[str] = [
    industry.strip() for industry in df.columns.difference(["Date"]).to_list()
]

df.rename(
    columns={k: f"{k.strip()}_Returns_1" for k in df.columns.difference(["Date"])},
    inplace=True,
)

df.loc[:, df.columns != "Date"] = df.loc[:, df.columns != "Date"] / 100

In [4]:
features: list[dict[str, str]] = [
    {
        "name": "Volatility_Index",
        "type": "market_status",
        "source": "yahoo_finance",
        "ticker": "^VIX",
        "reference_variable": "Close",
        "description": "CBOE Volatility Index",
        "added_features": [
            {
                "name": "Returns",
                "reference_variable": "Close",
                "periods": [5, 20, 40],
            },
            {
                "name": "StdDev",
                "reference_variable": "Close",
                "periods": [5, 20, 40],
            },
        ],
    },
    {
        "name": "Momentum",
        "type": "asset_feature",
        "reference_variable": "Returns",
        "reference_period": 1,
        "periods": [15, 40, 70],
    },
    {
        "name": "StdDev",
        "type": "asset_feature",
        "reference_variable": "Returns",
        "reference_period": 1,
        "periods": [15, 40, 70],
    },
]

In [5]:
import numpy as np
import yfinance as yf

from numpy.typing import NDArray

for feature in features:
    feature_name: str = feature.get("name", "N/A")
    feature_type: str = feature.get("type", "N/A")

    print(f"Currently fetching data on the following feature: {feature_name}")

    match feature_type:
        case "market_status":
            feature_source: str = feature.get("source")
            added_features: list[dict[str, str]] = feature.get("added_features", [])

            if feature_source == "yahoo_finance":
                feature_ticker: str = feature.get("ticker")

                if feature_ticker is None:
                    print("Unable to recognize identify the ticker")

                _feature_data: pd.DataFrame = yf.download(tickers=feature_ticker)

                for added_feature in added_features:
                    added_feature_name: str = added_feature.get("name", "N/A")

                    print(
                        "Currently fetching data on the following feature:"
                        f" {added_feature_name}"
                    )

                    match added_feature_name:
                        case "Momentum":
                            variable_reference: str = added_feature.get(
                                "reference_variable"
                            )
                            periods: list[int | float] = added_feature.get(
                                "periods",
                                [],
                            )

                            if variable_reference is None:
                                print("No variable reference has been specified")

                            if not periods:
                                print("No periods have been specified")
                                ticker_name = feature.get("name", "N/A")

                            if variable_reference not in _feature_data.columns:
                                print(
                                    "Unable to identify the specified variable"
                                    " reference on the given dataset"
                                )

                            for period in periods:
                                end_data: pd.Series = _feature_data.loc[
                                    period:, variable_reference
                                ].to_numpy()

                                start_data: pd.Series = _feature_data.loc[
                                    : len(_feature_data) - period - 1,
                                    variable_reference,
                                ].to_numpy()

                                _feature_data[
                                    f"{variable_reference}_{added_feature_name}_{str(period)}"
                                ] = np.zeros(
                                    len(
                                        _feature_data,
                                    ),
                                )

                                _feature_data.loc[
                                    period:,
                                    f"{variable_reference}_{added_feature_name}_{str(period)}",
                                ] = (end_data - start_data) / 100

                        case "Returns":
                            variable_reference: str = added_feature.get(
                                "reference_variable"
                            )

                            periods: list[int | float] = added_feature.get(
                                "periods",
                                [],
                            )

                            if variable_reference is None:
                                print("No variable reference has been specified")

                            if not periods:
                                print("No periods have been specified")
                                ticker_name = feature.get("name", "N/A")

                            if variable_reference not in _feature_data.columns:
                                print(
                                    "Unable to identify the specified variable"
                                    " reference on the given dataset"
                                )

                            for period in periods:
                                _feature_data[
                                    f"{variable_reference}_{added_feature_name}_{str(period)}"
                                ] = _feature_data[variable_reference].pct_change(period)

                        case "StdDev":
                            variable_reference: str = added_feature.get(
                                "reference_variable"
                            )
                            periods: list[int | float] = added_feature.get(
                                "periods",
                                [],
                            )

                            if variable_reference is None:
                                print("No variable reference has been specified")

                            if variable_reference not in _feature_data.columns:
                                print(
                                    "Unable to identify the specified variable"
                                    " reference on the given dataset"
                                )

                            for period in periods:
                                _feature_data[
                                    f"{variable_reference}_{added_feature_name}_{str(period)}"
                                ] = (
                                    _feature_data[variable_reference]
                                    .rolling(period)
                                    .std()
                                )

                _feature_data["Date"] = _feature_data.index
                _feature_data.reset_index(inplace=True, drop=True)
                col = _feature_data.pop("Date")
                _feature_data.insert(0, col.name, col)

                _feature_data.to_csv(
                    path_or_buf=draft_data_processing_folder.joinpath(
                        f"{feature_name}.csv"
                    ),
                    sep=",",
                    encoding="UTF-8",
                    index=False,
                )
            else:
                print("Not able to correctly fetch data on this feature...")

        case "asset_feature":
            match feature_name:
                case "Momentum":
                    variable_reference: str = feature.get("reference_variable")
                    variable_period: str = feature.get("reference_period")
                    periods: list[int | float] = feature.get("periods", [])

                    if variable_reference is None:
                        print("No variable reference has been specified")

                    if variable_period is None:
                        print("No variable reference period has been specified")

                    if not periods:
                        print("No periods have been specified")

                    for sector in sectors:
                        reference_column_name: str = (
                            f"{sector}_{variable_reference}_{variable_period}"
                        )
                        if reference_column_name not in df.columns:
                            print(
                                "Unable to identify the specified variable reference on"
                                " the given dataset"
                            )

                        for period in periods:
                            _ones_vector: NDArray = np.ones(len(df))
                            _returns: NDArray = (
                                _ones_vector + df[reference_column_name].to_numpy()
                            )
                            _cumulated_returns: NDArray = np.cumprod(_returns)

                            end_data: NDArray = _cumulated_returns[period:,]
                            start_data: NDArray = _cumulated_returns[: len(df) - period]

                            df.loc[
                                period:,
                                f"{sector}_{variable_reference}_{feature_name}_{period}",
                            ] = (
                                end_data - start_data
                            )

                case "StdDev":
                    variable_reference: str = feature.get("reference_variable")
                    variable_period: str = feature.get("reference_period")
                    periods: list[int | float] = feature.get("periods", [])

                    if variable_reference is None:
                        print("No variable reference has been specified")

                    if variable_period is None:
                        print("No variable reference period has been specified")

                    if not periods:
                        print("No periods have been specified")

                    for sector in sectors:
                        reference_column_name: str = (
                            f"{sector}_{variable_reference}_{variable_period}"
                        )

                        if reference_column_name not in df.columns:
                            print(
                                "Unable to identify the specified variable reference on"
                                " the given dataset"
                            )

                        for period in periods:
                            df[
                                f"{sector}_{variable_reference}_{feature_name}_{period}"
                            ] = (df[reference_column_name].rolling(period).std())
                case _:
                    print("Unable to properly recognize the specified asset feature")

df.to_csv(
    path_or_buf=draft_data_processing_folder.joinpath("dataset.csv"),
    sep=",",
    encoding="UTF-8",
    index=False,
)

Currently fetching data on the following feature: Volatility_Index


[*********************100%%**********************]  1 of 1 completed


Currently fetching data on the following feature: Returns
Currently fetching data on the following feature: StdDev
Currently fetching data on the following feature: Momentum
Currently fetching data on the following feature: StdDev


In [6]:
market_status_features: list[str] = ["Close", "Returns", "Momentum", "StdDev"]

_data_frames: list[pd.DataFrame] = [df]

for feature in features:
    feature_type: str = feature.get("type")

    if feature_type == "market_status":
        feature_name: str = feature.get("name")

        _df: pd.DataFrame = pd.read_csv(
            filepath_or_buffer=draft_data_processing_folder.joinpath(
                f"{feature_name}.csv"
            ),
            sep=",",
            encoding="UTF-8",
        )

        if feature_name is None:
            print("Unable to correctly fetch the feature name")

        filtered_df: pd.DataFrame = _df.loc[
            :,
            ["Date"]
            + [
                column
                for column in _df.columns
                if len(set(column.split("_")).intersection(set(market_status_features)))
                >= 1
            ],
        ]

        filtered_df["Date"] = pd.to_datetime(filtered_df["Date"])
        filtered_df.columns = ["Date"] + [
            f"{feature_name}_" + column for column in filtered_df.columns[1:]
        ]

        _data_frames.append(filtered_df)

common_dates: set = set(_data_frames[0]["Date"])

for i in range(1, len(_data_frames)):
    common_dates.intersection_update(set(_data_frames[i]["Date"]))

_columns: set = set()

mask = df["Date"].isin(common_dates)
masked: pd.DataFrame = df[mask].reset_index(drop=True)
_columns.update(masked.columns)

result: pd.DataFrame = masked

for i in range(1, len(_data_frames)):
    df = _data_frames[i]
    mask = df["Date"].isin(common_dates)
    masked: pd.DataFrame = df[mask].reset_index(drop=True)
    _columns.update(masked.columns)

    result = pd.concat(
        [result, masked.loc[:, masked.columns != "Date"]],
        axis=1,
        join="outer",
    )

result.to_csv(
    path_or_buf=aggregare_draft_data_folder.joinpath("dataset.csv"),
    sep=",",
    encoding="UTF-8",
    index=False,
)

In [7]:
max_first_valid_indexes: int = 0

for column in result.columns:
    current_first_valid_index = result[column].first_valid_index()
    if result[column].first_valid_index() > max_first_valid_indexes:
        max_first_valid_indexes = current_first_valid_index

result = result.loc[max_first_valid_indexes:, :]
result.reset_index(drop=True, inplace=True)

In [8]:
result.to_csv(
    path_or_buf=prod_data_folder.joinpath("dataset.csv"),
    sep=",",
    encoding="UTF-8",
    index=False,
)