In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

# Create (or reuse) the Spark session for this lab and lower Spark's log level.
spark = SparkSession.builder.appName("Lab7_Zad1").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Synthetic input: the built-in "rate" source, configured for 5 rows per second.
rate_df = spark.readStream.format("rate").option("rowsPerSecond", 5).load()

# Attach a random user id ('u' followed by an integer derived from rand()*100)
# and a random event type ('purchase' when rand() > 0.7, otherwise 'view'),
# then keep only the three columns the later cells work with.
events = rate_df.withColumn(
    "user_id", expr("concat('u', cast(rand()*100 as int))")
).withColumn(
    "event_type", expr("case when rand() > 0.7 then 'purchase' else 'view' end")
).select("timestamp", "user_id", "event_type")

# Mutable module-level tally of how many micro-batches process_batch has handled.
batch_counter = {"count": 0}


def process_batch(df, batch_id):
    """Print one micro-batch and record that it was seen.

    Args:
        df: The micro-batch to display; only its ``show`` method is used here.
        batch_id: Identifier of the micro-batch, echoed in the printed header.
    """
    batch_counter["count"] = batch_counter["count"] + 1
    header = f"Batch ID: {batch_id}"
    print(header)
    # Ask show() not to truncate cell contents.
    df.show(truncate=False)

# Print every micro-batch of `events` through process_batch, triggering a new
# micro-batch every 10 seconds.
# No .format("console") here: foreachBatch installs its own sink and replaces
# any format chosen before it, so that call had no effect.
query = (events.writeStream
         .outputMode("append")
         .foreachBatch(process_batch)
         .trigger(processingTime="10 seconds")
         .start())
try:
    # Bounded wait (in seconds): the rate source never ends, so an unbounded
    # awaitTermination() would block this cell, and "Run All", forever.
    # Raise the timeout to observe more batches.
    query.awaitTermination(timeout=60)
finally:
    # Always stop the query, also on a kernel interrupt, so it does not keep
    # running in the background while a later cell starts another one.
    query.stop()

In [None]:
from pyspark.sql.functions import col

# Keep only the purchase events from the synthetic stream.
purchases = events.filter(col("event_type") == "purchase")

# Print each micro-batch of purchases, one micro-batch every 10 seconds.
# No .format("console") here: foreachBatch installs its own sink and replaces
# any format chosen before it, so that call had no effect.
query = (purchases.writeStream
         .outputMode("append")
         .foreachBatch(process_batch)
         .trigger(processingTime="10 seconds")
         .start())
try:
    # Bounded wait (in seconds): an unbounded awaitTermination() never returns
    # for this endless stream and would prevent the following cells from running.
    query.awaitTermination(timeout=60)
finally:
    # Always stop the query, also on a kernel interrupt, so it does not keep
    # running in the background alongside the next cell's query.
    query.stop()


In [None]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# Explicit schema for the JSON event records: (column name, Spark type) pairs
# in column order, turned into StructFields below.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("user_id", StringType()),
        ("event_type", StringType()),
        ("timestamp", TimestampType()),
        ("product_id", StringType()),
        ("category", StringType()),
        ("price", DoubleType()),
    ]
])

# Streaming JSON reader over the "stream_data" path, using the schema above.
stream = spark.readStream.schema(schema).json("stream_data")

# Running count of events per event type.
agg = (stream
       .groupBy("event_type")
       .count())

# Emit the per-event-type counts through process_batch; "update" mode passes
# only the rows whose count changed in the current micro-batch.
# No .format("console") here: foreachBatch installs its own sink and replaces
# any format chosen before it, so that call had no effect.
query = (agg.writeStream
         .outputMode("update")
         .foreachBatch(process_batch)
         .start())
try:
    # Bounded wait (in seconds): a streaming query keeps running until it is
    # stopped, so an unbounded awaitTermination() would block every later cell.
    query.awaitTermination(timeout=60)
finally:
    # Always stop the query, also on a kernel interrupt, so it does not keep
    # running in the background alongside the next cell's query.
    query.stop()

In [None]:
from pyspark.sql.functions import window

# Count events per event type within 5-minute windows over the "timestamp"
# column. No watermark is set here.
windowed = stream.groupBy(window("timestamp", "5 minutes"), "event_type").count()

# No .format("console") here: foreachBatch installs its own sink and replaces
# any format chosen before it, so that call had no effect.
query = (windowed.writeStream
         .outputMode("update")
         .foreachBatch(process_batch)
         .start())
try:
    # Bounded wait (in seconds): a streaming query keeps running until it is
    # stopped, so an unbounded awaitTermination() would block every later cell.
    query.awaitTermination(timeout=60)
finally:
    # Always stop the query, also on a kernel interrupt, so it does not keep
    # running in the background alongside the next cell's query.
    query.stop()


In [None]:

# Same 5-minute windowed count per event type, but with a 1-minute watermark
# on "timestamp" so that late data is bounded.
windowed_watermarked = (stream.withWatermark("timestamp", "1 minute")
    .groupBy(window("timestamp", "5 minutes"), "event_type").count())

# "update" instead of "complete": complete mode must keep every aggregate, so
# the watermark could never drop old window state and would have no effect.
# No .format("console") here: foreachBatch installs its own sink and replaces
# any format chosen before it, so that call had no effect.
query = (windowed_watermarked.writeStream
         .outputMode("update")
         .foreachBatch(process_batch)
         .start())
try:
    # Bounded wait (in seconds): with an unbounded awaitTermination() the
    # stop() below was unreachable for as long as the stream stayed alive.
    query.awaitTermination(timeout=60)
finally:
    # Always stop the query, also on a kernel interrupt.
    query.stop()


In [None]:
# Re-run the unfiltered `events` stream through process_batch, one micro-batch
# every 10 seconds.
# No .format("console") here: foreachBatch installs its own sink and replaces
# any format chosen before it, so that call had no effect.
query = (events.writeStream
         .outputMode("append")
         .foreachBatch(process_batch)
         .trigger(processingTime="10 seconds")
         .start())

try:
    # Bounded wait (in seconds): the rate source never ends, so an unbounded
    # awaitTermination() would never return.
    query.awaitTermination(timeout=60)
finally:
    # Always stop the query, also on a kernel interrupt, so no stream is left
    # running in the background when the notebook finishes.
    query.stop()
