#Etapa 1

Instalando pyspark

In [None]:
!pip install pyspark
!pip install names

Importando bibliotecas

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

Criando a coluna nome

In [None]:
df_nomes = spark.read.csv("nomes_aleatorios.txt", header=False, inferSchema=True)
df_nomes = df_nomes.withColumnRenamed("_c0", "nome")

df_nomes.show(5)

#Etapa 2

Ver o Schema

In [None]:
df_nomes.printSchema()

Renomeia coluna e mostrar os top 10

In [None]:
df_nomes = df_nomes.withColumnRenamed("nome", "Nomes")

df_nomes.show(10)

#Etapa 3

Adicionando coluna escolaridade

In [7]:
from pyspark.sql.functions import rand, when

df_nomes = df_nomes.withColumn(
    "Escolaridade",
    when(rand() < 0.33, "Fundamental")
    .when(rand() < 0.66, "Medio")
    .otherwise("Superior")
)

#Etapa 4

Adicionando coluna país

In [None]:
from pyspark.sql.functions import col, floor
from pyspark.sql.types import StringType

paises = ["Brasil", "Argentina", "Colômbia", "Peru", "Venezuela", "Chile",
          "Equador", "Bolívia", "Paraguai", "Uruguai", "Guiana",
          "Suriname", "Guiana Francesa"]

paisesNumeros = len(paises)
df_nomes = df_nomes.withColumn(
    "Pais",
    floor(rand() * paisesNumeros).cast("int")
).withColumn(
    "Pais", col("Pais").cast(StringType())
)

df_nomes = df_nomes.replace(
    {str(i): pais for i, pais in enumerate(paises)}, subset=["Pais"]
)

df_nomes.show(10)

#Etapa 5

Importando biblioteca e definindo seed

In [13]:
from pyspark.sql.functions import rand, floor


Adicionando coluna AnoNascimento

In [None]:
df_nomes = df_nomes.withColumn("AnoNascimento", floor(rand(seed=40) * (2010 - 1945 + 1) + 1945).cast("integer"))

df_nomes.show()

#Etapa 6

Pessoa do século XXI

In [None]:
df_select = df_nomes.select("Nomes","AnoNascimento").where("AnoNascimento >= 2001")
df_select.show(10)

#Etapa 7

Pessoa do século XXI, em SQL

In [None]:
df_nomes.createOrReplaceTempView("pessoas")
spark.sql("SELECT Nomes, AnoNascimento FROM pessoas WHERE AnoNascimento >= 2001").show(10)

#Etapa 8

Pessoas da geração Millennials


In [None]:
millennials = (df_nomes["AnoNascimento"] >= 1980) & (df_nomes["AnoNascimento"] <= 1994)
df_nomes.filter(millennials).count()

#Etapa 9

Pessoas da geração Millennials, em SQL


In [None]:
df_nomes.createOrReplaceTempView("pessoas")

millennialsSQL = spark.sql("""
    SELECT COUNT(*) AS Millennials
    FROM pessoas
    WHERE AnoNascimento BETWEEN 1980 AND 1994
""")

millennialsSQL.show()

#Etapa 10

Tabela com pessoas e suas gerações em cada país

In [None]:
df_nomes.createOrReplaceTempView("pessoas")

geracoesPais = spark.sql("""
SELECT Pais,
       CASE
           WHEN AnoNascimento BETWEEN 1944 AND 1964 THEN 'Baby Boomers'
           WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 'Geração X'
           WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 'Millennials'
           WHEN AnoNascimento BETWEEN 1995 AND 2015 THEN 'Geração Z'
           ELSE 'Outra Geração'
       END AS Geracao,
       COUNT(*) AS Quantidade
FROM pessoas
GROUP BY Pais, Geracao
ORDER BY Pais, Geracao, Quantidade
""")

geracoesPais.show()