# Data Cleanup for Various Futures Data Feeds

# Notebook's Environment

In [None]:
INSTALL_DEPS = False
if INSTALL_DEPS:
  %pip installnumpy==1.26.4
  %pip installpandas==2.2.1
  %pip installpandas_market_calendars==4.4.0
  %pip installpytz==2024.1
  %pip installscipy==1.12.0

!python --version

# Cloud Environment Setup

In [None]:
import os
import sys
import warnings
import pandas as pd
import numpy as np
import glob
from datetime import datetime
from pandas.tseries.offsets import BDay, Day
import pandas_market_calendars as mcal
import matplotlib.pyplot as plt
import seaborn as sns

from tqdm import tqdm

# Silence every warning for the rest of the notebook session.
warnings.filterwarnings("ignore")

# Directory that receives the final cleaned CSV written at the end of the notebook.
CLEAN_DATA_PATH = f".{os.sep}data"
# Directory holding the per-symbol CSV cache; it is both written by the download step
# and globbed by the cleaning step.
DATA_PATH = f".{os.sep}data{os.sep}unstructureddata"
print("running localhost!")

# Get Data

In [None]:
class StockFeat:
    """Column-name constants for open/high/low/close/volume price data."""
    DATETIME = "Datetime"
    OPEN = "Open"
    HIGH = "High"
    LOW = "Low"
    CLOSE = "Close"
    VOLUME = "Volume"
    # The five feature columns in a fixed order. NOTE: the attribute name `list` shadows
    # the builtin inside this class body; it is left as-is because renaming it would
    # change the class's public attributes.
    list = [OPEN, HIGH, LOW, CLOSE, VOLUME]

class YFinanceOptions:
    """Constants used when requesting data through yfinance."""
    INDEX = "Datetime"
    # Look-back windows, each written as <limit> - 1. MIN15_RANGE is used below as a
    # number of calendar days when deriving START_DATE.
    # NOTE(review): presumably these mirror Yahoo Finance's maximum history per interval
    # (in days) minus one day -- confirm against the data provider's limits.
    MIN1_RANGE = 7 - 1
    MIN15_RANGE = 60 - 1
    HOUR_RANGE = 730 - 1
    DAY_RANGE = 7300 - 1
    # Interval strings; the selected one is forwarded as `interval=` to `yf.download`.
    D1="1d"
    H1="1h"
    M15="15m"
    M1="1m"
    # Date-only format used to parse the `start` / `end` strings of the download helper.
    DATE_TIME_FORMAT = "%Y-%m-%d"
    DATE_TIME_HRS_FORMAT = '%Y-%m-%d %H:%M:%S %Z'

# Bar interval requested from yfinance and embedded in the cache file names.
INTERVAL = YFinanceOptions.M15
# Same value as YFinanceOptions.DATE_TIME_FORMAT, duplicated at module level.
DATE_TIME_FORMAT = "%Y-%m-%d"
# Download window: from MIN15_RANGE days ago up to yesterday, both relative to "now",
# so the window (and therefore the cache file names) changes from day to day.
END_DATE = pd.Timestamp(datetime.now() - Day(1)).strftime(DATE_TIME_FORMAT)
START_DATE =  pd.Timestamp(datetime.now() - Day(YFinanceOptions.MIN15_RANGE)).strftime(DATE_TIME_FORMAT)

# Ticker symbols. Most carry the "=F" suffix, which get_prefix() strips when building
# column prefixes; "^VIX" has no such suffix and is therefore kept verbatim.
SNP_FUT = "ES=F"
NSDQ_FUT = "NQ=F"
GOLD_FUT = "GC=F"
CRUDOIL_FUT="CL=F"
VOLATILITY_FUT= "^VIX"
RUS_FUT = "RTY=F"
RATES_FUT = "2YY=F"

CORN_FUT = "ZC=F"
SOYOIL_FUT = "ZL=F"
KCWHEAT_FUT = "KE=F"
SOYBEAN_FUT = "ZS=F"
SOYBEANMEAL_FUT = "ZM=F"
WHEAT_FUT = "ZW=F"
LIVECATTLE_FUT = "LE=F"
LEANHOG_FUT = "HE=F"
FEEDERCATTLE_FUT = "GF=F"
MILK_FUT = "DC=F"

# Symbols that are actually downloaded.
# NOTE(review): NSDQ_FUT is defined above but is not part of this list, while later cells
# filter on the "NQ" prefix -- confirm whether it was left out on purpose.
TICKER_SYMBOLS = [RATES_FUT, VOLATILITY_FUT, GOLD_FUT, CRUDOIL_FUT, SNP_FUT, RUS_FUT, CORN_FUT, SOYOIL_FUT, KCWHEAT_FUT, SOYBEAN_FUT, SOYBEANMEAL_FUT, WHEAT_FUT, LIVECATTLE_FUT, LEANHOG_FUT, FEEDERCATTLE_FUT, MILK_FUT]

In [None]:
import yfinance as yf
from scipy.stats import skew, kurtosis

def _cache_file_path(datadir, symbol, start, end, interval):
    """Build the CSV cache path for one symbol and download window."""
    return f"{datadir}/{symbol}-{start.split(' ')[0]}-{end.split(' ')[0]}-{interval}.csv"


def _to_utc_index(index):
    """Return ``index`` as a UTC ``DatetimeIndex``.

    Timezone-naive timestamps are interpreted as US/Central wall-clock time before
    conversion; timezone-aware timestamps are converted directly.
    """
    idx = pd.to_datetime(index)
    if idx.tz is None:
        return idx.tz_localize("US/Central").tz_convert("UTC")
    return idx.tz_convert("UTC")


def get_yf_tickers_df(tickers_symbols, start, end, interval=INTERVAL, datadir=DATA_PATH):
    """Load (from the CSV cache or from yfinance) and align price data for several symbols.

    For every symbol the frame is read from ``datadir`` when a cache file for the exact
    ``start``/``end``/``interval`` exists, otherwise it is downloaded. All frames are then
    clipped to the window every symbol covers and reindexed (nearest value) onto the
    1-minute grid of the ``CME_Agriculture`` market calendar.

    Note that the file written to the cache is the *aligned* frame, stored under the same
    path that is checked before downloading, and that an existing cache file is never
    overwritten.

    Args:
        tickers_symbols: Iterable of ticker symbols understood by yfinance.
        start: Window start as a ``YFinanceOptions.DATE_TIME_FORMAT`` string.
        end: Window end as a ``YFinanceOptions.DATE_TIME_FORMAT`` string.
        interval: Bar interval passed to ``yf.download`` and used in cache file names.
        datadir: Cache directory; created when missing.

    Returns:
        Tuple ``(tickers, latest_start, earliest_end)`` where ``tickers`` maps each symbol
        to its aligned DataFrame (index named ``"Date"``) and the two timestamps bound the
        window shared by all symbols (UTC).

    Raises:
        AssertionError: If a cached file is empty or a download returns no rows.
    """
    tickers = {}
    earliest_end = pd.to_datetime(datetime.strptime(end, YFinanceOptions.DATE_TIME_FORMAT)).tz_localize("UTC")
    latest_start = pd.to_datetime(datetime.strptime(start, YFinanceOptions.DATE_TIME_FORMAT)).tz_localize("UTC")
    os.makedirs(datadir, exist_ok=True)

    for symbol in tickers_symbols:
        cached_file_path = _cache_file_path(datadir, symbol, start, end, interval)
        print(f"Checking file: {cached_file_path}")
        if os.path.exists(cached_file_path):
            print(f"loading from {cached_file_path}")
            df = pd.read_csv(cached_file_path, parse_dates=True, index_col=0)
            assert len(df) > 0, "Empty data"
        else:
            df = yf.download(
                symbol,
                start=start,
                end=end,
                progress=True,
                interval=interval
            )
            assert len(df) > 0, "No data pulled"
            # Some yfinance releases may return (price, ticker) MultiIndex columns even for
            # a single symbol; keep only the price level so df["Close"] stays a Series.
            if isinstance(df.columns, pd.MultiIndex):
                df.columns = df.columns.get_level_values(0)
        df.index = _to_utc_index(df.index)

        close = df["Close"]
        valid_close = close.dropna()
        min_date = df.index.min()
        max_date = df.index.max()
        nan_count = close.isnull().sum()
        skewness = round(skew(valid_close), 2)
        kurt = round(kurtosis(valid_close), 2)
        # Two-sided 3-sigma rule: count values far above *or* far below the mean.
        outliers_count = ((close - close.mean()).abs() > 3 * close.std()).sum()
        print(
            f"{symbol} => min_date: {min_date}, max_date: {max_date}, kurt:{kurt}, skewness:{skewness}, outliers_count:{outliers_count},  nan_count: {nan_count}"
        )
        tickers[symbol] = df

        # Shrink the shared window to what every symbol actually covers.
        if min_date > latest_start:
            latest_start = min_date
        if max_date < earliest_end:
            earliest_end = max_date

    calendar = mcal.get_calendar('CME_Agriculture')
    schedule = calendar.schedule(start_date=latest_start, end_date=earliest_end)
    # "1min" is the non-deprecated spelling of the former "1T" alias.
    all_trading_days = mcal.date_range(schedule, frequency='1min')

    for symbol, df in tickers.items():
        df_filtered = df[(df.index >= latest_start) & (df.index <= earliest_end)]
        # reindex(method="nearest") needs a unique, monotonic index.
        df_filtered = df_filtered[~df_filtered.index.duplicated(keep='first')].sort_index()
        df_reindexed = df_filtered.reindex(all_trading_days, method='nearest')
        df_reindexed.index = pd.to_datetime(df_reindexed.index)
        df_reindexed = df_reindexed[~df_reindexed.index.duplicated(keep='first')]
        df_reindexed.index.name = 'Date'
        tickers[symbol] = df_reindexed

        cached_file_path = _cache_file_path(datadir, symbol, start, end, interval)
        if not os.path.exists(cached_file_path):
            df_reindexed.to_csv(cached_file_path, index=True)

    return tickers, latest_start, earliest_end

# Load every configured symbol (from the CSV cache when present, otherwise via yfinance)
# for the START_DATE..END_DATE window; this also writes any missing cache files.
tickers, latest_start, earliest_end = get_yf_tickers_df(TICKER_SYMBOLS, start=START_DATE, end=END_DATE)

# Clean Data

In [None]:
def get_prefix(filename):
    """Derive the column prefix for a cached CSV from its path.

    Takes the last ``os.sep``-separated path component, keeps the text before its first
    ``-`` and removes any ``=F`` marker, e.g. ``.../ES=F-2024-01-01-...csv`` -> ``"ES"``.
    """
    basename = filename.rpartition(os.sep)[2]
    symbol = basename.partition("-")[0]
    return symbol.replace("=F", "")


# glob returns files in a filesystem-dependent order; sorting keeps the column order of the
# merged frame (and of fut_tickers) reproducible across machines and runs.
files = sorted(glob.glob(DATA_PATH + f"{os.sep}*-{INTERVAL}.csv"))
assert files, f"No cached '*-{INTERVAL}.csv' files found in {DATA_PATH}"

fut_tickers = []
df_list = []
data_quality_metrics = []

for f in files:
    print(f)
    prefix = get_prefix(f)
    # The cache file name embeds the download window, so several files for one symbol can
    # pile up over time. Loading more than one would create duplicate column names in the
    # merged frame, so fail loudly instead.
    assert prefix not in fut_tickers, (
        f"More than one cached file found for '{prefix}' in {DATA_PATH}; remove stale cache files"
    )
    fut_tickers.append(prefix)
    df_temp = pd.read_csv(f, index_col="Date", parse_dates=True)
    # Force conversion to numeric and coerce errors to NaN *before* doing arithmetic on
    # the price columns.
    df_temp = df_temp.apply(pd.to_numeric, errors="coerce")
    # TODO: Use bid ask here.
    df_temp["Spread"] = df_temp["High"] - df_temp["Low"]

    # Namespace every column with the symbol prefix so frames can be merged side by side.
    df_temp.columns = [prefix + "_" + col for col in df_temp.columns]

    df_list.append(df_temp)
    # One row of data-quality statistics per column, displayed later as metrics_df.
    for col in tqdm(df_temp.columns):
        metrics = {
            "Column": col,
            "Total NaNs": df_temp[col].isnull().sum(),
            "Skewness": df_temp[col].skew(),
            "Kurtosis": df_temp[col].kurtosis(),
            "Mean": df_temp[col].mean(),
            "Standard Deviation": df_temp[col].std(),
            "Min": df_temp[col].min(),
            "Max": df_temp[col].max(),
        }
        data_quality_metrics.append(metrics)

In [None]:
# Merge the per-symbol frames side by side on their timestamp index.
futs_df = pd.concat(df_list, axis=1)
# Localize only when the index is genuinely timezone-naive; an already tz-aware index is
# left untouched (previously this case was handled by catching and printing the exception).
if isinstance(futs_df.index, pd.DatetimeIndex) and futs_df.index.tz is None:
    futs_df.index = futs_df.index.tz_localize("GMT")
futs_df = futs_df.sort_index()
# Drop leading rows up to the first timestamp at which every column has a value ...
first_complete_row = futs_df.notnull().all(axis=1).argmax()
futs_df = futs_df.iloc[first_complete_row:]
# ... then fill the remaining gaps by time-weighted interpolation.
futs_df = futs_df.interpolate(method="time")

assert not futs_df.isnull().any().any(), "futs_df still contains NaNs after interpolation"
print(fut_tickers)
print(f"Dataset Shape: {futs_df.shape}")

futs_df.head(2)

In [None]:
PRICE_COLS = ["Close", "Open", "High", "Low", "Spread"]


def create_fut_rets_df(df, price_types=PRICE_COLS):
    """Build log-return and percent-change columns for the selected price columns.

    A column of ``df`` is selected when its name contains one of ``price_types``. For each
    selected column ``<name>`` the result holds ``<name>_lr`` (log of the ratio to the
    previous row) and ``<name>_pc`` (percent change to the previous row, in percent).
    Missing values, such as the first row, are back-filled.

    Returns:
        DataFrame on ``df.index`` with all ``_lr`` columns followed by all ``_pc`` columns.
    """
    log_rets = {}
    pct_rets = {}

    for kind in tqdm(price_types):
        for name in tqdm(df.columns):
            if kind not in name:
                continue
            prices = df[name]
            log_rets[f"{name}_lr"] = np.log(prices / prices.shift(1))
            pct_rets[f"{name}_pc"] = prices.pct_change() * 100

    frames = [
        pd.DataFrame(log_rets, index=df.index).bfill(),
        pd.DataFrame(pct_rets, index=df.index).bfill(),
    ]
    return pd.concat(frames, axis=1)

# Log-return (_lr) and percent-change (_pc) columns for every price/spread column.
fut_rets_df = create_fut_rets_df(futs_df)
fut_rets_df.head(2)

# EDA

## Skew, Kurtosis and Outliers

In [None]:
# Horizontal violin plot of every return column whose name contains "Close".
data_to_plot = fut_rets_df.filter(regex="^.*Close.*")
fig, ax = plt.subplots(figsize=(22, 12))
sns.violinplot(data=data_to_plot, orient='h', scale='width', ax=ax)

ax.set_title("Violin Plot of Returns")
plt.show()

In [None]:
# One row per cleaned input column, collected while the cached CSV files were loaded.
metrics_df = pd.DataFrame(data_quality_metrics)
metrics_df

## Data Mining for Correlations

Current findings:
| Pairing, lags               | Correlation | P-Value       |
|-----------------------------|-------------|---------------|
| (ES_Close, GC_Close), 1     | 0.016987    | 1.152765e-08  |
| (ES_Close, NQ_Close), 1     | 0.016356    | 3.919657e-08  |
| (GC_Close, NQ_Close), 1     | 0.014338    | 1.459589e-06  |
| (CL_Close, GC_Close), 1     | 0.006259    | 3.551049e-02  |
| (2YY_Close, ES_Close), 5    | -0.008258   | 5.538042e-03  |
| (2YY_Close, GC_Close), 5    | -0.009686   | 1.139459e-03  |
| (2YY_Close, NQ_Close), 5    | -0.009862   | 9.234257e-04  |
| (2YY_Close, NQ_Close), 1    | -0.033188   | 7.051777e-29  |
| (2YY_Close, ES_Close), 1    | -0.033695   | 1.026262e-29  |
| (2YY_Close, GC_Close), 1    | -0.039427   | 4.567548e-40  |


In [None]:
from statsmodels.tsa.stattools import adfuller
from scipy.stats import pearsonr
from itertools import combinations

LAGS_IN_MINS = [1, 5] # [1, 5, 15, 60, 240, 480]
MAX_DIFF = 2


def make_stationary(series, max_diff=MAX_DIFF):
    """Difference ``series`` until the ADF test rejects a unit root at the 5% level.

    Returns:
        Tuple ``(result, n_diffs)``. When the input already passes the test it is returned
        unchanged with ``n_diffs == 0``; otherwise ``result`` is the series differenced
        ``n_diffs`` times (at most ``max_diff``), with the leading NaNs dropped.
    """
    p_value = adfuller(series.dropna(), autolag="AIC")[1]
    if p_value < 0.05:
        return series, 0

    current = series.copy()
    n_diffs = 0
    # The loop condition re-checks the p-value, so no explicit break is needed.
    while p_value >= 0.05 and n_diffs < max_diff:
        current = current.diff().dropna()
        p_value = adfuller(current, autolag="AIC")[1]
        n_diffs += 1

    return current, n_diffs


def calculate_correlations(data, lags=LAGS_IN_MINS):
    """Find significant lagged Pearson correlations between pairs of close columns.

    Columns whose names end in ``_Close``, ``_Close_lr`` or ``_Close_pc`` are made
    stationary with ``make_stationary`` and every unordered pair is tested at each lag:
    the first column at time ``t`` against the second column at ``t - lag`` rows.

    The two series are paired by index label rather than by position, so a pair whose
    members were differenced a different number of times is still evaluated on the
    timestamps they share instead of being silently skipped.

    Args:
        data: DataFrame containing the close columns.
        lags: Iterable of row lags to test.

    Returns:
        Dict mapping ``((col1, col2), lag)`` to ``(correlation, p_value)`` for every
        combination with ``p_value < 0.05``.
    """
    correlations = {}
    rets = data.filter(regex="(_Close(_lr|_pc)?)$")

    # The ADF-based stationarity transform is expensive; run it once per column instead
    # of once per pair.
    stationary_cache = {}

    def stationary(col):
        if col not in stationary_cache:
            stationary_cache[col], _ = make_stationary(rets[col])
        return stationary_cache[col]

    for col1, col2 in tqdm(combinations(rets.columns, 2), desc="calculate_correlations"):
        series1 = stationary(col1)
        series2 = stationary(col2)

        for lag in tqdm(lags, desc=f'calculate_correlations {col1} vs {col2}'):
            aligned = pd.concat(
                [series1, series2.shift(lag)], axis=1, keys=["lead", "lagged"]
            ).dropna()

            # pearsonr needs at least two paired observations.
            if len(aligned) < 2:
                continue

            correlation, p_value = pearsonr(aligned["lead"], aligned["lagged"])
            if p_value < 0.05:
                correlations[((col1, col2), lag)] = (correlation, p_value)

    return correlations

# Set to True to run the (slow) correlation and autoregression scans.
GET_CORR = False
if GET_CORR:
    # Also takes time
    correlation_results = calculate_correlations(futs_df)
    if not correlation_results:
        print("No meaningful coorelations")
    else:
        correlation_df = pd.DataFrame.from_dict(
            correlation_results, orient="index", columns=["Correlation", "P-Value"]
        ).sort_values(by="Correlation", ascending=False)

        print(correlation_df)

## Autocorrelation

Current results:

| Series     | Lag | Coefficient | P-Value          |
|------------|-----|-------------|------------------|
| NQ_High    | 1   | 0.999971    | 0.000000e+00     |
| NQ_Low     | 1   | 0.999969    | 0.000000e+00     |
| NQ_Open    | 1   | 0.999967    | 0.000000e+00     |
| NQ_Close   | 1   | 0.999967    | 0.000000e+00     |
| NQ_Volume  | 1   | 0.821516    | 0.000000e+00     |
| NQ_Spread  | 1   | 0.727226    | 0.000000e+00     |
| NQ_Open    | 5   | -0.017075   | 2.064916e-27     |
| NQ_Close   | 5   | -0.017068   | 8.802106e-30     |
| NQ_High    | 5   | -0.019068   | 7.274848e-44     |
| NQ_Volume  | 5   | 0.116937    | 0.000000e+00     |
| NQ_Spread  | 5   | 0.139868    | 0.000000e+00     |
| NQ_Low     | 5   | NaN         | NaN              |


In [None]:
from statsmodels.tsa.arima.model import ARIMA

def get_ar_stats(df):
    """Fit AR(lag) models to every column of ``df`` and report significant lag terms.

    For each column and each lag in ``LAGS_IN_MINS`` an ``ARIMA(lag, 0, 0)`` model is
    fitted and the coefficient / p-value of its ``ar.L<lag>`` term is recorded when the
    p-value is below 0.05. Combinations that are not significant, or whose fit fails, are
    recorded with ``None`` for both values.

    The input frame is not modified.

    Returns:
        DataFrame indexed by ``(column, lag)`` with columns ``Coefficient`` and ``P-Value``.
    """
    # Work on a copy so the caller's DatetimeIndex is not replaced by a PeriodIndex.
    df = df.copy()
    # this to avoid the ARIMA warnings ("min" is the non-deprecated spelling of "T")
    df.index = pd.DatetimeIndex(df.index).to_period('min')

    def test_autoregression(series, lags=LAGS_IN_MINS):
        results = {}
        for lag in tqdm(lags):
            term = f'ar.L{lag}'
            try:
                fitted_model = ARIMA(series, order=(lag, 0, 0)).fit()
                coef = fitted_model.params.get(term)
                p_value = fitted_model.pvalues.get(term)
            except Exception:
                # Best effort: a failed fit is reported as "no result" instead of
                # aborting the whole scan.
                coef = p_value = None

            if p_value is not None and p_value < 0.05:
                results[lag] = {'Coefficient': coef, 'P-Value': p_value}
            else:
                results[lag] = {'Coefficient': None, 'P-Value': None}
        return results

    ar_results = {}
    for col in tqdm(df.columns, desc="get_ar_stats"):
        ar_results[col] = test_autoregression(df[col].dropna())

    ar_df = pd.DataFrame.from_dict({(i, j): ar_results[i][j]
                                    for i in ar_results.keys()
                                    for j in ar_results[i].keys()},
                                orient='index')
    return ar_df

if GET_CORR:
    # This takes time
    # NOTE(review): TICKER_SYMBOLS as defined in this notebook does not include "NQ=F", so
    # this filter may select no columns -- confirm which symbol should be analysed here.
    ar_df = get_ar_stats(futs_df.filter(regex='^NQ'))
    # A bare expression nested inside `if` is not rendered as cell output, so display
    # the result explicitly.
    display(ar_df)

## Cumulative Returns Visualizations

In [None]:
# Running sum of the percent changes of every close column, stored as "<col>_c".
close_pct_cols = [c for c in fut_rets_df.columns if c.endswith("_Close_pc")]
for c in close_pct_cols:
    fut_rets_df[c + "_c"] = fut_rets_df[c].cumsum()

# Plotting
cum_fig, cum_ax = plt.subplots(figsize=(22, 12))
for c in fut_rets_df.columns:
    if c.endswith("_Close_pc_c"):
        cum_ax.plot(fut_rets_df.index, fut_rets_df[c], label=c.replace("_Close_pc_c", ""), alpha=0.65)
cum_ax.set_title("% Cummulative")
cum_ax.legend()
plt.show()

## Distributions

In [None]:
import scipy.stats
from scipy.stats import norm

def plot_norm(df):
    """Plot a KDE with a fitted normal curve for each close / close-log-return column.

    Columns whose names end in ``_Close`` or ``_Close_lr`` are drawn on a near-square grid
    of subplots. Each panel shows the kernel density estimate, the normal density fitted
    with ``norm.fit`` and, in its title, the Kolmogorov-Smirnov statistic and p-value of
    the data against that fitted normal.

    The grid is sized as ceil(sqrt(n)) columns by ceil(n / columns) rows so that every
    selected column gets a panel regardless of how many there are; unused panels are
    hidden. Nothing is drawn when no column matches.
    """
    columns = df.filter(regex="(_Close(_lr)?)$").columns
    n_plots = len(columns)
    if n_plots == 0:
        return

    n_grid_cols = int(np.ceil(np.sqrt(n_plots)))
    n_grid_rows = int(np.ceil(n_plots / n_grid_cols))
    # squeeze=False always yields a 2-D array of axes, even for a 1x1 grid.
    fig, axes = plt.subplots(
        nrows=n_grid_rows, ncols=n_grid_cols, figsize=(22, 18), squeeze=False
    )

    axes = axes.flatten()
    # Figure-level title; plt.title() would only label the last subplot.
    fig.suptitle("Log Returns Gaussian Fit")
    for ax, column in zip(axes, columns):
        data = df[column].dropna()
        sns.kdeplot(
            data,
            fill=True,
            common_norm=False,
            alpha=0.5,
            linewidth=0.5,
            ax=ax
        )

        params = norm.fit(data)
        ks_stat, ks_p_value = scipy.stats.kstest(data, norm.name, args=params)

        mu, std = params
        # Draw the fitted normal over the x-range the KDE plot already occupies.
        xmin, xmax = ax.get_xlim()

        x = np.linspace(xmin, xmax, 100)
        p = norm.pdf(x, mu, std)
        ax.plot(x, p, 'k', linewidth=2)
        title = f"{column} Gaussian Fit (ks {ks_stat:.02f}, ks_p {ks_p_value:.02f}) \nMean = {mu:.04f}, Std = {std:.04f}"
        ax.set_title(title)
        ax.set_xlabel("Log Return Value")
        ax.set_ylabel("Density")

    for ax in axes[n_plots:]:
        ax.set_visible(False)

    # Leave a strip at the top for the figure title.
    fig.tight_layout(rect=(0, 0, 1, 0.97))
    plt.show()

# Compare the distribution of each close log-return column against a fitted normal.
plot_norm(fut_rets_df)

# Save For Models

In [None]:
# Persist the cleaned, merged price frame (not the returns frame) for later use.
# NOTE(review): the file name says "1min" because the download step reindexes onto a
# 1-minute grid, while INTERVAL itself is 15m -- confirm this naming is intended.
futs_df.to_csv(CLEAN_DATA_PATH + f"{os.sep}futures_1min_data.csv")