In [1]:
import requests
# get alpaca api keys using this guide: https://alpaca.markets/docs/market-data/getting-started/#creating-an-alpaca-account-and-finding-your-api-keys
from secrets_config import api_key_id, api_secret_key 
import pandas as pd
from datetime import datetime, timezone

In [2]:
# docs: https://alpaca.markets/docs/api-references/market-data-api/stock-pricing-data/historical/

"""
request data with the following parameters: 
- stock: tesla 
- start date: 01/01/2020
- end date: 02/01/2020
"""
stock_ticker = "tsla" # tlsa maps to tesla
base_url = f"https://data.alpaca.markets/v2/stocks/{stock_ticker}/trades"
start_time = datetime(2020, 1, 1, tzinfo=timezone.utc).isoformat()
end_time = datetime(2020, 1, 2, tzinfo=timezone.utc).isoformat()

"""
to authenticate to the api, you will need to use the APCA-API-KEY-ID and APCA-API-SECRET-KEY fields. 
for example:
    headers = {
        "APCA-API-KEY-ID": "<your_api_key_id>",
        "APCA-API-SECRET-KEY": "<your_api_secret_key>"
    }   

and in the request.get() method, you will have to use: 
    requests.get(url=url, params=params, headers=headers)
"""

response_data = []

params = {
    "start": start_time,
    "end": end_time
}

# auth example: https://alpaca.markets/docs/api-references/trading-api/
headers = {
    "APCA-API-KEY-ID": api_key_id,
    "APCA-API-SECRET-KEY": api_secret_key
}
response = requests.get(base_url, params=params, headers=headers)
if response.json().get("trades") is not None: 
    response_data.extend(response.json().get("trades"))

In [3]:
# read json data to a dataframe 
df_quotes = pd.json_normalize(data=response_data, meta=["symbol"])
# rename columns to more meaningful names
df_quotes_renamed = df_quotes.rename(columns={
    "i": "id",
    "t": "timestamp",
    "x": "exchange",
    "p": "price",
    "s": "size",
})

In [4]:
# keep only 'id', 'timestamp', 'exchange', 'price', 'size' columns 
df_quotes_selected = df_quotes_renamed[['id', 'timestamp', 'exchange', 'price', 'size']]

In [5]:
df_exchange_codes = pd.read_csv("data/exchange_codes.csv")

In [6]:
df_exchange = pd.merge(left=df_quotes_selected, right=df_exchange_codes, left_on="exchange", right_on="exchange_code").drop(columns=["exchange_code", "exchange"]).rename(columns={"exchange_name": "exchange"})
df_exchange.head()

Unnamed: 0,id,timestamp,price,size,exchange
0,9905,2020-01-01T00:00:20.4997Z,418.93,60,Cboe EDGX
1,9906,2020-01-01T00:00:38.5731Z,418.7,10,Cboe EDGX
2,9907,2020-01-01T00:09:35.5596Z,418.75,20,Cboe EDGX
3,9908,2020-01-01T00:10:13.0418Z,418.75,115,Cboe EDGX
4,9909,2020-01-01T00:10:28.0684Z,418.75,200,Cboe EDGX


Create new file with new timestamp of the ingestion start and end times

In [7]:
start_time_str = start_time.replace(":", "-").replace(".","-")
end_time_str = end_time.replace(":", "-").replace(".","-")
df_exchange.to_parquet(f"data/exchange_data_{start_time_str}_{end_time_str}.parquet", index=False)

Upsert data into postgres database table using timestamp as the primary key

In [8]:
from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, Float # https://www.tutorialspoint.com/sqlalchemy/sqlalchemy_core_creating_table.htm
from sqlalchemy.engine import URL
from sqlalchemy.dialects import postgresql
from secrets_config import db_user, db_password, db_server_name, db_database_name
from sqlalchemy.schema import CreateTable 

In [9]:
# create connection to database 
connection_url = URL.create(
    drivername = "postgresql+pg8000", 
    username = db_user,
    password = db_password,
    host = db_server_name, 
    port = 5432,
    database = db_database_name, 
)

engine = create_engine(connection_url)


In [10]:
df_exchange.columns

Index(['id', 'timestamp', 'price', 'size', 'exchange'], dtype='object')

In [11]:
df_exchange

Unnamed: 0,id,timestamp,price,size,exchange
0,9905,2020-01-01T00:00:20.4997Z,418.93,60,Cboe EDGX
1,9906,2020-01-01T00:00:38.5731Z,418.70,10,Cboe EDGX
2,9907,2020-01-01T00:09:35.5596Z,418.75,20,Cboe EDGX
3,9908,2020-01-01T00:10:13.0418Z,418.75,115,Cboe EDGX
4,9909,2020-01-01T00:10:28.0684Z,418.75,200,Cboe EDGX
...,...,...,...,...,...
64,12012,2020-01-01T00:30:39.2224Z,418.89,23,NYSE Arca
65,12013,2020-01-01T00:30:53.5212Z,418.89,27,NYSE Arca
66,12014,2020-01-01T00:37:46.7533Z,418.65,20,NYSE Arca
67,12015,2020-01-01T00:58:13.0119Z,418.52,100,NYSE Arca


In [20]:
meta = MetaData()
stock_price_tesla_table = Table(
    "stock_price_tesla", meta, 
    Column("id", Integer, primary_key=True),
    Column("exchange", String, primary_key=True),
    Column("timestamp", String, primary_key=True),
    Column("price", Float),
    Column("size", Integer)
)
meta.create_all(engine) # creates table if it does not exist 


In [22]:
insert_statement = postgresql.insert(stock_price_tesla_table).values(df_exchange.to_dict(orient='records'))
upsert_statement = insert_statement.on_conflict_do_update(
    index_elements=['id', 'exchange', 'timestamp'],
    set_={c.key: c for c in insert_statement.excluded if c.key not in ['id', 'exchange', 'timestamp']})
engine.execute(upsert_statement)

<sqlalchemy.engine.cursor.LegacyCursorResult at 0x7fbe50ec5d00>