##### Course: Data Gathering & Warehousing
##### Instructor: Carl Chatterton 
##### Lab: #2
---

### Part 2 - Stream Processing 

##### Learning Objectives:
- Learn how to set up a Spark Structured Streaming pipeline
- Learn how to consume data from websockets using the Alpaca SDK or the requests library in Python
- Learn how to develop a schema for data in a data lake (Amazon S3)
- Use the bronze/silver/gold (medallion) pattern to prepare aggregate tables using Delta Lake and Spark Structured Streaming
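One way to pin down a schema for the bronze bar records is to declare the fields once, then derive both a Spark DDL string and a record projection from that single list. The sketch below is plain Python. The field names mirror the keys the `on_crypto_bar` callback writes; the column types are assumptions, in particular keeping `timestamp` as a `STRING` to be parsed in a later step. `BAR_SCHEMA`, `schema_ddl` and `project_record` are hypothetical names. A DDL string of this form can be passed to `spark.readStream.schema(...)`.

```python
import json

# Hypothetical bronze schema: (column name, Spark SQL type).
# The types are assumptions; timestamp stays a string and is parsed downstream.
BAR_SCHEMA = [
    ("symbol", "STRING"),
    ("timestamp", "STRING"),
    ("exchange", "STRING"),
    ("open", "DOUBLE"),
    ("high", "DOUBLE"),
    ("low", "DOUBLE"),
    ("close", "DOUBLE"),
    ("volume", "DOUBLE"),
    ("trade_count", "BIGINT"),
    ("vwap", "DOUBLE"),
]


def schema_ddl(schema):
    """Render the schema as a Spark DDL string; names are backtick-quoted
    so that words such as `timestamp` are always read as identifiers."""
    return ", ".join(f"`{name}` {sql_type}" for name, sql_type in schema)


def project_record(raw, schema=BAR_SCHEMA):
    """Keep only the schema's columns, in schema order; missing keys become None."""
    return {name: raw.get(name) for name, _ in schema}


if __name__ == "__main__":
    print(schema_ddl(BAR_SCHEMA))
    # Made-up record: the unknown "extra" key is dropped, missing columns become null
    sample = {"symbol": "BTCUSD", "exchange": "CBSE", "close": 1.0, "extra": "dropped"}
    print(json.dumps(project_record(sample)))
```

Keeping the column list in one place means the writer and the streaming reader cannot silently drift apart.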

##### Setup Instructions:
1. Create a free (paper trading) account at [Alpaca Markets](https://alpaca.markets/)
2. Store the API credentials as environment variables in the Databricks Spark cluster configuration (`APCA_API_KEY_ID`, `APCA_API_SECRET_KEY`, `APCA_API_BASE_URL`)
3. Install the `alpaca-trade-api` library from PyPI (along with `nest_asyncio`, which this notebook also imports)
4. Run this notebook to stream live crypto market data (from Coinbase) to S3 as JSON
5. Create a new notebook called `lab-2-stream-processing-<YOUR NAME>` for your submission. This new notebook will be your ELT script: it completes the pattern by using Structured Streaming and Delta to process `bronze -> silver -> gold`
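The `bronze -> silver -> gold` flow can be previewed without Spark. The plain-Python sketch below is only an analogue of what a Structured Streaming/Delta job would do. `to_silver` and `to_gold` are hypothetical names, and the cleaning rules (skip malformed rows, de-duplicate on `symbol` + `timestamp`) and the gold metrics are assumptions, not requirements of the lab.

```python
import json

# Fields a silver row must have; this selection is an assumption.
REQUIRED = ("symbol", "timestamp", "high", "low", "close", "volume")


def to_silver(bronze_lines):
    """Bronze -> silver: parse raw JSON lines, drop bad rows, cast types,
    and de-duplicate on (symbol, timestamp)."""
    seen = set()
    silver = []
    for line in bronze_lines:
        try:
            rec = json.loads(line)
        except json.JSONDecodeError:
            continue  # malformed line: skip it rather than fail the batch
        if not isinstance(rec, dict) or any(rec.get(f) is None for f in REQUIRED):
            continue  # not a JSON object, or a required field is missing
        key = (rec["symbol"], str(rec["timestamp"]))
        if key in seen:
            continue  # the same bar was written more than once
        seen.add(key)
        silver.append({
            "symbol": rec["symbol"],
            "timestamp": key[1],
            "high": float(rec["high"]),
            "low": float(rec["low"]),
            "close": float(rec["close"]),
            "volume": float(rec["volume"]),
        })
    return silver


def to_gold(silver):
    """Silver -> gold: per-symbol bar count, highest high, lowest low, total volume."""
    gold = {}
    for row in silver:
        agg = gold.setdefault(row["symbol"], {"bars": 0, "high": row["high"],
                                              "low": row["low"], "volume": 0.0})
        agg["bars"] += 1
        agg["high"] = max(agg["high"], row["high"])
        agg["low"] = min(agg["low"], row["low"])
        agg["volume"] += row["volume"]
    return gold
```

In the Spark version, these steps would map roughly onto `dropDuplicates` for the silver step and a `groupBy(...).agg(...)` for the gold step, with each stage written to its own Delta table.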

In [0]:
import json
import time
from os import getenv
import asyncio
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import expr
from alpaca_trade_api.stream import Stream
from delta.tables import DeltaTable
import nest_asyncio

nest_asyncio.apply() # Needed in Jupyter notebooks, which already run an asyncio event loop

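The `nest_asyncio.apply()` call matters because Jupyter-style kernels already run an asyncio event loop, and blocking on that same loop from synchronous code raises an error. The stdlib-only sketch below reproduces the failure that `nest_asyncio` patches around. `fetch_value` and `notebook_cell` are hypothetical stand-ins, and the assumption is that `conn.run()` blocks on the event loop in a similar way.

```python
import asyncio


async def fetch_value():
    return 42


async def notebook_cell():
    """Simulate synchronous notebook code executing while the kernel's loop is running."""
    loop = asyncio.get_running_loop()
    coro = fetch_value()
    try:
        # Blocking on the loop that is already running is rejected by plain asyncio
        return loop.run_until_complete(coro)
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"
    finally:
        coro.close()  # avoid a "coroutine was never awaited" warning


print(asyncio.run(notebook_cell()))
```

Without the patch this prints a `RuntimeError` saying the event loop is already running; `nest_asyncio` makes such nested `run_until_complete` calls re-entrant.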
In [0]:
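spark = SparkSession.builder.appName('DSSA').getOrCreate()
sc = spark.sparkContext  # Predefined on Databricks; defined here so the notebook also runs elsewhere
spark.getActiveSession()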

In [0]:
alpaca_api_key = getenv("APCA_API_KEY_ID")
alpaca_secret_key = getenv("APCA_API_SECRET_KEY")
alpaca_base_url = getenv("APCA_API_BASE_URL")

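Because `getenv` silently returns `None` when a variable is missing, a typo in the cluster config only surfaces later, as an authentication failure on the websocket. A small fail-fast check like the sketch below makes the problem obvious at the start of the notebook. The helper name `load_alpaca_credentials` is hypothetical; the variable names are the ones read above.

```python
import os
from typing import Mapping, Tuple

# Variables the Stream constructor in this notebook actually needs
REQUIRED_VARS = ("APCA_API_KEY_ID", "APCA_API_SECRET_KEY")


def load_alpaca_credentials(env: Mapping[str, str] = os.environ) -> Tuple[str, ...]:
    """Return the values of REQUIRED_VARS, raising if any is unset or empty."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return tuple(env[name] for name in REQUIRED_VARS)
```

Usage would be `alpaca_api_key, alpaca_secret_key = load_alpaca_credentials()`.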
In [0]:
# Keep the websocket running: if the connection ends or errors, wait 3 seconds and reconnect.
# A loop is used instead of recursion so repeated reconnects cannot exhaust the call stack,
# and so a user interrupt really stops the retries (a `finally` block would run even then).
def run_connection(conn):
    # Get the event loop that conn.stop_ws() will run on
    loop = asyncio.get_event_loop()
    loop.set_debug(True)

    while True:
        try:
            conn.run()
        except KeyboardInterrupt:
            print("Interrupted execution by user")
            loop.run_until_complete(conn.stop_ws())
            return
        except Exception as e:
            print(f'Exception from websocket connection: {e}')
        print("Trying to re-establish connection")
        time.sleep(3)

async def on_crypto_bar(bar):
    # Ignore bars that were not exchanged on Coinbase
    if bar.exchange != 'CBSE':
        return
    # Copy the needed fields from the raw payload of the Alpaca response object
    values = bar._raw
    fields = ['symbol', 'timestamp', 'exchange', 'open', 'high', 'low',
              'close', 'volume', 'trade_count', 'vwap']
    bars = {field: values[field] for field in fields}

    # spark.read.json expects an RDD of JSON strings, so serialize the dict first
    # (default=str guards against values that json cannot encode natively)
    df = spark.read.json(spark.sparkContext.parallelize([json.dumps(bars, default=str)]))
    df.coalesce(1).write.mode('append').json('alpaca/crypto/btc/bars')
    #print(dbutils.fs.ls('./alpaca/crypto/btc/bars'))

if __name__ == '__main__':
    conn = Stream(alpaca_api_key, 
                  alpaca_secret_key,
                  data_feed='iex')

    conn.subscribe_crypto_bars(on_crypto_bar, "BTCUSD")

    run_connection(conn)


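`run_connection` retries indefinitely with a fixed 3-second pause. If a bounded retry policy is preferred, a loop with exponential backoff is one option. The sketch below is stdlib-only; `run_with_reconnect` and its defaults are hypothetical, and passing `conn.run` as `run` assumes that `conn.run` raises on failure rather than handling the error internally.

```python
import time
from typing import Callable


def run_with_reconnect(run: Callable[[], None], max_attempts: int = 5,
                       base_delay: float = 3.0, max_delay: float = 60.0,
                       sleep: Callable[[float], None] = time.sleep) -> int:
    """Call `run` until it returns normally; return the number of attempts made.

    After each failure, wait base_delay * 2**(attempt - 1) seconds (capped at
    max_delay). The last exception is re-raised once max_attempts is reached.
    KeyboardInterrupt is a BaseException, so it is not caught and stops the loop.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    attempt = 0
    while True:
        attempt += 1
        try:
            run()
            return attempt
        except Exception as exc:
            if attempt >= max_attempts:
                raise
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.0f}s")
            sleep(delay)
```

The `sleep` parameter exists so the backoff schedule can be tested without waiting; in the notebook the default `time.sleep` would be used.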
In [0]:
# dbutils.fs.rm('alpaca/crypto/btc/bars', True)  # Removes existing bronze data
# dbutils.fs.rm('/alpaca/crypto/btc/silver_bars/', True)  # Removes existing silver data
# dbutils.fs.rm('/alpaca/crypto/btc/gold_bars/', True)  # Removes existing gold data

