In [11]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import lit, rand, when
import random

In [12]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

In [15]:
# Leitura do arquivo nomes_aleatorios.txt
df_nomes = spark.read.csv("nomes_aleatorios.txt", header=False, inferSchema=True)
df_nomes.show(5)

+----------------+
|             _c0|
+----------------+
|  Frances Bennet|
|   Jamie Russell|
|  Edward Kistler|
|   Sheila Maurer|
|Donald Golightly|
+----------------+
only showing top 5 rows



In [16]:
df_nomes = df_nomes.withColumnRenamed("_c0", "Nome")
df_nomes.printSchema()
df_nomes.show(10)

root
 |-- Nome: string (nullable = true)

+-----------------+
|             Nome|
+-----------------+
|   Frances Bennet|
|    Jamie Russell|
|   Edward Kistler|
|    Sheila Maurer|
| Donald Golightly|
|       David Gray|
|      Joy Bennett|
|      Paul Kriese|
|Berniece Ornellas|
|    Brian Farrell|
+-----------------+
only showing top 10 rows



In [17]:
# 3. Adicionar a coluna "Escolaridade" com valores aleatórios
escolaridade = ['Fundamental', 'Medio', 'Superior']
df_nomes = df_nomes.withColumn('Escolaridade', when(rand() < 0.33, lit('Fundamental'))
                                          .when(rand() < 0.66, lit('Medio'))
                                          .otherwise(lit('Superior')))
df_nomes.show(5)

+----------------+------------+
|            Nome|Escolaridade|
+----------------+------------+
|  Frances Bennet|       Medio|
|   Jamie Russell| Fundamental|
|  Edward Kistler| Fundamental|
|   Sheila Maurer|       Medio|
|Donald Golightly|       Medio|
+----------------+------------+
only showing top 5 rows



In [18]:
# 4. Adicionar a coluna "Pais" com valores aleatórios de países da América do Sul
paises_sul_america = ['Brasil', 'Argentina', 'Chile', 'Peru', 'Colombia', 'Venezuela', 'Equador', 'Bolivia', 'Paraguai', 'Uruguai', 'Guiana', 'Suriname', 'Franca Guiana']
df_nomes = df_nomes.withColumn('Pais', when(rand() < 0.0769, lit('Brasil'))
                                    .when(rand() < 0.1538, lit('Argentina'))
                                    .when(rand() < 0.2308, lit('Chile'))
                                    .when(rand() < 0.3077, lit('Peru'))
                                    .when(rand() < 0.3846, lit('Colombia'))
                                    .when(rand() < 0.4615, lit('Venezuela'))
                                    .when(rand() < 0.5385, lit('Equador'))
                                    .when(rand() < 0.6154, lit('Bolivia'))
                                    .when(rand() < 0.6923, lit('Paraguai'))
                                    .when(rand() < 0.7692, lit('Uruguai'))
                                    .when(rand() < 0.8461, lit('Guiana'))
                                    .when(rand() < 0.9230, lit('Suriname'))
                                    .otherwise(lit('Franca Guiana')))
df_nomes.show(5)

+----------------+------------+---------+
|            Nome|Escolaridade|     Pais|
+----------------+------------+---------+
|  Frances Bennet|       Medio|Argentina|
|   Jamie Russell| Fundamental|Argentina|
|  Edward Kistler| Fundamental|     Peru|
|   Sheila Maurer|       Medio|    Chile|
|Donald Golightly|       Medio|     Peru|
+----------------+------------+---------+
only showing top 5 rows



In [19]:
# 5. Adicionar a coluna "AnoNascimento" com valores aleatórios entre 1945 e 2010
df_nomes = df_nomes.withColumn('AnoNascimento', (rand() * 65 + 1945).cast('int'))
df_nomes.show(5)

+----------------+------------+---------+-------------+
|            Nome|Escolaridade|     Pais|AnoNascimento|
+----------------+------------+---------+-------------+
|  Frances Bennet|       Medio|Argentina|         1950|
|   Jamie Russell| Fundamental|Argentina|         1968|
|  Edward Kistler| Fundamental|     Peru|         1983|
|   Sheila Maurer|       Medio|    Chile|         1953|
|Donald Golightly|       Medio|     Peru|         1982|
+----------------+------------+---------+-------------+
only showing top 5 rows



In [20]:
# 6. Filtrando pessoas que nasceram neste século (2001 - 2100)
df_select = df_nomes.filter(df_nomes.AnoNascimento >= 2001)
df_select.show(10)

+---------------+------------+---------+-------------+
|           Nome|Escolaridade|     Pais|AnoNascimento|
+---------------+------------+---------+-------------+
|   David Medina|    Superior|     Peru|         2007|
| Lorenzo Woodis|    Superior|     Peru|         2007|
|   Charles Hill| Fundamental|  Bolivia|         2004|
|       Mary Lee|       Medio|  Equador|         2005|
|     Daryl Page| Fundamental|Venezuela|         2007|
| Wilfredo Grant|    Superior|     Peru|         2001|
| Katrina Graham|       Medio|   Brasil|         2003|
|Jerry Chynoweth| Fundamental| Colombia|         2005|
| Mary Dillahunt| Fundamental|     Peru|         2002|
|   James Barton|    Superior|Venezuela|         2005|
+---------------+------------+---------+-------------+
only showing top 10 rows



In [21]:
# 7. Usando Spark SQL
df_nomes.createOrReplaceTempView("pessoas")
df_select_sql = spark.sql("SELECT * FROM pessoas WHERE AnoNascimento >= 2001")
df_select_sql.show(10)

+---------------+------------+---------+-------------+
|           Nome|Escolaridade|     Pais|AnoNascimento|
+---------------+------------+---------+-------------+
|   David Medina|    Superior|     Peru|         2007|
| Lorenzo Woodis|    Superior|     Peru|         2007|
|   Charles Hill| Fundamental|  Bolivia|         2004|
|       Mary Lee|       Medio|  Equador|         2005|
|     Daryl Page| Fundamental|Venezuela|         2007|
| Wilfredo Grant|    Superior|     Peru|         2001|
| Katrina Graham|       Medio|   Brasil|         2003|
|Jerry Chynoweth| Fundamental| Colombia|         2005|
| Mary Dillahunt| Fundamental|     Peru|         2002|
|   James Barton|    Superior|Venezuela|         2005|
+---------------+------------+---------+-------------+
only showing top 10 rows



In [22]:
# 8. Contando pessoas que são da geração Millenials (nascidos entre 1980 e 1994)
df_millenials = df_nomes.filter(df_nomes.AnoNascimento.between(1980, 1994))
print(f"Total de Millenials: {df_millenials.count()}")

Total de Millenials: 2306200


In [23]:
# 9. Usando Spark SQL para contar Millenials
df_millenials_sql = spark.sql("SELECT COUNT(*) FROM pessoas WHERE AnoNascimento BETWEEN 1980 AND 1994")
df_millenials_sql.show()

+--------+
|count(1)|
+--------+
| 2306200|
+--------+



In [24]:
# 10. Quantidade de pessoas por país para cada geração
generations_sql = """
    SELECT 
        Pais,
        CASE 
            WHEN AnoNascimento BETWEEN 1944 AND 1964 THEN 'Baby Boomers'
            WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 'Geracao X'
            WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 'Millenials'
            WHEN AnoNascimento BETWEEN 1995 AND 2015 THEN 'Geracao Z'
        END AS Geracao,
        COUNT(*) AS Quantidade
    FROM pessoas
    GROUP BY Pais, Geracao
    ORDER BY Pais, Geracao, Quantidade
"""
df_generations_sql = spark.sql(generations_sql)
df_generations_sql.show()

+---------+------------+----------+
|     Pais|     Geracao|Quantidade|
+---------+------------+----------+
|Argentina|Baby Boomers|    437201|
|Argentina|   Geracao X|    327557|
|Argentina|   Geracao Z|    328567|
|Argentina|  Millenials|    326818|
|  Bolivia|Baby Boomers|    119598|
|  Bolivia|   Geracao X|     90235|
|  Bolivia|   Geracao Z|     90493|
|  Bolivia|  Millenials|     90696|
|   Brasil|Baby Boomers|    236257|
|   Brasil|   Geracao X|    177967|
|   Brasil|   Geracao Z|    177010|
|   Brasil|  Millenials|    177236|
|    Chile|Baby Boomers|    554553|
|    Chile|   Geracao X|    416199|
|    Chile|   Geracao Z|    416573|
|    Chile|  Millenials|    415817|
| Colombia|Baby Boomers|    492922|
| Colombia|   Geracao X|    369425|
| Colombia|   Geracao Z|    368624|
| Colombia|  Millenials|    369660|
+---------+------------+----------+
only showing top 20 rows

