In [74]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import lit, rand, when, col, array, element_at
import random

In [75]:
# Inicializar SparkSession
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()


In [76]:
# Etapa 1 
# Ler o arquivo nomes_aleatorios.txt
df_nomes = spark.read.csv("nomes_aleatorios.txt", header=False).withColumnRenamed("_c0", "Nomes")


In [77]:
# Etapa 2
#  Renomeando a primeira coluna do df para "nomes"
df_nomes = df_nomes.withColumnRenamed("_c0", "Nomes")

# Verificar o schema após a renomeação
df_nomes.printSchema()

# Mostrar as primeiras 10 linhas do DataFrame renomeado
df_nomes.show(10)


root
 |-- Nomes: string (nullable = true)

+-----------------+
|            Nomes|
+-----------------+
|   Frances Bennet|
|    Jamie Russell|
|   Edward Kistler|
|    Sheila Maurer|
| Donald Golightly|
|       David Gray|
|      Joy Bennett|
|      Paul Kriese|
|Berniece Ornellas|
|    Brian Farrell|
+-----------------+
only showing top 10 rows



In [78]:
# Etapa 3
# Lista de escolaridades
escolaridades = ["Fundamental", "Medio", "Superior"]

# Adicionar a coluna Escolaridade com valores aleatórios
df_nomes = df_nomes.withColumn("Escolaridade",
                               when(rand() < 0.33, "Fundamental")
                               .when(rand() < 0.66, "Medio")
                               .otherwise("Superior"))

df_nomes.show(10)

+-----------------+------------+
|            Nomes|Escolaridade|
+-----------------+------------+
|   Frances Bennet| Fundamental|
|    Jamie Russell|    Superior|
|   Edward Kistler|       Medio|
|    Sheila Maurer| Fundamental|
| Donald Golightly|       Medio|
|       David Gray|       Medio|
|      Joy Bennett|       Medio|
|      Paul Kriese|    Superior|
|Berniece Ornellas| Fundamental|
|    Brian Farrell|       Medio|
+-----------------+------------+
only showing top 10 rows



In [79]:
# Etapa 4
# Definir a lista de países
paises = ["Brasil", "Argentina", "Chile", "Peru", "Colombia"]

# Adicionar coluna de País com valores aleatórios
df_nomes = df_nomes.withColumn("Pais",
                               when(rand() < 0.2, "Brasil")
                               .when(rand() < 0.4, "Argentina")
                               .when(rand() < 0.6, "Chile")
                               .when(rand() < 0.8, "Peru")
                               .otherwise("Colombia"))

# Mostrar as primeiras 10 linhas para verificar a distribuição
df_nomes.show(10)

+-----------------+------------+---------+
|            Nomes|Escolaridade|     Pais|
+-----------------+------------+---------+
|   Frances Bennet| Fundamental|Argentina|
|    Jamie Russell|    Superior|    Chile|
|   Edward Kistler|       Medio|   Brasil|
|    Sheila Maurer| Fundamental|   Brasil|
| Donald Golightly|       Medio|Argentina|
|       David Gray|       Medio|     Peru|
|      Joy Bennett|       Medio|   Brasil|
|      Paul Kriese|    Superior|   Brasil|
|Berniece Ornellas| Fundamental|Argentina|
|    Brian Farrell|       Medio|Argentina|
+-----------------+------------+---------+
only showing top 10 rows



In [80]:
# Etapa 5
# Adicionar coluna de Ano de Nascimento com valores aleatórios entre 1945 e 2010
df_nomes = df_nomes.withColumn("AnoNascimento", (rand() * (2010 - 1945) + 1945).cast("int"))

# Mostrar as primeiras 10 linhas do DataFrame atualizado
df_nomes.show(10)

+-----------------+------------+---------+-------------+
|            Nomes|Escolaridade|     Pais|AnoNascimento|
+-----------------+------------+---------+-------------+
|   Frances Bennet| Fundamental|Argentina|         1970|
|    Jamie Russell|    Superior|    Chile|         1958|
|   Edward Kistler|       Medio|   Brasil|         1977|
|    Sheila Maurer| Fundamental|   Brasil|         1996|
| Donald Golightly|       Medio|Argentina|         1980|
|       David Gray|       Medio|     Peru|         1953|
|      Joy Bennett|       Medio|   Brasil|         1962|
|      Paul Kriese|    Superior|   Brasil|         1964|
|Berniece Ornellas| Fundamental|Argentina|         1997|
|    Brian Farrell|       Medio|Argentina|         1998|
+-----------------+------------+---------+-------------+
only showing top 10 rows



In [81]:
# Etapa 6
# Selecionar pessoas nascidas a partir do ano 2000 usando o select do dataframe
df_select = df_nomes.filter(df_nomes.AnoNascimento >= 2000)

# Mostrar as primeiras 10 linhas
df_select.show(10)


+--------------------+------------+---------+-------------+
|               Nomes|Escolaridade|     Pais|AnoNascimento|
+--------------------+------------+---------+-------------+
|       Tracy Herring| Fundamental|   Brasil|         2004|
|        Leroy Strahl|       Medio|   Brasil|         2008|
|      Lorenzo Woodis| Fundamental|Argentina|         2005|
|       Ricky Gilbert|    Superior|Argentina|         2006|
|       Michael Agnew|       Medio|     Peru|         2003|
|       George Miller|       Medio|   Brasil|         2009|
|Christopher Williams|    Superior|Argentina|         2001|
|           Ana Baker|       Medio|    Chile|         2005|
|       Evelyn Shaver| Fundamental|     Peru|         2001|
|         Shelia Ceja|    Superior|    Chile|         2008|
+--------------------+------------+---------+-------------+
only showing top 10 rows



In [82]:
# Etapa 7
# Registrar a tabela temporária
df_nomes.createOrReplaceTempView("pessoas")

# Executar consulta SQL com o select SQL
df_select_sql = spark.sql("SELECT * FROM pessoas WHERE AnoNascimento >= 2000")

# Mostrar as primeiras 10 linhas
df_select_sql.show(10)


+--------------------+------------+---------+-------------+
|               Nomes|Escolaridade|     Pais|AnoNascimento|
+--------------------+------------+---------+-------------+
|       Tracy Herring| Fundamental|   Brasil|         2004|
|        Leroy Strahl|       Medio|   Brasil|         2008|
|      Lorenzo Woodis| Fundamental|Argentina|         2005|
|       Ricky Gilbert|    Superior|Argentina|         2006|
|       Michael Agnew|       Medio|     Peru|         2003|
|       George Miller|       Medio|   Brasil|         2009|
|Christopher Williams|    Superior|Argentina|         2001|
|           Ana Baker|       Medio|    Chile|         2005|
|       Evelyn Shaver| Fundamental|     Peru|         2001|
|         Shelia Ceja|    Superior|    Chile|         2008|
+--------------------+------------+---------+-------------+
only showing top 10 rows



In [83]:
# Etapa 8
# Filtrar e contar as pessoas da geração Millennials (nascidos entre 1980 e 1994) no dataset
millennials_count = df_nomes.select("AnoNascimento").filter((col("AnoNascimento") >= 1980) & (col("AnoNascimento") <= 1994)).count()

# Exibir o resultado
print(f"Total de pessoas da geração Millennials: {millennials_count}")

[Stage 75:>                                                         (0 + 8) / 8]

Total de pessoas da geração Millennials: 2310205


                                                                                

In [84]:
# Etapa 9
# Registrar a tabela temporária
df_nomes.createOrReplaceTempView("contagem")

# Executar a consulta SQL para contar as pessoas da geração Millennials
millennials_count_sql = spark.sql("""
    SELECT COUNT(*) as Total
    FROM contagem
    WHERE AnoNascimento BETWEEN 1980 AND 1994
""").collect()[0]['Total']

# Exibir o resultado
print(f"Total de pessoas da geração Millennials: {millennials_count_sql}")

Total de pessoas da geração Millennials: 2310205


In [85]:
# Adicionar coluna de Geração usando a função `when`
df_nomes = df_nomes.withColumn("Geracao", 
                               when(col("AnoNascimento").between(1944, 1964), "Baby Boomers")
                               .when(col("AnoNascimento").between(1965, 1979), "Geração X")
                               .when(col("AnoNascimento").between(1980, 1994), "Millennials")
                               .when(col("AnoNascimento").between(1995, 2015), "Geração Z"))


In [86]:
# Registrar a tabela temporária com o nome "geracao"
df_nomes.createOrReplaceTempView("geracao")

# Executar a consulta SQL para agrupar os dados por País e Geracao e contar as ocorrências
resultado = spark.sql("""
    SELECT 
        Pais,
        Geracao,
        COUNT(*) as Quantidade
    FROM geracao
    GROUP BY Pais, Geracao
    ORDER BY Pais ASC, Geracao ASC
""")


# Mostrar o resultado
resultado.show(50, truncate=False)


[Stage 81:>                                                         (0 + 8) / 8]

+---------+------------+----------+
|Pais     |Geracao     |Quantidade|
+---------+------------+----------+
|Argentina|Baby Boomers|983132    |
|Argentina|Geração X   |739076    |
|Argentina|Geração Z   |737381    |
|Argentina|Millennials |739438    |
|Brasil   |Baby Boomers|614653    |
|Brasil   |Geração X   |462387    |
|Brasil   |Geração Z   |462209    |
|Brasil   |Millennials |462478    |
|Chile    |Baby Boomers|885751    |
|Chile    |Geração X   |663869    |
|Chile    |Geração Z   |663621    |
|Chile    |Millennials |665801    |
|Colombia |Baby Boomers|118434    |
|Colombia |Geração X   |88797     |
|Colombia |Geração Z   |88192     |
|Colombia |Millennials |88253     |
|Peru     |Baby Boomers|472947    |
|Peru     |Geração X   |354057    |
|Peru     |Geração Z   |355289    |
|Peru     |Millennials |354235    |
+---------+------------+----------+



                                                                                