In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Reuse the active Spark session if there is one, otherwise build one
# registered under the application name 'spark_streaming'.
spark = (
    SparkSession.builder
    .appName('spark_streaming')
    .getOrCreate()
)

In [0]:
# Batch-read the employee CSV (the first row holds the column names) and preview it.
emp_csv_path = '/Volumes/workspace/default/spark_data/emp.csv'
emp = (
    spark.read
    .format('csv')
    .option('header', True)
    .load(emp_csv_path)
)
emp.display()

In [0]:
# Append the batch DataFrame to the Delta table that the streaming cells read from.
# NOTE: append mode adds the same rows again every time this cell is re-run.
(
    emp.write
    .format('delta')
    .mode('append')
    .saveAsTable('workspace.default.emp')
)

### ----> Read and write streaming data from a table

In [0]:
# Open the Delta table as a streaming DataFrame.
# Ending the chain with `.display()` returned None, which left `df` empty,
# so the DataFrame is kept in `df` and rendered by a separate display() call.
df = spark.readStream.option('header', True).table('workspace.default.emp')

# The live preview is its own streaming query, so it gets a checkpoint directory
# that no other query in this notebook uses.
display(df, checkpointLocation='/Volumes/workspace/default/spark_data/checkpoint_display_emp')

In [0]:
%sql
-- Remove the streaming target table, if present, before the next cell recreates it.
DROP TABLE IF EXISTS stm_emp;

In [0]:
# Stream the employee Delta table into the `stm_emp` Delta table.
df = spark.readStream.option('header', True).table('workspace.default.emp')

(
    df.writeStream
    .format('delta')
    # A checkpoint directory belongs to exactly one streaming query. The generic
    # '.../checkpoint/' directory is also used by other queries in this notebook
    # (different source and sink), so this query gets a dedicated directory.
    .option('checkpointLocation', '/Volumes/workspace/default/spark_data/checkpoint_stm_emp/')
    # Process everything currently available, then stop.
    .trigger(availableNow=True)
    .toTable('stm_emp')
)

In [0]:
# Stream the employee Delta table into a second Delta table, cbcatalog.default.stm_emp2.
df = (
    spark.readStream
    .option('header', True)
    .table('workspace.default.emp')
)

stm_emp2_writer = (
    df.writeStream
    .format('delta')
    .option('overwriteSchema', True)
    .outputMode('append')
    .option('checkpointLocation', '/Volumes/workspace/default/spark_data/checkpoint1/')
    # Process everything currently available, then stop.
    .trigger(availableNow=True)
)
stm_emp2_writer.toTable('cbcatalog.default.stm_emp2')

In [0]:
%sql
-- The stream above writes to cbcatalog.default.stm_emp2, so query it by its full
-- three-part name instead of relying on the session's current catalog and schema.
SELECT * FROM cbcatalog.default.stm_emp2

### ----> Read and write streaming data from a volume

In [0]:
# Explicit schema for the employee CSV files read by the file stream.
# Each entry is (column name, data type, nullable).
emp_columns = [
    ('empno', IntegerType(), False),
    ('ename', StringType(), True),
    ('job', StringType(), True),
    ('mgr', IntegerType(), True),
    ('hiredate', StringType(), True),
    ('sal', IntegerType(), True),
    ('comm', IntegerType(), True),
    ('deptno', IntegerType(), True),
]
schema = StructType([
    StructField(col_name, col_type, is_nullable)
    for col_name, col_type, is_nullable in emp_columns
])

In [0]:
# Stream CSV files from the volume folder, using the explicit `schema` defined earlier.
df = (
    spark.readStream
    .format('csv')
    .schema(schema)
    .option('header', True)
    .load('/Volumes/cbcatalog/cbschema/volume2/streaming_fol/')
)

# Append the rows to a Delta table; to store the data at a volume location instead,
# add option('path', <volume path>) to the writer.
stm_tab1_writer = (
    df.writeStream
    .format('delta')
    .option('overwriteSchema', True)
    .outputMode('append')
    # Process everything currently available, then stop.
    .trigger(availableNow=True)
    .option('checkpointLocation', '/Volumes/cbcatalog/cbschema/volume2/checkpoint')
)
stm_tab1_writer.toTable('cbcatalog.default.stm_tab1')

In [0]:
%sql
-- Inspect the rows written by the CSV file stream.
SELECT *
FROM cbcatalog.default.stm_tab1

### -->Modes in spark Streaming
PySpark's `DataStreamWriter.outputMode()` sets how results are written to the sink: `append`, `complete`, or `update`.
## 1.Complete
In complete mode the whole result table is rewritten to the sink on every trigger; it is only supported for queries that contain an aggregation.

In [0]:
# Auto Loader stream over the landing directory that receives incremental CSV files.
emp_stream = (
    spark.readStream
    .format('cloudFiles')
    .option('cloudFiles.format', 'CSV')
    .option('header', True)
    .option('cloudFiles.schemaLocation', '/Volumes/workspace/default/spark_data/schemaLocation')
    .load('/Volumes/workspace/default/spark_data/streaming_data/')
)

# Complete mode is rejected for queries without an aggregation, so aggregate first.
# A global row count needs no knowledge of the file's columns.
emp_row_count = emp_stream.groupBy().count()

# The original chain put comments after the `\` line continuations, which is a
# SyntaxError; a parenthesised chain lets each step carry its own comment.
result = (
    emp_row_count.writeStream
    .format('delta')
    # Complete mode rewrites the whole aggregated result on every trigger.
    .outputMode('complete')
    .option('mergeSchema', True)
    # The aggregated query has a different shape from the append stream, so it
    # uses its own checkpoint directory and its own target table.
    .option('checkpointLocation', '/Volumes/workspace/default/spark_data/checkpoint_complete/')
    # Process everything currently available, then stop.
    .trigger(availableNow=True)
    .toTable('streaming_emp_count')
)

## 2. append

In [0]:
# Auto Loader stream over the landing directory that receives incremental CSV files.
emp_stream = (
    spark.readStream
    .format('cloudFiles')
    .option('cloudFiles.format', 'CSV')
    .option('header', True)
    .option('cloudFiles.schemaLocation', '/Volumes/workspace/default/spark_data/schemaLocation')
    .load('/Volumes/workspace/default/spark_data/streaming_data/')
)

# The original chain put comments after the `\` line continuations, which is a
# SyntaxError; a parenthesised chain lets each step carry its own comment.
result = (
    emp_stream.writeStream
    .format('delta')
    # Append mode writes only the rows that arrived since the last trigger;
    # rows already in the table are left untouched.
    .outputMode('append')
    .option('mergeSchema', True)
    .option('checkpointLocation', '/Volumes/workspace/default/spark_data/checkpoint/')
    # Process everything currently available, then stop.
    .trigger(availableNow=True)
    # Store the data as a Delta table.
    .toTable('streaming_emp')
)

### 3. update

In [0]:
# Auto Loader stream over the landing directory that receives incremental CSV files.
emp_stream = (
    spark.readStream
    .format('cloudFiles')
    .option('cloudFiles.format', 'CSV')
    .option('header', True)
    .option('cloudFiles.schemaLocation', '/Volumes/workspace/default/spark_data/schemaLocation')
    .load('/Volumes/workspace/default/spark_data/streaming_data/')
)


def _append_batch_to_streaming_emp(batch_df, batch_id):
    """Write one micro-batch to the `streaming_emp` Delta table.

    Args:
        batch_df: Static DataFrame holding the rows of this micro-batch.
        batch_id: Micro-batch id supplied by Structured Streaming.
    """
    (
        batch_df.write
        .format('delta')
        .mode('append')
        .option('mergeSchema', True)
        # txnAppId + txnVersion let Delta skip a batch that is replayed after a failure.
        .option('txnAppId', 'streaming_emp_update')
        .option('txnVersion', batch_id)
        .saveAsTable('streaming_emp')
    )


# The Delta streaming sink accepts only append and complete output modes, so an
# update-mode query has to write through foreachBatch. (The original chain also put
# comments after the `\` line continuations, which is a SyntaxError.)
# Without an aggregation, update mode emits only the new rows of each batch.
# NOTE(review): a real upsert would MERGE on a business key inside the batch
# function; the key column of these files is not visible here - confirm.
result = (
    emp_stream.writeStream
    .outputMode('update')
    .option('checkpointLocation', '/Volumes/workspace/default/spark_data/checkpoint/')
    # Process everything currently available, then stop.
    .trigger(availableNow=True)
    .foreachBatch(_append_batch_to_streaming_emp)
    .start()
)

### json transformation

In [0]:
# Batch-read the multi-line customer JSON document and preview it.
# No schema is passed in this cell, so Spark derives one from the file.
# Author's note kept from the original: prefer defining the schema explicitly over
# relying on inference (the stated reason, column selection, is not verified here).
cust = (
    spark.read
    .format('json')
    .option('multiLine', True)
    .load('/Volumes/workspace/default/spark_data/cust_json.json')
)
cust.display()

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, IntegerType, DoubleType

# Explicit schema for the nested customer/order JSON, assembled from the innermost
# structs outwards so each level can be read on its own.

# customer.address
address_schema = StructType([
    StructField("street", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postalCode", StringType(), True),
    StructField("country", StringType(), True),
])

# customer
customer_schema = StructType([
    StructField("customerId", StringType(), True),
    StructField("name", StringType(), True),
    StructField("email", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("address", address_schema, True),
])

# order.items[].supplier
supplier_schema = StructType([
    StructField("name", StringType(), True),
    StructField("contact", StringType(), True),
])

# One element of order.items
item_schema = StructType([
    StructField("productId", StringType(), True),
    StructField("productName", StringType(), True),
    StructField("quantity", IntegerType(), True),
    StructField("price", DoubleType(), True),
    StructField("supplier", supplier_schema, True),
])

# order
order_schema = StructType([
    StructField("orderId", StringType(), True),
    StructField("orderDate", StringType(), True),
    StructField("status", StringType(), True),
    StructField("items", ArrayType(item_schema), True),
    StructField("totalAmount", DoubleType(), True),
    StructField("paymentMethod", StringType(), True),
])

# Top-level document: one customer struct and one order struct.
schema = StructType([
    StructField("customer", customer_schema, True),
    StructField("order", order_schema, True),
])

In [0]:
# Read the multi-line JSON file with the explicit `schema` built in the previous cell.
cust = (
    spark.read
    .format('json')
    .option('multiline', True)
    .schema(schema)
    .load('/Volumes/workspace/default/spark_data/simp_json2.json')
)

In [0]:
# Pull the needed nested fields up to top-level columns; `items` is kept as the
# array of item structs.
# NOTE: `cust` is overwritten, so this cell cannot be re-run without re-reading the file.
cust = cust.select(
    col('customer.customerId').alias('cust_id'),
    col('customer.name').alias('name'),
    col('order.items').alias('items'),
    col('order.orderDate').alias('ord_dt'),
    col('order.paymentMethod').alias('pay_mthd'),
)
display(cust)

In [0]:
# Add `xpld_itm`: one output row per element of `items`. The *_outer variant keeps
# rows whose `items` is null or empty.
exploded_item = explode_outer(col('items'))
cust1 = cust.withColumn('xpld_itm', exploded_item)
cust1.display()

In [0]:
# Project the item attributes. Reaching a field through the `items` array gives one
# array per attribute (all product ids, all names, ...), not a single value.
df = cust1.select(
    'cust_id',
    'name',
    col('items.productId').alias('p_id'),
    col('items.productName').alias('p_name'),
    col('items.quantity').alias('qty'),
    col('items.price').alias('price'),
    'ord_dt',
    'pay_mthd',
)
df.display()

In [0]:
# Pair the parallel arrays element by element:
#   id_nm -> array of (p_id, p_name) structs
#   qt_pc -> array of (qty, price) structs
id_name_pairs = arrays_zip(col('p_id'), col('p_name'))
qty_price_pairs = arrays_zip(col('qty'), col('price'))

df = (
    df
    .withColumn('id_nm', id_name_pairs)
    .withColumn('qt_pc', qty_price_pairs)
)
df.display()

In [0]:
# Turn each array of two-field structs into a map (first field = key, second = value).
# NOTE(review): `map_val` is keyed by quantity, so two items with the same quantity
# would collide on the key - confirm against the data.
product_map = map_from_entries(col('id_nm'))
value_map = map_from_entries(col('qt_pc'))

df1 = (
    df
    .withColumn('map_prod', product_map)
    .withColumn('map_val', value_map)
)
df1.display()

In [0]:
# Produce one row per ordered item.
# Spark allows only one generator (explode) per select, so the original pair of
# explode() calls raised an AnalysisException. The four parallel item arrays are
# zipped into a single array of structs and exploded once, which also keeps each
# item's id, name, quantity and price together on the same row.
item_rows = df1.withColumn(
    'item',
    explode(arrays_zip(col('p_id'), col('p_name'), col('qty'), col('price'))),
)

df_final = item_rows.select(
    'cust_id',
    'name',
    col('item.p_id').alias('p_id'),
    col('item.p_name').alias('p_name'),
    col('item.qty').alias('qty'),
    col('item.price').alias('price'),
    'ord_dt',
    'pay_mthd',
).dropDuplicates()  # the earlier explode_outer repeats each order row once per item
df_final.display()

### streaming in json files

In [0]:
# `writeStream` is only available on streaming DataFrames, so the transformed batch
# result is first saved as a Delta table that a later cell reads back as a stream.
# NOTE(review): no save mode is set, so re-running this cell fails once the table
# exists; `overwriteSchema` only matters together with mode('overwrite') - confirm intent.
(
    df_final.write
    .format('delta')
    .option('header', True)
    .option('overwriteSchema', 'true')
    .saveAsTable('smpl_stm')
)

In [0]:
# A Delta table registered in the catalog can be read as a streaming DataFrame.
# `.load()` expects a storage path; a catalog table name has to go through `.table()`.
df = spark.readStream.option('header', True).table('workspace.default.smpl_stm')

(
    df.writeStream
    .format('delta')
    .outputMode('append')
    .option('checkpointLocation', '/Volumes/cbcatalog/cbschema/volume2/checkpoint1')
    .option('mergeSchema', True)
    # Process everything currently available, then stop.
    .trigger(availableNow=True)
    .option('path', '/Volumes/cbcatalog/cbschema/volume2/smpl_stm2')
    # Nothing is written until the query is started; the original chain ended
    # without start(), so it only built a writer and discarded it.
    .start()
)

In [0]:
%sql
-- Source table for the colour-count streaming examples.
CREATE TABLE workspace.default.stm_colr (
  color_nm STRING,
  timing   TIMESTAMP
)

In [0]:
%sql
-- Seed rows for the colour table ('red' and 'orange' each appear twice).
INSERT INTO workspace.default.stm_colr VALUES
  ('red',    '2025-1-10T10:05:00'),
  ('green',  '2025-2-10T10:10:00'),
  ('blue',   '2025-3-10T10:15:00'),
  ('yellow', '2025-4-10T10:20:00'),
  ('orange', '2025-5-10T10:25:00'),
  ('orange', '2025-5-10T10:25:00'),
  ('red',    '2025-1-10T10:05:00')

In [0]:
%sql
-- Show every row of the colour table.
SELECT *
FROM workspace.default.stm_colr;

In [0]:
# Read the colour table as a stream.
# `.load()` expects a storage path; a catalog table name has to go through `.table()`.
df_colr = spark.readStream.option('header', True).table('workspace.default.stm_colr')

In [0]:
# Batch (non-streaming) count of rows per colour, computed with Spark SQL.
colour_count_sql = 'select color_nm, count(*) from workspace.default.stm_colr group by color_nm'
cnt_colr = spark.sql(colour_count_sql)
cnt_colr.display()

In [0]:
# Append the colour stream to a Delta location inside the spark_data volume.
(
    df_colr.writeStream
    .format('delta')
    .option('header', True)
    # outputMode('append') is what selects append semantics; the original extra
    # option('append', True) is not a writer option and has been dropped.
    .outputMode('append')
    .option('checkpointLocation', 'dbfs:/Volumes/workspace/default/spark_data/checkpoint_colr')
    # Process everything currently available, then stop.
    .trigger(availableNow=True)
    # The original path read 'spark_data.stm_colr' ('.' instead of '/'), which does
    # not point inside the spark_data volume used everywhere else in this notebook.
    .option('path', '/Volumes/workspace/default/spark_data/stm_colr')
    # Nothing is written until the query is started; the original chain had no start().
    .start()
)

In [0]:
from pyspark.sql.functions import col

# Read the colour table as a stream.
# `.load()` expects a storage path; a catalog table name has to go through `.table()`.
df_colr = spark.readStream.option('header', True).table('workspace.default.stm_colr')

# Streaming aggregation: running count of rows per colour.
cnt_colr = df_colr.groupBy('color_nm').count()
display(cnt_colr)

In [0]:
# You must call .start() to begin the streaming write and actually store data at the specified location.
# 'overwrite' is a batch save mode, not a streaming output mode (append / complete /
# update), so the original call was rejected. The sink is named cnt_colr, so the
# aggregated `cnt_colr` stream is written in complete mode, which replaces the stored
# result with the full set of counts on every trigger.
(
    cnt_colr.writeStream
    .format('delta')
    .option('header', True)
    .outputMode('complete')
    .option('mergeSchema', True)
    .option('checkpointLocation', '/Volumes/cbcatalog/cbschema/volume2/checkpoint2/')
    # Process everything currently available, then stop.
    .trigger(availableNow=True)
    .option('path', 'dbfs:/Volumes/cbcatalog/cbschema/volume2/cnt_colr')
    .start()
)

### ---->window operation in Streaming
## 1.Tumbling window

In [0]:
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Example of applying tumbling window in Apache Spark
# Streaming file sources need an explicit schema, and withWatermark()/window() need
# a timestamp column; without a schema the read is rejected, and CSV values would
# otherwise arrive as strings.
# TODO: assumes the files hold exactly these two columns in this order - extend to
# match the real data (CSV columns are matched by position).
event_schema = StructType([
    StructField('id', StringType(), True),
    StructField('event_time', TimestampType(), True),
])

df = (
    spark.readStream
    .format('csv')
    .option('header', True)
    .schema(event_schema)
    .load('/path/to/data')
)

# Tumbling window: fixed, non-overlapping 5-minute buckets per id. Events more than
# 10 minutes behind the latest event time seen are dropped as too late.
tumbling_window_df = (
    df.withWatermark("event_time", "10 minutes")
    .groupBy(window("event_time", "5 minutes"), "id")
    .count()
)

# Append mode emits a window's count only once the watermark has passed its end.
query = (
    tumbling_window_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Blocks this cell until the query stops or fails.
query.awaitTermination()

### 2.sliding window

### 3.session window

### WaterMarking

In [0]:
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# Example of applying watermarking in Apache Spark
# Streaming file sources need an explicit schema, and withWatermark() needs a
# timestamp column; without a schema the read is rejected, and CSV values would
# otherwise arrive as strings.
# TODO: assumes the files hold exactly these two columns in this order - extend to
# match the real data (CSV columns are matched by position).
event_schema = StructType([
    StructField('id', StringType(), True),
    StructField('event_time', TimestampType(), True),
])

df = (
    spark.readStream
    .format('csv')
    .option('header', True)
    .schema(event_schema)
    .load('/path/to/data')
)

# The watermark tells Spark how long to wait for late events: rows whose event_time
# is more than 10 minutes behind the latest event time seen are dropped, which lets
# the state of old 5-minute windows be released.
watermarked_df = (
    df.withWatermark("event_time", "10 minutes")
    .groupBy(window("event_time", "5 minutes"), "id")
    .count()
)

# Append mode emits a window's count only once the watermark has passed its end.
query = (
    watermarked_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

# Blocks this cell until the query stops or fails.
query.awaitTermination()

In [0]:
# Auto Loader stream over the landing directory that receives incremental CSV files
# from multiple sources.
emp_stream = (
    spark.readStream
    .format('cloudFiles')
    .option('cloudFiles.format', 'CSV')
    .option('header', True)
    .option('cloudFiles.schemaLocation', '/Volumes/workspace/default/spark_data/schemaLocation')
    .load('/Volumes/workspace/default/spark_data/streaming_data/')
)

# Append the newly arrived rows to the `streaming_emp` Delta table.
streaming_emp_writer = (
    emp_stream.writeStream
    .format('delta')
    .outputMode('append')
    .option('mergeSchema', True)
    .option('checkpointLocation', '/Volumes/workspace/default/spark_data/checkpoint/')
    # Process everything currently available, then stop.
    .trigger(availableNow=True)
)
result = streaming_emp_writer.toTable('streaming_emp')

In [0]:
%sql
-- Inspect the rows written by the Auto Loader stream.
SELECT *
FROM streaming_emp

### ---->DLT(Delta Live Table)