# Etapa 1 - Configuração inicial e leitura:

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, rand, expr

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

df_nomes = spark.read.csv("nomes_aleatorios.txt")
df_nomes.show(5)

+----------------+
|             _c0|
+----------------+
|  Frances Bennet|
|   Jamie Russell|
|  Edward Kistler|
|   Sheila Maurer|
|Donald Golightly|
+----------------+
only showing top 5 rows



# Etapa 2 - Schema:

In [2]:
df_nomes = df_nomes.withColumnRenamed("_c0", "Nomes")
df_nomes.printSchema()
df_nomes.show(10)

root
 |-- Nomes: string (nullable = true)

+-----------------+
|            Nomes|
+-----------------+
|   Frances Bennet|
|    Jamie Russell|
|   Edward Kistler|
|    Sheila Maurer|
| Donald Golightly|
|       David Gray|
|      Joy Bennett|
|      Paul Kriese|
|Berniece Ornellas|
|    Brian Farrell|
+-----------------+
only showing top 10 rows



# Etapa 3-5 - Novas colunas:

In [23]:
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Lista de valores possíveis para cada coluna
escolaridade = ["Fundamental", "Medio", "Superior"]
paises_america_sul = [
    "Argentina", "Bolívia", "Brasil", "Chile", "Colômbia",
    "Equador", "Guiana", "Paraguai", "Peru", "Suriname",
    "Uruguai", "Venezuela", "Guiana Francesa"
]

# Função para criar array literal do Spark
def create_array_literal(values):
    return F.array([F.lit(x) for x in values])

# Adicionando as novas colunas
df_nomes = df_nomes.withColumn(
    "Escolaridade",
    F.element_at(
        create_array_literal(escolaridade),
        (F.rand() * F.lit(len(escolaridade))).cast("int") + 1
    )
).withColumn(
    "Pais",
    F.element_at(
        create_array_literal(paises_america_sul),
        (F.rand() * F.lit(len(paises_america_sul))).cast("int") + 1
    )
).withColumn(
    "AnoNascimento",
    ((F.rand() * (2010 - 1945 + 1)) + 1945).cast("int")
)

# Mostrando os 10 primeiros resultados
df_nomes.show(10)

# Mostrando os 10 primeiros resultados apenas das novas colunas
print("\nApenas as novas colunas:")
df_nomes.select("Escolaridade", "Pais", "AnoNascimento").show(10)

+---+------------+---------+-------------+
| ID|Escolaridade|     Pais|AnoNascimento|
+---+------------+---------+-------------+
|  0|       Medio|   Brasil|         1999|
|  1| Fundamental|  Equador|         1983|
|  2|       Medio|    Chile|         1989|
|  3|    Superior|  Uruguai|         1991|
|  4| Fundamental|Venezuela|         2003|
|  5| Fundamental| Suriname|         1963|
|  6|       Medio| Colômbia|         2003|
|  7|       Medio|Argentina|         1951|
|  8|       Medio|Argentina|         1972|
|  9| Fundamental| Paraguai|         1993|
+---+------------+---------+-------------+


Apenas as novas colunas:
+------------+---------+-------------+
|Escolaridade|     Pais|AnoNascimento|
+------------+---------+-------------+
|       Medio|   Brasil|         1999|
| Fundamental|  Equador|         1983|
|       Medio|    Chile|         1989|
|    Superior|  Uruguai|         1991|
| Fundamental|Venezuela|         2003|
| Fundamental| Suriname|         1963|
|       Medio| Colôm

# Etapa 6-7 - Filtragens:

In [28]:
# Etapa 6 - Usando método select
df_select = df_nomes.filter(df_nomes.AnoNascimento >= 2000)

# Mostrando 10 registros do resultado usando select
print("Resultado usando método select:")
df_select.show(10)

# Etapa 7 - Usando Spark SQL
# Criando uma view temporária
df_nomes.createOrReplaceTempView("pessoas")

# Executando a mesma consulta usando SQL
df_sql = spark.sql("""
    SELECT *
    FROM pessoas
    WHERE AnoNascimento >= 2000
""")

# Mostrando 10 registros do resultado usando SQL
print("\nResultado usando Spark SQL:")
df_sql.show(10)

# Verificando se os resultados são equivalentes
print("\nTotal de registros usando select:", df_select.count())
print("Total de registros usando SQL:", df_sql.count())

Resultado usando método select:
+---+------------+---------+-------------+
| ID|Escolaridade|     Pais|AnoNascimento|
+---+------------+---------+-------------+
|  4| Fundamental|Venezuela|         2003|
|  6|       Medio| Colômbia|         2003|
+---+------------+---------+-------------+


Resultado usando Spark SQL:
+---+------------+---------+-------------+
| ID|Escolaridade|     Pais|AnoNascimento|
+---+------------+---------+-------------+
|  4| Fundamental|Venezuela|         2003|
|  6|       Medio| Colômbia|         2003|
+---+------------+---------+-------------+


Total de registros usando select: 2
Total de registros usando SQL: 2


# etapas 8 e 9

In [29]:
# Etapa 8 - Usando método filter
millennials_count = df_nomes.filter(
    (df_nomes.AnoNascimento >= 1980) &
    (df_nomes.AnoNascimento <= 1994)
).count()

print("Usando método filter:")
print(f"Número de Millennials (nascidos entre 1980 e 1994): {millennials_count}")

# Mostrando alguns exemplos dos Millennials encontrados
print("\nExemplos de Millennials encontrados:")
df_nomes.filter(
    (df_nomes.AnoNascimento >= 1980) &
    (df_nomes.AnoNascimento <= 1994)
).show(5)

# Etapa 9 - Usando Spark SQL
# Garantindo que a view temporária existe
df_nomes.createOrReplaceTempView("pessoas")

# Executando a contagem via SQL
millennials_sql = spark.sql("""
    SELECT COUNT(*) as total_millennials
    FROM pessoas
    WHERE AnoNascimento >= 1980
    AND AnoNascimento <= 1994
""")

print("\nUsando Spark SQL:")
print("Número de Millennials (nascidos entre 1980 e 1994):")
millennials_sql.show()

# Mostrando alguns exemplos via SQL
print("\nExemplos de Millennials encontrados via SQL:")
spark.sql("""
    SELECT *
    FROM pessoas
    WHERE AnoNascimento >= 1980
    AND AnoNascimento <= 1994
    LIMIT 5
""").show()

Usando método filter:
Número de Millennials (nascidos entre 1980 e 1994): 4

Exemplos de Millennials encontrados:
+---+------------+--------+-------------+
| ID|Escolaridade|    Pais|AnoNascimento|
+---+------------+--------+-------------+
|  1| Fundamental| Equador|         1983|
|  2|       Medio|   Chile|         1989|
|  3|    Superior| Uruguai|         1991|
|  9| Fundamental|Paraguai|         1993|
+---+------------+--------+-------------+


Usando Spark SQL:
Número de Millennials (nascidos entre 1980 e 1994):
+-----------------+
|total_millennials|
+-----------------+
|                4|
+-----------------+


Exemplos de Millennials encontrados via SQL:
+---+------------+--------+-------------+
| ID|Escolaridade|    Pais|AnoNascimento|
+---+------------+--------+-------------+
|  1| Fundamental| Equador|         1983|
|  2|       Medio|   Chile|         1989|
|  3|    Superior| Uruguai|         1991|
|  9| Fundamental|Paraguai|         1993|
+---+------------+--------+----------

# Etapa 10 - Análise final:

In [30]:
# Garantindo que a view temporária existe
df_nomes.createOrReplaceTempView("pessoas")

# Criando a query SQL para classificar e contar as gerações por país
df_geracoes = spark.sql("""
    SELECT
        Pais,
        CASE
            WHEN AnoNascimento BETWEEN 1944 AND 1964 THEN 'Baby Boomers'
            WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 'Geração X'
            WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 'Millennials'
            WHEN AnoNascimento BETWEEN 1995 AND 2015 THEN 'Geração Z'
        END as Geracao,
        COUNT(*) as Quantidade
    FROM pessoas
    WHERE AnoNascimento BETWEEN 1944 AND 2015
    GROUP BY
        Pais,
        CASE
            WHEN AnoNascimento BETWEEN 1944 AND 1964 THEN 'Baby Boomers'
            WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 'Geração X'
            WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 'Millennials'
            WHEN AnoNascimento BETWEEN 1995 AND 2015 THEN 'Geração Z'
        END
    ORDER BY Pais, Geracao, Quantidade
""")

# Mostrando todos os resultados
print("Distribuição de gerações por país:")
df_geracoes.show(100, truncate=False)  # Aumentando o limite para mostrar todas as linhas

# Mostrando o total geral por geração
print("\nTotal por geração:")
spark.sql("""
    SELECT
        CASE
            WHEN AnoNascimento BETWEEN 1944 AND 1964 THEN 'Baby Boomers'
            WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 'Geração X'
            WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 'Millennials'
            WHEN AnoNascimento BETWEEN 1995 AND 2015 THEN 'Geração Z'
        END as Geracao,
        COUNT(*) as Total
    FROM pessoas
    WHERE AnoNascimento BETWEEN 1944 AND 2015
    GROUP BY
        CASE
            WHEN AnoNascimento BETWEEN 1944 AND 1964 THEN 'Baby Boomers'
            WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 'Geração X'
            WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 'Millennials'
            WHEN AnoNascimento BETWEEN 1995 AND 2015 THEN 'Geração Z'
        END
    ORDER BY Geracao
""").show()

Distribuição de gerações por país:
+---------+------------+----------+
|Pais     |Geracao     |Quantidade|
+---------+------------+----------+
|Argentina|Baby Boomers|1         |
|Argentina|Geração X   |1         |
|Brasil   |Geração Z   |1         |
|Chile    |Millennials |1         |
|Colômbia |Geração Z   |1         |
|Equador  |Millennials |1         |
|Paraguai |Millennials |1         |
|Suriname |Baby Boomers|1         |
|Uruguai  |Millennials |1         |
|Venezuela|Geração Z   |1         |
+---------+------------+----------+


Total por geração:
+------------+-----+
|     Geracao|Total|
+------------+-----+
|Baby Boomers|    2|
|   Geração X|    1|
|   Geração Z|    3|
| Millennials|    4|
+------------+-----+

