### 1. Initialize spark

- Run this cell to initialize the Spark session and configure the packages required for communicating with Event Hubs.
- This cell needs to be executed only once, at the beginning, to establish the SparkSession; afterwards you can run the other cells without re-executing this one.

In [None]:
import logging
import os

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

# Only let py4j log records at CRITICAL level through.
logging.getLogger("py4j").setLevel(logging.CRITICAL)

# Must match the Scala version the installed Spark distribution was built with.
scala_version = '2.12'

print("Spark version", pyspark.__version__)

# Maven coordinate of the Azure Event Hubs connector for Spark.
eventhubs_package = f'com.microsoft.azure:azure-eventhubs-spark_{scala_version}:2.3.18'
packages = [eventhubs_package]

# PYSPARK_SUBMIT_ARGS is only read when the JVM is launched (the first getOrCreate()
# in this kernel) and must end with "pyspark-shell".
args = os.environ.get('PYSPARK_SUBMIT_ARGS', '')
if not args:
    args = f'--packages {",".join(packages)}'
    print('Using packages', packages)
    os.environ['PYSPARK_SUBMIT_ARGS'] = f'{args} pyspark-shell'
else:
    print(f'Found existing args: {args}')
    if eventhubs_package not in args:
        # The pre-existing args are used unchanged, so without this package the
        # "eventhubs" data source would only fail later with an obscure error.
        print(f'WARNING: PYSPARK_SUBMIT_ARGS does not include {eventhubs_package}; '
              'the "eventhubs" data source may be unavailable.')

spark = (
    SparkSession.builder
    .master("local")
    .appName("eventhub-example")
    .getOrCreate()
)

# A single shuffle partition (Spark's default is 200) keeps the small local aggregations cheap.
spark.conf.set("spark.sql.shuffle.partitions", 1)
# Session time zone used when parsing and rendering timestamps.
spark.conf.set("spark.sql.session.timeZone", "Europe/Zagreb")
# Default checkpoint root for streaming queries that do not set their own checkpointLocation.
spark.conf.set("spark.sql.streaming.checkpointLocation", "/tmp/pmf/lab3")

### 2. Set Eventhub parameters & model of the incoming telemetry messages

Eventhub configuration and JSON structure of the telemetry messages

In [2]:
from pyspark.sql.types import StringType, StructField, StructType, DoubleType, TimestampType

# Event Hubs *namespace* connection string. It is read from the EVENTHUB_CONNECTION_STRING
# environment variable so the secret does not have to be stored in the notebook;
# alternatively paste it as the fallback value below (and do not commit it).
EVENTHUB_CONNECTION_STRING = os.environ.get("EVENTHUB_CONNECTION_STRING", "")

# Names of the Event Hubs (entities) to read telemetry from and to publish aggregates to.
TELEMETRY_EVENTHUB = "telemetry"
AGGREGATES_EVENTHUB = "aggregates"

if not EVENTHUB_CONNECTION_STRING.strip():
    # Fail here with a clear message instead of later, when the stream starts.
    raise ValueError(
        "EVENTHUB_CONNECTION_STRING is empty: set the environment variable or paste "
        "the Event Hubs namespace connection string into this cell."
    )


def eventhubs_conf(entity_path):
    """Build the Spark Event Hubs options for one Event Hub.

    The namespace connection string is suffixed with ``EntityPath=<entity_path>`` and
    encrypted on the JVM side with ``EventHubsUtils.encrypt`` before being passed as
    ``eventhubs.connectionString``.

    NOTE(review): assumes a namespace-level connection string that does not already
    contain its own EntityPath.

    Args:
        entity_path: name of the Event Hub to connect to.

    Returns:
        dict of options for ``readStream``/``writeStream`` ``.options(**...)``.
    """
    # Trim whitespace and a trailing ';' so the appended segment is well-formed.
    base = EVENTHUB_CONNECTION_STRING.strip().rstrip(";")
    connection_string = f"{base};EntityPath={entity_path}"
    return {
        "eventhubs.connectionString": spark._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connection_string),
    }


# Options for the incoming telemetry stream and the outgoing aggregates stream.
eventhubs_reader_conf = eventhubs_conf(TELEMETRY_EVENTHUB)
eventhubs_publisher_conf = eventhubs_conf(AGGREGATES_EVENTHUB)

# JSON schema of an incoming telemetry message.
json_schema = StructType([
    StructField('timestamp', TimestampType(), True),
    StructField('vehicle_id', StringType(), True),
    StructField('engine_speed', DoubleType(), True),
    StructField('vehicle_speed', DoubleType(), True),
    StructField('odometer', DoubleType(), True),
    StructField('latitude', DoubleType(), True),
    StructField('longitude', DoubleType(), True),
])

### 3. Example: Reading from Eventhub and displaying telemetry messages in the console output

- Check the console output of the terminal where you ran the docker command that created the Jupyter container; you should see an empty table.
- Now start your publisher.py — the messages will appear in that console output as they are read by this notebook.
- The output format displayed in the console is:  **[timestamp, vehicle_id, engine_speed, vehicle_speed, odometer, latitude, longitude]**

Stop the stream by interrupting the execution of this cell (Stop button in the toolbar).
- When you interrupt a cell running a streaming query, a KeyboardInterrupt error log is shown; this is expected, given how Jupyter interacts with Spark Structured Streaming.
- If you accidentally run a new cell while a cell with an active stream/query is still running, restart the Jupyter kernel to avoid painful debugging :)

In [None]:
def start_and_await_termination(query):
    """Block until a streaming query terminates; stop it if the cell is interrupted.

    Args:
        query: a started StreamingQuery (the result of ``writeStream...start()``).
    """
    try:
        query.awaitTermination()
    except KeyboardInterrupt:
        # Jupyter's Stop button raises KeyboardInterrupt in the kernel; stop the query
        # explicitly so it does not stay active after the cell is interrupted.
        query.stop()

# Read the raw event stream from the telemetry Event Hub.
raw_events = spark.readStream.format("eventhubs").options(**eventhubs_reader_conf).load()

# The message payload arrives as binary in the "body" column: decode it to a string
# and parse the JSON using the schema, flattening the parsed fields into columns.
telemetry = (
    raw_events
    .select(from_json(col("body").cast("string"), json_schema).alias("body"))
    .select("body.*")
)

# Output messages to console output (where the docker is running).
query = telemetry.writeStream.outputMode("append").format("console").option("truncate", False).start()

start_and_await_termination(query)

### 4. Solving the task

In [3]:
from pyspark.sql.functions import (
    array, explode, lit, struct, to_json, window,
    max as pyspark_max, mean as pyspark_mean, min as pyspark_min,
)

def start_and_await_termination(query):
    """Block until a streaming query terminates; stop it if the cell is interrupted.

    Redefined here so this section also works when the example cell above was skipped.
    """
    try:
        query.awaitTermination()
    except KeyboardInterrupt:
        query.stop()


def aggregate_channel_stats(df, aggregations, channel="vehicle_speed",
                            window_duration="5 seconds", watermark_delay="5 seconds"):
    """Compute several windowed per-vehicle statistics of one channel in ONE aggregation.

    All statistics come from a single ``groupBy`` (one shuffle, one state store) and are
    then unpivoted so that every statistic becomes its own output row. This replaces a
    ``union`` of separate streaming aggregations, which is more expensive and is
    rejected by older Spark releases (more than one streaming aggregation per query).

    Args:
        df: streaming DataFrame with ``timestamp``, ``vehicle_id`` and ``channel`` columns.
        aggregations: mapping of statistic name (written to the ``type`` column) to a
            ``pyspark.sql.functions`` aggregate such as ``max``. All functions are
            expected to return the same data type (true for max/min/mean over a
            DoubleType column).
        channel: name of the column to aggregate; also written to the ``channel`` column.
        window_duration: tumbling window length.
        watermark_delay: how late events may arrive before a window is finalized.

    Returns:
        DataFrame with columns vehicle_id, type, channel, value, window_start, window_end.

    Raises:
        ValueError: if ``aggregations`` is empty.
    """
    if not aggregations:
        raise ValueError("aggregations must contain at least one entry")

    # Positional aliases avoid clashes between statistic names and existing columns.
    aliases = {name: f"_stat_{i}" for i, name in enumerate(aggregations)}

    windowed = (
        df.withWatermark("timestamp", watermark_delay)
          .groupBy("vehicle_id", window("timestamp", window_duration))
          .agg(*[aggregations[name](channel).alias(alias) for name, alias in aliases.items()])
    )

    # One (type, value) struct per statistic; exploding the array yields one row each.
    per_statistic = array(*[
        struct(lit(name).alias("type"), col(alias).alias("value"))
        for name, alias in aliases.items()
    ])

    return (
        windowed
        .select("vehicle_id", "window", explode(per_statistic).alias("stat"))
        .select(
            "vehicle_id",
            col("stat.type").alias("type"),
            lit(channel).alias("channel"),
            col("stat.value").alias("value"),
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
        )
    )


def aggregate_data(df, aggregation_function, aggregation_name, channel="vehicle_speed",
                   window_duration="5 seconds", watermark_delay="5 seconds"):
    """Windowed per-vehicle aggregate of a single statistic.

    Thin wrapper around ``aggregate_channel_stats`` with one aggregation; returns the
    columns vehicle_id, type, channel, value, window_start, window_end.
    """
    return aggregate_channel_stats(df, {aggregation_name: aggregation_function},
                                   channel, window_duration, watermark_delay)


# Read telemetry and parse the JSON payload carried in the binary "body" column.
telemetry_raw = spark.readStream.format("eventhubs").options(**eventhubs_reader_conf).load()
telemetry = (
    telemetry_raw
    .select(from_json(col("body").cast("string"), json_schema).alias("body"))
    .select("body.*")
)

# max/min/avg of vehicle_speed per vehicle and 5-second window, one row per statistic.
output = aggregate_channel_stats(
    telemetry, {"max": pyspark_max, "min": pyspark_min, "avg": pyspark_mean}
)
# Serialize each row to a JSON string in the "body" column expected by the Event Hubs sink.
output_json = output.select(to_json(struct("*")).alias("body"))

In [None]:
# Publish the JSON-encoded aggregates to the Event Hub configured in eventhubs_publisher_conf.
# No checkpointLocation is set here, so the session default configured in the init cell applies.
query = (
    output_json.writeStream
    .format("eventhubs")
    .options(**eventhubs_publisher_conf)
    .outputMode("append")  # Spark's default output mode, stated explicitly
    .start()
)

start_and_await_termination(query)