## Test Structured Streaming

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

### 1. check can get waveforms from kafka

In [1]:
import streamlit as st
from kafka import KafkaConsumer
from json import loads

consumer = KafkaConsumer(
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    enable_auto_commit=True,
    key_deserializer=lambda x: loads(x.decode('utf-8')),
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

consumer.subscribe(['waveform_raw', 'phasenet_picks', 'gmma_events'])

2021-03-11 14:53:49.070 INFO    kafka.conn: <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
2021-03-11 14:53:49.070 INFO    kafka.conn: Probing node bootstrap-0 broker version
2021-03-11 14:53:49.072 INFO    kafka.conn: <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
2021-03-11 14:53:49.181 INFO    kafka.conn: Broker version identified as 2.5.0
2021-03-11 14:53:49.182 INFO    kafka.conn: Set configuration api_version=(2, 5, 0) to skip auto check_version requests on startup
2021-03-11 14:53:49.184 INFO    kafka.consumer.subscription_state: Updating subscribed topics to: ['waveform_raw', 'phasenet_picks', 'gmma_events']


In [2]:
for message in consumer:
    print(message)
    break

2021-03-11 14:53:49.248 INFO    kafka.consumer.subscription_state: Updated partition assignment: [TopicPartition(topic='waveform_raw', partition=0), TopicPartition(topic='gmma_events', partition=0), TopicPartition(topic='phasenet_picks', partition=0)]
2021-03-11 14:53:49.251 INFO    kafka.conn: <BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: connecting to localhost:9092 [('::1', 9092, 0, 0) IPv6]
2021-03-11 14:53:49.356 INFO    kafka.conn: <BrokerConnection node_id=0 host=localhost:9092 <connecting> [IPv6 ('::1', 9092, 0, 0)]>: Connection complete.
2021-03-11 14:53:49.357 INFO    kafka.conn: <BrokerConnection node_id=bootstrap-0 host=localhost:9092 <connected> [IPv6 ('::1', 9092, 0, 0)]>: Closing connection. 


ConsumerRecord(topic='waveform_raw', partition=0, offset=0, timestamp=1615499060404, timestamp_type=0, key='CI.BOM..HH', value=['2020-10-01T00:00:00.003', [[0.0, 0.0, 0.0], [-1.2703502534350264e-06, 3.647651283245068e-06, 1.2030681091346196e-06], [-1.3712733561987989e-06, 3.364105168657261e-06, 1.2078739928256255e-06], [-1.6468095509480918e-06, 3.0341025194502436e-06, 1.3216127854320803e-06], [-1.8919087096946896e-06, 2.452592980262125e-06, 1.4641867664977326e-06], [-2.0569100342981983e-06, 1.8838989035430131e-06, 1.7204995401698397e-06], [-2.189872247981839e-06, 1.3312244391272543e-06, 1.8614715600051568e-06], [-2.44137913796294e-06, 8.650556537759257e-07, 1.6996741578623187e-06], [-2.7601681722444482e-06, 5.863154797225434e-07, 1.533070758341637e-06], [-2.9716263725276804e-06, 2.5631277367210714e-07, 1.6468095509480918e-06], [-3.0421122119150823e-06, -1.0252511373209927e-07, 2.0360846519906772e-06], [-3.1061904337548185e-06, -4.6777083184679213e-07, 2.3388540739688324e-06], [-3.23915

### 2. Structured Streaming

In [3]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 pyspark-shell'

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("spark") \
    .getOrCreate()

In [5]:
df = spark \
     .readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", "localhost:9092") \
     .option("subscribe", "waveform_raw") \
     .load()
# df = df.selectExpr("CAST(value AS STRING)")

In [11]:
data = df \
       .writeStream \
       .queryName("waveform1")\
       .format("memory")\
       .start()

In [12]:
data_ = spark.sql("select * from waveform1")
data_.show()
data_.printSchema()

+--------------------+--------------------+------------+---------+------+--------------------+-------------+
|                 key|               value|       topic|partition|offset|           timestamp|timestampType|
+--------------------+--------------------+------------+---------+------+--------------------+-------------+
|[22 43 49 2E 42 4...|[5B 22 32 30 32 3...|waveform_raw|        0| 59232|2021-03-11 14:54:...|            0|
|[22 43 49 2E 43 4...|[5B 22 32 30 32 3...|waveform_raw|        0| 59233|2021-03-11 14:54:...|            0|
+--------------------+--------------------+------------+---------+------+--------------------+-------------+

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [13]:
rdd = data_.rdd.map(lambda x: (x["value"]))
rdd.collect()

[bytearray(b'["2020-10-01T00:01:41.003", [[1.4577789215763914e-06, -1.9479771253827494e-06, 3.7373606573964935e-06], [1.4802062651142478e-06, -2.1642410956701497e-06, 4.040130079374649e-06], [1.6724409306334564e-06, -2.385310835961718e-06, 3.6604669730877504e-06], [1.8021992218564264e-06, -2.369291223658365e-06, 2.9860439099138603e-06], [1.928753590618726e-06, -2.3612815311935265e-06, 2.726527100094245e-06], [2.1145804112165933e-06, -2.358077608732856e-06, 2.848275698852376e-06], [2.044094344455516e-06, -2.170648940591491e-06, 3.160656888212543e-06], [1.914336053232546e-06, -1.8790930198520073e-06, 3.437795157879009e-06], [1.9063262470808695e-06, -1.6035568251027144e-06, 3.296823024356854e-06], [1.9800161226157798e-06, -1.4113222732703434e-06, 2.880314923459082e-06], [2.108172566295252e-06, -1.303991211898392e-06, 2.6800705654750345e-06], [2.1354057935241144e-06, -1.2319032975938171e-06, 2.8082267817808315e-06], [2.132201871063444e-06, -1.3152049405107391e-06, 3.2599782571196556e-06], 

In [14]:
df.timestamp

Column<'timestamp'>

In [22]:
query = df \
       .writeStream \
       .outputMode("update") \
       .format("console")\
       .trigger(continuous='1 second')\
       .start()

query.awaitTermination()

Py4JJavaError: An error occurred while calling o128.start.
: java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:208)
	at org.apache.spark.sql.execution.streaming.sources.ConsoleWrite.<init>(ConsoleWrite.scala:38)
	at org.apache.spark.sql.execution.streaming.ConsoleTable$$anon$1.buildForStreaming(console.scala:84)
	at org.apache.spark.sql.execution.streaming.StreamExecution.createStreamingWrite(StreamExecution.scala:634)
	at org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution.<init>(ContinuousExecution.scala:93)
	at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:302)
	at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:359)
	at org.apache.spark.sql.streaming.DataStreamWriter.startQuery(DataStreamWriter.scala:466)
	at org.apache.spark.sql.streaming.DataStreamWriter.startInternal(DataStreamWriter.scala:456)
	at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:301)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
