In [0]:
# Batch (non-streaming) read of a single sample file from the streaming input folder,
# with the header row used for column names and column types inferred by Spark.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/FileStore/tables/Streaming/Stream_readStream/csv/streaming_01.csv")
)

**Create folders in DBFS**

In [0]:
# Create the checkpoint, streaming-input and streaming-output folders for every
# file format used in this notebook. Resulting layout:
#   /FileStore/tables/Streaming/<Stream_checkpoint|Stream_readStream|Stream_writeStream>/<format>
for stream_folder in ["Stream_checkpoint", "Stream_readStream", "Stream_writeStream"]:
    for file_format in ["csv", "json", "parquet", "orc", "avro"]:
        dbutils.fs.mkdirs(f"/FileStore/tables/Streaming/{stream_folder}/{file_format}")

**Delete folders in DBFS** (resets the demo: removes the streaming input files, the sink output and the checkpoint folders)

In [0]:
# Reset the demo: recursively delete every checkpoint, input and output folder.
# NOTE: this also deletes the uploaded input CSV files under Stream_readStream/csv
# (e.g. streaming_01.csv), which must be re-uploaded before streaming again.

# Stop any running streaming queries first: deleting a checkpoint or sink folder
# out from under a live query breaks that query.
for active_query in spark.streams.active:
    active_query.stop()

for stream_folder in ["Stream_checkpoint", "Stream_readStream", "Stream_writeStream"]:
    for file_format in ["csv", "json", "parquet", "orc", "avro"]:
        dbutils.fs.rm(f"/FileStore/tables/Streaming/{stream_folder}/{file_format}", True)

**Define schema for input csv file**
- A schema must be specified when creating a file-based streaming source DataFrame; otherwise Spark throws an error (unless `spark.sql.streaming.schemaInference` is enabled).

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

# Explicit schema for the streaming CSV source (file-based streaming sources need one).
# Input_Timestamp_UTC holds epoch seconds written with a decimal part
# (the sample output shows values like 1709109264.0). IntegerType cannot parse such
# values, so in the default PERMISSIVE mode the column would come back as null;
# DoubleType parses them as-is.
schema_csv = StructType([
    StructField("Index", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("Type", StringType(), True),
    StructField("Effective_Date", StringType(), True),
    StructField("Income_Level", StringType(), True),
    StructField("Department", StringType(), True),
    StructField("TARGET", StringType(), True),
    StructField("Input_Timestamp_UTC", DoubleType(), True),
])

#### **1) readStream**

In [0]:
# Streaming DataFrame over the CSV files in the input folder; files added to the
# folder later are picked up by subsequent micro-batches.
# .csv(path) already selects the CSV format, so no separate .format("csv") is needed.
stream_csv = (
    spark.readStream
    .option("header", True)
    .schema(schema_csv)
    .csv("/FileStore/tables/Streaming/Stream_readStream/csv/")
)

print(stream_csv.isStreaming)
# printSchema() prints the schema itself and returns None; wrapping it in print()
# would emit a stray "None" line.
stream_csv.printSchema()

# display() on a streaming DataFrame starts a streaming query that keeps updating
# this cell's output until it is cancelled.
display(stream_csv)

Index,Category,Type,Effective_Date,Income_Level,Department,TARGET,Input_Timestamp_UTC
DISCOUNT,Top,average,6-Feb-23,Low,TESTING,SQL Server,1709109264.0
DISCOUNT,Top,average,6-Feb-23,Low,TESTING,SQL Server,1710234895.0
DISCOUNT,Top,average,8-Jan-24,Low,TESTING,SQL Server,1709109264.0
DISCOUNT,Top,average,8-Jan-24,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Top,average,6-Mar-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Mar-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Jan-25,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Jan-25,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Apr-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Apr-23,Low,TESTING,SQL Server,1707813327.0


#### **2) writeStream**

**format('console')**

In [0]:
# Console sink: each micro-batch is printed to the driver's stdout (on Databricks this
# appears in the driver logs rather than in the cell output).
# Keep the StreamingQuery handle so the query can be monitored and stopped,
# e.g. console_query.status or console_query.stop().
console_query = (
    stream_csv.writeStream
    .format('console')
    .outputMode('append')
    .start()
)

display(stream_csv)

Index,Category,Type,Effective_Date,Income_Level,Department,TARGET,Input_Timestamp_UTC
DISCOUNT,Top,average,6-Feb-23,Low,TESTING,SQL Server,1709109264.0
DISCOUNT,Top,average,6-Feb-23,Low,TESTING,SQL Server,1710234895.0
DISCOUNT,Top,average,8-Jan-24,Low,TESTING,SQL Server,1709109264.0
DISCOUNT,Top,average,8-Jan-24,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Top,average,6-Mar-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Mar-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Jan-25,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Jan-25,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Apr-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Apr-23,Low,TESTING,SQL Server,1707813327.0


**format('parquet')**

In [0]:
# Parquet file sink: data files (plus the sink's _spark_metadata log) go to `path`,
# and the query's progress is tracked in `checkpointLocation`.
# Use the format-specific checkpoint folder (Stream_checkpoint/csv) that the setup and
# cleanup cells create and delete. Pointing at the shared Stream_checkpoint root means
# the cleanup cell never resets this query's progress, and any other query given the
# same root would collide with it.
# Keep the StreamingQuery handle so the query can be stopped: parquet_query.stop().
parquet_query = (
    stream_csv.writeStream
    .format('parquet')
    .outputMode('append')
    .option("path", "/FileStore/tables/Streaming/Stream_writeStream/csv")
    .option("checkpointLocation", "dbfs:/FileStore/tables/Streaming/Stream_checkpoint/csv")
    .start()
)

display(stream_csv)

Index,Category,Type,Effective_Date,Income_Level,Department,TARGET,Input_Timestamp_UTC
DISCOUNT,Top,average,6-Feb-23,Low,TESTING,SQL Server,1709109264.0
DISCOUNT,Top,average,6-Feb-23,Low,TESTING,SQL Server,1710234895.0
DISCOUNT,Top,average,8-Jan-24,Low,TESTING,SQL Server,1709109264.0
DISCOUNT,Top,average,8-Jan-24,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Top,average,6-Mar-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Mar-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Jan-25,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Jan-25,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Apr-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Apr-23,Low,TESTING,SQL Server,1707813327.0


**Verify the written stream data**

In [0]:
# Batch-read the parquet sink output. Load the sink directory itself rather than a
# "*.parquet" glob: for a directory containing the file sink's _spark_metadata log,
# Spark reads only files committed by completed micro-batches, whereas a glob bypasses
# that log and can pick up partial files left behind by a failed batch.
display(spark.read.format("parquet").load("/FileStore/tables/Streaming/Stream_writeStream/csv"))

Index,Category,Type,Effective_Date,Income_Level,Department,TARGET,Input_Timestamp_UTC
DISCOUNT,Top,average,6-Feb-23,Low,TESTING,SQL Server,1709109264.0
DISCOUNT,Top,average,6-Feb-23,Low,TESTING,SQL Server,1710234895.0
DISCOUNT,Top,average,8-Jan-24,Low,TESTING,SQL Server,1709109264.0
DISCOUNT,Top,average,8-Jan-24,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Top,average,6-Mar-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Mar-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Jan-25,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Jan-25,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Apr-23,Low,TESTING,SQL Server,1707813327.0
DISCOUNT,Forward,medium,6-Apr-23,Low,TESTING,SQL Server,1707813327.0
