In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
import pyspark.sql.functions as F
from pyspark.sql.functions import expr, col

In [2]:
# Create (or reuse) a session with streaming schema inference enabled.
spark = SparkSession.builder.config(
    "spark.sql.streaming.schemaInference", True
).getOrCreate()

stream = (
    spark.readStream.format("ws").option("schema", "ticker").load()
)  # we need to pass `option("schema", "ticker")` to get correct channel subscribed

query = (
    stream.select("side", "product_id", "last_size", "best_bid", "best_ask", "time")
    .writeStream.format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)

# The query keeps running in the background after `start()`. Stop it in `finally` so that an
# interrupt (Kernel -> Interrupt) or an error while waiting cannot leave the websocket open.
try:
    query.awaitTermination(10)  # Let's wait for 10 seconds.
finally:
    query.stop()  # Let's stop the query
stream.printSchema()
# spark.stop() # And stop the whole session

root
 |-- type: string (nullable = false)
 |-- trade_id: long (nullable = false)
 |-- sequence: long (nullable = false)
 |-- time: timestamp (nullable = false)
 |-- product_id: string (nullable = false)
 |-- price: double (nullable = false)
 |-- side: string (nullable = false)
 |-- last_size: double (nullable = false)
 |-- best_bid: double (nullable = false)
 |-- best_ask: double (nullable = false)



Calling `start()` on a `writeStream` launches the query in the background; it keeps the websocket open and streams the results. If an error occurs on the front-end side (e.g. an exception raised by another Python line), Spark is not notified and the socket remains open! Always close the stream with its `stop()` method (in the example above, `query.stop()`). If the reference to a query is lost, stop the entire session with its `stop()` method as well (in the example above, `spark.stop()`).


In [3]:
# Panic button - press only if you messed up opening new websocket and lost reference to it.
# Stops every streaming query still registered on the active session (not only the one that
# happens to be bound to `query`), then stops the session itself. Does nothing if no session
# is active (e.g. it was already stopped).
panic_session = SparkSession.getActiveSession()
if panic_session is not None:
    for running_query in panic_session.streams.active:
        running_query.stop()
    panic_session.stop()

# Task 1

**Analyse the CoinBase data stream**
Write a query that prints the average value of a selected column (e.g. `price`) in sliding time windows based on the transaction time (column `time`), grouped by currency pair (from which currency to which currency — column `product_id`).


In [4]:
spark = SparkSession.builder.config(
    "spark.sql.streaming.schemaInference", True
).getOrCreate()

stream = spark.readStream.format("ws").option("schema", "ticker").load()

# Average price per currency pair (`product_id`) in 2-minute windows sliding every minute,
# where windows are assigned by the transaction's own timestamp (`time`).
avg_price_by_window = (
    stream.select("product_id", "price", "time")
    .groupBy(window("time", "2 minutes", "1 minutes"), "product_id")
    .mean("price")
    # Sorting a streaming DataFrame is only supported after an aggregation in complete mode.
    .orderBy("window")
)

# Complete mode writes the entire result table to the console on every micro-batch.
query = (
    avg_price_by_window.writeStream.format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .start()
)

# Always stop the background query, even if waiting is interrupted.
try:
    query.awaitTermination(10)
finally:
    query.stop()

In [5]:
# Batch: 0
# -------------------------------------------
# +------+----------+----------+
# |window|product_id|avg(price)|
# +------+----------+----------+
# +------+----------+----------+

# -------------------------------------------
# Batch: 1
# -------------------------------------------
# +------------------------------------------+----------+------------------+
# |window                                    |product_id|avg(price)        |
# +------------------------------------------+----------+------------------+
# |{2024-11-29 22:51:00, 2024-11-29 22:53:00}|ETH-BTC   |0.03699           |
# |{2024-11-29 22:51:00, 2024-11-29 22:53:00}|BTC-USD   |97452.40555555554 |
# |{2024-11-29 22:51:00, 2024-11-29 22:53:00}|ETH-USD   |3603.3239999999996|
# |{2024-11-29 22:51:00, 2024-11-29 22:53:00}|BTC-EUR   |92146.69          |
# |{2024-11-29 22:52:00, 2024-11-29 22:54:00}|ETH-BTC   |0.03699           |
# |{2024-11-29 22:52:00, 2024-11-29 22:54:00}|BTC-USD   |97452.40555555554 |
# |{2024-11-29 22:52:00, 2024-11-29 22:54:00}|BTC-EUR   |92146.69          |
# |{2024-11-29 22:52:00, 2024-11-29 22:54:00}|ETH-USD   |3603.3239999999996|
# +------------------------------------------+----------+------------------+

# -------------------------------------------
# Batch: 2
# -------------------------------------------
# +------------------------------------------+----------+-----------------+
# |window                                    |product_id|avg(price)       |
# +------------------------------------------+----------+-----------------+
# |{2024-11-29 22:51:00, 2024-11-29 22:53:00}|ETH-BTC   |0.03699          |
# |{2024-11-29 22:51:00, 2024-11-29 22:53:00}|BTC-USD   |97452.04125      |
# |{2024-11-29 22:51:00, 2024-11-29 22:53:00}|ETH-USD   |3603.160833333333|
# |{2024-11-29 22:51:00, 2024-11-29 22:53:00}|BTC-EUR   |92146.69         |
# |{2024-11-29 22:52:00, 2024-11-29 22:54:00}|ETH-BTC   |0.03699          |
# |{2024-11-29 22:52:00, 2024-11-29 22:54:00}|BTC-USD   |97452.04125      |
# |{2024-11-29 22:52:00, 2024-11-29 22:54:00}|BTC-EUR   |92146.69         |
# |{2024-11-29 22:52:00, 2024-11-29 22:54:00}|ETH-USD   |3603.160833333333|
# +------------------------------------------+----------+-----------------+

# Task 2

**Watermarking and delayed data.** 
--
Modify the query from Task 1 to demonstrate watermarking and the handling of late (delayed) data. The console should show the relevant rows of the result table being updated (update mode), in particular earlier time windows being updated when late data arrives. **For the solution of this exercise, please include an example output and a description that explains, on a concrete example, how the watermark and late data are handled**.

A script in the `/mock` directory written in [Scala-cli](https://scala-cli.virtuslab.org) can be used for the exercise, which will serve as a controlled source for CoinBase data via Websocket. 

The script can be run using Docker:

```
make image
make run
```

This will create a websocket server at `ws://mock:8025`.

Once the server is up and running, execute the following cell where the query reads data from the created websocket. The script sends sample messages in CoinBase format every 10 seconds:

- The first series sends messages with timestamps 0s, 14s, 7s  
- In the second series, messages with timestamps 15s, 8s, 21s are sent  
- In the third series, messages with timestamps 4s, 17s are sent  

For this data, the time window can be set to an interval of 10 seconds. The script can also be modified to send other data. 

In [6]:
spark = (
    SparkSession.builder.config("spark.sql.streaming.schemaInference", True)
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True)
    .getOrCreate()
)

# Read from the local mock server (controlled event times) instead of the live Coinbase feed.
stream = (
    spark.readStream.format("ws")
    .option("schema", "ticker")
    .option("url", "ws://mock:8025")
    .load()
)

# The watermark is (max event time seen so far) - 30 s. A window's state is kept, and late
# records may still update it, until the watermark passes the window's end. Records older than
# the watermark may be dropped.
windowed_avg_price = (
    stream.select("product_id", "price", "time")
    .withWatermark("time", "30 seconds")
    .groupBy(F.window("time", "10 seconds", "5 seconds"), "product_id")
    .mean("price")
)

# Update mode prints only the rows changed in each micro-batch, so a late record shows up
# as an older window being re-emitted with a new average.
query = (
    windowed_avg_price.writeStream.format("console")
    .outputMode("update")
    .option("truncate", "false")
    .start()
)

# Stop the query and the session even if waiting is interrupted.
try:
    query.awaitTermination(40)
finally:
    query.stop()
    spark.stop()

In [None]:
# Batch 1
# 2024-10-31 23:59:55, 2024-11-01 00:00:05 -> 0s timestamp update
# 2024-11-01 00:00:00, 2024-11-01 00:00:10 -> 0s, 7s timestamps updates
# 2024-11-01 00:00:05, 2024-11-01 00:00:15 -> 7s, 14s timestamps updates
# 2024-11-01 00:00:10, 2024-11-01 00:00:20 -> 14s timestamp update
# Batch 2
# No updates

# Batch 3
# 2024-10-31 23:59:55, 2024-11-01 00:00:05 -> no updates
# 2024-11-01 00:00:00, 2024-11-01 00:00:10 -> 8s timestamp update
# 2024-11-01 00:00:05, 2024-11-01 00:00:15 -> 8s timestamp update
# 2024-11-01 00:00:10, 2024-11-01 00:00:20 -> 15s timestamp update
# 2024-11-01 00:00:15, 2024-11-01 00:00:25 -> 15s, 21s timestamps updates (new window)
# 2024-11-01 00:00:20, 2024-11-01 00:00:30 -> 21s timestamp update (new window)
# Batch 4
# No updates

# Batch 5
# 2024-10-31 23:59:55, 2024-11-01 00:00:05 -> 4s timestamp update
# 2024-11-01 00:00:00, 2024-11-01 00:00:10 -> 4s timestamp update
# 2024-11-01 00:00:05, 2024-11-01 00:00:15 -> no updates
# 2024-11-01 00:00:10, 2024-11-01 00:00:20 -> 17s timestamp update
# 2024-11-01 00:00:15, 2024-11-01 00:00:25 -> 17s timestamp update
# 2024-11-01 00:00:20, 2024-11-01 00:00:30 -> no updates
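# 30s watermark is enough to make all updates apply. The watermark is the max event time seen so
# far minus 30s. After the second series the max event time is 21s, so the watermark is
# 21s - 30s = -9s (2024-10-31 23:59:51). The late 4s record falls into the windows ending at
# 00:00:05 and 00:00:10, both still later than the watermark, so their state is updated.
# With e.g. a 10s watermark it would be 21s - 10s = 11s, and the 4s record would be dropped.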

# Task 3

**Combine streams**

Using a stream-stream join, combine the data from the `ticker` channel (buy transactions, `side='buy'`) with the data from the `heartbeat` channel, which arrives every second. Match `trade_id` with the corresponding id in the heartbeat channel (`last_trade_id`). Output the combined data stream.

At the time this task was created (15.11.2023), the `heartbeat` channel returned incorrect timestamps (e.g. `1970-01-04 13:53:57.645339`). Joining it with the `ticker` channel yields the correct information. What a great use of a join!

In [7]:
# Schema for heartbeat channel: print a few raw heartbeat messages, then the inferred schema.
spark = SparkSession.builder.config(
    "spark.sql.streaming.schemaInference", True
).getOrCreate()

stream = spark.readStream.format("ws").option("schema", "heartbeat").load()

# All columns are written as-is, so no projection is needed.
query = (
    stream.writeStream.format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)

# Always stop the background query, even if waiting is interrupted.
try:
    query.awaitTermination(10)
finally:
    query.stop()
stream.printSchema()

root
 |-- type: string (nullable = false)
 |-- sequence: long (nullable = false)
 |-- last_trade_id: long (nullable = false)
 |-- product_id: string (nullable = false)
 |-- time: timestamp (nullable = false)



In [8]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr, col

spark = (
    SparkSession.builder.config("spark.sql.streaming.schemaInference", True)
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True)
    .getOrCreate()
)

# Streams
ticker_stream = spark.readStream.format("ws").option("schema", "ticker").load()

heartbeat_raw = spark.readStream.format("ws").option("schema", "heartbeat").load()

# Buy-side ticker events. `time` and `product_id` are renamed so that each side of the join
# keeps its own, unambiguous copy of these columns.
buy_stream = (
    ticker_stream.filter(col("side") == "buy")
    .select("trade_id", "product_id", "price", "time")
    .withColumnRenamed("time", "buy_time")
    .withColumnRenamed("product_id", "buy_product_id")
)

# Heartbeats carry the id of the last trade as `last_trade_id`; expose it as the join key.
heartbeat_stream = (
    heartbeat_raw.select(
        col("last_trade_id").alias("trade_id"), "product_id", "time"
    )
    .withColumnRenamed("time", "heartbeat_time")
    .withColumnRenamed("product_id", "heartbeat_product_id")
)

# Trade ids appear to be numbered per currency pair (the id ranges differ per pair), so the
# product ids must match as well; otherwise a trade of one pair could be paired with another
# pair's heartbeat that happens to carry the same id.
# NOTE(review): neither side has a watermark, so Spark keeps all past rows of both streams in
# join state (unbounded growth). A time-bounded join on `heartbeat_time` is not an option while
# the heartbeat channel reports wrong timestamps (see the task description above).
# Presumably every heartbeat that repeats the same `last_trade_id` matches the same ticker row
# again, which is why a trade can appear several times in the output.
joined_stream = (
    buy_stream.join(heartbeat_stream, on="trade_id")
    .where(col("buy_product_id") == col("heartbeat_product_id"))
    .select(
        "trade_id",
        "buy_product_id",
        "heartbeat_product_id",
        "price",
        "buy_time",
    )
)

# Output to the console
query = (
    joined_stream.writeStream.format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)

# Always stop the background query, even if waiting is interrupted.
try:
    query.awaitTermination(20)
finally:
    query.stop()

In [9]:
# -------------------------------------------
# Batch: 0
# -------------------------------------------
# +--------+--------------+--------------------+-----+--------+
# |trade_id|buy_product_id|heartbeat_product_id|price|buy_time|
# +--------+--------------+--------------------+-----+--------+
# +--------+--------------+--------------------+-----+--------+

# -------------------------------------------
# Batch: 1
# -------------------------------------------
# +---------+--------------+--------------------+--------+-----------------------+
# |trade_id |buy_product_id|heartbeat_product_id|price   |buy_time               |
# +---------+--------------+--------------------+--------+-----------------------+
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |728722908|BTC-USD       |BTC-USD             |97542.88|2024-11-29 22:48:29.69 |
# |728722893|BTC-USD       |BTC-USD             |97545.46|2024-11-29 22:48:26.898|
# +---------+--------------+--------------------+--------+-----------------------+

# -------------------------------------------
# Batch: 2
# -------------------------------------------
# +---------+--------------+--------------------+--------+-----------------------+
# |trade_id |buy_product_id|heartbeat_product_id|price   |buy_time               |
# +---------+--------------+--------------------+--------+-----------------------+
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |728722910|BTC-USD       |BTC-USD             |97542.88|2024-11-29 22:48:30.819|
# |574060435|ETH-USD       |ETH-USD             |3604.43 |2024-11-29 22:48:32.094|
# |574060435|ETH-USD       |ETH-USD             |3604.43 |2024-11-29 22:48:32.094|
# |728722921|BTC-USD       |BTC-USD             |97542.88|2024-11-29 22:48:31.761|
# +---------+--------------+--------------------+--------+-----------------------+

# -------------------------------------------
# Batch: 3
# -------------------------------------------
# +---------+--------------+--------------------+--------+-----------------------+
# |trade_id |buy_product_id|heartbeat_product_id|price   |buy_time               |
# +---------+--------------+--------------------+--------+-----------------------+
# |728722934|BTC-USD       |BTC-USD             |97542.78|2024-11-29 22:48:34.841|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |728722933|BTC-USD       |BTC-USD             |97542.78|2024-11-29 22:48:33.311|
# |574060435|ETH-USD       |ETH-USD             |3604.43 |2024-11-29 22:48:32.094|
# |728722937|BTC-USD       |BTC-USD             |97542.78|2024-11-29 22:48:35.376|
# |728722937|BTC-USD       |BTC-USD             |97542.78|2024-11-29 22:48:35.376|
# |574060445|ETH-USD       |ETH-USD             |3604.61 |2024-11-29 22:48:36.822|
# |574060436|ETH-USD       |ETH-USD             |3604.61 |2024-11-29 22:48:34.208|
# +---------+--------------+--------------------+--------+-----------------------+

# -------------------------------------------
# Batch: 4
# -------------------------------------------
# +---------+--------------+--------------------+--------+-----------------------+
# |trade_id |buy_product_id|heartbeat_product_id|price   |buy_time               |
# +---------+--------------+--------------------+--------+-----------------------+
# |574060450|ETH-USD       |ETH-USD             |3604.61 |2024-11-29 22:48:39.359|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |86708202 |BTC-EUR       |BTC-EUR             |92227.42|2024-11-29 22:48:20.482|
# |728722949|BTC-USD       |BTC-USD             |97542.88|2024-11-29 22:48:39.178|
# +---------+--------------+--------------------+--------+-----------------------+