# Описание Домашнего Задания

Признаковая инженерия

<b>Цель:</b>
В данном домашнем задании вы потренируетесь в очистке и обогащении данных.

<b>Описание/Пошаговая инструкция выполнения домашнего задания:</b>
Уважаемый студент!


Вы построили ваши торговые стратегии на основе технического анализа и провели их оценку. С удивлением вы обнаружили, что некоторые модели работают лучше, некоторые хуже, причем это неравномерно распределено во времени.

Поговорив с коллегами, вы решаете построить более сложные модели машинного обучения, но для их эффективной работы необходимо подготовить и очистить данные.

Также постоянно поступают новые данные, и тратить время на их очистку и подготовку непозволительная роскошь. Вы понимаете, что на подготовленных данных вы сможете сформировать новые признаки, которые послужат основой для более сложных моделей.

На основании вышесказанного вам необходимо построить конвейер подготовки данных. Он будет получать сырые данные с информационных площадок, и автоматически готовить их в вид, пригодный для использования в моделях машинного обучения.

<b>Вам предлагается на основе представленной информации:</b>

1. Создать код на Python, который очистит ваши данные от выбросов и заполнит пропуски в них (если они присутствуют).
2. Создать код Python, который сформирует новые признаки из данных для их дальнейшего использования в процессе моделирования.
3. Код должен в автоматическом режиме при получении новой порции данных запустить конвейер очистки и преобразования данных.
4. Подготовленные данные должны сохраняться в виде файлов или БД.
5. Новые данные не должны дублировать предыдущие и не должны их перезаписывать. Только добавлять новые.
6. Сформировать дашборд, показывающий процесс накопления данных и отображающий графики по ним.

# 0. (попытка) Фикса бага в загрузке сырых данных

В рамках проекта уже реализована функция
```
utils.update_tickers_data()
```
которая загружает данные из Yahoo Finance и сохраняет их в локальной SQLite базе в таблицу, название которой зависит от интервала ('1d'-> 'd1' и т.п.)

В рамках этой функции уже реализовано недублирование уже имеющихся данных, но существует баг, из-за которого даже при отсутствии новых данных yfinance всё равно вызывается, и это кроме прочего влияет на выполнение критерия №5

После ряда испытаний выяснилось, что это проявляется в момент выходных / bank holidays - поскольку свежих данных в принципе не существует, логика пытается загрузить их каждый раз.

Т.к. это не влечёт перезаписи/дублирования уже существующих данных - видимо, проблемой это не является

In [1]:
# Относительные ссылки, включая импорты, относительно корневой папки проекта
import os

os.chdir(os.path.dirname(os.getcwd()))

import main
import logging

from src.core import utils

# initialize
logger = logging.getLogger()
# initialize config dict
config = main.main_launch()
# SUBSTITUTE FOR TESTING
# TODO



In [2]:
TICKERS = config.TICKERS
START_DT = config.START_DT
END_DT = (
    config.END_DT
)  # < если сегодня выходной, то последний доступный тик будет в будний/торговый день, и utils.update_tickers_data() попытается загрузить обновлённые данные (хотя их и нет)
INTERVAL = config.INTERVAL

In [3]:
res = utils.get_unavailable_parts(TICKERS, START_DT, END_DT, INTERVAL)

[INFO   ] 2025-04-13@17:59:06: Checking already available data...
[INFO   ] 2025-04-13@17:59:08: 0 tickers have no data at all
[INFO   ] 2025-04-13@17:59:08: 5 tickers lack history in start part
[INFO   ] 2025-04-13@17:59:08: 508 tickers lack history in end part


In [4]:
TICKERS = config.TICKERS
START_DT = config.START_DT
END_DT = '2025-04-11' # < пятница #config.END_DT
INTERVAL = config.INTERVAL

In [5]:
res = utils.get_unavailable_parts(TICKERS, START_DT, END_DT, INTERVAL)

[INFO   ] 2025-04-13@17:59:08: Checking already available data...
[INFO   ] 2025-04-13@17:59:11: 0 tickers have no data at all
[INFO   ] 2025-04-13@17:59:11: 5 tickers lack history in start part
[INFO   ] 2025-04-13@17:59:11: 0 tickers lack history in end part


# 1. Код на Python, очищающий данные от выбросов и заполняющий пропуски в них

На прошлых этапах проекта была реализована функция
```
utils.get_history()
```
которая забирает исходные исторические данные из локального кэша (SQLite)

Используем её как отправную точку

Для очистки данных от выбросов используем Z-score


In [1]:
# Относительные ссылки, включая импорты, относительно корневой папки проекта
import os

os.chdir(os.path.dirname(os.getcwd()))

import main
import logging

import pandas as pd
from scipy.stats import zscore
import plotly.express as px
import plotly.graph_objects as go

from src.core import utils

# initialize
logger = logging.getLogger()
# initialize config dict
config = main.main_launch()
# SUBSTITUTE FOR TESTING
# TODO



In [2]:
tickers = ['BTC-USD']
# start = config.START_DT
# end = config.END_DT
interval = config.INTERVAL
update_cache = False # do not call yfinance during testing


In [3]:
# Get data from DB
data = utils.get_history(
    tickers=tickers,
    start=None,  # Use all existing data
    end=None,  # Use all existing data
    interval=interval,
    update_cache=update_cache,
)

[INFO   ] 2025-04-13@20:29:35: Getting history from local cache DB...
[INFO   ] 2025-04-13@20:29:35: Got history of shape (3861, 7), 0 NaNs


## Обнаружение и очистка аномалий

Пока что наблюдается использование только цен Закрытия и Объема торгов при создании торговых стратегий, но на всякий случай сделаем для всех показателей

In [4]:
def detect_anomalies_z(df:pd.DataFrame, column:str):
    df_copy = df.copy()
    df_copy["Z-score"] = zscore(df_copy[column])
    anomalies_idx = df_copy[abs(df_copy["Z-score"]) > 1.5].index
    return anomalies_idx

In [5]:
# Detect anomalies separately for each ticker and for each column, replace them with NANs
outlier_free_data = []
for ticker in data["Ticker"].unique():
    data_ticker = data[data["Ticker"] == ticker]

    for column in ['Open', 'Low', 'High', 'Close', 'Volume']:
        column_anomalies = detect_anomalies_z(data_ticker, column)
        data_ticker.loc[column_anomalies, column] = None

    outlier_free_data.append(data_ticker)
data_filtered_v1 = pd.concat(outlier_free_data, axis=0).reset_index(drop=True)


При таком подходе видна проблема: если ряд имеет выраженный тренд, в выбросы попадают наиболее свежие и актуальные данные

In [6]:
px.scatter(x=data_filtered_v1["Date"], y=data_filtered_v1["Close"]).show()

Поэтому попробуем перевести логику с абсолютных значений на дельты

In [7]:
def detect_anomalies_z_diff(df: pd.DataFrame, column: str):
    df_copy = df.copy()
    df_copy["Z-score"] = zscore(df_copy[column].diff().fillna(0))
    anomalies_idx = df_copy[abs(df_copy["Z-score"]) > 1.5].index
    return anomalies_idx

In [8]:
# Detect anomalies separately for each ticker and for each column, replace them with NANs
outlier_free_data = []
for ticker in data["Ticker"].unique():
    data_ticker = data[data["Ticker"] == ticker]

    for column in ["Open", "Low", "High", "Close", "Volume"]:
        column_anomalies = detect_anomalies_z_diff(data_ticker, column)
        data_ticker.loc[column_anomalies, column] = None

    outlier_free_data.append(data_ticker)
data_filtered_v2 = pd.concat(outlier_free_data, axis=0).reset_index(drop=True)

In [9]:
px.scatter(x=data_filtered_v2["Date"], y=data_filtered_v2["Close"]).show()

Пропусков в свежих данных всё еще заметное количество, но с этим уже можно жить

In [10]:
data_filtered_v2.tail(10)

Unnamed: 0,Date,Ticker,Open,Low,High,Close,Volume
3851,2025-04-03 00:00:00.000000,BTC-USD,,81282.101562,,83102.828125,36852110000.0
3852,2025-04-04 00:00:00.000000,BTC-USD,83100.25,81670.75,84696.148438,83843.804688,45157640000.0
3853,2025-04-05 00:00:00.000000,BTC-USD,83844.703125,82377.734375,84207.015625,83504.796875,
3854,2025-04-06 00:00:00.000000,BTC-USD,83504.507812,,83704.71875,,
3855,2025-04-07 00:00:00.000000,BTC-USD,,,,79235.335938,
3856,2025-04-08 00:00:00.000000,BTC-USD,79218.476562,,80823.890625,,
3857,2025-04-09 00:00:00.000000,BTC-USD,,,,,
3858,2025-04-10 00:00:00.000000,BTC-USD,,,82700.929688,,
3859,2025-04-11 00:00:00.000000,BTC-USD,,78936.320312,,,41656780000.0
3860,2025-04-12 00:00:00.000000,BTC-USD,,,,,


При этом убирается не то чтобы мало точек, т.е. фильтрация выбросов всё же выполняет свою функцию

In [11]:
data_filtered_v1.isna().sum()

Date        0
Ticker      0
Open      435
Low       434
High      433
Close     435
Volume    263
dtype: int64

In [12]:
data_filtered_v2.isna().sum()

Date        0
Ticker      0
Open      370
Low       357
High      347
Close     373
Volume    222
dtype: int64

In [13]:
data = data_filtered_v2.copy()

## Заполнение пропусков в данных

Логически по шагам:

- делаем ресэмплинг по времени (== заполняются пропущенные временные точки)
- заполняем линейной интерполяцией - считаем что это не влечет за собой data leakage
- дозаполняем ffill'ом и после - bfill'ом - обрабатываем пропуски в конце и начале временного периода

In [14]:
# Make sure we have end of requested time period
data["Date"] = data["Date"].astype("datetime64[ns]")
if pd.to_datetime(config.END_DT) not in data["Date"].values:
    # Append a row with end date
    data = pd.concat(
        [data, pd.DataFrame({"Date": pd.to_datetime(config.END_DT)}, index=[0])], axis=0
    ).reset_index(drop=True)

In [15]:
# Time resampling based on 'Date' field
data["Date"] = data["Date"].astype("datetime64[ns]")
data.set_index("Date", inplace=True)
data = data.resample(config.INTERVAL).asfreq()


In [16]:
# Ticker is always same, ffill/bfill it
data["Ticker"] = data["Ticker"].ffill().bfill()
# Linear interpolation for value columns
for column in ["Open", "Low", "High", "Close", "Volume"]:
    data[column] = data[column].interpolate(
        "linear"
    )  # it does ffill as well, but run next steps just in case
    data[column] = data[column].ffill().bfill()

In [17]:
# Reset index to make output shape same as input
data = data.reset_index(drop=False)

In [18]:
px.scatter(x=data["Date"], y=data["Close"]).show()

In [19]:
# Side-by-side comparison - without interpolation
px.scatter(x=data_filtered_v2["Date"], y=data_filtered_v2["Close"]).show()

In [20]:
data

Unnamed: 0,Date,Ticker,Open,Low,High,Close,Volume
0,2014-09-17,BTC-USD,465.864014,452.421997,468.174011,457.334015,2.105680e+07
1,2014-09-18,BTC-USD,456.859985,413.104004,456.859985,424.440002,3.448320e+07
2,2014-09-19,BTC-USD,424.102997,384.532013,427.834991,394.795990,3.791970e+07
3,2014-09-20,BTC-USD,394.673004,389.882996,423.295990,408.903992,3.686360e+07
4,2014-09-21,BTC-USD,408.084991,393.181000,412.425995,398.821014,2.658010e+07
...,...,...,...,...,...,...,...
3857,2025-04-09,BTC-USD,79218.476562,80083.458333,81762.410156,79235.335938,4.265702e+10
3858,2025-04-10,BTC-USD,79218.476562,79509.889323,82700.929688,79235.335938,4.215690e+10
3859,2025-04-11,BTC-USD,79218.476562,78936.320312,82700.929688,79235.335938,4.165678e+10
3860,2025-04-12,BTC-USD,79218.476562,78936.320312,82700.929688,79235.335938,4.165678e+10


## Соберём воедино в одну функцию

In [21]:
def detect_anomalies_z_diff(df: pd.DataFrame, column: str):
    df_copy = df.copy()
    df_copy["Z-score"] = zscore(df_copy[column].diff().fillna(0))
    anomalies_idx = df_copy[abs(df_copy["Z-score"]) > 1.5].index
    return anomalies_idx

In [22]:
def clean_anomalies_and_fill_gaps(data:pd.DataFrame, end_dt:str, interval:str) -> pd.DataFrame:
    """
    Data preparation function:
    1) Detect anomalies (separately per Ticker and all its value columns), remove them;
    2) Fill missing values (both from missing in raw data and from anomaly detection step)
    """
    # 1) Detect anomalies separately for each ticker and for each column, replace them with NANs
    logger.info("Detecting and removing anomalies...")
    outlier_free_data = []
    for ticker in data["Ticker"].unique():
        data_ticker = data[data["Ticker"] == ticker]

        for column in ["Open", "Low", "High", "Close", "Volume"]:
            column_anomalies = detect_anomalies_z_diff(data_ticker, column)
            data_ticker.loc[column_anomalies, column] = None

        outlier_free_data.append(data_ticker)
    data = pd.concat(outlier_free_data, axis=0).reset_index(drop=True)

    # 2) Fill missing values
    logger.info("Filling missing values...")
    gap_filled_data = []
    for ticker in data["Ticker"].unique():
        data_ticker = data[data["Ticker"] == ticker]

        # Make sure we have end of requested time period
        data_ticker["Date"] = data_ticker["Date"].astype("datetime64[ns]")
        if pd.to_datetime(end_dt) not in data_ticker["Date"].values:
            # Append a row with end date
            data_ticker = pd.concat(
                [
                    data_ticker,
                    pd.DataFrame({"Date": pd.to_datetime(end_dt)}, index=[0]),
                ],
                axis=0,
            ).reset_index(drop=True)

        # Time resampling based on 'Date' field
        data_ticker["Date"] = data_ticker["Date"].astype("datetime64[ns]")
        data_ticker.set_index("Date", inplace=True)
        data_ticker = data_ticker.resample(interval).asfreq()

        # Ticker is always same, ffill/bfill it
        data_ticker["Ticker"] = data_ticker["Ticker"].ffill().bfill()
        # Linear interpolation for value columns
        for column in ["Open", "Low", "High", "Close", "Volume"]:
            data_ticker[column] = data_ticker[column].interpolate(
                "linear"
            )  # it does ffill as well, but run next steps just in case
            data_ticker[column] = data_ticker[column].ffill().bfill()

        # Reset index to make output shape same as input
        data_ticker = data_ticker.reset_index(drop=False)

        gap_filled_data.append(data_ticker)
    data = pd.concat(gap_filled_data, axis=0).reset_index(drop=True)

    logger.info("Data cleaned from anomalies. Filled missing values in it.")

    return data

In [23]:
# TEST
# 1) Get data from DB
data = utils.get_history(
    tickers=tickers,
    start=None,  # Use all existing data
    end=None,  # Use all existing data
    interval=interval,
    update_cache=update_cache,
)
logger.info(f"Input Data in DB: {data.shape=}, {data.isna().sum().sum():,d} NaNs")

# 2) Clean it
data = clean_anomalies_and_fill_gaps(data, end_dt=config.END_DT, interval=config.INTERVAL)
logger.info(f"Cleaned from anomalies and filled gaps: {data.shape=}, {data.isna().sum().sum():,d} NaNs")


[INFO   ] 2025-04-13@20:30:05: Getting history from local cache DB...
[INFO   ] 2025-04-13@20:30:05: Got history of shape (3861, 7), 0 NaNs
[INFO   ] 2025-04-13@20:30:05: Input Data in DB: data.shape=(3861, 7), 0 NaNs
[INFO   ] 2025-04-13@20:30:05: Detecting and removing anomalies...
[INFO   ] 2025-04-13@20:30:05: Filling missing values...
[INFO   ] 2025-04-13@20:30:05: Data cleaned from anomalies. Filled missing values in it.
[INFO   ] 2025-04-13@20:30:05: Cleaned from anomalies and filled gaps: data.shape=(3862, 7), 0 NaNs


In [24]:
# TEST - ALL tickers
# 1) Get data from DB
data = utils.get_history(
    tickers=config.TICKERS,
    start=None,  # Use all existing data
    end=None,  # Use all existing data
    interval=interval,
    update_cache=update_cache,
)
logger.info(f"Input Data in DB: {data.shape=}, {data.isna().sum().sum():,d} NaNs")

# 2) Clean it
data = clean_anomalies_and_fill_gaps(
    data, end_dt=config.END_DT, interval=config.INTERVAL
)
logger.info(
    f"Cleaned from anomalies and filled gaps: {data.shape=}, {data.isna().sum().sum():,d} NaNs"
)

[INFO   ] 2025-04-13@20:30:30: Getting history from local cache DB...
[INFO   ] 2025-04-13@20:30:39: Got history of shape (1837089, 7), 0 NaNs
[INFO   ] 2025-04-13@20:30:39: Input Data in DB: data.shape=(1837089, 7), 0 NaNs
[INFO   ] 2025-04-13@20:30:39: Detecting and removing anomalies...
[INFO   ] 2025-04-13@20:31:51: Filling missing values...


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy



A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value ins

In [26]:
len(config.TICKERS), (2662134-1837089)


(508, 825045)

In [27]:
data.groupby('Ticker').count()

Unnamed: 0_level_0,Date,Open,Low,High,Close,Volume
Ticker,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
A,5579,5579,5579,5579,5579,5579
AAPL,5579,5579,5579,5579,5579,5579
ABBV,4485,4485,4485,4485,4485,4485
ABNB,1586,1586,1586,1586,1586,1586
ABT,5579,5579,5579,5579,5579,5579
...,...,...,...,...,...,...
YUM,5579,5579,5579,5579,5579,5579
ZBH,5579,5579,5579,5579,5579,5579
ZBRA,5579,5579,5579,5579,5579,5579
ZTS,4455,4455,4455,4455,4455,4455


In [33]:
ticker = "^GSPC"

px.scatter(x=data[data['Ticker']==ticker]["Date"], y=data[data['Ticker']==ticker]["Close"]).show()

# 2. Код на Python, формирующий новые признаки из данных для их дальнейшего использования в процессе моделирования

## Очевидно, отправная точка для генерации фич - очищенные от выбросов данные, т.е. результат п.1

In [1]:
# Относительные ссылки, включая импорты, относительно корневой папки проекта
import os

os.chdir(os.path.dirname(os.getcwd()))

import main
import logging

import numpy as np
import pandas as pd

from src.core import utils

# initialize
logger = logging.getLogger()
# initialize config dict
config = main.main_launch()
# SUBSTITUTE FOR TESTING
# TODO



In [2]:
tickers = ["BTC-USD"] # один для теста
# start = config.START_DT
# end = config.END_DT
interval = config.INTERVAL
update_cache = False  # do not call yfinance during testing

In [3]:
# TEST
# 1) Get data from DB
data = utils.get_history(
    tickers=tickers,
    start=None,  # Use all existing data
    end=None,  # Use all existing data
    interval=interval,
    update_cache=update_cache,
)
logger.info(f"Input Data in DB: {data.shape=}, {data.isna().sum().sum():,d} NaNs")

# 2) Clean it
data = utils.clean_anomalies_and_fill_gaps(
    data, end_dt=config.END_DT, interval=config.INTERVAL
)
logger.info(
    f"Cleaned from anomalies and filled gaps: {data.shape=}, {data.isna().sum().sum():,d} NaNs"
)

[INFO   ] 2025-04-14@00:43:47: Getting history from local cache DB...
[INFO   ] 2025-04-14@00:43:48: Got history of shape (3861, 7), 0 NaNs
[INFO   ] 2025-04-14@00:43:48: Input Data in DB: data.shape=(3861, 7), 0 NaNs
[INFO   ] 2025-04-14@00:43:48: Detecting and removing anomalies...
[INFO   ] 2025-04-14@00:43:48: Filling missing values...
[INFO   ] 2025-04-14@00:43:48: Data cleaned from anomalies. Filled missing values in it.
[INFO   ] 2025-04-14@00:43:48: Cleaned from anomalies and filled gaps: data.shape=(3862, 7), 0 NaNs


## Генерим фичи

In [4]:
def add_simple_datetime_features(data:pd.DataFrame)->tuple[pd.DataFrame, list[str]]:
    """
    Add simple Datetime features, inferred directly from 'Date' column
    """
    # NOTE: looks like some will be relevant for a given interval, some will not
    data["year"] = data["Date"].dt.strftime("%Y")
    data["month"] = data["Date"].dt.strftime("%m")
    data["day"] = data["Date"].dt.strftime("%d")
    data["year_month"] = data["Date"].dt.strftime("%Y_%m")
    data["hour"] = data["Date"].dt.strftime("%H")
    data["minute"] = data["Date"].dt.strftime("%M")

    feature_columns = ['year', 'month', 'day', 'year_month', 'hour', 'minute']

    return data, feature_columns


In [5]:
def add_sin_cos_datetime_features(data:pd.DataFrame)->tuple[pd.DataFrame, list[str]]:
    """
    Add sin/cos transformations for month / day / hour / minute
    """
    sin_cos_map = {
        'month': 12,
        'day': 30,
        'hour': 24,
        'minute': 60
    }
    feature_columns = []
    for feature_in, max_val in sin_cos_map.items():
        data[f"{feature_in}_sin"] = np.sin(data[feature_in].astype(int) * (2.0 * np.pi / max_val))
        data[f"{feature_in}_cos"] = np.cos(
            data[feature_in].astype(int) * (2.0 * np.pi / max_val)
        )
        feature_columns += [f"{feature_in}_sin", f"{feature_in}_cos"]

    return data, feature_columns

In [6]:
def add_lag_features(data:pd.DataFrame, features:list[str], lag_periods:int)->tuple[pd.DataFrame, list[str]]:
    """
    Добавляет лаги для указанных признаков на указанное количество периодов назад.

    data: DataFrame с исходными данными
    features: список признаков, для которых необходимо добавить лаги
    lag_periods: сколько лагов назад необходимо создать
    Возвращает:
    - обновленный DataFrame с лагами
    - список новых колонок, которые можно использовать как признаки
    """
    data = data.copy()  # Работаем с копией DataFrame
    feature_columns = []  # Список для хранения новых колонок

    # Для каждого признака создаем лаги
    for feature in features:
        for lag in range(1, lag_periods + 1):
            new_col_name = f"{feature}_lag_{lag}"
            data[new_col_name] = data[feature].shift(lag)
            feature_columns.append(new_col_name)

    # Удаляем строки с NaN значениями, которые появились из-за сдвигов
    data = data.dropna()

    return data, feature_columns

In [7]:
def add_rolling_features(data:pd.DataFrame, features:list[str], window_sizes:list[int])->tuple[pd.DataFrame, list[str]]:
    """
    Добавляет скользящие характеристики для указанных признаков и окон.

    data: DataFrame с исходными данными
    features: список признаков, для которых необходимо добавить скользящие характеристики
    window_sizes: список размеров окон для расчета характеристик (например, [5, 14, 30])

    Возвращает:
    - обновленный DataFrame с новыми фичами
    - список новых колонок, которые можно использовать как признаки
    """
    data = data.copy()  # Работаем с копией DataFrame
    feature_columns = []  # Список для хранения новых колонок

    # Для каждого признака и для каждого окна
    for feature in features:
        for window_size in window_sizes:
            # Скользящее среднее
            data[f"{feature}_mean_{window_size}"] = (
                data[feature].rolling(window=window_size).mean()
            )
            feature_columns.append(f"{feature}_mean_{window_size}")

            # Скользящая медиана
            data[f"{feature}_median_{window_size}"] = (
                data[feature].rolling(window=window_size).median()
            )
            feature_columns.append(f"{feature}_median_{window_size}")

            # Скользящий минимум
            data[f"{feature}_min_{window_size}"] = (
                data[feature].rolling(window=window_size).min()
            )
            feature_columns.append(f"{feature}_min_{window_size}")

            # Скользящий максимум
            data[f"{feature}_max_{window_size}"] = (
                data[feature].rolling(window=window_size).max()
            )
            feature_columns.append(f"{feature}_max_{window_size}")

            # Скользящее стандартное отклонение
            data[f"{feature}_std_{window_size}"] = (
                data[feature].rolling(window=window_size).std()
            )
            feature_columns.append(f"{feature}_std_{window_size}")

            # Скользящий размах (макс - мин)
            data[f"{feature}_range_{window_size}"] = (
                data[f"{feature}_max_{window_size}"]
                - data[f"{feature}_min_{window_size}"]
            )
            feature_columns.append(f"{feature}_range_{window_size}")

            # Скользящее абсолютное отклонение от медианы (mad)
            data[f"{feature}_mad_{window_size}"] = (
                data[feature]
                .rolling(window=window_size)
                .apply(lambda x: np.median(np.abs(x - np.median(x))), raw=True)
            )
            feature_columns.append(f"{feature}_mad_{window_size}")

    # Удаление строк с NaN значениями, которые появляются из-за сдвигов
    data = data.dropna()

    return data, feature_columns

In [8]:
def add_trend_features(data:pd.DataFrame, features:list[str], lag_periods:int)->tuple[pd.DataFrame, list[str]]:
    """
    Добавляет классические финансовые признаки: отношение к предыдущим периодам, логарифмические изменения и индикаторы трендов.

    data: DataFrame с исходными данными
    features: список признаков, для которых необходимо добавить индикаторы
    lag_periods: сколько периодов назад учитывать для расчетов

    Возвращает:
    - обновленный DataFrame с новыми фичами
    - список новых колонок, которые можно использовать как признаки
    """
    data = data.copy()  # Работаем с копией DataFrame
    feature_columns = []  # Список для хранения новых колонок

    for feature in features:
        # Отношение текущего значения к предыдущему (лаг = 1)
        data[f"{feature}_ratio_1"] = data[feature] / data[feature].shift(1)
        feature_columns.append(f"{feature}_ratio_1")

        # Логарифмическое изменение (логарифм отношения текущего значения к предыдущему)
        data[f"{feature}_log_diff_1"] = np.log(data[feature] / data[feature].shift(1))
        feature_columns.append(f"{feature}_log_diff_1")

        # Momentum (разница между текущим значением и значением N периодов назад)
        data[f"{feature}_momentum_{lag_periods}"] = data[feature] - data[feature].shift(
            lag_periods
        )
        feature_columns.append(f"{feature}_momentum_{lag_periods}")

        # Rate of Change (ROC): процентное изменение за N периодов
        data[f"{feature}_roc_{lag_periods}"] = (
            (data[feature] - data[feature].shift(lag_periods))
            / data[feature].shift(lag_periods)
            * 100
        )
        feature_columns.append(f"{feature}_roc_{lag_periods}")

        # Exponential Moving Average (EMA) с периодом N
        data[f"{feature}_ema_{lag_periods}"] = (
            data[feature].ewm(span=lag_periods, adjust=False).mean()
        )
        feature_columns.append(f"{feature}_ema_{lag_periods}")

    # Удаление строк с NaN значениями, которые появились из-за сдвигов
    data = data.dropna()

    return data, feature_columns

In [9]:
def add_macd(data:pd.DataFrame, feature:str, short_window:int=12, long_window:int=26)->tuple[pd.DataFrame, list[str]]:
    """
    Добавляет индикатор MACD (разница между краткосрочным и долгосрочным EMA).

    data: DataFrame с исходными данными
    feature: признак, для которого необходимо рассчитать MACD
    short_window: окно для краткосрочного EMA (по умолчанию 12)
    long_window: окно для долгосрочного EMA (по умолчанию 26)

    Возвращает:
    - обновленный DataFrame с MACD
    - название новой колонки с MACD
    """
    data = data.copy()

    # Рассчитываем краткосрочное и долгосрочное EMA
    ema_short = data[feature].ewm(span=short_window, adjust=False).mean()
    ema_long = data[feature].ewm(span=long_window, adjust=False).mean()

    # Разница между краткосрочным и долгосрочным EMA (MACD)
    data[f"{feature}_macd"] = ema_short - ema_long

    return data, [f"{feature}_macd"]

In [10]:
def add_features(data:pd.DataFrame)->tuple[pd.DataFrame, list[str]]:
    """
    Add features to the dataframe, calling individual specific feature functions
    data DataFrame may contain multiple Tickers
    """
    logger.info("Adding features...")
    all_feature_columns = []
    # Part 1. Features which can be mapped on multiple tickers at once

    # Simple Datetime features
    logger.info("Adding simple datetime features...")
    data, datetime_features = add_simple_datetime_features(data)
    all_feature_columns += datetime_features

    # Sin-Cos transformations
    logger.info("Adding sin-cos transformations for datetime features...")
    data, sincos_features = add_sin_cos_datetime_features(data)
    all_feature_columns += sincos_features

    # Part 2. Features which require only a single ticker
    full_data = []
    for ticker in data['Ticker'].unique():
        logger.info(f"Adding ticker-specific features for {ticker}")
        ticker_data = data[data['Ticker']==ticker]

        # Lag features
        logger.info("Adding lag features...")
        lag_periods = 30  # пальцем в небо, TODO протестировать / подобрать лучшее
        features_to_lag = ['Open', 'High', 'Low', 'Close', 'Volume']
        ticker_data, lag_features = add_lag_features(ticker_data, features_to_lag, lag_periods)
        all_feature_columns += lag_features

        # Rolling features
        logger.info("Adding rolling features...")
        window_sizes = [5, 14, 30]
        features_to_rolling = ["Open", "High", "Low", "Close", "Volume"]
        ticker_data, rolling_features = add_rolling_features(ticker_data, features_to_rolling, window_sizes)
        all_feature_columns += rolling_features

        # Trend features
        logger.info("Adding trend features...")
        lag_periods = 3
        features_to_trend = ["Open", "High", "Low", "Close", "Volume"]
        ticker_data, trend_features = add_trend_features(ticker_data, features_to_trend, lag_periods)
        all_feature_columns += trend_features

        # MACD
        logger.info("Adding MACD indicator...")
        macd_short_window = 12
        macd_long_window = 26
        ticker_data, macd_columns = add_macd(ticker_data, 'Close', macd_short_window, macd_long_window)
        all_feature_columns += macd_columns

        full_data.append(ticker_data)

    data = pd.concat(full_data, axis=0).reset_index(drop=True)
    del full_data

    # Drop duplicates from all_feature_columns
    all_feature_columns = list(set(all_feature_columns))

    logger.info(f"Features added, {len(all_feature_columns):,d} new columns added")
    logger.info(
        f"Dataframe after features mapping: {data.shape=}, {data.isna().sum().sum():,d} NaNs"
    )

    return data, all_feature_columns

In [11]:
# TEST
data, all_feature_columns = add_features(data)


[INFO   ] 2025-04-14@00:43:55: Adding features...
[INFO   ] 2025-04-14@00:43:55: Adding simple datetime features...
[INFO   ] 2025-04-14@00:43:55: Adding sin-cos transformations for datetime features...
[INFO   ] 2025-04-14@00:43:55: Adding ticker-specific features for BTC-USD
[INFO   ] 2025-04-14@00:43:55: Adding lag features...
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(lag)
  data[new_col_name] = data[feature].shift(l

In [12]:
data.sample(3)

Unnamed: 0,Date,Ticker,Open,Low,High,Close,Volume,year,month,day,...,Close_log_diff_1,Close_momentum_3,Close_roc_3,Close_ema_3,Volume_ratio_1,Volume_log_diff_1,Volume_momentum_3,Volume_roc_3,Volume_ema_3,Close_macd
3159,2023-07-13,BTC-USD,30387.488281,30268.351562,31814.515625,31476.048828,23686080000.0,2023,7,13,...,0.035059,1061.578125,3.490372,30954.304683,1.599799,0.469878,8857870000.0,59.736616,18593270000.0,643.851011
626,2016-08-05,BTC-USD,578.281006,569.981995,578.281006,575.04303,66127900.0,2016,8,5,...,-0.005629,27.578003,5.0374,576.052543,0.52779,-0.639056,-264805100.0,-80.017737,117612800.0,-21.382553
822,2017-02-17,BTC-USD,1026.119995,1025.640015,1053.170044,1046.209961,136474000.0,2017,2,17,...,0.018104,41.659973,4.147128,1030.966065,1.116105,0.109845,-1472000.0,-1.067084,125514700.0,24.458986


In [13]:
data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3800 entries, 0 to 3799
Columns: 302 entries, Date to Close_macd
dtypes: datetime64[ns](1), float64(294), object(7)
memory usage: 8.8+ MB


Поскольку функций довольно много, и список должен быть расширяемым по ходу развития проекта, вынесем логику генерации фич в отдельный модуль - ./app/src/core/features.py