[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/ZuchniakK/CryptoDataProcessing/blob/main/4_data_save.ipynb)

### In this notebook I put together all dataset generation steps.

In [1]:
import copy
import json
import pickle
import random
import zipfile
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait
from datetime import date, datetime, timedelta
from functools import partial
from os import makedirs
from os.path import exists, join
from pprint import pprint
from time import monotonic

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pandas_ta as ta
import talib
from sklearn.preprocessing import PowerTransformer, QuantileTransformer, StandardScaler

# I put the functions defined in previous notebooks in the crypto_data.py file and we will import them from there
from crypto_data import (
    daterange,
    download_extract_zip,
    get_trades,
    merge_ohlc,
    trades2ohlc,
)

In [3]:
%matplotlib inline
plt.rcParams["figure.figsize"] = [8, 8]

Categorization of features from previous notebooks

In [4]:
MONOTONIC_COL = [
    "SQZ_NO",
    "SQZPRO_NO",
    "TOS_STDEVALL_LR",
    "TOS_STDEVALL_L_1",
    "TOS_STDEVALL_U_1",
    "TOS_STDEVALL_L_2",
    "TOS_STDEVALL_U_2",
    "TOS_STDEVALL_L_3",
    "TOS_STDEVALL_U_3",
]

PAIR_COL = [
    "HILOl_13_21",
    "HILOs_13_21",
    "PSARl_0.02_0.2",
    "PSARs_0.02_0.2",
    "QQEl_14_5_4.236",
    "QQEs_14_5_4.236",
    "SUPERTl_7_3.0",
    "SUPERTs_7_3.0",
]

CANDLE_COL = [
    "CDL_RICKSHAWMAN",
    "CDL_DARKCLOUDCOVER",
    "CDL_HOMINGPIGEON",
    "CDL_LONGLEGGEDDOJI",
    "CDL_MATCHINGLOW",
    "CDL_SPINNINGTOP",
    "CDL_ADVANCEBLOCK",
    "CDL_PIERCING",
    "CDL_3WHITESOLDIERS",
    "CDL_HIKKAKE",
    "CDL_SHOOTINGSTAR",
    "CDL_3LINESTRIKE",
    "CDL_STICKSANDWICH",
    "CDL_CONCEALBABYSWALL",
    "CDL_DOJISTAR",
    "CDL_GAPSIDESIDEWHITE",
    "CDL_KICKINGBYLENGTH",
    "CDL_HARAMICROSS",
    "CDL_3INSIDE",
    "CDL_BREAKAWAY",
    "CDL_EVENINGDOJISTAR",
    "CDL_UPSIDEGAP2CROWS",
    "CDL_XSIDEGAP3METHODS",
    "CDL_INSIDE",
    "CDL_ONNECK",
    "CDL_BELTHOLD",
    "CDL_MARUBOZU",
    "CDL_ABANDONEDBABY",
    "CDL_HIKKAKEMOD",
    "CDL_RISEFALL3METHODS",
    "CDL_KICKING",
    "CDL_DOJI_10_0.1",
    "CDL_HARAMI",
    "CDL_3BLACKCROWS",
    "CDL_LADDERBOTTOM",
    "CDL_INNECK",
    "CDL_SHORTLINE",
    "CDL_3OUTSIDE",
    "CDL_MORNINGSTAR",
    "CDL_HIGHWAVE",
    "CDL_LONGLINE",
    "CDL_TRISTAR",
    "CDL_UNIQUE3RIVER",
    "CDL_2CROWS",
    "CDL_THRUSTING",
    "CDL_COUNTERATTACK",
    "CDL_MORNINGDOJISTAR",
    "CDL_INVERTEDHAMMER",
    "CDL_CLOSINGMARUBOZU",
    "CDL_HANGINGMAN",
    "CDL_TASUKIGAP",
    "CDL_3STARSINSOUTH",
    "CDL_ENGULFING",
    "CDL_DRAGONFLYDOJI",
    "CDL_HAMMER",
    "CDL_GRAVESTONEDOJI",
    "CDL_MATHOLD",
    "CDL_TAKURI",
    "CDL_IDENTICAL3CROWS",
    "CDL_EVENINGSTAR",
    "CDL_STALLEDPATTERN",
    "CDL_SEPARATINGLINES",
]

ZERO_ONE_COL = [
    "DEC_1",
    "THERMOl_20_2_0.5",
    "TTM_TRND_6",
    "INC_1",
    "THERMOs_20_2_0.5",
    "AMATe_LR_8_21_2",
    "SQZPRO_OFF",
    "AOBV_LR_2",
    "SQZPRO_ON_NARROW",
    "PSARr_0.02_0.2",
    "SQZPRO_ON_NORMAL",
    "AMATe_SR_8_21_2",
    "AOBV_SR_2",
    "SQZ_OFF",
    "SUPERTd_7_3.0",
    "SQZ_ON",
    "SQZPRO_ON_WIDE",
]

PRICE_COL = [
    "DCL_20_20",
    "ISB_26",
    "ABER_ZG_5_15",
    "high",
    "HA_open",
    "weighted",
    "ABER_SG_5_15",
    "WMA_10",
    "EMA_10",
    "ICS_26",
    "HWU",
    "low",
    "SUPERT_7_3.0",
    "MIDPOINT_2",
    "VWAP_D",
    "RMA_10",
    "HLC3",
    "TRIMA_10",
    "HWL",
    "HA_low",
    "HA_high",
    "IKS_26",
    "KCBe_20_2",
    "ACCBM_20",
    "MEDIAN_30",
    "CKSPl_10_3_20",
    "KCLe_20_2",
    "ACCBL_20",
    "LR_14",
    "KAMA_10_2_30",
    "HL2",
    "TEMA_10",
    "QTL_30_0.5",
    "ZL_EMA_10",
    "VWMA_10",
    "ITS_9",
    "LDECAY_5",
    "DCM_20_20",
    "JMA_7_0",
    "VIDYA_14",
    "DEMA_10",
    "ACCBU_20",
    "ISA_9",
    "FWMA_10",
    "CKSPs_10_3_20",
    "HILO_13_21",
    "WCP",
    "HMA_10",
    "PWMA_10",
    "HA_close",
    "BBM_5_2.0",
    "SWMA_10",
    "MCGD_10",
    "DCU_20_20",
    "MIDPRICE_2",
    "T3_10_0.7",
    "HWM",
    "SSF_10_2",
    "OHLC4",
    "HWMA_0.2_0.1_0.1",
    "open",
    "BBL_5_2.0",
    "ALMA_10_6.0_0.85",
    "SINWMA_14",
    "close",
    "ABER_XG_5_15",
    "BBU_5_2.0",
    "KCUe_20_2",
    "SMA_10",
]

OTHER_COL = [
    "STCmacd_10_12_26_0.5",
    "WILLR_14",
    "ENTP_10",
    "INERTIA_20_14",
    "J_9_3",
    "TRIX_30_9",
    "AROONOSC_14",
    "AO_5_34",
    "MACDs_12_26_9",
    "QQE_14_5_4.236",
    "trades",
    "PVOs_12_26_9",
    "SMIs_5_20_5",
    "QQE_14_5_4.236_RSIMA",
    "KVOs_34_55_13",
    "STOCHRSIk_14_14_3_3",
    "MFI_14",
    "MACD_12_26_9",
    "EOM_14_100000000",
    "PVT",
    "STOCHk_14_3_3",
    "SMIo_5_20_5",
    "DPO_20",
    "high_Z_30_1",
    "PVOh_12_26_9",
    "AR_26",
    "PSARaf_0.02_0.2",
    "BBP_5_2.0",
    "FISHERT_9_1",
    "CG_10",
    "NATR_14",
    "STDEV_30",
    "CMF_20",
    "THERMOma_20_2_0.5",
    "PPO_12_26_9",
    "LOGRET_1",
    "PPOs_12_26_9",
    "BBB_5_2.0",
    "D_9_3",
    "OBVe_4",
    "BOP",
    "OBV_max_2",
    "volume_asset_buyer_maker",
    "FISHERTs_9_1",
    "SQZ_20_2.0_20_1.5",
    "ABER_ATR_5_15",
    "VTXM_14",
    "OBVe_12",
    "K_9_3",
    "TSIs_13_25_13",
    "volume",
    "MASSI_9_25",
    "RVGIs_14_4",
    "ZS_30",
    "PCTRET_1",
    "RVGI_14_4",
    "THERMO_20_2_0.5",
    "close_Z_30_1",
    "open_Z_30_1",
    "BEARP_13",
    "KURT_30",
    "DMP_14",
    "STOCHRSId_14_14_3_3",
    "MACDh_12_26_9",
    "RVI_14",
    "VAR_30",
    "SLOPE_1",
    "BR_26",
    "CCI_14_0.015",
    "TRUERANGE_1",
    "SMI_5_20_5",
    "COPC_11_14_10",
    "BULLP_13",
    "RSX_14",
    "VTXP_14",
    "volume_asset_buyer_taker_ratio",
    "AROOND_14",
    "AD",
    "PGO_14",
    "EBSW_40_10",
    "MAD_30",
    "STCstoch_10_12_26_0.5",
    "KST_10_15_20_30_10_10_10_15",
    "STOCHd_14_3_3",
    "KVO_34_55_13",
    "PDIST",
    "ROC_10",
    "STC_10_12_26_0.5",
    "CMO_14",
    "PVI_1",
    "PSL_12",
    "volume_asset_buyer_taker",
    "DMN_14",
    "UO_7_14_28",
    "volume_asset",
    "TRIXs_30_9",
    "PVR",
    "PPOh_12_26_9",
    "KSTs_9",
    "SKEW_30",
    "MOM_10",
    "ADOSC_3_10",
    "BIAS_SMA_26",
    "UI_14",
    "low_Z_30_1",
    "NVI_1",
    "EFI_13",
    "PVOL",
    "OBV_min_2",
    "ATRr_14",
    "VHF_28",
    "SQZPRO_20_2.0_20_2_1.5_1",
    "APO_12_26",
    "CTI_12",
    "CFO_9",
    "PVO_12_26_9",
    "trades_full",
    "RSI_14",
    "CHOP_14_1_100",
    "ADX_14",
    "AROONU_14",
    "TSI_13_25_13",
    "QS_10",
    "OBV",
    "ER_10",
]

Functions from earlier notebooks and an improved data loader

In [5]:
# custom encoder for writing numpy arrays to a json file
class JSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.integer):
            return int(obj)
        elif isinstance(obj, np.floating):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, (datetime, date)):
            return obj.isoformat()
        else:
            return super(NpEncoder, self).default(obj)


# wraper on pandas concatenation catching error of concatenation of empty list and list containing none values
def safe_concat(dataframes):
    dataframes = [df for df in dataframes if df is not None]
    try:
        return pd.concat(dataframes)
    except ValueError:
        return None

#### By putting together the analyzes presented in previous notebooks, we can wrap the entire process of building a dataset into one class. Also, with very minor modifications, we can use multithreading for the most time-consuming stages - data collection and generation of technical analysis indicators and, consequently, speed up the process of generating dateset several times.

In [6]:
class OHLCDataSet:

    """Class that enables the generation of datasets,
    with we can wrap the entire process of building a dataset of Binance trading data with technical analysis.

    Example:
    ```ods = OHLCDataSet(
    pairs=[('eth', 'usdt'), ('btc', 'usdt')],
    start_date=date(2022, 3, 10),
    end_date=date(2022, 3, 13),
    train_end=date(2022, 3, 12),
    ohlc_intervals = [2, 4, 10],
    multithreading=True,
    )
    ```
    Args:
        pairs: List of trading pairs symbol.
        start_date: Date From when to collect data.
        end_date: Date by which data is collected.
        train_end. The date when we consider that the training set ends, important because some global statistics,
        such as the average, are calculated only on the basis of training data.
        base_time_offset: Time unit which constitutes the database of the data set.
        All the time intervals used must be an integer multiple of this value.
        ohlc_intervals: Defines with which time step size the time series are created,
        multiples of the base unit base_time_offset.
        drop_n_first_rows: How many leading rows are thrown from the DataFrame,
        the values may be different for different intervals
        dataset_seq_len: Defines the number of time steps that the final samples used to train the models will have.
        The length of the time series can be different for different intervals.
        drop_columns_names: The names of the columns that we do not want to use may be different for different intervals
        columns: a dictionary containing the following keys:
        monotonic_col, price_col, pair_col, zero_one_col, candle_col, other_col
        which contain assignments of particular column names to different classes of features.
        If one is not provided, or the entire dictionary is not provided,
        the assignment will be generated automatically.
        If you do not know your data well, it is better to leave it blank.
        multithreading: Use of multithreading for data retrieval and preprocessing
        transform_mode: Specifies the procedure of normalization of features not correlated with the price (often fat-tail distributions)
        The choices are ‘power’, ‘quintile’ or ‘mix’ (default).
        ‘power’ uses PowerTransformer, ‘quintile’ uses QuantileTransformer and ‘mix’ uses PowerTransformer
        unless there is an error then uses QuantileTransformer.
        binance_base_url: url for binance data, by default 'https://data.binance.vision'.
        data_dir: Where to save downloaded files, by default 'data'
        fill_gaps_limit: removes columns that have gaps longer than fill_gaps_limit, by default 5
        inverse_error_quintiles: the quintile values ​​for which the inverse transformation error is to be calculated,
        by default [q for q in np.arange(0,1,0.1)] + [0.95, 0.99, 0.999]


    """

    def __init__(
        self,
        pairs,
        start_date,
        end_date,
        train_end,
        base_time_offset=pd.tseries.offsets.Minute(),
        price_scaling_method="standardised_max_pct_change",
        ohlc_intervals=None,
        drop_n_first_rows=None,
        dataset_seq_len=None,
        drop_columns_names=None,
        columns=None,
        multithreading=False,
        keep_unchanged_df=False,
        transform_mode="mix",
        binance_base_url="https://data.binance.vision",
        data_dir="data",
        fill_gaps_limit=5,
        inverse_error_quintiles=None,
    ):
        self.inverse_error_quintiles = inverse_error_quintiles
        if self.inverse_error_quintiles is None:
            self.inverse_error_quintiles = [q for q in np.arange(0, 1, 0.1)] + [
                0.95,
                0.99,
                0.999,
            ]
        self.binance_base_url = binance_base_url
        self.data_dir = data_dir
        self.fill_gaps_limit = fill_gaps_limit
        if ohlc_intervals is None:
            ohlc_intervals = [1, 10, 60]
        for interval in ohlc_intervals[1:]:
            if interval % ohlc_intervals[0]:
                raise ValueError(
                    "successive intervals must be integers of multiples of the basic interval"
                )
        if drop_n_first_rows is None:
            drop_n_first_rows = {interval: 96 for interval in ohlc_intervals}
        if dataset_seq_len is None:
            dataset_seq_len = {interval: 30 for interval in ohlc_intervals}
        if drop_columns_names is None:
            drop_columns_names = {
                interval: [
                    "DPO_20",
                    "ICS_26",
                    "EOM_14_100000000",
                    "QQEl_14_5_4.236",
                    "QQEs_14_5_4.236",
                    "SQZPRO_NO",
                    "SQZ_NO",
                ]
                for interval in ohlc_intervals
            }
        if columns is None:
            columns = {}

        self.pairs = [asset + base for asset, base in pairs]
        self.asset_base_pairs = pairs

        self.start_date = start_date
        self.end_date = end_date
        self.train_end = train_end
        self.base_time_offset = base_time_offset
        self.price_scaling_method = price_scaling_method
        self.ohlc_intervals = ohlc_intervals
        self.drop_n_first_rows = drop_n_first_rows
        self.dataset_seq_len = dataset_seq_len
        self.drop_columns_names = drop_columns_names

        # You can provide dict that describes different type of columns, or this can be calculated from data
        self.monotonic_col = columns.get("monotonic_col")
        self.price_col = columns.get("price_col")
        # this one set of columns is an exception, it must be specified before preprocessing
        # because we use it at an early stage of data cleansing which is needed to calculated the rest of the column types
        self.pair_col = columns.get(
            "pair_col",
            [
                "HILOl_13_21",
                "HILOs_13_21",
                "PSARl_0.02_0.2",
                "PSARs_0.02_0.2",
                "QQEl_14_5_4.236",
                "QQEs_14_5_4.236",
                "SUPERTl_7_3.0",
                "SUPERTs_7_3.0",
            ],
        )
        self.time_col = [
            "HOUR_SIN",
            "HOUR_COS",
            "DAY_SIN",
            "DAY_COS",
            "WEEK_SIN",
            "WEEK_COS",
            "YEAR_SIN",
            "YEAR_COS",
        ]
        self.zero_one_col = columns.get("zero_one_col")
        self.candle_col = columns.get("candle_col")
        self.other_col = columns.get("other_col")

        # dataset parameters, set in self.make_dataset
        self.all_columns = None
        self.power_factors = None
        self.price_std = None

        self.ohlc_data = {
            pair: {
                interval: {
                    self.ohlc_intervals[0] * step: {}
                    for step in range(interval // self.ohlc_intervals[0])
                }
                for interval in self.ohlc_intervals
            }
            for pair in self.pairs
        }

        self.multithreading = multithreading
        self.keep_unchanged_df = keep_unchanged_df
        self.unchanged_df = None
        self.transform_mode = transform_mode
        # Dictionary to store the time taken by individual tasks
        self.timing = {}

    def print_lenght(self):
        i = 0
        for interval in self.ohlc_intervals:
            for step in range(interval // self.ohlc_intervals[0]):
                for pair in self.pairs:
                    print(
                        i,
                        pair,
                        interval,
                        step,
                        self.ohlc_data[pair][interval][self.ohlc_intervals[0] * step]
                        .to_numpy(dtype=np.float64)
                        .shape,
                        self.ohlc_data[pair][interval][
                            self.ohlc_intervals[0] * step
                        ].index.nunique(),
                    )
                    i += 1

    def make_dataset(self):
        # loading and basic cleaning data
        start = monotonic()
        start_make_dataset = start
        self._load_data()
        self.timing["load_time"] = monotonic() - start

        start = monotonic()
        self._clear_data()
        self.timing["clear_time"] = monotonic() - start
        start = monotonic()
        self._remove_no_variance_features()
        self.timing["remove_no_variance_time"] = monotonic() - start
        start = monotonic()
        self._remove_redundant_features()
        self.timing["remove_redundant_features_time"] = monotonic() - start

        start = monotonic()
        # organizing features by various types (various preprocessing for various types)
        self.all_columns = self.get_all_columns()
        if self.monotonic_col is None:
            self.monotonic_col = self.get_monotonic_col()
        if self.zero_one_col is None:
            self.zero_one_col = self.get_zero_one_col()
        if self.candle_col is None:
            self.candle_col = self.get_candle_col()
        self.pair_col = self.get_pairs_col()
        start_inner = monotonic()
        if self.price_col is None:
            self.price_col = self.get_price_like_col()
        self.timing["get_price_columns_time"] = monotonic() - start_inner
        self.timing["get_columns_time"] = monotonic() - start

        start = monotonic()
        self.monotonic_col = [
            col for col in self.monotonic_col if col not in self.candle_col
        ]
        self.pair_col = [col for col in self.pair_col if col not in self.candle_col]
        self.zero_one_col = [
            col for col in self.zero_one_col if col not in self.candle_col
        ]
        self.price_col = [
            col
            for col in self.price_col
            if all(
                [
                    col not in cols
                    for cols in [
                        self.monotonic_col,
                        self.pair_col,
                        self.candle_col,
                        self.zero_one_col,
                    ]
                ]
            )
        ]
        self.other_col = [
            col
            for col in self.all_columns
            if all(
                [
                    col not in cols
                    for cols in [
                        self.monotonic_col,
                        self.pair_col,
                        self.candle_col,
                        self.zero_one_col,
                        self.price_col,
                    ]
                ]
            )
        ]
        self.timing["columns_organizing_time"] = monotonic() - start
        self._drop_first_unused()
        self._trim_longer_dataframes(front_trim=False)

        self.timing["drop_unused_time"] = monotonic() - start
        if self.keep_unchanged_df:
            self.unchanged_df = copy.deepcopy(self.ohlc_data)

        start = monotonic()
        self._drop_monotonic_col()
        self.timing["drop_monotonic_time"] = monotonic() - start

        start = monotonic()
        self._normalize_candle_col()
        self.timing["normalize_candle_time"] = monotonic() - start

        start = monotonic()
        self.power_factors = self._normalize_other_col(mode=self.transform_mode)
        self.timing["normalize_other_time"] = monotonic() - start

        start = monotonic()
        self.price_std = self.get_price_std()
        self.timing["get_price_std_time"] = monotonic() - start

        start = monotonic()
        self._add_time_features()
        self.all_columns.extend(self.time_col)
        self.timing["add_time_features_time"] = monotonic() - start
        self.timing["whole_process_time"] = monotonic() - start_make_dataset

    def _process_single_data(self, pair, single_date, verbose=False):
        if verbose:
            print(pair, single_date.strftime("%Y-%m-%d"))
        try:
            day_trades = get_trades(
                pair,
                single_date.year,
                str(single_date.month).zfill(2),
                str(single_date.day).zfill(2),
                binance_base_url=self.binance_base_url,
                trades_dir=join(self.data_dir, "trades"),
            )
        except zipfile.BadZipfile:
            print("BadZipfile, probably wrong date")
            return None

        for interval in self.ohlc_intervals:
            for step in range(interval // self.ohlc_intervals[0]):
                offset = self.ohlc_intervals[0] * step
                day_ohlc = trades2ohlc(
                    day_trades,
                    resampling_frequency=interval,
                    offset=offset,
                    base_time_offset=self.base_time_offset,
                )
                self.ohlc_data[pair][interval][offset][single_date] = day_ohlc

    def _process_ohlc_ta(self, pair, interval, offset, save_to_csv=False):
        self.ohlc_data[pair][interval][offset] = merge_ohlc(
            self.ohlc_data[pair][interval][offset]
        )
        self.ohlc_data[pair][interval][offset].ta.strategy(ta.AllStrategy)

        # remove don`t used collumns
        for col in self.drop_columns_names[interval]:
            self.ohlc_data[pair][interval][offset].pop(col)

        # Some technical indicators needs some previous data points to calculate value,
        # by dropping first n rows we put more features in our dataset.
        self.ohlc_data[pair][interval][offset] = self.ohlc_data[pair][interval][
            offset
        ].iloc[self.drop_n_first_rows[interval] :]
        # remove columns that are still empty at the beginning after the set number of leading
        # rows have been dropped. Also removes columns that have gaps longer than self.fill_gaps_limit (5)
        # These infinities are a numerical error, when calculating technical indicators,
        # we want to treat them as not a number
        self.ohlc_data[pair][interval][offset].replace(
            [np.inf, -np.inf], np.nan, inplace=True
        )
        for col in self.ohlc_data[pair][interval][offset]:
            if col in self.pair_col:
                continue
            self.ohlc_data[pair][interval][offset].fillna(
                method="ffill", limit=self.fill_gaps_limit, inplace=True
            )
            if self.ohlc_data[pair][interval][offset][col].isnull().values.any():
                self.ohlc_data[pair][interval][offset].pop(col)

    def _load_data(self):
        pd_list = [
            (pair, single_date)
            for pair in self.pairs
            for single_date in daterange(self.start_date, self.end_date)
        ]
        params_list = [
            (pair, interval, self.ohlc_intervals[0] * step)
            for pair in self.pairs
            for interval in self.ohlc_intervals
            for step in range(interval // self.ohlc_intervals[0])
        ]

        if self.multithreading:
            with ThreadPoolExecutor(20) as executor:
                futures = [
                    executor.submit(self._process_single_data, *pair_data)
                    for pair_data in pd_list
                ]
                wait(futures, return_when=ALL_COMPLETED)
                futures = [
                    executor.submit(self._process_ohlc_ta, *params)
                    for params in params_list
                ]
                wait(futures, return_when=ALL_COMPLETED)
        else:
            for pair_data in pd_list:
                self._process_single_data(*pair_data)
            for params in params_list:
                self._process_ohlc_ta(*params)

    def _drop_monotonic_col(self):
        for pair in self.pairs:
            for interval in self.ohlc_intervals:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    for col in self.ohlc_data[pair][interval][offset]:
                        if col in self.monotonic_col:
                            self.ohlc_data[pair][interval][offset].pop(col)

    # For the same pair and the same dataframe interval, they should be the same length for all offsets.
    # Mainly due to later vectorization and performance issues
    def _trim_longer_dataframes(self, front_trim=True):
        for pair in self.pairs:
            for interval in self.ohlc_intervals[1:]:
                min_sequence_len = min(
                    [
                        len(
                            self.ohlc_data[pair][interval][
                                self.ohlc_intervals[0] * step
                            ]
                        )
                        for step in range(interval // self.ohlc_intervals[0])
                    ]
                )
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    if len(self.ohlc_data[pair][interval][offset]) > min_sequence_len:
                        to_drop = (
                            len(self.ohlc_data[pair][interval][offset])
                            - min_sequence_len
                        )
                        if to_drop > 0:
                            if front_trim:
                                to_drop = self.ohlc_data[pair][interval][offset].index[
                                    :to_drop
                                ]
                                self.ohlc_data[pair][interval][offset].drop(
                                    index=to_drop, inplace=True
                                )
                            else:
                                to_drop = self.ohlc_data[pair][interval][offset].index[
                                    -to_drop:
                                ]
                                self.ohlc_data[pair][interval][offset].drop(
                                    index=to_drop, inplace=True
                                )

    def _clear_data(self):
        # If some pair (we allow different sets of features for different intervals) dont have some column,
        # this column also must be dropped from others dataframes
        for interval in self.ohlc_intervals:
            columns_lists = []
            for pair in self.pairs:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    columns_lists.append(
                        self.ohlc_data[pair][interval][offset].columns.values
                    )
            all_columns = []
            [all_columns.extend(cols) for cols in columns_lists]
            all_columns = list(set(all_columns))
            common_columns = [
                c for c in all_columns if all([c in cc for cc in columns_lists])
            ]

            for pair in self.pairs:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    for col in self.ohlc_data[pair][interval][offset]:
                        if col not in common_columns:
                            self.ohlc_data[pair][interval][offset].pop(col)

        self._trim_longer_dataframes()
        # Some time steps will never be used, so they can be deleted
        for pair in self.pairs:
            last_base_timestamp = self.ohlc_data[pair][self.ohlc_intervals[0]][0].index[
                -1
            ]
            first_base_timestamp = self.ohlc_data[pair][self.ohlc_intervals[0]][
                0
            ].index[0]
            first_usable_timestamp = first_base_timestamp

            for interval in self.ohlc_intervals[1:]:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    self.ohlc_data[pair][interval][offset] = self.ohlc_data[pair][
                        interval
                    ][offset][
                        (
                            self.ohlc_data[pair][interval][offset].index
                            + (interval - self.ohlc_intervals[0])
                            * self.base_time_offset
                            <= last_base_timestamp
                        )
                        & (
                            self.ohlc_data[pair][interval][offset].index
                            >= first_base_timestamp
                        )
                    ]
                first_usable_timestamp = max(
                    self.ohlc_data[pair][interval][self.ohlc_intervals[0]].index[0]
                    + self.dataset_seq_len[interval] * interval * self.base_time_offset,
                    first_usable_timestamp,
                )

            self.ohlc_data[pair][self.ohlc_intervals[0]][0] = self.ohlc_data[pair][
                self.ohlc_intervals[0]
            ][0][
                self.ohlc_data[pair][self.ohlc_intervals[0]][0].index
                >= first_usable_timestamp
            ]
            first_base_timestamp = self.ohlc_data[pair][self.ohlc_intervals[0]][
                0
            ].index[0]
            for interval in self.ohlc_intervals[1:]:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    self.ohlc_data[pair][interval][offset] = self.ohlc_data[pair][
                        interval
                    ][offset][
                        self.ohlc_data[pair][interval][offset].index
                        + self.dataset_seq_len[interval]
                        * interval
                        * self.base_time_offset
                        >= first_base_timestamp
                    ]

    def _remove_no_variance_features(self):
        for interval in self.ohlc_intervals:

            columns_lists = []
            for pair in self.pairs:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    columns_lists.append(
                        self.ohlc_data[pair][interval][offset].columns.values
                    )
            all_columns = []
            [all_columns.extend(cols) for cols in columns_lists]
            all_columns = list(set(all_columns))

            no_variance_columns = []
            for col in all_columns:
                col_values = []
                for pair in self.pairs:
                    for step in range(interval // self.ohlc_intervals[0]):
                        offset = self.ohlc_intervals[0] * step
                        col_values.extend(
                            self.ohlc_data[pair][interval][offset][col].unique()
                        )
                col_values = list(set(col_values))
                if len(col_values) <= 1:
                    no_variance_columns.append(col)

            for pair in self.pairs:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    for col in no_variance_columns:
                        self.ohlc_data[pair][interval][offset].pop(col)

        return self.ohlc_data

    def _remove_redundant_features(self):
        for interval in self.ohlc_intervals:
            cor = (
                pd.concat(
                    [
                        pd.concat(
                            [
                                self.ohlc_data[pair][interval][
                                    self.ohlc_intervals[0] * step
                                ]
                                for pair in self.pairs
                            ]
                        )
                        for step in range(interval // self.ohlc_intervals[0])
                    ]
                )
                .corr()
                .abs()
            )

            upper_tri = cor.where(np.triu(np.ones(cor.shape), k=1).astype(bool))
            to_drop = [
                column for column in upper_tri.columns if any(upper_tri[column] == 1)
            ]

            for pair in self.pairs:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    self.ohlc_data[pair][interval][offset].drop(
                        to_drop, axis=1, inplace=True
                    )

    def get_all_columns(self):
        all_columns = []
        [
            all_columns.extend(
                self.ohlc_data[self.pairs[0]][interval][0].columns.values
            )
            for interval in self.ohlc_intervals
        ]
        return sorted(list(set(all_columns)))

    def get_monotonic_col(self):

        monotonic_features = {}
        for interval in self.ohlc_intervals:
            for feature in self.all_columns:
                if feature in self.ohlc_data[self.pairs[0]][interval][0]:
                    is_monotonic = True
                    for pair in self.pairs:
                        for step in range(interval // self.ohlc_intervals[0]):
                            offset = self.ohlc_intervals[0] * step
                            is_monotonic = (
                                self.ohlc_data[pair][interval][offset][
                                    feature
                                ].is_monotonic
                                or self.ohlc_data[pair][interval][offset][
                                    feature
                                ].is_monotonic_decreasing
                            )
                            if not is_monotonic:
                                break
                        if not is_monotonic:
                            break
                    if is_monotonic:
                        monotonic_features.setdefault(feature, []).append(True)
                    else:
                        monotonic_features.setdefault(feature, []).append(False)

        monotonic_features = [
            feature
            for feature, is_monotonic in monotonic_features.items()
            if all(is_monotonic)
        ]
        return monotonic_features

    def get_price_like_col(self, price_col="close", price_treshold=0.55):
        price_series = pd.concat(
            [
                pd.concat(
                    [
                        pd.concat(
                            [
                                self.ohlc_data[pair][interval][offset][price_col]
                                for offset in [
                                    self.ohlc_intervals[0] * step
                                    for step in range(
                                        interval // self.ohlc_intervals[0]
                                    )
                                ]
                                if price_col in self.ohlc_data[pair][interval][offset]
                            ]
                        )
                        for pair in self.pairs
                    ]
                )
                for interval in self.ohlc_intervals
            ]
        )

        col_stats = {}
        for col in self.all_columns:
            if col.startswith("CDL_"):
                continue
            col_series = pd.concat(
                [
                    pd.concat(
                        [
                            pd.concat(
                                [
                                    self.ohlc_data[pair][interval][offset][col]
                                    if col in self.ohlc_data[pair][interval][offset]
                                    else pd.Series(
                                        [
                                            None
                                            for _ in range(
                                                len(
                                                    self.ohlc_data[pair][interval][
                                                        offset
                                                    ]
                                                )
                                            )
                                        ],
                                        name=col,
                                        dtype=float,
                                        index=self.ohlc_data[pair][interval][
                                            offset
                                        ].index,
                                    )
                                    for offset in [
                                        self.ohlc_intervals[0] * step
                                        for step in range(
                                            interval // self.ohlc_intervals[0]
                                        )
                                    ]
                                ]
                            )
                            for pair in self.pairs
                        ]
                    )
                    for interval in self.ohlc_intervals
                ]
            )
            col_stats[col] = {
                "mean": (col_series / price_series).mean(),
                "std": (col_series / price_series).std(),
            }

        price_features = [
            col
            for col in col_stats.keys()
            if abs(col_stats[col]["mean"] - 1) <= price_treshold
            and col_stats[col]["std"] <= price_treshold
        ]
        return price_features

    def get_candle_col(self):
        candle_col = []
        for col in self.all_columns:
            if col.startswith("CDL_"):
                candle_col.append(col)
        return candle_col

    def get_zero_one_col(self):
        zero_one_features = []
        for col in self.all_columns:
            if col.startswith("CDL_"):
                continue
            col_series = safe_concat(
                [
                    safe_concat(
                        [
                            safe_concat(
                                [
                                    self.ohlc_data[pair][interval][offset][col]
                                    for offset in [
                                        self.ohlc_intervals[0] * step
                                        for step in range(
                                            interval // self.ohlc_intervals[0]
                                        )
                                    ]
                                    if col in self.ohlc_data[pair][interval][offset]
                                ]
                            )
                            for pair in self.pairs
                        ]
                    )
                    for interval in self.ohlc_intervals
                ]
            )
            if col_series.isin([-1, 0, 1]).all():
                zero_one_features.append(col)
        return zero_one_features

    def get_pairs_col(self):
        pairs = [
            "HILOl_13_21",
            "HILOs_13_21",
            "PSARl_0.02_0.2",
            "PSARs_0.02_0.2",
            "QQEl_14_5_4.236",
            "QQEs_14_5_4.236",
            "SUPERTl_7_3.0",
            "SUPERTs_7_3.0",
        ]
        return [col for col in self.all_columns if col in pairs]

    def _normalize_candle_col(self):
        for interval in self.ohlc_intervals:
            for pair in self.pairs:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    candle_col_exist = [
                        col
                        for col in self.candle_col
                        if col in self.ohlc_data[pair][interval][offset]
                    ]
                    self.ohlc_data[pair][interval][offset][candle_col_exist] /= 100

    def _normalize_other_col(self, mode="mix"):
        if mode == "power":
            return self._power_transform_other_col()
        elif mode == "quintile":
            return self._quintile_other_col()
        elif mode == "mix":
            return self._power_quintile_transform_other_col()
        else:
            raise ValueError(f"_normalize_other_col, mode {mode} not supported")

    def _quintile_other_col(self):
        result = {}
        for interval in self.ohlc_intervals:
            other_columns = [
                c
                for c in self.ohlc_data[self.pairs[0]][interval][0].columns.values
                if c in self.other_col
            ]
            sc_quint = QuantileTransformer(
                n_quantiles=1000,
                output_distribution="normal",
                ignore_implicit_zeros=False,
                subsample=100000,
            )
            sc_quint.fit(
                pd.concat(
                    [
                        pd.concat(
                            [
                                self.ohlc_data[pair][interval][
                                    self.ohlc_intervals[0] * step
                                ].loc[self.start_date : self.train_end][other_columns]
                                for pair in self.pairs
                            ]
                        )
                        for step in range(interval // self.ohlc_intervals[0])
                    ]
                )
            )
            errors = []
            for pair in self.pairs:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    transformed = sc_quint.transform(
                        self.ohlc_data[pair][interval][offset][other_columns]
                    )
                    restored = sc_quint.inverse_transform(transformed)
                    original = self.ohlc_data[pair][interval][offset][
                        other_columns
                    ].to_numpy()
                    inverse_error = restored - original
                    inverse_error = np.absolute(
                        np.divide(
                            inverse_error,
                            original,
                            out=np.zeros_like(inverse_error),
                            where=original != 0,
                        )
                    )
                    errors.append(inverse_error)
                    self.ohlc_data[pair][interval][offset][other_columns] = transformed

            errors = np.concatenate(errors, axis=0)
            max_errors = errors.max(axis=0)
            errors = np.array(
                [
                    [
                        np.quantile(errors, q_val, axis=0)
                        for q_val in self.inverse_error_quintiles
                    ]
                ]
            ).T
            errors = errors.reshape((errors.shape[0], -1))

            result[interval] = {
                name: {
                    "quantiles": quantiles_,
                    "mode": "quantiles",
                    "inverse_error_quantiles": q_error,
                    "max_inverse_error": m_error,
                }
                for name, quantiles_, q_error, m_error in zip(
                    other_columns, sc_quint.quantiles_.T, errors, max_errors
                )
            }
            result[interval]["used_quintiles"] = self.inverse_error_quintiles
        return result

    def _power_quintile_transform_other_col(self):
        result = {}
        for interval in self.ohlc_intervals:
            result[interval] = {}

            other_columns = [
                c
                for c in self.ohlc_data[self.pairs[0]][interval][0].columns.values
                if c in self.other_col
            ]

            sc_quint = QuantileTransformer(
                n_quantiles=1000,
                output_distribution="normal",
                ignore_implicit_zeros=False,
                subsample=100000,
            )
            sc_power = PowerTransformer(method="yeo-johnson")

            for i, column_name in enumerate(other_columns):
                merged_data = pd.concat(
                    [
                        pd.concat(
                            [
                                self.ohlc_data[pair][interval][
                                    self.ohlc_intervals[0] * step
                                ].loc[self.start_date : self.train_end][[column_name]]
                                for pair in self.pairs
                            ]
                        )
                        for step in range(interval // self.ohlc_intervals[0])
                    ]
                )

                sc_quint.fit(merged_data)
                sc_power.fit(merged_data)

                errors = []
                if sc_power._scaler.var_[0] > 0:
                    for pair in self.pairs:
                        for step in range(interval // self.ohlc_intervals[0]):
                            offset = self.ohlc_intervals[0] * step
                            transformed = sc_power.transform(
                                self.ohlc_data[pair][interval][offset][[column_name]]
                            )
                            restored = sc_power.inverse_transform(transformed)
                            original = self.ohlc_data[pair][interval][offset][
                                [column_name]
                            ].to_numpy()
                            inverse_error = restored - original
                            inverse_error = np.absolute(
                                np.divide(
                                    inverse_error,
                                    original,
                                    out=np.zeros_like(inverse_error),
                                    where=original != 0,
                                )
                            )
                            errors.append(inverse_error)
                            self.ohlc_data[pair][interval][offset][
                                [column_name]
                            ] = transformed

                    eshapes = [e.shape for e in errors]
                    errors = np.concatenate(errors, axis=0)
                    result[interval][column_name] = {
                        "mean": sc_power._scaler.mean_,
                        "var": sc_power._scaler.var_,
                        "lambda": sc_power.lambdas_,
                        "mode": "power",
                        "inverse_error_quantiles": {
                            q_val: np.quantile(errors, q_val)
                            for q_val in self.inverse_error_quintiles
                        },
                        "max_inverse_error": errors.max(),
                    }

                else:
                    for pair in self.pairs:
                        for step in range(interval // self.ohlc_intervals[0]):
                            offset = self.ohlc_intervals[0] * step
                            transformed = sc_quint.transform(
                                self.ohlc_data[pair][interval][offset][[column_name]]
                            )
                            restored = sc_quint.inverse_transform(transformed)
                            original = self.ohlc_data[pair][interval][offset][
                                [column_name]
                            ].to_numpy()
                            inverse_error = np.absolute(restored - original)
                            inverse_error = np.divide(
                                inverse_error,
                                original,
                                out=np.zeros_like(inverse_error),
                                where=original != 0,
                            )
                            errors.append(inverse_error)
                            self.ohlc_data[pair][interval][offset][
                                [column_name]
                            ] = transformed

                    eshapes = [e.shape for e in errors]
                    errors = np.concatenate(errors, axis=0)
                    result[interval][column_name] = {
                        "quantiles": sc_quint.quantiles_,
                        "mode": "quantiles",
                        "inverse_error_quantiles": {
                            q_val: np.quantile(errors, q_val)
                            for q_val in self.inverse_error_quintiles
                        },
                        "max_inverse_error": errors.max(),
                    }

        return result

    def _power_transform_other_col(self):
        result = {}
        for interval in self.ohlc_intervals:
            other_columns = [
                c
                for c in self.ohlc_data[self.pairs[0]][interval][0].columns.values
                if c in self.other_col
            ]
            sc_power = PowerTransformer()
            sc_power.fit(
                pd.concat(
                    [
                        pd.concat(
                            [
                                self.ohlc_data[pair][interval][
                                    self.ohlc_intervals[0] * step
                                ].loc[self.start_date : self.train_end][other_columns]
                                for pair in self.pairs
                            ]
                        )
                        for step in range(interval // self.ohlc_intervals[0])
                    ]
                )
            )

            errors = []
            for pair in self.pairs:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step
                    transformed = sc_power.transform(
                        self.ohlc_data[pair][interval][offset][other_columns]
                    )
                    restored = sc_power.inverse_transform(transformed)
                    original = self.ohlc_data[pair][interval][offset][
                        other_columns
                    ].to_numpy()
                    inverse_error = restored - original
                    inverse_error = np.absolute(
                        np.divide(
                            inverse_error,
                            original,
                            out=np.zeros_like(inverse_error),
                            where=original != 0,
                        )
                    )
                    errors.append(inverse_error)
                    self.ohlc_data[pair][interval][offset][other_columns] = transformed

            errors = np.concatenate(errors, axis=0)
            max_errors = errors.max(axis=0)
            errors = np.array(
                [
                    [
                        np.quantile(errors, q_val, axis=0)
                        for q_val in self.inverse_error_quintiles
                    ]
                ]
            ).T
            errors = errors.reshape((errors.shape[0], -1))

            result[interval] = {
                name: {
                    "mean": mean_,
                    "var": var_,
                    "lambda": lambda_,
                    "mode": "power",
                    "inverse_error_quantiles": q_error,
                    "max_inverse_error": m_error,
                }
                for name, mean_, var_, lambda_, q_error, m_error in zip(
                    other_columns,
                    sc_power._scaler.mean_,
                    sc_power._scaler.var_,
                    sc_power.lambdas_,
                    errors,
                    max_errors,
                )
            }
            result[interval]["used_quintiles"] = self.inverse_error_quintiles

        return result

    def get_price_std(self):

        pct_change_result = {}
        max_pct_change_result = {}
        last_pct_change_result = {}
        price_cols = ["open", "low", "high", "close", "weighted"]
        for price_col in price_cols:
            pct_change_result[price_col] = {
                interval: pd.concat(
                    [
                        pd.DataFrame().assign(
                            **{
                                str(i): self.ohlc_data[pair][interval][offset]
                                .loc[self.start_date : self.train_end][price_col]
                                .pct_change(periods=-i)
                                for i in range(1, self.dataset_seq_len[interval])
                            }
                        )
                        for pair in self.pairs
                        for offset in [
                            self.ohlc_intervals[0] * step
                            for step in range(interval // self.ohlc_intervals[0])
                        ]
                    ]
                )
                .stack()
                .std()
                for interval in self.ohlc_intervals
            }

            max_pct_change_result[price_col] = {
                interval: pd.concat(
                    [
                        pd.DataFrame().assign(
                            **{
                                str(i): self.ohlc_data[pair][interval][offset]
                                .loc[self.start_date : self.train_end][price_col]
                                .pct_change(periods=-i)
                                for i in range(1, self.dataset_seq_len[interval])
                            }
                        )
                        for pair in self.pairs
                        for offset in [
                            self.ohlc_intervals[0] * step
                            for step in range(interval // self.ohlc_intervals[0])
                        ]
                    ]
                )
                .abs()
                .max(axis=1)
                .std()
                for interval in self.ohlc_intervals
            }

            last_pct_change_result[price_col] = {
                interval: pd.concat(
                    [
                        self.ohlc_data[pair][interval][offset]
                        .loc[self.start_date : self.train_end][price_col]
                        .pct_change(periods=self.dataset_seq_len[interval] - 1)
                        for pair in self.pairs
                        for offset in [
                            self.ohlc_intervals[0] * step
                            for step in range(interval // self.ohlc_intervals[0])
                        ]
                    ]
                ).std()
                for interval in self.ohlc_intervals
            }

        result = {
            interval: {
                price_col: {
                    "pct_change": pct_change_result[price_col][interval],
                    "max_pct_change": max_pct_change_result[price_col][interval],
                    "last_pct_change": last_pct_change_result[price_col][interval],
                }
                for price_col in price_cols
            }
            for interval in self.ohlc_intervals
        }

        return result

    def _drop_first_unused(self):
        for interval in self.ohlc_intervals[1:]:
            freq = interval // self.ohlc_intervals[0]
            offsets = [
                self.ohlc_intervals[0] * step
                for step in range(interval // self.ohlc_intervals[0])
            ]
            offsets = sorted(offsets, reverse=True)[:-1]
            offsets.insert(0, 0)
            for pair in self.pairs:
                base_list_len = len(self.ohlc_data[pair][self.ohlc_intervals[0]][0])
                first_n_pos = list(
                    range(
                        base_list_len
                        - self.dataset_seq_len[self.ohlc_intervals[0]]
                        + 1,
                        base_list_len
                        - self.dataset_seq_len[self.ohlc_intervals[0]]
                        + 1
                        - freq,
                        -1,
                    )
                )
                for pos in first_n_pos:
                    offset = offsets[(pos - 1) % freq]
                    idx = -((pos + freq - 1) // freq)
                    to_drop = (
                        len(self.ohlc_data[pair][interval][offset])
                        + idx
                        - self.dataset_seq_len[interval]
                        + 1
                    )
                    self.ohlc_data[pair][interval][offset].drop(
                        index=self.ohlc_data[pair][interval][offset].index[:to_drop],
                        inplace=True,
                    )

    # The time in seconds is not a useful feature. I have assumed a few fairly standard periodicity that may be present
    def _add_time_features(self, add_timestamp=False):
        hour = 60 * 60
        day = 24 * hour
        week = 7 * day
        year = (365.2425) * day

        for interval in self.ohlc_intervals:
            for pair in self.pairs:
                for step in range(interval // self.ohlc_intervals[0]):
                    offset = self.ohlc_intervals[0] * step

                    # now we dont assume intervals shorther than 1s sow we divide by 10**9 to convert nanoseconds to seconds
                    # it should be changed if we want to use shorter interwals.
                    # we add half of base_time_offset to get time of
                    timestamp_middle = (
                        self.ohlc_data[pair][interval][offset].index.asi8
                        + offset * self.base_time_offset.nanos / 2
                    )
                    seconds = timestamp_middle / 10**9

                    self.ohlc_data[pair][interval][offset]["HOUR_SIN"] = np.sin(
                        seconds * (2 * np.pi / hour)
                    )
                    self.ohlc_data[pair][interval][offset]["HOUR_COS"] = np.cos(
                        seconds * (2 * np.pi / hour)
                    )
                    self.ohlc_data[pair][interval][offset]["DAY_SIN"] = np.sin(
                        seconds * (2 * np.pi / day)
                    )
                    self.ohlc_data[pair][interval][offset]["DAY_COS"] = np.cos(
                        seconds * (2 * np.pi / day)
                    )
                    self.ohlc_data[pair][interval][offset]["WEEK_SIN"] = np.sin(
                        seconds * (2 * np.pi / week)
                    )
                    self.ohlc_data[pair][interval][offset]["WEEK_COS"] = np.cos(
                        seconds * (2 * np.pi / week)
                    )
                    self.ohlc_data[pair][interval][offset]["YEAR_SIN"] = np.sin(
                        seconds * (2 * np.pi / year)
                    )
                    self.ohlc_data[pair][interval][offset]["YEAR_COS"] = np.cos(
                        seconds * (2 * np.pi / year)
                    )

                    if add_timestamp:
                        self.ohlc_data[pair][interval][offset][
                            "TIMESTAMP"
                        ] = timestamp_middle

    def get_ohlc_data(self):
        return self.ohlc_data

    def get_arrays(self):

        base = np.concatenate(
            [
                self.ohlc_data[pair][self.ohlc_intervals[0]][0].to_numpy(
                    dtype=np.float64
                )
                for pair in self.pairs
            ]
        )

        other_intervals = [
            np.stack(
                [
                    np.concatenate(
                        [
                            self.ohlc_data[pair][interval][
                                self.ohlc_intervals[0] * step
                            ].to_numpy(dtype=np.float64)
                            for pair in self.pairs
                        ]
                    )
                    for step in range(interval // self.ohlc_intervals[0])
                ]
            )
            for interval in self.ohlc_intervals[1:]
        ]

        return base, *other_intervals

    def get_metadata(self):

        metadata = {
            "timing": self.timing,
            "pairs": self.pairs,
            "asset_base_pairs": self.asset_base_pairs,
            "start_date": self.start_date,
            "end_date": self.end_date,
            "train_end": self.train_end,
            "base_time_offset": self.base_time_offset.__class__.__name__,
            "ohlc_intervals": self.ohlc_intervals,
            "drop_n_first_rows": self.drop_n_first_rows,
            "dataset_seq_len": self.dataset_seq_len,
            "drop_columns_names": self.drop_columns_names,
            "all_columns": self.all_columns,
            "monotonic_col": self.monotonic_col,
            "price_col": self.price_col,
            "pair_col": self.pair_col,
            "zero_one_col": self.zero_one_col,
            "candle_col": self.candle_col,
            "other_col": self.other_col,
            "dataframe_stats": {
                interval: {
                    "columns_order": self.ohlc_data[self.pairs[0]][interval][
                        0
                    ].columns.values,
                    "offsets": [
                        self.ohlc_intervals[0] * step
                        for step in range(interval // self.ohlc_intervals[0])
                    ],
                    "power_factors": self.power_factors[interval],
                    "price_std": self.price_std[interval],
                    "pairs": {
                        pair: {
                            "first_timestamps": [
                                self.ohlc_data[pair][interval][
                                    self.ohlc_intervals[0] * step
                                ].index[0]
                                for step in range(interval // self.ohlc_intervals[0])
                            ],
                            "last_timestamps": [
                                self.ohlc_data[pair][interval][
                                    self.ohlc_intervals[0] * step
                                ].index[-1]
                                for step in range(interval // self.ohlc_intervals[0])
                            ],
                            "last_train_timestamps": [
                                self.ohlc_data[pair][interval][
                                    self.ohlc_intervals[0] * step
                                ]
                                .loc[: self.train_end]
                                .index[-1]
                                for step in range(interval // self.ohlc_intervals[0])
                            ],
                            "lenghts": [
                                len(
                                    self.ohlc_data[pair][interval][
                                        self.ohlc_intervals[0] * step
                                    ]
                                )
                                for step in range(interval // self.ohlc_intervals[0])
                            ],
                            "train_steps": [
                                len(
                                    self.ohlc_data[pair][interval][
                                        self.ohlc_intervals[0] * step
                                    ].loc[: self.train_end]
                                )
                                for step in range(interval // self.ohlc_intervals[0])
                            ],
                        }
                        for pair in self.pairs
                    },
                }
                for interval in self.ohlc_intervals
            },
        }

        return metadata

    def save_arrays(self, filename):
        if len(filename.split(".")) == 1:
            filename += ".npz"
        if len(filename.split(".")) == 2 and filename.split(".")[-1] != "npz":
            raise ValueError("file name must end with .npz")
        np.savez(
            filename,
            **{str(k): v for k, v in zip(self.ohlc_intervals, self.get_arrays())},
        )

    def save_compressed_arrays(self, filename):
        if len(filename.split(".")) == 1:
            filename += ".npz"
        if len(filename.split(".")) == 2 and filename.split(".")[-1] != "npz":
            raise ValueError("file name must end with .npz")
        np.savez_compressed(
            filename,
            **{str(k): v for k, v in zip(self.ohlc_intervals, self.get_arrays())},
        )

    def save(self, filename):
        if len(filename.split(".")) == 1:
            filename += ".pkl"
        if len(filename.split(".")) == 2 and filename.split(".")[-1] != "pkl":
            raise ValueError("file name must end with .pkl")
        with open(filename, "wb") as file:
            pickle.dump(self, file)

    def save_metadata(self, filename, to_pickle=True):
        if to_pickle:
            if len(filename.split(".")) == 1:
                filename += ".pkl"
            if len(filename.split(".")) == 2 and filename.split(".")[-1] != "pkl":
                raise ValueError("file name must end with .pkl")
            with open(filename, "wb") as file:
                pickle.dump(self.get_metadata(), file)
        else:
            if len(filename.split(".")) == 1:
                filename += ".json"
            if len(filename.split(".")) == 2 and filename.split(".")[-1] != "json":
                raise ValueError("file name must end with .json")
            with open(filename, "w") as fp:
                json.dump(self.get_metadata(), fp, cls=JSONEncoder)

Now, we instantiate OHLCDataSet with custom config.
I changed pairs providing format: [('eth', 'usdt'), ('btc', 'usdt')] contains more info than ['ethusdt', 'btcusdt'],
it is easier to extract asset and base name - usefull in further encoding

In [7]:
# We can specify the exact names of the columns we want to use,
# then we will force the selection of columns and their categorization into appropriate groups.
# The dictionary can be given partially or not at all, then the features and their assignment
# to groups will be determined automatically, based on the data, this approach is the most robustnes
# columns={
#     'monotonic_col': MONOTONIC_COL,
#     'price_col': PRICE_COL,
#     'pair_col': PAIR_COL,
#     'zero_one_col': ZERO_ONE_COL,
#     'candle_col': CANDLE_COL,
#     'other_col': OTHER_COL,
# }
columns = None

ods = OHLCDataSet(
    pairs=[("eth", "usdt"), ("btc", "usdt"), ("doge", "usdt")],
    start_date=date(2022, 4, 1),
    end_date=date(2022, 4, 30),
    train_end=date(2022, 4, 25),
    ohlc_intervals=[10, 20, 60],
    multithreading=True,
    keep_unchanged_df=True,
    transform_mode="mix",
    base_time_offset=pd.tseries.offsets.Minute(),
    columns=columns,
)

In [None]:
# download data and prepare dataset
res = ods.make_dataset()

In [9]:
pprint(ods.timing)

{'add_time_features_time': 0.037396013001853134,
 'clear_time': 0.10598202200344531,
 'columns_organizing_time': 0.0003433620004216209,
 'drop_monotonic_time': 0.07275125600062893,
 'drop_unused_time': 0.034627368000656134,
 'get_columns_time': 1.1785183839965612,
 'get_price_columns_time': 0.47894473200358334,
 'get_price_std_time': 3.578840794001735,
 'load_time': 69.07267884699831,
 'normalize_candle_time': 0.16356760100097745,
 'normalize_other_time': 11.04634865999833,
 'remove_no_variance_time': 1.1368558080030198,
 'remove_redundant_features_time': 2.9867632950008556,
 'whole_process_time': 89.44456547699883}


For features characterized by a log normal or other fat-tail distribution, we use poweer transform (Yeo-Johnson transformation) or the quintile transformation. During these transformations, the error of restoring the original values is checked. Checking this error helps to make sure that the transformation was successful. We can see that the error for 0.9 quintiles are very small, so a significant number of features are mapped with very high accuracy, however, there are outliers, most often caused by the quintile transformation which is strictly based on data and badly reproduces data beyond the data fit range. We have such a situation when we have values ​​in the test set that exceed the range of the training set. Statistically, the larger the training set we have, the lower the probability of large errors.

In [10]:
# The average, highest and lowest 99% quintile of inverse error for the analyzed features
quintile = 0.99

average = {
    interval: sum(
        [
            d["inverse_error_quantiles"][quintile]
            for d in ods.get_metadata()["dataframe_stats"][interval][
                "power_factors"
            ].values()
        ]
    )
    / len(
        [
            d["inverse_error_quantiles"][quintile]
            for d in ods.get_metadata()["dataframe_stats"][interval][
                "power_factors"
            ].values()
        ]
    )
    for interval in ods.ohlc_intervals
}

highest = {
    interval: max(
        [
            d["inverse_error_quantiles"][quintile]
            for d in ods.get_metadata()["dataframe_stats"][interval][
                "power_factors"
            ].values()
        ]
    )
    for interval in ods.ohlc_intervals
}

lowest = {
    interval: min(
        [
            d["inverse_error_quantiles"][quintile]
            for d in ods.get_metadata()["dataframe_stats"][interval][
                "power_factors"
            ].values()
        ]
    )
    for interval in ods.ohlc_intervals
}

print(f"average 99% quintile of inverse error for the analyzed features: \n{average}")
print(f"highest 99% quintile of inverse error for the analyzed features: \n{highest}")
print(f"lowest 99% quintile of inverse error for the analyzed features: \n{lowest}")

average 99% quintile of inverse error for the analyzed features: 
{10: 0.00018689653831357233, 20: 0.0001222590640398664, 60: 0.00032492385173927017}
highest 99% quintile of inverse error for the analyzed features: 
{10: 0.02017959183673483, 20: 0.014285714285714235, 60: 0.03846153846153853}
lowest 99% quintile of inverse error for the analyzed features: 
{10: 0.0, 20: -0.0, 60: 0.0}


In [11]:
# The average, highest and lowest 99% quintile of inverse error for the analyzed features
quintile = 0.99

average = {
    interval: sum(
        [
            d["inverse_error_quantiles"][quintile]
            for d in ods.get_metadata()["dataframe_stats"][interval][
                "power_factors"
            ].values()
        ]
    )
    / len(
        [
            d["inverse_error_quantiles"][quintile]
            for d in ods.get_metadata()["dataframe_stats"][interval][
                "power_factors"
            ].values()
        ]
    )
    for interval in ods.ohlc_intervals
}

highest = {
    interval: max(
        [
            d["inverse_error_quantiles"][quintile]
            for d in ods.get_metadata()["dataframe_stats"][interval][
                "power_factors"
            ].values()
        ]
    )
    for interval in ods.ohlc_intervals
}

lowest = {
    interval: min(
        [
            d["inverse_error_quantiles"][quintile]
            for d in ods.get_metadata()["dataframe_stats"][interval][
                "power_factors"
            ].values()
        ]
    )
    for interval in ods.ohlc_intervals
}

print(f"average 99% quintile of inverse error for the analyzed features: \n{average}")
print(f"highest 99% quintile of inverse error for the analyzed features: \n{highest}")
print(f"lowest 99% quintile of inverse error for the analyzed features: \n{lowest}")

average 99% quintile of inverse error for the analyzed features: 
{10: 0.00018689653831357233, 20: 0.0001222590640398664, 60: 0.00032492385173927017}
highest 99% quintile of inverse error for the analyzed features: 
{10: 0.02017959183673483, 20: 0.014285714285714235, 60: 0.03846153846153853}
lowest 99% quintile of inverse error for the analyzed features: 
{10: 0.0, 20: -0.0, 60: 0.0}


In [12]:
# the highest inverse error for the analyzed features
highest_error = {
    interval: sorted(
        [
            (k, v["max_inverse_error"])
            for k, v in ods.get_metadata()["dataframe_stats"][interval][
                "power_factors"
            ].items()
        ],
        key=lambda x: x[1],
        reverse=True,
    )[:10]
    for interval in ods.ohlc_intervals
}
print("highest inverse error for the analyzed features:\n")
pprint(highest_error)

highest inverse error for the analyzed features:

{10: [('BBP_5_2.0', 1.0),
      ('SQZ_20_2.0_20_1.5', 1.0),
      ('STOCHRSIk_14_14_3_3', 1.0),
      ('STOCHRSId_14_14_3_3', 0.8461538461538461),
      ('MFI_14', 0.10816523075392998),
      ('QS_10', 0.03896103896103894),
      ('CFO_9', 0.005336000000000043),
      ('NVI_1', 0.0020398571832649704),
      ('BBB_5_2.0', 0.001877999999993626),
      ('high_Z_30_1', 0.001444673852413607)],
 20: [('SQZ_20_2.0_20_1.5', 1.0),
      ('STOCHRSIk_14_14_3_3', 1.0),
      ('STOCHRSId_14_14_3_3', 0.02570093457943925),
      ('QS_10', 0.025641025641025664),
      ('low_Z_30_1', 0.006513114328130283),
      ('CFO_9', 0.0040239999999998844),
      ('PGO_14', 0.001351068766824989),
      ('BBB_5_2.0', 0.0012880000000014865),
      ('high_Z_30_1', 0.0006254755957285614),
      ('close_Z_30_1', 0.0005943526922971583)],
 60: [('APO_12_26', 1.0),
      ('QS_10', 1.0),
      ('SQZ_20_2.0_20_1.5', 1.0),
      ('STOCHRSIk_14_14_3_3', 0.5000000000000001),
  

In fact, only a few features generate large inverse errors. We save the transformation parameters in the metadata so that at a later stage, if we want, we can reject features with a high maximum error or a selected quintile.

We can save the prepared dataset by casting it only to numpy arrays or their compressed version, we can also save the entire class object as a pickle file or save the metadata to a json file.

In [13]:
# save whole instance
ods.save("instance")
# save metadata
ods.save_metadata("metadata")
# convert dataframes to numpy arrays and save it ~ 19MB, for this example
ods.save_arrays("arrays")
# compressed variant ~ 12MB, for this example
ods.save_compressed_arrays("arrays_compressed")

In [None]:
# Print dataset metadata
pprint(ods.get_metadata())