In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=893486087e8b9b46a2e08180cad158252581f0f1b8640d7a00b13c714d71b3d2
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col,to_timestamp,expr

# Create (or reuse) the SparkSession that every later cell builds on.
spark = (
    SparkSession.builder
    .appName("Structured Streaming Example")
    .getOrCreate()
)

# DDL-style schema for the transaction CSV files. TransactionDate is read as a
# plain string here and parsed into a timestamp further down.
TRANSACTION_schema="TransactionID STRING , TransactionDate STRING , ProductID STRING , Quantity INT ,Price DOUBLE"

# Streaming CSV source over /content/sample_data/; the first line of each file
# is treated as a header row.
df_transaction_stream = (
    spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(TRANSACTION_schema)
    .load("/content/sample_data/")
)

# Print the schema exactly as read (TransactionDate is still a string here).
df_transaction_stream.printSchema()

# Replace the string TransactionDate column with a timestamp parsed using the
# pattern below; every downstream cell works from df_with_timestamp.
date_format = "yyyy-MM-dd"
df_with_timestamp = df_transaction_stream.withColumn(
    "TransactionDate",
    to_timestamp(col("TransactionDate"), date_format),
)

root
 |-- TransactionID: string (nullable = true)
 |-- TransactionDate: string (nullable = true)
 |-- ProductID: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- Price: double (nullable = true)



In [6]:
# Task 1: Ingest Streaming Data from CSV Files
#   1. Create a folder for streaming CSV files.
#   2. Set up a structured streaming source to continuously read CSV data from
#      this folder.
#   3. Ensure that the streaming query reads the data continuously in append
#      mode and displays the results in the console.
#
# The streaming source (df_with_timestamp) is built in the setup cell; this
# cell starts a console-sink query over it in append mode. The chain is left
# as the cell's final expression so the StreamingQuery handle is displayed.
(
    df_with_timestamp.writeStream
    .outputMode("append")
    .format("console")
    .start()
)


<pyspark.sql.streaming.query.StreamingQuery at 0x7b9842ce6a10>

In [20]:
# Task 2: Stream Transformations
#   1. Once the data is streaming in, perform transformations on the incoming
#      data:
#        - Add a new column for the TotalAmount (Quantity * Price).
#        - Filter records where the Quantity is greater than 1.
#   2. Write the transformed stream to a memory sink to see the updated
#      results continuously.

# Row-level transformation: derive TotalAmount and keep only multi-unit sales.
transformed_df = (
    df_with_timestamp
    .withColumn("TotalAmount", col("Quantity") * col("Price"))
    .filter(col("Quantity") > 1)
)

# Per-product running total with a 1-day watermark on TransactionDate. This
# frame is consumed by the multi-source cell further down.
# NOTE(review): the grouping key contains no event-time column, so the
# watermark presumably cannot bound this aggregation's state - confirm against
# the Structured Streaming guide before relying on it for late-data handling.
watermarked_df = (
    transformed_df
    .withWatermark("TransactionDate", "1 day")
    .groupBy("ProductID")
    .agg(expr("sum(TotalAmount) as TotalSales"))
)

# A query name must be unique among the session's active queries, so re-running
# this cell would fail while the previous run is still active. Stop any earlier
# query registered under the same name to keep the cell re-runnable.
TRANSFORMED_QUERY_NAME = "transformed_stream"
for active_query in spark.streams.active:
    if active_query.name == TRANSFORMED_QUERY_NAME:
        active_query.stop()

# Memory sink: results are exposed as an in-memory table named after the query,
# which the next cell reads with spark.sql.
query = (
    transformed_df.writeStream
    .format("memory")
    .queryName(TRANSFORMED_QUERY_NAME)
    .outputMode("append")
    .start()
)






In [21]:
# Read the current contents of the memory sink's table and print every column
# in full (no truncation of long values).
transformed_snapshot = spark.sql("SELECT * FROM transformed_stream")
transformed_snapshot.show(truncate=False)

+-------------+-------------------+---------+--------+------+-----------+
|TransactionID|TransactionDate    |ProductID|Quantity|Price |TotalAmount|
+-------------+-------------------+---------+--------+------+-----------+
|T101         |2024-01-01 00:00:00|Laptop   |2       |1200.0|2400.0     |
|T103         |2024-01-03 00:00:00|Tablet   |3       |600.0 |1800.0     |
|T105         |2024-01-05 00:00:00|Mouse    |5       |25.0  |125.0      |
+-------------+-------------------+---------+--------+------+-----------+



In [8]:
# Task 3: Aggregations on Streaming Data
#   1. Implement an aggregation on the streaming data: group the data by
#      ProductID and calculate the total sales for each product (i.e., sum of
#      Quantity * Price for each product).
#   2. Ensure the stream runs in update mode, so only updated results are
#      output to the sink.
from pyspark.sql import functions as F

# transformed_df already carries TotalAmount = Quantity * Price, so summing
# that column per product yields the total sales the task asks for.
aggregated_df = (
    transformed_df
    .groupBy("ProductID")
    .agg(F.sum("TotalAmount").alias("TotalSales"))
)

# Stream the *aggregated* frame in update mode. (Previously the
# un-aggregated transformed_df was written here, so the per-product totals
# never reached the "product_sales" table.)
memory_query = (
    aggregated_df.writeStream
    .format("memory")
    .queryName("product_sales")
    .outputMode("update")
    .start()
)

In [None]:
# Task 4: Writing Streaming Data to File Sinks
#   1. After transforming and aggregating the data, write the streaming results
#      to a Parquet sink.
#   2. Ensure that you configure a checkpoint location to store progress and
#      ensure recovery in case of failure.

# The file (Parquet) sink accepts append mode only, and Spark allows append
# mode on a streaming aggregation only when the grouping key includes a
# watermarked event-time column. A plain groupBy("ProductID") is therefore
# rejected when the query starts. Aggregating per product within 1-day
# event-time windows, under a 1-day watermark, satisfies that requirement;
# each window's totals are written once the watermark passes the window end.
windowed_sales_df = (
    transformed_df
    .withWatermark("TransactionDate", "1 day")
    .groupBy(F.window("TransactionDate", "1 day"), F.col("ProductID"))
    .agg(F.sum("TotalAmount").alias("TotalSales"))
)

# The checkpoint directory stores the query's progress so that a restart with
# the same location resumes instead of reprocessing from scratch.
parquet_query = (
    windowed_sales_df.writeStream
    .format("parquet")
    .option("path", "/mnt/streaming_output/")
    .option("checkpointLocation", "/mnt/checkpoints/")
    .outputMode("append")
    .start()
)


In [10]:
# Task 5: Handling Late Data using Watermarks
#   1. Introduce a watermark on the TransactionDate column to handle late data
#      arriving in the stream.
#   2. Set the watermark to 1 day to allow late data within a 24-hour period
#      and discard data that is older.

# Declare how late events may arrive, measured on the TransactionDate column.
watermark_df = transformed_df.withWatermark("TransactionDate", "1 day")

# For the watermark to actually drop late rows and expire old state, the
# aggregation has to be keyed on the watermarked event-time column. Grouping
# by ProductID alone leaves the watermark without effect, so the key now also
# includes a 1-day tumbling window over TransactionDate. The result has one
# row per (window, ProductID) with the summed TotalAmount as TotalSales.
watermarked_aggregated_df = (
    watermark_df
    .groupBy(F.window("TransactionDate", "1 day"), F.col("ProductID"))
    .agg(F.sum("TotalAmount").alias("TotalSales"))
)

In [None]:
# Task 6: Streaming from Multiple Sources
#   1. Simulate a scenario where two streams of data are being ingested:
#        Stream 1: Incoming transaction data (same as Task 1).
#        Stream 2: Product information (CSV with columns: ProductID,
#                  ProductName, Category).
#   2. Perform a join on the two streams using the ProductID column and display
#      the combined stream results.

# Stream 2: product reference data read as a second streaming CSV source.
product_schema = "ProductID STRING, ProductName STRING, Category STRING"

product_stream = (
    spark.readStream
    .format("csv")
    .option("header", "true")
    .schema(product_schema)
    .load("/product_data/")
)

# Stream 1 is the raw transaction stream from Task 1 (df_with_timestamp).
# Joining the aggregated watermarked_df instead would put a streaming
# aggregation without an event-time grouping key under an append-mode query,
# which Spark rejects at start-up, and it is not the stream the task describes.
# NOTE(review): neither side carries a watermark/time constraint, so the join
# state is presumably unbounded - acceptable for a demo, revisit for real use.
joined_stream = df_with_timestamp.join(product_stream, "ProductID")

joined_query = (
    joined_stream.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

# Wait a bounded amount of time (seconds) so joined rows can be printed, then
# return control to the notebook. An unbounded awaitTermination() would block
# this cell forever and the following cells could never run.
JOIN_PREVIEW_SECONDS = 60
joined_query.awaitTermination(JOIN_PREVIEW_SECONDS)

In [None]:
# Task 7: Stopping and Restarting Streaming Queries
#   1. Stop the streaming query and explore the results.
#   2. Restart the query and ensure that it continues from the last processed
#      data by utilizing the checkpoint.

# A checkpoint directory belongs to exactly one query. The Parquet query uses
# /mnt/checkpoints/, so this console query gets its own directory rather than
# sharing (and corrupting or mis-reading) the progress of a different query.
CONSOLE_CHECKPOINT_DIR = "/mnt/checkpoints_console/"


def start_console_query():
    """Start the console query over transformed_df using the shared checkpoint.

    Returns:
        The StreamingQuery handle of the newly started query. Because every
        call uses the same checkpoint directory, a later call resumes from the
        progress recorded by an earlier (stopped) one.
    """
    return (
        transformed_df.writeStream
        .format("console")
        .option("checkpointLocation", CONSOLE_CHECKPOINT_DIR)
        .outputMode("append")
        .start()
    )


# 1. First run: process whatever is currently available so progress is
#    recorded in the checkpoint, then stop the query.
first_run_query = start_console_query()
first_run_query.processAllAvailable()
first_run_query.stop()

# 2. Restart with the same checkpoint: already-processed files are skipped and
#    only data that arrived after the stop is printed.
restarted_query = start_console_query()

# Final cell of the notebook: block until the query is stopped or interrupted.
restarted_query.awaitTermination()