####  Run this cell to set up and start your interactive session.


In [None]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

In [None]:
from pyspark.sql.functions import *
from functools import reduce

In [2]:
def read_s3_file(bucket_name:str, file_path:str)-> DataFrame:
  """
  Function used to read files from a s3 bucket
  args:
    bucket_name: Name of the s3 bucket
    file_path: Path of the file to be read
  returns:
    DataFrame with the file content
  """
  return spark.read.parquet(f"s3a://{bucket_name}/{file_path}/")




In [3]:
silver_bucket_name = 'project-pnad-covid-ibge-silver-layer'
gold_bucket_name = 'project-pnad-covid-ibge-gold-layer'
path_silver_df = 'pnad_covid_datasets_cleaned_transformed_unified'
path_gold_df = 'pnad_covid_datasets_aggregated'
catalogDatabase = 'big_data_project'
catalogTableName = 'curated_pnad_covid_datasets'




In [4]:
silver_df = spark.read.parquet(f"s3a://{silver_bucket_name}/{path_silver_df}/")




In [5]:
silver_agregated = (silver_df
                     .groupBy('month', 
                              'unidade_federacao', 
                              'capital_uf', 
                             )
                    .pivot("question_description")
                    .agg(collect_list('question_answer').alias('question_answer'))
                )




In [25]:
cols_to_considered = [
                        'teve_febre',
                        'teve_tosse',
                        'teve_dor_no_peito',
                        'teve_dor_de_garganta',
                        'teve_dor_de_cabeca',
                        'teve_nausea',
                        'teve_dificuldade_para_respirar',
                        'teve_nariz_entupido_ou_escorrendo',
                        'teve_fadiga',
                        'teve_dor_nos_olhos',
                        'teve_perda_de_cheiro_ou_sabor',
                        'teve_dor_muscular',
                        'teve_diarreia',
                        'recuperacao_sintomas_em_casa',
                        'providencia_recuperacao_sintomas_procurar_profissional_saude',
                        'recuperacao_sintomas_por_remedios_auto_medicado',
                        'recuperacao_sintomas_por_remedios_medicado',
                        'recuperacao_sintomas_por_sus_a_domicilio',
                        'recuperacao_sintomas_por_medico_particular_a_domicilio',
                        'providencia_recuperacao_sintomas_foi_outra',
                        'buscou_atendimento_ubs',
                        'buscou_atendimento_ps_sus_upa',
                        'buscou_atendimento_hospital_sus',
                        'buscou_atendimento_ambulantorio_forcas_armadas',
                        'buscou_atendimento_ps_privado_forcas_armadas',
                        'ficou_internado_por_1dia_ou_mais',
                        'foi_sedado_entubado_com_respiracao_artificial',
                        'tem_plano_de_saude',
                        'fez_algum_test_covid',
                        'fez_exame_cotonete_swab',
                        'fez_exame_sangue_do_furo_no_dedo',
                        'fez_exame_sangue_veia_braco',
                        'ja_teve_diagnostico_diabetes',
                        'ja_teve_diagnostico_hipertensao',
                        'ja_teve_doenca_respiratoria',
                        'ja_teve_diagnostico_depressao',
                        'ja_teve_diagnostico_cancer',
                        'ja_teve_diagnostico_doencas_coracao',
                        'trabalhou_fez_algum_bico_na_semana_anterior',
                        'ficou_afastado_temporiaramente_do_trabalho_na_semana_anterior',
                        'foi_remunerado_nesse_periodo',
                        'tem_mais_de_um_trabalho',
                        'carteira_assinada_ou_funcionario_estatutario',
                        'home_office_na_semana_passada',
                        'contribuidor_inss',
                        'fez_emprestimo_na_pandemia',
                        'emprestimo_em_banco_ou_financeira',
                        'tem_itens_basico_de_limpeza_em_casa', 
                        'resultado_exame_swab', 
                        'resultado_exame_sangue_do_furo_no_dedo', 
                        'resultado_exame_sangue_veia_braco',
                        'tipo_de_area',
                         'cor_ou_raca',
                         'sexo',
                         'faixa_etaria',
                         'setor_da_empresa_do_trabalho',
                         'trabalho_setor_privado_ou_publico',
                         'tipo_trabalho_ou_cargo',
                         'escolaridade',
                         'tipo_escola_faculdade',
                         'situacao_do_domicilio',
                         'idade_do_morador'
                    ]




In [26]:
df_list = []
for c in cols_to_considered:
    try:
        df = (silver_agregated
              .select('month', 
                      'unidade_federacao', 
                      'capital_uf', 
                      explode(col(c)).alias(c)
                     )
            )
        df_list.append(df)
    except:
        continue




In [27]:
def merge_df(list_df: list):
    return reduce(lambda df1, df2: df1.join(df2, ['month', 'unidade_federacao', 'capital_uf'], 'full'), list_df)




In [28]:
df_gold = merge_df(df_list)




In [34]:
list_cols_to_aggregated2 = ['resultado_exame_swab', 
                            'resultado_exame_sangue_do_furo_no_dedo', 
                            'resultado_exame_sangue_veia_braco'
                            ]
to_ignore = ['tipo_de_area',
             'cor_ou_raca',
             'sexo',
             'faixa_etaria',
             'setor_da_empresa_do_trabalho',
             'trabalho_setor_privado_ou_publico',
             'tipo_trabalho_ou_cargo',
             'escolaridade',
             'tipo_escola_faculdade',
             'situacao_do_domicilio',
              'idade_do_morador',
             'month', 
             'unidade_federacao', 
             'capital_uf'
        ]

list_cols_to_aggregated = [c for c in df_gold.columns if c not in list_cols_to_aggregated2 and c not in to_ignore]




In [35]:
agg_list = []
for c in list_cols_to_aggregated:
    case_sim = when(col(c) == 'sim', 1).otherwise(0)
    case_nao = when(col(c) == 'nao', 1).otherwise(0)
    case_nao_sabe = when(col(c) == 'nao_sabe', 1).otherwise(0)
    case_nao_aplicavel = when(col(c) == 'nao_aplicavel', 1).otherwise(0)

    agg_list.append(sum(case_sim).alias(f'qtd_pessoas_sim_para_{c}'))
    agg_list.append(sum(case_nao).alias(f'qtd_pessoas_nao_para_{c}'))
    agg_list.append(sum(case_nao_sabe).alias(f'qtd_pessoas_nao_sabe_para_{c}'))
    agg_list.append(sum(case_nao_aplicavel).alias(f'qtd_pessoas_nao_aplicavel_para_{c}'))




In [36]:
for c in list_cols_to_aggregated2:
    case_positivo = when(col(c) == 'Positivo', 1).otherwise(0)
    case_negativo = when(col(c).isin('Negativo', 'Inconclusivo'), 1).otherwise(0)
    agg_list.append(sum(case_positivo).alias(f'qtd_pessoas_positivo_para_{c}'))
    agg_list.append(sum(case_negativo).alias(f'qtd_pessoas_negativo_para_{c}'))




In [37]:
df_gold_aggregated = (df_gold
                    .withColumn('faixa_etaria', when(col('idade_do_morador').cast('int') <= 25, '0-18')
                                .when((col('idade_do_morador').cast('int') > 18) & (col('idade_do_morador').cast('int') <= 35), '18-35')
                                .when((col('idade_do_morador').cast('int') > 35) & (col('idade_do_morador').cast('int') <= 45), '36-45')
                                .when((col('idade_do_morador').cast('int') > 45) & (col('idade_do_morador').cast('int') <= 55), '46-55')
                                .when((col('idade_do_morador').cast('int') > 55) & (col('idade_do_morador').cast('int') <= 65), '56-65')
                                .when(col('idade_do_morador').cast('int') > 65, '65+')
                                .otherwise('Unknown')
                                )
                        .groupBy('month', 
                                 'unidade_federacao', 
                                 'tipo_de_area',
                                 'cor_ou_raca',
                                 'sexo',
                                 'faixa_etaria',
                                 'setor_da_empresa_do_trabalho',
                                 'trabalho_setor_privado_ou_publico',
                                 'tipo_trabalho_ou_cargo',
                                 'escolaridade',
                                 'tipo_escola_faculdade',
                                 'situacao_do_domicilio'
                                )
                        .agg(*agg_list)
                    )                       




#### Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog


In [38]:
df_gold_aggregated.show(5)

Py4JJavaError: An error occurred while calling o6934.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: ShuffleMapStage 71 (showString at NativeMethodAccessorImpl.java:0) has failed the maximum allowable number of times: 4. Most recent failure reason:
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 2 partition 7
	at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1701)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10(MapOutputTracker.scala:1648)
	at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10$adapted(MapOutputTracker.scala:1647)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1647)
	at org.apache.spark.MapOutputTrackerWorker.getMap

In [None]:
from awsglue.dynamicframe import DynamicFrame
dynamic_frame = DynamicFrame.fromDF(df_gold_aggregated, glueContext, "dynamic_frame")

s3output = glueContext.getSink(
  path = f"s3://{gold_bucket_name}/{path_gold_df}",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",           # Important for catalog creation
  partitionKeys=['month'],                       # Optional: specify if you want partitions
  compression="snappy",                          # Recommended for Parquet
  enableUpdateCatalog=True,                      # This triggers Glue Catalog update/create
  transformation_ctx="s3output",
)

s3output.setCatalogInfo(
  catalogDatabase=catalogDatabase,               # Glue database (must exist)
  catalogTableName=catalogTableName              # Table will be created if not existing
)

s3output.setFormat("glueparquet")                # Recommended format for Glue compatibility
s3output.writeDynamicFrame(dynamic_frame) 