In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col, lit, window, when, round, concat_ws

# Step 1: Create Spark session

# Executor sizing for this job, applied to the session builder in order.
# NOTE(review): the min/max executor bounds belong to Spark's dynamic
# allocation feature, and this notebook never sets
# spark.dynamicAllocation.enabled itself - confirm the cluster enables it.
executor_settings = {
    "spark.dynamicAllocation.minExecutors": 12,  # lower bound on executor count
    "spark.dynamicAllocation.maxExecutors": 30,  # upper bound on executor count
    "spark.executor.memory": "8g",  # memory per executor - adjust if needed
}

session_builder = (
    SparkSession.builder
    .appName("Streaming Application")
    .enableHiveSupport()
)
for setting_name, setting_value in executor_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)

spark = session_builder.getOrCreate()

# Restrict Spark's log output to the ERROR level.
spark.sparkContext.setLogLevel("ERROR")

# Step 2: Define schema
# (column name, Spark type) pairs, listed in the order they are added.
employee_fields = [
    ("id", StringType()),
    ("first_name", StringType()),
    ("last_name", StringType()),
    ("role", StringType()),
    ("salary", IntegerType()),
    ("event_time", TimestampType()),
]

schema = StructType()
for field_name, field_type in employee_fields:
    # add() returns the StructType, so rebinding keeps the same chained result.
    schema = schema.add(field_name, field_type)

# Step 3: Read streaming XML data from S3
# NOTE(review): readStream with format "xml" needs an XML data source that
# supports Structured Streaming - confirm for the Spark version in use.
xml_reader = (
    spark.readStream
    .format("xml")
    .option("rowTag", "employee")  # XML element name passed as the row tag
    .schema(schema)  # use the explicit schema defined in Step 2
)
xml_df = xml_reader.load("s3a://your-bucket/input-xml/")

# Step 4: Transformations
# Full name: first and last name joined by a single space.
full_name = concat_ws(" ", col("first_name"), col("last_name"))

# Bonus by role: 15% of salary for ENGINEER, 10% for ANALYST, 0 otherwise.
bonus_amount = (
    when(col("role") == "ENGINEER", col("salary") * 0.15)
    .when(col("role") == "ANALYST", col("salary") * 0.10)
    .otherwise(0)
)

transformed_df = (
    xml_df
    .withColumn("name", full_name)
    .drop("first_name", "last_name")
    # `round` here is pyspark.sql.functions.round (imported above), not the builtin.
    .withColumn("bonus", round(bonus_amount, 2))
    .withColumn("source", lit("S3_XML_STREAM"))  # constant tag on every row
)

# Step 5: Windowed aggregation
# Group rows into 10-minute event-time windows per role, with a 10-minute
# watermark on event_time, then count the rows and sum the bonus per group.
windowed_df = (
    transformed_df
    .withWatermark("event_time", "10 minutes")
    .groupBy(
        window(col("event_time"), "10 minutes"),
        col("role"),
    )
    .agg({"*": "count", "bonus": "sum"})
    # The dict form of agg() auto-names its outputs "count(1)" and "sum(bonus)".
    # Parentheses in field names are rejected by the Parquet writer on Spark
    # releases that validate them, so give the columns plain names before the
    # Parquet sink below sees them.
    .withColumnRenamed("count(1)", "record_count")
    .withColumnRenamed("sum(bonus)", "total_bonus")
)

# Step 6: Write results to S3 in Parquet format
# The chain is parenthesised instead of using backslash continuations: a
# backslash followed by anything (even a space) is a syntax error.
query = (
    windowed_df.writeStream
    .format("parquet")
    .option("path", "s3a://your-bucket/output-bonus-windowed/")
    .option("checkpointLocation", "s3a://your-bucket/checkpoints/bonus-job/")
    .outputMode("append")
    .start()
)

# Step 7: Wait for termination
# Blocks this cell until the streaming query stops or fails.
query.awaitTermination()

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, from_json, to_json
from pyspark.sql.types import StructType, StringType, DoubleType, TimestampType

# 1. Create Spark session
# Build (or fetch, via getOrCreate) the session used by the Kafka example.
spark = (
    SparkSession.builder
    .appName("KafkaWindowWatermarkExample")
    .getOrCreate()
)

# 2. Schema for JSON messages
# Fields pulled out of each JSON payload by from_json() in step 4.
schema = (
    StructType()
    .add("device_id", StringType())
    .add("temperature", DoubleType())
    .add("event_time", TimestampType())  # event timestamp
)

# 3. Read stream from Kafka
# Source options gathered in one place and handed to the reader together.
kafka_source_options = {
    "kafka.bootstrap.servers": "localhost:9092",
    "subscribe": "input_topic",
    "startingOffsets": "latest",
}

df_raw = (
    spark.readStream
    .format("kafka")
    .options(**kafka_source_options)
    .load()
)

# 4. Parse JSON and extract columns
# Cast the Kafka `value` column to a string and parse it with the schema into
# a struct named "data"; "data.*" then expands the struct into its fields.
parsed_payload = from_json(col("value").cast("string"), schema).alias("data")
df_parsed = df_raw.select(parsed_payload).select("data.*")

# 5. Apply watermark and window
# Watermark: 10 minutes on event_time (how long late data is waited for)
# Window: events are grouped into 5-minute intervals, per device
five_minute_window = window(col("event_time"), "5 minutes")

df_windowed = (
    df_parsed
    .withWatermark("event_time", "10 minutes")
    .groupBy(five_minute_window, col("device_id"))
    .avg("temperature")
    # avg() names its output "avg(temperature)"; give it a friendlier name.
    .withColumnRenamed("avg(temperature)", "avg_temp")
)

# 6. Prepare output for Kafka
# Build the `key` and `value` columns for the Kafka sink: the key is the
# device id, the value is the whole aggregated row serialised as JSON.
kafka_key_expr = "CAST(device_id AS STRING) AS key"
kafka_value_expr = "CAST(to_json(struct(*)) AS STRING) AS value"

df_to_kafka = df_windowed.selectExpr(kafka_key_expr, kafka_value_expr)

# 7. Write back to Kafka
# The chain is parenthesised so that inline comments are legal: with backslash
# continuations, a comment after the backslash is a syntax error.
query = (
    df_to_kafka.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output_topic")
    .option("checkpointLocation", "/tmp/kafka_window_checkpoint")
    .outputMode("update")  # update because aggregations are used
    .start()
)

# Blocks this cell until the streaming query stops or fails.
query.awaitTermination()
