In [0]:
# Pipeline settings read from the Spark session configuration (`spark.conf`);
# presumably populated from the DLT pipeline's configuration — verify in the
# pipeline settings. A missing key makes `spark.conf.get` raise.
# NOTE(review): the AWS secret key travels as plain configuration here; prefer a
# secret scope or an instance profile so the credential is not stored in clear text.
awsAccessKeyId, awsSecretKey, kinesisStreamName, kinesisRegion = (
    spark.conf.get(setting_key)
    for setting_key in (
        "awsAccessKeyId",
        "awsSecretKey",
        "kinesisStreamName",
        "kinesisRegion",
    )
)

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import col, from_json, current_timestamp, count, when, coalesce, lit, window
import dlt

# Schema of the JSON payload carried by each Kinesis record: four string
# fields followed by the event-time `timestamp`. All fields use StructField's
# default (nullable=True), so absent JSON keys parse as NULL.
schema = StructType(
    [
        StructField(field_name, StringType())
        for field_name in ("post_id", "platform", "engagement_type", "user_id")
    ]
    + [StructField("timestamp", TimestampType())]
)

# Bronze layer: ingest raw engagement events from Kinesis.
@dlt.table
def social_media_engagement_streaming_bronze():
    """Stream Kinesis records, decode their JSON payload and flatten it.

    The record's ``data`` column is cast to a string and parsed with the
    module-level ``schema``. The resulting struct is flattened into
    ``post_id``, ``platform``, ``engagement_type``, ``user_id`` and
    ``timestamp``, plus ``process_time`` (the time the row was processed).
    """
    kinesis_reader = (
        spark.readStream.format("kinesis")
        .option("streamName", kinesisStreamName)
        # Begin at the oldest record still retained in the stream.
        .option("initialPosition", "trim_horizon")
        # NOTE(review): static access keys; an instance profile or secret
        # scope would avoid handling raw credentials in the notebook.
        .option("awsAccessKey", awsAccessKeyId)
        .option("awsSecretKey", awsSecretKey)
        .option("region", kinesisRegion)
    )
    raw_records = kinesis_reader.load()

    parsed_records = (
        raw_records
        .withColumn("value", from_json(col("data").cast("string"), schema))
        .withColumn("process_time", current_timestamp())
    )

    payload_columns = [
        "value.post_id",
        "value.platform",
        "value.engagement_type",
        "value.user_id",
        "value.timestamp",
    ]
    return parsed_records.select(*payload_columns, "process_time")

In [0]:
from pyspark.sql.functions import coalesce, lit

@dlt.table
def social_media_engagement_streaming_silver():
    """Silver layer: bronze events with missing dimension values defaulted.

    NULL ``post_id``, ``platform``, ``engagement_type`` and ``user_id`` values
    are replaced by sentinel strings. Downstream aggregations then group such
    rows under an explicit "unknown" key instead of NULL. ``timestamp`` and
    ``process_time`` pass through unchanged.
    """
    return (
        dlt.read_stream("social_media_engagement_streaming_bronze")
        # post_id is a grouping key of the gold table, so it needs a default
        # just like the other dimensions; otherwise NULL ids silently merge.
        .withColumn("post_id", coalesce(col("post_id"), lit("unknown_post")))
        .withColumn("platform", coalesce(col("platform"), lit("unknown_platform")))
        .withColumn("engagement_type", coalesce(col("engagement_type"), lit("unknown_engagement")))
        .withColumn("user_id", coalesce(col("user_id"), lit("unknown_user")))
    )

In [0]:
from pyspark.sql.functions import window

@dlt.table
def social_media_engagement_streaming_gold():
    """Gold layer: engagement counts per post and platform per time window.

    Silver events are grouped by ``post_id``, ``platform`` and a tumbling
    event-time window over ``timestamp``. For each group the aggregation
    counts likes, shares, comments and total engagements. The watermark lets
    Spark finalize windows once events older than the allowed delay stop
    arriving.
    """
    # How far behind the latest seen event time a record may arrive and
    # still be aggregated into its window.
    watermark_delay = "1 minute"
    # No slide duration is passed to `window`, so windows are tumbling
    # (non-overlapping), not sliding.
    window_duration = "2 minutes"

    return (
        dlt.read_stream("social_media_engagement_streaming_silver")
        .withWatermark("timestamp", watermark_delay)
        .groupBy(col("post_id"), col("platform"), window(col("timestamp"), window_duration))
        # PySpark Columns compare with `==`; `equalTo` exists only in the Scala API.
        .agg(
            count(when(col("engagement_type") == "like", 1)).alias("likes"),
            count(when(col("engagement_type") == "share", 1)).alias("shares"),
            count(when(col("engagement_type") == "comment", 1)).alias("comments"),
            count("*").alias("total_engagements"),
        )
    )

In [0]:
# from pyspark.sql.types import StructType, StructField, StringType, TimestampType
# from pyspark.sql.functions import col, from_json, current_timestamp, count, when

# # Define schema for the JSON data
# schema = StructType([
#   StructField("post_id", StringType(), True), 
#   StructField("platform", StringType(), True),
#   StructField("engagement_type", StringType(), True),
#   StructField("user_id", StringType(), True),
#   StructField("timestamp", TimestampType())
# ])

# # Read streaming data from Kinesis
# (
#   spark
#     .readStream
#     .format("kinesis")
#     .option("streamName", kinesisStreamName)
#     .option("initialPosition", "trim_horizon")
#     .option("awsAccessKey", awsAccessKeyId)
#     .option("awsSecretKey", awsSecretKey)
#     .option("region", kinesisRegion)
#   .load()
#   # Parse JSON data and extract fields
#   .withColumn("value", from_json(col("data").cast("string"), schema))
#   .withColumn("process_time", current_timestamp())
#   .select("value.post_id", "value.platform", "value.engagement_type", "value.user_id", "value.timestamp", "process_time")
#     .writeStream
#     .queryName("social_media_engagement_streaming_bronze")
#     .option("checkpointLocation", "/Volumes/workspace/default/checkpoint/kinesis_stream_bronze")
#     .trigger(availableNow=True)  # Trigger the query to process available data
#     .outputMode("append")  # Append new records to the table 
#   .toTable("workspace.default.social_media_engagement_streaming_bronze")  
# )

In [0]:
# from pyspark.sql.functions import coalesce, lit

# # Read streaming data from the bronze table
# (
#   spark
#     .readStream
#       .table("workspace.default.social_media_engagement_streaming_bronze")
#       # Handle null values by replacing them with default values
#       .withColumn("post_id", coalesce(col("post_id"), lit("unknown post")))
#       .withColumn("platform", coalesce(col("platform"), lit("unknown_platform")))
#       .withColumn("engagement_type", coalesce(col("engagement_type"), lit("unknown_engagement")))
#       .withColumn("user_id", coalesce(col("user_id"), lit("unknown_user")))
#     # Write the cleaned data to the silver table
#     .writeStream
#       .queryName("social_media_engagement_streaming_silver")
#       .option("checkpointLocation", "/Volumes/workspace/default/checkpoint/kinesis_stream_silver")
#       .trigger(availableNow=True)
#       .outputMode("append")
#     .toTable("workspace.default.social_media_engagement_streaming_silver")
# )

In [0]:
# from pyspark.sql.functions import window

# (
#   spark
#     .readStream
#     # Read the streaming data from the silver table
#     .table("workspace.default.social_media_engagement_streaming_silver")
#     # Apply watermark to handle late data by one minute
#     .withWatermark("timestamp", "1 minute")
#     # Tumbling (non-overlapping) 2-minute window
#     .groupBy("post_id", "platform", window(col("timestamp"), "2 minutes"))
#     # Count the number of likes, shares, and comments within the window
#     # per post_id and platform
#     .agg(
#       count(when(col("engagement_type") == "like", 1)).alias("likes"),
#       count(when(col("engagement_type") == "share", 1)).alias("shares"),
#       count(when(col("engagement_type") == "comment", 1)).alias("comments"),
#       count("*").alias("total_engagements")
#     )
#     # Write the aggregated data to the gold table
#     .writeStream
#     .queryName("social_media_engagement_streaming_gold")
#     .option("checkpointLocation", "/Volumes/workspace/default/checkpoint/kinesis_stream_gold")
#     .trigger(availableNow=True) 
#     .outputMode("append")
#     .toTable("workspace.default.social_media_engagement_streaming_gold")
# )