In [None]:
# Install findspark into the kernel's environment; the next cell imports it
# and calls findspark.init() before importing pyspark.
# NOTE(review): the version is unpinned — consider pinning for reproducibility.
%pip install findspark

In [None]:

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

import time

# --- Kafka source -----------------------------------------------------------
# Broker address and topic name used by the streaming reader.
KAFKA_BOOTSTRAP_SERVER = "kafka1:29092"
TOPIC_NAME = "transactions"
KEY = None  # not referenced by any of the visible cells

# --- Local storage ----------------------------------------------------------
# JDBC URL of the SQLite database read as the static side of the pipeline.
SQLITE3_URL = "jdbc:sqlite:/home/jovyan/work/tmp/db.sqlite"

# Locations passed to the streaming writer ("path" and "checkpointLocation").
OUTPUT_PATH = "/home/jovyan/work/tmp/output"
CHECKPOINT_PATH = "/home/jovyan/work/tmp/checkpoint"

In [None]:
# Versions interpolated into the Maven coordinates below.
SCALA_VERSION = '2.12'
SPARK_VERSION = '3.2.1'
KAFKA_VERSION = '3.3.1'

# One Maven coordinate per extra JAR: the Spark/Kafka SQL connector, the Kafka
# client library, and the SQLite JDBC driver.
SPARK_KAFKA_PACKAGE = f'org.apache.spark:spark-sql-kafka-0-10_{SCALA_VERSION}:{SPARK_VERSION}'
KAFKA_CLIENTS_PACKAGE = f'org.apache.kafka:kafka-clients:{KAFKA_VERSION}'
SQLITE_JDBC_PACKAGE = 'org.xerial:sqlite-jdbc:3.34.0'

# Order is kept as: connector, Kafka clients, SQLite driver.
PACKAGES = [SPARK_KAFKA_PACKAGE, KAFKA_CLIENTS_PACKAGE, SQLITE_JDBC_PACKAGE]

# Create (or reuse) the Spark session for this notebook. The extra JARs are
# handed to Spark as a comma-separated list under "spark.jars.packages".
maven_coordinates = ",".join(PACKAGES)

session_builder = SparkSession.builder.appName("pyspark_kafka_streaming")
session_builder = session_builder.config("spark.driver.host", "localhost")
session_builder = session_builder.master("local[*]")
session_builder = session_builder.config("spark.jars.packages", maven_coordinates)

spark = session_builder.getOrCreate()

# Only log messages at ERROR level from here on.
spark.sparkContext.setLogLevel("ERROR")

In [None]:
# Static DataFrame: the "customers" table read from SQLite through JDBC.
df_sqlite = (
    spark.read.format("jdbc")
    .options(
        url=SQLITE3_URL,
        driver="org.sqlite.JDBC",
        dbtable="customers",
    )
    .load()
)

# Schema used to parse the JSON payload of each Kafka message.
sql_types = pyspark.sql.types
message_schema = (
    sql_types.StructType()
    .add("id", sql_types.StringType())
    .add("user_id", sql_types.StringType())
    .add("agence", sql_types.StringType())
    .add("operation_value", sql_types.IntegerType())
    .add("operation_type", sql_types.StringType())
    .add("date", sql_types.StringType())
    .add("account_balance", sql_types.IntegerType())
)

# Streaming DataFrame over the Kafka topic, starting from the latest offsets.
kafka_reader = spark.readStream.format("kafka")
kafka_reader = kafka_reader.option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
kafka_reader = kafka_reader.option("subscribe", TOPIC_NAME)
kafka_reader = kafka_reader.option("startingOffsets", "latest")
df_kfk = kafka_reader.load()

# Decode the Kafka payload: cast the raw `value` to a string, parse it as JSON
# with `message_schema`, and flatten the parsed struct into top-level columns.
df_base = (
    df_kfk.selectExpr("CAST(value AS STRING)")
    # Alias the parsed struct explicitly so the star expansion below does not
    # rely on the column name Spark auto-generates for the from_json expression.
    .select(F.from_json(F.col("value"), message_schema).alias("message"))
    .select("message.*")
    # Rename the message's own "id" so that, after the join, "id" refers only
    # to the column coming from df_sqlite.
    .withColumnRenamed("id", "transaction_id")
)

# Enrich each transaction with the matching df_sqlite row. The left join keeps
# transactions whose user_id has no match.
df_base = df_base.join(df_sqlite, df_base.user_id == df_sqlite.id, "left")

# status is "blocked" when the operation type is "Saque" (presumably a
# withdrawal — confirm) and account_balance minus operation_value is zero or
# negative; every other row is "active".
df_base = df_base.withColumn(
    "status",
    F.when(
        (df_base.account_balance - df_base.operation_value <= 0)
        & (df_base.operation_type == "Saque"),
        "blocked",
    ).otherwise("active"),
)

# Drop the join key brought in from df_sqlite; user_id already carries it.
df_base = df_base.drop("id")

# Start the streaming query and keep its handle. Chaining .awaitTermination()
# onto the assignment would bind `stream_final` to that call's return value
# instead of the query, and only once the query had ended, so the stream could
# not be stopped or inspected through the variable.
# NOTE(review): the sink is "console", which is not expected to use the "path"
# option — confirm whether a file sink was intended here.
stream_final = (
    df_base.writeStream
    .format("console")
    .option("path", OUTPUT_PATH)
    .option("checkpointLocation", CHECKPOINT_PATH)
    .outputMode("append")
    .start()
)

# Block this cell until the query terminates. After an interrupt the query
# handle is still available, e.g. for stream_final.stop().
stream_final.awaitTermination()
    