In [1]:
# SparkSession is not imported in any of the visible cells, so bring it into
# scope here; otherwise this first cell fails with NameError on a fresh kernel.
from pyspark.sql import SparkSession

# Build (or reuse) the session for this notebook: local master with 3 threads,
# graceful shutdown of streaming queries, and 2 shuffle partitions.
spark = (
    SparkSession.builder
    .master("local[3]")
    .appName("IABD Kafka join MariaDB")
    .config("spark.streaming.stopGracefullyOnShutdown", "true")
    .config("spark.sql.shuffle.partitions", 2)
    .getOrCreate()
)

In [2]:
from pyspark.sql.types import StructType, StructField, StringType
# Schema used to parse the JSON carried in the Kafka message value.
# Both fields are declared as strings; login_time is converted in a later cell.
kafkaSchema = (
    StructType()
    .add("login_id", StringType())
    .add("login_time", StringType())
)

In [3]:
# Streaming source: subscribe to the "usuariosk" topic, reading from the
# earliest available offsets.
kafkaDF = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "iabd-virtualbox:9092")
    .option("subscribe", "usuariosk")
    .option("startingOffsets", "earliest")
    .load()
)

In [4]:
from pyspark.sql.functions import from_json, col

# Cast the Kafka value to a string, then parse it as JSON using kafkaSchema.
# The parsed struct is kept under the column name "value".
valueDF = kafkaDF.select(
    from_json(
        col("value").cast("string"),
        kafkaSchema,
    ).alias("value")
)

In [5]:
from pyspark.sql.functions import to_timestamp

# Flatten the parsed struct into top-level columns and convert login_time
# from string to timestamp.
loginDF = (
    valueDF
    .select("value.*")
    .withColumn(
        "login_time",
        to_timestamp("login_time", "yyyy-MM-dd HH:mm:ss"),
    )
)

In [6]:
# Static (batch) read of the users table over JDBC.
# NOTE(review): credentials are hardcoded in the notebook; consider reading
# them from the environment before sharing.
jdbcDF = (
    spark.read
    .format("jdbc")
    .options(
        driver="com.mysql.cj.jdbc.Driver",
        url="jdbc:mysql://localhost",
        dbtable="spark.usuarios",
        user="iabd",
        password="iabd",
    )
    .load()
)

In [7]:
# Show the column names and types of the users table read over JDBC.
jdbcDF.printSchema()

root
 |-- id: decimal(20,0) (nullable = true)
 |-- nombre: string (nullable = true)
 |-- ultimoLogin: timestamp (nullable = true)



In [8]:
# Join condition and type: match each streamed login to its user row.
joinType = "inner"
joinExpr = loginDF["login_id"] == jdbcDF["id"]

In [9]:
# Stream-static join; login_id is dropped afterwards because it duplicates id.
joinDF = (
    loginDF
    .join(jdbcDF, on=joinExpr, how=joinType)
    .drop(loginDF["login_id"])
)
joinDF.printSchema()
# Expected schema:
# root
#  |-- login_time: timestamp (nullable = true)
#  |-- id: decimal(20,0) (nullable = true)
#  |-- nombre: string (nullable = true)
#  |-- ultimoLogin: timestamp (nullable = true)

root
 |-- login_time: timestamp (nullable = true)
 |-- id: decimal(20,0) (nullable = true)
 |-- nombre: string (nullable = true)
 |-- ultimoLogin: timestamp (nullable = true)



In [10]:
# Reorder the columns and keep only the logins that are newer than the
# last login stored for the user.
resultadoDF = (
    joinDF
    .select("id", "nombre", "login_time", "ultimoLogin")
    .where(col("ultimoLogin") < col("login_time"))
)
resultadoDF.printSchema()

root
 |-- id: decimal(20,0) (nullable = true)
 |-- nombre: string (nullable = true)
 |-- login_time: timestamp (nullable = true)
 |-- ultimoLogin: timestamp (nullable = true)



In [11]:
# Start a streaming query that prints results to the console, triggered once
# per minute, with its checkpoint stored in "chk-point-dir-05".
queryWriter = (
    resultadoDF.writeStream
    .format("console")
    .outputMode("update")
    .option("checkpointLocation", "chk-point-dir-05")
    .trigger(processingTime="1 minute")
    .start()
)

In [None]:
# Wait for the console query to terminate. This call blocks the cell, so the
# cells below will not run until the query has stopped.
queryWriter.awaitTermination()

In [None]:
def guardarMysql(batch_df, batch_id):
    """Append one micro-batch to the database over JDBC.

    Written to be passed to ``DataStreamWriter.foreachBatch``, which calls it
    with the micro-batch DataFrame and its batch id.

    Args:
        batch_df: DataFrame holding the rows of the current micro-batch.
        batch_id: Identifier of the micro-batch; stored in a ``batchId``
            column added to every written row.
    """
    # Function-scope import: ``lit`` is not imported in the visible cells.
    from pyspark.sql.functions import lit

    # NOTE(review): the target is the same "spark.usuarios" table that is read
    # earlier in this notebook; the batch carries extra columns (login_time,
    # batchId) on top of that table's schema. Confirm that the table accepts
    # them, or point "dbtable" at a dedicated output table.
    (
        batch_df
        .withColumn("batchId", lit(batch_id))
        .write
        .format("jdbc")
        .option("driver", "com.mysql.cj.jdbc.Driver")
        .option("url", "jdbc:mysql://localhost")
        .option("dbtable", "spark.usuarios")
        .option("user", "iabd")
        .option("password", "iabd")
        .mode("append")
        .save()
    )
        

In [None]:
# Start a second streaming query that hands every micro-batch to
# guardarMysql, then wait until that query terminates.
queryWriter = (
    resultadoDF.writeStream
    .outputMode("append")
    .foreachBatch(guardarMysql)
    .start()
)

queryWriter.awaitTermination()