# Processing Coinbase API Data with Spark Structured Streaming


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, avg, col
from pyspark.sql.functions import expr
from pyspark.sql import functions as F

In [None]:
# Session with schema inference enabled for streaming sources.
spark_session = (
    SparkSession.builder
    .config("spark.sql.streaming.schemaInference", True)
    .getOrCreate()
)

# Streaming DataFrame over the "ws" source, using its "ticker" schema.
websocket_stream = spark_session.readStream.format("ws").option("schema", "ticker").load()

# Subset of ticker fields to preview on the console.
ticker_columns = ["side", "product_id", "last_size", "best_bid", "best_ask", "time"]

# Print each micro-batch to stdout without truncating cell contents.
console_query = (
    websocket_stream
    .select(*ticker_columns)
    .writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)

# Let the preview run for up to 10 seconds, then shut it down and show the
# schema of the underlying stream.
console_query.awaitTermination(10)
console_query.stop()
websocket_stream.printSchema()

root
 |-- type: string (nullable = false)
 |-- trade_id: long (nullable = false)
 |-- sequence: long (nullable = false)
 |-- time: timestamp (nullable = false)
 |-- product_id: string (nullable = false)
 |-- price: double (nullable = false)
 |-- side: string (nullable = false)
 |-- last_size: double (nullable = false)
 |-- best_bid: double (nullable = false)
 |-- best_ask: double (nullable = false)



## Time window aggregation

A time window was implemented to analyze the Coinbase data stream. The average 'best_bid' price for each product was calculated over a 5-minute window that slides every minute. The results were written to an in-memory table, which is queried periodically for display.


In [None]:
import time

spark = (
    SparkSession
    .builder
    .appName('AverageBestBid')
    .config('spark.sql.streaming.schemaInference', True)
    .config('spark.sql.streaming.forceDeleteTempCheckpointLocation', True)
    .getOrCreate()
)

coinbase_stream = (
    spark
    .readStream
    .format('ws')
    .option('schema', 'ticker')
    .load()
)

# 5-minute windows starting every minute, so windows overlap and each event
# contributes to several of them.
five_min_window = window(col('time'), windowDuration='5 minutes', slideDuration='1 minutes')

# Pause between successive reads of the in-memory result table, so each
# printed "batch" can reflect newly processed data.
poll_interval_seconds = 10

avg_bid_query = (
    coinbase_stream
    .select('product_id', 'time', 'best_bid')
    .groupBy(five_min_window, 'product_id')
    .agg(F.avg('best_bid').alias('avg_best_bid'))
    # Sorting a streaming DataFrame is only allowed after an aggregation in
    # 'complete' output mode, which this query uses.
    .orderBy('window')
    .writeStream
    # The memory sink exposes results as a temp table named by queryName.
    # ('truncate' is a console-sink option, so it is not set here.)
    .format('memory')
    .outputMode('complete')
    .queryName('avg_best_bid_table')
    .start()
)

try:
    # Give the stream time to receive data before the first read.
    avg_bid_query.awaitTermination(12)

    for batch_num in range(5):
        if batch_num:
            time.sleep(poll_interval_seconds)
        print(f'Batch {batch_num}\n')
        # show() prints the table and returns None; nothing to keep.
        spark.sql('select * from avg_best_bid_table').show(truncate=False)
finally:
    # Always release the streaming query, even if a read fails or is interrupted.
    avg_bid_query.stop()

Batch 0

+------------------------------------------+----------+------------------+
|window                                    |product_id|avg_best_bid      |
+------------------------------------------+----------+------------------+
|{2024-11-27 04:38:00, 2024-11-27 04:43:00}|ETH-USD   |3411.015          |
|{2024-11-27 04:38:00, 2024-11-27 04:43:00}|BTC-EUR   |88393.69          |
|{2024-11-27 04:38:00, 2024-11-27 04:43:00}|ETH-BTC   |0.0367975         |
|{2024-11-27 04:38:00, 2024-11-27 04:43:00}|BTC-USD   |92661.899         |
|{2024-11-27 04:39:00, 2024-11-27 04:44:00}|ETH-USD   |3410.9752083333333|
|{2024-11-27 04:39:00, 2024-11-27 04:44:00}|BTC-USD   |92665.62115384615 |
|{2024-11-27 04:39:00, 2024-11-27 04:44:00}|BTC-EUR   |88393.69          |
|{2024-11-27 04:39:00, 2024-11-27 04:44:00}|ETH-BTC   |0.0367975         |
|{2024-11-27 04:40:00, 2024-11-27 04:45:00}|BTC-USD   |92665.62115384615 |
|{2024-11-27 04:40:00, 2024-11-27 04:45:00}|ETH-BTC   |0.0367975         |
|{2024-11-27 04:

## Late data handling with watermarking


Watermarking was implemented to handle late-arriving data. The mean price is aggregated over a 10-second window that slides every 5 seconds, and events arriving up to 30 seconds late (the watermark threshold) are still included. A mock WebSocket server was also used so the input data could be simulated in a controlled way.


In [None]:
# Reuse (or create) a session with schema inference enabled for streaming sources.
spark = SparkSession.builder.config('spark.sql.streaming.schemaInference', True).getOrCreate()

# Ticker-schema stream from the mock WebSocket server.
mock_stream = (
    spark.readStream
    .format('ws')
    .option('schema', 'ticker')
    .option('url', 'ws://mock:8025')
    .load()
)

# 10-second windows sliding every 5 seconds, keyed per product.
sliding_window = F.window("time", "10 seconds", "5 seconds")

# The 30-second watermark on `time` tells Spark how late events may arrive
# and still be aggregated into their window.
watermarked = mock_stream.withWatermark("time", "30 seconds")
price_aggregation = watermarked.groupBy(sliding_window, "product_id").mean("price")

# 'update' mode emits only the windows whose aggregate changed in each batch.
watermark_query = (
    price_aggregation.writeStream
    .format("console")
    .outputMode("update")
    .option("truncate", "false")
    .start()
)

# Run for up to 40 seconds, then stop the query.
watermark_query.awaitTermination(40)
watermark_query.stop()

## Stream-to-stream join


A stream-to-stream join was performed. Buy-side 'ticker' events were joined with 'heartbeat' events by matching the ticker `trade_id` against the heartbeat `last_trade_id`. A 20-second watermark was applied to each stream to manage event timing.


In [None]:
import time

spark = (
    SparkSession
    .builder
    .config('spark.sql.streaming.schemaInference', True)
    .config('spark.sql.streaming.forceDeleteTempCheckpointLocation', True)
    .getOrCreate()
)

# Two independent streams from the "ws" source, one per schema.
ticker_df = (
    spark
    .readStream
    .format('ws')
    .option('schema', 'ticker')
    .load()
)

heartbeat_df = (
    spark
    .readStream
    .format('ws')
    .option('schema', 'heartbeat')
    .load()
)

# Buy-side ticker events, with event time renamed so it stays distinguishable
# after the join.
buy_transactions_df = (
    ticker_df
    .filter(ticker_df.side == 'buy')
    .withColumnRenamed('time', 'buy_time')
    .withWatermark('buy_time', '20 seconds')
)

# Heartbeat events, with last_trade_id copied into a dedicated join-key column.
heartbeat_events_df = (
    heartbeat_df
    .filter(heartbeat_df.type == 'heartbeat')
    .withColumnRenamed('time', 'heartbeat_time')
    .withColumn('last_trade_id_AS_trade_id', col('last_trade_id'))
    .withWatermark('heartbeat_time', '20 seconds')
)

# Match each buy trade with heartbeats whose last_trade_id equals its trade_id.
# NOTE(review): there is no event-time range constraint between buy_time and
# heartbeat_time, so Spark cannot use the watermarks to evict join state.
# Consider adding a bound (e.g. heartbeat_time within N seconds of buy_time)
# once the expected delay is confirmed.
join_expr = buy_transactions_df.trade_id == heartbeat_events_df.last_trade_id_AS_trade_id
join_type = 'inner'

# Deduplicate per trade. Including the watermarked buy_time column in the key
# lets Spark drop deduplication state older than the watermark; otherwise that
# state grows forever. Assumes each trade_id carries a single buy_time, so the
# distinct rows kept are the same as deduplicating on trade_id alone.
joined_df = (
    buy_transactions_df
    .join(heartbeat_events_df, join_expr, join_type)
    .dropDuplicates(['trade_id', 'buy_time'])
)

join_query = (
    joined_df
    .writeStream
    .format('memory')
    .outputMode('append')
    .queryName('joined_table')
    .start()
)

# Pause between successive reads of the in-memory table, so each printed
# "batch" can include newly joined rows.
poll_interval_seconds = 10

try:
    # Give both streams time to deliver data before the first read.
    join_query.awaitTermination(60)

    for batch in range(5):
        if batch:
            time.sleep(poll_interval_seconds)
        print(f'Batch {batch}')
        spark.sql('select * from joined_table').show(truncate=False)
finally:
    # Always release the streaming query, even if a read fails or is interrupted.
    join_query.stop()

Batch 0

+---------+------+-----------+-----------------------+----------+--------+----+----------+--------+--------+
|trade_id |type  |sequence   |buy_time               |product_id|price   |side|last_size |best_bid|best_ask|
+---------+------+-----------+-----------------------+----------+--------+----+----------+--------+--------+
|730825133|ticker|93168283072|2024-12-03 18:28:15.806|BTC-USD   |95456.9 |buy |1.142E-5  |95456.89|95456.9 |
|575891113|ticker|71138240754|2024-12-03 18:28:14.805|ETH-USD   |3570.7  |buy |2.85247131|3570.69 |3570.93 |
|730825122|ticker|93168282239|2024-12-03 18:28:13.505|BTC-USD   |95456.9 |buy |4.978E-4  |95456.89|95456.9 |
|575891102|ticker|71138240452|2024-12-03 18:28:14.041|ETH-USD   |3570.94 |buy |0.0033763 |3570.93 |3570.94 |
|34713602 |ticker|7660274523 |2024-12-03 18:27:18.125|ETH-BTC   |0.03736 |buy |1.4404    |0.03735 |0.03737 |
|730825126|ticker|93168282690|2024-12-03 18:28:14.825|BTC-USD   |95456.9 |buy |9.9808E-4 |95456.89|95456.9 |
|575891116