In [None]:
archive_name  = ""
raw_bucket    = ""
output_bucket = ""

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, lit, coalesce, concat, to_timestamp, when,
    last, array, sort_array, size, element_at, expr, collect_list, date_format
)
from pyspark.sql.window import Window
import sys

# ==============================================================================
# CONFIGURAÇÃO DO SPARK
# ==============================================================================
# Configurações para acesso ao S3 e pacotes do Hadoop AWS.
# A autenticação será feita via Instance Profile da EC2.
conf = SparkConf()
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.11.901')
conf.set('spark.hadoop.fs.s3a.aws.credentials.provider', 'com.amazonaws.auth.InstanceProfileCredentialsProvider')

spark = SparkSession.builder.config(conf=conf).appName("TratativaAtendimentoUPA").getOrCreate()

# ==============================================================================
# DEFINIÇÃO DE ARQUIVOS E CAMINHOS
# ==============================================================================
# s3_prefixo = 's3a://bucket-raw-upa-connect-sofh/arquivos/'
# s3_destino = 's3a://bucket-trusted-upa-connect-sofh/tabela_atendimento_tratada/'

# # Lista dos arquivos CSV a serem processados.
# arquivos_atendimento = [
#     'ATENDIMENTOS_SUJOS_2025-10-13.csv',
#     'ATENDIMENTOS_SUJOS_2025-10-12.csv',
#     'ATENDIMENTOS_SUJOS_2025-10-11.csv',
#     'ATENDIMENTOS_SUJOS_2025-10-10.csv',
#     'ATENDIMENTOS_SUJOS_2025-10-09.csv'
# ]

# ==============================================================================
# LEITURA E UNIFICAÇÃO DOS DADOS
# ==============================================================================
# Cria os caminhos completos para cada arquivo no S3.
caminhos_atendimento = f's3a://{raw_bucket}/{archive_name}'

# Lê os múltiplos arquivos CSV em um único DataFrame.
df_bruto = spark.read \
    .option('header', 'true') \
    .option('delimiter', ',') \
    .option('inferSchema', 'true') \
    .csv(caminhos_atendimento)

# ==============================================================================
# ETAPA 1: TRANSFORMAÇÕES INICIAIS (NOMES E COLUNAS)
# ==============================================================================
# 1.1: Renomeia todas as colunas para minúsculas.
df_renomeado = df_bruto.toDF(*[c.lower() for c in df_bruto.columns])

# 1.2: Renomeia a coluna 'fk_pessoa' para 'fk_paciente'.
df_renomeado = df_renomeado.withColumnRenamed("fk_pessoa", "fk_paciente")

# 1.3: CRIA/ATUALIZA A COLUNA 'chegou' para ser um TIMESTAMP completo,
# unindo 'data' com o primeiro horário disponível ('chegou' ou 'triagem_horario').
df_com_timestamp = df_renomeado.withColumn(
    "chegou",
    # A função coalesce retorna o primeiro valor não nulo da lista.
    coalesce(
        # TENTATIVA 1: Tenta montar o timestamp com a coluna original 'chegou'.
        # Se 'chegou' for nulo, esta expressão inteira se tornará nula.
        to_timestamp(
            concat(
                date_format(col("data"), "yyyy-MM-dd"), 
                lit(" "), 
                date_format(col("chegou"), "HH:mm:ss") # Extrai apenas a hora para segurança
            ),
            "yyyy-MM-dd HH:mm:ss"
        ),
        # TENTATIVA 2: Se a tentativa 1 falhou (retornou nulo), usa o horário da triagem.
        # Este é o valor de fallback.
        to_timestamp(
            concat(
                date_format(col("data"), "yyyy-MM-dd"), 
                lit(" "), 
                date_format(col("triagem_horario"), "HH:mm:ss") # Usa a hora da triagem
            ),
            "yyyy-MM-dd HH:mm:ss"
        )
    )
)

# 1.4: Corrige todas as colunas de horário para usar a data da coluna 'data'.
# Isso evita que o Spark atribua a data atual a colunas que contêm apenas horas.
# Foi usada a coluna 'data' para garantir que a data de todos os eventos seja a mesma
# dentro de um mesmo atendimento.
colunas_horario = ["triagem_horario", "sala_de_espera", "consultorio_horario", "saida"]
df_horarios_corrigidos = df_com_timestamp
for nome_coluna in colunas_horario:
    df_horarios_corrigidos = df_horarios_corrigidos.withColumn(
        nome_coluna,
        when(col(nome_coluna).isNotNull(),
            to_timestamp(
                concat(
                    date_format(col("data"), "yyyy-MM-dd"), # Pega a data correta.
                    lit(" "),
                    date_format(col(nome_coluna), "HH:mm:ss") # Pega APENAS o horário da coluna.
                ),
                "yyyy-MM-dd HH:mm:ss"
            )
        )
    )

# 1.5: Ordena o DataFrame pela coluna 'chegou' (agora com timestamp completo)
# para garantir a ordem correta dos eventos, essencial para as funções de janela.
df_ordenado = df_horarios_corrigidos.orderBy("chegou")


# ==============================================================================
# ETAPA 2: TRATATIVA DE VALORES INVÁLIDOS NA COLUNA 'fk_upa'
# ==============================================================================
# Define uma janela para buscar o último valor válido anterior, USANDO 'chegou'.
window_ffill = Window.orderBy("chegou").rowsBetween(Window.unboundedPreceding, 0)

# 2.1: Cria uma coluna temporária que é nula se 'fk_upa' estiver fora do intervalo [1, 34].
df_com_upa_valida = df_ordenado.withColumn(
    "upa_valida",
    when(col("fk_upa").between(1, 34), col("fk_upa"))
)

# 2.2: Usa a função 'last' para preencher os valores nulos com o último valor válido.
df_com_ultimo_upa = df_com_upa_valida.withColumn(
    "ultima_upa_valida",
    last("upa_valida", ignorenulls=True).over(window_ffill)
)

# 2.3: Atualiza a coluna 'fk_upa' original e remove as colunas temporárias.
df_upa_tratada = df_com_ultimo_upa.withColumn(
    "fk_upa",
    coalesce(col("upa_valida"), col("ultima_upa_valida"))
).drop("upa_valida", "ultima_upa_valida")

# ==============================================================================
# ETAPA 3: TRATATIVA DE OUTLIERS E NULOS (TEMPERATURA E OXIMETRIA)
# ==============================================================================
# Define a ordenação de janelas para usar a coluna 'chegou'.

# --- 3.1: Tratativa da Temperatura ---
# Identifica valores válidos de temperatura, marcando outliers e nulos como null.
df_temp_valida = df_upa_tratada.withColumn(
    "temp_valida",
    when((col("temperatura_paciente") >= 35) & (col("temperatura_paciente") <= 42), col("temperatura_paciente"))
)

# Coleta os últimos 3 valores válidos em um array (baseado na ordem de 'chegou').
df_com_array_temp = df_temp_valida.withColumn(
    "ultimas_3_temps",
    expr("slice(collect_list(temp_valida) OVER (ORDER BY chegou ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), -3, 3)")
)


# Calcula a mediana a partir do array, somente se o array não estiver vazio.
df_com_mediana_temp = df_com_array_temp.withColumn(
    "mediana_temp",
    when(size(col("ultimas_3_temps")) > 0,
        element_at(
            sort_array(col("ultimas_3_temps")),
            (size(col("ultimas_3_temps")) / 2 + 0.5).cast("int")
        )
    )
)

# Substitui os valores inválidos (outliers/nulos) pela mediana calculada.
df_temp_tratada = df_com_mediana_temp.withColumn(
    "temperatura_paciente",
    coalesce(col("temp_valida"), col("mediana_temp"))
)


# --- 3.2: Tratativa da Oximetria ---
# Identifica valores válidos de oximetria, marcando outliers e nulos como null.
df_oxi_valida = df_temp_tratada.withColumn(
    "oxi_valida",
    when((col("oximetria_paciente") >= 70) & (col("oximetria_paciente") <= 100), col("oximetria_paciente"))
)

# Coleta os últimos 3 valores válidos em um array (baseado na ordem de 'chegou').
df_com_array_oxi = df_oxi_valida.withColumn(
    "ultimas_3_oxis",
    expr("slice(collect_list(oxi_valida) OVER (ORDER BY chegou ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), -3, 3)")
)


# Calcula a mediana a partir do array, somente se o array não estiver vazio.
df_com_mediana_oxi = df_com_array_oxi.withColumn(
    "mediana_oxi",
    when(size(col("ultimas_3_oxis")) > 0,
        element_at(
            sort_array(col("ultimas_3_oxis")),
            (size(col("ultimas_3_oxis")) / 2 + 0.5).cast("int")
        )
    )
)

# Substitui os valores inválidos (outliers/nulos) pela mediana calculada.
df_final = df_com_mediana_oxi.withColumn(
    "oximetria_paciente",
    coalesce(col("oxi_valida"), col("mediana_oxi"))
)

# ==============================================================================
# ETAPA 4: LIMPEZA FINAL E SELEÇÃO DE COLUNAS
# ==============================================================================

# RENOVA O NOME DA COLUNA 'chegou' para 'data_hora'
df_renomeado_final = df_final.withColumnRenamed("chegou", "data_hora")

# Define a ordem final e as colunas desejadas, removendo colunas temporárias
# e a coluna 'data' (que foi combinada com 'chegou' no passo 1.3).
colunas_finais = [
    "data_hora", # AGORA É 'data_hora'
    "id_atendimento",
    "fk_paciente",
    "triagem_horario",
    "triagem_sala",
    "sala_de_espera",
    "consultorio_horario",
    "consultorio_sala",
    "saida",
    "temperatura_paciente",
    "oximetria_paciente",
    "fk_upa"
]

# A coluna 'data' é implicitamente removida por não estar em colunas_finais
# As colunas temporárias ultimas_3_temps, mediana_temp, etc. também são removidas.
tabela_unificada = df_renomeado_final.select(colunas_finais)


In [None]:
# FINAL_OUTPUT_DIR = "s3a://bucket-trusted-upa-connect-sofh/"
# FINAL_FILENAME = "tabela_atendimentos_tratada.csv"
# TEMP_STAGING_DIR = f"{FINAL_OUTPUT_DIR}/_temp_staging_integrated"

# # 1. Escreve o resultado no caminho temporário
# print(f"\nEscrevendo dados temporariamente em: {TEMP_STAGING_DIR}")

# # NOTA: Coalesce(1) para garantir a geração de um único arquivo CSV.
# tabela_unificada.coalesce(1).write \
#     .option('delimiter', ';') \
#     .option('header', 'true') \
#     .option('encoding', 'UTF-8') \
#     .mode('overwrite') \
#     .csv(TEMP_STAGING_DIR)

# # 2. Renomeia o arquivo gerado
# try:
#     # Acessa a classe 'Path' da JVM através do gateway do Spark
#     Path = spark._jvm.org.apache.hadoop.fs.Path
    
#     # Acessa a configuração do Hadoop
#     hadoop_conf = spark._jsc.hadoopConfiguration()
    
#     # Obtém o objeto FileSystem para o caminho temporário
#     fs = Path(TEMP_STAGING_DIR).getFileSystem(hadoop_conf)

#     # Encontra o arquivo gerado (part-00000-*.csv) dentro do diretório temporário
#     list_status = fs.globStatus(Path(TEMP_STAGING_DIR + "/part-00000-*.csv"))

#     if list_status:
#         # Pega o caminho completo do arquivo gerado
#         generated_file_path = list_status[0].getPath()

#         # Define o caminho final e o nome específico para o arquivo
#         final_output_path = Path(f"{FINAL_OUTPUT_DIR}/{FINAL_FILENAME}")

#         # Renomeia (move) o arquivo para o caminho e nome definitivos
#         fs.rename(generated_file_path, final_output_path)
        
#         # 3. Deleta o diretório temporário (que ficou vazio) e outros arquivos de metadados
#         fs.delete(Path(TEMP_STAGING_DIR), True) 
        
#         print(f"\n✅ Base integrada salva e renomeada com sucesso para: {final_output_path}")

#     else:
#         print("\nErro: Não foi possível encontrar o arquivo CSV gerado (part-00000-*.csv) no caminho temporário.")

# except Exception as e:
#     print(f"\nOcorreu um erro durante a renomeação do arquivo no S3: {e}")

# # Encerra a sessão Spark
# spark.stop()

In [None]:
%pip install boto3

import boto3
from io import StringIO, BytesIO
from botocore.exceptions import ClientError
import pandas as pd

FINAL_OUTPUT_DIR = f"s3a://{output_bucket}/"
FINAL_FILENAME = "tabela_atendimentos_tratada.csv"
FINAL_FILE_PATH = f"{FINAL_OUTPUT_DIR}{FINAL_FILENAME}"

s3 = boto3.client("s3")

try:
    # 🧾 Verifica se o arquivo existe
    s3.head_object(Bucket=output_bucket, Key=FINAL_FILENAME)

    print("📥 Arquivo existente encontrado. Lendo do S3...")

    # ⚙️ Lê o CSV existente direto do S3 (Spark)
    df_existente = (
        spark.read
        .option('header', 'true')
        .option('delimiter', ',')
        .csv(f"s3a://{output_bucket}/{FINAL_FILENAME}")
    )

    # 🔗 Concatena (Spark -> Pandas)
    df_existente_pd = df_existente.toPandas()
    sensor_pd = tabela_unificada.toPandas()  # sensor_df também é Spark, então converte
    df_final = pd.concat([df_existente_pd, sensor_pd], ignore_index=True)

    print("✅ Arquivo existente atualizado com novos dados.")

except ClientError as e:
    if e.response["Error"]["Code"] == "404":
        print("🚫 Arquivo não encontrado. Criando novo arquivo no bucket.")
        df_final = tabela_unificada.toPandas()
    else:
        raise

# 📤 Salva no S3 via boto3
csv_buffer = StringIO()
df_final.to_csv(csv_buffer, index=False)

s3.put_object(
    Bucket=output_bucket,
    Key=FINAL_FILENAME,
    Body=csv_buffer.getvalue()
)

print(f"✅ Arquivo '{FINAL_FILENAME}' salvo com sucesso no bucket '{output_bucket}'.")
spark.stop()