In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [52]:
spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).getOrCreate()

stream = spark.\
    readStream.\
    format("ws").\
    option("schema", "ticker").\
    load() # we need to pass `option("schema", "ticker")` to get correct channel subscribed

query = stream.select("side", "product_id", "last_size", "best_bid", "best_ask", "time").\
    writeStream.\
    format("console").\
    outputMode("append").\
    option("truncate", "false").\
    start()

query.awaitTermination(10) # Let's wait for 10 seconds.
query.stop() # Let's stop the query
stream.printSchema()
#spark.stop() # And stop the whole session

root
 |-- type: string (nullable = false)
 |-- trade_id: long (nullable = false)
 |-- sequence: long (nullable = false)
 |-- time: timestamp (nullable = false)
 |-- product_id: string (nullable = false)
 |-- price: double (nullable = false)
 |-- side: string (nullable = false)
 |-- last_size: double (nullable = false)
 |-- best_bid: double (nullable = false)
 |-- best_ask: double (nullable = false)



Uruchamiająć `stream.start()` uruchamiamy w osobnym demonie websocket który streamuje wyniki. Jeżeli wystąpi jakiś błąd po stronie front-endu (np. błąd parsowania kolejnej linijki Pythona) fakt ten nie zostanie zgłoszony do sparka i socket pozostanie otwarty! Należy pamiętać, by zamykać stream za każdym razem używająć metody `stop()` (w powyższym przykładzie `query.stop()`). W przypadku utracenia referencji do zapytania, należy zastopować całą sesję również metodą `stop()` (w powyższym przykładzie `spark.stop()`) 

In [51]:
# Panic button - press only if you messed up opening new websocket and lost reference to it

query.stop()
spark.stop()

# Zadanie 1

**Analiza strumienia danych CoinBase (3p)**. Napisz zapytanie, które wypisuje średnią wartość wybranego parametru (np. `price`) w przesuwnych oknach czasowych względem czasu transakcji (kolumna `time`), grupując po relacji wymiany (z jakiej waluty na jaką walutę - kolumna `product_id`). 

In [49]:
from pyspark.sql.functions import window, avg

spark = (
    SparkSession
    .builder
    .appName('AverageBestBid')
    .config('spark.sql.streaming.schemaInference', True)
    .config('spark.sql.streaming.forceDeleteTempCheckpointLocation', True)
    .getOrCreate()
)

stream = (
    spark
    .readStream
    .format('ws')
    .option('schema', 'ticker')
    .load()
)

query = (
    stream
    .select('product_id', 'time', 'best_bid')
    .groupBy(
        window(timeColumn = 'time', windowDuration = '5 minutes', slideDuration = '1 minutes'), 
        'product_id'
    )
    .agg(avg('best_bid').alias('avg_best_bid'))
    .orderBy('window')
    .writeStream
    .format('memory')
    .outputMode('complete')
    .option('queryName', 'avg_best_bid_table')
    .option('truncate', 'false')
    .start()
)

query.awaitTermination(12)

for batch_num in range(5):
    print(f'Batch {batch_num}\n')
    avg_best_bid_df = spark.sql('select * from avg_best_bid_table').show(truncate = False)

query.stop()

Batch 0

+------------------------------------------+----------+------------------+
|window                                    |product_id|avg_best_bid      |
+------------------------------------------+----------+------------------+
|{2024-11-27 04:38:00, 2024-11-27 04:43:00}|ETH-USD   |3411.015          |
|{2024-11-27 04:38:00, 2024-11-27 04:43:00}|BTC-EUR   |88393.69          |
|{2024-11-27 04:38:00, 2024-11-27 04:43:00}|ETH-BTC   |0.0367975         |
|{2024-11-27 04:38:00, 2024-11-27 04:43:00}|BTC-USD   |92661.899         |
|{2024-11-27 04:39:00, 2024-11-27 04:44:00}|ETH-USD   |3410.9752083333333|
|{2024-11-27 04:39:00, 2024-11-27 04:44:00}|BTC-USD   |92665.62115384615 |
|{2024-11-27 04:39:00, 2024-11-27 04:44:00}|BTC-EUR   |88393.69          |
|{2024-11-27 04:39:00, 2024-11-27 04:44:00}|ETH-BTC   |0.0367975         |
|{2024-11-27 04:40:00, 2024-11-27 04:45:00}|BTC-USD   |92665.62115384615 |
|{2024-11-27 04:40:00, 2024-11-27 04:45:00}|ETH-BTC   |0.0367975         |
|{2024-11-27 04:

# Zadanie 2

**Watermarking i dane opóźnione (3p).** 
Zmodyfikuj zapytanie z zadania 1 tak, aby zademonstować mechanizm znaków wodnych (watermarks) i obsługi danych opóźnionych. W konsoli powinno być widać, że aktualizują się odpowiednie wiersze tabeli wynikowej (tryb update), w szczególności aktualizacja wcześniejszych okien czasowych po przybyciu danych opóźnionych. **Do rozwiązania tego zadania proszę dołączyć przykładowy output i jego opis wyjaśniający na konkretnym przykładzie działanie znaku wodnego i danych opóźnionych**. 

Do ćwiczenia można wykorzystać skrypt w katalogu `/mock` napisany w [Scala-cli](https://scala-cli.virtuslab.org), który posłuży jako kontrolowane źródło danych CoinBase przez Websocket. 

Skrypt można uruchomić wykorzystując Docker:

```
make image
make run
```

Spowoduje to utworzenie websocketowego serwera pod adresem `ws://mock:8025`

Po uruchomieniu serwera należy wykonać poniższą komórkę, w której zapytanie czyta dane z utworzonego websocketa. Skrypt wysyła przykładowe wiadomości w formacie CoinBase co 10 sekund:

- W pierwszej serii wysyłane wiadomości o znacznikach czasowych 0s, 14s, 7s  
- W drugiej serii wysyłane są wiadomości o znacznikach czasowych 15s, 8s, 21s  
- W trzeciej serii wysyłane są wiadomości o znacznikach czasowych 4s, 17s  

Dla tych danych można ustawić okno czasowe na interwał 10 sekund. Skrypt można też zmodyfikować, tak aby wysyłał inne dane. 


In [3]:
import pyspark.sql.functions as F

spark = (
    SparkSession
    .builder
    .config('spark.sql.streaming.schemaInference', True)
    .getOrCreate()
)

stream = (
    spark
    .readStream
    .format('ws')
    .option('schema', 'ticker')
    .option('url', 'ws://mock:8025')
    .load()
)


query = (
    stream
    .select("product_id", "price", "time")
    .withWatermark("time", "30 seconds")
    .groupBy(F.window("time", "10 seconds", "5 seconds"), "product_id").mean("price")
)

query = (
    query
    .writeStream
    .format("console")
    .outputMode("update")
    .option("truncate", "false")
    .start()
)

query.awaitTermination(40)
query.stop()

# Zadanie 3

**Łączenie strumieni (3p)**. Korzystając z łączenia strumieni połącz dane z kanału `ticker` (transakcje kupna `side="buy"`) razem z danymi o transakcjach napływających co sekundę `heartbeat` korzystając z `trade_id` i odpowiedniego id w kanale heartbeat. Wypisz połączony strumień danych.

Na moment tworzenia zadania 15.11.2023 kanał `heartbeat` zwraca błędne dane o dacie (np. `1970-01-04 13:53:57.645339`). Połączenie z kanałem `ticker` pozwala uzyskać poprawne informacje. Cóż za wspaniałe zastosowanie joina!


In [63]:
from pyspark.sql.functions import expr

spark = (
    SparkSession
    .builder
    .config('spark.sql.streaming.schemaInference', True)
    .config('spark.sql.streaming.forceDeleteTempCheckpointLocation', True)
    .getOrCreate() 
)

stream_ticker = (
    spark
    .readStream
    .format('ws')
    .option('schema', 'ticker')
    .load()
)

stream_heartbeat = (
    spark
    .readStream
    .format('ws')
    .option('schema', 'heartbeat')
    .load()
)

buy = (
    stream_ticker
    .filter(stream_ticker.side == 'buy')
    .withColumnRenamed('time', 'buy_time')
)

heartbeat = (
    stream_heartbeat
    .filter(stream_heartbeat.type == 'heartbeat')
    .withColumnRenamed('time', 'heartbeat_time')
    .selectExpr('last_trade_id AS trade_id')
)

joined_streams = (
    buy.join(heartbeat, 'trade_id').dropDuplicates(['trade_id'])
)

query = (
    joined_streams
    .writeStream
    .format('memory')
    .outputMode('append')
    .option('queryName', 'joined_table')
    .option('truncate', 'false')
    .start()
)

query.awaitTermination(60)

for batch_num in range(5):
    print(f'Batch {batch_num}\n')
    joined_df = spark.sql('select * from joined_table').show(truncate = False)

query.stop()

Batch 0

+---------+------+-----------+-----------------------+----------+--------+----+----------+--------+--------+
|trade_id |type  |sequence   |buy_time               |product_id|price   |side|last_size |best_bid|best_ask|
+---------+------+-----------+-----------------------+----------+--------+----+----------+--------+--------+
|730825133|ticker|93168283072|2024-12-03 18:28:15.806|BTC-USD   |95456.9 |buy |1.142E-5  |95456.89|95456.9 |
|575891113|ticker|71138240754|2024-12-03 18:28:14.805|ETH-USD   |3570.7  |buy |2.85247131|3570.69 |3570.93 |
|730825122|ticker|93168282239|2024-12-03 18:28:13.505|BTC-USD   |95456.9 |buy |4.978E-4  |95456.89|95456.9 |
|575891102|ticker|71138240452|2024-12-03 18:28:14.041|ETH-USD   |3570.94 |buy |0.0033763 |3570.93 |3570.94 |
|34713602 |ticker|7660274523 |2024-12-03 18:27:18.125|ETH-BTC   |0.03736 |buy |1.4404    |0.03735 |0.03737 |
|730825126|ticker|93168282690|2024-12-03 18:28:14.825|BTC-USD   |95456.9 |buy |9.9808E-4 |95456.89|95456.9 |
|575891116