In [None]:
import os  # imported here because this cell runs before the notebook's import cell

# --- Credentials ---------------------------------------------------------
# Prefer environment variables so real keys never have to be saved inside the
# notebook; the placeholder strings remain the fallback when they are unset.
crypto_key = os.environ.get("POLYGON_CRYPTO_API_KEY", "YOUR_CRYPTO_API_KEY")
stocks_key = os.environ.get("POLYGON_STOCKS_API_KEY", "YOUR_STOCKS_API_KEY")

# --- Bar width -----------------------------------------------------------
# Interpolated into the request URL as .../range/{agg_bar_size}/{agg_bar_unit}/...
agg_bar_size = "1"
agg_bar_unit = "minute"

# --- What to download ----------------------------------------------------
# True -> requests are signed with crypto_key; False -> stocks_key.
is_crypto = False
# Symbols to download; each one is upper-cased before being requested.
tickers = ["SPY"]
# Inclusive date range; one request is issued per calendar day in it.
start_date = "2015-01-01"
end_date = "2023-03-18"

# Install dependencies

In [None]:
%%capture
!pip install aiohttp aiodns
!pip install pandas_market_calendars

In [None]:
%%capture
!pip install nest_asyncio

# Import Libraries

In [None]:
import pandas as pd
import numpy as np
import pandas_market_calendars as mcal
from datetime import datetime, date, time, timedelta
import requests
import os
import glob
import nest_asyncio
nest_asyncio.apply()
import pandas as pd
import asyncio
import aiohttp  # 
import datetime
import warnings
from dateutil.relativedelta import *
warnings.filterwarnings("ignore")
import pytz

# Download data from Polygon

In [None]:
def daterange(date1, date2):
    """Yield every day from ``date1`` through ``date2``, both ends included.

    Nothing is yielded when ``date2`` lies before ``date1``.
    """
    span_days = int((date2 - date1).days)
    offset = 0
    while offset <= span_days:
        yield date1 + timedelta(offset)
        offset += 1

# Decoded API payloads keyed by request date: filled by get(), read by
# convert_responses_to_ohlcv().
data_dict = dict()

async def get(
    session: aiohttp.ClientSession,
    date: str,
    **kwargs
) -> dict:
    """Download one day of aggregate bars and store the decoded payload.

    The request URL is built from the module-level settings ``ticker``,
    ``agg_bar_size`` and ``agg_bar_unit``; the API key is ``crypto_key`` when
    ``is_crypto`` is true, otherwise ``stocks_key``.

    Args:
        session: Open aiohttp session used to send the request.
        date: Day to request. It is used as both the start and the end of the
            requested range and as the key under which the payload is stored
            in ``data_dict``.
        **kwargs: Passed through unchanged to ``session.request``.

    Returns:
        The decoded JSON payload, which is also stored in ``data_dict[date]``.
        The payload is stored as-is, whatever it contains.
    """
    key = crypto_key if is_crypto else stocks_key

    api = f"https://api.polygon.io/v2/aggs/ticker/{ticker}/range/{agg_bar_size}/{agg_bar_unit}/{date}/{date}?adjusted=true&sort=asc&limit=1440&apiKey={key}"

    # The context manager releases the response even when decoding the body
    # raises, instead of leaving it to the garbage collector.
    async with session.request('GET', url=api, **kwargs) as resp:
        data = await resp.json()

    # data_dict is only mutated, never rebound, so no `global` statement is needed.
    data_dict[date] = data
    return data
    
async def main(dates, **kwargs):
    """Request every date in ``dates`` concurrently over one shared session.

    One ``get`` coroutine is created per date, with ``kwargs`` forwarded to
    each. Exceptions raised by individual requests are not propagated; they
    appear in the returned list in place of that request's result.

    Returns:
        One entry per date, in the same order as ``dates``.
    """
    async with aiohttp.ClientSession() as session:
        pending = [get(session=session, date=day, **kwargs) for day in dates]
        return await asyncio.gather(*pending, return_exceptions=True)

# Make Cache

In [None]:
def convert_responses_to_ohlcv():
    """Flatten the payloads cached in ``data_dict`` into one list of bar records.

    Payloads without a ``'results'`` entry are skipped. The number of cached
    payloads is printed first.

    Returns:
        A new list holding the ``'results'`` records of every payload, in the
        iteration order of ``data_dict``.
    """
    print(f"Response Count: {len(data_dict)}")
    bars = []

    for payload in data_dict.values():
        if 'results' in payload:
            # extend() appends in place; rebuilding the list with `+` on every
            # iteration re-copies everything collected so far.
            bars.extend(payload['results'])

    return bars

def make_df(new_dict):
    """Build a time-sorted OHLCV frame from raw bar records.

    The ``t`` field of each record is read as epoch milliseconds in UTC,
    converted to US/Eastern and stored without timezone information in a
    ``timestamp`` column.

    Args:
        new_dict: List of bar records (dicts) carrying at least the keys
            ``t``, ``o``, ``h``, ``l``, ``c`` and ``v``.

    Returns:
        DataFrame with columns ``timestamp, o, h, l, c, v`` sorted by
        ``timestamp`` with a fresh 0..n-1 index.

    Raises:
        ValueError: If there are no records, or they carry no ``t`` field.
    """
    df = pd.DataFrame(new_dict)
    if df.empty or 't' not in df.columns:
        # Fail with an explicit message rather than an opaque KeyError: 't'.
        raise ValueError("No bar records to convert: expected dicts with a 't' (epoch milliseconds) field")

    eastern_naive = (
        pd.to_datetime(df['t'], unit='ms')
        .dt.tz_localize('UTC')
        .dt.tz_convert('US/Eastern')
        .dt.tz_localize(None)
    )

    # Dropping the timezone makes the repeated hour at the autumn DST change
    # produce equal timestamps; the epoch value `t` breaks those ties so the
    # row order never depends on the order in which records arrived.
    df = df.assign(timestamp=eastern_naive).sort_values(by=['timestamp', 't'], ignore_index=True)

    return df[['timestamp', 'o', 'h', 'l', 'c', 'v']]

# Run stocks data tests

In [None]:
def get_original_time_difference(dt):
    """Return the scheduled length, in minutes, of the NYSE session on ``dt``.

    The schedule is looked up for the single day ``str(dt)`` and the first
    row's open is subtracted from its close.
    """
    day = str(dt)
    schedule = mcal.get_calendar("NYSE").schedule(start_date=day, end_date=day)
    session_close = schedule['market_close'].iloc[0]
    session_open = schedule['market_open'].iloc[0]
    session_length = session_close - session_open
    return session_length.total_seconds() / 60

def check_holidays(row, circuit_breakers):
    """Set ``row['missing']`` to 1 when the day's bar count is unexplained, else 0.

    A day is not treated as missing when its date is listed in
    ``circuit_breakers``, or when it has at least as many bars as the
    scheduled session has minutes (a shortened session).

    Args:
        row: Mapping with a ``date`` entry (supports ``strftime``) and a
            ``timestamp`` entry that is compared against the scheduled
            session length in minutes.
        circuit_breakers: Collection of ``YYYY-MM-DD`` strings to exempt.

    Returns:
        The same ``row`` object, with ``missing`` set.
    """
    if row['date'].strftime("%Y-%m-%d") in circuit_breakers:
        row['missing'] = 0
    # `<=`, not `<`: a session with exactly one bar per scheduled minute is complete.
    elif get_original_time_difference(row['date']) <= row['timestamp']:
        row['missing'] = 0
    else:
        row['missing'] = 1

    return row

def is_stock_data_missing(df):
    """Report days whose 09:30-16:00 bar count is below 390 without explanation.

    Bars are counted per calendar day within 09:30-16:00 (both inclusive).
    Days with fewer than 390 bars are passed to ``check_holidays``, which
    marks each as ``missing`` 0 or 1. A summary is printed.

    Note:
        ``df['timestamp']`` is converted to datetime in place on the caller's
        frame.

    Args:
        df: Frame with a ``timestamp`` column.

    Returns:
        One row per day with fewer than 390 bars, holding the per-column
        counts, a ``date`` column and the ``missing`` flag. Empty when every
        day has at least 390 bars.
    """
    circuit_breakers = ['2020-03-09', '2020-03-12', '2020-03-16', '2020-03-18']
    df['timestamp'] = pd.to_datetime(df['timestamp'])

    bar_time = df['timestamp'].dt.time
    market_hours_df = df.loc[(bar_time >= time(9, 30)) & (bar_time <= time(16, 0))]
    data_count_df = market_hours_df.groupby(by=market_hours_df['timestamp'].dt.date).count()

    # Copy so the columns added below are not written into a slice of data_count_df.
    missing_data_df = data_count_df.loc[data_count_df['timestamp'] < 390].copy()
    missing_data_df['date'] = missing_data_df.index

    if missing_data_df.empty:
        # Row-wise apply on an empty frame returns it without a 'missing'
        # column, which made the lookup below fail; add the column explicitly.
        missing_data_df['missing'] = 0
    else:
        missing_data_df = missing_data_df.apply(check_holidays, args=(circuit_breakers,), axis=1)

    flagged = missing_data_df[missing_data_df['missing'] == 1]
    if flagged.shape[0] > 0:
        print("Some Market Hours data is missing!")
        print(flagged)
    else:
        print("No market hours data missing! Run write data to bucket section")

    return missing_data_df

# Run crypto data tests

In [None]:
def is_crypto_data_missing(df):
    """Tell whether any calendar day in ``df`` has fewer than 1440 bars.

    Bars are counted per date of the ``timestamp`` column; the short days are
    printed when there are any.

    Returns:
        True if at least one day has fewer than 1440 bars, otherwise False.
    """
    bars_per_day = df.groupby(df['timestamp'].dt.date).count()
    short_days = bars_per_day[bars_per_day['timestamp'] < 1440]

    if short_days.shape[0] == 0:
        print("No data missing!")
        return False

    print("Some data is missing!")
    print(short_days)
    return True

# Write data to bucket

In [None]:
def remove_old_cache_if_already_exists(ticker, cache_dir="/content"):
    """Delete previously written cache CSVs for ``ticker``.

    Only files named ``BTCacheResampled_<ticker>_*.csv`` are removed. The
    underscore after the ticker matters: without it, cleaning "SPY" would also
    delete the caches of any symbol that merely starts with "SPY".

    Args:
        ticker: Symbol; when it contains ``':'`` the part after the first
            colon is used in the file name.
        cache_dir: Directory to clean. Defaults to the location the cache
            files are written to.
    """
    ticker_to_write = ticker.split(':')[1] if ':' in ticker else ticker

    # Escape the variable parts so glob metacharacters in them are matched literally.
    pattern = os.path.join(
        glob.escape(cache_dir),
        f"BTCacheResampled_{glob.escape(ticker_to_write)}_*.csv",
    )

    for stale_path in glob.glob(pattern):
        os.remove(stale_path)
        print(f"Removed {stale_path}")

def write_to_bucket(df, ticker):
    """Replace the cached CSV for ``ticker`` with the contents of ``df``.

    Existing cache files for the ticker are removed first. The new file is
    named after the ticker (the part after ``':'`` when present) and the dates
    of the first and last rows of ``df['timestamp']``, and is written without
    the index.

    Args:
        df: Non-empty frame with a ``timestamp`` column.
        ticker: Symbol the data belongs to.
    """
    remove_old_cache_if_already_exists(ticker)

    ticker_to_write = ticker.split(':')[1] if ':' in ticker else ticker
    first_day = df['timestamp'].iloc[0].strftime('%Y-%m-%d')
    last_day = df['timestamp'].iloc[-1].strftime('%Y-%m-%d')

    # Build the path once so the log line reports the file that was really written.
    out_path = f"/content/BTCacheResampled_{ticker_to_write}_{first_day}_{last_day}.csv"

    df.to_csv(out_path, index=False)

    print(f"File saved at: {out_path}")

# Main

In [None]:
if __name__ == '__main__':
  # The requested days are the same for every ticker, so build the list once.
  dates = [
      day.date().strftime("%Y-%m-%d")
      for day in daterange(pd.to_datetime(start_date), pd.to_datetime(end_date))
  ]

  for ticker in tickers:
    # get() reads the module-level name `ticker`, so the normalised symbol has
    # to be stored back into the loop variable itself.
    ticker = ticker.upper()

    # Start every download from an empty store: payloads left behind by an
    # earlier run that failed part-way would otherwise be merged into this
    # ticker's data.
    data_dict = {}

    # main() returns exceptions in place of results; surface them instead of
    # dropping them silently. Only the failures are kept.
    failed = [r for r in asyncio.run(main(dates)) if isinstance(r, BaseException)]
    if failed:
      print(f"Warning: {len(failed)} of {len(dates)} requests failed; first error: {failed[0]!r}")
    print(f"Data Downloaded")

    n_dict = convert_responses_to_ohlcv()
    df = make_df(n_dict)
    print(f"OHLCV Prepared for {ticker}")

    write_to_bucket(df, ticker)

    # Drop the raw payloads once the CSV has been written.
    data_dict = {}