In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# Preparing environment

In [0]:
%sql
-- Select the catalog that all following statements run against
Use catalog learn_adb_fikrat;
-- Create the bronze and silver schemas unless they already exist
create schema if not exists bronze;
create schema if not exists silver;
-- Make bronze the current schema for the statements that follow
Use bronze;

In [0]:
%sql
-- DROP TABLE IF EXISTS bronze.iot_measurements;
-- DROP TABLE IF EXISTS weather;
-- DROP TABLE IF EXISTS office;
-- DROP TABLE IF EXISTS iot_pivoted;
-- DROP TABLE IF EXISTS silver.iot_measurements;

In [0]:
# Folder the file-based streams and the office CSV are loaded from
root_data_folder='/Volumes/learn_adb_fikrat/bronze/landing'
# Archive target for the `cleanSource=archive` demo and the later dbutils.fs.ls call.
# The notebook referenced this name without ever defining it (NameError on a fresh run).
# NOTE(review): the exact location is an assumption — point it at the intended archive
# directory; it must not sit inside the '<root_data_folder>/sensor' source folder.
root_archive_folder=f'{root_data_folder}/archive'

# Common parent of all streaming checkpoint directories
checkpoint_root_path = "/Volumes/learn_adb_fikrat/bronze/ext_landing_volume/streaming-checkpoints"

# One checkpoint directory per streaming query
checkpoint_path_sensor=f'{checkpoint_root_path}/iot_measurements'
checkpoint_path_sensor1=f'{checkpoint_root_path}/iot_measurements1'
checkpoint_path_sensor2=f'{checkpoint_root_path}/iot_measurements2'
checkpoint_path_sensor3=f'{checkpoint_root_path}/iot_measurements3'
checkpoint_path_sensor4=f'{checkpoint_root_path}/iot_measurements4'
checkpoint_path_sensor5=f'{checkpoint_root_path}/iot_measurements5'
checkpoint_path_sensor6=f'{checkpoint_root_path}/iot_measurements6'
checkpoint_path_sensor7=f'{checkpoint_root_path}/iot_measurements7'
checkpoint_path_sensor8=f'{checkpoint_root_path}/iot_measurements8'
checkpoint_path_weather=f'{checkpoint_root_path}/weather'


In [0]:
# Recursively delete every checkpoint directory (second argument True = recurse),
# in the same order as before: weather first, then the sensor checkpoints.
for stale_checkpoint in (
    checkpoint_path_weather,
    checkpoint_path_sensor,
    checkpoint_path_sensor1,
    checkpoint_path_sensor2,
    checkpoint_path_sensor3,
    checkpoint_path_sensor4,
    checkpoint_path_sensor5,
    checkpoint_path_sensor6,
    checkpoint_path_sensor7,
    checkpoint_path_sensor8,
):
    dbutils.fs.rm(stale_checkpoint, True)



In [0]:
# Batch-load the office CSV (first line is the header) ...
office_csv = (
    spark.read.format('csv')
    .option('header', 'true')
    .load(f'{root_data_folder}/office')
)
# ... and store it as the Delta table bronze.office, replacing existing contents
office_csv.write.format('delta').mode('overwrite').saveAsTable('bronze.office')


# Basic streaming operations: read and write

### Basic read/write

In [0]:
# Nested struct describing a single measurement inside a sensor event
measurement_struct = (
    StructType()
    .add('MeasurementType', StringType(), True)
    .add('MeasurementValue', DoubleType(), True)
    .add('Sensor', StringType(), True)
)

# Schema of the sensor JSON files; EventTime is read as a plain string
iot_schema = (
    StructType()
    .add('EventTime', StringType(), True)
    .add('Measurements', measurement_struct, True)
    .add('Office', StringType(), True)
)

# Schema of the weather JSON files
weather_schema = (
    StructType()
    .add('City', StringType(), True)
    .add('EventTime', StringType(), True)
    .add('Temperature', DoubleType(), True)
)

In [0]:
# Streaming read of the sensor JSON files with an explicit schema, tagging every
# row with the ingestion time and the file it came from.
sensor_files = (
    spark.readStream.format('json')
    .schema(iot_schema)
    .load(f'{root_data_folder}/sensor')
)
dfSensor = (
    sensor_files
    .withColumn('IngestionTimestamp', F.current_timestamp())
    .withColumn('SourceFileName', F.input_file_name())
)

# display(dfSensor)

In [0]:
# Configure the Delta writer first, then start the query that fills bronze.iot_measurements
sensor_writer = (
    dfSensor.writeStream.format("delta")
    .queryName('iot_measurements')
    .option('checkpointLocation', checkpoint_path_sensor)
)
strms = sensor_writer.toTable('bronze.iot_measurements')

In [0]:
%sql
-- Show the current contents of the bronze sensor table
select  * from bronze.iot_measurements 

### Table-to-table streams
(The source table should be append-only.)

In [0]:
# Flatten the nested Measurements struct and cast the value/time columns ...
silver_projection = spark.readStream.table('iot_measurements').selectExpr(
    'Measurements.Sensor as Sensor',
    'Measurements.MeasurementType  as MeasurementType',
    'cast(Measurements.MeasurementValue as float) as MeasurementValue',
    'cast(EventTime as timestamp) as EventTime',
    'Office',
    'IngestionTimestamp',
)
# ... then append the result to silver.iot_measurements. dfstt is the running
# query handle (StreamingQuery), not a DataFrame.
dfstt = (
    silver_projection.writeStream.format("delta")
    .queryName('iot_measurements_silver')
    .option('checkpointLocation', checkpoint_path_sensor5)
    .outputMode('append')
    .toTable('silver.iot_measurements')
)


In [0]:
%sql
select * from silver.iot_measurements

## Managing the stream

### Managing execution flow

In [0]:
# Stop the bronze-to-silver query started above
dfstt.stop()

In [0]:
# A single query can be stopped through its handle, e.g. strms.stop().
# Here every query that is still active in this Spark session is stopped.
running_queries = spark.streams.active
for running_query in running_queries:
    running_query.stop()

In [0]:
# A StreamingQuery handle has no start() method, so a stopped query cannot be
# resumed through it (the original call raised AttributeError). Build the query
# again instead; it uses the same checkpoint location as the stopped query.
dfstt=spark.readStream.table('iot_measurements')\
    .selectExpr('Measurements.Sensor as Sensor',
           'Measurements.MeasurementType  as MeasurementType',
           'cast(Measurements.MeasurementValue as float) as MeasurementValue',
           'cast(EventTime as timestamp) as EventTime',
           'Office','IngestionTimestamp')\
    .writeStream.format("delta")\
    .queryName('iot_measurements_silver')\
    .option('checkpointLocation', checkpoint_path_sensor5)\
    .outputMode('append')\
    .toTable('silver.iot_measurements')

In [0]:
# strms.awaitTermination()

### Trigger options

In [0]:
# Streaming read of the weather JSON files, tagged with ingestion time and source file
weather_files = (
    spark.readStream.format('json')
    .schema(weather_schema)
    .load(f'{root_data_folder}/weather')
)
dfWeather = (
    weather_files
    .withColumn('IngestionTimestamp', F.current_timestamp())
    .withColumn('SourceFileName', F.input_file_name())
)

In [0]:
# Processing-time trigger: a micro-batch is started every 10 seconds
weather_writer = (
    dfWeather.writeStream.format("delta")
    .trigger(processingTime='10 seconds')
    .queryName('weather')
    .option('checkpointLocation', checkpoint_path_weather)
)
strmw = weather_writer.toTable('weather')

In [0]:
# availableNow trigger: process the data available at start, then stop
once_writer = (
    dfSensor.writeStream.format("delta")
    .option('checkpointLocation', checkpoint_path_sensor2)
    .trigger(availableNow=True)
    .queryName('iot_measurements_once')
)
strms = once_writer.toTable('bronze.iot_measurements_once')

### Source options
- maxFilesPerTrigger
- latestFirst
(See https://spark.apache.org/docs/3.5.1/structured-streaming-programming-guide.html#input-sources for more details)

In [0]:
# File-source options: at most 5 files per micro-batch, newest files first
tuned_reader = (
    spark.readStream.format('json')
    .schema(iot_schema)
    .option('maxFilesPerTrigger', 5)
    .option('latestFirst', True)
)
dfStrm = (
    tuned_reader.load(f'{root_data_folder}/sensor')
    .withColumn('IngestionTimestamp', F.current_timestamp())
    .withColumn('SourceFileName', F.input_file_name())
)
display(dfStrm)



In [0]:

#Archiving option didn't work on UC volumes
# NOTE(review): the two "spark.sql.streaming.fileSource.log.*" keys look like Spark
# session confs rather than file-source reader options — confirm they have any
# effect when passed through .option().
# Every streaming query needs a checkpoint directory of its own. The original reused
# checkpoint_path_sensor2, which already belongs to the 'iot_measurements_once'
# query; this query now uses the otherwise unused checkpoint_path_sensor1.
dfStrm = spark.readStream.format('json') \
    .schema(iot_schema) \
    .option("spark.sql.streaming.fileSource.log.compactInterval","1")\
    .option("spark.sql.streaming.fileSource.log.cleanupDelay","1")\
    .option('cleanSource', 'archive') \
    .option('sourceArchiveDir', root_archive_folder)\
    .load(f'{root_data_folder}/sensor')\
    .withColumn('IngestionTimestamp', F.current_timestamp())\
    .withColumn('SourceFileName', F.input_file_name())\
    .writeStream.format("delta")\
    .option('checkpointLocation', checkpoint_path_sensor1)\
    .queryName('iot_measurements2')\
    .toTable('bronze.iot_measurements_archive')


In [0]:
# List the contents of the archive folder passed as sourceArchiveDir above
dbutils.fs.ls(root_archive_folder)

### Sink options
Output modes: append, update, complete

In [0]:
# Streaming read of the silver sensor table; reused by the sink examples below
dfSensorSilver=spark.readStream.table('silver.iot_measurements')

In [0]:
# Memory sink: the result is queryable under the query name 'iot_measurements_mem'
memory_writer = (
    dfSensorSilver.writeStream.format("memory")
    .outputMode("update")
    .option('checkpointLocation', checkpoint_path_sensor3)
    .queryName('iot_measurements_mem')
)
memory_writer.start()
    

In [0]:
%sql
-- Query the in-memory table populated by the 'iot_measurements_mem' stream
select * from iot_measurements_mem

In [0]:
def fn_batch_processing(df,batchid):
    """Pivot one micro-batch and overwrite the ``iot_pivoted`` table with it.

    Args:
        df: the micro-batch DataFrame handed over by ``foreachBatch``.
        batchid: id of the micro-batch (not used).

    The batch is grouped by Office and Sensor, MeasurementType is pivoted into
    'temperature' and 'humidity' columns holding the sum of MeasurementValue,
    and the result replaces the table contents (write mode 'overwrite').
    """
    per_sensor = df.groupBy('Office','Sensor')
    pivoted = per_sensor.pivot('MeasurementType',['temperature','humidity'])
    totals = pivoted.sum('MeasurementValue')
    totals.write.mode('overwrite').saveAsTable('iot_pivoted')

# Hand every micro-batch of the silver stream to fn_batch_processing
batch_writer = (
    dfSensorSilver.writeStream
    .option('checkpointLocation', checkpoint_path_sensor4)
    .queryName('iot_measurements4')
    .foreachBatch(fn_batch_processing)
)
batch_writer.start()


In [0]:
%sql
-- Show the table written by fn_batch_processing
select * from iot_pivoted

In [0]:
def persist_to_file(row):
    """Write one row to its own text file under ``{root_data_folder}/iot_agg``.

    Args:
        row: a row exposing ``Office`` and ``window.start``; both go into the
            file name, and ``str(row)`` becomes the file content.

    Raises:
        OSError: if the directory or file cannot be created or written.
    """
    import os  # local import keeps this cell self-contained

    file_path=f'{root_data_folder}/iot_agg/IoT_{row.Office}_{row.window.start}.txt'
    # Nothing else in the notebook creates the iot_agg directory; without this
    # open() fails with FileNotFoundError the first time the sink runs.
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, 'w') as f:
        f.write(str(row))

# Hourly total per office (10-second watermark on EventTime); every result row
# is passed to persist_to_file.
hourly_office_totals = (
    dfSensorSilver.withWatermark('EventTime','10 seconds')
    .groupBy('Office',F.window('EventTime','1 hours'))
    .agg(F.sum('MeasurementValue').alias('TotalValue'))
)
hourly_office_totals.writeStream.foreach(persist_to_file).start()

# Transformations

In [0]:
%sql
-- Show the current contents of the weather table
select  * from weather 

## Stateless transformations

In [0]:
# Streaming read of the bronze sensor table, input for the stateless examples
dfb=spark.readStream.table('bronze.iot_measurements')

In [0]:
# Projection: cast EventTime, expand the Measurements struct into top-level columns
flattened = dfb.selectExpr(
    'cast(EventTime as Timestamp) as EventTime',
    'Measurements.*',
    'Office',
)
# Filter: keep only rows for "Office 1"
dfso = flattened.filter('Office="Office 1"')
display(dfso)

In [0]:
# De-duplicate on (EventTime, Office, Sensor).
# Fixes: `dfs` was never defined (NameError) — the flattened stream from the previous
# cell, dfso, is the frame that carries a top-level Sensor column; and the key
# columns are passed as one list, the documented form of dropDuplicates(subset).
# NOTE(review): no watermark is set here, so the de-duplication state presumably
# grows without bound on a long-running stream — confirm that is acceptable.
dfso=dfso.dropDuplicates(['EventTime','Office','Sensor'])

## Stateful transformations

### Aggregations

In [0]:
# Streaming read of the silver sensor table, input for the aggregation examples
dfSensorSilver = spark.readStream.table('silver.iot_measurements')

In [0]:
# Global aggregation: groupBy() without keys counts all rows of the stream
display(dfSensorSilver.groupBy().count())

In [0]:
# Total MeasurementValue per (Office, Sensor)
total_value = F.sum('MeasurementValue').alias('TotalValue')
dfso = dfSensorSilver.groupBy('Office','Sensor').agg(total_value)
display(dfso)

In [0]:
# Total per (Office, Sensor) and one-hour window on EventTime
hourly_window = F.window('EventTime','1 hours')
dfso = (
    dfSensorSilver.groupBy('Office','Sensor',hourly_window)
    .agg(F.sum('MeasurementValue').alias('TotalValue'))
)
# Written in 'complete' output mode to a Delta table
(
    dfso.writeStream.format('delta')
    .option('checkpointLocation', checkpoint_path_sensor6)
    .outputMode('complete')
    .toTable('silver.timed_agg_iot_measurements')
)
# display(dfso)    

In [0]:
%sql
-- Show the windowed totals written by the stream above
select * from silver.timed_agg_iot_measurements

In [0]:
# Windowed total with a 10-second watermark on EventTime.
# The original chained .sum('MeasurementValue').alias('TotalValue'): that alias is
# applied to the DataFrame, so the column stayed named 'sum(MeasurementValue)'.
# Aliasing the aggregate expression inside agg() names the column TotalValue,
# matching the other aggregation cells.
dfso=dfSensorSilver.withWatermark('EventTime','10 seconds')\
    .groupBy('Office','Sensor',F.window('EventTime','1 hours'))\
    .agg(F.sum('MeasurementValue').alias('TotalValue'))
display(dfso)

### Arbitrary state management

In [0]:
from pyspark.sql.streaming.state import GroupState, GroupStateTimeout
from typing import Iterator, Tuple
import pandas as pd

In [0]:
# Streaming read of the silver table, input for the arbitrary-state examples.
# NOTE(review): 'rowsPerBatch' does not look like an option of the table streaming
# source — presumably the intent was to limit batch size (e.g. maxFilesPerTrigger);
# confirm whether this option has any effect here.
dfSensorSilver = spark.readStream.option('rowsPerBatch', '10')\
  .table('silver.iot_measurements')
# display(dfSensorSilver)  

#### Basic state management 

In [0]:
def custtom_agg(pkey: Tuple, dfi: Iterator[pd.DataFrame], state: GroupState) -> Iterator[pd.DataFrame]:
    """Keep a running count of temperature readings above 23 for one group.

    Args:
        pkey: grouping key, unpacked as (office, sensor).
        dfi: iterator of pandas DataFrames holding the group's new rows.
        state: group state; stores a one-element tuple ``(count,)``.

    Yields:
        One single-row DataFrame with columns office, sensor and count, where
        count is the previous total plus the matches found in this call.
    """
    office_key,sensor_key=pkey
    old_count=0
    if state.exists:
        # state.get hands back the stored tuple, not a bare number. The original
        # added that tuple to the new count; unpack its single field instead.
        (old_count,)=state.get
    new_count=0
    for df in dfi:
        hot=(df['MeasurementType']=='temperature') & (df['MeasurementValue']>23)
        # int() keeps the counter a plain Python int rather than a numpy scalar
        new_count+=int(hot.sum())
    new_count+=old_count
    state.update((new_count,))
    yield pd.DataFrame({'office':[office_key],'sensor':[sensor_key],'count':[new_count]})

# The state holds one running count; each output row adds the group key to it
state_schema = "count LONG"
output_schema = "office STRING, sensor STRING,count LONG"
per_sensor = dfSensorSilver.groupBy('Office','Sensor')
dfa = per_sensor.applyInPandasWithState(
    custtom_agg, output_schema, state_schema, "append", GroupStateTimeout.NoTimeout
)

display(dfa)


In [0]:
def custtom_agg(pkey: Tuple, dfi: Iterator[pd.DataFrame], state: GroupState) -> Iterator[pd.DataFrame]:
    """Track total and per-call counts of temperature readings above 23 for one group.

    Args:
        pkey: grouping key, unpacked as (office, sensor).
        dfi: iterator of pandas DataFrames holding the group's new rows.
        state: group state; stores ``(process_time, count, count_delta, alert)``.

    Yields:
        One single-row DataFrame with office, sensor, process_time, count
        (running total), count_delta (matches in this call) and alert (1 when
        the running delta exceeded the previous call's delta, else 0).
    """
    office_key,sensor_key=pkey
    old_count=old_count_delta=0
    if state.exists:
        # The stored process time and alert are not needed for the new values
        _,old_count,old_count_delta,_=state.get
    # Read the processing time on every call. The original only read it when state
    # already existed, so the first row of every group reported process_time 0.
    process_time=state.getCurrentProcessingTimeMs()
    new_count_delta=new_alert=0
    for df in dfi:
        hot=(df['MeasurementType']=='temperature') & (df['MeasurementValue']>23)
        # int() keeps the counters plain Python ints rather than numpy scalars
        new_count_delta+=int(hot.sum())
        if new_count_delta>old_count_delta:
            new_alert=1
    total_count=new_count_delta+old_count
    state.update((process_time,total_count,new_count_delta,new_alert))
    yield pd.DataFrame({'office':[office_key],'sensor':[sensor_key],
        'process_time':[process_time],'count':[total_count],
        'count_delta':[new_count_delta],'alert':[new_alert]})


# State keeps four fields; the output row adds the group key in front of them
state_schema = "process_time LONG,count LONG, count_delta LONG,alert INT"
output_schema = "office STRING, sensor STRING,process_time LONG, count LONG,count_delta LONG,alert INT"
per_sensor = dfSensorSilver.groupBy('Office','Sensor')
dfa = per_sensor.applyInPandasWithState(
    custtom_agg, output_schema, state_schema, "append", GroupStateTimeout.NoTimeout
)

# Only the rows for "Office 1" are displayed
display(dfa.filter('Office="Office 1"'))


#### State management with timeouts

In [0]:

def custtom_agg(pkey: Tuple, dfi: Iterator[pd.DataFrame], state: GroupState) -> Iterator[pd.DataFrame]:
    """Accumulate a per-group count and emit it only when the group times out.

    Args:
        pkey: grouping key, unpacked as (office, sensor) when a row is emitted.
        dfi: iterator of pandas DataFrames holding the group's new rows.
        state: group state; stores a one-element tuple with the count.

    Yields:
        On timeout: one row (office, sensor, count) built from the stored
        count, after which the state is removed. Otherwise: an empty
        DataFrame, after adding the number of temperature readings above 23
        to the state and setting a timeout duration of 1000.
    """
    if state.hasTimedOut:
        office_key, sensor_key = pkey
        final_count = state.get[0]
        state.remove()
        yield pd.DataFrame({'office': [office_key], 'sensor': [sensor_key], 'count': [final_count]})
        return

    previous_total = state.get[0] if state.exists else 0
    batch_hits = 0
    for frame in dfi:
        is_hot = (frame['MeasurementType'] == 'temperature') & (frame['MeasurementValue'] > 23)
        batch_hits += is_hot.sum()
    state.update((batch_hits + previous_total,))
    state.setTimeoutDuration(1000)
    yield pd.DataFrame()

# Count state per (Office, Sensor) with a processing-time timeout; rows emitted
# by custtom_agg are written to silver.agg_iot_measurements.
state_schema = "count LONG"
output_schema = "office STRING, sensor STRING, count LONG"
timed_counts = (
    dfSensorSilver.withWatermark("EventTime", "5 seconds")
    .groupBy('Office', 'Sensor')
    .applyInPandasWithState(
        custtom_agg, output_schema, state_schema, "append",
        GroupStateTimeout.ProcessingTimeTimeout,
    )
)
dfa = (
    timed_counts.writeStream.format('delta')
    .option("checkpointLocation", checkpoint_path_sensor8)
    .queryName('agg_iot_measurements2')
    .toTable('silver.agg_iot_measurements')
)





In [0]:
def custtom_agg(pkey: Tuple, dfi: Iterator[pd.DataFrame], state: GroupState) -> Iterator[pd.DataFrame]:
    """Accumulate a per-group count and emit it when the event-time timeout fires.

    Args:
        pkey: grouping key, unpacked as (office, sensor) when a row is emitted.
        dfi: iterator of pandas DataFrames holding the group's new rows.
        state: group state; stores a one-element tuple with the count.

    Yields:
        On timeout: one row (office, sensor, count) built from the stored
        count, after which the state is removed. Otherwise: an empty
        DataFrame, after adding the number of temperature readings above 23
        to the state and re-arming the timeout.
    """
    if state.hasTimedOut:
        office_key, sensor_key = pkey
        old_state = state.get[0]
        state.remove()
        yield pd.DataFrame({'office': [office_key], 'sensor': [sensor_key], 'count': [old_state]})
    else:
        count = old_state = 0
        if state.exists:
            old_state = state.get[0]
        for df in dfi:
            count += ((df['MeasurementType'] == 'temperature') & (df['MeasurementValue'] > 23)).sum()
        new_state = count + old_state
        state.update((new_state,))
        # setTimeoutTimestamp takes an absolute event-time timestamp in ms, not a
        # duration. The original passed the constant 2000 (two seconds after the
        # epoch), which lies behind the watermark once it advances. Arm the timeout
        # 2000 ms after the current watermark instead.
        state.setTimeoutTimestamp(state.getCurrentWatermarkMs() + 2000)
        yield pd.DataFrame()

output_schema = "office STRING, sensor STRING, count LONG"
state_schema = "count LONG"
# This query differs from 'agg_iot_measurements2' (event-time instead of
# processing-time timeout) but originally shared its checkpoint directory,
# checkpoint_path_sensor8. A checkpoint belongs to exactly one query, so this one
# now uses the otherwise unused checkpoint_path_sensor7.
dfa = dfSensorSilver.withWatermark("EventTime", "5 seconds")\
    .groupBy('Office', 'Sensor')\
    .applyInPandasWithState(custtom_agg, output_schema, state_schema, "append", 
                            GroupStateTimeout.EventTimeTimeout)\
                          .writeStream.format('delta')\
                          .option("checkpointLocation", checkpoint_path_sensor7)\
                          .queryName('agg_iot_measurements')\
                          .toTable('silver.agg_iot_measurements')



In [0]:
%sql
-- Show the counts emitted by the timeout-based state examples
select * from silver.agg_iot_measurements

### Stream joins

#### Stream to static joins

In [0]:
# Static side of the join: the office table, cached as recommended for static frames
office_static = spark.table('office')
dfo = office_static.cache()
# Streaming side: the silver sensor table
dfso = spark.readStream.table('silver.iot_measurements')

In [0]:
# Stream-to-static join on Office; keep the sensor columns plus the office's City
sensor_with_office = dfso.join(dfo, 'Office')
dfj = sensor_with_office.select(
    dfso.EventTime,
    dfso.MeasurementType,
    dfso.MeasurementValue,
    dfso.Office,
    dfso.Sensor,
    dfo.City,
)
display(dfj)

#### Stream to stream joins

**Watermarking requirements:**
- Optional for inner joins
- Mandatory for outer joins

##### Inner joins

In [0]:
# Preview the weather stream with EventTime cast to a timestamp column
display(spark.readStream.table('weather').withColumn('WeatherEventTime', F.col('EventTime').cast('Timestamp')))

In [0]:
# Weather stream with its event time cast to a timestamp
dfwstm = (
    spark.readStream.table('weather')
    .withColumn('WeatherEventTime', F.col('EventTime').cast('Timestamp'))
)

# Inner stream-stream join: same city and exactly equal event times
same_city_same_time = (dfj.City==dfwstm.City)  &  (dfj.EventTime == dfwstm.WeatherEventTime)
dfssj = dfj.join(dfwstm, same_city_same_time, how='inner').select(
    dfj.EventTime,
    'WeatherEventTime',
    'MeasurementType',
    'MeasurementValue',
    'Office',
    'Sensor',
    dfj.City,
    dfwstm.Temperature,
)
display(dfssj)


In [0]:
# Weather stream with its event time cast to a timestamp
dfwstm = (
    spark.readStream.table('weather')
    .withColumn('WeatherEventTime', F.col('EventTime').cast('Timestamp'))
)

# Inner join: same city, and the sensor event time lies within the 15 seconds
# up to and including the weather event time.
in_weather_range = dfj.EventTime.between(
    F.expr('WeatherEventTime - interval 15 seconds'), dfwstm.WeatherEventTime
)
dfssj = dfj.join(dfwstm, (dfj.City==dfwstm.City) & in_weather_range, how='inner').select(
    dfj.EventTime,
    'WeatherEventTime',
    'MeasurementType',
    'MeasurementValue',
    'Office',
    'Sensor',
    dfj.City,
    dfwstm.Temperature,
)
display(dfssj)

##### Outer joins

In [0]:
# Weather stream for the outer joins: EventTime is replaced by the timestamp
# column WeatherEventTime, which carries a 20-second watermark.
weather_typed = (
    spark.readStream.table('weather')
    .withColumn('WeatherEventTime', F.col('EventTime').cast('Timestamp'))
    .drop('EventTime')
)
dfwstm = weather_typed.withWatermark('WeatherEventTime','20 seconds')


In [0]:
# Left outer stream-stream join: sensor side watermarked at 30 seconds, matched
# on same city and exactly equal event times.
sensor_watermarked = dfj.withWatermark('EventTime','30 seconds')
same_city_same_time = (dfj.City==dfwstm.City)  &  (dfj.EventTime == dfwstm.WeatherEventTime)
dfssj = sensor_watermarked.join(dfwstm, same_city_same_time, how='leftOuter').select(
    dfj.EventTime,
    'WeatherEventTime',
    'MeasurementType',
    'MeasurementValue',
    'Office',
    'Sensor',
    dfj.City,
    dfwstm.Temperature,
)
display(dfssj)

In [0]:
# Left join with a time-range condition: sensor side watermarked at 5 seconds,
# matched on same city and a 15-second range ending at the weather event time.
sensor_watermarked = dfj.withWatermark('EventTime','5 seconds')
in_weather_range = F.expr('EventTime BETWEEN WeatherEventTime - interval 15 seconds AND WeatherEventTime')
dfssj = sensor_watermarked.join(dfwstm, (dfj.City==dfwstm.City) & in_weather_range, how='left').select(
    dfj.EventTime,
    'WeatherEventTime',
    'MeasurementType',
    'MeasurementValue',
    'Office',
    'Sensor',
    dfj.City,
    dfwstm.Temperature,
)
display(dfssj)

## Monitoring stream execution

In [0]:
# Start the bronze-to-silver query again so there is a query handle to inspect
silver_projection = spark.readStream.table('iot_measurements').selectExpr(
    'Measurements.Sensor as Sensor',
    'Measurements.MeasurementType  as MeasurementType',
    'cast(Measurements.MeasurementValue as float) as MeasurementValue',
    'cast(EventTime as timestamp) as EventTime',
    'Office',
    'IngestionTimestamp',
)
dfstt = (
    silver_projection.writeStream.format("delta")
    .queryName('iot_measurements_silver')
    .option('checkpointLocation', checkpoint_path_sensor5)
    .outputMode('append')
    .toTable('silver.iot_measurements')
)


In [0]:
# Current status of the query
dfstt.status

In [0]:
# Most recent progress report of the query
print(dfstt.lastProgress)

In [0]:
# Recent progress reports of the query
dfstt.recentProgress

In [0]:
# Print the name of every streaming query that is currently active
active_streams = spark.streams.active
for strm in active_streams:
    print(f'Stream {strm.name} is active')

In [0]:
from pyspark.sql.streaming import StreamingQueryListener
from pyspark.sql.streaming.listener import QueryStartedEvent, QueryProgressEvent, QueryIdleEvent, QueryTerminatedEvent
from pyspark.sql.types import *
import json
class MyListener(StreamingQueryListener):
    """Listener that appends one row per progress event to ``silver.streaming_monitor``.

    Start events print a message; idle and terminated events are ignored.
    """

    def onQueryStarted(self, event: QueryStartedEvent) -> None:
        """Print a message when a query starts."""
        print ("Streaming progress listener started")

    def onQueryProgress(self, event: QueryProgressEvent) -> None:
        """Append selected progress metrics of the event to the monitoring table.

        Any exception is caught and printed, so a failure here is not raised
        to the caller.
        """
        try:
            schema = (
                StructType()
                .add("queryName", StringType())
                .add("batchId", IntegerType())
                .add("numInputRows", IntegerType())
                .add("inputRowsPerSecond", FloatType())
                .add("processedRowsPerSecond", FloatType())
                .add("batchDuration", IntegerType())
                .add("timestamp", StringType())
            )
            metrics = {
                "queryName": event.progress.name,
                "batchId": event.progress.batchId,
                "numInputRows": event.progress.numInputRows,
                "inputRowsPerSecond": event.progress.inputRowsPerSecond,
                "processedRowsPerSecond": event.progress.processedRowsPerSecond,
                "batchDuration": event.progress.batchDuration,
                "timestamp": event.progress.timestamp,
            }
            progress_row = spark.createDataFrame([metrics], schema)
            print ("Streaming progress writing to silver.streaming_monitor")
            progress_row.write.mode("append").saveAsTable("silver.streaming_monitor")
        except Exception as e:
            print ('Streaming exception')
            print (e)

    def onQueryIdle(self, event: QueryIdleEvent) -> None:
        """Ignore idle events."""
        return None

    def onQueryTerminated(self, event: QueryTerminatedEvent) -> None:
        """Ignore termination events."""
        return None

# Register a listener instance with the session's streaming query manager
spark.streams.addListener(MyListener())

In [0]:
%sql
-- Progress rows written by the listener, newest first within each query name
select * from silver.streaming_monitor order by queryname, `timestamp` desc

Databricks visualization. Run in Databricks to view.