In [None]:
'''
En este ejercicio se usarán NiFi, Kafka, Spark, CSV y Avro

- Queremos serializar un CSV con Avro y enviarlo a un topic de Kafka, todo ello con NiFi

- Posteriormente consumimos de ese topic de Kafka el CSV serializado en Avro, y con Spark lo deserializamos y leemos

El CSV es:

nombre;apellido;sexo;edad;peso;altura
Pedro;Pérez;m;30;60;1.70
María;Díaz;F;35;55;1.65
Marcos;Rojo;M;20;62;1.80
Carolina;Martínez;f;21;59;1.71

'''

In [1]:
# findspark has to run BEFORE anything from pyspark is imported: init() adds
# the Spark distribution's Python libraries to sys.path, which is what makes
# `import pyspark` resolve when pyspark is not installed as a regular package.
import findspark

findspark.init()

import pandas as pd
from deltalake.writer import write_deltalake
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro
from pyspark.sql.functions import col, expr

In [2]:
# Create the Spark session.
# The Avro and Kafka dependencies have to be listed in spark.jars.packages
# (comma-separated Maven coordinates).
spark = (
    SparkSession.builder
    .appName("ReadingAvroFromKafka")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.13:3.5.1,"
        "org.apache.spark:spark-avro_2.13:3.5.1",
    )
    .getOrCreate()
)

24/07/16 10:14:41 WARN Utils: Your hostname, bosonituser-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/07/16 10:14:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/bosonituser/spark-3.5.1-bin-hadoop3-scala2.13/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/bosonituser/.ivy2/cache
The jars for the packages stored in: /home/bosonituser/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.13 added as a dependency
org.apache.spark#spark-avro_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-61a665cd-8484-470f-b0b5-a00531faf8b7;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.13;3.5.1 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.13;3.5.1 in central
	found org.apache.kafka#kafka-clients;3.4.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.10.3 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.4 in central
	found org.apache.hadoop#hadoop-client-api;3.3.4 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.scala-lang.modules#scala-parallel-collection

In [3]:
# The Avro-serialized CSV is expected to be in Kafka already; that step was
# done beforehand in NiFi.
# Batch-read the topic that was configured in the NiFi processor.
df_kafka = (
    spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "csv_avro2")
    .load()
)

# Show the raw Kafka records without truncating the binary `value` column.
df_kafka.show(truncate=False)

24/07/16 10:14:48 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
[Stage 0:>                                                          (0 + 1) / 1]

+----+----------------------------------------------------------------------------------------------------+---------+---------+------+-----------------------+-------------+
|key |value                                                                                               |topic    |partition|offset|timestamp              |timestampType|
+----+----------------------------------------------------------------------------------------------------+---------+---------+------+-----------------------+-------------+
|NULL|[02 0A 50 65 64 72 6F 02 0C 50 C3 A9 72 65 7A 02 02 6D 02 3C 02 78 02 9A 99 D9 3F]                  |csv_avro2|0        |0     |2024-07-16 10:14:14.864|0            |
|NULL|[02 0C 4D 61 72 C3 AD 61 02 0A 44 C3 AD 61 7A 02 02 46 02 46 02 6E 02 33 33 D3 3F]                  |csv_avro2|0        |1     |2024-07-16 10:14:14.864|0            |
|NULL|[02 0C 4D 61 72 63 6F 73 02 08 52 6F 6A 6F 02 02 4D 02 28 02 7C 02 66 66 E6 3F]                     |csv_avro2|0        |2     |2

                                                                                

In [4]:
# Avro schema, as a JSON string, for the records read from Kafka.
# Every field is a nullable union (["null", <type>]) with a null default.
avro_schema = '''
{
  "type": "record",
  "name": "nifiRecord",
  "namespace": "org.apache.nifi",
  "fields": [
    {"name": "nombre", "type": ["null", "string"], "default": null},
    {"name": "apellido", "type": ["null", "string"], "default": null},
    {"name": "sexo", "type": ["null", "string"], "default": null},
    {"name": "edad", "type": ["null", "int"], "default": null},
    {"name": "peso", "type": ["null", "int"], "default": null},
    {"name": "altura", "type": ["null", "float"], "default": null}
  ]
}
'''

# Decode the binary Kafka `value` column with the schema above; the result is
# a single struct column named "person".
person_struct = from_avro(col("value"), avro_schema).alias("person")
df_avro_deserialized = df_kafka.select(person_struct)

# Flatten the struct into one column per Avro field and display the rows.
df_person = df_avro_deserialized.select("person.*")
df_person.show(truncate=False)

24/07/16 10:14:57 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.


+--------+--------+----+----+----+------+
|nombre  |apellido|sexo|edad|peso|altura|
+--------+--------+----+----+----+------+
|Pedro   |Pérez   |m   |30  |60  |1.7   |
|María   |Díaz    |F   |35  |55  |1.65  |
|Marcos  |Rojo    |M   |20  |62  |1.8   |
|Carolina|Martínez|f   |21  |59  |1.71  |
+--------+--------+----+----+----+------+



                                                                                