In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).getOrCreate()

stream = spark.readStream.format("ws").option("schema", "ticker").load() # we need to pass `option("schema", "ticker")` to get correct channel subscribed
query = stream.select("side", "product_id", "last_size", "best_bid", "best_ask", "time").writeStream.format("console").outputMode("append").option("truncate", "false").start()

query.awaitTermination(3) # Let's wait for 10 seconds.
query.stop() # Let's stop the query
stream.printSchema()
spark.stop() # And stop the whole session

Uruchamiająć `stream.start()` uruchamiamy w osobnym demonie websocket który streamuje wyniki. Jeżeli wystąpi jakiś błąd po stronie front-endu (np. błąd parsowania kolejnej linijki Pythona) fakt ten nie zostanie zgłoszony do sparka i socket pozostanie otwarty! Nalezy pamiętać, by zamykać stream za każdym razem używająć metody `stop()` (w powyższym przykładzie `query.stop()`). W przypadku utracenia referencji do zapytania, nalezy zastopować całą sesję również metodą `stop()` (w powyższym przykładzie `spark.stop()`) 

In [None]:
# Panic button - press only if you messed up opening new websocket and lost reference to it

query.stop()
spark.stop()

# Zadanie 1

Analiza danych CoinBase strumieniowanych przez WebSocket (3p). Napisz zapytanie, które wypisuje średnią wartość wybranego parametru (np. `price`) w przesuwnych oknach czasowych względem czasu transakcji (kolumna `time`), grupując po relacji wymiany (z jakiej waluty na jaką walutę - kolumna `product_id`). 

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True).getOrCreate()  
stream = spark.readStream.format("ws").option("schema", "ticker").load()

# your code...

# Zadanie 2

Watermarking i dane opóźnione (3p). Zmodyfikuj zapytanie z zadania 1 tak, aby zademonstować mechanizm znaków wodnych (watermarks) i obsługi danych opóźnionych. W konsoli powinno być widać, że aktualizują się odpowiednie wiersze tabeli wynikowej (tryb update).

Do ćwiczenia można wykorzystać skrypt w katalogu `/mock` napisany w [Scala-cli](https://scala-cli.virtuslab.org/)

Skrypt można uruchomić wykorzystując dockera

```
make image
make run
```

mock service dostępny będzie pod adresem `ws://mock:8025`

Skrypt należy uruchomić przed wykonaniem poniższej komórki, uruchomi on server websocketowy. Po uruchomieniu uruchamiamy komórkę i czekamy aż w skrypcie pojawi się informacja `Opened connection successfully!`. Skrypt wysyła kolejne porcje wiadomości po wciśnięciu ENTER

W pierwszej serii polecą wiadomości o offsetach: 0s, 14s, 7s  
W drugiej serii polecą wiadomości o offsetach: 15s, 8s, 21s  
W trzeciej serii polecą wiadomości o offsetach: 4s, 17s  

Zadanie wygodnie prezentuje się na watermarku i oknie ustawionym na interwał 10 sekund.


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).getOrCreate()

# Note url option!
stream = spark.readStream.format("ws").option("schema", "ticker").option("url", "ws://mock:8025").load() # we pass explicit url option to subscribe to our mock service

# your code here...

# Zadanie 3

Łączenie strumieni (3p). Rozdziel sztucznie dane CoinBase na dwa strumienie (wykorzystując filtrowanie subskrypcji): stwórz dwie kolejki WS strumieniujące dane z kanału `ticker`, jedna dla jednego wybranego parametru `product_id` (np. `"ETH-BTC"`), a druga dla innego (np. `"ETH-USD"`). Następnie stwórz zapytanie, które łączy te strumienie i wypisuje średnią cenę obu parametrów w tych samych oknach czasowych.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True).getOrCreate()  
stream = spark.readStream.format("ws").option("schema", "ticker").load()

# your code...