In [23]:
from pyspark.sql.functions import col, from_json, sum, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, sum, window, to_timestamp


spark = SparkSession.builder \
    .appName("StreamingApp") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .getOrCreate()



In [24]:

# Định nghĩa schema cho dữ liệu JSON
schema = StructType([
   StructField("transaction_id", StringType(), True),
   StructField("user_id", StringType(), True), 
   StructField("amount", DoubleType(), True),
   StructField("timestamp", TimestampType(), True)
])

# Đọc dữ liệu từ Kafka và xử lý
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "ecommerce_topic") \
    .load()

In [25]:
checkpoint_dir = "/tmp/quickcommerce_streaming_checkpoint"

In [26]:
# Create and start streaming query
query = streaming_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint_dir) \
    .start()

25/02/08 15:16:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/02/08 15:16:03 WARN StreamingQueryManager: Stopping existing streaming query [id=f17bb05c-6e23-4c72-b4e5-62acbe13065f, runId=c3c424b2-65d2-4c55-96a7-b650165e5ede], as a new run is being started.


In [27]:
streaming_df = streaming_df.selectExpr("CAST(value AS STRING)") \
    .select(from_json(col("value"), schema).alias("data")) \
    .select("data.*")

25/02/08 15:16:03 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


In [28]:
streaming_query = streaming_df.writeStream\
.trigger(processingTime = "10 seconds") \
.option("checkpointLocation", "/tmp/trigger_checkpoint")\
.format("console")\
.start()

25/02/08 15:16:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/02/08 15:16:03 WARN StreamingQueryManager: Stopping existing streaming query [id=6b71e534-6065-4772-9566-2d08149a9dbf, runId=945613fe-30d6-42cd-9337-1548041a4c14], as a new run is being started.


In [29]:

# Chuyển đổi timestamp thành đúng định dạng
streaming_df = streaming_df.withColumn("timestamp", 
    col("timestamp").cast(TimestampType()))

# Sau đó mới thêm watermark
streaming_df = streaming_df.withWatermark("timestamp", "5 minute")
# Filter transactions greater than $300
filtered_df = streaming_df.filter("amount > 300")

# Group by user_id and calculate total amount per user
aggregated_df = filtered_df.groupBy("user_id").agg(sum("amount").alias("total_amount"))

# Write aggregated data to console for testing
aggregated_df.writeStream\
    .format("console")\
    .outputMode("complete")\
    .start()

 
# Calculate total amount per user in 10-minute windows
windowed_df = filtered_df.groupBy(
    window(col("timestamp"), "10 minutes"), 
    col("user_id")
).agg(sum(col("amount")).alias("total_amount"))


# Write windowed data to JSON files
windowed_df.writeStream\
    .format("json")\
    .option("path", "/tmp/late_data")\
    .option("checkpointLocation", "/tmp/late_data_checkpoint")\
    .start()

25/02/08 15:16:03 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/02/08 15:16:03 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /private/var/folders/1w/m7nb0f692jl8hnp5vcwrn0rh0000gn/T/temporary-b387bc5d-f84f-4dc8-9911-5de2fda74fef. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/02/08 15:16:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/02/08 15:16:03 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/02/08 15:16:03 WARN Re

<pyspark.sql.streaming.query.StreamingQuery at 0x1206689e0>

In [30]:
filtered_df = streaming_df.filter("amount > 1000")


In [31]:
from pyspark.sql.functions import col, when

classified_df = filtered_df.withColumn(
    "classification", 
    when(col("amount") >= 5000, "very high value")
    .when(col("amount") >= 3000, "high value")
    .otherwise("low value")  # Ensure fallback condition
)


In [32]:
classified_df.writeStream \
    .format("console") \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/classified_checkpoint") \
    .start() \
    .awaitTermination()


25/02/08 15:16:03 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 48
-------------------------------------------
+--------------+-------+-------+-------------------+---------------+
|transaction_id|user_id| amount|          timestamp| classification|
+--------------+-------+-------+-------------------+---------------+
|         11713|   6376|9298.32|2025-02-08 15:11:06|very high value|
+--------------+-------+-------+-------------------+---------------+



25/02/08 15:16:04 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
25/02/08 15:16:04 WARN HDFSBackedStateStoreProvider: The state for version 98 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/02/08 15:16:04 WARN HDFSBackedStateStoreProvider: The state for version 98 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/02/08 15:16:04 WARN HDFSBackedStateStoreProvider: The state for version 98 doesn't exist in loadedMaps. Reading snapshot file and delta files if needed...Note that this is normal for the first batch of starting query.
25/02/08 15:16:04 WARN HDFSBackedStateStoreProvider: The state for version 98 doesn't exist in loadedMaps. Reading snapshot file and delta f

-------------------------------------------
Batch: 49
-------------------------------------------
+--------------+-------+-------+-------------------+---------------+
|transaction_id|user_id| amount|          timestamp| classification|
+--------------+-------+-------+-------------------+---------------+
|         23598|   5885|9990.54|2025-02-08 15:11:07|very high value|
|         24430|   8905|9496.72|2025-02-08 15:11:09|very high value|
|         72761|   1500|4692.29|2025-02-08 15:11:10|     high value|
|         34192|   9794|6326.21|2025-02-08 15:11:11|very high value|
|         60831|   7091|4763.51|2025-02-08 15:11:12|     high value|
|         66167|   4109|3984.27|2025-02-08 15:11:13|     high value|
|         59725|   4348|1829.91|2025-02-08 15:11:14|      low value|
|         53171|   2885|8058.57|2025-02-08 15:11:15|very high value|
|         36352|   1495|2595.83|2025-02-08 15:11:17|      low value|
|         39287|   7936|1242.13|2025-02-08 15:11:18|      low value|
|    

ERROR:root:KeyboardInterrupt while sending command.                             
Traceback (most recent call last):
  File "/Users/phuongnguyen/Desktop/untitled folder/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/phuongnguyen/Desktop/untitled folder/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.12/lib/python3.12/socket.py", line 707, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [None]:
#calculate the total transaction amount for each user, updated it in real-time
aggregated_df = classified_df.groupBy("user_id").agg(sum("amount").alias("total_amount"))

In [None]:
#register streaming data frame as a temporary SQL table
classified_df.createOrReplaceTempView("transactions")

In [None]:
query = """
SELECT user_id, 
       SUM(amount) AS total_spent, 
       classification
FROM transactions
WHERE amount > 10000
GROUP BY user_id, classification
ORDER BY total_spent DESC 
"""
result_df = spark.sql(query)
query = result_df.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

In [33]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, sum, window, to_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType

# Khởi tạo Spark session
spark = SparkSession.builder \
    .appName("QuickCommerce Streaming Pipeline") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
    .getOrCreate()

# Định nghĩa schema cho dữ liệu JSON
schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("user_id", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("timestamp", StringType(), True)
])

# Đọc dữ liệu từ Kafka
streaming_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "ecommerce_topic") \
    .load()

# Chuyển đổi dữ liệu Kafka thành DataFrame
streaming_df = streaming_df.selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), schema).alias("data")) \
    .select("data.*")

# Chuyển đổi kiểu dữ liệu
streaming_df = streaming_df.withColumn("amount", col("amount").cast("double"))
streaming_df = streaming_df.withColumn("timestamp", to_timestamp(col("timestamp"), "yyyy-MM-dd HH:mm:ss"))

# Lọc giao dịch có giá trị cao
streaming_df = streaming_df.withWatermark("timestamp", "5 minutes")
filtered_df = streaming_df.filter(col("amount") > 1000)

# Tổng hợp dữ liệu theo cửa sổ 10 phút
windowed_df = filtered_df.groupBy(
    window(col("timestamp"), "10 minutes"),
    col("user_id")
).agg(sum("amount").alias("total_amount"))

# Ghi dữ liệu vào Parquet
query = windowed_df.writeStream \
    .format("parquet") \
    .option("path", "/tmp/high_value_transactions") \
    .option("checkpointLocation", "/tmp/high_value_checkpoint") \
    .outputMode("append") \
    .start()

# Giữ pipeline chạy
query.awaitTermination()


25/02/08 15:17:08 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
25/02/08 15:17:08 WARN StreamingQueryManager: Stopping existing streaming query [id=ee1e25ec-34b9-46f2-8566-2e3f3529f7d0, runId=ef9edb5e-8d5d-45de-86f2-0294f7a25316], as a new run is being started.
25/02/08 15:17:08 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/Users/phuongnguyen/Desktop/untitled folder/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/phuongnguyen/Desktop/untitled folder/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    a

KeyboardInterrupt: 

25/02/08 15:46:57 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 10000 milliseconds, but spent 87472 milliseconds
