# Pyspark ETL

## Processamento inicial dos dados

In [50]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import to_date

In [51]:
# Configurar variáveis para acessar HDFS (caso necessário)
os.environ["HADOOP_USER_NAME"] = "root"  # executa operações no HDFS como usuário root

In [52]:
# Iniciar sessão Spark
spark = SparkSession.builder \
    .appName("SiasusETL") \
    .getOrCreate()

In [53]:
# Ler o arquivo CSV do HDFS
# vamos definir esquemas manualmente em alguns casos
# encoding Latin-1 para caracteres especiais
df = spark.read.format("csv") \
    .option("header", True) \
    .option("inferSchema", False) \
    .option("sep", ",") \
    .option("encoding", "ISO-8859-1") \
    .load("hdfs://localhost:9000/user/root/siasus/PSRS2301.csv")

In [54]:
df.printSchema()
df.show(5)

root
 |-- CNES_EXEC: string (nullable = true)
 |-- GESTAO: string (nullable = true)
 |-- CONDIC: string (nullable = true)
 |-- UFMUN: string (nullable = true)
 |-- TPUPS: string (nullable = true)
 |-- TIPPRE  : string (nullable = true)
 |-- MN_IND: string (nullable = true)
 |-- CNPJCPF: string (nullable = true)
 |-- CNPJMNT: string (nullable = true)
 |-- DT_PROCESS: string (nullable = true)
 |-- DT_ATEND: string (nullable = true)
 |-- CNS_PAC: string (nullable = true)
 |-- DTNASC: string (nullable = true)
 |-- TPIDADEPAC: string (nullable = true)
 |-- IDADEPAC: string (nullable = true)
 |-- NACION_PAC: string (nullable = true)
 |-- SEXOPAC: string (nullable = true)
 |-- RACACOR: string (nullable = true)
 |-- ETNIA: string (nullable = true)
 |-- MUNPAC: string (nullable = true)
 |-- MOT_COB: string (nullable = true)
 |-- DT_MOTCOB: string (nullable = true)
 |-- CATEND: string (nullable = true)
 |-- CIDPRI: string (nullable = true)
 |-- CIDASSOC: string (nullable = true)
 |-- ORIGEM_PAC:

In [55]:
from pyspark.sql.functions import col, trim, upper, when

# Exemplo 1: Renomear coluna com espaços e normalizar strings
df = df.withColumnRenamed("TIPPRE  ", "TIPPRE")  # remove espaços do nome da coluna
df = df.withColumn("CONDIC", upper(trim(col("CONDIC"))))  # normaliza CONDIC para maiúsculo sem espaços extras

# Exemplo 2: Converter tipos de dados
df = df.withColumn("IDADEPAC", col("IDADEPAC").cast(IntegerType()))
df = df.withColumn("PA_QTDPRO", col("PA_QTDPRO").cast(IntegerType()))
df = df.withColumn("PA_QTDAPR", col("PA_QTDAPR").cast(IntegerType()))

df = df.withColumn(
    "DTNASC",                      
    to_date(col("DTNASC"), "yyyyMMdd") 
)

# Exemplo 3: Filtrar registros inválidos (caso IDADEPAC não seja plausível, por ex: > 120 anos ou < 0)
df = df.filter((col("IDADEPAC") >= 0) & (col("IDADEPAC") < 120))

# Exemplo 4: Enriquecimento - criar faixa etária
df = df.withColumn(
    "FAIXA_ETARIA",
    when(col("IDADEPAC") < 18, "MENOR")
     .when((col("IDADEPAC") >= 18) & (col("IDADEPAC") < 60), "MAIOR")
     .when(col("IDADEPAC") >= 60, "IDOSO")
)

In [56]:
# df.summary().show()

In [57]:
json_rdd = df.toJSON().collect()

print(f"Total de registros a enviar: {len(json_rdd)}")
print("Exemplo de registro JSON:", json_rdd[0])

Total de registros a enviar: 82233
Exemplo de registro JSON: {"CNES_EXEC":"9573313","GESTAO":"432110","CONDIC":"PG","UFMUN":"432110","TPUPS":"70","TIPPRE":"00","MN_IND":"M","CNPJCPF":"88811948000178","CNPJMNT":"88811948000178","DT_PROCESS":"202301","DT_ATEND":"202301","CNS_PAC":"é{Ç{{äâ~}{üÇ~","DTNASC":"1969-11-27","TPIDADEPAC":"4","IDADEPAC":51,"NACION_PAC":"01","SEXOPAC":"F","RACACOR":"01","MUNPAC":"432110","MOT_COB":"28","CATEND":"01","CIDPRI":"F323","CIDASSOC":"F323","ORIGEM_PAC":"02","DT_INICIO":"20210121","COB_ESF":"S","CNES_ESF":"9573313","DESTINOPAC":"00","PA_PROC_ID":"0301080208","PA_QTDPRO":1,"PA_QTDAPR":1,"PA_SRV":"115","PA_CLASS_S":"002","SIT_RUA":"N","LOC_REALIZ":"C","INICIO":"20230101","FIM":"20230131","PERMANEN":"31","QTDATE":"1","QTDPCN":"1","NAT_JUR":"1244","FAIXA_ETARIA":"MAIOR"}


## Producer

In [58]:
from kafka import KafkaProducer

In [59]:
# Configura o produtor Kafka (envio de strings, então usaremos encoding de UTF-8)
producer = KafkaProducer(bootstrap_servers=['localhost:9092'], 
                         value_serializer=lambda v: v.encode('utf-8'))

In [60]:
topic_name = "atendimentos_psicossociais"

# Envia cada mensagem JSON para o tópico
for record in json_rdd:
    producer.send(topic_name, value=record)

In [61]:
# Força envio de qualquer mensagem pendente
producer.flush()
producer.close()

In [62]:
print(f"{len(json_rdd)} mensagens enviadas para o tópico '{topic_name}'.")

82233 mensagens enviadas para o tópico 'atendimentos_psicossociais'.


$kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic andimentos_psicossociais --from-beginning

## Consumer

In [63]:
from kafka import KafkaConsumer
from pymongo import MongoClient
import json

topic_name = "atendimentos_psicossociais"

# Conectar ao MongoDB (banco de dados 'siasus', coleção 'atendimentos')
mongo_client = MongoClient("localhost", 27017)
db = mongo_client["siasus"]
collection = db["atendimentos"]

# Configurar o consumidor Kafka
consumer = KafkaConsumer(
    topic_name,
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # garante começar do início do tópico
    enable_auto_commit=True,
    group_id="grupo-siasus-1",
    value_deserializer=lambda v: v.decode('utf-8')
)

# Consome as mensagens do tópico e insere no MongoDB
count = 0
for msg in consumer:
    # msg.value já é uma string JSON decodificada
    registro_json = msg.value  
    # converter string JSON para dicionário Python
    registro = json.loads(registro_json)
    # inserir no Mongo
    collection.insert_one(registro)
    count += 1
    if count % 1000 == 0:
        print(f"{count} registros inseridos...")

print(f"Consumo finalizado. Total de registros inseridos: {count}")
mongo_client.close()


1000 registros inseridos...
2000 registros inseridos...
3000 registros inseridos...
4000 registros inseridos...
5000 registros inseridos...
6000 registros inseridos...
7000 registros inseridos...
8000 registros inseridos...
9000 registros inseridos...
10000 registros inseridos...
11000 registros inseridos...
12000 registros inseridos...
13000 registros inseridos...
14000 registros inseridos...
15000 registros inseridos...
16000 registros inseridos...
17000 registros inseridos...
18000 registros inseridos...
19000 registros inseridos...
20000 registros inseridos...
21000 registros inseridos...
22000 registros inseridos...
23000 registros inseridos...
24000 registros inseridos...
25000 registros inseridos...
26000 registros inseridos...
27000 registros inseridos...
28000 registros inseridos...
29000 registros inseridos...
30000 registros inseridos...
31000 registros inseridos...
32000 registros inseridos...
33000 registros inseridos...
34000 registros inseridos...
35000 registros inserid

KeyboardInterrupt: 

In [64]:
print(collection.count_documents({}))
# ou, para ver um exemplo de documento:
doc = collection.find_one()
print(doc)

164466
{'_id': ObjectId('680aef97c48ba2998f1e4216'), 'CNES_EXEC': '9573313', 'GESTAO': '432110', 'CONDIC': 'PG', 'UFMUN': '432110', 'TPUPS': '70', 'TIPPRE': '00', 'MN_IND': 'M', 'CNPJCPF': '88811948000178', 'CNPJMNT': '88811948000178', 'DT_PROCESS': '202301', 'DT_ATEND': '202301', 'CNS_PAC': 'é{Ç{{äâ\x7f\x7f~}{üÇ~', 'DTNASC': '1969-11-27', 'TPIDADEPAC': '4', 'IDADEPAC': 51, 'NACION_PAC': '01', 'SEXOPAC': 'F', 'RACACOR': '01', 'MUNPAC': '432110', 'MOT_COB': '28', 'CATEND': '01', 'CIDPRI': 'F323', 'CIDASSOC': 'F323', 'ORIGEM_PAC': '02', 'DT_INICIO': '20210121', 'COB_ESF': 'S', 'CNES_ESF': '9573313', 'DESTINOPAC': '00', 'PA_PROC_ID': '0301080208', 'PA_QTDPRO': 1, 'PA_QTDAPR': 1, 'PA_SRV': '115', 'PA_CLASS_S': '002', 'SIT_RUA': 'N', 'LOC_REALIZ': 'C', 'INICIO': '20230101', 'FIM': '20230131', 'PERMANEN': '31', 'QTDATE': '1', 'QTDPCN': '1', 'NAT_JUR': '1244', 'FAIXA_ETARIA': 'MAIOR'}
