In [1]:
import os
from random import randint

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Notebook configuration. Every value can be overridden with an environment
# variable of the same name, so the notebook can run on another machine
# without editing code; the defaults reproduce the original local setup.
KAFKA_TOPIC_NAME = os.environ.get("KAFKA_TOPIC_NAME", "test")
KAFKA_TOPIC_SINK_NAME = os.environ.get("KAFKA_TOPIC_SINK_NAME", "sink")
KAFKA_BOOTSTRAP_SERVERS = os.environ.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")

# Local directory passed to the streaming query as its "checkpointLocation".
# Point CHECKPOINT_LOCATION at any writable local directory when debugging.
CHECKPOINT_LOCATION = os.environ.get(
    "CHECKPOINT_LOCATION",
    "/home/lucaslazzarini/Documentos/spark_project/temp",
)


In [2]:
# Versions used to build the Maven coordinates of the extra JARs.
# NOTE(review): kafka-clients is pinned explicitly next to the Spark Kafka
# connector - confirm this version is compatible with the connector in use.
scala_version = '2.12'
spark_version = '3.2.1'
kafka_version = '3.3.1'

# Extra packages resolved at session start-up: the Kafka source/sink for
# Structured Streaming, the Kafka client library and the SQLite JDBC driver.
packages = [
    f'org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}',
    f'org.apache.kafka:kafka-clients:{kafka_version}',
    'org.xerial:sqlite-jdbc:3.34.0'
]

# Configure the session step by step, then create it (or reuse a running one).
_builder = SparkSession.builder.appName("PysparkKafkaStreaming")
_builder = _builder.master("local[*]")
_builder = _builder.config("spark.jars.packages", ",".join(packages))
spark = _builder.getOrCreate()

# Only show errors in the cell output.
spark.sparkContext.setLogLevel("ERROR")

:: loading settings :: url = jar:file:/home/lucaslazzarini/miniconda3/envs/class/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/lucaslazzarini/.ivy2/cache
The jars for the packages stored in: /home/lucaslazzarini/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.kafka#kafka-clients added as a dependency
org.xerial#sqlite-jdbc added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-5c804386-4429-4916-a92e-fecc1c41b69b;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.1 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-poo

22/12/12 18:21:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [12]:
# Load the `clientes` table from the local SQLite database through JDBC.
df_sqlite = (
    spark.read.format('jdbc')
    .option('driver', 'org.sqlite.JDBC')
    .option('dbtable', 'clientes')
    .option('url', 'jdbc:sqlite:/home/lucaslazzarini/Documentos/spark_project/database/database.db')
    .load()
)

In [13]:
# Remove the "index" column from the loaded table.
# NOTE(review): presumably an artifact of how the table was written - confirm.
df_sqlite = df_sqlite.drop("index")

In [14]:
# Preview the client table in the cell output.
df_sqlite.show()


+--------------+-------+-----+----------------+--------------+-------------------+-----+
|codigo_cliente|   nome|idade|gerente_da_conta|conta_corrente|tipo_conta_corrente|score|
+--------------+-------+-----+----------------+--------------+-------------------+-----+
|             1|  Lucas|   22|         Leandro|          1234|              Povão|  400|
|             2|  Bruno|   35|         Eduardo|          4321|             Chefão|  800|
|             3|Roberto|   18|         Roberto|          5554|              Ricão|  650|
|             4|  Carla|   24|         Leandro|          4585|              Povão|  350|
|             5|Mirella|   60|         Eduardo|          4856|             Chefão|  850|
|             6|Matheus|   54|         Roberto|          9847|              Ricão|  700|
|             7|Gabriel|   44|         Leandro|          8854|              Povão|  375|
|             8| Filipe|   27|         Eduardo|          6541|             Chefão|  736|
|             9|Nicol

In [8]:
from pyspark.sql.functions import when

def funcao_de_tratamento(df, batchID):
    """Enrich one micro-batch with client data and publish the decision to Kafka.

    For a non-empty batch, the function:
      1. reads the `clientes` table from the SQLite database (dropping "index"),
      2. left-joins the batch with it on `codigo_cliente`,
      3. computes `saldo_futuro` (balance minus the amount for "saque",
         balance plus the amount for "deposito", NULL for any other operation),
      4. sets `decisao` to 'aprovado' when `saldo_futuro` > 0 and 'negado'
         otherwise (a NULL `saldo_futuro` therefore yields 'negado'),
      5. serialises every row to JSON and writes it to KAFKA_TOPIC_SINK_NAME.

    Args:
        df: DataFrame holding the rows of the current micro-batch.
        batchID: identifier of the micro-batch (not used).

    Returns:
        The input DataFrame, unchanged.
    """
    # Emptiness probe that fetches at most one row instead of counting the
    # whole batch; empty batches also skip the JDBC read below.
    if not df.head(1):
        return df

    # Re-read on every batch so the join sees the current table contents.
    df_sqlite = (
        spark.read.format('jdbc')
        .options(driver='org.sqlite.JDBC', dbtable='clientes',
                 url='jdbc:sqlite:/home/lucaslazzarini/Documentos/spark_project/database/database.db')
        .load()
        .drop("index")
    )

    # No `otherwise`: unknown operations evaluate to NULL, so the expression
    # stays numeric instead of mixing a string marker into the column and
    # relying on the Float cast to turn that marker into NULL.
    saldo_futuro = (
        when(col('tipo_de_operacao') == "saque", col('saldo_em_conta') - col('valor_da_operacao'))
        .when(col('tipo_de_operacao') == "deposito", col('saldo_em_conta') + col('valor_da_operacao'))
    )

    (
        df.join(df_sqlite, on='codigo_cliente', how='left')
        .withColumn("saldo_futuro", saldo_futuro.cast("Float"))
        .withColumn("decisao", when(col('saldo_futuro') > 0, 'aprovado').otherwise('negado'))
        # The Kafka sink expects the payload in a column named "value".
        .select(to_json(struct(col("*"))).alias("value"))
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
        .option("topic", KAFKA_TOPIC_SINK_NAME)
        .save()
    )
    return df

In [4]:
# Streaming source: subscribe to the input topic, starting from the newest
# offsets and without failing the query when data has been lost.
_kafka_source_options = {
    "kafka.bootstrap.servers": KAFKA_BOOTSTRAP_SERVERS,
    "subscribe": KAFKA_TOPIC_NAME,
    "startingOffsets": "latest",
    "failOnDataLoss": "false",
}
df_kfk = spark.readStream.format("kafka").options(**_kafka_source_options).load()

In [5]:
# Schema of the JSON payload carried in the Kafka message value.
_schema = StructType([
    StructField('codigo_cliente', StringType(), True),
    StructField('agencia', StringType(), True),
    StructField('valor_da_operacao', LongType(), True),
    StructField('tipo_de_operacao', StringType(), True),
    StructField('data', StringType(), True),
    StructField('saldo_em_conta', LongType(), True),
])

# Decode key/value to strings, parse the JSON value with the schema above and
# flatten the parsed struct into top-level columns.
df_base = (
    df_kfk
    .selectExpr("CAST(key AS STRING)", "CAST(value as STRING)", "timestamp")
    .select(from_json(col("value"), _schema).alias("values"), "timestamp")
    .select("values.*")
)


In [9]:
# Start the streaming query. Each micro-batch is handed to
# funcao_de_tratamento, which performs the Kafka write itself; foreachBatch
# replaces any sink chosen with format(), so no Kafka sink format or topic
# options are set on this writer.
stream_final = (
    df_base.writeStream
    .option("checkpointLocation", CHECKPOINT_LOCATION)
    .foreachBatch(funcao_de_tratamento)
    .outputMode("append")
    .start()
)


In [10]:
# Inspect the current status of the streaming query.
stream_final.status

{'message': 'Waiting for data to arrive',
 'isDataAvailable': False,
 'isTriggerActive': False}