In [1]:
from pyspark.sql import SparkSession

In [2]:
from pyspark.sql.functions import window, sum, avg, col, to_timestamp, expr

In [3]:
spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).getOrCreate()

stream = spark.\
    readStream.\
    format("ws").\
    option("schema", "ticker").\
    load() # we need to pass `option("schema", "ticker")` to subscribe to the correct channel

query = stream.select("side", "product_id", "last_size", "best_bid", "best_ask", "time").\
    writeStream.\
    format("console").\
    outputMode("append").\
    option("truncate", "false").\
    start()

query.awaitTermination(10) # Let's wait for 10 seconds.
query.stop() # Let's stop the query
# stream.printSchema()
#spark.stop() # And stop the whole session

In [4]:
stream.printSchema()

root
 |-- type: string (nullable = false)
 |-- trade_id: long (nullable = false)
 |-- sequence: long (nullable = false)
 |-- time: timestamp (nullable = false)
 |-- product_id: string (nullable = false)
 |-- price: double (nullable = false)
 |-- side: string (nullable = false)
 |-- last_size: double (nullable = false)
 |-- best_bid: double (nullable = false)
 |-- best_ask: double (nullable = false)



Calling `start()` on `writeStream` launches a websocket in a separate daemon thread, and that websocket streams the results. If an error occurs on the front-end side (e.g. a parse error in a later line of Python), it is not reported to Spark and the socket stays open! Remember to close the stream every time with the `stop()` method (in the example above, `query.stop()`). If you lose the reference to the query, stop the whole session, also with `stop()` (in the example above, `spark.stop()`).
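As a gentler alternative to stopping the whole session, the session itself keeps track of running queries: `spark.streams.active` returns the list of `StreamingQuery` objects that are still active, so a lost reference can be recovered and stopped. A minimal sketch, assuming `spark` is a live `SparkSession` (the helper name `stop_all_streams` is hypothetical):

```python
def stop_all_streams(spark):
    """Stop every streaming query still active in `spark`; return how many were stopped.

    Assumes `spark` is a live SparkSession: `spark.streams.active` lists the
    StreamingQuery objects the session is still running, even when the
    notebook no longer holds its own reference to them.
    """
    active = list(spark.streams.active)
    for q in active:
        q.stop()
    return len(active)
```

Calling `stop_all_streams(spark)` stops every running query while leaving the session usable for the next cell; `spark.stop()` remains the last resort.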

In [5]:
# Panic button - press only if you messed up opening new websocket and lost reference to it

query.stop()
spark.stop()

# Task 1

**CoinBase data stream analysis (3 pts).** Write a query that outputs the average of a chosen field (e.g. `price`) over sliding time windows based on the transaction time (column `time`), grouped by the currency pair being exchanged (from which currency to which currency, column `product_id`).
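The Task 1 cell below calls `window("time", ...)` with only a window duration, which yields tumbling (non-overlapping) windows. PySpark's `window` also accepts a slide duration as its third argument, e.g. `window("time", "10 seconds", "5 seconds")`, which produces overlapping sliding windows. The pure-Python sketch below (the helper `sliding_windows` is hypothetical) shows which windows a single timestamp falls into, assuming windows are aligned to multiples of the slide with no `startTime` offset:

```python
def sliding_windows(t, size, slide):
    """Return the [start, end) windows of length `size`, starting every `slide`
    seconds from 0, that contain timestamp `t` (all values in seconds)."""
    windows = []
    start = t - t % slide          # latest window start that is <= t
    while start > t - size:        # window [start, start + size) still covers t
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


print(sliding_windows(7, size=10, slide=5))   # sliding: [(0, 10), (5, 15)]
print(sliding_windows(7, size=5, slide=5))    # tumbling: [(5, 10)]
```

When `size` is a multiple of `slide`, each event contributes to `size / slide` windows, so every price is averaged into several overlapping result rows.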

In [5]:
spark = SparkSession.builder.\
    config("spark.sql.streaming.schemaInference", True).\
    config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True).getOrCreate()  

stream = spark.readStream.\
    format("ws").\
    option("schema", "ticker").\
    load()


WINDOW = '5'


query = stream.select("time", "product_id", "price").\
    groupBy(window("time", "{} seconds".format(WINDOW)), "product_id").\
    agg(avg("price").alias("mean {}s".format(WINDOW))).\
    writeStream.\
    outputMode("complete").\
    format("console").\
    option("truncate", "false").\
    start()

query.awaitTermination(20) 
query.stop() 
stream.printSchema()

root
 |-- type: string (nullable = false)
 |-- trade_id: long (nullable = false)
 |-- sequence: long (nullable = false)
 |-- time: timestamp (nullable = false)
 |-- product_id: string (nullable = false)
 |-- price: double (nullable = false)
 |-- side: string (nullable = false)
 |-- last_size: double (nullable = false)
 |-- best_bid: double (nullable = false)
 |-- best_ask: double (nullable = false)



# Task 2

**Watermarking and late data (3 pts).**
Modify the query from Task 1 to demonstrate watermarks and the handling of late data. The console should show the corresponding rows of the result table being updated (update mode), in particular earlier time windows being updated after late data arrives. **Please include sample output with this solution, together with a description that uses a concrete example to explain how the watermark and late data behave.**

For this exercise you can use the script in the `/mock` directory, written in [Scala-cli](https://scala-cli.virtuslab.org), which acts as a controlled source of CoinBase data over a WebSocket.

The script can be run with Docker:

```
make image
make run
```

This starts a WebSocket server at `ws://mock:8025`.

Once the server is running, execute the cell below; its query reads data from that websocket. The script sends sample messages in the CoinBase format every 10 seconds:

- The first series sends messages with timestamps 0s, 14s, 7s
- The second series sends messages with timestamps 15s, 8s, 21s
- The third series sends messages with timestamps 4s, 17s

For this data, the time window can be set to a 10-second interval. The script can also be modified to send different data.
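To reason about what the console should show, the following pure-Python sketch models, in simplified form, how a watermark interacts with 10-second tumbling windows for the three series listed above. It is not Spark's implementation: it assumes the watermark equals the maximum event time seen so far minus the delay, advances only at the end of each micro-batch, and that a row is dropped once its window end is at or below the current watermark. `simulate`, `window_start` and the `DELAY` value are hypothetical, chosen for illustration.

```python
# Simplified model of watermarking for a tumbling-window count in update mode.
# This is NOT Spark's code; it only mirrors the rules described above.
WINDOW = 10  # tumbling window length, in seconds
DELAY = 10   # watermark delay (second argument of withWatermark), in seconds

# Event times (seconds) sent by the mock server, one list per series
BATCHES = [[0, 14, 7], [15, 8, 21], [4, 17]]


def window_start(t, size=WINDOW):
    """Start of the tumbling window [start, start + size) that contains t."""
    return t - t % size


def simulate(batches, size=WINDOW, delay=DELAY):
    """Return, per micro-batch, (updated windows, dropped events, watermark)."""
    state = {}         # window start -> number of events aggregated so far
    watermark = None   # no watermark until the first batch has completed
    max_event = None
    history = []
    for events in batches:
        updated, dropped = {}, []
        for t in events:
            start = window_start(t, size)
            if watermark is not None and start + size <= watermark:
                dropped.append(t)  # its window is already closed
                continue
            state[start] = state.get(start, 0) + 1
            updated[start] = state[start]  # rows that update mode would print
        # The watermark only moves forward, at the end of the micro-batch
        batch_max = max(events)
        max_event = batch_max if max_event is None else max(max_event, batch_max)
        watermark = max_event - delay
        # Evict windows that can no longer receive data
        for start in [s for s in state if s + size <= watermark]:
            del state[start]
        history.append((updated, dropped, watermark))
    return history


for i, (updated, dropped, wm) in enumerate(simulate(BATCHES)):
    print(f"batch {i}: updated={updated} dropped={dropped} watermark={wm}")
```

With `delay=10`, the 8s event in the second series still updates the earlier window [0, 10), while the 4s event in the third series arrives after the watermark has reached 11s and is discarded. With `delay=30`, as in the cell below, the watermark stays negative for this data, so no row is ever dropped. Spark itself only guarantees that data within the delay is aggregated; data later than that may or may not be dropped.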


In [7]:
spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).getOrCreate()

# Note url option!
stream = spark.readStream.\
    format("ws").\
    option("schema", "ticker").\
    option("url", "ws://mock:8025").load() # we pass explicit url option to subscribe to our mock service


WINDOW = '5'


# update mode prints only the rows that changed in each micro-batch and, unlike
# complete mode, lets the watermark drop the state of closed windows
query = stream.select("time", "product_id", "price").\
    withWatermark("time", "30 seconds").\
    groupBy(window("time", "{} seconds".format(WINDOW)), "product_id").\
    agg(avg("price").alias("mean {}s".format(WINDOW))).\
    writeStream.\
    outputMode("update").\
    format("console").\
    option("truncate", "false").\
    start()

query.awaitTermination(60) 
query.stop() 
stream.printSchema()

21/12/09 22:05:03 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a2850f72-e841-4142-9e44-1ee91737fb84. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/12/09 22:05:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------+-------+
|window|product_id|mean 5s|
+------+----------+-------+
+------+----------+-------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+----------+------------------+
|window                                    |product_id|mean 5s           |
+------------------------------------------+----------+------------------+
|{2021-11-01 00:00:05, 2021-11-01 00:00:10}|ETH-USD   |869.5649306044588 |
|{2021-11-01 00:00:00, 2021-11-01 00:00:05}|ETH-USD   |779.7320652551889 |
|{2021-11-01 00:00:10, 2021-11-01 00:00:15}|ETH-USD   |20.550742869765216|
+------------------------------------------+----------+------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+----------+------------------+
|window                                    |product_id|mean 5s           |
+------------------------------------------+----------+------------------+
|{2021-11-01 00:00:05, 2021-11-01 00:00:10}|ETH-USD   |

# Task 3

**Joining streams (3 pts).** Artificially split the CoinBase data from the `ticker` channel into two streams (by filtering the subscription): one stream for `side="sell"` and another for `side="buy"`. Then create a query that joins these streams and outputs, for a given `product_id`, transactions that occurred within 1 s of each other.
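The join condition used in the cell below can be checked without Spark. The pure-Python sketch that follows (the function `interval_join` and the dict-based events are hypothetical, for illustration only) pairs every sell event with every buy event of the same `product_id` whose timestamps differ by at most one second in either direction, which is what the two interval inequalities in the Spark `expr(...)` condition express. Spark evaluates the same predicate incrementally; without watermarks it has to keep all past input from both sides in state, since any new row could still match an old one.

```python
from datetime import datetime, timedelta


def interval_join(sells, buys, tolerance=timedelta(seconds=1)):
    """Nested-loop interval join: pair sells and buys of the same product
    whose event times are at most `tolerance` apart (inclusive)."""
    pairs = []
    for s in sells:
        for b in buys:
            same_product = s["product_id"] == b["product_id"]
            # Equivalent to (s - b) <= tol AND (b - s) <= tol
            close_in_time = abs(s["time"] - b["time"]) <= tolerance
            if same_product and close_in_time:
                pairs.append((s, b))
    return pairs


# A few sell and buy events taken from the console output shown below
sells = [
    {"time": datetime(2021, 12, 9, 22, 6, 6, 20000), "product_id": "ETH-USD"},
    {"time": datetime(2021, 12, 9, 22, 6, 11, 573000), "product_id": "ETH-BTC"},
]
buys = [
    {"time": datetime(2021, 12, 9, 22, 6, 6, 62000), "product_id": "ETH-USD"},
    {"time": datetime(2021, 12, 9, 22, 6, 10, 673000), "product_id": "ETH-BTC"},
    {"time": datetime(2021, 12, 9, 22, 6, 12, 875000), "product_id": "ETH-USD"},
]
for s, b in interval_join(sells, buys):
    print(s["product_id"], s["time"].time(), "<->", b["time"].time())
```

The nested loop is quadratic and only meant to make the predicate concrete; Spark's streaming join instead matches each new row against the buffered rows of the other side.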

In [8]:
spark = SparkSession.\
    builder.\
    config("spark.sql.streaming.schemaInference", True).\
    config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True).\
    getOrCreate()  


stream = spark.readStream.\
    format("ws").\
    option("schema", "ticker").\
    load()


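# Watermarks let Spark discard join state that can no longer match;
# the 10 s delay is an arbitrary choice
stream_sell = stream.select("side", "time", "product_id", "price").\
    filter(col("side")=="sell").\
    withWatermark("time", "10 seconds")

stream_buy = stream.select("side", "time", "product_id", "price").\
    filter(col("side")=="buy").\
    withWatermark("time", "10 seconds")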


# query = stream_sell.\
#     writeStream.\
#     outputMode("append").\
#     format("console").\
#     option("truncate", "false").\
#     start()

query = stream_sell.\
    alias("stream_sell").\
    join(stream_buy.alias("stream_buy"),
        expr("""
        stream_sell.product_id = stream_buy.product_id AND
        stream_sell.time >= stream_buy.time - interval 1 seconds AND
        stream_sell.time <= stream_buy.time + interval 1 seconds
        """)).\
    writeStream.\
    outputMode("append").\
    format("console").\
    option("truncate", "false").\
    start()



query.awaitTermination(20) 
query.stop() 

21/12/09 22:06:05 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-e2be3795-aa89-4d14-b319-b0a2144362fb. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/12/09 22:06:05 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+----+----+----------+-----+----+----+----------+-----+
|side|time|product_id|price|side|time|product_id|price|
+----+----+----------+-----+----+----+----------+-----+
+----+----+----------+-----+----+----+----------+-----+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-----------------------+----------+-------+----+-----------------------+----------+-------+
|side|time                   |product_id|price  |side|time                   |product_id|price  |
+----+-----------------------+----------+-------+----+-----------------------+----------+-------+
|sell|2021-12-09 22:06:06.02 |ETH-USD   |4198.01|buy |2021-12-09 22:06:06.062|ETH-USD   |4197.81|
|sell|2021-12-09 22:06:06.394|ETH-USD   |4197.8 |buy |2021-12-09 22:06:06.062|ETH-USD   |4197.81|
|sell|2021-12-09 22:06:07.97 |ETH-USD   |4198.14|buy |2021-12-09 22:06:07.644|ETH-USD   |4198.11|
|sell|2021-12-09 22:06:07.97 |ETH-USD   |4198.14|buy |2021-12-09 22:06:07.644|ETH-USD   |4198.11|
|sell|2021-12-09 22:06:07.97 |ETH-USD   |4198.11|buy |2021-12-09 22:06:07.644|ETH-USD   |4198.11|
|sell|2021-12-09 22:06:07.97 |ETH-USD   |4198.09|buy |2021-12-09 22:06:07.644|ETH-USD   |4198.11|
|sell|2021-12-09 22:0

                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+----+-----------------------+----------+-------+----+-----------------------+----------+-------+
|side|time                   |product_id|price  |side|time                   |product_id|price  |
+----+-----------------------+----------+-------+----+-----------------------+----------+-------+
|sell|2021-12-09 22:06:08.264|ETH-USD   |4198.15|buy |2021-12-09 22:06:07.644|ETH-USD   |4198.11|
|sell|2021-12-09 22:06:08.264|ETH-USD   |4198.15|buy |2021-12-09 22:06:07.68 |ETH-USD   |4198.11|
|sell|2021-12-09 22:06:08.264|ETH-USD   |4198.15|buy |2021-12-09 22:06:07.754|ETH-USD   |4198.11|
|sell|2021-12-09 22:06:08.264|ETH-USD   |4198.15|buy |2021-12-09 22:06:07.754|ETH-USD   |4198.14|
|sell|2021-12-09 22:06:08.264|ETH-USD   |4198.15|buy |2021-12-09 22:06:07.97 |ETH-USD   |4198.25|
|sell|2021-12-09 22:06:08.264|ETH-USD   |4198.15|buy |2021-12-09 22:06:07.758|ETH-USD   |4198.14|
|sell|2021-12-09 22:0

                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+----+-----------------------+----------+-------+----+-----------------------+----------+-------+
|side|time                   |product_id|price  |side|time                   |product_id|price  |
+----+-----------------------+----------+-------+----+-----------------------+----------+-------+
|sell|2021-12-09 22:06:11.573|ETH-BTC   |0.08684|buy |2021-12-09 22:06:10.673|ETH-BTC   |0.08685|
|sell|2021-12-09 22:06:11.573|ETH-BTC   |0.08684|buy |2021-12-09 22:06:10.673|ETH-BTC   |0.08685|
|sell|2021-12-09 22:06:11.573|ETH-BTC   |0.08684|buy |2021-12-09 22:06:10.673|ETH-BTC   |0.08685|
|sell|2021-12-09 22:06:11.573|ETH-BTC   |0.08684|buy |2021-12-09 22:06:10.673|ETH-BTC   |0.08686|
|sell|2021-12-09 22:06:11.573|ETH-BTC   |0.08684|buy |2021-12-09 22:06:10.673|ETH-BTC   |0.08685|
|sell|2021-12-09 22:06:11.573|ETH-BTC   |0.08684|buy |2021-12-09 22:06:10.673|ETH-BTC   |0.08685|
|sell|2021-12-09 22:0

                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+----+-----------------------+----------+-------+----+-----------------------+----------+-------+
|side|time                   |product_id|price  |side|time                   |product_id|price  |
+----+-----------------------+----------+-------+----+-----------------------+----------+-------+
|sell|2021-12-09 22:06:13.736|ETH-USD   |4195.79|buy |2021-12-09 22:06:12.875|ETH-USD   |4196.66|
|sell|2021-12-09 22:06:13.792|ETH-USD   |4195.79|buy |2021-12-09 22:06:12.875|ETH-USD   |4196.66|
|sell|2021-12-09 22:06:13.792|ETH-USD   |4195.69|buy |2021-12-09 22:06:12.875|ETH-USD   |4196.66|
|sell|2021-12-09 22:06:13.841|ETH-USD   |4195.69|buy |2021-12-09 22:06:12.875|ETH-USD   |4196.66|
|sell|2021-12-09 22:06:13.841|ETH-USD   |4195.38|buy |2021-12-09 22:06:12.875|ETH-USD   |4196.66|
|sell|2021-12-09 22:06:13.033|ETH-USD   |4196.11|buy |2021-12-09 22:06:13.813|ETH-USD   |4195.8 |
|sell|2021-12-09 22:0

                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+----+-----------------------+----------+-------+----+-----------------------+----------+-------+
|side|time                   |product_id|price  |side|time                   |product_id|price  |
+----+-----------------------+----------+-------+----+-----------------------+----------+-------+
|sell|2021-12-09 22:06:18.357|ETH-USD   |4196.4 |buy |2021-12-09 22:06:18.686|ETH-USD   |4196.34|
|sell|2021-12-09 22:06:18.365|ETH-USD   |4195.01|buy |2021-12-09 22:06:18.686|ETH-USD   |4196.34|
|sell|2021-12-09 22:06:18.365|ETH-USD   |4193.84|buy |2021-12-09 22:06:18.686|ETH-USD   |4196.34|
|sell|2021-12-09 22:06:18.361|ETH-USD   |4196.37|buy |2021-12-09 22:06:18.686|ETH-USD   |4196.34|
|sell|2021-12-09 22:06:18.365|ETH-USD   |4195.0 |buy |2021-12-09 22:06:18.686|ETH-USD   |4196.34|
|sell|2021-12-09 22:06:18.365|ETH-USD   |4193.84|buy |2021-12-09 22:06:18.686|ETH-USD   |4196.34|
|sell|2021-12-09 22:0

21/12/09 22:06:25 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@4b5d784d is aborting.
21/12/09 22:06:25 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@4b5d784d aborted.
21/12/09 22:06:25 WARN Shell: Interrupted while joining on: Thread[Thread-106432,5,]
java.lang.InterruptedException
	at java.base/java.lang.Object.wait(Native Method)
	at java.base/java.lang.Thread.join(Thread.java:1300)
	at java.base/java.lang.Thread.join(Thread.java:1375)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:1043)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1003)
	at org.apache.hadoop.util.Shell.run(Shell.java:901)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1213)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1307)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1289)
	at org.apache.hadoop.fs.Fi