# Section 4 | Exercícios Apache Spark
#### Nesta etapa, adicione código para ler o arquivo nomes_aleatorios.txt através do comando spark.read.csv. Carregue-o para dentro de um dataframe chamado df_nomes e, por fim, liste algumas linhas através do método show. Exemplo: df_nomes.show(5)

In [None]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession

# Definindo a Spark Session
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

# Lendo o arquivo 'nomes_aleatorios.txt' e carregando-o em um dataframe
df_nomes = spark.read.text('nomes_aleatorios.txt')

# Imprimindo as primeiras 5 linhas do dataframe
df_nomes.show(5)


# Section 4 | Exercícios Apache Spark
#### No Python, é possível acessar uma coluna de um objeto dataframe pelo atributo (por exemplo df_nomes.nome) ou por índice (df_nomes['nome']). Enquanto a primeira forma é conveniente para a exploração de dados interativos, você deve usar o formato de índice, pois caso algum nome de coluna não esteja de acordo seu código irá falhar.Como não informamos no momento da leitura do arquivo, o Spark não identificou o Schema por padrão e definiu todas as colunas como string. Para ver o Schema, use o método df_nomes.printSchema(). Nesta etapa, será necessário adicionar código para renomear a coluna para Nomes, imprimir o esquema e mostrar 10 linhas do dataframe.

In [None]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession

# Definindo a Spark Session
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

# Lendo o arquivo 'nomes_aleatorios.txt' e carregando-o em um dataframe
df_nomes = spark.read.text('nomes_aleatorios.txt')

# Renomeando a coluna
df_nomes = df_nomes.withColumnRenamed("value", "Nomes")

# Imprimindo o esquema
df_nomes.printSchema()

# Imprimindo as primeiras 10 linhas do dataframe
df_nomes.show(10)


# Section 4 | Exercícios Apache Spark
#### Ao dataframe (df_nomes), adicione nova coluna chamada Escolaridade e atribua para cada linha um dos três valores de forma aleatória: Fundamental, Medio ou Superior.Para esta etapa, evite usar funções de iteração, como por exemplo: for, while, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

In [None]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, when

# Definindo a Spark Session
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

# Lendo o arquivo 'nomes_aleatorios.txt' e carregando-o em um dataframe
df_nomes = spark.read.text('nomes_aleatorios.txt')

# Renomeando a coluna
df_nomes = df_nomes.withColumnRenamed("value", "Nomes")

# Adicionando a coluna "Escolaridade" com valores aleatórios "Fundamental", "Medio" ou "Superior"
df_nomes = df_nomes.withColumn("Escolaridade", 
                               when(rand() < 1/3, "Fundamental")
                               .when(rand() < 2/3, "Medio")
                               .otherwise("Superior"))

# Imprimindo as primeiras 10 linhas do dataframe
df_nomes.show(10)


# Section 4 | Exercícios Apache Spark
#### Ao dataframe (df_nomes), adicione nova coluna chamada Pais e atribua para cada linha o nome de um dos 13 países da América do Sul, de forma aleatória. Para esta etapa, evite usar funções de iteração, como por exemplo: for, while, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

In [None]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, when

# Definindo a Spark Session
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

# Lendo o arquivo 'nomes_aleatorios.txt' e carregando-o em um dataframe
df_nomes = spark.read.text('nomes_aleatorios.txt')

# Renomeando a coluna
df_nomes = df_nomes.withColumnRenamed("value", "Nomes")

# Adicionando a coluna "Escolaridade" com valores aleatórios "Fundamental", "Medio" ou "Superior"
df_nomes = df_nomes.withColumn("Escolaridade", 
                               when(rand() < 1/3, "Fundamental")
                               .when(rand() < 2/3, "Medio")
                               .otherwise("Superior"))

# Imprimindo as primeiras 10 linhas do dataframe
df_nomes.show(10)


# Section 4 | Exercícios Apache Spark
#### Ao dataframe (df_nomes), adicione nova coluna chamada AnoNascimento e atribua para cada linha um valor de ano entre 1945 e 2010, de forma aleatória. Para esta etapa, evite usar funções de iteração, como por exemplo: for, while, entre outras. Dê preferência aos métodos oferecidos para próprio Spark.

In [None]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, floor

# Definindo a Spark Session
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

# Lendo o arquivo 'nomes_aleatorios.txt' e carregando-o em um dataframe
df_nomes = spark.read.text('nomes_aleatorios.txt')

# Renomeando a coluna
df_nomes = df_nomes.withColumnRenamed("value", "Nomes")

# Adicionando a coluna "Escolaridade" com valores aleatórios "Fundamental", "Medio" ou "Superior"
df_nomes = df_nomes.withColumn("Escolaridade", 
                               when(rand() < 1/3, "Fundamental")
                               .when(rand() < 2/3, "Medio")
                               .otherwise("Superior"))

# Adicionando a coluna "AnoNascimento" com um ano aleatório entre 1945 e 2010
df_nomes = df_nomes.withColumn("AnoNascimento", floor(rand() * (2010 - 1945 + 1)) + 1945)

# Imprimindo as primeiras 10 linhas do dataframe
df_nomes.show(10)


# Section 4 | Exercícios Apache Spark
#### Usando o método select do dataframe (df_nomes), selecione as pessoas que nasceram neste século. Armazene o resultado em outro dataframe chamado df_select e mostre 10 nomes deste.

In [None]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, floor, when

# Definindo a Spark Session
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

# Lendo o arquivo 'nomes_aleatorios.txt' e carregando-o em um dataframe
df_nomes = spark.read.text('nomes_aleatorios.txt')

# Renomeando a coluna
df_nomes = df_nomes.withColumnRenamed("value", "Nomes")

# Adicionando a coluna "Escolaridade" com valores aleatórios "Fundamental", "Medio" ou "Superior"
df_nomes = df_nomes.withColumn("Escolaridade", 
                               when(rand() < 1/3, "Fundamental")
                               .when(rand() < 2/3, "Medio")
                               .otherwise("Superior"))

# Adicionando a coluna "AnoNascimento" com um ano aleatório entre 1945 e 2010
df_nomes = df_nomes.withColumn("AnoNascimento", floor(rand() * (2010 - 1945 + 1)) + 1945)

# Selecione as pessoas que nasceram neste século (AnoNascimento >= 2000)
df_select = df_nomes.filter(df_nomes["AnoNascimento"] >= 2000)

# Imprimindo as primeiras 10 linhas do dataframe
df_select.show(10)


# Section 4 | Exercícios Apache Spark
#### Usando Spark SQL repita o processo da Pergunta 6.

In [None]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, floor, when

# Definindo a Spark Session
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

# Lendo o arquivo 'nomes_aleatorios.txt' e carregando-o em um dataframe
df_nomes = spark.read.text('nomes_aleatorios.txt')

# Renomeando a coluna
df_nomes = df_nomes.withColumnRenamed("value", "Nomes")

# Adicionando a coluna "Escolaridade" com valores aleatórios "Fundamental", "Medio" ou "Superior"
df_nomes = df_nomes.withColumn("Escolaridade", 
                               when(rand() < 1/3, "Fundamental")
                               .when(rand() < 2/3, "Medio")
                               .otherwise("Superior"))

# Adicionando a coluna "AnoNascimento" com um ano aleatório entre 1945 e 2010
df_nomes = df_nomes.withColumn("AnoNascimento", floor(rand() * (2010 - 1945 + 1)) + 1945)

# Registre o DataFrame como uma tabela SQL temporária
df_nomes.createOrReplaceTempView("pessoas")

# Execute a consulta SQL para selecionar pessoas que nasceram neste século
df_select_sql = spark.sql("SELECT * FROM pessoas WHERE AnoNascimento >= 2000")

# Imprima as primeiras 10 linhas do resultado
df_select_sql.show(10)


# Section 4 | Exercícios Apache Spark
#### Usando o método select do Dataframe df_nomes, Conte o número de pessoas que são da geração Millennials (nascidos entre 1980 e 1994) no Dataset

In [None]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, floor, when

# Definindo a Spark Session
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

# Lendo o arquivo 'nomes_aleatorios.txt' e carregando-o em um dataframe
df_nomes = spark.read.text('nomes_aleatorios.txt')

# Renomeando a coluna
df_nomes = df_nomes.withColumnRenamed("value", "Nomes")

# Adicionando a coluna "Escolaridade" com valores aleatórios "Fundamental", "Medio" ou "Superior"
df_nomes = df_nomes.withColumn("Escolaridade", 
                               when(rand() < 1/3, "Fundamental")
                               .when(rand() < 2/3, "Medio")
                               .otherwise("Superior"))

# Adicionando a coluna "AnoNascimento" com um ano aleatório entre 1945 e 2010
df_nomes = df_nomes.withColumn("AnoNascimento", floor(rand() * (2010 - 1945 + 1)) + 1945)

# Filtre as linhas onde o "AnoNascimento" está entre 1980 e 1994
df_millennials = df_nomes.filter((df_nomes["AnoNascimento"] >= 1980) & (df_nomes["AnoNascimento"] <= 1994))

# Conte o número de linhas no dataframe resultante
millennials_count = df_millennials.count()

print(f"O número de pessoas da geração Millennials é: {millennials_count}")


# Section 4 | Exercícios Apache Spark
#### Repita o processo da Pergunta 8 utilizando Spark SQL

In [None]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, floor, when

# Definindo a Spark Session
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

# Lendo o arquivo 'nomes_aleatorios.txt' e carregando-o em um dataframe
df_nomes = spark.read.text('nomes_aleatorios.txt')

# Renomeando a coluna
df_nomes = df_nomes.withColumnRenamed("value", "Nomes")

# Adicionando a coluna "Escolaridade" com valores aleatórios "Fundamental", "Medio" ou "Superior"
df_nomes = df_nomes.withColumn("Escolaridade", 
                               when(rand() < 1/3, "Fundamental")
                               .when(rand() < 2/3, "Medio")
                               .otherwise("Superior"))

# Adicionando a coluna "AnoNascimento" com um ano aleatório entre 1945 e 2010
df_nomes = df_nomes.withColumn("AnoNascimento", floor(rand() * (2010 - 1945 + 1)) + 1945)

# Registre o DataFrame como uma tabela SQL temporária
df_nomes.createOrReplaceTempView("pessoas")

# Execute a consulta SQL para contar pessoas da geração Millennials
millennials_count_sql = spark.sql("SELECT COUNT(*) AS MillennialsCount FROM pessoas WHERE AnoNascimento BETWEEN 1980 AND 1994")

# Imprima o resultado
millennials_count_sql.show()


# Section 4 | Exercícios Apache Spark
#### Usando Spark SQL, obtenha a quantidade de pessoas de cada país para uma das gerações abaixo. Armazene o resultado em um novo dataframe e depois mostre todas as linhas em ordem crescente de Pais, Geração e Quantidade Baby Boomers – nascidos entre 1944 e 1964; Geração X – nascidos entre 1965 e 1979;4 Millennials (Geração Y) – nascidos entre 1980 e 1994; Geração Z – nascidos entre 1995 e 2015.

In [None]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession
from pyspark.sql.functions import rand, floor, when

# Definindo a Spark Session
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("Exercicio Intro") \
    .getOrCreate()

# Lendo o arquivo 'nomes_aleatorios.txt' e carregando-o em um dataframe
df_nomes = spark.read.text('nomes_aleatorios.txt')

# Renomeando a coluna
df_nomes = df_nomes.withColumnRenamed("value", "Nomes")

# Adicionando a coluna "Escolaridade" com valores aleatórios "Fundamental", "Medio" ou "Superior"
df_nomes = df_nomes.withColumn("Escolaridade", 
                               when(rand() < 1/3, "Fundamental")
                               .when(rand() < 2/3, "Medio")
                               .otherwise("Superior"))

# Adicionando a coluna "AnoNascimento" com um ano aleatório entre 1945 e 2010
df_nomes = df_nomes.withColumn("AnoNascimento", floor(rand() * (2010 - 1945 + 1)) + 1945)

# Registre o DataFrame como uma tabela SQL temporária
df_nomes.createOrReplaceTempView("pessoas")

# Execute a consulta SQL para contar pessoas de cada país por geração
generation_count_sql = spark.sql("""
    SELECT Pais, 
    CASE
        WHEN AnoNascimento BETWEEN 1944 AND 1964 THEN 'Baby Boomers'
        WHEN AnoNascimento BETWEEN 1965 AND 1979 THEN 'Geração X'
        WHEN AnoNascimento BETWEEN 1980 AND 1994 THEN 'Millennials'
        WHEN AnoNascimento BETWEEN 1995 AND 2010 THEN 'Geração Z'
    END as Geracao,
    COUNT(*) as Quantidade
    FROM pessoas
    GROUP BY Pais, Geracao
    ORDER BY Pais ASC, Geracao ASC, Quantidade ASC
""")

# Imprima o resultado
generation_count_sql.show()
