In [0]:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

import os

# S3A connection settings for this Spark session.
# The access key and secret key are read from the standard AWS environment
# variables instead of being written into the notebook, so no secret ends up
# in the notebook source or its revision history. If either variable is not
# set, os.environ[...] raises KeyError naming the missing variable.
spark.conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
spark.conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])
spark.conf.set("fs.s3a.endpoint", "s3.us-east-1.amazonaws.com")
# df = spark.read.format("parquet").load("s3a://mlops-clean-data-bucket/customer_features/")
# df.show()

In [0]:
# Schema of one JSON event as carried in the Kafka message value.
# Every field is added with StructType.add's default nullability.
schema = (
    StructType()
    .add("customer_id", StringType())
    .add("event", StringType())
    .add("timestamp", TimestampType())
    .add("product_id", StringType())
    .add("amount", DoubleType())
)

# Streaming source: subscribe to the customer_events topic on the Kafka broker,
# starting from the latest offsets.
kafka_df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "54.160.8.148:9092",
        "subscribe": "customer_events",
        "startingOffsets": "latest",
    })
    .load()
)

# Keep only the Kafka record's `value` column, cast to a string (the JSON text).
json_df = kafka_df.select(col("value").cast("string").alias("value"))

# Decode the JSON text with `schema` into a struct named `data`, then flatten
# the struct so each event field becomes a top-level column.
parsed_df = (
    json_df
    .select(from_json(json_df["value"], schema).alias("data"))
    .select("data.*")
)

parsed_df.printSchema()


root
 |-- customer_id: string (nullable = true)
 |-- event: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- amount: double (nullable = true)



In [0]:
from pyspark.sql.functions import count, avg, window, to_utc_timestamp

# Per-customer features over 30-second event-time windows.
# `timestamp` is renamed to `event_time`, which is then used both as the
# watermark column (1 minute of allowed lateness) and as the window column.
features_df = (
    parsed_df
    .withColumnRenamed("timestamp", "event_time")
    .withWatermark("event_time", "1 minutes")
    .groupBy(col("customer_id"), window("event_time", "30 second"))
    .agg(
        count("event").alias("total_events"),  # counts rows whose `event` is non-null
        avg("amount").alias("avg_amount"),
    )
)


In [0]:
# use when required
# parsed_df.writeStream \
#     .format("console") \
#     .outputMode("append") \
#     .option("truncate", False) \
#     .option("checkpointLocation", "/tmp/spark_checkpoints") \
#     .start()
# Persist the windowed features to S3 as parquet -- run when required.
# The StreamingQuery returned by start() is bound to `query` so the running
# stream can be inspected (query.status, query.lastProgress), stopped
# (query.stop()), or awaited below; previously the handle was discarded and
# the awaitTermination line referred to an undefined name.
query = (
    features_df.writeStream
    .format("parquet")
    .option("path", "s3a://mlops-clean-data-bucket/customer_features/")
    .option("checkpointLocation", "/tmp/checkpoints/feature-stream/")
    .outputMode("append")
    .trigger(processingTime='2 minutes')
    .queryName("features_stream")
    .start()
)
# Uncomment to block this cell until the stream stops or fails:
# query.awaitTermination()

In [0]:
#Extras
#--------removefolder --------
# dbutils.fs.rm("dbfs:/tmp/checkpoints/feature-stream", True)
#-------show---------
# df = spark.read.format("parquet").load("s3a://mlops-clean-data-bucket/customer_features/")
# df.show()
#----count------------
# df = spark.read.format("parquet").load("s3a://mlops-clean-data-bucket/customer_features/")
# print("Row count:", df.count())