In [6]:
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window, sum, avg, col, to_timestamp, expr
from pyspark.sql.types import StructType, TimestampType
from pyspark.sql.streaming import DataStreamReader


In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).getOrCreate()

stream = spark.\
    readStream.\
    format("ws").\
    option("schema", "ticker").\
    load() # we need to pass `option("schema", "ticker")` to get correct channel subscribed

query = stream.select("side", "product_id", "last_size", "best_bid", "best_ask", "time").\
    writeStream.\
    format("console").\
    outputMode("append").\
    option("truncate", "false").\
    start()

query.awaitTermination(10) # Let's wait for 10 seconds.
query.stop() # Let's stop the query
stream.printSchema()
#spark.stop() # And stop the whole session


21/11/28 15:58:24 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c7733ce3-3ddb-413a-b20c-e491c46debcb. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/11/28 15:58:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+----+----------+---------+--------+--------+----+
|side|product_id|last_size|best_bid|best_ask|time|
+----+----------+---------+--------+--------+----+
+----+----------+---------+--------+--------+----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----+----------+----------+--------+--------+-----------------------+
|side|product_id|last_size |best_bid|best_ask|time                   |
+----+----------+----------+--------+--------+-----------------------+
|buy |ETH-BTC   |0.24504561|0.07505 |0.07509 |2021-11-28 15:58:26.027|
|buy |ETH-USD   |0.0175555 |4081.44 |4081.46 |2021-11-28 15:58:26.622|
|buy |ETH-USD   |0.02166157|4081.44 |4081.47 |2021-11-28 15:58:26.693|
|buy |ETH-USD   |0.02522314|4081.44 |4081.47 |2021-11-28 15:58:26.693|
+----+----------+----------+--------+--------+-----------------------+

--------------------------------

Uruchamiająć `stream.start()` uruchamiamy w osobnym demonie websocket który streamuje wyniki. Jeżeli wystąpi jakiś błąd po stronie front-endu (np. błąd parsowania kolejnej linijki Pythona) fakt ten nie zostanie zgłoszony do sparka i socket pozostanie otwarty! Należy pamiętać, by zamykać stream za każdym razem używająć metody `stop()` (w powyższym przykładzie `query.stop()`). W przypadku utracenia referencji do zapytania, należy zastopować całą sesję również metodą `stop()` (w powyższym przykładzie `spark.stop()`) 

In [4]:
# Panic button - press only if you messed up opening new websocket and lost reference to it

query.stop()
spark.stop()

# Zadanie 1

**Analiza strumienia danych CoinBase (3p)**. Napisz zapytanie, które wypisuje średnią wartość wybranego parametru (np. `price`) w przesuwnych oknach czasowych względem czasu transakcji (kolumna `time`), grupując po relacji wymiany (z jakiej waluty na jaką walutę - kolumna `product_id`). 

In [15]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.\
    config("spark.sql.streaming.schemaInference", True).\
    config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True).getOrCreate()  

stream = spark.readStream.\
    format("ws").\
    option("schema", "ticker").\
    load()

# your code...

avg_price = stream.select(col("price"), col("product_id"), col("time"))\
            .withColumn("timestamp", col("time").cast(TimestampType()))\
            .groupBy(window("timestamp", "20 minutes", "5 minutes"), "product_id")\
            .agg(avg("price"))\
            .writeStream\
            .outputMode("complete")\
            .format("console")\
            .option("truncate", "false")\
            .trigger(processingTime="10 seconds")\
            .start()


21/11/28 13:57:54 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-45823070-def8-4a1b-a676-bfb5c8cb05d6. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/11/28 13:57:54 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/11/28 13:57:55 WARN WSMicroBatchStreamer: Failed to subscribe to websocket: Attempt to decode value on failed cursor: DownField(channels) 
 message: {"type":"ticker","sequence":4717200168,"product_id":"ETH-BTC","price":"0.07516","open_24h":"0.07529","volume_24h":"5558.64533590","low_24h":"0.0741","high_24h":"0.07547","volume_30d":"381209.73930978","best_bid":"0.07514","best_ask":"0.07516","side":"buy","time":"2021-11-28T13:57:02.125194Z","trade_id":23821227,"last_

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----------+----------+
|window|product_id|avg(price)|
+------+----------+----------+
+------+----------+----------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+----------+------------------+
|window                                    |product_id|avg(price)        |
+------------------------------------------+----------+------------------+
|{2021-11-28 13:40:00, 2021-11-28 14:00:00}|ETH-USD   |4080.5533333333333|
|{2021-11-28 13:55:00, 2021-11-28 14:15:00}|ETH-USD   |4080.5533333333333|
|{2021-11-28 13:45:00, 2021-11-28 14:05:00}|ETH-USD   |4080.5533333333333|
|{2021-11-28 13:50:00, 2021-11-28 14:10:00}|ETH-USD   |4080.5533333333333|
+------------------------------------------+----------+------------------+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+----------+-----------------+
|window                                    |product_id|avg(price)       |
+------------------------------------------+----------+-----------------+
|{2021-11-28 13:40:00, 2021-11-28 14:00:00}|ETH-USD   |4080.309285714286|
|{2021-11-28 13:55:00, 2021-11-28 14:15:00}|ETH-BTC   |0.07514          |
|{2021-11-28 13:55:00, 2021-11-28 14:15:00}|ETH-USD   |4080.309285714286|
|{2021-11-28 13:45:00, 2021-11-28 14:05:00}|ETH-BTC   |0.07514          |
|{2021-11-28 13:45:00, 2021-11-28 14:05:00}|ETH-USD   |4080.309285714286|
|{2021-11-28 13:40:00, 2021-11-28 14:00:00}|ETH-BTC   |0.07514          |
|{2021-11-28 13:50:00, 2021-11-28 14:10:00}|ETH-USD   |4080.309285714286|
|{2021-11-28 13:50:00, 2021-11-28 14:10:00}|ETH-BTC   |0.07514          |
+------------------------------------------+----------+-----------------+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+----------+------------------+
|window                                    |product_id|avg(price)        |
+------------------------------------------+----------+------------------+
|{2021-11-28 13:40:00, 2021-11-28 14:00:00}|ETH-USD   |4080.4952727272725|
|{2021-11-28 13:55:00, 2021-11-28 14:15:00}|ETH-BTC   |0.075146          |
|{2021-11-28 13:55:00, 2021-11-28 14:15:00}|ETH-USD   |4080.4952727272725|
|{2021-11-28 13:45:00, 2021-11-28 14:05:00}|ETH-BTC   |0.075146          |
|{2021-11-28 13:45:00, 2021-11-28 14:05:00}|ETH-USD   |4080.4952727272725|
|{2021-11-28 13:40:00, 2021-11-28 14:00:00}|ETH-BTC   |0.075146          |
|{2021-11-28 13:50:00, 2021-11-28 14:10:00}|ETH-USD   |4080.4952727272725|
|{2021-11-28 13:50:00, 2021-11-28 14:10:00}|ETH-BTC   |0.075146          |
+------------------------------------------+----------+------------------+



# Zadanie 2

**Watermarking i dane opóźnione (3p).** 
Zmodyfikuj zapytanie z zadania 1 tak, aby zademonstować mechanizm znaków wodnych (watermarks) i obsługi danych opóźnionych. W konsoli powinno być widać, że aktualizują się odpowiednie wiersze tabeli wynikowej (tryb update), w szczególności aktualizacja wcześniejszych okien czasowych po przybyciu danych opóźnionych. **Do rozwiązania tego zadania proszę dołączyć przykładowy output i jego opis wyjaśniający na konkretnym przykładzie działanie znaku wodnego i danych opóźnionych**. 

Do ćwiczenia można wykorzystać skrypt w katalogu `/mock` napisany w [Scala-cli](https://scala-cli.virtuslab.org), który posłuży jako kontrolowane źródło danych CoinBase przez Websocket. 

Skrypt można uruchomić wykorzystując Docker:

```
make image
make run
```

Spowoduje to utworzenie websocketowego serwera pod adresem `ws://mock:8025`

Po uruchomieniu serwera należy wykonać poniższą komórkę, w której zapytanie czyta dane z utworzonego websocketa. Skrypt wysyła przykładowe wiadomości w formacie CoinBase co 10 sekund:

- W pierwszej serii wysyłane wiadomości o znacznikach czasowych 0s, 14s, 7s  
- W drugiej serii wysyłane są wiadomości o znacznikach czasowych 15s, 8s, 21s  
- W trzeciej serii wysyłane są wiadomości o znacznikach czasowych 4s, 17s  

Dla tych danych można ustawić okno czasowe na interwał 10 sekund. Skrypt można też zmodyfikować, tak aby wysyłał inne dane. 


In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).getOrCreate()

# Note url option!
stream = spark.readStream.\
    format("ws").\
    option("schema", "ticker").\
    option("url", "ws://mock:8025").load() # we pass explicit url option to subscribe to our mock service

# your code here...

avg_price = stream.select(col("price"), col("product_id"), col("time"))\
            .withColumn("timestamp", col("time").cast(TimestampType()))\
            .withWatermark("timestamp", "1 hour")\
            .groupBy(window("timestamp", "20 minutes", "5 minutes"), "product_id")\
            .agg(avg("price"))\
            .writeStream\
            .outputMode("complete")\
            .format("console")\
            .option("truncate", "false")\
            .trigger(processingTime="10 seconds")\
            .start()

# Zadanie 3

**Łączenie strumieni (3p)**. Rozdziel sztucznie dane CoinBase z kanału `ticker` na dwa strumienie (wykorzystując filtrowanie subskrypcji): jeden strumień dla `side="sell"`, drugi dla `side="buy"`. Następnie stwórz zapytanie, które łączy te strumienie i wypisuje transakcje dla danego `product_id`, które występowały po sobie w ciągu 1s.

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.\
    builder.\
    config("spark.sql.streaming.schemaInference", True).\
    config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True).\
    getOrCreate()  

stream = spark.\
    readStream.\
    format("ws").\
    option("schema", "ticker").\
    load()

# your code...

sell = stream.withColumn("timestamp", col("time").cast(TimestampType()))\
        .select(col("side"), col("price"), col("product_id"), col("timestamp"), window("timestamp", "1 second", "1 second"))\
        .filter(col("side") == "sell").withWatermark("timestamp", "1 second")

buy = stream.withColumn("timestamp", col("time").cast(TimestampType()))\
        .select(col("side"), col("price"), col("product_id"), col("timestamp"), window("timestamp", "1 second", "1 second"))\
        .filter(col("side") == "buy").withWatermark("timestamp", "1 second")

joined = sell.alias("sell").join(buy.alias("buy"), (col("sell.window") == col("buy.window")) & (col("sell.product_id") == col("buy.product_id")))\
        .withWatermark("sell.timestamp", "1 second")\
        .withWatermark("buy.timestamp", "1 second")

query = joined.select(col("sell.side"), col("sell.price"), col("sell.product_id"), col("sell.timestamp"), col("buy.side"), col("buy.price") , col("buy.product_id"), col("buy.timestamp"))\
        .withWatermark("sell.timestamp", "1 second").withWatermark("buy.timestamp", "1 second")\
        .writeStream\
        .outputMode("append")\
        .format("console")\
        .option("truncate", "false")\
        .trigger(processingTime="1 second")\
        .start()


21/11/28 16:02:39 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-95f7021c-36c0-4de2-8be1-2b8af9ef324c. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
21/11/28 16:02:39 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
21/11/28 16:02:51 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 11874 milliseconds


-------------------------------------------
Batch: 0
-------------------------------------------
+----+-----+----------+---------+----+-----+----------+---------+
|side|price|product_id|timestamp|side|price|product_id|timestamp|
+----+-----+----------+---------+----+-----+----------+---------+
+----+-----+----------+---------+----+-----+----------+---------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4082.86|ETH-USD   |2021-11-28 16:02:43.865|buy |4083.08|ETH-USD   |2021-11-28 16:02:43.406|
|sell|4082.79|ETH-USD   |2021-11-28 16:02:43.554|buy |4083.08|ETH-USD   |2021-11-28 16:02:43.406|
|sell|4082.86|ETH-USD   |2021-11-28 16:02:43.865|buy |4083.18|ETH-USD   |2021-11-28 16:02:43.688|
|sell|4082.79|ETH-USD   |2021-11-28 16:02:43.554|buy |4083.18|ETH-USD   |2021-11-28 16:02:43.688|
|sell|4082.86|ETH-USD   |2021-11-28 16:02:43.865|buy |4083.17|ETH-USD   |2021-11-28 16:02:43.754|
|sell|4082.79|ETH-USD   |2021-11-28 16:02:43.554|buy |4083.17|ETH-USD   |2021-11-28 16:02:43.754|
|sell|4082.21|ETH-USD

21/11/28 16:03:00 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 9305 milliseconds
21/11/28 16:03:09 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 8615 milliseconds


-------------------------------------------
Batch: 2
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4081.56|ETH-USD   |2021-11-28 16:02:54.717|buy |4081.96|ETH-USD   |2021-11-28 16:02:54.269|
|sell|4081.56|ETH-USD   |2021-11-28 16:02:54.717|buy |4082.04|ETH-USD   |2021-11-28 16:02:54.717|
|sell|4081.8 |ETH-USD   |2021-11-28 16:02:58.05 |buy |4081.81|ETH-USD   |2021-11-28 16:02:58.882|
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+



21/11/28 16:03:18 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 8572 milliseconds


-------------------------------------------
Batch: 3
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4081.71|ETH-USD   |2021-11-28 16:03:04.683|buy |4081.72|ETH-USD   |2021-11-28 16:03:04.934|
|sell|4081.71|ETH-USD   |2021-11-28 16:03:04.683|buy |4081.72|ETH-USD   |2021-11-28 16:03:04.345|
|sell|4081.63|ETH-USD   |2021-11-28 16:03:01.118|buy |4081.95|ETH-USD   |2021-11-28 16:03:01.115|
|sell|4081.35|ETH-USD   |2021-11-28 16:03:01.697|buy |4081.95|ETH-USD   |2021-11-28 16:03:01.115|
|sell|4081.63|ETH-USD   |2021-11-28 16:03:01.118|buy |4081.91|ETH-USD   |2021-11-28 16:03:01.407|
|sell|4081.35|ETH-USD   |2021-11-28 16:03:01.697|buy |4081.91|ETH-USD   |2021-11-28 16:03:01.407|
+----+-------+-------

21/11/28 16:03:26 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 8271 milliseconds


-------------------------------------------
Batch: 4
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4081.76|ETH-USD   |2021-11-28 16:03:10.247|buy |4081.77|ETH-USD   |2021-11-28 16:03:10.481|
|sell|4083.2 |ETH-USD   |2021-11-28 16:03:15.322|buy |4083.2 |ETH-USD   |2021-11-28 16:03:15.004|
|sell|4083.2 |ETH-USD   |2021-11-28 16:03:15.322|buy |4083.21|ETH-USD   |2021-11-28 16:03:15.308|
|sell|4081.73|ETH-USD   |2021-11-28 16:03:13.026|buy |4081.74|ETH-USD   |2021-11-28 16:03:13.722|
|sell|4081.73|ETH-USD   |2021-11-28 16:03:13.026|buy |4081.74|ETH-USD   |2021-11-28 16:03:13.731|
|sell|4081.73|ETH-USD   |2021-11-28 16:03:13.026|buy |4081.75|ETH-USD   |2021-11-28 16:03:13.731|
|sell|4081.73|ETH-USD

21/11/28 16:03:34 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 8494 milliseconds


-------------------------------------------
Batch: 5
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4082.96|ETH-USD   |2021-11-28 16:03:18.219|buy |4083.23|ETH-USD   |2021-11-28 16:03:18.442|
|sell|4082.64|ETH-USD   |2021-11-28 16:03:18.219|buy |4083.23|ETH-USD   |2021-11-28 16:03:18.442|
|sell|4082.93|ETH-USD   |2021-11-28 16:03:18.219|buy |4083.23|ETH-USD   |2021-11-28 16:03:18.442|
|sell|4082.62|ETH-USD   |2021-11-28 16:03:18.219|buy |4083.23|ETH-USD   |2021-11-28 16:03:18.442|
|sell|4082.82|ETH-USD   |2021-11-28 16:03:18.219|buy |4083.23|ETH-USD   |2021-11-28 16:03:18.442|
|sell|4082.65|ETH-USD   |2021-11-28 16:03:18.219|buy |4083.23|ETH-USD   |2021-11-28 16:03:18.442|
|sell|4082.96|ETH-USD

21/11/28 16:03:43 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 8795 milliseconds


-------------------------------------------
Batch: 6
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4086.25|ETH-USD   |2021-11-28 16:03:31.518|buy |4086.47|ETH-USD   |2021-11-28 16:03:31.378|
|sell|4086.25|ETH-USD   |2021-11-28 16:03:31.518|buy |4086.48|ETH-USD   |2021-11-28 16:03:31.378|
|sell|4084.46|ETH-USD   |2021-11-28 16:03:28.277|buy |4084.69|ETH-USD   |2021-11-28 16:03:28.704|
|sell|4084.46|ETH-USD   |2021-11-28 16:03:28.308|buy |4084.69|ETH-USD   |2021-11-28 16:03:28.704|
|sell|4084.46|ETH-USD   |2021-11-28 16:03:28.277|buy |4084.68|ETH-USD   |2021-11-28 16:03:28.704|
|sell|4084.46|ETH-USD   |2021-11-28 16:03:28.308|buy |4084.68|ETH-USD   |2021-11-28 16:03:28.704|
+----+-------+-------

21/11/28 16:03:51 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 8104 milliseconds


-------------------------------------------
Batch: 7
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4085.9 |ETH-USD   |2021-11-28 16:03:42.787|buy |4086.43|ETH-USD   |2021-11-28 16:03:42.938|
|sell|4085.9 |ETH-USD   |2021-11-28 16:03:42.787|buy |4086.04|ETH-USD   |2021-11-28 16:03:42.938|
|sell|4085.9 |ETH-USD   |2021-11-28 16:03:42.787|buy |4086.44|ETH-USD   |2021-11-28 16:03:42.938|
|sell|4085.9 |ETH-USD   |2021-11-28 16:03:42.787|buy |4086.04|ETH-USD   |2021-11-28 16:03:42.347|
|sell|4085.9 |ETH-USD   |2021-11-28 16:03:42.787|buy |4086.05|ETH-USD   |2021-11-28 16:03:42.938|
|sell|4085.9 |ETH-USD   |2021-11-28 16:03:42.787|buy |4086.05|ETH-USD   |2021-11-28 16:03:42.347|
|sell|4085.9 |ETH-USD

21/11/28 16:03:59 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 8215 milliseconds


-------------------------------------------
Batch: 8
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4088.99|ETH-USD   |2021-11-28 16:03:51.57 |buy |4089.0 |ETH-USD   |2021-11-28 16:03:51.638|
|sell|4088.99|ETH-USD   |2021-11-28 16:03:51.57 |buy |4089.0 |ETH-USD   |2021-11-28 16:03:51.133|
|sell|4088.99|ETH-USD   |2021-11-28 16:03:51.57 |buy |4089.0 |ETH-USD   |2021-11-28 16:03:51.716|
|sell|4088.99|ETH-USD   |2021-11-28 16:03:51.57 |buy |4089.0 |ETH-USD   |2021-11-28 16:03:51.739|
|sell|4088.99|ETH-USD   |2021-11-28 16:03:50.979|buy |4088.11|ETH-USD   |2021-11-28 16:03:50.386|
|sell|4088.99|ETH-USD   |2021-11-28 16:03:50.982|buy |4088.11|ETH-USD   |2021-11-28 16:03:50.386|
|sell|4088.99|ETH-USD

21/11/28 16:04:07 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 8037 milliseconds


-------------------------------------------
Batch: 9
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4091.65|ETH-USD   |2021-11-28 16:03:59.518|buy |4092.23|ETH-USD   |2021-11-28 16:03:59.101|
|sell|4091.65|ETH-USD   |2021-11-28 16:03:59.518|buy |4092.24|ETH-USD   |2021-11-28 16:03:59.101|
|sell|4091.65|ETH-USD   |2021-11-28 16:03:59.518|buy |4092.23|ETH-USD   |2021-11-28 16:03:59.026|
|sell|4088.99|ETH-USD   |2021-11-28 16:03:51.57 |buy |4089.0 |ETH-USD   |2021-11-28 16:03:51.828|
|sell|4088.99|ETH-USD   |2021-11-28 16:03:51.57 |buy |4089.0 |ETH-USD   |2021-11-28 16:03:51.828|
|sell|4088.99|ETH-USD   |2021-11-28 16:03:51.57 |buy |4089.0 |ETH-USD   |2021-11-28 16:03:51.931|
|sell|4088.99|ETH-USD

21/11/28 16:04:15 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 7782 milliseconds


-------------------------------------------
Batch: 10
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4090.95|ETH-USD   |2021-11-28 16:04:01.399|buy |4090.96|ETH-USD   |2021-11-28 16:04:01.064|
|sell|4091.66|ETH-USD   |2021-11-28 16:04:00.108|buy |4091.19|ETH-USD   |2021-11-28 16:04:00.573|
|sell|4091.37|ETH-USD   |2021-11-28 16:04:00.15 |buy |4091.19|ETH-USD   |2021-11-28 16:04:00.573|
|sell|4091.66|ETH-USD   |2021-11-28 16:04:00.108|buy |4090.82|ETH-USD   |2021-11-28 16:04:00.573|
|sell|4091.37|ETH-USD   |2021-11-28 16:04:00.15 |buy |4090.82|ETH-USD   |2021-11-28 16:04:00.573|
|sell|4091.66|ETH-USD   |2021-11-28 16:04:00.108|buy |4090.83|ETH-USD   |2021-11-28 16:04:00.573|
|sell|4091.37|ETH-US

21/11/28 16:04:23 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 7944 milliseconds


-------------------------------------------
Batch: 11
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4090.6 |ETH-USD   |2021-11-28 16:04:10.337|buy |4090.61|ETH-USD   |2021-11-28 16:04:10.087|
|sell|4090.67|ETH-USD   |2021-11-28 16:04:10.879|buy |4090.61|ETH-USD   |2021-11-28 16:04:10.087|
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+



                                                                                

-------------------------------------------
Batch: 12
-------------------------------------------
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|side|price  |product_id|timestamp              |side|price  |product_id|timestamp              |
+----+-------+----------+-----------------------+----+-------+----------+-----------------------+
|sell|4089.07|ETH-USD   |2021-11-28 16:04:17.993|buy |4088.89|ETH-USD   |2021-11-28 16:04:17.231|
|sell|4089.07|ETH-USD   |2021-11-28 16:04:17.993|buy |4089.08|ETH-USD   |2021-11-28 16:04:17.742|
|sell|0.07519|ETH-BTC   |2021-11-28 16:04:16.498|buy |0.0752 |ETH-BTC   |2021-11-28 16:04:16.739|
|sell|4089.48|ETH-USD   |2021-11-28 16:04:16.519|buy |4089.51|ETH-USD   |2021-11-28 16:04:16.192|
|sell|4089.5 |ETH-USD   |2021-11-28 16:04:16.411|buy |4089.51|ETH-USD   |2021-11-28 16:04:16.192|
|sell|4089.48|ETH-USD   |2021-11-28 16:04:16.521|buy |4089.51|ETH-USD   |2021-11-28 16:04:16.192|
|sell|4089.48|ETH-US

21/11/28 16:04:31 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 1000 milliseconds, but spent 8268 milliseconds