In [1]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, concat_ws, initcap, when, monotonically_increasing_id, split, explode,
    trim, regexp_replace, row_number, upper, translate, rand, floor, lit,
    to_date, date_format, coalesce # Adicionando coalesce para tentar múltiplos formatos
)
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window

# =======================================================================
# 1. CONFIGURAÇÃO E INICIALIZAÇÃO DO SPARK
# =======================================================================
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901')
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.InstanceProfileCredentialsProvider')

spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Definição dos caminhos de S3
RAW_INPUT_PATH = 's3a://bucket-raw-upa-connect-sofh/basesExternas/SindromeGripal/part-00000-9700a2d8-0bd0-49e1-a88a-97ac633d9d6a.c000.csv'
FINAL_OUTPUT_DIR = "s3a://bucket-trusted-upa-connect-sofh/BasesExternas/SindromeGripal"
FINAL_FILENAME = "sindromeGripalTratada.csv"
FINAL_OUTPUT_PATH = f"{FINAL_OUTPUT_DIR}/{FINAL_FILENAME}"
TEMP_STAGING_DIR = f"{FINAL_OUTPUT_DIR}/_temp_staging"


# Função para remover acentos de uma coluna (mantida)
def remover_acentos(df, nome_coluna):
    acentos = "áàâãäéèêëíìîïóòôõöúùûüçÁÀÂÃÄÉÈÊËÍÌÎÏÓÒÔÕÖÚÙÛÜÇ"
    sem_acentos = "aaaaaeeeeiiiiooooouuuucAAAAAEEEEIIIIOOOOOUUUUC"
    return df.withColumn(nome_coluna, translate(col(nome_coluna), acentos, sem_acentos))

# ... (Seção 2: LISTA DE BAIRROS - mantida) ...
bairros_sp = {
    "CENTRO": [
        "SE", "BELA VISTA", "BOM RETIRO", "CAMBUCI", "CONSOLACAO",
        "LIBERDADE", "REPUBLICA", "SANTA CECILIA"
    ],
    "NORTE": [
        "CASA VERDE", "CACHOEIRINHA", "LIMAO", "BRASILANDIA",
        "FREGUESIA DO O", "JACANA", "TREMEMBE", "PERUS",
        "ANHANGUERA", "PIRITUBA", "JARAGUA", "SAO DOMINGOS",
        "SANTANA", "TUCURUVI", "MANDAQUI", "VILA MARIA", "VILA GUILHERME"
    ],
    "SUL": [
        "CAMPO BELO", "CAMPO LIMPO", "CAPAO REDONDO", "CIDADE ADEMAR",
        "CIDADE DUTRA", "CURSINO", "GRAJAU", "ITAIM BIBI",
        "IPIRANGA", "JABAQUARA", "JARDIM ANGELA", "JARDIM SAO LUIS",
        "MARSILAC", "MOEMA", "MORUMBI", "PARELHEIROS", "PEDREIRA",
        "SACOMA", "SANTO AMARO", "SOCORRO", "SAUDE", "VILA ANDRADE",
        "VILA MARIANA", "VILA OLIMPIA"
    ],
    "LESTE": [
        "AGUA RASA", "ARICANDUVA", "ARTUR ALVIM", "BELEM", "BRAS",
        "CANGAIBA", "CARRAO", "CIDADE LIDER", "CIDADE TIRADENTES",
        "ERMELINO MATARAZZO", "GUAJANASES", "ITAIM PAULISTA",
        "ITAQUERA", "JARDIM HELENA", "JOSE BONIFACIO", "LAJEADO",
        "MOOCA", "PARI", "PARQUE DO CARMO", "PENHA", "PONTE RASA",
        "SAO LUCAS", "SAO MATEUS", "SAO MIGUEL", "SAO RAFAEL",
        "SAPOPEMBA", "TATUAPE", "VILA CURUCA", "VILA FORMOSA",
        "VILA JACUI", "VILA MATILDE", "VILA PRUDENTE"
    ],
    "OESTE": [
        "ALTO DE PINHEIROS", "BARRA FUNDA", "BUTANTA", "JAGUARA",
        "JAGUARE", "JARDIM PAULISTA", "JARDIM PAULISTANO", "JARDIM EUROPA",
        "JARDIM AMERICA", "LAPA", "MORUMBI", "PERDIZES", "PINHEIROS",
        "RAPOSO TAVARES", "RIO PEQUENO", "VILA LEOPOLDINA", "VILA MADALENA",
        "VILA SONIA", "ITAIM BIBI"
    ]
}

lista_bairros = [b for bairros in bairros_sp.values() for b in bairros]
# ... (Seção 2.1: EXCLUSÃO DO ARQUIVO FINAL ANTERIOR - mantida) ...

print(f"Verificando e excluindo o arquivo final anterior em: {FINAL_OUTPUT_PATH}")
try:
    Path = spark._jvm.org.apache.hadoop.fs.Path
    hadoop_conf = spark._jsc.hadoopConfiguration()
    fs = Path(FINAL_OUTPUT_DIR).getFileSystem(hadoop_conf)

    if fs.exists(Path(FINAL_OUTPUT_PATH)):
        fs.delete(Path(FINAL_OUTPUT_PATH), False)
        print(f"Arquivo anterior {FINAL_FILENAME} excluído com sucesso.")
    else:
        print(f"Arquivo anterior {FINAL_FILENAME} não encontrado. Prosseguindo.")

    if fs.exists(Path(TEMP_STAGING_DIR)):
         fs.delete(Path(TEMP_STAGING_DIR), True)
         print(f"Diretório temporário {TEMP_STAGING_DIR} anterior limpo.")

except Exception as e:
    print(f"\nATENÇÃO: Não foi possível verificar/excluir o arquivo final no S3: {e}")


# =======================================================================
# 3. LEITURA E LIMPEZA INICIAL DOS DADOS
# =======================================================================
df_raw = spark.read.option('delimiter', ';') \
                     .option('header', 'true') \
                     .option('nullValue', 'null') \
                     .option('encoding', 'UTF-8') \
                     .csv(RAW_INPUT_PATH)

# Incluindo "dataNotificacao" na seleção
df_selecionado = df_raw.select("idade", "sexo", "sintomas", "municipio", "outrosSintomas", "dataNotificacao")

# **INSPEÇÃO CRÍTICA DO FORMATO DE DATA**
print("\n>>> INSPECIONANDO COLUNA 'dataNotificacao' BRUTA (Primeiras 10 linhas) <<<")
df_selecionado.select("dataNotificacao").distinct().limit(10).show(truncate=False)


df_selecionado = remover_acentos(df_selecionado, "municipio")
df_selecionado = remover_acentos(df_selecionado, "sexo")
df_selecionado = remover_acentos(df_selecionado, "sintomas")
df_selecionado = remover_acentos(df_selecionado, "outrosSintomas")
df_selecionado = df_selecionado.withColumn("municipio", upper(col("municipio")))

df_limpo = df_selecionado.filter(col('idade').isNotNull() & (col('idade') != ''))
df_limpo = df_limpo.withColumn("idade", col("idade").cast(IntegerType()))

# =======================================================================
# 3.1. TRATAMENTO DA COLUNA DE DATA DE NOTIFICAÇÃO (Múltiplas Tentativas)
# =======================================================================
# Lista de formatos de data mais prováveis (dd/MM/yyyy, dd-MM-yyyy, yyyy-MM-dd)
FORMATOS_DATA = [
    "dd/MM/yyyy",  # Formato brasileiro comum
    "dd-MM-yyyy",  # Formato com hífen
    "yyyy-MM-dd"   # Formato ISO (comum em exports de banco de dados)
]

# Tenta converter a coluna "dataNotificacao" usando múltiplos formatos.
# coalesce() retorna o primeiro valor não nulo.
date_conversion_expr = coalesce(*[
    to_date(col("dataNotificacao"), fmt) for fmt in FORMATOS_DATA
])

df_limpo = df_limpo.withColumn(
    "DATA_NOTIFICACAO_TRATADA",
    date_conversion_expr
).drop("dataNotificacao")

# Se o DataFrame continuar vazio, esta é a linha responsável.
df_limpo = df_limpo.filter(col("DATA_NOTIFICACAO_TRATADA").isNotNull())

print("\n>>> CONTAGEM de Linhas após filtro de Data e Idade (Verificação) <<<")
print(f"Linhas restantes: {df_limpo.count()}")

# =======================================================================
# 4. TRATAMENTO DE SINTOMAS (mantida)
# =======================================================================
df_sintomas_unificados = df_limpo.withColumn(
    "sintomas",
    initcap(
        when(
            col("outrosSintomas").isNull() | (col("outrosSintomas") == ''),
            col("sintomas")
        ).otherwise(
            concat_ws(", ", col("sintomas"), col("outrosSintomas"))
        )
    )
).drop("outrosSintomas")

# ... (Seções 5, 6, 7 e 8 - mantidas) ...

# =======================================================================
# 5. SUBSTITUIR MUNICIPIO POR BAIRRO ALEATÓRIO (COM REPOSIÇÃO)
# =======================================================================
df_sao_paulo = df_sintomas_unificados.filter(col('municipio') == 'SAO PAULO')
n_bairros = len(lista_bairros)
df_sao_paulo = df_sao_paulo.withColumn("idx_bairro", floor(rand(seed=42) * n_bairros))
mapping_expr = when(lit(1) == 0, lit(None))
for i, bairro in enumerate(lista_bairros):
    mapping_expr = mapping_expr.when(col("idx_bairro") == i, lit(bairro))
df_com_bairro = df_sao_paulo.withColumn("bairro", mapping_expr).drop("municipio", "idx_bairro")

# =======================================================================
# 6. GERAÇÃO DE ID
# =======================================================================
window_spec = Window.orderBy(monotonically_increasing_id())
df_com_id = df_com_bairro.withColumn("id", row_number().over(window_spec))

# =======================================================================
# 7. EXPLOSÃO E LIMPEZA FINAL DOS SINTOMAS
# =======================================================================
df_sintomas_separados = df_com_id.withColumn(
    "sintomas", regexp_replace(col("sintomas"), " E ", ", ")
).withColumn(
    "sintomas", regexp_replace(col("sintomas"), "/", ", ")
).withColumn(
    "sintomas", regexp_replace(col("sintomas"), "\\+", ", ")
).withColumn(
    "sintomas", regexp_replace(col("sintomas"), ",,", ",")
).withColumn(
    "sintomas", regexp_replace(col("sintomas"), "\\.$", "")
)

df_explodido = df_sintomas_separados.withColumn(
    "sintoma_individual", explode(split(col("sintomas"), ", "))
)

df_normalizado = df_explodido.withColumn(
    "sintoma_individual", trim(col("sintoma_individual"))
).filter(col("sintoma_individual") != '')

df_normalizado = df_normalizado.filter(~col("sintoma_individual").rlike("[0-9]"))
df_normalizado = df_normalizado.withColumn(
    "sintoma_individual",
    regexp_replace(col("sintoma_individual"), "´", "")
)

df_normalizado = df_normalizado.withColumn(
    "sintoma_individual",
    when(col("sintoma_individual") == "Outros", "").otherwise(col("sintoma_individual"))
)

df_normalizado = df_normalizado.filter(col("sintoma_individual") != "")

# =======================================================================
# 8. SELEÇÃO FINAL
# =======================================================================
df_final_output = df_normalizado.select(
    "id",
    "idade",
    "sexo",
    upper(col("sintoma_individual")).alias("sintoma_individual"),
    "bairro",
    date_format(col("DATA_NOTIFICACAO_TRATADA"), "yyyy-MM-dd").alias("DATA_NOTIFICACAO_TRATADA")
)

print("\n--- AMOSTRA FINAL ---")
df_final_output.show(n=5, truncate=False)

# =======================================================================
# 9. ANÁLISE DE FREQUÊNCIA
# =======================================================================
df_contagem_sintomas = df_final_output.groupBy("sintoma_individual").count().sort(col("count").desc())
print("\n--- CONTAGEM DE SINTOMAS (TOP 10) ---")
df_contagem_sintomas.show(n=10, truncate=False)

# =======================================================================
# 10. SALVANDO E RENOMEANDO O RESULTADO NO S3
# (Mantendo a lógica robusta de stage/rename)
# =======================================================================

# 1. Escreve o resultado no caminho temporário
print(f"\nEscrevendo dados temporariamente em: {TEMP_STAGING_DIR}")
df_final_output.coalesce(1).write \
    .option("header", "true") \
    .option("delimiter", ";") \
    .option("encoding", "UTF-8") \
    .mode("overwrite") \
    .csv(TEMP_STAGING_DIR)

# 2. Renomeia o arquivo gerado
try:
    Path = spark._jvm.org.apache.hadoop.fs.Path
    hadoop_conf = spark._jsc.hadoopConfiguration()
    fs = Path(TEMP_STAGING_DIR).getFileSystem(hadoop_conf)

    list_status = fs.globStatus(Path(TEMP_STAGING_DIR + "/part-00000-*.csv"))

    if list_status:
        generated_file_path = list_status[0].getPath()
        final_output_path = Path(FINAL_OUTPUT_PATH)
        fs.rename(generated_file_path, final_output_path)
        fs.delete(Path(TEMP_STAGING_DIR), True) 
        
        print(f"\nDados salvos e renomeados com sucesso para: {final_output_path}")

    else:
        print("\nErro: Não foi possível encontrar o arquivo CSV gerado (part-00000-*.csv) no caminho temporário.")

except Exception as e:
    print(f"\nOcorreu um erro durante a renomeação do arquivo no S3: {e}")

spark.stop()

:: loading settings :: url = jar:file:/usr/local/lib/python3.7/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-e20cf170-a4d7-4675-94cd-71b7b0bec05c;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 454ms :: artifacts dl 19ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	:: evicted modules:
	com.amazonaws#aws-java-sdk-bundle;1.11.901 by [com.amazonaws#aws-java-sdk-bundle;1.12.262] in [default]
	---------------------------------------------------------------------
	|     

Verificando e excluindo o arquivo final anterior em: s3a://bucket-trusted-upa-connect-sofh/BasesExternas/SindromeGripal/sindromeGripalTratada.csv


25/10/29 00:51:31 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


Arquivo anterior sindromeGripalTratada.csv excluído com sucesso.


                                                                                


>>> INSPECIONANDO COLUNA 'dataNotificacao' BRUTA (Primeiras 10 linhas) <<<


                                                                                

+---------------+
|dataNotificacao|
+---------------+
|2022-10-05     |
|2021-11-03     |
|2024-01-19     |
|2023-05-01     |
|2024-07-14     |
|2023-05-18     |
|2024-08-20     |
|2024-10-24     |
|2024-09-15     |
|2020-04-13     |
+---------------+


>>> CONTAGEM de Linhas após filtro de Data e Idade (Verificação) <<<


                                                                                

Linhas restantes: 1658105

--- AMOSTRA FINAL ---


25/10/29 00:53:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/29 00:53:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/29 00:53:30 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/29 00:55:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/29 00:55:06 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

+---+-----+---------+------------------+----------+------------------------+
|id |idade|sexo     |sintoma_individual|bairro    |DATA_NOTIFICACAO_TRATADA|
+---+-----+---------+------------------+----------+------------------------+
|1  |34   |Masculino|ASSINTOMATICO     |ITAQUERA  |2022-01-05              |
|2  |19   |Masculino|DOR DE CABECA     |ARICANDUVA|2022-11-24              |
|2  |19   |Masculino|FEBRE             |ARICANDUVA|2022-11-24              |
|3  |5    |Masculino|TOSSE             |BUTANTA   |2022-03-02              |
|3  |5    |Masculino|CORIZA            |BUTANTA   |2022-03-02              |
+---+-----+---------+------------------+----------+------------------------+
only showing top 5 rows


--- CONTAGEM DE SINTOMAS (TOP 10) ---


                                                                                

+---------------------+------+
|sintoma_individual   |count |
+---------------------+------+
|TOSSE                |205512|
|CORIZA               |166967|
|DOR DE CABECA        |145433|
|FEBRE                |141093|
|DOR DE GARGANTA      |95849 |
|ASSINTOMATICO        |39123 |
|DISPNEIA             |32586 |
|MIALGIA              |12917 |
|DOR NO CORPO         |12006 |
|DISTURBIOS GUSTATIVOS|11369 |
+---------------------+------+
only showing top 10 rows


Escrevendo dados temporariamente em: s3a://bucket-trusted-upa-connect-sofh/BasesExternas/SindromeGripal/_temp_staging


25/10/29 00:56:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/29 00:56:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/29 00:56:31 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/29 00:58:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/29 00:58:05 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/10/29 00:58:05 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
25/10/29 00:58:06 WARN AbstractS3AC


Dados salvos e renomeados com sucesso para: s3a://bucket-trusted-upa-connect-sofh/BasesExternas/SindromeGripal/sindromeGripalTratada.csv


In [2]:
# # =======================================================================
# # 10. SALVANDO E RENOMEANDO O RESULTADO NO S3
# # =======================================================================

# # Define o caminho de saída definitivo no S3
# FINAL_OUTPUT_DIR = "s3a://bucket-trusted-upa-connect-sofh/BasesExternas/SindromeGripal"

# # Define o nome do arquivo final
# FINAL_FILENAME = "sindromeGripalTratada.csv"

# # Define o caminho temporário (subdiretório dentro da pasta principal)
# TEMP_STAGING_DIR = f"{FINAL_OUTPUT_DIR}/_temp_staging" 

# # 1. Escreve o resultado no caminho temporário
# print(f"Escrevendo dados temporariamente em: {TEMP_STAGING_DIR}")
# df_final.coalesce(1).write \
#     .option("header", "true") \
#     .option("delimiter", ";") \
#     .mode("overwrite") \
#     .csv(TEMP_STAGING_DIR)

# # 2. Renomeia o arquivo gerado
# try:
#     # Acessa a classe 'Path' da JVM através do gateway do Spark
#     Path = spark._jvm.org.apache.hadoop.fs.Path
    
#     # Acessa a configuração do Hadoop
#     hadoop_conf = spark._jsc.hadoopConfiguration()
    
#     # Obtém o objeto FileSystem para o caminho temporário
#     # Agora, Path() cria uma instância do objeto Java corretamente
#     fs = Path(TEMP_STAGING_DIR).getFileSystem(hadoop_conf)

#     # Encontra o arquivo gerado (part-00000-*.csv) dentro do diretório temporário
#     # O globStatus retorna a lista de status de arquivos que correspondem ao padrão.
#     list_status = fs.globStatus(Path(TEMP_STAGING_DIR + "/part-00000-*.csv"))

#     if list_status:
#         # Pega o caminho completo do arquivo gerado
#         generated_file_path = list_status[0].getPath()

#         # Define o caminho final e o nome específico para o arquivo
#         final_output_path = Path(f"{FINAL_OUTPUT_DIR}/{FINAL_FILENAME}")

#         # Renomeia (move) o arquivo para o caminho e nome definitivos
#         # Esta operação move o arquivo dentro do S3 e o renomeia.
#         fs.rename(generated_file_path, final_output_path)
        
#         # 3. Deleta o diretório temporário (que ficou vazio) e outros arquivos de metadados
#         fs.delete(Path(TEMP_STAGING_DIR), True) 
        
#         print(f"\nDados salvos e renomeados com sucesso para: {final_output_path}")

#     else:
#         print("\nErro: Não foi possível encontrar o arquivo CSV gerado (part-00000-*.csv) no caminho temporário.")

# except Exception as e:
#     print(f"\nOcorreu um erro durante a renomeação do arquivo no S3: {e}")