### Perguntas dessa tarefa:

1. Inicialmente iremos preparar o ambiente, definindo o diretório onde nosso código será desenvolvido. Para este diretório iremos copiar o arquivo nomes_aleatorios.txt.

   Após, em nosso script Python, devemos importar as bibliotecas necessárias:
   ```python
   from pyspark.sql import SparkSession
   from pyspark import SparkContext, SQLContext
   ```
   Aplicando as bibliotecas do Spark, podemos definir a Spark Session e sobre ela definir o Context para habilitar o módulo SQL
   ```python
   spark = SparkSession \
                   .builder \
                   .master("local[*]")\
                   .appName("Exercicio Intro") \
                   .getOrCreate()
   ```
   Nesta etapa, adicione código para ler o arquivo nomes_aleatorios.txt através do comando `spark.read.csv`. Carregue-o para dentro de um dataframe chamado `df_nomes` e, por fim, liste algumas linhas através do método `show`. Exemplo: `df_nomes.show(5)`


In [27]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
import random

# Inicializando a Spark Session
spark = SparkSession \
    .builder \
    .appName("Exercicio Intro") \
    .getOrCreate()

# 1. Lendo o arquivo nomes_aleatorios.txt
df_nomes = spark.read.csv("nomes_aleatorios.txt", header=False, inferSchema=True)
df_nomes.show(5)

[Stage 121:>                                                        (0 + 8) / 8]

+----------------+
|             _c0|
+----------------+
|  Frances Bennet|
|   Jamie Russell|
|  Edward Kistler|
|   Sheila Maurer|
|Donald Golightly|
+----------------+
only showing top 5 rows



                                                                                

2. No Python, é possível acessar uma coluna de um objeto dataframe pelo atributo (por exemplo `df_nomes.nome`) ou por índice (`df_nomes['nome']`). Enquanto a primeira forma é conveniente para a exploração de dados interativos, você deve usar o formato de índice, pois caso algum nome de coluna não esteja de acordo seu código irá falhar.

   Como não informamos no momento da leitura do arquivo, o Spark não identificou o Schema por padrão e definiu todas as colunas como string. Para ver o Schema, use o método `df_nomes.printSchema()`.

   Nesta etapa, será necessário adicionar código para renomear a coluna para Nomes, imprimir o esquema e mostrar 10 linhas do dataframe.

In [28]:
# 2. Renomeando a coluna para 'Nomes'
df_nomes = df_nomes.withColumnRenamed("_c0", "Nomes")
df_nomes.show(10)

+-----------------+
|            Nomes|
+-----------------+
|   Frances Bennet|
|    Jamie Russell|
|   Edward Kistler|
|    Sheila Maurer|
| Donald Golightly|
|       David Gray|
|      Joy Bennett|
|      Paul Kriese|
|Berniece Ornellas|
|    Brian Farrell|
+-----------------+
only showing top 10 rows



3. Ao dataframe (`df_nomes`), adicione nova coluna chamada `Escolaridade` e atribua para cada linha um dos três valores de forma aleatória: Fundamental, Médio ou Superior.
   Para esta etapa, evite usar funções de iteração, como por exemplo: `for`, `while`, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

In [41]:
#3. Atribuindo valores de escolaridade com base em um único número aleatório por registro
df_nomes = df_nomes.withColumn("Random", rand())

# Definindo os limites para cada valor de escolaridade
fundamental_threshold = 1/3
medio_threshold = 2/3

# Atribuindo valores de escolaridade com base nos limites
df_nomes = df_nomes.withColumn("Escolaridade",
                               when(df_nomes["Random"] <= fundamental_threshold, "Fundamental")
                               .when((df_nomes["Random"] > fundamental_threshold) & (df_nomes["Random"] <= medio_threshold), "Médio")
                               .otherwise("Superior"))

# Removendo a coluna "Random" do DataFrame
df_nomes = df_nomes.drop("Random")

# Contando o número de ocorrências de cada valor de escolaridade no DataFrame
escolaridade_counts = df_nomes.groupBy("Escolaridade").count().orderBy("Escolaridade")

# Calculando a proporção de ocorrências de cada valor de escolaridade
total_count = df_nomes.count()
escolaridade_proportion = escolaridade_counts.withColumn("Proporção", escolaridade_counts["count"] / total_count)

# Exibindo a proporção de ocorrências de cada valor de escolaridade
escolaridade_proportion.show()


+------------+-------+---------+
|Escolaridade|  count|Proporção|
+------------+-------+---------+
| Fundamental|3334349|0.3334349|
|       Médio|3329643|0.3329643|
|    Superior|3336008|0.3336008|
+------------+-------+---------+



4. Ao dataframe (`df_nomes`), adicione nova coluna chamada `Pais` e atribua para cada linha o nome de um dos 13 países da América do Sul, de forma aleatória.
   Para esta etapa, evite usar funções de iteração, como por exemplo: `for`, `while`, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

In [42]:
# 4. Definindo a lista de países da América do Sul 
paises_americadosul = ['Argentina', 'Bolívia', 'Brasil', 'Chile', 'Colômbia', 'Equador', 'Guiana', 'Guiana Francesa', 'Paraguai', 'Peru', 'Suriname', 'Uruguai', 'Venezuela']

# Função UDF para selecionar um país aleatório
def random_country():
    return random.choice(paises_americadosul)

# Registrando a função UDF
spark.udf.register("random_country", random_country, StringType())

# Adicionando a nova coluna 'Pais' com valores aleatórios
df_nomes = df_nomes.withColumn("Pais", expr("random_country()"))

# Contando o número de ocorrências de cada país no DataFrame
counts = df_nomes.groupBy("Pais").count().orderBy("Pais")

# Calculando a proporção de ocorrências de cada país
total_count = df_nomes.count()
proportion = counts.withColumn("Proporção", F.col("count") / total_count)

# Exibindo a proporção
proportion.show()

24/03/24 18:07:27 WARN SimpleFunctionRegistry: The function random_country replaced a previously registered function.

+---------------+------+---------+
|           Pais| count|Proporção|
+---------------+------+---------+
|      Argentina|769483|0.0769483|
|        Bolívia|770986|0.0770986|
|         Brasil|767923|0.0767923|
|          Chile|769906|0.0769906|
|       Colômbia|768277|0.0768277|
|        Equador|768299|0.0768299|
|         Guiana|768394|0.0768394|
|Guiana Francesa|768487|0.0768487|
|       Paraguai|769967|0.0769967|
|           Peru|770242|0.0770242|
|       Suriname|769183|0.0769183|
|        Uruguai|769942|0.0769942|
|      Venezuela|768911|0.0768911|
+---------------+------+---------+



                                                                                

5. Ao dataframe (`df_nomes`), adicione nova coluna chamada `AnoNascimento` e atribua para cada linha um valor de ano entre 1945 e 2010, de forma aleatória.
   Para esta etapa, evite usar funções de iteração, como por exemplo: `for`, `while`, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

In [43]:
# 5. Adicionando a coluna 'AnoNascimento' com valores aleatórios entre 1945 e 2010
df_nomes = df_nomes.withColumn("AnoNascimento", (lit(1945) + (rand(seed=42) * (2010 - 1945 + 1)).cast("int")))
df_nomes.show(10)

+-----------------+------------+---------+-------------+
|            Nomes|Escolaridade|     Pais|AnoNascimento|
+-----------------+------------+---------+-------------+
|   Frances Bennet| Fundamental| Paraguai|         1985|
|    Jamie Russell|    Superior|  Equador|         1978|
|   Edward Kistler|       Médio|Argentina|         1999|
|    Sheila Maurer| Fundamental|Venezuela|         1962|
| Donald Golightly|    Superior|  Bolívia|         1989|
|       David Gray|    Superior|  Uruguai|         1979|
|      Joy Bennett|       Médio|     Peru|         2010|
|      Paul Kriese|    Superior| Colômbia|         1949|
|Berniece Ornellas|       Médio| Paraguai|         2008|
|    Brian Farrell|       Médio|  Uruguai|         1997|
+-----------------+------------+---------+-------------+
only showing top 10 rows



6. Usando o método `select` do dataframe (`df_nomes`), selecione as pessoas que nasceram neste século. Armazene o resultado em outro dataframe chamado `df_select` e mostre 10 nomes deste.

In [44]:
# 6. Selecionando as pessoas que nasceram neste século
df_select = df_nomes.filter(df_nomes["AnoNascimento"] >= 2000)
print("Pessoas que nasceram neste século:")
df_select.select("Nomes", "AnoNascimento").show(10)

Pessoas que nasceram neste século:
+-----------------+-------------+
|            Nomes|AnoNascimento|
+-----------------+-------------+
|      Joy Bennett|         2010|
|Berniece Ornellas|         2008|
|      Albert Leef|         2000|
|     Rebecca Snow|         2003|
|  Kenneth Rayburn|         2001|
|    Milton Dillon|         2002|
|       Ned Tester|         2010|
|    Lynne Dustman|         2003|
|    George Miller|         2002|
| Cristina Sheston|         2006|
+-----------------+-------------+
only showing top 10 rows



7. Usando Spark SQL repita o processo da Pergunta 6. Lembre-se que, para trabalharmos com SparkSQL, precisamos registrar uma tabela temporária e depois executar o comando SQL. Abaixo um exemplo de como executar comandos SQL com SparkSQL:

   ```python
   df_nomes.createOrReplaceTempView("pessoas")
   spark.sql("select * from pessoas").show()
   ```

In [45]:
# 7. Usando Spark SQL para selecionar as pessoas que nasceram neste século
df_nomes.createOrReplaceTempView("pessoas")
df_select_sql = spark.sql("SELECT Nomes, AnoNascimento FROM pessoas WHERE AnoNascimento >= 2000")
print("Pessoas que nasceram neste século (usando Spark SQL):")
df_select_sql.show(10)

Pessoas que nasceram neste século (usando Spark SQL):
+-----------------+-------------+
|            Nomes|AnoNascimento|
+-----------------+-------------+
|      Joy Bennett|         2010|
|Berniece Ornellas|         2008|
|      Albert Leef|         2000|
|     Rebecca Snow|         2003|
|  Kenneth Rayburn|         2001|
|    Milton Dillon|         2002|
|       Ned Tester|         2010|
|    Lynne Dustman|         2003|
|    George Miller|         2002|
| Cristina Sheston|         2006|
+-----------------+-------------+
only showing top 10 rows



8. Usando o método `select` do Dataframe `df_nomes`, Conte o número de pessoas que são da geração Millennials (nascidos entre 1980 e 1994) no Dataset

In [46]:
# 8. Contando o número de pessoas que são da geração Millennials (nascidos entre 1980 e 1994)
count_millennials = df_nomes.filter((df_nomes["AnoNascimento"] >= 1980) & (df_nomes["AnoNascimento"] <= 1994)).count()
print("Número de pessoas da geração Millennials:", count_millennials)

Número de pessoas da geração Millennials: 2270069


9. Repita o processo da Pergunta 8 utilizando Spark SQL

In [47]:
# 9. Repetindo o processo da Pergunta 8 utilizando Spark SQL
count_millennials_sql = spark.sql("SELECT COUNT(*) FROM pessoas WHERE AnoNascimento >= 1980 AND AnoNascimento <= 1994").collect()[0][0]
print("Número de pessoas da geração Millennials (usando Spark SQL):", count_millennials_sql)

Número de pessoas da geração Millennials (usando Spark SQL): 2270069


10. Usando Spark SQL, obtenha a quantidade de pessoas de cada país para uma das gerações abaixo. Armazene o resultado em um novo dataframe e depois mostre todas as linhas em ordem crescente de Pais, Geração e Quantidade

   - Baby Boomers – nascidos entre 1944 e 1964;
   - Geração X – nascidos entre 1965 e 1979;
   - Millennials (Geração Y) – nascidos entre 1980 e 1994;
   - Geração Z – nascidos entre 1995 e 2015.

In [48]:
# Registrando o DataFrame como uma tabela temporária
df_nomes.createOrReplaceTempView("pessoas")

# Escrevendo uma consulta SQL para calcular a quantidade de pessoas de cada país para cada geração
consulta = """
SELECT Pais,
       CASE WHEN AnoNascimento BETWEEN 1944 AND 1964 THEN 'Baby Boomers'
            WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 'Geração X'
            WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 'Millennials'
            WHEN AnoNascimento BETWEEN 1995 AND 2015 THEN 'Geração Z'
            ELSE 'Outros' END AS Geracao,
       COUNT(*) AS Quantidade
FROM pessoas
GROUP BY Pais, Geracao
ORDER BY Pais, Geracao, Quantidade
"""

# Executando a consulta SQL e armazenando o resultado em um novo DataFrame
df_resultado = spark.sql(consulta)

# Exibindo todas as linhas em ordem crescente de Pais, Geração e Quantidade
df_resultado.show(df_resultado.count(), truncate=False)




+---------------+------------+----------+
|Pais           |Geracao     |Quantidade|
+---------------+------------+----------+
|Argentina      |Baby Boomers|233113    |
|Argentina      |Geração X   |175051    |
|Argentina      |Geração Z   |186373    |
|Argentina      |Millennials |174270    |
|Bolívia        |Baby Boomers|233588    |
|Bolívia        |Geração X   |174986    |
|Bolívia        |Geração Z   |186126    |
|Bolívia        |Millennials |174730    |
|Brasil         |Baby Boomers|234191    |
|Brasil         |Geração X   |174648    |
|Brasil         |Geração Z   |186374    |
|Brasil         |Millennials |174405    |
|Chile          |Baby Boomers|233165    |
|Chile          |Geração X   |174762    |
|Chile          |Geração Z   |186780    |
|Chile          |Millennials |175132    |
|Colômbia       |Baby Boomers|233257    |
|Colômbia       |Geração X   |175018    |
|Colômbia       |Geração Z   |186078    |
|Colômbia       |Millennials |174915    |
|Equador        |Baby Boomers|2331

                                                                                