In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import col, current_timestamp



In [0]:
# Create the Spark session for this notebook, or reuse one that already exists
# (getOrCreate returns an existing session instead of building a new one).
spark = (
    SparkSession.builder
    .appName("StructuredStreaming")
    .getOrCreate()
)

Read the CSV file once in batch mode to infer its schema


In [0]:
# Batch-read the uploaded CSV once so Spark can infer column types from the data
# (header row supplies the column names); the schema is reused by the streaming read.
df = (
    spark.read
    .options(inferSchema="true", header="true")
    .csv("dbfs:/FileStore/shared_uploads/teenakurian4u@gmail.com/employees.csv")
)
df.printSchema()
df.show()

root
 |-- EMPLOYEE_ID: integer (nullable = true)
 |-- FIRST_NAME: string (nullable = true)
 |-- LAST_NAME: string (nullable = true)
 |-- EMAIL: string (nullable = true)
 |-- PHONE_NUMBER: string (nullable = true)
 |-- HIRE_DATE: string (nullable = true)
 |-- JOB_ID: string (nullable = true)
 |-- SALARY: integer (nullable = true)
 |-- COMMISSION_PCT: string (nullable = true)
 |-- MANAGER_ID: string (nullable = true)
 |-- DEPARTMENT_ID: integer (nullable = true)

+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  2600|            - |       124|           50|
|        199|   Douglas|    Grant|

Extract the inferred schema

In [0]:
# Capture the schema Spark inferred from the batch CSV read; the streaming reader
# below is given this schema explicitly.
# NOTE(review): this reads whatever `df` is bound to at execution time — if a later
# cell rebinds `df` and this cell is re-run, a different schema is captured.
inferred_schema = df.schema

Read the CSV data as a stream, reusing the inferred schema (file-based streaming sources need an explicit schema because they do not infer one by default)

In [0]:
# Stream the employee CSV files, reusing the schema inferred from the batch read.
# The path is a glob that matches only *.csv files directly inside the upload
# folder. Pointing the source at the bare folder would also expose the directories
# this notebook writes into it (checkpoints/, employee_data/, and the aggregation
# checkpoint), whose checkpoint and Delta/parquet files are not CSV input.
# NOTE(review): any other CSV uploaded to this folder is still picked up and parsed
# with the employee schema — a dedicated input folder would be safer.
streaming_df = (
    spark.readStream
    .schema(inferred_schema)
    .option("header", "true")
    .csv("dbfs:/FileStore/shared_uploads/teenakurian4u@gmail.com/*.csv")
)
    


Display the streaming DataFrame by printing each micro-batch to the console sink

In [0]:
# Print newly arrived rows ("append" mode) to the console sink, triggering a
# micro-batch every 5 seconds.
# NOTE(review): on Databricks the console sink writes to the driver's stdout/log,
# which may not show up in the cell output — confirm.
query = (
    streaming_df.writeStream
    .format("console")
    .outputMode("append")
    .trigger(processingTime="5 seconds")
    .start()
)


In [0]:
# Stop the console streaming query so it no longer triggers micro-batches.
query.stop()


Process the data with a simple transformation (add 500 to every SALARY)

In [0]:
# Raise every salary by a fixed amount. Reusing the column name replaces the
# original SALARY column rather than adding a new one.
SALARY_INCREMENT = 500
processed_df = streaming_df.withColumn("SALARY", col("SALARY") + SALARY_INCREMENT)

Write the processed data to a Delta table

In [0]:
# Append the transformed rows to the employee_data Delta table every 10 seconds.
# The checkpoint location lets a restarted query resume from its last committed
# micro-batch instead of starting over.
query1 = (
    processed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "dbfs:/FileStore/shared_uploads/teenakurian4u@gmail.com/checkpoints/")
    .trigger(processingTime="10 seconds")
    .start("dbfs:/FileStore/shared_uploads/teenakurian4u@gmail.com/employee_data/")
)


In [0]:
# Stop the Delta-writing streaming query so it no longer triggers micro-batches.
query1.stop()

In [0]:
# Batch-read the Delta table written by query1 to verify its contents.
# Bound to its own name rather than reusing `df`: `df` holds the CSV batch read
# whose schema feeds the streaming source. Overwriting it here would make any
# re-run of the schema-extraction cell silently capture this table's schema.
employee_delta_df = spark.read.format("delta").load("dbfs:/FileStore/shared_uploads/teenakurian4u@gmail.com/employee_data/")
employee_delta_df.show()


+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|HIRE_DATE|    JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+---------+----------+------+--------------+----------+-------------+
|        198|    Donald| OConnell|DOCONNEL|650.507.9833|21-JUN-07|  SH_CLERK|  3100|            - |       124|           50|
|        199|   Douglas|    Grant|  DGRANT|650.507.9844|13-JAN-08|  SH_CLERK|  3100|            - |       124|           50|
|        200|  Jennifer|   Whalen| JWHALEN|515.123.4444|17-SEP-03|   AD_ASST|  4900|            - |       101|           10|
|        201|   Michael|Hartstein|MHARTSTE|515.123.5555|17-FEB-04|    MK_MAN| 13500|            - |       100|           20|
|        202|       Pat|      Fay|    PFAY|603.123.6666|17-AUG-05|    MK_REP|  6500|            - |       201|           20|


WATERMARK

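Add an event-time column. `current_timestamp()` stamps each row with the time it is processed, which stands in for a real event timestamp (simulating timestamp generation).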

In [0]:
# Stamp each row with the time its micro-batch is processed. This is processing
# time, not a true event time; it only simulates an event-time column.
processed_df1 = streaming_df.withColumn(colName="event_time", col=current_timestamp())

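Apply a watermark to handle late data: once the latest event time seen moves more than 2 minutes past a row's event time, Spark may drop that row and the related aggregation state. Because `event_time` comes from `current_timestamp()`, no row is ever actually late here.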

In [0]:
# Declare event_time as the event-time column with a 2-minute lateness threshold.
# NOTE(review): event_time is produced by current_timestamp(), so rows are never
# late relative to it and the threshold has nothing to discard in practice.
watermarked_df = processed_df1.withWatermark(eventTime="event_time", delayThreshold="2 minutes")


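Count rows per JOB_ID on the watermarked stream. Note: a watermark lets Spark drop old aggregation state only when the event-time column (or a window over it) is part of the grouping key. Grouping by JOB_ID alone keeps every key's state indefinitely.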

In [0]:
# Running count of rows per JOB_ID (result columns: JOB_ID, count).
# NOTE(review): event_time is not a grouping key (no time window), so the watermark
# cannot expire any aggregation state here, and "complete" output mode keeps all
# state regardless. For bounded state, group by window("event_time", ...) plus
# JOB_ID and write with "update" or "append" mode.
aggregated_df = watermarked_df.groupBy(col("JOB_ID")).count()

Write the aggregated counts to a Delta table with checkpointing and a 10-second trigger interval

In [0]:
# Persist the per-JOB_ID counts to their own Delta table every 10 seconds.
# "complete" mode rewrites the entire aggregate on every trigger. Previously this
# targeted employee_data/, the table query1 appends employee rows to, so each
# trigger replaced those rows with JOB_ID counts; the mergeSchema option merely let
# the mismatched schema through. A dedicated table and a fresh checkpoint keep the
# two streams independent, so mergeSchema is no longer needed.
query2 = (
    aggregated_df.writeStream
    .format("delta")
    .outputMode("complete")
    .option("checkpointLocation", "dbfs:/FileStore/shared_uploads/teenakurian4u@gmail.com/job_counts_checkpoints/")
    .trigger(processingTime="10 seconds")
    .start("dbfs:/FileStore/shared_uploads/teenakurian4u@gmail.com/job_counts/")
)

In [0]:
# Stop the aggregation streaming query so it no longer triggers micro-batches.
query2.stop()