In [1]:
#Carga de librerias
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, to_timestamp, window, expr, sum,count
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [2]:
# Create (or reuse, via getOrCreate) the local Spark session that drives the
# streaming queries in this notebook.
# Requires `from pyspark.sql import SparkSession` in the import cell; without it
# this cell raises NameError on a fresh kernel.
spark = (
    SparkSession.builder
    .appName("Youtube Video Reaction RT")
    .master("local[3]")  # run locally with 3 worker threads
    # NOTE(review): this key is documented for the DStream API (StreamingContext);
    # it likely has no effect on Structured Streaming queries — verify.
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    # Few shuffle partitions keep the windowed aggregations light in local mode.
    .config("spark.sql.shuffle.partitions", 2)
    .getOrCreate()
)

In [3]:
# Schema of the JSON payload of each Kafka message on the reactions stream.
# CreatedTime is read as text here and converted to a timestamp after parsing.
reactions_schema = StructType(
    [
        StructField(field_name, field_type)
        for field_name, field_type in (
            ("CreatedTime", StringType()),
            ("Type", StringType()),
            ("Amount", IntegerType()),
            ("VideoName", StringType()),
            ("VideoCode", IntegerType()),
        )
    ]
)

In [4]:
# Schema of the JSON payload of each Kafka message on the comments stream.
# Fields are appended in order; StructType.add returns the same StructType.
comments_schema = (
    StructType()
    .add("CreatedTime", StringType())
    .add("VideoName", StringType())
    .add("VideoCode", IntegerType())
    .add("Comment", StringType())
)

In [5]:
# Streaming source over the Kafka "reactions" topic, replaying from the earliest
# available offset. The message payload is in the "value" column, which is cast
# to string before JSON parsing.
kafka_reactions_df = (
    spark.readStream.format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": "localhost:9092",
            "subscribe": "reactions",
            "startingOffsets": "earliest",
        }
    )
    .load()
)

In [6]:
# Decode the Kafka "value" column as text and parse it with reactions_schema.
# The parsed struct is exposed as a single column, again named "value".
# NOTE(review): malformed JSON is not filtered out here — confirm how bad
# messages should be handled.
value_reactions_df = kafka_reactions_df.select(
    from_json(
        col("value").cast("string"),
        reactions_schema,
    ).alias("value")
)

In [7]:
# Flatten the parsed struct and convert CreatedTime ("yyyy-MM-dd HH:mm:ss" text)
# to a timestamp. Split Amount into Like / Dislike columns so each reaction type
# can be summed on its own; a row contributes 0 to the type it is not.
reactions_df = (
    value_reactions_df
    .select("value.*")
    .withColumn("CreatedTime", to_timestamp(col("CreatedTime"), "yyyy-MM-dd HH:mm:ss"))
    .withColumn("Like", expr("case when Type == 'LIKE' then Amount else 0 end"))
    .withColumn("Dislike", expr("case when Type == 'DISLIKE' then Amount else 0 end"))
)

In [8]:
# Per-video totals of likes and dislikes over 5-minute tumbling windows on
# event time (CreatedTime). `sum` is pyspark.sql.functions.sum (imported in the
# first cell, shadowing the Python builtin).
#
# The watermark lets Spark evict state for windows that end more than 30 minutes
# before the latest CreatedTime seen so far. Without it, this streaming
# aggregation keeps every window's state forever and memory grows without bound.
# Trade-off: events arriving later than the 30-minute threshold may be discarded.
windowr_agg_df = (
    reactions_df
    .withWatermark("CreatedTime", "30 minute")
    .groupBy(col("VideoCode"), window(col("CreatedTime"), "5 minute"))
    .agg(
        sum("Like").alias("TotalLikes"),
        sum("Dislike").alias("TotalDislikes"),
    )
)

In [9]:
# Streaming source over the Kafka "comments" topic, replaying from the earliest
# available offset. The message payload is in the "value" column, which is cast
# to string before JSON parsing.
kafka_comments_df = (
    spark.readStream.format("kafka")
    .options(
        **{
            "kafka.bootstrap.servers": "localhost:9092",
            "subscribe": "comments",
            "startingOffsets": "earliest",
        }
    )
    .load()
)

In [10]:
# Decode the Kafka "value" column as text and parse it with comments_schema.
# The parsed struct is exposed as a single column, again named "value".
value_comments_df = kafka_comments_df.select(
    from_json(
        col("value").cast("string"),
        comments_schema,
    ).alias("value")
)

In [11]:
# Flatten the parsed comment struct and convert CreatedTime from
# "yyyy-MM-dd HH:mm:ss" text to a timestamp.
# NOTE(review): assumes the producer always emits this exact format — TODO confirm.
comments_df = (
    value_comments_df
    .select("value.*")
    .withColumn("CreatedTime", to_timestamp(col("CreatedTime"), "yyyy-MM-dd HH:mm:ss"))
)

In [12]:
# Rename the video key to VCC so it does not clash with the reactions stream's
# VideoCode column if the two aggregations are ever joined.
comments_df = (
    comments_df
    .withColumnRenamed("VideoCode", "VCC")
)

In [13]:
# Per-video count of non-null comments over 5-minute tumbling windows on event
# time (CreatedTime). The window column is named "cwindow".
#
# The watermark lets Spark evict state for windows that end more than 30 minutes
# before the latest CreatedTime seen so far. Without it, this streaming
# aggregation keeps every window's state forever and memory grows without bound.
# Trade-off: events arriving later than the 30-minute threshold may be discarded.
windowc_agg_df = (
    comments_df
    .withWatermark("CreatedTime", "30 minute")
    .groupBy(col("VCC"), window(col("CreatedTime"), "5 minute").alias("cwindow"))
    .agg(count("Comment").alias("TotalComments"))
)

In [14]:
#join_expr = "VideoCode == VCC" #+ \
               # " AND window == cwindow"
#join_type = "inner"

In [15]:
#joined_df = windowr_agg_df.join(windowc_agg_df, expr(join_expr), join_type)

In [16]:
#Se realiza una agregación con una ventana de 15 minutos sobre el campo 'CreatedTime'
#Se declara como límite de expiración (watermark) de los datos un rango de 30 minutos respecto al máximo 'CreatedTime' observado
#NOTA: bloque de referencia copiado de otro ejercicio (usa trade_df, Buy y Sell, que no existen en este notebook)
#window_agg_df = trade_df \
  #      .withWatermark("CreatedTime", "30 minute") \
  #      .groupBy(window(col("CreatedTime"), "15 minute")) \
  #      .agg(sum("Buy").alias("TotalBuy"),
    #         sum("Sell").alias("TotalSell"))

In [17]:
# Choose which aggregated stream the console sink prints.
# Alternatives kept for reference:
#   windowr_agg_df.select("VideoCode", "window.start", "window.end", "TotalLikes", "TotalDislikes")
#     -> same reactions totals with the window struct flattened into start/end
#   windowc_agg_df
#     -> comment counts instead of reactions
# NOTE(review): when switching streams, also change the sink's checkpointLocation;
# a checkpoint written by one query generally cannot be reused by a different one.
output_df = windowr_agg_df

In [18]:
# Start the streaming query: every 30 seconds, print to the console the window
# rows whose aggregates changed in that micro-batch ("update" output mode).
# truncate=false: by default the console sink truncates values longer than
# 20 characters, which hides the window struct's end timestamp.
# Progress and state are checkpointed so a restart resumes where it left off.
window_query = (
    output_df.writeStream
    .format("console")
    .outputMode("update")
    .option("truncate", "false")
    .option("checkpointLocation", "./checkpoint/reactions-comments-window")
    .trigger(processingTime="30 second")
    .start()
)

In [None]:
# Block this cell until the streaming query stops; it raises if the query
# fails. This ties up the kernel indefinitely.
# NOTE: interrupting the cell does not by itself stop the query — call
# window_query.stop() to end it.
window_query.awaitTermination(timeout=None)