In [None]:
import SparkSessionSession
from pyspark.sql import SparkSession
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, upper, count, max, desc

## @params: [JOB_NAME, S3_INPUT_PATH, S3_OUTPUT_PATH]
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_INPUT_PATH', 'S3_OUTPUT_PATH'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

#Parâmetros de entrada e saída
source_file = args['S3_INPUT_PATH']
output_path = args['S3_OUTPUT_PATH']

#Ler o arquivo CSV no S3
df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [source_file]},
    format="csv",
    format_options={"withHeader": True}
)

#Schema do DataFrame
df_spark = df.toDF()
print("Schema do DataFrame:")
df_spark.printSchema()

#Alterar os valores da coluna 'nome' para MAIÚSCULO
df_upper = df_spark.withColumn("nome", upper(col("nome")))

#Contagem de linhas presentes no DataFrame
row_count = df_upper.count()
print(f"Total de linhas no DataFrame: {row_count}")

#Ano e sexo, ordenando pelo ano mais recente
grouped_count = (
    df_upper.groupBy("ano", "sexo")
    .agg(count("*").alias("contagem"))
    .orderBy(desc("ano"))
)
print("Contagem de nomes agrupados por ano e sexo:")
grouped_count.show()

#Nome feminino com mais registros e o ano em que ocorreu
most_common_female = (
    df_upper.filter(col("sexo") == "F")
    .groupBy("nome", "ano")
    .agg(count("*").alias("contagem"))
    .orderBy(desc("contagem"))
    .limit(1)
)
print("Nome feminino com mais registros e o ano:")
most_common_female.show()

#Nome masculino com mais registros e o ano em que ocorreu
most_common_male = (
    df_upper.filter(col("sexo") == "M")
    .groupBy("nome", "ano")
    .agg(count("*").alias("contagem"))
    .orderBy(desc("contagem"))
    .limit(1)
)
print("Nome masculino com mais registros e o ano:")
most_common_male.show()

#Total de registros para cada ano
total_per_year = (
    df_upper.groupBy("ano")
    .agg(count("*").alias("total_registros"))
    .orderBy("ano")
    .limit(10)
)
print("Total de registros por ano (10 primeiros anos):")
total_per_year.show()

#Gravação no S3 em JSON, com particionamento por 'sexo' e 'ano'
df_upper.write.partitionBy("sexo", "ano").mode("overwrite").json(f"{output_path}/frequencia_registro_nomes_eua")

print(f"DataFrame gravado no S3 em {output_path}/frequencia_registro_nomes_eua")

job.commit()
