In [1]:
import pandas as pd
import numpy as np
import warnings
from tqdm import tqdm
warnings.filterwarnings("ignore")

In [2]:
def process_dataset(df, symbol, datasets=None):
    df = df[df.symbol==symbol]
    if datasets is None:
        datasets = {}
    for dur in durations:
        new_dataset = df.resample(dur).agg({'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'})
        datasets[dur] = new_dataset.copy()
    return datasets

### minute level data

In [3]:
bucket_loc = 's3://sisyphus-general-bucket/AthenaInsights'
data_folder = 'latest_data'

In [4]:
# stock_data_day_level_name: stock_bars_day.parquet
# stock_bars_hour_level_name: stock_bars_hour.parquet
# stock_bars_minute_level_name: stock_bars_minute.parquet

In [8]:
import pandas as pd

def fill_missing_minutes(df, freq='1T'):
    """
    Fill missing minutes in financial time series data for each symbol, ensuring no data
    is generated outside of a day's trading hours.

    Parameters:
        df (pd.DataFrame): Dataframe with columns 'symbol', 'us_eastern_timestamp', 'open', 'high', 'low', 'close', 'volume', 'trade_count', 'vwap'.
        freq (str): Frequency for resampling, default is '1T' (one minute).

    Returns:
        pd.DataFrame: Dataframe with missing minutes filled within valid trading hours.
    """
    # Convert timestamp to datetime if not already
    df['us_eastern_timestamp'] = pd.to_datetime(df['us_eastern_timestamp'])

    # Set datetime as the index temporarily
    df.set_index('us_eastern_timestamp', inplace=True)

    # Function to resample each group while respecting daily bounds
    def resample_group(group):
        # Group by each day to respect daily boundaries
        daily_groups = []
        for name, day_group in group.groupby(group.index.date):
            min_time = day_group.index.min()
            max_time = day_group.index.max()

            # Resample within the day's min and max times
            resampled = day_group.resample(freq).ffill()
            resampled = resampled[(resampled.index >= min_time) & (resampled.index <= max_time)]

            # Fill missing data within the day
            resampled['open'].fillna(resampled['close'], inplace=True)
            resampled['high'].fillna(resampled['close'], inplace=True)
            resampled['low'].fillna(resampled['close'], inplace=True)
            resampled['volume'].fillna(0, inplace=True)
            resampled['trade_count'].fillna(0, inplace=True)
            resampled['vwap'].fillna(resampled['close'], inplace=True)

            daily_groups.append(resampled)

        # Combine all daily resampled groups
        return pd.concat(daily_groups)

    # Apply the resampling function to each symbol group
    filled_df = df.groupby('symbol').apply(resample_group)

    # Clean up the index
    filled_df.reset_index(level=0, drop=True, inplace=True)

    return filled_df.reset_index()

In [9]:
df = pd.read_parquet(f'{bucket_loc}/{data_folder}/parquet/stock_bars_minute.parquet')
df['timestamp'] = pd.to_datetime(df['timestamp'])
df['us_eastern_timestamp'] = df['timestamp'].dt.tz_convert('US/Eastern')
df['us_eastern_timestamp'] = df['us_eastern_timestamp'].dt.tz_localize(None)
df = df.drop(columns='timestamp')
df = df[df.symbol.isin(['SPY', 'QQQ'])]

assert df.open.isna().sum() == 0
assert df.high.isna().sum() == 0
assert df.low.isna().sum() == 0
assert df.close.isna().sum() == 0

# df['us_eastern_date'] = df.us_eastern_timestamp.dt.date
# df['market_open'] = df.us_eastern_timestamp.between('09:30:00', '16:00:00')
df = fill_missing_minutes(df)
df['market_open'] = (df.us_eastern_timestamp.dt.time>=pd.to_datetime('09:30:00').time()) & (df.us_eastern_timestamp.dt.time < pd.to_datetime('16:00:00').time())
df.set_index('us_eastern_timestamp', inplace=True)

In [10]:
df.head()

Unnamed: 0_level_0,symbol,open,high,low,close,volume,trade_count,vwap,market_open
us_eastern_timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2024-01-02 04:00:00,QQQ,409.84,409.84,409.76,409.79,1541.0,26.0,409.799268,False
2024-01-02 04:01:00,QQQ,409.81,409.81,409.75,409.75,604.0,11.0,409.776912,False
2024-01-02 04:02:00,QQQ,409.71,409.71,409.68,409.68,1510.0,9.0,409.68931,False
2024-01-02 04:03:00,QQQ,409.71,409.71,409.66,409.66,2679.0,18.0,409.697663,False
2024-01-02 04:04:00,QQQ,409.63,409.63,409.61,409.61,343.0,6.0,409.623333,False


In [11]:
datasets = {}
symbol = df.symbol.unique() # 'SPY'
durations = ['2min', '3min', '5min', '10min', '15min', '20min', '25min', '30min']
for sym in symbol:
    datasets[sym] = {}
    datasets[sym]['1min'] = df[df.symbol==sym].copy()
    datasets[sym] = process_dataset(df, sym, datasets[sym])

In [12]:
datasets.keys(), datasets['SPY'].keys()

(dict_keys(['QQQ', 'SPY']),
 dict_keys(['1min', '2min', '3min', '5min', '10min', '15min', '20min', '25min', '30min']))

### hour level data

In [13]:
# stock_data_day_level_name: stock_bars_day.parquet
# stock_bars_hour_level_name: stock_bars_hour.parquet
# stock_bars_minute_level_name: stock_bars_minute.parquet

In [20]:
df2 = pd.read_parquet(f'{bucket_loc}/{data_folder}/parquet/stock_bars_hour.parquet')
df2['timestamp'] = pd.to_datetime(df2['timestamp'])
df2['us_eastern_timestamp'] = df2['timestamp'].dt.tz_convert('US/Eastern')
df2['us_eastern_timestamp'] = df2['us_eastern_timestamp'].dt.tz_localize(None)
df2 = df2.drop(columns='timestamp')
df2 = df2[df2.symbol.isin(['SPY', 'QQQ'])]

# df2['us_eastern_date'] = df2.us_eastern_timestamp.dt.date
# df2['market_open'] = df2.us_eastern_timestamp.between('09:30:00', '16:00:00')
df2['market_open'] = (df2.us_eastern_timestamp.dt.time>=pd.to_datetime('09:30:00').time()) & (df2.us_eastern_timestamp.dt.time < pd.to_datetime('16:00:00').time())
df2.set_index('us_eastern_timestamp', inplace=True)

In [21]:
df2.head()

Unnamed: 0_level_0,symbol,open,high,low,close,volume,trade_count,vwap,market_open
us_eastern_timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2024-01-02 04:00:00,QQQ,409.84,409.84,408.94,409.03,31737.0,355.0,409.28823,False
2024-01-02 05:00:00,QQQ,409.05,409.1,408.45,408.49,28039.0,234.0,408.816953,False
2024-01-02 06:00:00,QQQ,408.42,408.59,405.5,406.17,280578.0,1327.0,406.407732,False
2024-01-02 07:00:00,QQQ,406.19,406.25,404.83,404.99,308453.0,1862.0,405.626898,False
2024-01-02 08:00:00,QQQ,408.41,410.0,404.81,405.61,469161.0,4130.0,405.534781,False


In [22]:
symbol = df2.symbol.unique() # 'SPY'
durations = ['120min', '180min', '240min']
for sym in symbol:
    datasets[sym]['60min'] = df2[df2.symbol==sym].copy()
    datasets[sym] = process_dataset(df2, sym, datasets[sym])

In [23]:
datasets.keys(), datasets['SPY'].keys()

(dict_keys(['QQQ', 'SPY']),
 dict_keys(['1min', '2min', '3min', '5min', '10min', '15min', '20min', '25min', '30min', '60min', '120min', '180min', '240min']))

### day level data

In [24]:
# stock_data_day_level_name: stock_bars_day.parquet
# stock_bars_hour_level_name: stock_bars_hour.parquet
# stock_bars_minute_level_name: stock_bars_minute.parquet

In [25]:
df3 = pd.read_parquet(f'{bucket_loc}/{data_folder}/parquet/stock_bars_day.parquet')
df3['timestamp'] = pd.to_datetime(df3['timestamp'])
df3['us_eastern_timestamp'] = df3['timestamp'].dt.tz_convert('US/Eastern')
df3['us_eastern_timestamp'] = df3['us_eastern_timestamp'].dt.tz_localize(None)
df3 = df3.drop(columns='timestamp')
df3 = df3[df3.symbol.isin(['SPY', 'QQQ'])]

# df3['us_eastern_date'] = df3.us_eastern_timestamp.dt.date
# df3['market_open'] = df3.us_eastern_timestamp.between('09:30:00', '16:00:00')
df3['market_open'] = (df3.us_eastern_timestamp.dt.time>=pd.to_datetime('09:30:00').time()) & (df3.us_eastern_timestamp.dt.time < pd.to_datetime('16:00:00').time())
df3.set_index('us_eastern_timestamp', inplace=True)

In [26]:
df3.head()

Unnamed: 0_level_0,symbol,open,high,low,close,volume,trade_count,vwap,market_open
us_eastern_timestamp,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1
2024-01-02,QQQ,405.84,406.09,400.24,402.59,58073141.0,498841.0,402.642947,False
2024-01-03,QQQ,399.93,401.0,397.89,398.33,47037517.0,421720.0,399.376503,False
2024-01-04,QQQ,396.44,399.59,396.06,396.28,39471644.0,341643.0,397.541973,False
2024-01-05,QQQ,396.45,399.56,395.34,396.75,44923110.0,405464.0,397.442473,False
2024-01-08,QQQ,397.99,405.24,397.8399,404.95,42543338.0,362296.0,402.636103,False


In [27]:
symbol = df3.symbol.unique() # 'SPY'
durations = ['2D', '3D', '5D', '10D', '15D', '20D', '30D', '50D', '100D', '150D', '200D']
for sym in symbol:
    datasets[sym]['1D'] = df3[df3.symbol==sym].copy()
    datasets[sym] = process_dataset(df3, sym, datasets[sym])

### writing out to s3

In [28]:
all_durations = []
for sym in datasets.keys():
    for dur in datasets[sym].keys():
        all_durations.append(dur)
all_durations = set(all_durations)
print(all_durations)

{'60min', '10min', '3D', '240min', '2min', '25min', '15min', '150D', '2D', '1min', '1D', '20min', '20D', '30D', '3min', '30min', '10D', '120min', '5min', '100D', '50D', '200D', '5D', '15D', '180min'}


In [29]:
for dur in all_durations:
    dur_df = pd.DataFrame()
    for sym in datasets.keys():
        if dur in datasets[sym].keys():
            dur_df = pd.concat([dur_df, datasets[sym][dur].assign(symbol=sym)])
    dur_df.to_parquet(f'{bucket_loc}/{data_folder}/data_prep/stock_bars_{dur}.parquet')

### testing

In [21]:
date_f = '2024-11-04'

In [22]:
# x = df[df.symbol=='SPY'].reset_index()
x[(x.timestamp>=pd.to_datetime(f'2024-11-04 00:00:00+00:00'))&(x.timestamp<pd.to_datetime(f'2024-11-04 23:00:000+00:00'))].trade_count.sum()
x[(x.us_eastern_timestamp>=pd.to_datetime(f'2024-11-04 00:00:00'))&(x.us_eastern_timestamp<pd.to_datetime(f'2024-11-04 23:00:00'))]#.trade_count.sum()

NameError: name 'x' is not defined

In [None]:
y = df2[df2.symbol=='SPY']
y[y.us_eastern_date==pd.to_datetime(date_f)]

In [None]:
z = df3[df3.symbol=='SPY']
z[z.us_eastern_date==pd.to_datetime(date_f)]

In [None]:
# 