### Before running this notebook, install the following Maven package on the Databricks cluster(s) the notebook will be attached to:

* `com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.17`

In [None]:
# Read the Event Hub stream.
# Event Hubs connection strings typically embed an access key, so avoid pasting
# the real value into the notebook: provide it via the EVENTHUB_CONNECTION_STRING
# environment variable (or, on Databricks, a secret scope via dbutils.secrets.get).
# When the variable is unset, the original "<Event_Hub_Endpoint>" placeholder is used.
import os

ehConnectionString = os.environ.get("EVENTHUB_CONNECTION_STRING", "<Event_Hub_Endpoint>")

# The connection string is passed through EventHubsUtils.encrypt (via the JVM
# gateway) before being handed to the connector, as the azure-eventhubs-spark
# PySpark guide requires for recent connector versions.
ehConf = {
    "eventhubs.connectionString": sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(ehConnectionString),
}

# Streaming DataFrame over the Event Hub; the message payload is in the `body` column.
read_df = (
    spark
    .readStream
    .format("eventhubs")
    .options(**ehConf)
    .load()
)

In [None]:
from pyspark.sql.types import *
import  pyspark.sql.functions as F

# Schema of the JSON document carried in each Event Hub message body.
read_schema = StructType([
    StructField("source", StringType(), True),
    StructField("id", StringType(), True),
    StructField("timestamp", TimestampType(), True),
    StructField("Export_Unit", DoubleType(), True),
    StructField("battery_current", DoubleType(), True),
    StructField("solar_KWH", DoubleType(), True),
    StructField("Grid_VOLTAGE", DoubleType(), True),
    StructField("turbine_RPM", DoubleType(), True),
])

# Where the parquet sink writes its files and where the query keeps its checkpoint.
STREAM_OUTPUT_PATH = "/evh_stream/data"
STREAM_CHECKPOINT_PATH = "/streaming_checkpoint_path"

# Cast the message `body` to a string and parse it with `read_schema` into a
# single struct column named `payload` (later cells select `payload.<field>`).
decoded_df = read_df.select(
    F.from_json(F.col("body").cast("string"), read_schema).alias("payload")
)

# Start a streaming query that appends the decoded records as parquet files.
# The checkpoint location lets Spark track which input has already been written.
query = (
    decoded_df
    .writeStream
    .format("parquet")
    .option("path", STREAM_OUTPUT_PATH)
    .option("checkpointLocation", STREAM_CHECKPOINT_PATH)
    .outputMode("append")
    .start()
)


In [None]:
# Session setting that enables schema inference for *streaming* file-source reads.
# It has no effect on the batch read below; it is kept so the session config
# matches what this cell has always set.
spark.conf.set("spark.sql.streaming.schemaInference", "true")

# Batch-read the parquet files written by the streaming query and pull the fields
# of interest out of the `payload` struct. `spark.read` replaces the deprecated
# SQLContext (`sqlContext`) entry point.
# NOTE(review): if this runs before the stream has committed its first micro-batch,
# the output directory may still be empty and the parquet read can fail; check the
# streaming query has made progress first.
df = spark.read.parquet("/evh_stream/data/").select(
    "payload.source",
    "payload.id",
    "payload.timestamp",
    "payload.turbine_RPM",
    "payload.Grid_VOLTAGE",
    "payload.solar_KWH",
)

display(df)

source,id,timestamp,turbine_RPM,Grid_VOLTAGE,solar_KWH
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:03:06.320+0000,483.0,762.0,15.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:03:06.358+0000,457.0,476.0,15.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:03:06.395+0000,456.0,583.0,16.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:03:06.432+0000,456.0,334.0,10.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:03:06.469+0000,400.0,637.0,12.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:03:06.508+0000,419.0,749.0,17.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:03:06.545+0000,429.0,522.0,18.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:03:06.580+0000,494.0,602.0,20.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:03:06.630+0000,407.0,640.0,10.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:03:06.669+0000,481.0,735.0,16.0


In [None]:
# Persist a snapshot of the data read above as the WindTurbine table.
# CREATE OR REPLACE makes the cell safe to re-run; a plain CREATE TABLE ... AS SELECT
# fails once WindTurbine already exists.
# NOTE(review): REPLACE TABLE needs a v2 table provider such as Delta (the CTAS output
# below reports Delta-style row counts); confirm for the target cluster.
df.createOrReplaceTempView("tmpStreamTable")
spark.sql("CREATE OR REPLACE TABLE WindTurbine AS SELECT * FROM tmpStreamTable")

Out[9]: DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [None]:
%sql
-- Show the rows persisted into WindTurbine by the table-creation cell.
SELECT *
  FROM WindTurbine

source,id,timestamp,turbine_RPM,Grid_VOLTAGE,solar_KWH
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:05:16.796+0000,457.0,687.0,19.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:05:16.833+0000,494.0,493.0,13.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:05:16.871+0000,468.0,365.0,18.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:05:16.909+0000,499.0,702.0,16.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:05:16.992+0000,488.0,486.0,14.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:05:17.029+0000,409.0,685.0,19.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:05:17.066+0000,424.0,621.0,18.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:05:17.106+0000,416.0,430.0,16.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:05:17.144+0000,449.0,675.0,19.0
wind-turbine-geo-sensor,tst_sensor,2022-12-30T10:05:17.181+0000,402.0,432.0,17.0
