In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


# Perguntas dessa tarefa

## 1.

- Inicialmente iremos preparar o ambiente, definindo o diretório onde nosso código será desenvolvido. Para este diretório iremos copiar o arquivo nomes_aleatorios.txt.

- Após, em nosso script Python, devemos importar as bibliotecas necessárias:

from pyspark.sql import SparkSession

from pyspark import SparkContext, SQLContext

- Aplicando as bibliotecas do Spark, podemos definir a Spark Session e sobre ela definir o Context para habilitar o módulo SQL

spark = SparkSession \

                .builder \

                .master("local[*]")\

                .appName("Exercicio Intro") \

                .getOrCreate()

- Nesta etapa, adicione código para ler o arquivo nomes_aleatorios.txt através do comando spark.read.csv. Carregue-o para dentro de um dataframe chamado df_nomes e, por fim, liste algumas linhas através do método show. Exemplo: df_nomes.show(5)

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SQLContext

spark = SparkSession \
    .builder \
    .master("local[*]")\
    .appName("Exercicio Intro") \
    .getOrCreate()

df_nomes = spark.read.csv('nomes_aleatorios.txt')

df_nomes.show(5)

+----------------+
|             _c0|
+----------------+
|  Frances Bennet|
|   Jamie Russell|
|  Edward Kistler|
|   Sheila Maurer|
|Donald Golightly|
+----------------+
only showing top 5 rows



## 2. 
 - No Python, é possível acessar uma coluna de um objeto dataframe pelo atributo (por exemplo df_nomes.nome) ou por índice (df_nomes['nome']). Enquanto a primeira forma é conveniente para a exploração de dados interativos, você deve usar o formato de índice, pois caso algum nome de coluna não esteja de acordo seu código irá falhar.

 - Como não informamos no momento da leitura do arquivo, o Spark não identificou o Schema por padrão e definiu todas as colunas como string. Para ver o Schema, use o método df_nomes.printSchema().



 - Nesta etapa, será necessário adicionar código para renomear a coluna para Nomes, imprimir o esquema e mostrar 10 linhas do dataframe.

In [3]:
df_nomes.printSchema()

nomesschema = "nome STRING"

df_nomes = df_nomes.withColumnRenamed("_c0", "Nomes")

df_nomes.printSchema()

df_nomes.show(10)

root
 |-- _c0: string (nullable = true)

root
 |-- Nomes: string (nullable = true)

+-----------------+
|            Nomes|
+-----------------+
|   Frances Bennet|
|    Jamie Russell|
|   Edward Kistler|
|    Sheila Maurer|
| Donald Golightly|
|       David Gray|
|      Joy Bennett|
|      Paul Kriese|
|Berniece Ornellas|
|    Brian Farrell|
+-----------------+
only showing top 10 rows



## 3.
- Ao dataframe (df_nomes), adicione nova coluna chamada Escolaridade e atribua para cada linha um dos três valores de forma aleatória: Fundamental, Medio ou Superior.

- Para esta etapa, evite usar funções de iteração, como por exemplo: for, while, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

In [4]:
from pyspark.sql.functions import when, rand

df_nomes = df_nomes.withColumn("Escolaridade", when(rand() < 0.33, "Fundamental")
                                        .when(rand() < 0.67, "Medio")
                                        .otherwise("Superior"))


## 4.

- Ao dataframe (df_nomes), adicione nova coluna chamada Pais e atribua para cada linha o nome de um dos 13 países da América do Sul, de forma aleatória.

- Para esta etapa, evite usar funções de iteração, como por exemplo: for, while, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

In [5]:
paises = [(1,'Brasil'), (2,'Argentina'), (3,'Paraguai'), (4,'Uruguai'), (5,'Bolivia'), (6,'Chile'), (7,'Equador'), (8,'Guiana Francesa'), (9,'Peru'), (10,'Colombia'), (11,'Suriname'), (12,'Venezuela'), (13,'Guiana')] 
paises_schema = "id INT, Pais STRING"
paises_df = spark.createDataFrame(paises, paises_schema)

df_nomes = df_nomes.withColumn("id_pais", (rand()*13+1).cast("int"))

df_nomes = df_nomes.join(paises_df, df_nomes.id_pais == paises_df.id, "inner")

df_nomes = df_nomes.drop("id_pais", "id")

## 5.
 - Ao dataframe (df_nomes), adicione nova coluna chamada AnoNascimento e atribua para cada linha um valor de ano entre 1945 e 2010, de forma aleatória. 
 - Para esta etapa, evite usar funções de iteração, como por exemplo: for, while, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

In [6]:
df_nomes = df_nomes.withColumn("AnoNascimento", (rand() * (2010 - 1945) + 1945).cast("int"))

## 6. 
- Usando o método select do dataframe (df_nomes), selecione as pessoas que nasceram neste século. Armazene o resultado em outro dataframe chamado df_select e mostre 10 nomes deste.

In [7]:
from pyspark.sql import functions as Func

df_select = df_nomes.select("*").where(Func.col('AnoNascimento') >= 2000)

df_select.show(10)

+-----------------+------------+------+-------------+
|            Nomes|Escolaridade|  Pais|AnoNascimento|
+-----------------+------------+------+-------------+
|    Milton Dillon| Fundamental|Brasil|         2003|
|        Leo Moore|       Medio|Brasil|         2006|
|    Joseph Fenton|    Superior|Brasil|         2005|
|    Stewart Hinds|       Medio|Brasil|         2007|
| Margaret Bowling|       Medio|Brasil|         2009|
|Francis Conaughty| Fundamental|Brasil|         2004|
|   Patricia Mixon|       Medio|Brasil|         2001|
|Peggy Quintanilla|       Medio|Brasil|         2001|
|     Doris Hanson|       Medio|Brasil|         2002|
|   Linda Weishaar| Fundamental|Brasil|         2002|
+-----------------+------------+------+-------------+
only showing top 10 rows



## 7. 
- Usando Spark SQL repita o processo da Pergunta 6. Lembre-se que, para trabalharmos com SparkSQL, precisamos registrar uma tabela temporária e depois executar o comando SQL. Abaixo um exemplo de como executar comandos SQL com SparkSQL:

 df_nomes.createOrReplaceTempView ("pessoas")

 spark.sql("select * from pessoas").show()

In [8]:
df_nomes.createOrReplaceTempView("pessoas")

seculo_XX_sql = """
SELECT *
FROM pessoas
WHERE AnoNascimento >= 2000
LIMIT 10
"""
spark.sql(seculo_XX_sql).show()

+-----------------+------------+------+-------------+
|            Nomes|Escolaridade|  Pais|AnoNascimento|
+-----------------+------------+------+-------------+
|    Milton Dillon| Fundamental|Brasil|         2003|
|        Leo Moore|       Medio|Brasil|         2006|
|    Joseph Fenton|    Superior|Brasil|         2005|
|    Stewart Hinds|       Medio|Brasil|         2007|
| Margaret Bowling|       Medio|Brasil|         2009|
|Francis Conaughty| Fundamental|Brasil|         2004|
|   Patricia Mixon|       Medio|Brasil|         2001|
|Peggy Quintanilla|       Medio|Brasil|         2001|
|     Doris Hanson|       Medio|Brasil|         2002|
|   Linda Weishaar| Fundamental|Brasil|         2002|
+-----------------+------------+------+-------------+



## 8.
- Usando o método select do Dataframe df_nomes, Conte o número de pessoas que são da geração Millennials (nascidos entre 1980 e 1994) no Dataset

In [13]:
df_nomes.select(Func.when((Func.col('AnoNascimento') >= 1980) & (Func.col('AnoNascimento') <= 1994), 'Millennials').otherwise('Outras geracoes').alias('geracao')).groupBy('geracao').count().filter(Func.col('geracao') == 'Millennials').show()


+-----------+-------+
|    geracao|  count|
+-----------+-------+
|Millennials|2307976|
+-----------+-------+



## 9.
- Repita o processo da Pergunta 8 utilizando Spark SQL

In [10]:
millennials_sql = """
SELECT 
    CASE 
        WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN "Millennials"
        ELSE "Outras gerações"
    END AS geracao, 
    COUNT(*)
FROM pessoas
GROUP BY geracao
HAVING geracao = "Millennials"
"""

spark.sql(millennials_sql).show()


+-----------+--------+
|    geracao|count(1)|
+-----------+--------+
|Millennials| 2307976|
+-----------+--------+



## 10. 
 - Usando Spark SQL, obtenha a quantidade de pessoas de cada país para uma das gerações abaixo. Armazene o resultado em um novo dataframe e depois mostre todas as linhas em ordem crescente de Pais, Geração e Quantidade

    - Baby Boomers – nascidos entre 1944 e 1964;

    - Geração X – nascidos entre 1965 e 1979;

    - Millennials (Geração Y) – nascidos entre 1980 e 1994;

    - Geração Z – nascidos entre 1995 e 2015.

In [11]:
from pyspark.sql.functions import col

consulta_sql = """
SELECT Pais,
    CASE 
        WHEN (AnoNascimento BETWEEN 1944 AND 1964) THEN 'Baby Boomers'
        WHEN (AnoNascimento BETWEEN 1965 AND 1979) THEN 'Geração X'
        WHEN (AnoNascimento BETWEEN 1980 AND 1994) THEN 'Geração Y'
        WHEN (AnoNascimento BETWEEN 1995 AND 2015) THEN 'Geração Z'
        ELSE 'Outras gerações'
    END AS Geracao,
    COUNT(*) AS Quantidade
FROM pessoas
GROUP BY Pais, Geracao
"""
novo_df = spark.sql(consulta_sql)
novo_df.orderBy(col("Pais"), col("Geracao"), col("Quantidade")).show(1000)


+---------------+------------+----------+
|           Pais|     Geracao|Quantidade|
+---------------+------------+----------+
|      Argentina|Baby Boomers|    236567|
|      Argentina|   Geração X|    178229|
|      Argentina|   Geração Y|    178498|
|      Argentina|   Geração Z|    177422|
|        Bolivia|Baby Boomers|    236337|
|        Bolivia|   Geração X|    177821|
|        Bolivia|   Geração Y|    176823|
|        Bolivia|   Geração Z|    177842|
|         Brasil|Baby Boomers|    237850|
|         Brasil|   Geração X|    177511|
|         Brasil|   Geração Y|    177482|
|         Brasil|   Geração Z|    177327|
|          Chile|Baby Boomers|    236254|
|          Chile|   Geração X|    177512|
|          Chile|   Geração Y|    177548|
|          Chile|   Geração Z|    176886|
|       Colombia|Baby Boomers|    235868|
|       Colombia|   Geração X|    177659|
|       Colombia|   Geração Y|    177086|
|       Colombia|   Geração Z|    177130|
|        Equador|Baby Boomers|    