In [34]:
import numpy as np
import pandas as pd
from pandas import DataFrame as df
import datetime
import math
from itertools import chain
import boto3
import sagemaker as sm
import pyarrow.parquet as pq
import s3fs

# feature engineering, targets creation

In [35]:
def add_spread(df):
    """Attach the per-row high-minus-low range as a ``spread`` column.

    The column is written onto ``df`` itself (the caller's frame is
    modified) and that same object is returned, so the function can be
    used as a step in ``DataFrame.pipe``.
    """
    daily_range = df.high.sub(df.low)
    df['spread'] = daily_range
    return df
def add_weekly_cat(df):
    """Add a cyclical (cos/sin) encoding of the day of the week.

    The index of ``df`` is parsed as ``'%Y-%m-%d'`` dates (an index that
    already holds datetimes is accepted as well). Two columns are written
    onto ``df`` in place, ``week_cos`` and ``week_sin``, and the same
    frame is returned so the function can be used with ``DataFrame.pipe``.

    Raises whatever ``pandas.to_datetime`` raises (a ``ValueError``) when
    an index label does not match the expected date format.
    """
    dates = pd.to_datetime(df.index, format='%Y-%m-%d')
    weekday = dates.dayofweek.to_numpy()  # Monday=0 ... Sunday=6
    # There are 7 distinct weekday values, so the cycle length must be 7;
    # a shorter period would put Sunday (6) on the same angle as Monday (0).
    radians = 2 * math.pi * weekday / 7
    df['week_cos'] = np.cos(radians)
    df['week_sin'] = np.sin(radians)
    return df
def add_yearly_cat(df):
    """Add a cyclical (cos/sin) encoding of the day of the year.

    Every index label is parsed with ``strptime`` using ``'%Y-%m-%d'``,
    so the index must hold date strings in that format. The angle uses a
    fixed 365-day period, which means day 366 of a leap year lands
    slightly past one full turn. ``year_cos`` and ``year_sin`` are
    written onto ``df`` in place and the same frame is returned.
    """
    angles = [
        2*math.pi*datetime.datetime.strptime(stamp, '%Y-%m-%d').timetuple().tm_yday/365
        for stamp in df.index
    ]
    df['year_cos'] = [math.cos(angle) for angle in angles]
    df['year_sin'] = [math.sin(angle) for angle in angles]
    return df
def add_percent_changes(df, metrics=None, periods=None, shifts=None): ## pure!!!
    """Return a copy of ``df`` extended with shifted percent-change columns.

    For every combination of shift, metric and period, the metric column
    is first shifted by ``shift_len`` rows and then ``pct_change`` is
    taken over ``period`` rows. Each result is stored as
    ``{metric}_percent_change_percent_lag_{period}_shift_{shift_len}``.
    The input frame is not modified.

    Parameters
    ----------
    df : pandas.DataFrame
        Source frame.
    metrics : iterable of column labels, optional
        Columns to process; defaults to every column of ``df``.
    periods : iterable of int, optional
        Row distances passed to ``pct_change``; defaults to ``(1,)``.
    shifts : iterable of int, optional
        Row offsets applied before the percent change; defaults to
        ``(0,)`` (no shift).

    Returns
    -------
    pandas.DataFrame
        ``df`` with the new columns concatenated on the right.
    """
    if metrics is None:
        metrics = df.columns
    # The signature advertises None defaults, so make them usable
    # instead of failing when the loops try to iterate over None.
    if periods is None:
        periods = (1,)
    if shifts is None:
        shifts = (0,)
    changes = dict()
    for shift_len in shifts:
        for metric in metrics:
            # the shifted series does not depend on the period: compute it once
            shifted = df[metric].shift(shift_len)
            for period in periods:
                name = f'{metric}_percent_change_percent_lag_{period}_shift_{shift_len}'
                changes[name] = shifted.pct_change(periods=period)
    return pd.concat((df, pd.DataFrame(changes)), axis=1)
def add_stats(df, lengths=None, metrics=None, shifts=None, quantiles=None): ## pure!!!
    """Return a copy of ``df`` extended with rolling-window statistics.

    For every combination of shift, metric and window length, the metric
    column is shifted by ``shift_len`` rows and a rolling window of
    ``length`` rows is taken over it. The mean, std, skew, kurtosis, max
    ("high"), min ("low"), high-minus-low ("spread") and each requested
    quantile are stored under names starting with
    ``{metric}_window_{length}_shift_{shift_len}_``. The input frame is
    not modified.

    Parameters
    ----------
    df : pandas.DataFrame
        Source frame.
    lengths : iterable of window sizes
        Rolling window sizes. Required despite the ``None`` default.
    metrics : iterable of column labels, optional
        Columns to process; defaults to every column of ``df``.
    shifts : iterable of int, optional
        Row offsets applied before the rolling window; defaults to
        ``(0,)`` (no shift).
    quantiles : iterable of float, optional
        Quantiles in [0, 1]. Each is rounded to two decimals both for the
        computation and for the column name. Defaults to no quantiles.

    Returns
    -------
    pandas.DataFrame
        ``df`` with the new columns concatenated on the right.

    Raises
    ------
    TypeError
        If ``lengths`` is not supplied.
    """
    if lengths is None:
        raise TypeError("add_stats() requires 'lengths': an iterable of rolling-window sizes")
    if metrics is None:
        metrics = df.columns
    if shifts is None:
        shifts = (0,)
    if quantiles is None:
        quantiles = ()
    stats = dict()
    for shift_len in shifts:
        for metric in metrics:
            # the shifted series does not depend on the window length: compute it once
            shifted = df[metric].shift(shift_len)
            for length in lengths:
                # pandas rejects min_periods larger than an integer window,
                # so cap the usual minimum of 4 at the window size.
                if isinstance(length, (int, np.integer)):
                    min_periods = min(4, length)
                else:
                    min_periods = 4
                win = shifted.rolling(length, min_periods=min_periods)
                prefix = f'{metric}_window_{length}_shift_{shift_len}'
                low = win.min()
                high = win.max()
                stats[f'{prefix}_mean'] = win.mean()
                stats[f'{prefix}_std'] = win.std()
                stats[f'{prefix}_skew'] = win.skew()
                stats[f'{prefix}_kurtosis'] = win.kurt()
                stats[f'{prefix}_high'] = high
                stats[f'{prefix}_low'] = low
                stats[f'{prefix}_spread'] = high.subtract(low)
                for q in quantiles:
                    stats[f'{prefix}_quantile_{q:.2f}'] = win.quantile(round(q, 2))
    return pd.concat((df, pd.DataFrame(stats)), axis=1)
def add_correlations(df): ## TODO
    """Placeholder for correlation features; hands ``df`` back untouched for now."""
    return df
def add_percent_changes_between_windows(df):
    """Unimplemented placeholder: does nothing and returns ``None``.

    No dedicated function is needed for this: it can be done by calling
    the standard ``add_percent_changes`` on the metrics generated by the
    stats step.
    """
    return None
def add_features(df,
                 standard_metrics=['open','close','high','spread','low','volume'],
                 win_lens = [10,40,70,180, 360],
                 period_offsets = [*range(1,7),*range(10,131,30)],
                 shifts = [*range(0, 61, 15)],
                 quantiles = [*np.arange(0.05,.25, 0.10), *np.arange(0.80,1.0, 0.05)]
                ):
    """Build the full feature frame from a raw price/volume frame.

    Steps, applied in order:
      1. ``add_spread``          - adds ``spread`` (this step writes onto
                                   the frame passed in),
      2. ``add_stats``           - rolling statistics for ``standard_metrics``,
      3. ``add_percent_changes`` - percent changes for ``standard_metrics``,
      4. ``add_weekly_cat``      - day-of-week cos/sin columns,
      5. ``add_yearly_cat``      - day-of-year cos/sin columns.

    The list defaults are only read, never modified. Returns the frame
    produced by the last step.
    """
    with_spread = add_spread(df)
    with_stats = add_stats(
        with_spread,
        lengths=win_lens,
        metrics=standard_metrics,
        shifts=shifts,
        quantiles=quantiles,
    )
    with_changes = add_percent_changes(
        with_stats,
        metrics=standard_metrics,
        periods=period_offsets,
        shifts=shifts,
    )
    features_df = add_yearly_cat(add_weekly_cat(with_changes))
    return features_df
def add_percent_change_1_month_window_3_months_ahead(df=None): ## TODO
    """Placeholder for a dedicated "1-month window, 3 months ahead" target.

    Not implemented yet: the frame passed in is returned unchanged so the
    function can already sit in a ``DataFrame.pipe`` chain. ``df`` is
    optional only so that an existing zero-argument call keeps working;
    such a call returns ``None``.
    """
    # Return the caller's frame rather than whatever the module-level
    # name ``df`` happens to refer to.
    return df
def add_targets(df,
                standard_metrics=['open','close','high','spread','low'],
                win_lens = [*range(7,30,7)],
                shifts = [*range(-180,-14, 14)],
                quantiles = [*np.arange(0.05,.25, 0.10), *np.arange(0.80,1.0, 0.05)],
                percent_metrics=['open','close','high','spread','low'],
                percent_change_shifts = [*range(-180,-14, 14)],
                period_offsets = [*range(1,10),*range(10,101,20)]
                ):
    """Build the target frame: rolling stats followed by percent changes.

    ``add_stats`` is run for ``standard_metrics`` with ``win_lens``,
    ``shifts`` and ``quantiles``; ``add_percent_changes`` is then run on
    that result for ``percent_metrics`` with ``period_offsets`` and
    ``percent_change_shifts``. Because the second step sees the output of
    the first, ``percent_metrics`` may name columns created by
    ``add_stats``. The default shifts are negative, i.e. values are taken
    from later rows. The list defaults are only read, never modified.
    """
    with_stats = add_stats(
        df,
        lengths=win_lens,
        metrics=standard_metrics,
        shifts=shifts,
        quantiles=quantiles,
    )
    targets_df = add_percent_changes(
        with_stats,
        metrics=percent_metrics,
        periods=period_offsets,
        shifts=percent_change_shifts,
    )
    return targets_df

In [36]:
# 3 s ± 15.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) features 

# testing

In [37]:
# standard_metrics=['low']
# win_lens = [20] 
# period_offsets = [90]
# shifts = [-90]
# percent_change_shifts = [0]
# quantiles = [0.2]
# percent_metrics=[f'{standard_metrics[0]}_window_{win_lens[0]}_shift_{shifts[0]}_mean']


# df = pd.read_parquet('temp.parquet')
# # df = add_spread(df)
# features_df = df.pipe(add_features)
# targets_df = df.pipe(add_targets, win_lens=win_lens, standard_metrics=standard_metrics, percent_metrics=percent_metrics, period_offsets=period_offsets, shifts=shifts, percent_change_shifts=percent_change_shifts, quantiles=quantiles)
# df = pd.concat([targets_df, features_df], keys=['targets', 'features'], axis=1)
# target_col = targets_df['low_window_20_shift_-90_mean_percent_change_percent_lag_90_shift_0']
# df_algo = pd.concat([target_col, features_df], axis=1)
# display('df shape', df.shape, 'no null shapes', df.dropna(axis=1).shape, df.dropna(axis=0).shape)
# display('targets info', df.targets.info(), 'features info', df.features.info())
# display('nulls total', df.isna().sum().describe())
# display('total df info', df.info(), df.describe(), df)
# display(df_algo)

# file manipulation

In [54]:
def main():
    """Build a single-target training table from ``temp.parquet``.

    Reads ``temp.parquet``, computes the default feature set and one
    target column (percent change over 90 rows of the 20-row rolling mean
    of ``low`` taken with a -90 row shift), drops every row containing a
    NaN and writes the result to ``xgboost_demo.csv`` without a header
    row, target column first.
    """
    ##constants
    standard_metrics = ['low']
    win_lens = [20]
    period_offsets = [90]
    shifts = [-90]
    percent_change_shifts = [0]
    quantiles = [0.2]
    percent_metrics = [f'{standard_metrics[0]}_window_{win_lens[0]}_shift_{shifts[0]}_mean']
    # Derive the target column name from the constants above (the same way
    # main_target_add does) so it cannot drift when a constant is edited.
    target_name = (
        f'{percent_metrics[0]}_percent_change_percent_lag_'
        f'{period_offsets[0]}_shift_{percent_change_shifts[0]}'
    )
    ##code
    df = pd.read_parquet('temp.parquet')
    features_df = df.pipe(add_features)
    targets_df = df.pipe(
        add_targets,
        win_lens=win_lens,
        standard_metrics=standard_metrics,
        percent_metrics=percent_metrics,
        period_offsets=period_offsets,
        shifts=shifts,
        percent_change_shifts=percent_change_shifts,
        quantiles=quantiles,
    )
    target_col = targets_df[target_name]
    df_algo = pd.concat([target_col, features_df], axis=1).dropna(axis=0)
    df_algo.to_csv('xgboost_demo.csv', header=False)

def transform_data_to_s3():
    """Add the target column to every raw stock file in S3 and upload it as CSV.

    Lists the entries under ``stock-data-raw`` in the SageMaker session's
    default bucket, reads each one as a parquet dataset, runs
    ``main_target_add`` on it, writes the result to a local ``temp.csv``
    (overwritten on every iteration) and uploads that file to the
    ``stock_data_raw_with_3_mo_target`` folder of the same bucket. The
    uploaded name is the source file name with ``.parquet`` replaced by
    ``_with_3_mo_target.csv``. The running file count is displayed every
    100 files.
    """
    bucket = sm.Session().default_bucket()
    source_prefix = 'stock-data-raw'
    s3_client = boto3.client('s3')
    s3_files = s3fs.S3FileSystem()
    source_keys = s3_files.ls(f'{bucket}/{source_prefix}')
    target_prefix = 'stock_data_raw_with_3_mo_target'
    for count, source_key in enumerate(source_keys):
        dataset = pq.ParquetDataset(f's3://{source_key}', filesystem=s3_files)
        raw_df = dataset.read_pandas().to_pandas()
        main_target_add(raw_df).to_csv('temp.csv')
        # keep only the file name; the listing returns bucket-qualified paths
        base_name = source_key.split('/')[-1]
        target_name = base_name.replace('.parquet', '_with_3_mo_target.csv')
        target_key = f'{target_prefix}/{target_name}'
        s3_client.upload_file('temp.csv', bucket, target_key)
        if count % 100 == 0:
            display(count)
def main_target_add(df):
    """Return ``df`` with one target column prepended.

    ``add_targets`` is asked for the 20-row rolling statistics of ``low``
    (no shift) and for the 90-row percent change of the resulting rolling
    mean, shifted by -90 rows. Only that percent-change column is kept;
    it is placed in front of the original columns of ``df``.
    """
    ##constants
    metric, window, lag = 'low', 20, 90
    stat_shift, target_shift = 0, -90
    mean_col = f'{metric}_window_{window}_shift_{stat_shift}_mean'
    target_name = f'{mean_col}_percent_change_percent_lag_{lag}_shift_{target_shift}'
    ##code
    with_targets = add_targets(
        df,
        win_lens=[window],
        standard_metrics=[metric],
        percent_metrics=[mean_col],
        period_offsets=[lag],
        shifts=[stat_shift],
        percent_change_shifts=[target_shift],
        quantiles=[0.2],
    )
    return pd.concat([with_targets[target_name], df], axis=1)
    

In [55]:
# main()  # local single-file export to xgboost_demo.csv; left disabled
# Run the S3 batch job. It displays the running file count every 100 files,
# which is the sequence of numbers shown in this cell's output.
transform_data_to_s3()

0

100

200

300

400

500

600

700

800

900

1000

1100

1200

1300

1400

1500

1600

1700

1800

1900

2000

2100

2200

2300

2400

2500

2600

2700

2800

# testing speeds

In [53]:
# df = df.pipe(add_)
# display(len(df.columns))
# standard_metrics=['open']#,'close','high','low','volume']
# win_lens = [*range(10,180,10),*range(182,365,91)]
# period_offsets = [*range(1,10),*range(10,101,10)]
# display('40+ times this many metrics', len(standard_metrics)*len(win_lens)**2*len(period_offsets)) 
# %timeit df.pipe(add_stats, lengths=win_lens, metrics=standard_metrics, shifts=win_lens)
# for these metrics: 5 times: 20.8 s ± 213 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# <class 'pandas.core.frame.DataFrame'>
# Index: 4578 entries, 2002-12-31 to 2021-04-08
# Columns: 16650 entries, open to open_length_364_shift_364_spread
# dtypes: float64(16649), int64(1)
# memory usage: 581.6+ MB
## would be a total of 2.5+ GB per stock or ~12.5+ terabytes for all stocks in NYSE
# df = df.pipe(add_stats, lengths=win_lens, metrics=standard_metrics, shifts=win_lens)
# display(df.info(memory_usage='deep'))

# standard_metrics=['open','close','high','low','volume']
# win_lens = [*range(7,30,7)]
# period_offsets = [*range(1,10),*range(10,101,10)]
# shifts = [*range(-180,-14, 14)]
# display('40+ times this many metrics', len(standard_metrics)*len(win_lens)*len(period_offsets)*len(shifts)) 

## for these metrics: 10.7 s ± 117 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# %timeit  add_targets(df, standard_metrics=standard_metrics)
# df = df.pipe(add_targets, standard_metrics=['open'])
# display(df.info(memory_usage='deep'))
# <class 'pandas.core.frame.DataFrame'>
# Index: 4578 entries, 2002-12-31 to 2021-04-08
# Columns: 16650 entries, open to open_length_364_shift_364_spread
# dtypes: float64(16649), int64(1)
# memory usage: 581.8 MB

# standard_metrics=['open','close','high','spread','low','volume']
# win_lens = [*range(10,180,30),*range(182,365,91)]
# period_offsets = [*range(1,10),*range(10,101,10)]
# shifts = [*range(0,180, 14)]
# display('40+ times this many metrics', len(standard_metrics)*len(win_lens)*len(shifts)*len(period_offsets)) 
# df = df.pipe(add_features)
# display(df.info(memory_usage='deep'))
# %timeit df.pipe(add_features)
# <class 'pandas.core.frame.DataFrame'>
# Index: 4578 entries, 2002-12-31 to 2021-04-08
# Columns: 9142 entries, open to year_sin
# dtypes: float64(9141), int64(1)
# memory usage: 319.6 MB
## 14.3 s ± 123 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

# standard_metrics=['open','close','high','spread','low','volume'], 
#                  win_lens = [10,40,70,180,360],
#                  period_offsets = [*range(1,7),*range(10,91,20)],
#                  shifts = [*range(0, 180, 14)],
#                  quantiles = [*np.arange(0.05,.25, 0.10), *np.arange(0.80,1.0, 0.05)]
# df = df.pipe(add_features)
# display(df.info())
# %timeit df.pipe(add_features)
# <class 'pandas.core.frame.DataFrame'>
# Index: 4578 entries, 2002-12-31 to 2021-04-08
# Columns: 5086 entries, open to year_sin
# dtypes: float64(5085), int64(1)
# memory usage: 177.7+ MB
# 7.74 s ± 96.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# df = df.pipe(add_targets)
# display(df.info())
# %timeit df.pipe(add_targets)
# <class 'pandas.core.frame.DataFrame'>
# Index: 4578 entries, 2002-12-31 to 2021-04-08
# Columns: 3131 entries, open to low_percent_change_90_lag
# dtypes: float64(3130), int64(1)
# memory usage: 109.4+ MB
# 4.25 s ± 14.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
## with individual df['']=... in add_stats:
# 506 ms ± 37.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# %timeit df.pipe(add_stats, lengths=[10,40,70,180,360],\
#                 metrics=['open','close','high','low','volume'],\
#                 shifts=[*range(0, 60, 14)],\
#                 quantiles=[*np.arange(0.05,.25, 0.10), *np.arange(0.80,1.0, 0.05)])
## with creating a list of dicts and concatenating them:
## notes for speed tests
#                 dicts.append({
#                     f'{metric}_length_{length}_shift_{shift_len}_mean' : win.mean(),
#                     f'{metric}_length_{length}_shift_{shift_len}_std' : win.std(),
#                     f'{metric}_length_{length}_shift_{shift_len}_skew' : win.skew(),
#                     f'{metric}_length_{length}_shift_{shift_len}_kurtosis' : win.kurt(),
#                     f'{metric}_length_{length}_shift_{shift_len}_high' : win.max(),
#                     f'{metric}_length_{length}_shift_{shift_len}_low' : win.min()
#                 })
#     df = pd.concat([df, *[pd.DataFrame(d for d in dicts)]])
# 806 ms ± 18.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# %timeit df.pipe(add_features)
# df = df.pipe(add_features)
# display(df.info(memory_usage='deep'))
# display(df)
# latest without shift changes in the percent changes:
# 3.18 s ± 105 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# <class 'pandas.core.frame.DataFrame'>
# Index: 4578 entries, 2002-12-31 to 2021-04-08
# Columns: 1966 entries, open to year_sin
# dtypes: float64(1965), int64(1)
# memory usage: 69.0 MB
## with shift changes in percent changes. I believe the concatenation accounts for a large part of the run time.
# 3.14 s ± 112 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
# <class 'pandas.core.frame.DataFrame'>
# Index: 4578 entries, 2002-12-31 to 2021-04-08
# Columns: 1990 entries, open to year_sin
# dtypes: float64(1989), int64(1)
# memory usage: 69.8 MB
## times 5000 stocks: 350 gigabytes; 4 hours. 
## needs to speed up in some way to add in the next 14-100 features with ~1000x feature engineering 
## needs memory minimization for the 100x+ additional feature engineering; what are ways to break up into smaller memory? can the preprocessing be done with batches for the training algorithm; time-memory trade-off. 

Next step: try formatting the data, then reading it and training on small subsets of data with a small feature set, and test efficacy.

# Extras for keras manipulations

In [46]:

# def to_ts_df(daily_stocks_data, lookback, metric):
#     ## column names
#     columns = list()
#     for i in range(lookback):
#         columns.append(f'{metric}_{i}')
#     columns.append(f'{metric}_target')
#     df = pd.DataFrame(columns=columns)
#     ## columns
#     data = daily_stocks_data[metric].to_numpy()
#     for index, col in enumerate(df.columns):
#         df[col] = data[index:len(data)-lookback+index]
#     ## dates index
#     dates = daily_stocks_data.date.to_numpy()[:-lookback]
#     df.insert(0, 'dates', dates)
#     df.dropna(axis='index', inplace=True)
#     return df
# def to_ts(df, metric, lookback):
#     data, targets = list(), list()
#     for i in range(lookback,len(df.index)):
#         data.append(df.iloc[i-lookback:i,:].values) ## first four metrics
#         targets.append(df[metric].to_list()[i])
#     data = np.array(data)
#     targets = np.array(targets)
#     return data, targets
# def min_max_scale(col):
#     scaled = col.subtract(col.min()).divide(col.max()-col.min())
#     return scaled
# def multi_stock_ts_split(df,tickers): ## could be sped up 
#     data_tr, data_te, targets_tr, targets_te = [],[],[],[]
#     for ticker in tickers:
#         data, targets = to_ts(df[ticker].dropna(), 'low', lookback) ## drops nan for each stock
#         x = train_test_split(data, targets, shuffle=False)
#         data_tr.append(x[0])
#         data_te.append(x[1])
#         targets_tr.append(x[2]) 
#         targets_te.append(x[3])    
#     return np.concatenate(data_tr), np.concatenate(data_te), np.concatenate(targets_tr), np.concatenate(targets_te)
# df = pd.read_pickle(f"./{tickers}_daily.pkl")
# df = df.apply(min_max_scale)
# features = len(df.columns)
# display(df)