In [None]:
!pip install pyspark


Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285397 sha256=b8358fd21142fe6dc85fc8ea57e5b065611383ec11886831d58fcd9785a36c46
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, expr, when
from pyspark.sql.types import IntegerType

In [None]:
# Local Spark session using every available core; reused if one already exists.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Exercicio Intro")
    .getOrCreate()
)


In [None]:
# Load the tab-separated names file (no header). The file has a single text
# column, so declare it directly instead of using inferSchema, which costs an
# extra full scan of the file and would still need a rename from "_c0".
df_nomes = spark.read.csv(
    "nomes_aleatorios.txt",
    header=False,
    sep="\t",
    schema="Nomes STRING",
)


In [None]:
# Quick sanity check: column types, then a sample of full (untruncated) names.
df_nomes.printSchema()
df_nomes.show(n=10, truncate=False)

root
 |-- Nomes: string (nullable = true)

+-----------------+
|Nomes            |
+-----------------+
|Frances Bennet   |
|Jamie Russell    |
|Edward Kistler   |
|Sheila Maurer    |
|Donald Golightly |
|David Gray       |
|Joy Bennett      |
|Paul Kriese      |
|Berniece Ornellas|
|Brian Farrell    |
+-----------------+
only showing top 10 rows



In [None]:
# Assign a random education level, each with probability 1/3.
# A single rand() draw picks an index into the array (element_at is 1-based).
# The previous CASE WHEN called rand() twice with independent draws, which
# skewed the split to ~33% Fundamental / ~44% Medio / ~23% Superior.
df_nomes = df_nomes.withColumn(
    "Escolaridade",
    expr("element_at(array('Fundamental', 'Medio', 'Superior'), cast(floor(rand() * 3) as int) + 1)"),
)


In [None]:
# Assign each person a uniformly random South American country.
paises = ["Brasil", "Argentina", "Colombia", "Peru", "Chile", "Venezuela", "Uruguai", "Paraguai", "Equador", "Bolivia", "Suriname", "Guiana", "Guiana Francesa"]

# Spark SQL string literals use single quotes (double quotes are identifiers
# under ANSI double-quoted-identifier mode); escape any embedded quote.
paises_str = ", ".join("'" + pais.replace("'", "\\'") + "'" for pais in paises)

# element_at is 1-based: floor(rand() * n) + 1 covers 1..n with equal probability.
# The earlier cast(rand()*(n-1) + 1 as int) could never select the last country,
# and arrays_zip wrapped each value in a {pais, pais} struct instead of a string.
expr_str = f"element_at(array({paises_str}), cast(floor(rand() * {len(paises)}) as int) + 1)"

df_nomes = df_nomes.withColumn("Pais", expr(expr_str))



In [None]:
# Random birth year, uniform over 1945..2010 inclusive.
# floor(rand() * 66) yields 0..65 with equal probability; the previous
# round(rand() * 65) gave the endpoint years 1945 and 2010 only half the weight.
df_nomes = df_nomes.withColumn(
    "AnoNascimento",
    expr("floor(rand() * (2010 - 1945 + 1)) + 1945").cast(IntegerType()),
)

In [None]:
# People born in the year 2000 or later.
df_select = df_nomes.where("AnoNascimento >= 2000")
df_select.show(n=10)

+---------------+------------+--------------------+-------------+
|          Nomes|Escolaridade|                Pais|AnoNascimento|
+---------------+------------+--------------------+-------------+
| Herbert Morris| Fundamental|      {Chile, Chile}|         2007|
|Helen Blackwell|       Medio|      {Chile, Chile}|         2005|
| Amanda Gravitt|       Medio|{Suriname, Suriname}|         2000|
|       Mary Lee|       Medio|{Colombia, Colombia}|         2002|
| Wilfredo Grant|       Medio|  {Bolivia, Bolivia}|         2004|
| Katrina Graham|       Medio|{Colombia, Colombia}|         2002|
|Kenneth Rayburn|    Superior|      {Chile, Chile}|         2005|
|     Anita Ross|       Medio|  {Uruguai, Uruguai}|         2000|
|    Sandra Todd|       Medio|  {Equador, Equador}|         2000|
|  Ricky Gilbert|       Medio|  {Equador, Equador}|         2000|
+---------------+------------+--------------------+-------------+
only showing top 10 rows



In [None]:
# Register the DataFrame so the following cells can query it with Spark SQL.
df_nomes.createOrReplaceTempView(name="pessoas")

In [None]:
# Millennials: born 1980–1994 inclusive. COUNT(*) always yields exactly one row.
millennials_count_df = spark.sql(
    "SELECT COUNT(*) AS count FROM pessoas WHERE AnoNascimento BETWEEN 1980 AND 1994"
)
millennials_count = millennials_count_df.first()["count"]
print("Número de pessoas da geração Millennials:", millennials_count)

Número de pessoas da geração Millennials: 63741


In [None]:
# Count people per country in each generation, bucketed by birth year.
generation_query = """
SELECT Pais,
       SUM(CASE WHEN AnoNascimento BETWEEN 1945 AND 1964 THEN 1 ELSE 0 END) AS BabyBoomers,
       SUM(CASE WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 1 ELSE 0 END) AS GenerationX,
       SUM(CASE WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 1 ELSE 0 END) AS Millennials,
       SUM(CASE WHEN AnoNascimento BETWEEN 1995 AND 2015 THEN 1 ELSE 0 END) AS GenerationZ
FROM pessoas
GROUP BY Pais
"""

# Pais is unique after GROUP BY, so sorting by it alone fully determines the
# order; show once, untruncated, so country values are not cut off.
result_df = spark.sql(generation_query).orderBy("Pais")
result_df.show(truncate=False)

+--------------------+-----------+-----------+-----------+-----------+
|                Pais|BabyBoomers|GenerationX|Millennials|GenerationZ|
+--------------------+-----------+-----------+-----------+-----------+
|    {Brasil, Brasil}|       6949|       5184|       5394|       5374|
|{Argentina, Argen...|       7124|       5383|       5316|       5460|
|{Paraguai, Paraguai}|       7003|       5344|       5288|       5534|
|{Colombia, Colombia}|       7001|       5329|       5259|       5497|
|  {Equador, Equador}|       7005|       5318|       5362|       5496|
|    {Guiana, Guiana}|       6936|       5136|       5374|       5445|
|{Suriname, Suriname}|       6917|       5417|       5358|       5482|
|        {Peru, Peru}|       6919|       5472|       5280|       5614|
|  {Bolivia, Bolivia}|       6886|       5249|       5313|       5432|
|      {Chile, Chile}|       7111|       5367|       5381|       5436|
|  {Uruguai, Uruguai}|       7042|       5291|       5227|       5568|
|{Vene