# Exercicio Apache Spark

### Etapa 1

In [107]:
from pyspark.sql import SparkSession

# Cria a Spark Session
spark = SparkSession.builder.master("local[*]").appName("Exercicio Intro").getOrCreate()

# Nome do arquivo csv
csv = "nomes_aleatorios.txt"

# Lê o arquivo CSV e cria o DataFrame
df_nomes = spark.read.csv(csv, header=False, inferSchema=True)

# Mostra as primeiras 5 linhas do DataFrame
df_nomes.show(5)

+----------------+
|             _c0|
+----------------+
|  Frances Bennet|
|   Jamie Russell|
|  Edward Kistler|
|   Sheila Maurer|
|Donald Golightly|
+----------------+
only showing top 5 rows



---

### Etapa 2

In [108]:
# Renomeia a coluna para 'Nomes'
df_nomes = df_nomes.withColumnRenamed("_c0", "Nomes")

# Imprime o esquema do DataFrame
df_nomes.printSchema()

# Mostra 10 linhas do DataFrame
df_nomes.show(10)

root
 |-- Nomes: string (nullable = true)

+-----------------+
|            Nomes|
+-----------------+
|   Frances Bennet|
|    Jamie Russell|
|   Edward Kistler|
|    Sheila Maurer|
| Donald Golightly|
|       David Gray|
|      Joy Bennett|
|      Paul Kriese|
|Berniece Ornellas|
|    Brian Farrell|
+-----------------+
only showing top 10 rows



---

### Etapa 3

In [109]:
from pyspark.sql.functions import lit, when, rand

# Adiciona a coluna 'Escolaridade' com valores aleatórios
df_nomes = df_nomes.withColumn(
    "Escolaridade",
    when(rand() < 0.33, lit("Fundamental"))
    .when((rand() >= 0.33) & (rand() < 0.66), lit("Medio"))
    .otherwise(lit("Superior"))
)

df_nomes.show(10)


+-----------------+------------+
|            Nomes|Escolaridade|
+-----------------+------------+
|   Frances Bennet| Fundamental|
|    Jamie Russell|    Superior|
|   Edward Kistler| Fundamental|
|    Sheila Maurer|       Medio|
| Donald Golightly|    Superior|
|       David Gray| Fundamental|
|      Joy Bennett|       Medio|
|      Paul Kriese|       Medio|
|Berniece Ornellas|    Superior|
|    Brian Farrell|       Medio|
+-----------------+------------+
only showing top 10 rows



---

### Etapa 4

In [110]:
from pyspark.sql.functions import lit, rand

# Lista de países da América do Sul
paises = ["Argentina", "Bolívia", "Brasil", "Chile", "Colômbia", "Equador", "Guiana", "Paraguai", "Peru", "Suriname", "Uruguai", "Venezuela", "Guiana Francesa"]

# Adiciona a coluna 'Pais' com valores aleatórios
df_nomes = df_nomes.withColumn(
    "Pais",
    when(rand() < 1/13, lit("Argentina"))
    .when((rand() >= 1/13) & (rand() < 2/13), lit("Bolívia"))
    .when((rand() >= 2/13) & (rand() < 3/13), lit("Brasil"))
    .when((rand() >= 3/13) & (rand() < 4/13), lit("Chile"))
    .when((rand() >= 4/13) & (rand() < 5/13), lit("Colômbia"))
    .when((rand() >= 5/13) & (rand() < 6/13), lit("Equador"))
    .when((rand() >= 6/13) & (rand() < 7/13), lit("Guiana"))
    .when((rand() >= 7/13) & (rand() < 8/13), lit("Paraguai"))
    .when((rand() >= 8/13) & (rand() < 9/13), lit("Peru"))
    .when((rand() >= 9/13) & (rand() < 10/13), lit("Suriname"))
    .when((rand() >= 10/13) & (rand() < 11/13), lit("Uruguai"))
    .when((rand() >= 11/13) & (rand() < 12/13), lit("Venezuela"))
    .otherwise(lit("Guiana Francesa"))
)

df_nomes.show(10)


+-----------------+------------+---------+
|            Nomes|Escolaridade|     Pais|
+-----------------+------------+---------+
|   Frances Bennet| Fundamental|    Chile|
|    Jamie Russell|    Superior|   Brasil|
|   Edward Kistler| Fundamental|Argentina|
|    Sheila Maurer|       Medio| Suriname|
| Donald Golightly|    Superior|  Bolívia|
|       David Gray| Fundamental| Colômbia|
|      Joy Bennett|       Medio| Suriname|
|      Paul Kriese|       Medio|    Chile|
|Berniece Ornellas|    Superior| Colômbia|
|    Brian Farrell|       Medio|    Chile|
+-----------------+------------+---------+
only showing top 10 rows



---

### Etapa 5

In [111]:
from pyspark.sql.functions import lit, when, rand

# Adiciona a coluna 'AnoNascimento' com valores aleatórios entre 1945 e 2010
df_nomes = df_nomes.withColumn(
    "AnoNascimento",
    (lit(1945) + (rand() * (lit(2010) - lit(1945)))).cast("integer")
)

df_nomes.show(10)


+-----------------+------------+---------+-------------+
|            Nomes|Escolaridade|     Pais|AnoNascimento|
+-----------------+------------+---------+-------------+
|   Frances Bennet| Fundamental|    Chile|         1945|
|    Jamie Russell|    Superior|   Brasil|         2004|
|   Edward Kistler| Fundamental|Argentina|         1974|
|    Sheila Maurer|       Medio| Suriname|         1994|
| Donald Golightly|    Superior|  Bolívia|         2009|
|       David Gray| Fundamental| Colômbia|         1969|
|      Joy Bennett|       Medio| Suriname|         2003|
|      Paul Kriese|       Medio|    Chile|         1967|
|Berniece Ornellas|    Superior| Colômbia|         1974|
|    Brian Farrell|       Medio|    Chile|         1955|
+-----------------+------------+---------+-------------+
only showing top 10 rows



---

### Etapa 6

In [112]:
# Seleciona pessoas que nasceram neste século (>= 2000)
df_select = df_nomes.filter(df_nomes.AnoNascimento >= 2000)

# Mostra 10 linhas do DataFrame resultante
df_select.show(10)

+----------------+------------+--------+-------------+
|           Nomes|Escolaridade|    Pais|AnoNascimento|
+----------------+------------+--------+-------------+
|   Jamie Russell|    Superior|  Brasil|         2004|
|Donald Golightly|    Superior| Bolívia|         2009|
|     Joy Bennett|       Medio|Suriname|         2003|
|   Tracy Herring|    Superior|  Brasil|         2002|
|    Ernest Hulet|    Superior|Paraguai|         2003|
|    David Medina|       Medio|  Brasil|         2006|
|    Roxie Bernal| Fundamental|Suriname|         2002|
| Jerry Chynoweth|    Superior|  Brasil|         2007|
|     Milton Rowe|    Superior|Paraguai|         2000|
|    Juliet Liles|    Superior|  Brasil|         2006|
+----------------+------------+--------+-------------+
only showing top 10 rows



---

### Etapa 7

In [113]:
# Cria uma tabela temporária
df_nomes.createOrReplaceTempView("pessoas")

# Executa o comando SQL para selecionar pessoas que nasceram neste século
df_select_sql = spark.sql("SELECT * FROM pessoas WHERE AnoNascimento >= 2000")

# Mostra 10 linhas do DataFrame resultante
df_select_sql.show(10)


+----------------+------------+--------+-------------+
|           Nomes|Escolaridade|    Pais|AnoNascimento|
+----------------+------------+--------+-------------+
|   Jamie Russell|    Superior|  Brasil|         2004|
|Donald Golightly|    Superior| Bolívia|         2009|
|     Joy Bennett|       Medio|Suriname|         2003|
|   Tracy Herring|    Superior|  Brasil|         2002|
|    Ernest Hulet|    Superior|Paraguai|         2003|
|    David Medina|       Medio|  Brasil|         2006|
|    Roxie Bernal| Fundamental|Suriname|         2002|
| Jerry Chynoweth|    Superior|  Brasil|         2007|
|     Milton Rowe|    Superior|Paraguai|         2000|
|    Juliet Liles|    Superior|  Brasil|         2006|
+----------------+------------+--------+-------------+
only showing top 10 rows



---

### Etapa 8

In [114]:
# Conta o número de pessoas da geração Millennials
df_millennials = df_nomes.filter((df_nomes.AnoNascimento >= 1980) & (df_nomes.AnoNascimento <= 1994))
df_millennials_count = df_millennials.count()

print(f"Numero de Millennials: {df_millennials_count}")


Numero de Millennials: 2307870


---

### Etapa 9

In [115]:
# Executa o comando SQL para contar pessoas da geração Millennials
df_millennials_sql = spark.sql("""SELECT COUNT(*) as num_millenials 
                               FROM pessoas  
                               WHERE AnoNascimento BETWEEN 1980 AND 1994""")
df_millennials_sql.show()


+--------------+
|num_millenials|
+--------------+
|       2307870|
+--------------+



---

### Etapa 10

In [116]:
from pyspark.sql.functions import when, lit

# Define as gerações
geracoes = [
    ("Baby Boomers", 1944, 1964),
    ("Geracao X", 1965, 1979),
    ("Millennials", 1980, 1994),
    ("Geracao Z", 1995, 2015)
]

# Inicia a coluna 'Geracao' com valores nulos
df_nomes = df_nomes.withColumn("Geracao", lit(None))

# Atualiza a coluna 'Geracao' com base nos intervalos de anos
for nome, inicio, fim in geracoes:
    df_nomes = df_nomes.withColumn("Geracao", when((df_nomes.AnoNascimento >= inicio) & (df_nomes.AnoNascimento <= fim), lit(nome)).otherwise(df_nomes.Geracao))

# Cria uma tabela temporária com a nova coluna 'Geracao'
df_nomes.createOrReplaceTempView("pessoas_geracoes")

# Executa o comando SQL para obter a quantidade de pessoas por país e geração
df_pessoas_pais_geracao = spark.sql("""
    SELECT Pais, Geracao, COUNT(*) AS Quantidade
    FROM pessoas_geracoes
    GROUP BY Pais, Geracao
    ORDER BY Pais, Geracao
""")

# Mostra todas as linhas do DataFrame resultante
total_linhas = df_pessoas_pais_geracao.count()
df_pessoas_pais_geracao.show(total_linhas, truncate=False)

+---------------+------------+----------+
|Pais           |Geracao     |Quantidade|
+---------------+------------+----------+
|Argentina      |Baby Boomers|236842    |
|Argentina      |Geracao X   |177560    |
|Argentina      |Geracao Z   |177513    |
|Argentina      |Millennials |177268    |
|Bolívia        |Baby Boomers|403099    |
|Bolívia        |Geracao X   |301942    |
|Bolívia        |Geracao Z   |303626    |
|Bolívia        |Millennials |302806    |
|Brasil         |Baby Boomers|475907    |
|Brasil         |Geracao X   |356743    |
|Brasil         |Geracao Z   |357429    |
|Brasil         |Millennials |357692    |
|Chile          |Baby Boomers|464482    |
|Chile          |Geracao X   |347904    |
|Chile          |Geracao Z   |346914    |
|Chile          |Millennials |347257    |
|Colômbia       |Baby Boomers|398563    |
|Colômbia       |Geracao X   |299765    |
|Colômbia       |Geracao Z   |298964    |
|Colômbia       |Millennials |298614    |
|Equador        |Baby Boomers|3121