# PySpark consumer from Kafka setup

Mostly based on [this Medium article](https://medium.com/@mrugankray/real-time-avro-data-analysis-with-spark-streaming-and-confluent-kafka-in-python-426f5e05392d)

In [3]:
from confluent_kafka.schema_registry import SchemaRegistryClient
import pyspark.sql.functions as func
from pyspark.sql.avro.functions import from_avro
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

In [4]:
spark = (
    SparkSession.builder.appName("kafka_test")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1," "org.apache.spark:spark-avro_2.12:3.5.1",
    )
    .getOrCreate()
)

topic = "postgres.public.resources_01_2014"
subject = "".join([topic, "-value"])
kafka_bootstrap_servers = "localhost:9091,localhost:9092,localhost:9093"


# For some weird reason, I'm not able to access my schema registry from outside the container using localhost. 
# If thats the case, do a docker network inspect on the default network and set the schema-registry-1 IPv4 here.
schema_registry_url = "http://172.19.0.13:8096"


schema_registry_client = SchemaRegistryClient({"url": schema_registry_url})
avro_schema = schema_registry_client.get_latest_version(subject)

In [5]:
df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()
)

In [6]:
df = df.withColumn("magicByte", func.expr("substring(value, 1, 1)"))
df = df.withColumn("valueSchemaId", func.expr("substring(value, 2, 4)"))
df = df.withColumn("fixedValue", func.expr("substring(value, 6, len(value)-5)"))
v_df = df.select("magicByte", "valueSchemaId", "fixedValue")

In [7]:
avro_opts = {"mode": "PERMISSIVE"}
decoded_output = v_df.select(from_avro(func.col("fixedValue"), avro_schema.schema.schema_str, avro_opts).alias("resources"))
v_df = decoded_output.select("resources.*")

In [8]:
v_df.printSchema()

root
 |-- before: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- codigo_favorecido: string (nullable = true)
 |    |-- nome_favorecido: string (nullable = true)
 |    |-- sigla_uf: string (nullable = true)
 |    |-- nome_municipio: string (nullable = true)
 |    |-- codigo_orgao_superior: long (nullable = true)
 |    |-- nome_orgao_superior: string (nullable = true)
 |    |-- codigo_orgao: long (nullable = true)
 |    |-- nome_orgao: string (nullable = true)
 |    |-- codigo_unidade_gestora: long (nullable = true)
 |    |-- nome_unidade_gestora: string (nullable = true)
 |    |-- ano_e_mes_do_lancamento: integer (nullable = true)
 |    |-- valor_recebido: string (nullable = true)
 |-- after: struct (nullable = true)
 |    |-- id: integer (nullable = true)
 |    |-- codigo_favorecido: string (nullable = true)
 |    |-- nome_favorecido: string (nullable = true)
 |    |-- sigla_uf: string (nullable = true)
 |    |-- nome_municipio: string (nullable = true)
 |  

In [9]:
df = v_df.select("after").select("after.*")

query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

24/08/12 10:24:53 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-32b798ce-e55b-47c8-95b2-b7b001a511e4. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/08/12 10:24:53 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/08/12 10:24:54 WARN AdminClientConfig: These configurations '[key.deserializer, value.deserializer, enable.auto.commit, max.poll.records, auto.offset.reset]' were supplied but are not used yet.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+---+-----------------+--------------------+--------+----------------+---------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+--------------+
| id|codigo_favorecido|     nome_favorecido|sigla_uf|  nome_municipio|codigo_orgao_superior| nome_orgao_superior|codigo_orgao|          nome_orgao|codigo_unidade_gestora|nome_unidade_gestora|ano_e_mes_do_lancamento|valor_recebido|
+---+-----------------+--------------------+--------+----------------+---------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+--------------+
|  1|   04890965000158|'' BG NORTE PETRO...|      SP| ITAQUAQUECETUBA|                25000|Ministério da Faz...|       25000|Ministério da Faz...|                170133|SUPERINTENDENCIA ...|                  1

                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----------------+--------------------+--------+-------------------+---------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+--------------+
|    id|codigo_favorecido|     nome_favorecido|sigla_uf|     nome_municipio|codigo_orgao_superior| nome_orgao_superior|codigo_orgao|          nome_orgao|codigo_unidade_gestora|nome_unidade_gestora|ano_e_mes_do_lancamento|valor_recebido|
+------+-----------------+--------------------+--------+-------------------+---------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+--------------+
|796001|   ***.195.626-**|JESSICA MARTINS B...|      MG|        MONTALVÂNIA|                26000|Ministério da Edu...|       26410|Instituto Federal...|                158377|INST.FED.DO NORT

                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+-------+-----------------+--------------------+--------+--------------------+---------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+--------------+
|     id|codigo_favorecido|     nome_favorecido|sigla_uf|      nome_municipio|codigo_orgao_superior| nome_orgao_superior|codigo_orgao|          nome_orgao|codigo_unidade_gestora|nome_unidade_gestora|ano_e_mes_do_lancamento|valor_recebido|
+-------+-----------------+--------------------+--------+--------------------+---------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+--------------+
|1359001|   ***.821.404-**|RAISSA SILVA RODR...|      PB|               PATOS|                26000|Ministério da Edu...|       26417|Instituto Federal...|                158138|     IFP

                                                                                

-------------------------------------------
Batch: 5
-------------------------------------------
+-------+-----------------+--------------------+--------+-------------------+---------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+--------------+
|     id|codigo_favorecido|     nome_favorecido|sigla_uf|     nome_municipio|codigo_orgao_superior| nome_orgao_superior|codigo_orgao|          nome_orgao|codigo_unidade_gestora|nome_unidade_gestora|ano_e_mes_do_lancamento|valor_recebido|
+-------+-----------------+--------------------+--------+-------------------+---------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+--------------+
|1629122|   08088541000125|WORLD TURISMO, TR...|      SP|SÃO JOSÉ DOS CAMPOS|                26000|Ministério da Edu...|       26414|Instituto Federal...|                158333|INST.FED.MAT

                                                                                

-------------------------------------------
Batch: 8
-------------------------------------------
+-------+-----------------+--------------------+--------+-----------------+---------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+--------------+
|     id|codigo_favorecido|     nome_favorecido|sigla_uf|   nome_municipio|codigo_orgao_superior| nome_orgao_superior|codigo_orgao|          nome_orgao|codigo_unidade_gestora|nome_unidade_gestora|ano_e_mes_do_lancamento|valor_recebido|
+-------+-----------------+--------------------+--------+-----------------+---------------------+--------------------+------------+--------------------+----------------------+--------------------+-----------------------+--------------+
|1720111|   ***.728.427-**|WAGNER ANDRE DA S...|      ES|            SERRA|                26000|Ministério da Edu...|       26271|Fundação Universi...|                154079|CENTRO DE SEL.E D...

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/home/vitor/dev/zoomcamp/receitas-beneficiarios/.venv/lib/python3.12/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/vitor/dev/zoomcamp/receitas-beneficiarios/.venv/lib/python3.12/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/socket.py", line 708, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 