In [7]:
import pandas as pd
from github import Github

import concurrent.futures
import time
import datetime as dt
import io
import os
from threading import Thread
import logging

from alpaca.trading.client import TradingClient

from alpaca.data import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest
from alpaca.data.timeframe import TimeFrame
from alpaca.data.enums import Adjustment, DataFeed

from Alpaca_config import *

trading_client = TradingClient(API_KEY_PAPER, API_SECRET_PAPER) # dir(trading_client)
stock_client = StockHistoricalDataClient(API_KEY_PAPER, API_SECRET_PAPER)

In [2]:
repository = Github(github_strat_token).get_user().get_repo(dedicated_repo)
ETF_mapping = repository.get_contents(gh_csv_ETFDB)
ETF_mapping_df = pd.read_csv(io.StringIO(ETF_mapping.decoded_content.decode()),sep=",")
ALL_ETFS = ETF_mapping_df.symbol.to_list()

In [4]:
len(ALL_ETFS)

2697

In [5]:
clock = trading_client.get_clock()
today = clock.timestamp
previous_day = today - pd.Timedelta('1D')
previous_day_40 = today - pd.Timedelta('40D')
previous_day_40

datetime.datetime(2023, 8, 14, 19, 41, 30, 911104, tzinfo=datetime.timezone(datetime.timedelta(days=-1, seconds=72000)))

In [24]:
import time

# Timer start for original option
start_time = time.time()

bars_request_params = StockBarsRequest(symbol_or_symbols=ALL_ETFS, 
                                       start=previous_day_40, end=previous_day, 
                                       timeframe=TimeFrame.Day, adjustment=Adjustment.ALL,feed=DataFeed.SIP)
daily_df = stock_client.get_stock_bars(bars_request_params).df

# Timer end for original option
end_time = time.time()
print(f"Original option took {end_time - start_time:.2f} seconds.")

Original option took 20.60 seconds.


In [9]:
daily_df = daily_df.reset_index()
daily_df.timestamp = daily_df.timestamp.dt.date
daily_df

Unnamed: 0,symbol,timestamp,open,high,low,close,volume,trade_count,vwap
0,AAXJ,2023-08-15,65.540,65.540,65.0500,65.1400,325578.0,2385.0,65.186224
1,AAXJ,2023-08-16,64.620,64.930,64.4550,64.5000,808612.0,4847.0,64.588107
2,AAXJ,2023-08-17,65.190,65.326,64.6100,64.6800,1401366.0,6944.0,64.955623
3,AAXJ,2023-08-18,63.910,64.200,63.7100,64.0700,1430987.0,6294.0,64.002295
4,AAXJ,2023-08-21,63.800,64.200,63.7600,64.1600,852505.0,4688.0,63.922077
...,...,...,...,...,...,...,...,...,...
73311,ZHDG,2023-09-18,17.650,17.650,17.5930,17.5950,663.0,5.0,17.593878
73312,ZHDG,2023-09-19,17.470,17.530,17.4100,17.5300,6591.0,9.0,17.450319
73313,ZHDG,2023-09-20,17.551,17.551,17.4309,17.4309,5475.0,4.0,17.498507
73314,ZHDG,2023-09-21,17.180,17.340,17.1400,17.1464,11714.0,126.0,17.222009


In [27]:
start_time_parallel = time.time()

# 1. Split your symbols into smaller chunks
def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

# 2. Create a function to fetch the data for each chunk
def fetch_data_for_chunk(symbols_chunk):
    bars_request_params = StockBarsRequest(
        symbol_or_symbols=symbols_chunk,
        start=previous_day_40,
        end=previous_day,
        timeframe=TimeFrame.Day,
        adjustment=Adjustment.ALL,
        feed=DataFeed.SIP
    )
    return stock_client.get_stock_bars(bars_request_params).df.reset_index()

# 3. Use ThreadPoolExecutor to parallelize the fetching
NUM_THREADS = 5
SYMBOLS_PER_CHUNK = (len(ALL_ETFS) // NUM_THREADS)+10
symbols_chunks = list(chunks(ALL_ETFS, SYMBOLS_PER_CHUNK))

dataframes = []

with concurrent.futures.ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
    for df in executor.map(fetch_data_for_chunk, symbols_chunks):
        dataframes.append(df)

# Concatenate the results
final_df_parallel = pd.concat(dataframes, ignore_index=True)
final_df_parallel.timestamp = final_df_parallel.timestamp.dt.date

# Timer end for parallelized option
end_time_parallel = time.time()
print(f"Parallelized option took {end_time_parallel - start_time_parallel:.2f} seconds.")


Parallelized option took 7.26 seconds.


In [22]:
#final_df_parallel = final_df_parallel.reset_index()
#final_df_parallel.timestamp = final_df_parallel.timestamp.dt.date
final_df_parallel

Unnamed: 0,symbol,timestamp,open,high,low,close,volume,trade_count,vwap
0,AAXJ,2023-08-15,65.5400,65.5400,65.0500,65.1400,325578.0,2385.0,65.186224
1,AAXJ,2023-08-16,64.6200,64.9300,64.4550,64.5000,808612.0,4847.0,64.588107
2,AAXJ,2023-08-17,65.1900,65.3260,64.6100,64.6800,1401366.0,6944.0,64.955623
3,AAXJ,2023-08-18,63.9100,64.2000,63.7100,64.0700,1430987.0,6294.0,64.002295
4,AAXJ,2023-08-21,63.8000,64.2000,63.7600,64.1600,852505.0,4688.0,63.922077
...,...,...,...,...,...,...,...,...,...
73311,WFIG,2023-09-18,42.8510,42.8870,42.8510,42.8870,1192.0,2.0,42.853850
73312,WFIG,2023-09-19,42.8000,42.8289,42.7715,42.7715,1839.0,3.0,42.809434
73313,WFIG,2023-09-20,42.7810,42.7810,42.7810,42.7810,1.0,1.0,42.910000
73314,WFIG,2023-09-21,42.4950,42.4950,42.4624,42.4624,250.0,3.0,42.491332
