In [1]:
from pyspark.sql.functions import expr, col, from_json, to_timestamp, from_unixtime, window
from pyspark.sql.types import StructType, StructField, StringType, BooleanType, IntegerType, ArrayType

In [2]:
#Se crea la sesión de spark y se habilita la inferencia de Schema el cual no esta habilitado por defecto en streaming
spark = SparkSession \
        .builder \
        .appName("Proyecto") \
        .master("local[3]") \
        .config("spark.streaming.stopGracefullyOnShutdown", "true") \
        .config("spark.driver.extraClassPath", "/usr/share/java/mysql-connector-java-8.0.33.jar")\
        .config("spark.executor.extraClassPath", "/usr/share/java/mysql-connector-java-8.0.33.jar") \
        .getOrCreate()

In [3]:
from dotenv import load_dotenv
import os

load_dotenv()

jdbcHostname = os.environ['MYSQL_IP']
jdbcPort = os.environ['MYSQL_PORT']
jdbcUsername = os.environ['MYSQL_USER']
jdbcPassword = os.environ['MYSQL_PASS']
jdbcDatabase = os.environ['DBNAME']

jdbcUrl = f"jdbc:mysql://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"
jdbc_properties = {"user": jdbcUsername, "password": jdbcPassword, "driver": "com.mysql.cj.jdbc.Driver"}
    
def foreach_batch_function(df, epoch_id):
    df.write.jdbc(url=jdbcUrl, table="sparkkafka", mode="append", properties=jdbc_properties)

In [4]:
kafka_df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "stackOverflow") \
        .option("startingOffsets", "earliest") \
        .load()

In [5]:
schema = StructType([
    StructField("tags", ArrayType(StringType())),
    StructField("owner", StructType([
            StructField("account_id", IntegerType()),
            StructField("reputation", IntegerType()),
            StructField("user_id", IntegerType()),
            StructField("user_type", StringType()),
            StructField("profile_image", StringType()),
            StructField("display_name", StringType()),
            StructField("link", StringType())            
    ])),
    StructField("is_answered", BooleanType()),
    StructField("view_count", IntegerType()),
    StructField("answer_count", IntegerType()),
    StructField("score", IntegerType()),
    StructField("last_activity_date", IntegerType()),
    StructField("creation_date", IntegerType()),
    StructField("question_id", IntegerType()),
    StructField("content_license", StringType()),
    StructField("link", StringType()),
    StructField("title", StringType())
])

In [6]:
value_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("value"))

In [7]:
questions_df = value_df.selectExpr("value.tags", "value.owner.display_name", "value.is_answered", "value.view_count",
                                   "value.answer_count", "value.score", "value.last_activity_date", 
                                   "value.creation_Date", "value.question_id", "value.title",
                                   "value.link")

In [8]:
questions_df.printSchema()

root
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- display_name: string (nullable = true)
 |-- is_answered: boolean (nullable = true)
 |-- view_count: integer (nullable = true)
 |-- answer_count: integer (nullable = true)
 |-- score: integer (nullable = true)
 |-- last_activity_date: integer (nullable = true)
 |-- creation_Date: integer (nullable = true)
 |-- question_id: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- link: string (nullable = true)



In [9]:
tags_df = questions_df\
                .withColumn("creation_Date", to_timestamp(from_unixtime(col("creation_Date")), "yyyy-MM-dd HH:mm:ss"))\
                .selectExpr("creation_Date", "explode(tags) as tag", "question_id")
#tags_df = questions_df.selectExpr("explode(tags) as tag", "question_id")

In [10]:
tags_df.printSchema()

root
 |-- creation_Date: timestamp (nullable = true)
 |-- tag: string (nullable = true)
 |-- question_id: integer (nullable = true)



In [16]:
counts_df=tags_df\
                .withWatermark("creation_Date", "1 hour")\
                .groupBy(col("tag"), window(col("creation_Date"), "15 minute"))\
                .count()

counts_df = counts_df.select("tag", "window.start", "window.end", "count")
#counts_df=tags_df\
#                .groupBy(col("tag"))\
#                .count()

In [17]:
counts_df.printSchema()

root
 |-- tag: string (nullable = true)
 |-- start: timestamp (nullable = true)
 |-- end: timestamp (nullable = true)
 |-- count: long (nullable = false)



In [19]:
query = counts_df.writeStream\
            .outputMode("append") \
            .foreachBatch(foreach_batch_function)\
            .option("checkpointLocation","./streaming/chk-point-dir")\
            .start()
query.awaitTermination()

ERROR:root:KeyboardInterrupt while sending command.                             
Traceback (most recent call last):
  File "/home/fabian/Spark/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/fabian/Spark/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

In [20]:
query.stop()