In [0]:
# Stage the raw transactions file from the Workspace into the DBFS folder
# that the streaming readers below watch.
dbutils.fs.cp(
    "file:/Workspace/Shared/transaction.csv",
    "dbfs:/mnt/streaming/csv_files/transaction.csv",
)

True

In [0]:
# Stage the product reference file into the same DBFS input folder.
dbutils.fs.cp(
    "file:/Workspace/Shared/Products.csv",
    "dbfs:/mnt/streaming/csv_files/Products.csv",
)

True

In [0]:
#  Set Up a Structured Streaming Source to Read CSV Data Continuously
#
# The csv_files folder also receives Products.csv, which has a different column
# layout. The glob path restricts this reader to the transaction file(s).
# Without it, product rows would be parsed positionally against this
# transaction schema.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

schema = StructType([
    StructField("TransactionID", StringType(), True),
    StructField("TransactionDate", StringType(), True),  # read as text; cast to timestamp where needed
    StructField("ProductID", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("Price", IntegerType(), True)
])

# File-based streaming sources need an explicit schema; header=true makes the
# reader treat the first line of each file as column names, not data.
streaming_df = (
    spark.readStream
    .option("header", "true")
    .schema(schema)
    .csv("/mnt/streaming/csv_files/transaction*.csv")
)

streaming_df.printSchema()

root
 |-- TransactionID: string (nullable = true)
 |-- TransactionDate: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: integer (nullable = true)



In [0]:
# 3. Continuously read the raw stream in append mode and print each
#    micro-batch to the console sink.
# NOTE: the console sink writes to the driver's stdout; on Databricks this may
# show up in the driver logs rather than under this cell.
query = (
    streaming_df.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

In [0]:
# Keep only multi-unit purchases (Quantity > 1) and derive
# TotalAmount = Quantity * Price for each of them.
from pyspark.sql.functions import col

transformed_streaming_df = (
    streaming_df
    .where(col("Quantity") > 1)
    .withColumn("TotalAmount", col("Price") * col("Quantity"))
)

transformed_streaming_df.printSchema()


root
 |-- TransactionID: string (nullable = true)
 |-- TransactionDate: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: integer (nullable = true)
 |-- TotalAmount: integer (nullable = true)



In [0]:
# Write the transformed stream to an in-memory table so it can be queried with SQL.
query = (
    transformed_streaming_df.writeStream
    .format("memory")
    .outputMode("append")
    .queryName("transformed_transactions")  # The name of the in-memory table
    .start()
)

# start() returns immediately and micro-batches run asynchronously. Block until
# the files already present have been processed, so the next cell's SELECT sees
# them even under "Run All".
query.processAllAvailable()


In [0]:
# Inspect the rows collected so far in the in-memory table.
spark.table("transformed_transactions").show()

+-------------+---------------+---------+--------+-----+-----------+
|TransactionID|TransactionDate|ProductID|Quantity|Price|TotalAmount|
+-------------+---------------+---------+--------+-----+-----------+
|         T101|     2024-01-01|   Laptop|       2| 1200|       2400|
|         T103|     2024-01-03|   Tablet|       3|  600|       1800|
|         T105|     2024-01-05|    Mouse|       5|   25|        125|
+-------------+---------------+---------+--------+-----+-----------+



In [0]:
#  Implement an aggregation on the streaming data:
#  group by ProductID and compute total sales (Quantity * Price) per product.
#
# Spark's sum is imported under an alias so it does not shadow Python's
# built-in sum() for the rest of the notebook. col is imported here instead of
# relying on an earlier cell having been run.
from pyspark.sql.functions import col, sum as spark_sum

aggregated_streaming_df = (
    streaming_df
    .groupBy("ProductID")
    .agg(spark_sum(col("Quantity") * col("Price")).alias("TotalSales"))
)

aggregated_streaming_df.printSchema()


root
 |-- ProductID: string (nullable = true)
 |-- TotalSales: long (nullable = true)



In [0]:
# Ensure the stream runs in update mode, so each trigger outputs only the
# products whose totals changed in that micro-batch.
# NOTE(review): the memory sink stores whatever each trigger emits. If data
# arrives over several micro-batches, a ProductID may appear more than once in
# aggregated_sales. Confirm against the Spark version in use.
query = (
    aggregated_streaming_df.writeStream
    .format("memory")
    .outputMode("update")
    .queryName("aggregated_sales")
    .start()
)

# Wait for the already-present input to be aggregated, so the next cell's
# query does not race the first asynchronous micro-batch.
query.processAllAvailable()


In [0]:
# Per-product totals currently held in the in-memory aggregation table.
spark.table("aggregated_sales").show()

+---------+----------+
|ProductID|TotalSales|
+---------+----------+
|    Phone|       800|
|   Laptop|      2400|
|    Mouse|       125|
|   Tablet|      1800|
|  Monitor|       300|
+---------+----------+



In [0]:
# Write the transformed stream (TotalAmount added, Quantity > 1) to a Parquet sink.
#
# The per-product aggregation is not written here. It is not grouped by an
# event-time column, so a file sink cannot receive it in append mode. The
# row-level transformed stream is written instead.
output_path = "/mnt/streaming/output/parquet_files"

# Convert TransactionDate to TIMESTAMP so it can act as the event-time column.
# The result gets its own name instead of overwriting aggregated_streaming_df,
# which holds the per-product aggregation.
timestamped_streaming_df = transformed_streaming_df.withColumn(
    "TransactionDate",
    transformed_streaming_df["TransactionDate"].cast("timestamp"),
)

# Declare a 10-minute watermark for late data.
# NOTE(review): this query has no stateful operator (aggregation, dedup, join),
# so the watermark does not change which rows are written here.
timestamped_streaming_df_with_watermark = timestamped_streaming_df.withWatermark(
    "TransactionDate",
    "10 minutes",
)

parquet_query = (
    timestamped_streaming_df_with_watermark.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", output_path)
    .option("checkpointLocation", "/mnt/streaming/output/checkpoints")
    .start()
)

In [0]:
# Introduce a watermark on the TransactionDate column to handle late data arriving in the stream.
# TransactionDate is read as a string, so it is cast to TIMESTAMP first:
# withWatermark requires a timestamp event-time column.
from pyspark.sql.functions import col

watermarked_streaming_df = (
    streaming_df
    .withColumn("TransactionDate", col("TransactionDate").cast("timestamp"))
    .withWatermark("TransactionDate", "10 minutes")
)

In [0]:
# Simulate two streams of data: define one schema per input feed.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Transactions feed. Price is a double here, whereas the first reader's schema
# used an integer.
transaction_schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("TransactionID", StringType()),
        ("TransactionDate", StringType()),
        ("ProductID", StringType()),
        ("Quantity", IntegerType()),
        ("Price", DoubleType()),
    )
])

# Product reference feed: every column is a nullable string.
product_schema = StructType([
    StructField(field_name, StringType(), True)
    for field_name in ("ProductID", "ProductName", "Category")
])


In [0]:
# Ingest the two streams.
# NOTE(review): this directory is created empty and no cell above copies files
# into it. The transactions stream, and therefore the join below, only emits
# rows once CSV files are dropped here.
dbutils.fs.mkdirs("/mnt/streaming/transaction_data")

# Ingest the streaming data
transaction_stream = (
    spark.readStream
    .option("header", "true")
    .schema(transaction_schema)
    .csv("/mnt/streaming/transaction_data")
)

# Products share csv_files/ with transaction.csv. The glob path restricts this
# reader to the product file(s), so transaction rows are not parsed with
# product_schema. product_schema comes from the previous cell; it is not
# redefined here.
product_df = (
    spark.readStream
    .option("header", "true")
    .schema(product_schema)
    .csv("/mnt/streaming/csv_files/Products*.csv")
)


In [0]:
# Inner stream-stream join of transactions with products on ProductID.
# NOTE(review): neither side has a watermark, so Spark keeps all join state
# indefinitely. That is acceptable for a small demo but not for long-running jobs.
joined_stream = transaction_stream.alias("t").join(
    product_df.alias("p"),
    on="ProductID",
    how="inner",
)

joined_stream.printSchema()

# Fix the output column order: transaction identity, product attributes, then amounts.
joined_stream_result = joined_stream.select(
    "t.TransactionID",
    "t.TransactionDate",
    "t.ProductID",
    "p.ProductName",
    "p.Category",
    "t.Quantity",
    "t.Price",
)

# Print joined micro-batches to the console sink.
query = (
    joined_stream_result.writeStream
    .outputMode("append")
    .format("console")
    .start()
)

root
 |-- ProductID: string (nullable = true)
 |-- TransactionID: string (nullable = true)
 |-- TransactionDate: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)
 |-- ProductName: string (nullable = true)
 |-- Category: string (nullable = true)



In [0]:
# Stop the streaming queries, then explore the Parquet output.
#
# Every cell above rebinds `query`, so query.stop() alone would stop only the
# last (console join) query. The console, memory and Parquet queries would keep
# running. Instead, drain and stop every active query in this Spark session, so
# the Parquet sink has committed the input it has already found before it is read.
for active_query in spark.streams.active:
    active_query.processAllAvailable()
    active_query.stop()

# Explore the results
result_df = spark.read.parquet("/mnt/streaming/output/parquet_files")
result_df.show()

+-------------+-------------------+---------+--------+-----+-----------+
|TransactionID|    TransactionDate|ProductID|Quantity|Price|TotalAmount|
+-------------+-------------------+---------+--------+-----+-----------+
|         T101|2024-01-01 00:00:00|   Laptop|       2| 1200|       2400|
|         T103|2024-01-03 00:00:00|   Tablet|       3|  600|       1800|
|         T105|2024-01-05 00:00:00|    Mouse|       5|   25|        125|
+-------------+-------------------+---------+--------+-----+-----------+



In [0]:
# Start a Parquet query that writes the joined stream.
# NOTE(review): this does not reuse the first Parquet query's checkpoint
# (/mnt/streaming/output/checkpoints). It has its own checkpoint directory, so
# re-running this cell after the query has been stopped should resume from
# checkpoints_with_watermark.
# NOTE(review): joined_stream_result has no watermark applied, despite the
# "parquet_with_watermark" output path.
query = (
    joined_stream_result.writeStream
    .format("parquet")
    .outputMode("append")
    .option("path", "/mnt/streaming/output/parquet_with_watermark")
    .option("checkpointLocation", "/mnt/streaming/output/checkpoints_with_watermark")
    .start()
)
