### Exercício Apache Spark

### Etapa 1

In [231]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import rand, element_at, array, lit, when, col

spark = SparkSession \
    .builder \
    .master("local[*]")\
    .appName("Exercício Intro") \
    .getOrCreate()


df_nomes = spark.read.csv(
    'D:/Gilberto/Gilberto Pb - Compass/Sprints/Sprint 08/Exercicios/exercicio_geracao_dados/nomes_aleatorios.txt')

df_nomes.show(5)

+----------------+
|             _c0|
+----------------+
|  Frances Bennet|
|   Jamie Russell|
|  Edward Kistler|
|   Sheila Maurer|
|Donald Golightly|
+----------------+
only showing top 5 rows



### Etapa 2

In [232]:
df_nomes = df_nomes.withColumnRenamed("_c0", "Nomes")

df_nomes.printSchema()

df_nomes.show(10)

root
 |-- Nomes: string (nullable = true)

+-----------------+
|            Nomes|
+-----------------+
|   Frances Bennet|
|    Jamie Russell|
|   Edward Kistler|
|    Sheila Maurer|
| Donald Golightly|
|       David Gray|
|      Joy Bennett|
|      Paul Kriese|
|Berniece Ornellas|
|    Brian Farrell|
+-----------------+
only showing top 10 rows



### Etapa 3

In [233]:
spark = SparkSession.builder.appName("AdicionarColunaAleatoria").getOrCreate()

colunas = ["nome"]

df_nomes = df_nomes.withColumn(
    "Escolaridade",
    when(rand() < 0.33, "Fundamental")
    .when(rand() < 0.66, "Médio")
    .otherwise("Superior")
)

df_nomes.show()

+-----------------+------------+
|            Nomes|Escolaridade|
+-----------------+------------+
|   Frances Bennet|       Médio|
|    Jamie Russell|    Superior|
|   Edward Kistler|       Médio|
|    Sheila Maurer|       Médio|
| Donald Golightly|       Médio|
|       David Gray|       Médio|
|      Joy Bennett|       Médio|
|      Paul Kriese|       Médio|
|Berniece Ornellas|       Médio|
|    Brian Farrell| Fundamental|
|   Kara Mcelwaine|       Médio|
|    Tracy Herring|    Superior|
|  Howard Lazarine| Fundamental|
|     Leroy Strahl|       Médio|
|     Ernest Hulet| Fundamental|
|     David Medina|       Médio|
|   Lorenzo Woodis| Fundamental|
|      Page Marthe|    Superior|
|   Herbert Morris| Fundamental|
|      Albert Leef|       Médio|
+-----------------+------------+
only showing top 20 rows



### Etapa 4

In [234]:

paises = [
    "Argentina", "Bolívia", "Brasil", "Chile", "Colômbia", "Equador",
    "Guiana", "Paraguai", "Peru", "Suriname", "Uruguai", "Venezuela", "Guiana Francesa"
]

df_nomes = df_nomes.withColumn(
    "País",
    element_at(array(*[lit(pais) for pais in paises]),
               (rand() * 13).cast("int") + 1)
)

df_nomes.show()

+-----------------+------------+---------------+
|            Nomes|Escolaridade|           País|
+-----------------+------------+---------------+
|   Frances Bennet|       Médio|Guiana Francesa|
|    Jamie Russell|    Superior|      Venezuela|
|   Edward Kistler|       Médio|       Suriname|
|    Sheila Maurer|       Médio|Guiana Francesa|
| Donald Golightly|       Médio|         Brasil|
|       David Gray|       Médio|         Guiana|
|      Joy Bennett|       Médio|          Chile|
|      Paul Kriese|       Médio|       Suriname|
|Berniece Ornellas|       Médio|          Chile|
|    Brian Farrell| Fundamental|      Argentina|
|   Kara Mcelwaine|       Médio|       Suriname|
|    Tracy Herring|    Superior|         Guiana|
|  Howard Lazarine| Fundamental|      Argentina|
|     Leroy Strahl|       Médio|         Brasil|
|     Ernest Hulet| Fundamental|        Bolívia|
|     David Medina|       Médio|          Chile|
|   Lorenzo Woodis| Fundamental|        Uruguai|
|      Page Marthe| 

### Etapa 5

In [235]:
df_nomes_completo = df_nomes.withColumn(
    "AnoNascimento",
    (rand() * (2010 - 1945) + 1945).cast("int")
)


df_nomes_completo.show()

+-----------------+------------+---------------+-------------+
|            Nomes|Escolaridade|           País|AnoNascimento|
+-----------------+------------+---------------+-------------+
|   Frances Bennet|       Médio|Guiana Francesa|         1945|
|    Jamie Russell|    Superior|      Venezuela|         1954|
|   Edward Kistler|       Médio|       Suriname|         1990|
|    Sheila Maurer|       Médio|Guiana Francesa|         1979|
| Donald Golightly|       Médio|         Brasil|         1960|
|       David Gray|       Médio|         Guiana|         1967|
|      Joy Bennett|       Médio|          Chile|         1950|
|      Paul Kriese|       Médio|       Suriname|         1962|
|Berniece Ornellas|       Médio|          Chile|         1992|
|    Brian Farrell| Fundamental|      Argentina|         1981|
|   Kara Mcelwaine|       Médio|       Suriname|         1958|
|    Tracy Herring|    Superior|         Guiana|         1960|
|  Howard Lazarine| Fundamental|      Argentina|       

### Etapa 6

In [236]:
# Considerei a partir de 2001 pois 2000 ainda é seculo XX
df_select = df_nomes_completo.filter(
    df_nomes_completo.AnoNascimento >= 2001).select("Nomes")


df_select.show(10)

+---------------+
|          Nomes|
+---------------+
|   Charles Hill|
|        Lois Ly|
|Jerry Chynoweth|
| Mary Dillahunt|
|  Ricky Gilbert|
|  Lynne Dustman|
|    Milton Rowe|
|      Ana Baker|
|Charles Randall|
|  Evelyn Shaver|
+---------------+
only showing top 10 rows



### Etapa 7

In [237]:
df_nomes_completo.createOrReplaceTempView("pessoas")


df_select = spark.sql("SELECT Nomes FROM pessoas WHERE AnoNascimento >= 2001")


df_select.show(10)

+---------------+
|          Nomes|
+---------------+
|   Charles Hill|
|        Lois Ly|
|Jerry Chynoweth|
| Mary Dillahunt|
|  Ricky Gilbert|
|  Lynne Dustman|
|    Milton Rowe|
|      Ana Baker|
|Charles Randall|
|  Evelyn Shaver|
+---------------+
only showing top 10 rows



### Etapa 8

In [238]:
df_millennials = df_nomes_completo.filter(
    (df_nomes_completo.AnoNascimento >= 1980) & (df_nomes_completo.AnoNascimento <= 1994))

num_millennials = df_millennials.count()

print(f"Número de pessoas da geração Millennials: {num_millennials}")

Número de pessoas da geração Millennials: 2306788


### Etapa 9

In [239]:
df_nomes_completo.createOrReplaceTempView("pessoas")

resultado = spark.sql("""
    SELECT COUNT(*) AS total_millennials
    FROM pessoas
    WHERE AnoNascimento BETWEEN 1980 AND 1994
""")

resultado.show()

+-----------------+
|total_millennials|
+-----------------+
|          2306788|
+-----------------+



### Etapa 10

In [240]:
novo_df = df_nomes_completo.withColumn(
    "Geração",
    when((col("AnoNascimento") >= 1944) & (
        col("AnoNascimento") <= 1964), "Baby Boomers")
    .when((col("AnoNascimento") >= 1965) & (col("AnoNascimento") <= 1979), "Geração X")
    .when((col("AnoNascimento") >= 1980) & (col("AnoNascimento") <= 1994), "Millennials")
    .when((col("AnoNascimento") >= 1995) & (col("AnoNascimento") <= 2015), "Geração Z")
)

novo_df = (
    novo_df.groupBy("País", "Geração")
    .count()
    .orderBy("País", "Geração", "count")
)

novo_df.show()

+---------+------------+------+
|     País|     Geração| count|
+---------+------------+------+
|Argentina|Baby Boomers|236919|
|Argentina|   Geração X|178123|
|Argentina|   Geração Z|177562|
|Argentina| Millennials|177079|
|  Bolívia|Baby Boomers|236305|
|  Bolívia|   Geração X|176859|
|  Bolívia|   Geração Z|177357|
|  Bolívia| Millennials|177078|
|   Brasil|Baby Boomers|236927|
|   Brasil|   Geração X|177522|
|   Brasil|   Geração Z|177018|
|   Brasil| Millennials|176988|
|    Chile|Baby Boomers|236765|
|    Chile|   Geração X|177540|
|    Chile|   Geração Z|177530|
|    Chile| Millennials|177957|
| Colômbia|Baby Boomers|236496|
| Colômbia|   Geração X|177004|
| Colômbia|   Geração Z|177653|
| Colômbia| Millennials|177811|
+---------+------------+------+
only showing top 20 rows

