In [1]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [2]:
spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).getOrCreate()

stream = spark.\
    readStream.\
    format("ws").\
    option("schema", "ticker").\
    load()  # option("schema", "ticker") is required to subscribe to the correct channel

query = stream.select("side", "product_id", "last_size", "best_bid", "best_ask", "time").\
    writeStream.\
    format("console").\
    outputMode("append").\
    option("truncate", "false").\
    start()

query.awaitTermination(10) # Let's wait for 10 seconds.
query.stop() # Let's stop the query
stream.printSchema()
#spark.stop() # And stop the whole session

root
 |-- type: string (nullable = false)
 |-- trade_id: long (nullable = false)
 |-- sequence: long (nullable = false)
 |-- time: timestamp (nullable = false)
 |-- product_id: string (nullable = false)
 |-- price: double (nullable = false)
 |-- side: string (nullable = false)
 |-- last_size: double (nullable = false)
 |-- best_bid: double (nullable = false)
 |-- best_ask: double (nullable = false)



Calling `start()` on the stream launches, in a separate daemon, a websocket that streams the results. If an error occurs on the front-end side (e.g. a parse error in a subsequent line of Python), Spark is not notified and the socket stays open! Remember to close the stream every time with the `stop()` method (`query.stop()` in the example above). If you lose the reference to the query, stop the whole session, also with `stop()` (`spark.stop()` in the example above).
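One way to make sure `stop()` is always called is to wrap the query in a small context manager. The sketch below is only an illustration: the helper `stopping` and the stand-in class `FakeQuery` are hypothetical names introduced here so that the example runs without Spark. It covers exceptions raised while the cell is running, not every possible way of losing the reference.

```python
from contextlib import contextmanager


@contextmanager
def stopping(query):
    """Yield `query` and call its stop() on exit, even if the body raises."""
    try:
        yield query
    finally:
        query.stop()


class FakeQuery:
    """Hypothetical stand-in for a streaming query, so the sketch runs without Spark."""

    def __init__(self):
        self.stopped = False

    def stop(self):
        self.stopped = True


fake = FakeQuery()
try:
    with stopping(fake) as q:
        raise ValueError("simulated error in the cell body")
except ValueError:
    pass

print(fake.stopped)  # True
```

With a real query the same helper could be used as `with stopping(stream.writeStream.format("console").start()) as query: query.awaitTermination(10)`. If the reference is already lost, `spark.streams.active` returns the queries that are still running in the session, so they can be stopped one by one before resorting to `spark.stop()`.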

In [3]:
# Panic button - press only if you messed up opening new websocket and lost reference to it

query.stop()
spark.stop()

# Task 1

**Analysis of the CoinBase data stream (3 pts).** Write a query that prints the average value of a chosen parameter (e.g. `price`) in sliding time windows based on the transaction time (the `time` column), grouped by exchange pair (from which currency to which currency, the `product_id` column).

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window, avg
import pyspark.sql.functions as F

spark = SparkSession.builder.\
    config("spark.sql.streaming.schemaInference", True).\
    config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True).getOrCreate()  

stream = spark.readStream.\
    format("ws").\
    option("schema", "ticker").\
    load()

query = (
    stream
    .select("product_id", "price", "time")
    .groupBy(F.window("time", "2 minutes", "1 minutes"), "product_id").mean("price")
    .orderBy("window")
)

query = (
    query
    .writeStream
    .format("console")
    .outputMode("complete")
    .option("truncate", "false")
    .start()
)

query.awaitTermination(30)
query.stop()

In [None]:
-------------------------------------------                                     
Batch: 0
-------------------------------------------
+------+----------+----------+
|window|product_id|avg(price)|
+------+----------+----------+
+------+----------+----------+

-------------------------------------------                                     
Batch: 1
-------------------------------------------
+------------------------------------------+----------+------------------+
|window                                    |product_id|avg(price)        |
+------------------------------------------+----------+------------------+
|{2024-11-12 16:20:00, 2024-11-12 16:22:00}|ETH-USD   |3233.6555072463766|
|{2024-11-12 16:20:00, 2024-11-12 16:22:00}|BTC-EUR   |81209.40833333334 |
|{2024-11-12 16:20:00, 2024-11-12 16:22:00}|ETH-BTC   |0.03742           |
|{2024-11-12 16:20:00, 2024-11-12 16:22:00}|BTC-USD   |86434.97687898087 |
|{2024-11-12 16:21:00, 2024-11-12 16:23:00}|BTC-USD   |86434.97687898087 |
|{2024-11-12 16:21:00, 2024-11-12 16:23:00}|ETH-BTC   |0.03742           |
|{2024-11-12 16:21:00, 2024-11-12 16:23:00}|ETH-USD   |3233.6555072463766|
|{2024-11-12 16:21:00, 2024-11-12 16:23:00}|BTC-EUR   |81209.40833333334 |
+------------------------------------------+----------+------------------+

-------------------------------------------                                     
Batch: 2
-------------------------------------------
+------------------------------------------+----------+--------------------+
|window                                    |product_id|avg(price)          |
+------------------------------------------+----------+--------------------+
|{2024-11-12 16:20:00, 2024-11-12 16:22:00}|ETH-USD   |3231.6759880239515  |
|{2024-11-12 16:20:00, 2024-11-12 16:22:00}|BTC-EUR   |81187.51325581396   |
|{2024-11-12 16:20:00, 2024-11-12 16:22:00}|ETH-BTC   |0.037412857142857144|
|{2024-11-12 16:20:00, 2024-11-12 16:22:00}|BTC-USD   |86416.15510086455   |
|{2024-11-12 16:21:00, 2024-11-12 16:23:00}|BTC-USD   |86416.15510086455   |
|{2024-11-12 16:21:00, 2024-11-12 16:23:00}|ETH-BTC   |0.037412857142857144|
|{2024-11-12 16:21:00, 2024-11-12 16:23:00}|ETH-USD   |3231.6759880239515  |
|{2024-11-12 16:21:00, 2024-11-12 16:23:00}|BTC-EUR   |81187.51325581396   |
+------------------------------------------+----------+--------------------+
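In the output above every product appears in two overlapping windows with identical averages. This is consistent with all events received so far falling into the single minute that both windows cover. The sketch below shows how a 2-minute window with a 1-minute slide assigns one event to two windows. It assumes that window starts are aligned to multiples of the slide counted from the Unix epoch; the helper `sliding_windows` and the event time `16:21:30` are hypothetical, and the time is treated as UTC purely for illustration.

```python
from datetime import datetime, timedelta, timezone


def sliding_windows(ts, window, slide):
    """Return the [start, end) windows that contain ts, earliest first.

    Assumes window starts are aligned to multiples of `slide` since the Unix epoch.
    """
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    offset = (ts - epoch) % slide      # time elapsed since the latest aligned start
    start = ts - offset
    result = []
    while start > ts - window:         # window still covers ts
        result.append((start, start + window))
        start -= slide
    return list(reversed(result))


event = datetime(2024, 11, 12, 16, 21, 30, tzinfo=timezone.utc)  # hypothetical event time
windows = sliding_windows(event, timedelta(minutes=2), timedelta(minutes=1))
for start, end in windows:
    print(start.strftime("%H:%M:%S"), end.strftime("%H:%M:%S"))
# 16:20:00 16:22:00
# 16:21:00 16:23:00
```

Each event therefore contributes to `window / slide` windows, here two, which is why the row count doubles compared with a tumbling window of the same length.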

# Task 2

**Watermarking and late data (3 pts).** 
Modify the query from Task 1 to demonstrate the watermark mechanism and the handling of late data. The console should show the relevant rows of the result table being updated (update mode), in particular earlier time windows being updated after late data arrives. **Attach sample output to the solution of this task, together with a description that explains, on a concrete example, how the watermark and late data work.**

The exercise can use the script in the `/mock` directory, written in [Scala-cli](https://scala-cli.virtuslab.org), which serves as a controlled source of CoinBase data over WebSocket.

The script can be run with Docker:

```
make image
make run
```

This starts a WebSocket server at `ws://mock:8025`.

After starting the server, run the cell below, in which the query reads data from the newly created websocket. The script sends sample messages in CoinBase format every 10 seconds:

- The first series contains messages with timestamps 0s, 14s, 7s
- The second series contains messages with timestamps 15s, 8s, 21s
- The third series contains messages with timestamps 4s, 17s

For this data the time window can be set to a 10-second interval. The script can also be modified to send different data.


In [4]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.config("spark.sql.streaming.schemaInference", True).config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True).getOrCreate()  
stream = spark.readStream.format("ws").option("schema", "ticker").load()

In [5]:
spark = (
    SparkSession
    .builder
    .config("spark.sql.streaming.schemaInference", True)
    .getOrCreate()
)

stream = (
    spark
    .readStream
    .format("ws")
    .option("schema", "ticker")
    .option("url", "ws://mock:8025")
    .load()
)


query = (
    stream
    .select("product_id", "price", "time")
    .withWatermark("time", "30 seconds")
    .groupBy(F.window("time", "10 seconds"), "product_id").mean("price")
)

query = (
    query
    .writeStream
    .format("console")
    .outputMode("update")
    .option("truncate", "false")
    .start()
)

query.awaitTermination(40)
query.stop()

In [None]:
Sending frames with T0+0s, T0+14s, T0+7s timestamps.
2021-11-01T12:00:00.123123Z - 890.8003280922992
2021-11-01T12:00:14.123123Z - 229.38523620812558
2021-11-01T12:00:07.123123Z - 394.37850273373664
Sending frames with T0+15s, T0+8s, T0+21s timestamps.
2021-11-01T12:00:15.123123Z - 440.95552764207724
2021-11-01T12:00:08.123123Z - 388.0909216896069
2021-11-01T12:00:21.123123Z - 451.51727372079375
Sending frames with T0+4s, T0+17s timestamps.
2021-11-01T12:00:04.123123Z - 408.20666675204853
2021-11-01T12:00:17.123123Z - 3.563696603498645

-------------------------------------------                                     
Batch: 0
-------------------------------------------
+------+----------+----------+
|window|product_id|avg(price)|
+------+----------+----------+
+------+----------+----------+

-------------------------------------------                                     
Batch: 1
-------------------------------------------
+------------------------------------------+----------+------------------+
|window                                    |product_id|avg(price)        |
+------------------------------------------+----------+------------------+
|{2021-11-01 00:00:00, 2021-11-01 00:00:10}|ETH-USD   |642.589415413018  |
|{2021-11-01 00:00:10, 2021-11-01 00:00:20}|ETH-USD   |229.38523620812558|
+------------------------------------------+----------+------------------+

-------------------------------------------                                     
Batch: 2
-------------------------------------------
+------+----------+----------+
|window|product_id|avg(price)|
+------+----------+----------+
+------+----------+----------+

-------------------------------------------                                     
Batch: 3
-------------------------------------------
+------------------------------------------+----------+------------------+
|window                                    |product_id|avg(price)        |
+------------------------------------------+----------+------------------+
|{2021-11-01 00:00:20, 2021-11-01 00:00:30}|ETH-USD   |451.51727372079375|
|{2021-11-01 00:00:00, 2021-11-01 00:00:10}|ETH-USD   |557.756584171881  |
|{2021-11-01 00:00:10, 2021-11-01 00:00:20}|ETH-USD   |335.1703819251014 |
+------------------------------------------+----------+------------------+

-------------------------------------------                                     
Batch: 4
-------------------------------------------
+------+----------+----------+
|window|product_id|avg(price)|
+------+----------+----------+
+------+----------+----------+

-------------------------------------------                                     
Batch: 5
-------------------------------------------
+------------------------------------------+----------+------------------+
|window                                    |product_id|avg(price)        |
+------------------------------------------+----------+------------------+
|{2021-11-01 00:00:00, 2021-11-01 00:00:10}|ETH-USD   |520.3691048169228 |
|{2021-11-01 00:00:10, 2021-11-01 00:00:20}|ETH-USD   |224.63482015123384|
+------------------------------------------+----------+------------------+
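The batches above can be read as follows. With a 30-second watermark and a maximum event time of T0+21s, the watermark never moves past T0, so no record is considered too late. The late records at 7s, 8s and 4s all update the window `[0s, 10s)`, whose average changes from 642.59 (Batch 1) to 557.76 (Batch 3) to 520.37 (Batch 5). The sketch below replays the mock messages in plain Python under a simplified model of the watermark. It assumes the watermark equals the maximum event time seen minus the delay, that it is updated only after a batch finishes, and that a record is dropped when the end of its window is not later than the watermark. The function `replay` is a hypothetical helper, fractional seconds are ignored, and the 10-second delay is an alternative setting that was not run in this notebook.

```python
from collections import defaultdict


def replay(batches, window_s, delay_s):
    """Replay micro-batches of (event_second, price) pairs.

    Simplified model: watermark = (max event time seen) - delay, updated only
    after each batch; a record is dropped when its window end <= watermark.
    """
    sums = defaultdict(lambda: [0.0, 0])   # window start -> [sum, count]
    dropped = []
    watermark = float("-inf")
    max_seen = float("-inf")
    for batch in batches:
        for second, price in batch:
            start = (second // window_s) * window_s
            if start + window_s <= watermark:
                dropped.append(second)     # too late for its window
                continue
            sums[start][0] += price
            sums[start][1] += 1
        if batch:
            max_seen = max(max_seen, max(s for s, _ in batch))
            watermark = max_seen - delay_s
    averages = {start: total / count for start, (total, count) in sorted(sums.items())}
    return averages, dropped


# Prices copied from the mock server log above; seconds are offsets from T0.
batches = [
    [(0, 890.8003280922992), (14, 229.38523620812558), (7, 394.37850273373664)],
    [(15, 440.95552764207724), (8, 388.0909216896069), (21, 451.51727372079375)],
    [(4, 408.20666675204853), (17, 3.563696603498645)],
]

avg_30, dropped_30 = replay(batches, window_s=10, delay_s=30)
avg_10, dropped_10 = replay(batches, window_s=10, delay_s=10)
print(dropped_30)  # []
print(dropped_10)  # [4]
```

With the 30-second delay the final averages match Batch 5. With the hypothetical 10-second delay the watermark reaches 11s after the second series, so under this model the record at 4s would be discarded and the window `[0s, 10s)` would keep the average from Batch 3.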



# Task 3

**Joining streams (3 pts).** Using a stream-stream join, combine data from the `ticker` channel (buy transactions, `side="buy"`) with the `heartbeat` data that arrives every second, matching on `trade_id` and the corresponding id in the heartbeat channel. Print the joined stream.

As of the time this task was written (15.11.2023), the `heartbeat` channel returns incorrect date values (e.g. `1970-01-04 13:53:57.645339`). Joining with the `ticker` channel lets us recover the correct information. What a wonderful use of a join!


In [9]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = (
    SparkSession
    .builder
    .config("spark.sql.streaming.schemaInference", True)
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", True)
    .getOrCreate()
)

heartbeat_stream = (
    spark
    .readStream
    .format("ws")
    .option("schema", "heartbeat")
    .load()
)

ticker_stream = (
    spark
    .readStream
    .format("ws")
    .option("schema", "ticker")
    .load()
)

buy = (
    ticker_stream
    .select("side", "trade_id", "product_id", "price", "time")
    .filter(ticker_stream.side == "buy")
    .withColumnRenamed("side", "buy_side")
    .withColumnRenamed("trade_id", "buy_trade_id")
    .withColumnRenamed("product_id", "buy_product_id")
    .withColumnRenamed("price", "buy_price")
    .withColumnRenamed("time", "buy_time")
)

heartbeat = (
    heartbeat_stream
    .select("type", "sequence", "last_trade_id", "product_id", "time")
    .withColumnRenamed("type", "hr_type")
    .withColumnRenamed("sequence", "hr_sequence")
    .withColumnRenamed("last_trade_id", "hr_last_trade_id")
    .withColumnRenamed("product_id", "hr_product_id")
    .withColumnRenamed("time", "hr_time")
)


joined = (
    heartbeat.join(
        buy,
        F.expr(
            """
            buy_trade_id = hr_last_trade_id
            """
        )
    )
)

query = (
    joined
    .writeStream
    .format("console")
    .outputMode("append")
    .option("truncate", "false")
    .start()
)

query.awaitTermination(80)
query.stop()

In [None]:
-------------------------------------------                                     
Batch: 0
-------------------------------------------
+-------+-----------+----------------+-------------+-------+--------+------------+--------------+---------+--------+
|hr_type|hr_sequence|hr_last_trade_id|hr_product_id|hr_time|buy_side|buy_trade_id|buy_product_id|buy_price|buy_time|
+-------+-----------+----------------+-------------+-------+--------+------------+--------------+---------+--------+
+-------+-----------+----------------+-------------+-------+--------+------------+--------------+---------+--------+

-------------------------------------------                                     
Batch: 1
-------------------------------------------
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
|hr_type  |hr_sequence|hr_last_trade_id|hr_product_id|hr_time                   |buy_side|buy_trade_id|buy_product_id|buy_price|buy_time               |
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
|heartbeat|69544437888|566088651       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088651   |ETH-USD       |3238.32  |2024-11-12 18:03:32.992|
|heartbeat|91250638866|715680931       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715680931   |BTC-USD       |87241.73 |2024-11-12 18:03:32.804|
|heartbeat|91250628014|715680826       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715680826   |BTC-USD       |87237.36 |2024-11-12 18:03:26.832|
|heartbeat|91250626430|715680820       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715680820   |BTC-USD       |87237.15 |2024-11-12 18:03:25.554|
|heartbeat|69544434369|566088637       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088637   |ETH-USD       |3237.7   |2024-11-12 18:03:30.781|
|heartbeat|91250641259|715680961       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715680961   |BTC-USD       |87252.9  |2024-11-12 18:03:35.567|
|heartbeat|69544427351|566088619       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088619   |ETH-USD       |3238.72  |2024-11-12 18:03:26.589|
|heartbeat|91250640366|715680954       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715680954   |BTC-USD       |87252.33 |2024-11-12 18:03:34.698|
|heartbeat|69544426176|566088614       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088614   |ETH-USD       |3238.61  |2024-11-12 18:03:26.008|
|heartbeat|69544439332|566088660       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088660   |ETH-USD       |3238.07  |2024-11-12 18:03:34.946|
|heartbeat|91250641754|715680973       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715680973   |BTC-USD       |87253.92 |2024-11-12 18:03:36.857|
|heartbeat|69544442498|566088684       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088684   |ETH-USD       |3238.74  |2024-11-12 18:03:38.067|
|heartbeat|91250622931|715680812       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715680812   |BTC-USD       |87240.33 |2024-11-12 18:03:24.327|
|heartbeat|69544428150|566088621       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088621   |ETH-USD       |3238.8   |2024-11-12 18:03:27.28 |
|heartbeat|7564275331 |34461607        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461607    |ETH-BTC       |0.03713  |2024-11-12 18:03:15.988|
|heartbeat|7564275460 |34461607        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461607    |ETH-BTC       |0.03713  |2024-11-12 18:03:15.988|
|heartbeat|7564275626 |34461607        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461607    |ETH-BTC       |0.03713  |2024-11-12 18:03:15.988|
|heartbeat|7564275707 |34461607        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461607    |ETH-BTC       |0.03713  |2024-11-12 18:03:15.988|
|heartbeat|7564275855 |34461607        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461607    |ETH-BTC       |0.03713  |2024-11-12 18:03:15.988|
|heartbeat|7564275974 |34461607        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461607    |ETH-BTC       |0.03713  |2024-11-12 18:03:15.988|
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
only showing top 20 rows

-------------------------------------------                                     
Batch: 2
-------------------------------------------
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
|hr_type  |hr_sequence|hr_last_trade_id|hr_product_id|hr_time                   |buy_side|buy_trade_id|buy_product_id|buy_price|buy_time               |
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
|heartbeat|91250647650|715681047       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715681047   |BTC-USD       |87248.82 |2024-11-12 18:03:41.832|
|heartbeat|69544449321|566088710       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088710   |ETH-USD       |3238.88  |2024-11-12 18:03:44.903|
|heartbeat|69544450972|566088741       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088741   |ETH-USD       |3239.28  |2024-11-12 18:03:48.013|
|heartbeat|69544450718|566088741       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088741   |ETH-USD       |3239.28  |2024-11-12 18:03:48.013|
|heartbeat|32407637538|85723290        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723290    |BTC-EUR       |82032.29 |2024-11-12 18:03:48.972|
|heartbeat|32407637440|85723290        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723290    |BTC-EUR       |82032.29 |2024-11-12 18:03:48.972|
|heartbeat|32407637906|85723290        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723290    |BTC-EUR       |82032.29 |2024-11-12 18:03:48.972|
|heartbeat|69544450084|566088719       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088719   |ETH-USD       |3238.89  |2024-11-12 18:03:45.758|
|heartbeat|69544448860|566088707       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088707   |ETH-USD       |3238.89  |2024-11-12 18:03:43.565|
|heartbeat|91250649101|715681059       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715681059   |BTC-USD       |87259.28 |2024-11-12 18:03:42.784|
|heartbeat|69544450395|566088728       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088728   |ETH-USD       |3239.1   |2024-11-12 18:03:46.872|
|heartbeat|7564276462 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564276490 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564276536 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564276403 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564276469 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564276522 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|69544447678|566088701       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088701   |ETH-USD       |3238.67  |2024-11-12 18:03:42.996|
|heartbeat|32407635385|85723285        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723285    |BTC-EUR       |82019.07 |2024-11-12 18:03:38.477|
|heartbeat|32407635910|85723285        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723285    |BTC-EUR       |82019.07 |2024-11-12 18:03:38.477|
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
only showing top 20 rows

-------------------------------------------                                     
Batch: 3
-------------------------------------------
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
|hr_type  |hr_sequence|hr_last_trade_id|hr_product_id|hr_time                   |buy_side|buy_trade_id|buy_product_id|buy_price|buy_time               |
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
|heartbeat|69544457673|566088781       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088781   |ETH-USD       |3238.8   |2024-11-12 18:03:56.016|
|heartbeat|32407642075|85723301        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723301    |BTC-EUR       |82053.92 |2024-11-12 18:04:00.638|
|heartbeat|32407641700|85723301        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723301    |BTC-EUR       |82053.92 |2024-11-12 18:04:00.638|
|heartbeat|32407642411|85723301        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723301    |BTC-EUR       |82053.92 |2024-11-12 18:04:00.638|
|heartbeat|69544455153|566088772       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088772   |ETH-USD       |3238.98  |2024-11-12 18:03:54.045|
|heartbeat|69544460892|566088794       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088794   |ETH-USD       |3239.13  |2024-11-12 18:03:58.346|
|heartbeat|69544453439|566088760       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088760   |ETH-USD       |3239.0   |2024-11-12 18:03:51.67 |
|heartbeat|32407638602|85723290        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723290    |BTC-EUR       |82032.29 |2024-11-12 18:03:48.972|
|heartbeat|32407638971|85723290        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723290    |BTC-EUR       |82032.29 |2024-11-12 18:03:48.972|
|heartbeat|69544454551|566088765       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088765   |ETH-USD       |3239.37  |2024-11-12 18:03:52.308|
|heartbeat|32407639179|85723291        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723291    |BTC-EUR       |82050.98 |2024-11-12 18:03:53.403|
|heartbeat|32407639764|85723291        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723291    |BTC-EUR       |82050.98 |2024-11-12 18:03:53.403|
|heartbeat|7564276673 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564276722 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564276785 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564276831 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564276837 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564276987 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564277190 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564276714 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
only showing top 20 rows

-------------------------------------------                                     
Batch: 4
-------------------------------------------
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
|hr_type  |hr_sequence|hr_last_trade_id|hr_product_id|hr_time                   |buy_side|buy_trade_id|buy_product_id|buy_price|buy_time               |
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
|heartbeat|91250672996|715681383       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715681383   |BTC-USD       |87243.42 |2024-11-12 18:04:05.813|
|heartbeat|32407644593|85723308        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723308    |BTC-EUR       |81989.15 |2024-11-12 18:04:06.568|
|heartbeat|32407645296|85723308        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723308    |BTC-EUR       |81989.15 |2024-11-12 18:04:06.568|
|heartbeat|32407644883|85723308        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723308    |BTC-EUR       |81989.15 |2024-11-12 18:04:06.568|
|heartbeat|32407643713|85723306        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723306    |BTC-EUR       |82000.46 |2024-11-12 18:04:04.628|
|heartbeat|32407644173|85723306        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723306    |BTC-EUR       |82000.46 |2024-11-12 18:04:04.628|
|heartbeat|69544482852|566088914       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088914   |ETH-USD       |3237.0   |2024-11-12 18:04:15.433|
|heartbeat|91250680902|715681464       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715681464   |BTC-USD       |87246.01 |2024-11-12 18:04:09.895|
|heartbeat|69544484504|566088920       |ETH-USD      |1970-01-04 13:53:57.645339|buy     |566088920   |ETH-USD       |3236.99  |2024-11-12 18:04:17.477|
|heartbeat|91250691417|715681556       |BTC-USD      |1970-01-04 13:53:57.645339|buy     |715681556   |BTC-USD       |87221.8  |2024-11-12 18:04:15.792|
|heartbeat|32407645609|85723309        |BTC-EUR      |1970-01-04 13:53:57.645339|buy     |85723309    |BTC-EUR       |82020.94 |2024-11-12 18:04:10.1  |
|heartbeat|7564277242 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564277358 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564277443 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564277633 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564277791 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564278051 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564278178 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564277263 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
|heartbeat|7564277406 |34461611        |ETH-BTC      |1970-01-04 13:53:57.645339|buy     |34461611    |ETH-BTC       |0.03712  |2024-11-12 18:03:45.949|
+---------+-----------+----------------+-------------+--------------------------+--------+------------+--------------+---------+-----------------------+
only showing top 20 rows
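In the output above the same ETH-BTC trade is repeated many times with different `hr_sequence` values. A likely explanation is that several consecutive heartbeats carry the same `last_trade_id` until a new trade occurs, and an inner join emits one row per matching heartbeat. The plain-Python sketch below illustrates this with a few values copied from the output. The function `inner_join` is a hypothetical helper and the last heartbeat, with id `999`, is made up to show a non-matching row.

```python
# Ticker rows keyed by trade_id (values copied from the console output above).
trades = {
    34461607: {"product_id": "ETH-BTC", "price": 0.03713, "time": "2024-11-12 18:03:15.988"},
    566088651: {"product_id": "ETH-USD", "price": 3238.32, "time": "2024-11-12 18:03:32.992"},
}

# Heartbeat rows as (sequence, last_trade_id).
# The last entry is hypothetical and has no matching buy trade.
heartbeats = [
    (7564275331, 34461607),
    (7564275460, 34461607),
    (69544437888, 566088651),
    (1, 999),
]


def inner_join(heartbeats, trades):
    """Emit one row per heartbeat whose last_trade_id matches a trade_id."""
    rows = []
    for sequence, last_trade_id in heartbeats:
        trade = trades.get(last_trade_id)
        if trade is not None:
            rows.append({"hr_sequence": sequence, "trade_id": last_trade_id, **trade})
    return rows


joined = inner_join(heartbeats, trades)
for row in joined:
    # The time comes from the ticker side, replacing the broken heartbeat timestamp.
    print(row["hr_sequence"], row["trade_id"], row["time"])
```

The two heartbeats that reference trade `34461607` both appear in the result with the ticker timestamp, while the unmatched heartbeat is left out.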
