In [None]:
%region us-east-1
%iam_role # setar 
%idle_timeout 30
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

#%additional_python_modules s3://fiocruz-datalake-bucket/raw/aesop/dados_auxiliares/python_utils/teste.py
%extra_py_files s3://fiocruz-datalake-bucket/raw/aesop/dados_auxiliares/python_utils/util_custom.py
%additional_python_modules s3://fiocruz-datalake-bucket/raw/aesop/dados_auxiliares/epiweeks-2.3.0-py3-none-any.whl


import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

In [None]:
from util_custom import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
import os
from datetime import datetime, timedelta
from pyspark.sql.window import Window
from epiweeks import Week

import boto3
from botocore.exceptions import ClientError

def create_folder_in_specific_location(bucket_name, full_folder_path):
    # Adiciona uma barra ao final do caminho da pasta, se não houver
    if not full_folder_path.endswith('/'):
        full_folder_path += '/'
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(bucket_name)
    
    # Verifica se a pasta já existe
    objs = list(bucket.objects.filter(Prefix=full_folder_path))
    if any(obj.key == full_folder_path for obj in objs):
        print(f"A pasta '{full_folder_path}' já existe no bucket '{bucket_name}'.")
    else:
        # Tenta criar a pasta (objeto S3 com chave terminando em '/')
        try:
            s3.Object(bucket_name, full_folder_path).put(Body=b'')
            print(f"Pasta '{full_folder_path}' criada com sucesso no bucket '{bucket_name}'.")
        except ClientError as e:
            print(f"Erro ao criar a pasta: {e}")


def list_all_files_in_folders_s3(bucket_name, prefix):
    """
    Lista todos os arquivos dentro de todas as subpastas de um prefixo específico em um bucket S3.

    :param bucket_name: Nome do bucket S3.
    :param prefix: Prefixo dentro do bucket onde as subpastas estão localizadas.
    :return: Uma lista de nomes de arquivos dentro de todas as subpastas sob o prefixo especificado.
    """
    s3 = boto3.client('s3')
    all_files = []

    # Garante que o prefixo termine com uma barra, se já não terminar.
    if not prefix.endswith('/'):
        prefix += '/'

    # Lista todas as 'subpastas' dentro do prefixo.
    response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter='/')

    subfolders = []
    if 'CommonPrefixes' in response:
        for folder in response['CommonPrefixes']:
            subfolders.append(folder['Prefix'])

    # Para cada subpasta, lista todos os arquivos dentro dela.
    for folder_name in subfolders:
        resp = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_name)
        if 'Contents' in resp:
            for file in resp['Contents']:
                # Adiciona o nome do arquivo à lista
                all_files.append(file['Key'])
    data_max = sorted(all_files)[-1][-12:-4]
    return data_max




# Exemplo de uso
bucket_name = 'fiocruz-datalake-bucket'


#### leitura banco HPC

In [None]:
#aesop_2017_2024 = spark.read.parquet(aesop_hpc_path+get_hpc_parquet_file_name())
#aesop_hpc_path+get_hpc_parquet_file_name()
# aesop_2017_2024 = spark.read.parquet('/dados10t/datalake/standard/aesop/aesop_hpc/2017_20240107_AESOP.parquet/')

aesop_2017_2024 = spark.read.parquet("s3://fiocruz-datalake-bucket/standard/output_hpc/2017_20240407_AESOP.parquet")

In [None]:
max_ano = aesop_2017_2024.agg(F.max("ano")).collect()[0][0]
semana_folder = aesop_2017_2024.filter(F.col("ano") == max_ano).agg(F.max("epiweek")).collect()[0][0]
semana_folder

In [None]:
aesop_report_folder  = "standard/reports/report_dqi/"

In [None]:
#caminho_pasta = aesop_report_folder+"semana_"+str(semana_folder)+"_"+str(max_ano)
#caminho_pasta
#create_folder_in_specific_location(bucket_name,caminho_pasta)

#### leitura banco raw parquet

In [None]:
#df = spark.read.parquet(aesop_parquet_path+get_parquet_file_name())
#aesop_parquet_path+get_parquet_file_name()
# df = spark.read.parquet('/dados10t/datalake/raw/aesop/parquet_explorer/aesop_dados_2017_20240107.parquet/')
df = spark.read.parquet("s3://fiocruz-datalake-bucket/raw/aesop/dados-ms-parquet/aesop_dados_2017_20240407.parquet/")

In [None]:
df = df.filter((F.col("FX_ETARIA")).isNotNull())
df_v2 = df.select("ANO","SEMANA","SG_UF","CO_MUNICIPIO_IBGE","SEXO","FX_ETARIA","CIDCIAP","QT","Data_folder")

In [None]:
#df_v2.filter(F.col("ANO") == 2023).show(5000,False)

#### converter cid_ciap

In [None]:
df_v3 = df_v2\
.withColumn("Tipo_CIDCIAP", F.substring_index(df_v2.CIDCIAP, '(', 1))\
.withColumn("Codigo_CIDCIAP", F.substring_index(df_v2.CIDCIAP, '(', -1))

df_v3 = df_v3.withColumn('Codigo_CIDCIAP', F.concat(F.lit("("), F.col('Codigo_CIDCIAP')))

# CIAP(R05) -> CIAP|(R05)|

In [None]:
""" sisab_diff.show()
+----+------+-----+-----------------+-----------+--------+
| ANO|SEMANA|SG_UF|CO_MUNICIPIO_IBGE|Data_folder|qt_total|
+----+------+-----+-----------------+-----------+--------+
|2023|    32|   RS|           431490| 2023-12-31|       7|
|2023|    37|   PR|           411850| 2023-12-31|      18|
"""
# OBS: max semana folder passa da epiweek

sisab_diff = df_v3.groupBy("ANO", "SEMANA", "SG_UF", "CO_MUNICIPIO_IBGE", "Data_folder").agg(F.sum("QT").alias("qt_total")).cache()

In [None]:
# sisab_diff.write.parquet(caminho_resultado+"sisabdiff_dqi_"+get_latest_date_aesop()+".parquet")
#caminho_resultado+"sisabdiff_dqi_"+get_latest_date_aesop()+".parquet"

#### gerar dados report dqi

In [None]:
ibge_munic = spark.read.csv('s3://fiocruz-datalake-bucket/raw/aesop/dados_auxiliares/ibge_municipios.csv', header=True, sep=';').drop('_c0')

""" semanas_epis.show()
+------+----+-------+-----------------+--------------------+--------+----------+
|SEMANA| ANO|uf_ibge|CO_MUNICIPIO_IBGE|           municipio|pop_2010|     porte|
+------+----+-------+-----------------+--------------------+--------+----------+
|    23|2024|     BA|           290590|Campo Alegre de L...|   28090|Pequeno II|
|    24|2017|     BA|           290590|Campo Alegre de L...|   28090|Pequeno II|
"""

#semanas_epis = spark.read.parquet("s3://fiocruz-datalake-bucket/raw/aesop/dados_auxiliares/epiweek_ibge2017_20241231.parquet/")
semanas_epis = spark.read.csv("s3://fiocruz-datalake-bucket/raw/aesop/dados_auxiliares/epiweek_ibge2017_20241231.csv",header=True)
semanas_epis = semanas_epis.withColumnRenamed("semana_ibge", "SEMANA")
semanas_epis = semanas_epis.withColumnRenamed("ano_ibge", "ANO")
semanas_epis = semanas_epis.withColumnRenamed("co_ibge", "CO_MUNICIPIO_IBGE")

In [None]:
#semanas_epis.count() #2328260

In [None]:
#semanas_epis.select("CO_MUNICIPIO_IBGE").distinct().count()

In [None]:
""" sisab_diff.show() max dt_encounter 2023
+----+------+-----+-----------------+-----------+--------+-----------------+------------+
| ANO|SEMANA|SG_UF|CO_MUNICIPIO_IBGE|Data_folder|qt_total|first_day_of_year|dt_encounter|
+----+------+-----+-----------------+-----------+--------+-----------------+------------+
|2023|    52|   RS|           430830| 2023-12-27|      28|       2023-01-01|  2023-12-30|
|2023|    52|   BA|           292840| 2023-12-29|      86|       2023-01-01|  2023-12-30|
"""

# Adicionando uma nova coluna com a data do primeiro dia do ano
sisab_diff = sisab_diff.withColumn("first_day_of_year", F.expr("make_date(ano, 1, 1)"))

# Calculando de forma estimada a data do atendimento a partir da variável semana usando a formula : 01/01/ano + numero de dias da formula:  (semana * 7) -1
sisab_diff = sisab_diff.withColumn("dt_encounter", F.expr("date_add(first_day_of_year, semana * 7 - 1)"))

# TODO run this only for 2024
sisab_diff = sisab_diff\
.withColumn("dt_encounter", F.when(F.col('ANO') == 2024, F.expr("date_add(first_day_of_year, semana * 7 - 2)")).otherwise(F.expr("date_add(first_day_of_year, semana * 7 - 1)")))

In [None]:
#dt_folder é criado a partir da variável data_folder, é um arredondamento dessa data para o proximo sabado usando a formula , data_folde + n dias(proximo domingo - 1)
sisab_diff = sisab_diff.withColumn("dt_folder", F.expr("date_add(next_day(data_folder, 'Sun'), -1)")) 

#diferença em dias de dt_folder para dt_encounter
sisab_diff = sisab_diff.withColumn("date_diff", F.datediff("dt_folder","dt_encounter") ) 

#diff de semanas entre atendimento estimado e data do envio
sisab_diff = sisab_diff.withColumn("week_diff", F.col("date_diff") / 7 )  

In [None]:
#sisab_diff.filter(F.col("ANO") == 2019).filter(F.col("SEMANA") == 52).show(100)

#### calcular filtros de 8 semanas

In [None]:
#semana_folder

In [None]:
semana_filter = semana_folder-7
ano_filter = max_ano
if semana_filter < 1:
    ano_filter = ano_filter-1
    semana_filter = 52+semana_filter
    
print(semana_filter)
print(ano_filter)

In [None]:
semana_folder

In [None]:
#sisab_diff.filter(F.col('ANO') == ano_filter).filter(F.col('SEMANA') == semana_filter).select("dt_encounter").distinct().count() #aqui tem somente 1 data

In [None]:
filter_date = sisab_diff.filter(F.col('ANO') == ano_filter)\
.filter(F.col('SEMANA') == semana_filter)\
.select(F.max(F.col('dt_encounter')) ).collect()[0][0]
print(filter_date) #2024-01-20

In [None]:
sisab_diff.filter(sisab_diff['dt_encounter'] >= filter_date).select("SEMANA").distinct().count()

In [None]:
#completude.printSchema()

#### completude

In [None]:
# Assuming `df` is your Spark DataFrame equivalent to `sisab_diff`
# and `week_filter` is already defined

#week_filter = "2023-01-07"

# Step 1: Filter based on `dt_folder`
completude = sisab_diff.filter(sisab_diff['dt_encounter'] >= filter_date)

# Step 2: Select specific columns and remove others
#tempestividade = tempestividade.drop('Data_folder', 'ANO', 'SEMANA', 'SG_UF')
completude = completude\
.drop('Data_folder', 'ANO', 'SEMANA', 'SG_UF', 'first_day_of_year', 'date_diff')\
.withColumnRenamed('CO_MUNICIPIO_IBGE', 'co_ibge')

# complete command
ibge = completude.select('co_ibge').distinct()
dt_encounter = completude.select('dt_encounter').distinct()
ibge_dtencounter = ibge.crossJoin(dt_encounter)
# complete join back
completude = ibge_dtencounter.join(completude, ['co_ibge', 'dt_encounter'], 'left')

"""
completude.show()
+-------+------------+----+
|co_ibge|dt_encounter|   n|
+-------+------------+----+
| 150375|  2023-11-25| 351|
| 150375|  2023-11-18| 336|
"""

# count 
completude = completude.groupBy('co_ibge', 'dt_encounter')\
.agg(F.sum('qt_total').alias('n'))\
.na.fill({'n': 0})

In [None]:
semana_folder

#### completude porcentagem

In [None]:
ibge_munic.printSchema()

In [None]:
# slider
ws = Window.partitionBy("co_ibge").orderBy("dt_encounter").rowsBetween(-7, Window.currentRow)

# UDF for epiweek and epiyear
epiweekUDF = F.udf(lambda z:  Week.fromdate(z).week, IntegerType())
epiyearUDF = F.udf(lambda z:  Week.fromdate(z).year, IntegerType())

# slider as window function
completude_missings = completude\
.withColumn("empty", F.when(F.col('n')==0, 1).otherwise(0))\
.withColumn("window_size", F.count("*").over(ws))\
.withColumn("start", F.min("dt_encounter").over(ws))\
.withColumn("end", F.max("dt_encounter").over(ws))\
.withColumn("no_missings", F.sum("empty").over(ws))\
.filter(F.col('window_size')==8)\
.withColumn("epi_week", epiweekUDF(F.col('dt_encounter')))\
.withColumn("epi_year", epiyearUDF(F.col('dt_encounter')))\
.drop('window_size', 'empty', 'n', 'dt_encounter')\
.filter(F.col('epi_week')==semana_folder)

# add missing cities 
completude_miss_ibge = ibge_munic.join(completude_missings, ['co_ibge'], 'full')

# perc completude
"""
+-------+---+--------------------+--------+----------+----------+----------+-----------+--------+--------+------------+---------------+
|co_ibge| uf|           municipio|pop_2010|     porte|     start|       end|no_missings|epi_week|epi_year|perc_missing|perc_completude|
+-------+---+--------------------+--------+----------+----------+----------+-----------+--------+--------+------------+---------------+
| 110001| RO|Alta Floresta D´o...|   24392|Pequeno II|2023-11-11|2023-12-30|          0|      52|    2023|         0.0|          100.0|
| 110002| RO|           Ariquemes|   90353|     Médio|2023-11-11|2023-12-30|          1|      52|    2023|        12.5|           87.5|
"""

# calculate percentages
comp_percentage = completude_miss_ibge.na.fill({'no_missings': 8})\
.withColumn('perc_missing',  F.col('no_missings')/8*100)\
.withColumn('perc_completude',  100-F.col('perc_missing'))




#### tempestividade 

In [None]:
# Step 1: Filter based on `dt_folder`
tempestividade = sisab_diff.filter( F.col("dt_folder")  >= filter_date)

# Step 2:dropa algumas colunas
tempestividade = tempestividade.drop('Data_folder', 'ANO', 'SEMANA', 'SG_UF')

# faz o complete dos dados levando em conta todas as possibilidades de CO_MUNICIPIO_IBGE x dt_folder
co_ibge_df = tempestividade.select("CO_MUNICIPIO_IBGE").distinct()
dt_folder_df = tempestividade.select("dt_folder").distinct()
all_combinations_df = co_ibge_df.crossJoin(dt_folder_df)

# faz o join para o lado completo criando anteriormente
tempestividade2 = all_combinations_df.join(tempestividade, ["CO_MUNICIPIO_IBGE", "dt_folder"], "left_outer")

#conta os atendimento para as 8 semanas de dados
tempestividade3_p1 = tempestividade2.groupBy("CO_MUNICIPIO_IBGE").agg(F.sum("qt_total").alias("total_atd_folder"))

#faz categorias  a partir da diff em semanas entre atendimento e envio para as ultimas 8 semanas. se ate 2 semanas de diff,  0_2weeks , se maior que duas semanas, 3+weeks,soma-se o total de atendimento para cada categoria criada.
tempestividade3_p2 = tempestividade2.withColumn("diff", F.when(  F.col("week_diff") <= 2, "0_2weeks").otherwise(
                                                        F.when(   F.col("week_diff") > 2, "3+weeks"  )    ) ) .groupBy("CO_MUNICIPIO_IBGE","diff").agg( F.sum("qt_total").alias("qt_diff") )

#junta o atendimento total das 8 semanas com os atendimentos das 8 semanas categorizado entre 0_2weeks e   3+weeks
tempestividade3 = tempestividade3_p1.join(tempestividade3_p2 , ['CO_MUNICIPIO_IBGE'])
tempestividade3 = tempestividade3.withColumn("diff_percent", F.col("qt_diff") / F.col("total_atd_folder") * 100)
tempestividade3 = tempestividade3.withColumn("diff_percent2",F.round("diff_percent",5))

#pivota os dados para as variáveis de porcetagem 0_2weeks e 3+weeks irem para colunas
tempestividade4  = tempestividade3.groupBy("CO_MUNICIPIO_IBGE").pivot("diff").agg(F.first("diff_percent2"))

""" tempestividade5
+--------+--------+-------+
| diff_2w| diff_3w|co_ibge|
+--------+--------+-------+
|99.83987| 0.16013| 150300|
|54.82866|45.17134| 150375|
|   100.0|    null| 172015|
"""

tempestividade5 = tempestividade4.withColumnRenamed("0_2weeks","diff_2w")
tempestividade5 = tempestividade5.withColumnRenamed("3+weeks","diff_3w")
tempestividade5 = tempestividade5.withColumn("co_ibge",F.col("CO_MUNICIPIO_IBGE").cast(StringType() ) ).drop("CO_MUNICIPIO_IBGE","diff_2w_new","diff_3w_new","null").cache()

#### Atendimentos (atd) nas últimas semanas
TODO: verificar filtro ano

In [None]:
#agrupar os dados de "ANO","SEMANA","SG_UF","CO_MUNICIPIO_IBGE" somando o qt_total
df_agg = sisab_diff.groupBy("ANO","SEMANA","SG_UF","CO_MUNICIPIO_IBGE").agg(F.sum("qt_total").alias("qt_total")) 

#junta com os dados das semanas epidemiologicas completas, filtrando em seguida para dados ate 2023 e que a semana sejam somente as duas últimas, ou seja menor que a semana folder(semana atual) e maior que a semana folder - 1, ex: semana 47 seria de 46 a 47
df_agg_v2 = semanas_epis.join(df_agg , ["ANO","SEMANA","CO_MUNICIPIO_IBGE"] , 'left')
df_agg_v3 = df_agg_v2.filter(F.col("ANO") == 2024)
df_agg_v4 = df_agg_v3.filter(F.col("SEMANA") <= semana_folder).filter(F.col("SEMANA") >= semana_folder-1)  # capturar somente ate a semana atual , e somente maiores que a semana atual - 1 

#para criar a variável sum_miss que tem 3 categorias, 0(com atendimentos nas 2 semanas) , 1 (com atendimento em somente 1 das duas semanas ) e 2 (sem atendimetnos nas duas semanas)
windowSpec = Window.partitionBy("CO_MUNICIPIO_IBGE")

df_agg_v5 = df_agg_v4.withColumn("count_empty", F.sum(F.when(F.col("qt_total").isNull(), 1).otherwise(0)).over(windowSpec))
df_agg_v6 = df_agg_v5.select("CO_MUNICIPIO_IBGE","SEMANA","ANO","count_empty").groupBy("CO_MUNICIPIO_IBGE").agg(F.first("count_empty").alias("sum_miss"))
df_agg_v7 = df_agg_v6.withColumn("co_ibge",F.col("CO_MUNICIPIO_IBGE").cast(IntegerType()).cast(StringType()) ).drop("CO_MUNICIPIO_IBGE")

dqi = df_agg_v7.join(tempestividade5,['co_ibge'],'left')

In [None]:
dqi_v2 = dqi.join(comp_percentage,['co_ibge'],'left')

dqi_v2 = dqi_v2.withColumn("tempestividade",F.when(F.col("diff_2w") >=  75,  0).otherwise(1)  )
dqi_v2 = dqi_v2.withColumn("completude",F.when(F.col("perc_completude") >=  85, 0).otherwise(1)  )

cond1 = F.col("completude") == 0
cond2 = F.col("tempestividade") == 0
cond3 = F.col("sum_miss") == 0

dqi_v2 = dqi_v2.withColumn("dqi",F.when( cond1 & cond2 & cond3, "Apto").otherwise("Não Apto")  )

In [None]:
""" dqi_v3.show()
+---+-------+-------------------+--------+--------+------------+---------------+--------+--------+--------+----------+--------------+--------+
| uf|co_ibge|          municipio|epi_week|epi_year|perc_missing|perc_completude| diff_2w| diff_3w|sum_miss|completude|tempestividade|     dqi|
+---+-------+-------------------+--------+--------+------------+---------------+--------+--------+--------+----------+--------------+--------+
| PA| 150200| Cachoeira do Arari|      52|    2023|        12.5|           87.5|32.79879|67.20121|       1|         0|             1|Não Apto|
| PI| 220290|           Corrente|      52|    2023|         0.0|          100.0| 91.6261|  8.3739|       0|         0|             0|    Apto|
"""

dqi_v3 = dqi_v2.select("uf", "co_ibge", "municipio", "epi_week", "epi_year", "perc_missing", "perc_completude", "diff_2w", "diff_3w", "sum_miss", "completude", "tempestividade", "dqi")
dqi_v3 = dqi_v3.na.fill({'diff_2w': 0, 'diff_3w': 0})

In [None]:
caminho_pasta

In [None]:
caminho_pasta

In [None]:
path_salvar = "s3://fiocruz-datalake-bucket/"+caminho_pasta
path_salvar

In [None]:
#path_salvar_dqi = aesop_dqi_report_folder+'semana_'+str(semana_folder)+"/dqi.csv"


dqi_v3.toPandas().to_csv(path_salvar+"/dqi.csv",index=False)
path_salvar

In [None]:
%stop_session