In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.functions  import from_unixtime
from pyspark.sql.functions  import to_date
from pyspark.sql import Row
from pyspark.sql.functions import to_json, struct
from pyspark.sql import functions as F

In [None]:
# Mount the "rawdata" container of an Azure Blob storage account (not ADLS Gen-2) at /mnt/Blob.
# The storage account name and access key come from the Azure portal, as shown in the Cookbook recipe.
# NOTE(review): the key below is a placeholder; prefer reading the real key from a secret scope
# (dbutils.secrets.get) instead of pasting it into the notebook.
storageAccount="cookbookblobstorage1"
storageKey ="xxx-xxxxx-xxxxxx"
mountpoint = "/mnt/Blob"
storageEndpoint =   "wasbs://rawdata@{}.blob.core.windows.net".format(storageAccount)
storageConnSting = "fs.azure.account.key.{}.blob.core.windows.net".format(storageAccount)

# Check the existing mounts explicitly instead of wrapping the mount call in a bare `except:`.
# That way only the "already mounted" case is skipped, and genuine failures (wrong key,
# unreachable endpoint, ...) raise instead of being reported as "Already mounted".
if mountpoint in {mount.mountPoint for mount in dbutils.fs.mounts()}:
    print("Already mounted...."+mountpoint)
else:
    dbutils.fs.mount(
        source = storageEndpoint,
        mount_point = mountpoint,
        extra_configs = {storageConnSting:storageKey})

In [None]:
# Uncomment and run the line below to remove the /mnt/Blob mount:
# dbutils.fs.unmount("/mnt/Blob")

In [None]:
%fs ls /mnt/Blob

path,name,size
dbfs:/mnt/Blob/Customer/,Customer/,0
dbfs:/mnt/Blob/CustomerDelta/,CustomerDelta/,0
dbfs:/mnt/Blob/Orders/,Orders/,0
dbfs:/mnt/Blob/Vehicle_Agg/,Vehicle_Agg/,0
dbfs:/mnt/Blob/Vehicle_Chkpoint/,Vehicle_Chkpoint/,0
dbfs:/mnt/Blob/VehiclechkpointKafkaEventHub_Delta_Agg_Delta11/,VehiclechkpointKafkaEventHub_Delta_Agg_Delta11/,0
dbfs:/mnt/Blob/VehiclechkpointKafkaEventHub_Delta_Agg_Parquet/,VehiclechkpointKafkaEventHub_Delta_Agg_Parquet/,0
dbfs:/mnt/Blob/VehiclechkpointKafkaEventHub_Delta_Agg_Parquet1/,VehiclechkpointKafkaEventHub_Delta_Agg_Parquet1/,0
dbfs:/mnt/Blob/VehiclechkpointKafkaEventHub_Delta_Agg_Parquet2/,VehiclechkpointKafkaEventHub_Delta_Agg_Parquet2/,0
dbfs:/mnt/Blob/VehiclechkpointKafkaEventHub_Delta_Agg_Tumbling/,VehiclechkpointKafkaEventHub_Delta_Agg_Tumbling/,0


In [None]:
# Schema describing the vehicle JSON payload: one field per attribute sent by the device.
jsonschema = StructType([
    StructField("id", StringType()),
    StructField("timestamp", TimestampType()),
    StructField("rpm", IntegerType()),
    StructField("speed", IntegerType()),
    StructField("kms", IntegerType()),
])

In [None]:
# Connection settings for the Kafka-enabled Event Hub "kafkaenabledhub2".
TOPIC = "kafkaenabledhub2"
BOOTSTRAP_SERVERS = "kafkaenabledeventhubns.servicebus.windows.net:9093"
# JAAS configuration: the Event Hub connection string is passed as the SASL PLAIN password.
EH_SASL = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://kafkaenabledeventhubns.servicebus.windows.net/;SharedAccessKeyName=sendreceivekafka;SharedAccessKey=zzzzzzzzzzz";'
GROUP_ID = "$Default"

# Options for the Structured Streaming Kafka source.
# During testing, consider setting "startingOffsets" to "earliest" to read from the
# earliest available offset instead of only new events.
kafka_reader_options = {
    "subscribe": TOPIC,
    "kafka.bootstrap.servers": BOOTSTRAP_SERVERS,
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.sasl.jaas.config": EH_SASL,
    "kafka.request.timeout.ms": "60000",
    "kafka.session.timeout.ms": "60000",
    "kafka.group.id": GROUP_ID,
    "failOnDataLoss": "false",
    "startingOffsets": "latest",
}

# Streaming DataFrame over the Event Hub topic (read through its Kafka endpoint).
kafkaDF = spark.readStream.format("kafka").options(**kafka_reader_options).load()




In [None]:
# The Kafka source exposes key and value as binary columns. from_json expects a string
# column, so both are cast to string here and every other source column is dropped.
kafkaDF = kafkaDF.select(
    F.col("key").cast("string"),
    F.col("value").cast("string"),
)

In [None]:
# Parse the JSON text in `value` into a struct column named vehiclejson whose fields follow
# jsonschema (id, timestamp, rpm, speed, kms).
parsed_vehicle = F.from_json(F.col("value"), jsonschema)
newkafkaDF = kafkaDF.withColumn("vehiclejson", parsed_vehicle)

# Flatten the struct so each vehicle attribute becomes a top-level column next to key and value.
kafkajsonDF = newkafkaDF.select("key", "value", "vehiclejson.*")

In [None]:
#You can uncomment and run the command below to view the column values
# display(kafkajsonDF)

### Tumbling (non-overlapping) windows on event time

In [None]:
# Count events per vehicle id in 1-minute tumbling windows over the event `timestamp`,
# with a 4-minute watermark declared on that column. coalesce(1) collapses the result to a
# single partition before it is written.
tumbling_counts = (
    kafkajsonDF
    .withWatermark("timestamp", "4 minutes")
    .groupBy(F.window("timestamp", "1 minutes"), "id")
    .count()
    .coalesce(1)
)

# Stream the aggregate to a Delta location in complete output mode.
# NOTE(review): "truncate" is normally a console-sink option; presumably it has no effect on
# the Delta sink - confirm before relying on it.
(
    tumbling_counts.writeStream
    .format("delta")
    .outputMode("complete")
    .option("truncate", "false")
    .option("checkpointLocation", "/mnt/Blob/Vehicle_Chkpoint1/")
    .start("/mnt/Blob/Vehicle_Agg")
)

In [None]:
%sql
-- Drop the Vehicle_Agg table definition if it is already registered.
DROP TABLE IF EXISTS Vehicle_Agg

In [None]:
%sql
-- Register Vehicle_Agg as a Delta table over the files under /mnt/Blob/Vehicle_Agg/.
CREATE TABLE IF NOT EXISTS Vehicle_Agg
  USING DELTA
  LOCATION "/mnt/Blob/Vehicle_Agg/"

In [None]:
%fs ls /mnt/Blob/Vehicle_Agg/

path,name,size
dbfs:/mnt/Blob/Vehicle_Agg/_delta_log/,_delta_log/,0
dbfs:/mnt/Blob/Vehicle_Agg/part-00000-1241eaac-07fb-4745-b360-00125346290d-c000.snappy.parquet,part-00000-1241eaac-07fb-4745-b360-00125346290d-c000.snappy.parquet,711
dbfs:/mnt/Blob/Vehicle_Agg/part-00000-68cb2a26-aafb-4632-8ca1-6c47b70c6ce8-c000.snappy.parquet,part-00000-68cb2a26-aafb-4632-8ca1-6c47b70c6ce8-c000.snappy.parquet,1951
dbfs:/mnt/Blob/Vehicle_Agg/part-00000-723c3911-6518-460e-a5cf-2f7a7a13b000-c000.snappy.parquet,part-00000-723c3911-6518-460e-a5cf-2f7a7a13b000-c000.snappy.parquet,1904
dbfs:/mnt/Blob/Vehicle_Agg/part-00000-9f4e9b5c-c69c-4748-8005-ce7277b8d165-c000.snappy.parquet,part-00000-9f4e9b5c-c69c-4748-8005-ce7277b8d165-c000.snappy.parquet,1909
dbfs:/mnt/Blob/Vehicle_Agg/part-00000-c84510a1-7814-44c7-b77f-c5243d454c74-c000.snappy.parquet,part-00000-c84510a1-7814-44c7-b77f-c5243d454c74-c000.snappy.parquet,1955


In [None]:
%sql
-- Show the per-vehicle counts for each tumbling window, most recent window first.
SELECT *
FROM Vehicle_Agg
ORDER BY Window DESC

window,id,count
"List(2021-07-30T03:59:00.000+0000, 2021-07-30T04:00:00.000+0000)",41c904bd-8da0-4749-8318-82228dbee9dc,6
"List(2021-07-30T03:59:00.000+0000, 2021-07-30T04:00:00.000+0000)",4904e925-77aa-486f-91ef-db750149e330,6
"List(2021-07-30T03:59:00.000+0000, 2021-07-30T04:00:00.000+0000)",b2b1d387-8604-4ddf-9b78-4add10b8ef8c,6
"List(2021-07-30T03:59:00.000+0000, 2021-07-30T04:00:00.000+0000)",1e7c0abb-44fc-49a4-acbb-b8742dcace83,6
"List(2021-07-30T03:59:00.000+0000, 2021-07-30T04:00:00.000+0000)",764583c2-63ef-4abb-9904-48d0ddb8188a,6
"List(2021-07-30T03:59:00.000+0000, 2021-07-30T04:00:00.000+0000)",6f6ea1e1-22ed-45e0-a63b-e84d3f9bf87a,6
"List(2021-07-30T03:59:00.000+0000, 2021-07-30T04:00:00.000+0000)",f248e307-7fb3-4a56-8a43-7348723f06a0,7
"List(2021-07-30T03:59:00.000+0000, 2021-07-30T04:00:00.000+0000)",7a6a8afc-7dff-4067-a831-46c393c3dbde,6
"List(2021-07-30T03:59:00.000+0000, 2021-07-30T04:00:00.000+0000)",8f8cf9f4-d290-49b6-bd53-c82511d0d105,6
"List(2021-07-30T03:59:00.000+0000, 2021-07-30T04:00:00.000+0000)",c5ca380f-694c-4fd4-a696-686a72dfef64,6


### Overlapping (sliding) windows on event time

In [None]:
# Count events per vehicle id in 2-minute windows that slide every 1 minute, so consecutive
# windows overlap by one minute. The result is ordered by window before being written.
sliding_counts = (
    kafkajsonDF
    .groupBy(F.window("timestamp", "2 minutes", "1 minutes"), "id")
    .count()
    .orderBy("window")
)

# Stream the aggregate to a Delta location in complete output mode.
(
    sliding_counts.writeStream
    .format("delta")
    .outputMode("complete")
    .option("truncate", "false")
    .option("checkpointLocation", "dbfs:/Vehiclechkpointkafkaeventhub_Agg_Chkpoint_Overlapping5/")
    .option("mergeSchema", "true")
    .start("dbfs:/VehiclechkpointKafkaEventHub_Delta_Agg_Overlapping5")
)


In [None]:
%sql
-- Re-register the sliding-window table over its Delta location:
-- drop any existing definition first, then create it from the files at that path.
DROP TABLE IF EXISTS VehiclechkpointKafkaEventHub_Delta_Agg_Overlapping;

CREATE TABLE IF NOT EXISTS VehiclechkpointKafkaEventHub_Delta_Agg_Overlapping
  USING DELTA
  LOCATION "dbfs:/VehiclechkpointKafkaEventHub_Delta_Agg_Overlapping5/"

In [None]:
%sql
-- Show the sliding-window counts, most recent window first.
-- To inspect a single vehicle, add a filter such as:
--   WHERE id = '7e0d39fd-7251-483c-92d6-7d6bb5cc164e'
SELECT *
FROM VehiclechkpointKafkaEventHub_Delta_Agg_Overlapping
ORDER BY window DESC

window,id,count
"List(2021-07-30T03:08:00.000+0000, 2021-07-30T03:10:00.000+0000)",c2c7cb35-2f97-4fab-ab23-62fe24eca7af,2
"List(2021-07-30T03:07:00.000+0000, 2021-07-30T03:09:00.000+0000)",c2c7cb35-2f97-4fab-ab23-62fe24eca7af,14
"List(2021-07-30T03:07:00.000+0000, 2021-07-30T03:09:00.000+0000)",544f5760-81bf-478c-b78a-f3cbba753d37,12
"List(2021-07-30T03:07:00.000+0000, 2021-07-30T03:09:00.000+0000)",ff9bff56-b8be-4668-bb81-acc68da68c50,12
"List(2021-07-30T03:07:00.000+0000, 2021-07-30T03:09:00.000+0000)",0efe5ed2-7fc8-4544-b6eb-3016b2faa436,12
"List(2021-07-30T03:07:00.000+0000, 2021-07-30T03:09:00.000+0000)",469cc591-fdc7-4f46-a721-a1a80554c359,12
"List(2021-07-30T03:07:00.000+0000, 2021-07-30T03:09:00.000+0000)",fc806137-2796-426b-be35-b5e76db9af16,12
"List(2021-07-30T03:07:00.000+0000, 2021-07-30T03:09:00.000+0000)",14b522d2-e33d-4dd5-a8fe-9a7e637f1cc9,14
"List(2021-07-30T03:07:00.000+0000, 2021-07-30T03:09:00.000+0000)",c872c0b8-4880-48e3-a981-a9430ed2effb,12
"List(2021-07-30T03:07:00.000+0000, 2021-07-30T03:09:00.000+0000)",96d491ff-11ef-4518-82b8-59c0ce521595,12
