In [0]:
#importing libraries
import json
from os import getenv
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import expr

#not used in batch processing
#from alpaca_trade_api.stream import Stream

from alpaca_trade_api.rest import REST, TimeFrame, TimeFrameUnit
from delta.tables import DeltaTable

In [0]:
# Get or create the Spark session used by the ingestion cells
spark = (
    SparkSession.builder
    .appName('DSSA5102')
    .getOrCreate()
)
# Last expression of the cell: echo the active session
SparkSession.getActiveSession()

In [0]:
# Read the Alpaca settings from the environment variables attached to the cluster
alpaca_api_key, alpaca_secret_key, alpaca_base_url = (
    getenv(variable_name)
    for variable_name in ("APCA_API_KEY_ID", "APCA_API_SECRET_KEY", "APCA_API_BASE_URL")
)

# Build the REST client (similar to an HTTP call)
# NOTE(review): alpaca_base_url is read above but is not passed to REST here —
# confirm the client resolves the endpoint on its own.
api = REST(key_id=alpaca_api_key, secret_key=alpaca_secret_key, api_version='v2')

In [0]:
# Bronze portion of the objective: list every asset and keep the tradable ones

# Use the API to get a list of all available stock tickers
assets = api.list_assets()

# Build one Spark Row() per tradable ticker. The filter relies on the truthiness
# of `tradable` rather than comparing against True with `==`.
data = [
    Row(
        id=ticker.id,
        easy_to_borrow=ticker.easy_to_borrow,
        exchange=ticker.exchange,
        fractionable=ticker.fractionable,
        marginable=ticker.marginable,
        name=ticker.name,
        shortable=ticker.shortable,
        status=ticker.status,
        symbol=ticker.symbol,
        tradable=ticker.tradable,
    )
    for ticker in assets
    if ticker.tradable
]

# Create a Spark Dataframe using the list called data
tradable_df = spark.createDataFrame(data)

# Display a sample of the results and print total rows
display(tradable_df)
print("Total Number of Tradable Ticker Symbols",tradable_df.count())

In [0]:
# Delta provides ACID guarantees and tracks changes

# DBTITLE 1,Write Tradable Stock Assets to a Delta Table & Add the Delta Table to the Hive Metastore
assets_path = "/alpaca/assets/"

if not DeltaTable.isDeltaTable(spark, assets_path):
    # Nothing at the path is a Delta table yet: write the DataFrame out as one
    tradable_df.write.format('delta').save(assets_path)
else:
    # A Delta table already exists: load it and upsert, matching rows on id
    deltaTable = DeltaTable.forPath(spark, assets_path)
    (
        deltaTable.alias('delta')
        .merge(
            source=tradable_df.alias('updates'),
            condition=expr("delta.id = updates.id"),
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
    # Clean up old files
    deltaTable.vacuum()

# Register the table in the hive metastore when it is not there yet
assets_registered = spark._jsparkSession.catalog().tableExists("alpaca.assets")
if not assets_registered:
    spark.sql("CREATE DATABASE IF NOT EXISTS alpaca")
    spark.sql("CREATE TABLE IF NOT EXISTS alpaca.assets USING DELTA LOCATION '/alpaca/assets/'")

In [0]:
# DBTITLE 1,Get the Historical Hourly Price Data for the last Year for each Tradable Stock
# The Alpaca API serves polling requests as paginated results with a limit of 10000 rows
# A single year contains 8760 hours, keeping us below the request limit per ticker
# The free Alpaca APIs do not support more than 200 calls per minute, so executing without concurrency will take some time to run
from pyspark.sql.types import StructType,StructField, StringType, TimestampType, DecimalType, IntegerType

def process_bars(bar, symbol):
  """Flatten one bar object into a plain dict with descriptive field names.

  The bar and its raw payload are only read, never modified, so the same bar
  can be processed or inspected again afterwards.

  Args:
    bar: object whose instance ``__dict__`` holds the raw payload under
      ``_raw``; the payload is a mapping with the keys ``t``, ``o``, ``h``,
      ``l``, ``c``, ``v``, ``n`` and ``vw``.
    symbol: ticker symbol stored in the result under ``symbol``.

  Returns:
    dict with the keys ``symbol``, ``timestamp``, ``open``, ``high``, ``low``,
    ``close``, ``volume``, ``trade_count`` and ``vwap``, in that order.

  Raises:
    KeyError: if ``_raw`` is absent from the bar or the payload lacks one of
      the expected keys.
  """
  # Look the payload up in the instance dict without popping it off the bar
  raw = vars(bar)['_raw']
  field_map = (
    ('timestamp', 't'),
    ('open', 'o'),
    ('high', 'h'),
    ('low', 'l'),
    ('close', 'c'),
    ('volume', 'v'),
    ('trade_count', 'n'),
    ('vwap', 'vw'),
  )
  record = {'symbol': symbol}
  for field_name, raw_key in field_map:
    record[field_name] = raw[raw_key]
  return record

bar_list = []

# If we do this for every stock we pulled from the apis, the dataset will grow to ~14mil rows
# so we will work with a sample for our lab. This will prevent us from having to make concurrent API calls
# and consuming more resources than what is available using single node spark
seed = 10
sample = tradable_df.sample(False, .01, seed)

# Collect the sampled rows to the driver once (instead of once per loop
# iteration) and read the symbol by column name rather than by position
sampled_symbols = [row['symbol'] for row in sample.collect()]

for symbol in sampled_symbols:
  bar_iter = api.get_bars_iter(symbol, TimeFrame.Hour, start="2020-01-01", end="2021-01-01")
  bar_list.extend(process_bars(bar, symbol) for bar in bar_iter)

# spark.read.json expects JSON text, so serialize each record explicitly
# instead of handing it Python dicts
json_records = [json.dumps(record) for record in bar_list]
prices_df = spark.read.json(spark.sparkContext.parallelize(json_records))
print("Total Number of Hourly Bars", prices_df.count())
display(prices_df)

In [0]:
# DBTITLE 1,Write Sampled Stock Assets Hourly Market Data to a Delta Table & Add the Delta Table to the Hive Metastore
bars_path = "/alpaca/bars/"

if not DeltaTable.isDeltaTable(spark, bars_path):
    # Nothing at the path is a Delta table yet: write one, partitioned by symbol
    prices_df.write.partitionBy("symbol").format('delta').save(bars_path)
else:
    # A Delta table already exists: load it and upsert on (symbol, timestamp)
    deltaTable = DeltaTable.forPath(spark, bars_path)
    (
        deltaTable.alias('delta')
        .merge(
            source=prices_df.alias('updates'),
            condition=expr("delta.symbol = updates.symbol and delta.timestamp=updates.timestamp"),
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
    # Clean up old files
    deltaTable.vacuum()

# Register the table in the hive metastore when it is not there yet
bars_registered = spark._jsparkSession.catalog().tableExists("alpaca.bars")
if not bars_registered:
    spark.sql("CREATE DATABASE IF NOT EXISTS alpaca")
    spark.sql("CREATE TABLE IF NOT EXISTS alpaca.bars USING DELTA LOCATION '/alpaca/bars/'")

In [0]:
#importing libraries for batch processing
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as f
from delta.tables import DeltaTable

In [0]:
# DBTITLE 1,Get or Create a New Spark Session
spark = (
    SparkSession.builder
    .appName('alpaca-batch')
    .getOrCreate()
)
# Last expression of the cell: echo the active session
SparkSession.getActiveSession()

In [0]:
# DBTITLE 1,Read the Alpaca Bar (Silver) Delta Table 
# Load the registered alpaca.bars table as the starting DataFrame for feature engineering
bars = spark.read.table('alpaca.bars')
display(bars)

In [0]:
# Add one calendar column per (name, extractor) pair, each derived from the bar timestamp
calendar_parts = (
    ("year", f.year),
    ("month", f.month),
    ("day", f.dayofmonth),
    ("dayofweek", f.dayofweek),
    ('dayofyear', f.dayofyear),
    ('hour', f.hour),
)
for part_name, extract in calendar_parts:
    bars = bars.withColumn(part_name, extract(f.col('timestamp')))

display(bars)

In [0]:
# DBTITLE 1,Lag Based Features
# Per-symbol window ordered by timestamp; also used by the cume_dist cell below
spec = Window.partitionBy("symbol").orderBy('timestamp')

lags = list(range(1, 6))
columns = ['close', 'high', 'low', 'open', 'trade_count', 'volume']

# For every source column add <column>_tminus_<n> holding its value n rows earlier
for source_col in columns:
  for offset in lags:
    lagged = f.lag(f.col(source_col), offset=offset).over(spec)
    bars = bars.withColumn(f'{source_col}_tminus_{offset}', lagged)

display(bars)

In [0]:
# DBTITLE 1,Analytic Function Based Features
# Cumulative distribution of each row over the per-symbol, timestamp-ordered window
cume_dist_col = f.cume_dist().over(spec)
bars = bars.withColumn("cume_dist", cume_dist_col)

display(bars)

In [0]:
# DBTITLE 1,Rolling and Expanding Window Based Features
spec = Window.partitionBy("symbol").orderBy('timestamp')
# NOTE(review): rowsBetween(-3, 0) covers the current row plus the three rows
# before it (4 rows), while the columns are named "3h" — confirm which is intended.
rolling = spec.rowsBetween(-3, 0)

columns = ['close', 'high', 'low', 'open', 'trade_count', 'volume']

# (column suffix, aggregate function, window) in the order the columns are added
aggregates = (
    ("avg", f.avg, spec),
    ("min", f.min, spec),
    ("max", f.max, spec),
    ("3h_rolling_avg", f.avg, rolling),
    ("3h_rolling_min", f.min, rolling),
    ("3h_rolling_max", f.max, rolling),
)

for source_col in columns:
  for suffix, aggregate, window in aggregates:
    bars = bars.withColumn(f"{source_col}_{suffix}", aggregate(f.col(source_col)).over(window))

display(bars)

In [0]:
# DBTITLE 1,Target Feature
# The target is the close value found 6 rows ahead within the same symbol window
target_col = f.lead(f.col('close'), offset=6).over(spec)
bars = bars.withColumn('target', target_col)

display(bars)

In [0]:
# DBTITLE 1,Write Feature Table (Gold) to Delta Lake
feature_path = "/feature_store/stock_prices"

# Get or create target delta table
if DeltaTable.isDeltaTable(spark, feature_path):
    # Load delta table
    deltaTable = DeltaTable.forPath(spark, feature_path)
    # Merge new or changed data into target, matching rows on (symbol, timestamp).
    # f.expr is used because the batch-processing imports bring in
    # pyspark.sql.functions only under the alias `f`; a bare `expr` would work
    # only if an earlier section had already imported it.
    (
        deltaTable.alias('delta')
        .merge(
            source=bars.alias('updates'),
            condition=f.expr("delta.symbol = updates.symbol and delta.timestamp=updates.timestamp"),
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )
    # Clean up old files
    deltaTable.vacuum()
else:
    # No Delta table at the path yet: write the feature table, partitioned by symbol
    bars.write.partitionBy("symbol").format('delta').save(feature_path)

# Create table in hive metastore
if not spark._jsparkSession.catalog().tableExists("feature_store.stock_prices"):
    spark.sql("CREATE DATABASE IF NOT EXISTS feature_store")
    spark.sql("CREATE TABLE IF NOT EXISTS feature_store.stock_prices USING DELTA LOCATION '/feature_store/stock_prices'")