Setup: Create a Simulated Input Source

In [0]:
# Landing directory for the incoming order files (the streaming reader points at this path).
input_dir = "/Volumes/my_retail_data/retail_schema/retail_volume/input"
dbutils.fs.mkdirs(input_dir)

True

Create a sample input JSON file

In [0]:
# First input batch: fifteen order records, one JSON object per line.
# NOTE(review): the record for order 1012 spells its key "oreder_id", and the record
# for order 1014 has no value after "customer_id". Both orders are re-added by hand
# in the later "missed data" cell, so the payload is intentionally left as-is here.
sample_data = """
{"order_id": 1001, "customer_id": 501, "item": "Mobile", "amount": 19999.99, "timestamp": "2025-07-29T12:00:00"}
{"order_id": 1002, "customer_id": 502, "item": "Laptop", "amount": 49999.99, "timestamp": "2025-07-29T12:01:00"}
{"order_id": 1003, "customer_id": 503, "item": "Tablet", "amount": 29999.99, "timestamp": "2025-07-29T12:02:00"}
{"order_id": 1004, "customer_id": 504, "item": "Headphones", "amount": 999.99, "timestamp": "2025-07-29T12:03:00"}
{"order_id": 1005, "customer_id": 505, "item": "Watch", "amount": 1999.99, "timestamp": "2025-07-29T12:05:00"}
{"order_id": 1006, "customer_id": 506, "item": "Washing Machine", "amount": 49999.99, "timestamp": "2025-07-29T12:06:00"}
{"order_id": 1007, "customer_id": 507, "item": "Refrigerator", "amount": 59999.99, "timestamp": "2025-07-29T12:07:00"}
{"order_id": 1008, "customer_id": 508, "item": "Television", "amount": 39999.99, "timestamp": "2025-07-29T12:08:00"}
{"order_id": 1009, "customer_id": 509, "item": "Camera", "amount": 69999.99, "timestamp": "2025-07-29T12:09:00"}
{"order_id": 1010, "customer_id": 510, "item": "Desktop", "amount": 29999.99, "timestamp": "2025-07-29T12:10:00"}
{"order_id": 1011, "customer_id": 511, "item": "Washing Machine", "amount": 49999.99, "timestamp": "2025-07-27T12:06:00"}
{"oreder_id": 1012, "customer_id": 512, "item": "Refrigerator", "amount": 59999.99, "timestamp": "2025-07-27T12:07:00"}
{"order_id": 1013, "customer_id": 513, "item": "Television", "amount": 39999.99, "timestamp": "2025-07-27T12:08:00"}
{"order_id": 1014, "customer_id": , "item": "Camera", "amount": 69999.99, "timestamp": "2025-07-27T12:09:00"}
{"order_id": 1015, "customer_id": 515, "item": "Desktop", "amount": 29999.99, "timestamp": "2025-07-27T12:10:00"}
"""
# Write the batch into the input directory as batch1.json.
# The trailing True is the overwrite flag of dbutils.fs.put(file, contents, overwrite).
dbutils.fs.put("/Volumes/my_retail_data/retail_schema/retail_volume/input/batch1.json", sample_data, True)

Wrote 1734 bytes.


True

Define the schema and create \
the streaming DataFrame

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType

# Explicit schema for the order records; every field is nullable (the default).
schema = StructType([
    StructField("order_id", IntegerType()),
    StructField("customer_id", IntegerType()),
    StructField("item", StringType()),
    StructField("amount", DoubleType()),
    StructField("timestamp", TimestampType()),
])

# Streaming reader over the input directory, parsing each file as JSON
# against the schema above.
input_df = (
    spark.readStream
    .format("json")
    .schema(schema)
    .load("/Volumes/my_retail_data/retail_schema/retail_volume/input")
)

In [0]:
# Static (non-streaming) read of the first batch file, to preview how its
# records parse under the explicit schema.
batch1_path = "/Volumes/my_retail_data/retail_schema/retail_volume/input/batch1.json"
df = spark.read.format("json").schema(schema).load(batch1_path)
display(df)

order_id,customer_id,item,amount,timestamp
1001.0,501.0,Mobile,19999.99,2025-07-29T12:00:00.000Z
1002.0,502.0,Laptop,49999.99,2025-07-29T12:01:00.000Z
1003.0,503.0,Tablet,29999.99,2025-07-29T12:02:00.000Z
1004.0,504.0,Headphones,999.99,2025-07-29T12:03:00.000Z
1005.0,505.0,Watch,1999.99,2025-07-29T12:05:00.000Z
1006.0,506.0,Washing Machine,49999.99,2025-07-29T12:06:00.000Z
1007.0,507.0,Refrigerator,59999.99,2025-07-29T12:07:00.000Z
1008.0,508.0,Television,39999.99,2025-07-29T12:08:00.000Z
1009.0,509.0,Camera,69999.99,2025-07-29T12:09:00.000Z
1010.0,510.0,Desktop,29999.99,2025-07-29T12:10:00.000Z


In [0]:
from pyspark.sql.functions import current_timestamp, from_utc_timestamp

# Processing timestamp, shifted from UTC to Asia/Kolkata wall-clock time.
processed_time_ist = from_utc_timestamp(current_timestamp(), "Asia/Kolkata")

# Keep only rows that have an order id and a positive amount, then stamp
# each surviving row with the processing time.
valid_orders = input_df.where("order_id IS NOT NULL AND amount > 0")
trans_df = valid_orders.withColumn("processed_time", processed_time_ist)

Load data to Delta Lake Table (Sink) \
Write output to a delta table in append mode:

Writing data to delta table

In [0]:
# Destination Delta table, and the checkpoint directory the stream uses to
# remember which input files it has already processed.
output_path = "/Volumes/my_retail_data/retail_schema/retail_volume/output/order_delta"
checkpoint_path = "/Volumes/my_retail_data/retail_schema/retail_volume/checkpoint/orders"

# availableNow=True: process everything currently in the source, then stop.
query = (
    trans_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .start(output_path)
)

# start() returns immediately while the query runs in the background. Block
# until the run has finished so the cells below read a complete Delta table
# instead of racing the write.
query.awaitTermination()

In [0]:
# Show the streaming query's current status (message, isDataAvailable, isTriggerActive).
query.status

{'message': 'Stopped', 'isDataAvailable': False, 'isTriggerActive': False}

In [0]:
# Read the Delta sink back as a static DataFrame to inspect what the stream wrote.
df = spark.read.load(output_path, format="delta")
display(df)

order_id,customer_id,item,amount,timestamp,processed_time
1001,501,Mobile,19999.99,2025-07-29T12:00:00.000Z,2025-07-29T16:45:23.846Z
1002,502,Laptop,49999.99,2025-07-29T12:01:00.000Z,2025-07-29T16:45:23.846Z
1003,503,Tablet,29999.99,2025-07-29T12:02:00.000Z,2025-07-29T16:45:23.846Z
1004,504,Headphones,999.99,2025-07-29T12:03:00.000Z,2025-07-29T16:45:23.846Z
1005,505,Watch,1999.99,2025-07-29T12:05:00.000Z,2025-07-29T16:45:23.846Z
1006,506,Washing Machine,49999.99,2025-07-29T12:06:00.000Z,2025-07-29T16:45:23.846Z
1007,507,Refrigerator,59999.99,2025-07-29T12:07:00.000Z,2025-07-29T16:45:23.846Z
1008,508,Television,39999.99,2025-07-29T12:08:00.000Z,2025-07-29T16:45:23.846Z
1009,509,Camera,69999.99,2025-07-29T12:09:00.000Z,2025-07-29T16:45:23.846Z
1010,510,Desktop,29999.99,2025-07-29T12:10:00.000Z,2025-07-29T16:45:23.846Z


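Append the records the stream missed: in batch1.json, order 1012 has a misspelled `order_id` key and order 1014 has an empty `customer_id`, so neither reached the Delta table.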

In [0]:
# DoubleType is used by the schema below, so import it here rather than relying
# on an earlier cell having imported it.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, FloatType, DoubleType
from pyspark.sql.functions import current_timestamp, from_utc_timestamp
from datetime import datetime

# Orders from batch1.json that did not reach the Delta table and are added
# manually: 1012 (its key is spelled "oreder_id" in the file) and 1014 (its
# customer_id value is missing in the file; 514 is supplied here).
TS_FORMAT = "%Y-%m-%dT%H:%M:%S"
missed_data = [
    (1012, 512, "Refrigerator", 59999.99, datetime.strptime("2025-07-27T12:07:00", TS_FORMAT)),
    (1014, 514, "Camera", 69999.99, datetime.strptime("2025-07-27T12:09:00", TS_FORMAT)),
]

# Schema for the manually built rows (same fields and types as the stream's schema).
schema = StructType([
    StructField("order_id", IntegerType(), True),
    StructField("customer_id", IntegerType(), True),
    StructField("item", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", TimestampType(), True)
])

# Build the DataFrame. "timestamp" is already TimestampType via the schema, so
# no extra cast is needed. processed_time is stamped in IST, like the stream does.
missed_df = (
    spark.createDataFrame(missed_data, schema)
    .withColumn("processed_time", from_utc_timestamp(current_timestamp(), "Asia/Kolkata"))
)

# Append to the Delta table, skipping any order_id that is already present so
# that re-running this cell does not insert duplicate rows.
output_path = "/Volumes/my_retail_data/retail_schema/retail_volume/output/order_delta"
existing_ids = spark.read.format("delta").load(output_path).select("order_id")
missed_new_df = missed_df.join(existing_ids, on="order_id", how="left_anti")
missed_new_df.write.format("delta").mode("append").save(output_path)

# Read the table back to confirm the appended rows.
df = spark.read.format("delta").load(output_path)
display(df)




order_id,customer_id,item,amount,timestamp,processed_time
1001,501,Mobile,19999.99,2025-07-29T12:00:00.000Z,2025-07-29T16:45:23.846Z
1002,502,Laptop,49999.99,2025-07-29T12:01:00.000Z,2025-07-29T16:45:23.846Z
1003,503,Tablet,29999.99,2025-07-29T12:02:00.000Z,2025-07-29T16:45:23.846Z
1004,504,Headphones,999.99,2025-07-29T12:03:00.000Z,2025-07-29T16:45:23.846Z
1005,505,Watch,1999.99,2025-07-29T12:05:00.000Z,2025-07-29T16:45:23.846Z
1006,506,Washing Machine,49999.99,2025-07-29T12:06:00.000Z,2025-07-29T16:45:23.846Z
1007,507,Refrigerator,59999.99,2025-07-29T12:07:00.000Z,2025-07-29T16:45:23.846Z
1008,508,Television,39999.99,2025-07-29T12:08:00.000Z,2025-07-29T16:45:23.846Z
1009,509,Camera,69999.99,2025-07-29T12:09:00.000Z,2025-07-29T16:45:23.846Z
1010,510,Desktop,29999.99,2025-07-29T12:10:00.000Z,2025-07-29T16:45:23.846Z


To check streaming \
add more data as new JSON files and re-run the streaming write so the new files are picked up

In [0]:
# Second input batch: five more order records (1016-1020), one JSON object per line.
new_data = """
  {"order_id": 1016, "customer_id": 516, "item": "chairs", "amount": 2999.89, "timestamp": "2025-07-26T13:06:00"}
  {"order_id": 1017, "customer_id": 517, "item": "Sofa", "amount": 58999.99, "timestamp": "2025-07-24T16:07:00"}
  {"order_id": 1018, "customer_id": 518, "item": "Kitchen_furniture", "amount": 77999.99, "timestamp": "2025-07-26T15:08:00"}
  {"order_id": 1019, "customer_id": 519, "item": "Air_conditioners", "amount": 89999.99, "timestamp": "2025-07-26T19:09:00"}
  {"order_id": 1020, "customer_id": 520, "item": "Iron_pipes", "amount": 100999.99, "timestamp": "2025-07-26T18:10:00"}
  """
# Write the batch next to batch1.json in the input directory.
# The trailing True is the overwrite flag of dbutils.fs.put(file, contents, overwrite).
dbutils.fs.put("/Volumes/my_retail_data/retail_schema/retail_volume/input/batch2.json", new_data, True)

Wrote 601 bytes.


True

Check whether the new data has been appended

In [0]:
# The availableNow query stops once it has drained the source, so files added
# afterwards (batch2.json) are only ingested when the streaming write runs
# again. Re-trigger it with the same checkpoint so that only files not yet
# processed are picked up, and wait for it to finish before reading.
checkpoint_path = "/Volumes/my_retail_data/retail_schema/retail_volume/checkpoint/orders"
query = (
    trans_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .start(output_path)
)
query.awaitTermination()

# Read the Delta table back; the rows from the new batch should now be present.
df = spark.read.format("delta").load(output_path)
display(df)

order_id,customer_id,item,amount,timestamp,processed_time
1001,501,Mobile,19999.99,2025-07-29T12:00:00.000Z,2025-07-29T16:45:23.846Z
1002,502,Laptop,49999.99,2025-07-29T12:01:00.000Z,2025-07-29T16:45:23.846Z
1003,503,Tablet,29999.99,2025-07-29T12:02:00.000Z,2025-07-29T16:45:23.846Z
1004,504,Headphones,999.99,2025-07-29T12:03:00.000Z,2025-07-29T16:45:23.846Z
1005,505,Watch,1999.99,2025-07-29T12:05:00.000Z,2025-07-29T16:45:23.846Z
1006,506,Washing Machine,49999.99,2025-07-29T12:06:00.000Z,2025-07-29T16:45:23.846Z
1007,507,Refrigerator,59999.99,2025-07-29T12:07:00.000Z,2025-07-29T16:45:23.846Z
1008,508,Television,39999.99,2025-07-29T12:08:00.000Z,2025-07-29T16:45:23.846Z
1009,509,Camera,69999.99,2025-07-29T12:09:00.000Z,2025-07-29T16:45:23.846Z
1010,510,Desktop,29999.99,2025-07-29T12:10:00.000Z,2025-07-29T16:45:23.846Z
