In [56]:
import pandas as pd
import boto3
import pyarrow
import awswrangler as wr

from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.live import StockDataStream
from alpaca.data.requests import StockBarsRequest, StockTradesRequest
from alpaca.data.timeframe import TimeFrame

from dotenv import load_dotenv
import os
from datetime import datetime, timedelta

In [91]:
load_dotenv()
ALPACA_API_KEY = os.environ.get("ALPACA_API_KEY")
ALPACA_SECRET_KEY = os.environ.get("ALPACA_SECRET_KEY")
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID")
AWS_SECRET_KEY =  os.environ.get("AWS_SECRET_ACCESS_KEY")
AWS_REGION = os.environ.get("AWS_DEFAULT_REGION")

## Reading Tickers

In [58]:
nasdaq = pd.read_csv("https://s3-alpaca-stock-data.s3.us-west-1.amazonaws.com/tickers/nasdaq100.csv")
nyse = pd.read_csv("https://s3-alpaca-stock-data.s3.us-west-1.amazonaws.com/tickers/nyse100.csv")

In [59]:
nasdaq.head()

Unnamed: 0,Ticker,Code,Name
0,FOXA,GOOG/NASDAQ_FOXA,21st Century Fox Class A
1,ATVI,GOOG/NASDAQ_ATVI,Activision Blizzard Inc
2,ADBE,GOOG/NASDAQ_ADBE,Adobe Systems Inc
3,AKAM,GOOG/NASDAQ_AKAM,Akamai Technologies Inc
4,ALXN,GOOG/NASDAQ_ALXN,Alexion Pharmaceuticals Inc


In [60]:
nyse.head()

Unnamed: 0,Ticker,Code,Name
0,MMM,GOOG/NYSE_MMM,3M Co.
1,ABT,GOOG/NYSE_ABT,Abbott Laboratories
2,ACN,GOOG/NYSE_ACN,Accenture PLC Cl A
3,AGN,GOOG/NYSE_AGN,Allergan Inc.
4,MO,GOOG/NYSE_MO,Altria Group Inc.


In [61]:
nasdaq_tickers = list(nasdaq.iloc[:, 0])
nyse_tickers = list(nyse.iloc[:, 0])
tickers = set(nasdaq_tickers + nyse_tickers)

## Batch Data Ingestion

In [62]:
hist_client = StockHistoricalDataClient(ALPACA_API_KEY, ALPACA_SECRET_KEY)

In [63]:
def convert_datetime(date: datetime):
    return datetime(date.year, date.month, date.day, 0, 0, 0)

def getStockHistoricalData(client: StockHistoricalDataClient, start_date, end_date = datetime.today()):
    bar_request = StockBarsRequest(
        symbol_or_symbols=tickers,
        timeframe=TimeFrame.Day,
        start=start_date,
        end=end_date
    )
    res = client.get_stock_bars(bar_request)
    return(res)

bars = getStockHistoricalData(hist_client, datetime(2023, 9, 13), datetime(2023, 9, 14)).data
# bars = getStockHistoricalData(hist_client, datetime.today() - timedelta(days = 1)).data

In [64]:
len(bars)

158

In [65]:
BAR_SCHEMA = {
    "symbol": str,
    "high": "float64",
    "low": "float64",
    "open": "float64",
    "timestamp": "datetime64[ns]",
    "trade_count": "float64",
    "volume": "float64",
    "vwap": "float64"
}
df = pd.DataFrame(columns=BAR_SCHEMA.keys()).astype(BAR_SCHEMA)

for i, ticker in enumerate(bars):
    for bar in bars[ticker]:
        entry = {
            "symbol": bar.symbol,
            "high": bar.high,
            "low": bar.low,
            "open": bar.open,
            "timestamp": bar.timestamp,
            "trade_count": bar.trade_count,
            "volume": bar.volume,
            "vwap": bar.vwap
        }
        df.loc[len(df)] = entry


In [66]:
df['year'] = df['timestamp'].dt.year
df['month'] = df['timestamp'].dt.month
df['day'] = df['timestamp'].dt.day

In [67]:
df.head()

Unnamed: 0,symbol,high,low,open,timestamp,trade_count,volume,vwap,year,month,day
0,DD,75.49,73.98,75.2,2023-09-13 04:00:00+00:00,29430.0,2151232.0,74.346422,2023,9,13
1,DLTR,114.6742,112.16,114.6,2023-09-13 04:00:00+00:00,37580.0,2047961.0,113.154879,2023,9,13
2,MA,418.41,414.97,416.17,2023-09-13 04:00:00+00:00,44541.0,2142220.0,416.384234,2023,9,13
3,PG,153.59,151.66,151.975,2023-09-13 04:00:00+00:00,67637.0,4040670.0,152.901439,2023,9,13
4,VOD,9.915,9.82,9.87,2023-09-13 04:00:00+00:00,16761.0,7157524.0,9.871288,2023,9,13


## Using AWS SDK (Main Flow)

In [92]:
session = boto3.Session(
    aws_access_key_id = AWS_ACCESS_KEY,
    aws_secret_access_key = AWS_SECRET_KEY,
    region_name = AWS_REGION
)

In [89]:
glue_db_name = "alpaca_stocks_database"
glue_table_name = f"stocks_table_{datetime.now().year}_{datetime.now().month}"
s3_bucket_path = "s3://s3-alpaca-stock-data/daily/"

# wr.catalog.create_parquet_table(database = glue_db_name,
#                                 table = glue_table_name,
#                                 boto3_session = session,
#                                 path = s3_bucket_path,
#                                 columns_types = {
#                                     "symbol": "string",
#                                     "high": "double",
#                                     "low": "double",
#                                     "open": "double",
#                                     "timestamp": "timestamp",
#                                     "trade_count": "double",
#                                     "volume": "double",
#                                     "vwap": "double"
#                                 },
#                                 partitions_types = {
#                                     "year": "string",
#                                     "month": "string",
#                                     "day": "string",
#                                 })

In [87]:
wr.s3.to_parquet(
    df = df,
    path = s3_bucket_path,
    dataset = True,
    partition_cols = ["year", "month", "day"],
    database = glue_db_name,
    table = glue_table_name,
    mode = "overwrite_partitions",
    boto3_session = boto3.Session()
)

{'paths': ['s3://s3-alpaca-stock-data/daily/year=2023/month=9/day=13/f4024133d61b4c77971b5a6f3109b41e.snappy.parquet'],
 'partitions_values': {'s3://s3-alpaca-stock-data/daily/year=2023/month=9/day=13/': ['2023',
   '9',
   '13']}}

In [51]:
wr.athena.read_sql_query(f"SELECT * FROM {glue_table_name}", database = glue_db_name)

QueryFailed: TABLE_NOT_FOUND: line 7:3: Table 'awsdatacatalog.alpaca_stocks_database.stocks_table_2023_9' does not exist. You may need to manually clean the data at location 's3://aws-athena-query-results-548386779929-us-east-2/tables/68f95f0a-eb1b-48dd-b8fe-6a9fdef8585e' before retrying. Athena will not delete data in your account.

In [50]:
df_athena

Unnamed: 0,symbol,high,low,open,timestamp,trade_count,volume,vwap,year,month,day
0,DD,76.42,75.305,75.97,2023-09-11 04:00:00,29616.0,1879465.0,75.757702,2023,9,11
1,FOXA,32.32,31.320,31.36,2023-09-11 04:00:00,46701.0,3229086.0,31.827620,2023,9,11
2,GILD,77.17,76.000,76.05,2023-09-11 04:00:00,60056.0,8053756.0,76.709498,2023,9,11
3,VZ,34.09,33.580,33.70,2023-09-11 04:00:00,111546.0,18883869.0,33.877310,2023,9,11
4,DD,75.98,74.920,75.01,2023-09-12 04:00:00,27848.0,2093238.0,75.389116,2023,9,12
...,...,...,...,...,...,...,...,...,...,...,...
311,AKAM,105.34,103.925,104.20,2023-09-12 04:00:00,29581.0,1321847.0,104.382950,2023,9,12
312,BAX,39.14,38.565,38.93,2023-09-11 04:00:00,30044.0,2790719.0,38.799307,2023,9,11
313,CHKP,136.58,134.850,135.00,2023-09-11 04:00:00,11168.0,512016.0,135.961660,2023,9,11
314,BAX,38.99,38.220,38.94,2023-09-12 04:00:00,26697.0,2121468.0,38.471872,2023,9,12


## Using Boto3

In [124]:
df.to_parquet("stocks.parquet.gzip", engine = "pyarrow", compression = "gzip")

In [125]:
s3 = boto3.client("s3")

file_path = "stocks.parquet.gzip"
bucket_name = "s3-alpaca-stock-data"


In [126]:
s3.upload_file(file_path, bucket_name, "historical/test.parquet.gzip")