In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [2]:
from pyspark.sql.functions import *

In [3]:
# Ler os arquivos

df = spark.read.format("csv").option("header","true").load("./nomes.csv")

# Na AWS utilizando o GlueContext

df_dynamic = glueContext.create_dynamic_frame.from_options(
    "s3",
    {
        "paths": [
            source_file
        ]
    },
    "csv",
    {"withHeader": True, "separator": ","},

)

In [4]:
# Imprima o schema do dataframe gerado no passo anterior.
df.schema

StructType([StructField('nome', StringType(), True), StructField('sexo', StringType(), True), StructField('total', StringType(), True), StructField('ano', StringType(), True)])

In [5]:
# Escrever o código necessário para alterar a caixa dos valores da coluna nome para MAIÚSCULO.

df_upper = df.withColumn('nome', upper('nome'))

# Na AWS 
def uppercase(rec):
    rec["nome"] = rec["nome"].upper()
    return rec

df = Map.apply(frame=df_dynamic, f=uppercase)

In [6]:
# Imprimir a contagem de linhas presentes no dataframe.

df.count()

1825433

In [7]:
# Imprimir a contagem de nomes, agrupando os dados do dataframe pelas colunas ano e sexo.
# Ordene os dados de modo que o ano mais recente apareça como primeiro registro do dataframe.

df.groupBy('ano', 'sexo').agg(count('*').alias('Contagem')).orderBy(desc('ano')).show()

+----+----+--------+
| ano|sexo|Contagem|
+----+----+--------+
|2014|   F|   19067|
|2014|   M|   13977|
|2013|   F|   19191|
|2013|   M|   14012|
|2012|   F|   19468|
|2012|   M|   14216|
|2011|   F|   19540|
|2011|   M|   14329|
|2010|   M|   14241|
|2010|   F|   19800|
|2009|   F|   20165|
|2009|   M|   14519|
|2008|   M|   14606|
|2008|   F|   20439|
|2007|   M|   14383|
|2007|   F|   20548|
|2006|   F|   20043|
|2006|   M|   14026|
|2005|   F|   19175|
|2005|   M|   13358|
+----+----+--------+
only showing top 20 rows



In [8]:
# Apresentar qual foi o nome feminino com mais registros e em que ano ocorreu.

df.filter(df.sexo == 'F').groupBy(df.nome, df.ano).agg(sum(df.total).alias('Registros')).orderBy(desc('Registros')).limit(1).show()

+-----+----+---------+
| nome| ano|Registros|
+-----+----+---------+
|Linda|1947|  99680.0|
+-----+----+---------+



In [9]:
# Apresentar qual foi o nome masculino com mais registros e em que ano ocorreu.
df.filter(df.sexo == 'M').groupBy(df.nome, df.ano).agg(sum(df.total).alias('Registros')).orderBy(desc('Registros')).limit(1).show()

+-----+----+---------+
| nome| ano|Registros|
+-----+----+---------+
|James|1947|  94755.0|
+-----+----+---------+



In [10]:
# Apresentar o total de registros (masculinos e femininos) para cada ano presente no dataframe.
# Considere apenas as primeiras 10 linhas, ordenadas pelo ano, de forma crescente.

df.groupBy(df.ano, df.sexo).agg(sum(df.total).alias('Registros')).orderBy(asc('ano')).limit(10).show()


+----+----+---------+
| ano|sexo|Registros|
+----+----+---------+
|1880|   M| 110491.0|
|1880|   F|  90993.0|
|1881|   M| 100745.0|
|1881|   F|  91954.0|
|1882|   M| 113688.0|
|1882|   F| 107850.0|
|1883|   M| 104629.0|
|1883|   F| 112321.0|
|1884|   M| 114445.0|
|1884|   F| 129022.0|
+----+----+---------+



In [12]:
# Particionar no S3

df.write.option("header", True) \
        .partitionBy("sexo", "ano") \
        .mode("overwrite") \
        .json(target_path)

<pyspark.sql.readwriter.DataFrameWriter at 0x7f3b3040afd0>