# Instalando o Spark no colab

In [None]:

# Instalar o Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Baixar o Spark 3.3.1 (compatível com Python 3.10)
!wget -q https://archive.apache.org/dist/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

# Extrair o arquivo do Spark
!tar xf spark-3.3.1-bin-hadoop3.tgz

# Mover o Spark para o diretório padrão
!mv spark-3.3.1-bin-hadoop3 /usr/local/spark


In [None]:
# Configurar as variáveis de ambiente
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/usr/bin/python3"



# Instalar o findspark
!pip install -q findspark

In [None]:
import os

# Configurar as variáveis de ambiente
os.environ['SPARK_HOME'] = "/usr/local/spark"
os.environ['HADOOP_HOME'] = "/usr/local/spark"
os.environ['JAVA_HOME'] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ['PATH'] = os.environ['PATH'] + ":" + os.environ['SPARK_HOME'] + "/bin" + ":" + os.environ['HADOOP_HOME'] + "/bin"


In [None]:

import findspark
findspark.init()

from pyspark.sql import SparkSession

# Criar uma sessão Spark
spark = SparkSession.builder \
    .appName("Google Colab Spark") \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .master("local[*]") \
    .getOrCreate()

# Verificar a versão do Spark
print(f"Versão do Spark: {spark.version}")



Versão do Spark: 3.3.1


# Código do exercício

## Etapa 1 - Importanto as bibliotecas e lendo o arquivo nomes_aleatorios.txt

In [None]:
# Bibliotecas
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

# Iniciando sessão no Spark
spark = SparkSession \
  .builder \
  .master ("local[*]") \
  .appName("Exercicio intro") \
  .getOrCreate()

# Lendo o arquivo 'nomes_aleatorios.txt'
df_nomes = spark.read.text('nomes_aleatorios.txt')

# Printando as 5 primeiras linhas do df
df_nomes.show(5)

+----------------+
|           value|
+----------------+
|  Frances Bennet|
|   Jamie Russell|
|  Edward Kistler|
|   Sheila Maurer|
|Donald Golightly|
+----------------+
only showing top 5 rows



In [None]:
# Printando o Schema da DF
df_nomes.printSchema()

root
 |-- value: string (nullable = true)



## Etapa 2 - Adicionando a coluna **Nomes**

In [None]:
# Adicionando a coluna "Nomes"
df_nomes = df_nomes.withColumnRenamed("value", "Nomes")

df_nomes.show(10)

+-----------------+
|            Nomes|
+-----------------+
|   Frances Bennet|
|    Jamie Russell|
|   Edward Kistler|
|    Sheila Maurer|
| Donald Golightly|
|       David Gray|
|      Joy Bennett|
|      Paul Kriese|
|Berniece Ornellas|
|    Brian Farrell|
+-----------------+
only showing top 10 rows



## Etapa 3 - Adicionando a coluna **Escolaridade**

In [None]:
from pyspark.sql import functions as F

# Armazenando a escolaridade em uma lista
escolaridade = ['Fundamental', 'Medio', 'Superior']

# Criando uma nova coluna chamada "Escolaridade"
# Preenchendo a Nova coluna com os valores da lista de maneira aleatoria
df_nomes = df_nomes.withColumn(
    "Escolaridade",
   F.when(F.rand() < 1/3, escolaridade[0])
    .when(F.rand() < 2/3, escolaridade[1])
    .otherwise(escolaridade[2])
)

# Printando o novo df
df_nomes.show(10)

+-----------------+------------+
|            Nomes|Escolaridade|
+-----------------+------------+
|   Frances Bennet|       Medio|
|    Jamie Russell| Fundamental|
|   Edward Kistler| Fundamental|
|    Sheila Maurer|       Medio|
| Donald Golightly| Fundamental|
|       David Gray|       Medio|
|      Joy Bennett|    Superior|
|      Paul Kriese| Fundamental|
|Berniece Ornellas|       Medio|
|    Brian Farrell| Fundamental|
+-----------------+------------+
only showing top 10 rows



## Etapa 4 - Adicionando a coluna **Pais**

In [None]:
# Armazenando os paises da America do Sul em uma lista
paises = [
    'Argentina', 'Brasil', 'Chile', 'Colômbia', 'Equador',
    'Paraguai', 'Peru', 'Uruguai', 'Venezuela', 'Bolívia',
    'Guyana', 'Suriname', 'Guiana Francesa'
]

# Criando a coluna "Pais" e a preenchendo com valores aleatórios da lista
df_nomes = df_nomes.withColumn(
    "Pais",
    F.when(F.rand() < 1/13, paises[0])
    .when(F.rand() < 2/13, paises[1])
    .when(F.rand() < 3/13, paises[2])
    .when(F.rand() < 4/13, paises[3])
    .when(F.rand() < 5/13, paises[4])
    .when(F.rand() < 6/13, paises[5])
    .when(F.rand() < 7/13, paises[6])
    .when(F.rand() < 8/13, paises[7])
    .when(F.rand() < 9/13, paises[8])
    .when(F.rand() < 10/13, paises[9])
    .when(F.rand() < 11/13, paises[10])
    .when(F.rand() < 12/13, paises[11])
    .otherwise(paises[12])
)


# Mostrando o novo df
df_nomes.show(10)

+-----------------+------------+---------+
|            Nomes|Escolaridade|     Pais|
+-----------------+------------+---------+
|   Frances Bennet|       Medio|  Equador|
|    Jamie Russell| Fundamental|   Brasil|
|   Edward Kistler| Fundamental|   Brasil|
|    Sheila Maurer|       Medio|     Peru|
| Donald Golightly| Fundamental| Colômbia|
|       David Gray|       Medio|     Peru|
|      Joy Bennett|    Superior| Colômbia|
|      Paul Kriese| Fundamental|  Uruguai|
|Berniece Ornellas|       Medio|Argentina|
|    Brian Farrell| Fundamental| Colômbia|
+-----------------+------------+---------+
only showing top 10 rows



## Etapa 5 - Adicionando a coluna **AnoNascimento**

In [None]:
# Criando uma nova coluna chamada "AnoNascimento" e a preenchendo com anos aleatorios entre 1945 e 2010
df_nomes = df_nomes.withColumn(
    "AnoNascimento",
    (F.floor(F.rand() * (2010 - 1945 + 1)) + 1945)
)

# Printando o novo DF
df_nomes.show(10)


+-----------------+------------+---------+-------------+
|            Nomes|Escolaridade|     Pais|AnoNascimento|
+-----------------+------------+---------+-------------+
|   Frances Bennet|       Medio|  Equador|         2006|
|    Jamie Russell| Fundamental|   Brasil|         1964|
|   Edward Kistler| Fundamental|   Brasil|         1992|
|    Sheila Maurer|       Medio|     Peru|         2005|
| Donald Golightly| Fundamental| Colômbia|         1972|
|       David Gray|       Medio|     Peru|         1955|
|      Joy Bennett|    Superior| Colômbia|         2000|
|      Paul Kriese| Fundamental|  Uruguai|         1961|
|Berniece Ornellas|       Medio|Argentina|         1987|
|    Brian Farrell| Fundamental| Colômbia|         1993|
+-----------------+------------+---------+-------------+
only showing top 10 rows



## Etapa 6 - Mostrando pessoas que nasceram neste século

In [None]:
# Filtrando as pessoas que nasceram a partir do seculo 21
df_select = df_nomes.filter(F.col("AnoNascimento") >= 2001)

# Exibir os primeiros 10 nomes
df_select.select("Nomes", "AnoNascimento").show(10)

+---------------+-------------+
|          Nomes|AnoNascimento|
+---------------+-------------+
| Edward Kistler|         2005|
|     David Gray|         2005|
|    Joy Bennett|         2006|
|Howard Lazarine|         2002|
|   Roxie Bernal|         2004|
| Wilfredo Grant|         2005|
|     Anita Ross|         2008|
| Mary Dillahunt|         2008|
| Rosie Lovelady|         2002|
|    Donald Vogt|         2009|
+---------------+-------------+
only showing top 10 rows



## Etapa 7 - Refazendo a questão anterior com o Spark SQL

In [None]:
# Criando a tabela temporária
df_nomes.createOrReplaceTempView("pessoas")

# Executando a consulta para selecionar as pessoas nascidas no século 21
df_select_sql = spark.sql("""
    SELECT Nomes, AnoNascimento
    FROM pessoas
    WHERE AnoNascimento >= 2001
""")

# Mostrar os primeiros 10 nomes
df_select_sql.show(10)


+---------------+-------------+
|          Nomes|AnoNascimento|
+---------------+-------------+
| Edward Kistler|         2005|
|     David Gray|         2005|
|    Joy Bennett|         2006|
|Howard Lazarine|         2002|
|   Roxie Bernal|         2004|
| Wilfredo Grant|         2005|
|     Anita Ross|         2008|
| Mary Dillahunt|         2008|
| Rosie Lovelady|         2002|
|    Donald Vogt|         2009|
+---------------+-------------+
only showing top 10 rows



## Etapa 8 - Mostrando a quantidade de pesssoas da geração Millenium

In [None]:
# Filtrando pessoas que nasceram entre os anos de 1980 e 1994 e armazenando no dataframe "df_millennials"
df_millennials = df_nomes.filter((F.col("AnoNascimento") >= 1980) & (F.col("AnoNascimento") <= 1994)).count()

# Printando a quantidade de millennials
print(f"Número de Millennials: {df_millennials}")

Número de Millennials: 2272680


## Etapa 9 - Refazendo a questão anterior com o SQL

In [None]:
# Criando a tabela temporária
df_nomes.createOrReplaceTempView("pessoas")

# Realizando a consulta para saber quantas pessoas nasceram entre os anos de 1980 e 1945
millennials_num = spark.sql("""
    SELECT count(*) as num
    FROM pessoas
    WHERE AnoNascimento BETWEEN 1980 AND 1994
""").collect()[0]['num']

# Printando a quantidade de millenials
print(f"Número de millennials: {millennials_num}")


Número de millennials: 2272680


## Etapa 10 - Usando o Spark SQL para obter a quantidade de pessoas de cada paise e garação

In [None]:
# Criando uma tabela temporária
df_nomes.createOrReplaceTempView("pessoas")

# Realizando a consulta SQL para obter a quantidade de pessoas de cada país para cada geração
query = """
    SELECT
        Pais,
        CASE
            WHEN AnoNascimento BETWEEN 1944 AND 1964 THEN 'Baby Boomers'
            WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 'Geração X'
            WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 'Millennials (Geração Y)'
            WHEN AnoNascimento BETWEEN 1995 AND 2015 THEN 'Geração Z'
        END AS Geracao,
        COUNT(*) AS Quantidade
    FROM pessoas
    GROUP BY Pais, Geracao
    ORDER BY Pais, Geracao, Quantidade
"""

# Executar a consulta e armazenar o resultado em um DataFrame
df_resultado = spark.sql(query)

# Mostrar o resultado
df_resultado.show()

+---------+--------------------+----------+
|     Pais|             Geracao|Quantidade|
+---------+--------------------+----------+
|Argentina|        Baby Boomers|    233215|
|Argentina|           Geração X|    174430|
|Argentina|           Geração Z|    186562|
|Argentina|Millennials (Gera...|    175528|
|  Bolívia|        Baby Boomers|     17565|
|  Bolívia|           Geração X|     13193|
|  Bolívia|           Geração Z|     14045|
|  Bolívia|Millennials (Gera...|     13229|
|   Brasil|        Baby Boomers|    430202|
|   Brasil|           Geração X|    322832|
|   Brasil|           Geração Z|    344854|
|   Brasil|Millennials (Gera...|    323071|
|    Chile|        Baby Boomers|    546644|
|    Chile|           Geração X|    409845|
|    Chile|           Geração Z|    437575|
|    Chile|Millennials (Gera...|    409690|
| Colômbia|        Baby Boomers|    559452|
| Colômbia|           Geração X|    420365|
| Colômbia|           Geração Z|    448913|
| Colômbia|Millennials (Gera...|