# Funcionalidades básicas do PySpark no Google Colab

Bem-vindo ao nosso notebook interativo sobre as funcionalidades básicas do PySpark! Neste notebook, exploraremos as principais funcionalidades oferecidas pelo PySpark, uma poderosa biblioteca para processamento de Big Data usando Apache Spark, tudo isso em um ambiente familiar e acessível como o Google Colab.

Vamos aprender como carregar e manipular conjuntos de dados, realizar operações comuns de transformação e análise de dados, aplicar consultas SQL diretamente aos nossos DataFrames e muito mais.

Sem mais delongas, vamos mergulhar nas funcionalidades básicas do PySpark e descobrir como podemos aproveitar ao máximo essa incrível ferramenta para lidar com grandes volumes de dados!

## Preparando o ambiente para o uso do spark

Realizando uma instalação silenciosa do OpenJDK 8 JDK Headless, que é uma implementação do Java Development Kit (JDK) sem a necessidade de uma interface gráfica. O redirecionamento `> /dev/null` é usado para suprimir a saída do terminal durante a instalação.

In [None]:
# Instalando o Java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Realizando o download e a descompactação dos arquivos do Apache Spark.

- `!wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz`: Baixa o arquivo compactado `.tgz` do Spark 3.5.1 com suporte ao Hadoop 3.
- `!tar xvf spark-3.5.1-bin-hadoop3.tgz`: Descompacta o arquivo baixado, extraindo os arquivos para o diretório atual.

In [None]:
# Fazendo download
!wget https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

# Descompactando os arquivos
!tar xvf spark-3.5.1-bin-hadoop3.tgz

--2024-04-06 14:15:39--  https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
Resolving dlcdn.apache.org (dlcdn.apache.org)... 151.101.2.132, 2a04:4e42::644
Connecting to dlcdn.apache.org (dlcdn.apache.org)|151.101.2.132|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 400446614 (382M) [application/x-gzip]
Saving to: ‘spark-3.5.1-bin-hadoop3.tgz’


2024-04-06 14:15:45 (59.3 MB/s) - ‘spark-3.5.1-bin-hadoop3.tgz’ saved [400446614/400446614]

spark-3.5.1-bin-hadoop3/
spark-3.5.1-bin-hadoop3/sbin/
spark-3.5.1-bin-hadoop3/sbin/spark-config.sh
spark-3.5.1-bin-hadoop3/sbin/stop-slave.sh
spark-3.5.1-bin-hadoop3/sbin/stop-mesos-dispatcher.sh
spark-3.5.1-bin-hadoop3/sbin/start-workers.sh
spark-3.5.1-bin-hadoop3/sbin/start-slaves.sh
spark-3.5.1-bin-hadoop3/sbin/start-all.sh
spark-3.5.1-bin-hadoop3/sbin/stop-all.sh
spark-3.5.1-bin-hadoop3/sbin/workers.sh
spark-3.5.1-bin-hadoop3/sbin/start-mesos-dispatcher.sh
spark-3.5.1-bin-hadoop3/sbin/spark-daemon.sh
sp

In [None]:
# instalando a findspark
!pip install -q findspark

Configurando as variáveis de ambiente necessárias para o Java e o Spark:

- A biblioteca `os` é importada para permitir a manipulação do ambiente operacional.
- `os.environ["JAVA_HOME"]` é definido como o caminho para a instalação do Java no sistema. Isso é necessário para que o Spark possa ser executado, pois o Spark é construído sobre o Java.
- `os.environ["SPARK_HOME"]` é definido como o caminho para a instalação do Spark no sistema. Essa variável de ambiente informa ao Python onde encontrar a instalação do Spark.

In [None]:
# Importando a biblioteca os
import os

# Definindo a variável de ambiente do Java
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

# Definindo a variável de ambiente do Spark
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

Importando e inicializando a biblioteca `findspark`:

- `findspark` é uma biblioteca Python que facilita a localização do Spark dentro do ambiente Python. Ela adiciona automaticamente o Spark ao `sys.path` do Python, permitindo que você o importe como uma biblioteca Python regular.
- `findspark.init()` é usado para inicializar o `findspark` e configurar o ambiente Python para usar o Spark. Ele encontra automaticamente a instalação do Spark no sistema e configura as variáveis de ambiente necessárias para usar o Spark a partir do Python.

In [None]:
# Importando a findspark
import findspark

# Iniciando o findspark
findspark.init()

### Importando o Framework PySpark
Importando as classes e funções necessárias do módulo `pyspark.sql` para trabalhar com o Spark DataFrame:

- `SparkSession` é importado da subbiblioteca `session` do módulo `pyspark.sql`. Ele é usado para criar a sessão Spark.
- `functions` é importado do módulo `pyspark.sql.functions`. Ele fornece várias funções de manipulação de dados que podem ser aplicadas a colunas em um DataFrame.
- `types` é importado do módulo `pyspark.sql.types`. Ele contém os tipos de dados que podem ser usados para definir o esquema de um DataFrame.

Esses imports são comuns ao iniciar um ambiente de desenvolvimento PySpark para análise e processamento de dados.

In [None]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

### Criando  Sessão Spark
Criaremos uma sessão Spark utilizando a classe `SparkSession`. A função `builder` é utilizada para configurar a sessão, onde definimos o nome da aplicação com o método `appName('Avaliação imóveis - SP')`. Por fim, invocamos `getOrCreate()` para obter a sessão Spark existente ou criar uma nova, se necessário. O objeto resultante é atribuído à variável `spark`, que será usado para interagir com o ambiente Spark.

In [None]:
spark = SparkSession.builder.appName('Avaliação imóveis - SP').getOrCreate()

### Lendo arquivo CSV
Iremos agora ler um arquivo CSV chamado 'Preços Imóveis - Sao paulo.csv' e carregar seus dados em um DataFrame Spark. Para isso, utilizamos o método `spark.read.csv()`, onde especificamos o nome do arquivo, o separador de colunas (nesse caso, o ponto e vírgula ';') e indicamos que a primeira linha contém o cabeçalho (parâmetro `header=True`). Os dados são então carregados no DataFrame `df`.

In [None]:
df = spark.read.csv(
    'Preços Imóveis - Sao paulo.csv',
    sep=';',
    header=True
)


**`.show()`** exibe o DataFrame, mostrando as primeiras linhas na saída:

In [None]:
# Exibindo o DataFrame
df.show()

+--------------------+------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|Número|             Bairro|   Cidade| Área|Quartos|Banheiros|Vagas|    Preço|Aluguel|
+--------------------+------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|     Avenida Itacira|   255|  Planalto Paulista|São Paulo|480.0|      4|        8|    6|3196250.0|  False|
|Rua Aurelia Perez...|    42| Jardim dos Estados|São Paulo|496.5|      4|        4|    4|3700000.0|  False|
|      Jardim Morumbi|     0|     Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 685000.0|  False|
|  Rua Tobias Barreto|   195|              Mooca|São Paulo|100.0|      3|        2|    2| 540000.0|  False|
|    Rua Graham Bell |     0|        Santo Amaro|São Paulo|440.0|      4|        4|    6|1980000.0|  False|
| Rua Francisco Paulo|    31|  Cidade Mãe do Céu|São Paulo|145.0|      4|        4|    2| 850000.0|  False|
|         Rua Tapaji |     0

### Visualizando a estrutura do DataFrame

In [None]:
# Exibindo o esquema do DataFrame
df.printSchema()

root
 |-- Rua: string (nullable = true)
 |-- Número: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cidade: string (nullable = true)
 |-- Área: string (nullable = true)
 |-- Quartos: string (nullable = true)
 |-- Banheiros: string (nullable = true)
 |-- Vagas: string (nullable = true)
 |-- Preço: string (nullable = true)
 |-- Aluguel: string (nullable = true)



## Operações de Transformação

### Renomeando colunas
Renomeando as colunas 'Preço' e 'Área' para 'Preco' e 'Area', respectivamente, no DataFrame `df`. Isso é feito utilizando o método `withColumnRenamed()`, onde especificamos o nome original da coluna e o novo nome desejado como argumentos. Após a renomeação, os dados são exibidos utilizando o método `show()`:

In [None]:
df = df.withColumnRenamed('Preço', 'Preco').withColumnRenamed('Área', 'Area')
df.show()

+--------------------+------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|Número|             Bairro|   Cidade| Area|Quartos|Banheiros|Vagas|    Preco|Aluguel|
+--------------------+------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|     Avenida Itacira|   255|  Planalto Paulista|São Paulo|480.0|      4|        8|    6|3196250.0|  False|
|Rua Aurelia Perez...|    42| Jardim dos Estados|São Paulo|496.5|      4|        4|    4|3700000.0|  False|
|      Jardim Morumbi|     0|     Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 685000.0|  False|
|  Rua Tobias Barreto|   195|              Mooca|São Paulo|100.0|      3|        2|    2| 540000.0|  False|
|    Rua Graham Bell |     0|        Santo Amaro|São Paulo|440.0|      4|        4|    6|1980000.0|  False|
| Rua Francisco Paulo|    31|  Cidade Mãe do Céu|São Paulo|145.0|      4|        4|    2| 850000.0|  False|
|         Rua Tapaji |     0

### Removendo colunas
Removendo a coluna 'Número' do DataFrame, resultando em um novo DataFrame onde essa coluna não está mais presente. Isso é feito utilizando o método `drop()` passando o nome da coluna que se deseja eliminar como argumento:

In [None]:
# Removendo
df = df.drop('Número')
df.show()

+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|             Bairro|   Cidade| Area|Quartos|Banheiros|Vagas|    Preco|Aluguel|
+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|     Avenida Itacira|  Planalto Paulista|São Paulo|480.0|      4|        8|    6|3196250.0|  False|
|Rua Aurelia Perez...| Jardim dos Estados|São Paulo|496.5|      4|        4|    4|3700000.0|  False|
|      Jardim Morumbi|     Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 685000.0|  False|
|  Rua Tobias Barreto|              Mooca|São Paulo|100.0|      3|        2|    2| 540000.0|  False|
|    Rua Graham Bell |        Santo Amaro|São Paulo|440.0|      4|        4|    6|1980000.0|  False|
| Rua Francisco Paulo|  Cidade Mãe do Céu|São Paulo|145.0|      4|        4|    2| 850000.0|  False|
|         Rua Tapaji |        Vila Alpina|São Paulo|114.0|      3|        3|    2| 585000.0

### Selecionar colunas específicas

#### Apenas a coluna
O método **`df.select('Preco')`** Seleciona apenas a coluna 'Preco' do DataFrame `df`. Isso mantém somente essa coluna para exibição subsequente:

In [None]:
df.select('Preco').show()

+---------+
|    Preco|
+---------+
|3196250.0|
|3700000.0|
| 685000.0|
| 540000.0|
|1980000.0|
| 850000.0|
| 585000.0|
| 700000.0|
|2500000.0|
|1700000.0|
|1150000.0|
|2700000.0|
|4190000.0|
| 860000.0|
| 490000.0|
|1485000.0|
| 550000.0|
|1100000.0|
|2985000.0|
|1700000.0|
+---------+
only showing top 20 rows



#### Aplicando uma condição na seleção da coluna
Acrescendo agora o método **`.where(col('Preco') > 5000000)`** serão Filtrados os dados selecionados anteriormente, mantendo apenas os registros onde o valor da coluna 'Preco' é maior que 5000000 (cinco milhões). Isso seleciona os dados dos imóveis com preço superior a cinco milhões:

In [None]:
df.select('Preco').where(col('Preco') > 5000000).show()

+---------+
|    Preco|
+---------+
|5500000.0|
|6500000.0|
|6200000.0|
|5500000.0|
|6360000.0|
|5100000.0|
|6500000.0|
|5600000.0|
|5200000.0|
|5600000.0|
|5995500.0|
|5500000.0|
|5050000.0|
|5950000.0|
|5320000.0|
|5500000.0|
|6500000.0|
|6500000.0|
|6300000.0|
|6000000.0|
+---------+
only showing top 20 rows



### Filtrar linhas com base em condições

O método **`df.filter(col('Preço') > 5000000)`** filtra o DataFrame `df` para manter apenas os registros onde o valor da coluna 'Preço' é maior que 5000000 (cinco milhões). Isso seleciona apenas os dados dos imóveis com preço superior a cinco milhões:

In [None]:
df.filter(col('Preço')> 5000000).show()

+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|             Bairro|   Cidade| Area|Quartos|Banheiros|Vagas|    Preco|Aluguel|
+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|      Jardim Guedala|     Jardim Guedala|São Paulo|650.0|      4|        6|    4|5500000.0|  False|
|     Rua Cordisburgo|      Jardim Leonor|São Paulo|850.0|      4|        8|    6|6500000.0|  False|
|   Rua Prudentópolis|            Boaçava|São Paulo|480.0|      4|        3|    6|6200000.0|  False|
|       Rua Aquiramun|  Alto de Pinheiros|São Paulo|450.0|      5|        6|    5|5500000.0|  False|
|Rua Doutor Antôni...|    Jardim Cordeiro|São Paulo|480.0|      4|        7|    8|6360000.0|  False|
|         Rua Teviot |Vila Nova Conceição|São Paulo|363.0|      4|        6|    4|5100000.0|  False|
|        Rua Banibás |  Alto de Pinheiros|São Paulo|750.0|      5|        5|    8|6500000.0

### Filtrando e Ordenando o df

Nesta parte do codigo iremos:

1. **`df.filter(col('Bairro') == 'Jardim Paulista')`**: Este trecho filtra o DataFrame `df` para manter apenas os registros onde o valor da coluna 'Bairro' é igual a 'Jardim Paulista'. Isso significa que estamos selecionando apenas os dados dos imóveis localizados no bairro 'Jardim Paulista'.

2. **`df.orderBy(col('Preco').desc())`**: Após o filtro, o DataFrame resultante é ordenado com base no preço em ordem descendente. A função `orderBy()` classifica os registros com base no valor da coluna 'Preco', usando a opção `desc()` para indicar que a ordenação deve ser feita de forma descendente, ou seja, do maior para o menor preço.

3. **`.show()`**: Por fim, o método `show()` é chamado para exibir o DataFrame resultante na saída padrão. Isso mostrará os dados dos imóveis do bairro 'Jardim Paulista', ordenados pelo preço em ordem decrescente, facilitando a análise e a visualização dessas informações.

In [None]:
# Analisando os quartos, de forma descendente(maior para o menor), do bairro Jardim Paulista
df.filter(col('Bairro') == 'Jardim Paulista').orderBy(col('Preco').desc()).show()

+--------------------+---------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|         Bairro|   Cidade| Area|Quartos|Banheiros|Vagas|    Preco|Aluguel|
+--------------------+---------------+---------+-----+-------+---------+-----+---------+-------+
|     Jardim Paulista|Jardim Paulista|São Paulo|380.0|      3|        3|    5|6500000.0|  False|
|     Jardim Paulista|Jardim Paulista|São Paulo|380.0|      3|        3|    5|6500000.0|  False|
|Rua Doutor João P...|Jardim Paulista|São Paulo|457.0|      5|        6|    3|6500000.0|  False|
|  Rua José Clemente |Jardim Paulista|São Paulo|400.0|      2|        6|    6|6400000.0|  False|
|Rua Colatino Marques|Jardim Paulista|São Paulo|548.0|      4|        6|    6|6200000.0|  False|
|     Jardim Paulista|Jardim Paulista|São Paulo|400.0|      3|        3|    9|6000000.0|  False|
|     Jardim Paulista|Jardim Paulista|São Paulo|400.0|      3|        3|    9|6000000.0|  False|
|     Jardim Paulista|Jardim P

Exemplo usando a linguagem SQL no Spark:

1. **`df.createOrReplaceTempView("df")`**: Registra o DataFrame como uma visualização temporária com o nome "df". Isso permite executar consultas SQL usando o DataFrame.

2. **`spark.sql(""" ... """)`**: Executa uma consulta SQL no ambiente Spark. A consulta seleciona todas as colunas do DataFrame onde o bairro é "Jardim Paulista", ordenando os resultados pelo preço em ordem decrescente.

3. **`.show()`**: Exibe o resultado da consulta na saída padrão. Isso mostrará as primeiras linhas do DataFrame resultante da consulta.

In [None]:
# Registrando o DataFrame como uma visualização temporária
df.createOrReplaceTempView("df")

# Executando a consulta SQL
spark.sql("""
    SELECT *
    FROM df
    WHERE Bairro = "Jardim Paulista"
    ORDER BY Preco DESC
""").show()

+--------------------+---------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|         Bairro|   Cidade| Area|Quartos|Banheiros|Vagas|    Preco|Aluguel|
+--------------------+---------------+---------+-----+-------+---------+-----+---------+-------+
|     Jardim Paulista|Jardim Paulista|São Paulo|380.0|      3|        3|    5|6500000.0|  False|
|     Jardim Paulista|Jardim Paulista|São Paulo|380.0|      3|        3|    5|6500000.0|  False|
|Rua Doutor João P...|Jardim Paulista|São Paulo|457.0|      5|        6|    3|6500000.0|  False|
|  Rua José Clemente |Jardim Paulista|São Paulo|400.0|      2|        6|    6|6400000.0|  False|
|Rua Colatino Marques|Jardim Paulista|São Paulo|548.0|      4|        6|    6|6200000.0|  False|
|     Jardim Paulista|Jardim Paulista|São Paulo|400.0|      3|        3|    9|6000000.0|  False|
|     Jardim Paulista|Jardim Paulista|São Paulo|400.0|      3|        3|    9|6000000.0|  False|
|     Jardim Paulista|Jardim P

### Realizando a conversão de tipos das colunas no DataFrame

Realizando a conversão de tipos das colunas no DataFrame `df`, utilizando o método `withColumn()` para cada coluna que precisa ser convertida, especificando a nova coluna e o tipo desejado usando a função `cast()`. Cada chamada de `withColumn()` adiciona ou substitui uma coluna no DataFrame com o tipo de dados convertido.

Ao final, o DataFrame resultante terá as colunas 'Area', 'Quartos', 'Banheiros', e 'Preco' convertidas para o tipo desejado (float ou int), e a coluna 'Aluguel' convertida para o tipo booleano.

In [None]:
# Realizando a conversão de tipos das colunas no DataFrame
df = df.withColumn(
    'Area', col('Area').cast('float')
).withColumn(
    'Quartos', col('Quartos').cast('int')
).withColumn(
    'Banheiros', col('Banheiros').cast('int')
).withColumn(
    'Vagas', col('Vagas').cast('int')
).withColumn(
    'Preco', col('Preco').cast('float')
).withColumn(
    'Aluguel', col('Aluguel').cast('boolean')
)

In [None]:
# Exibindo o esquema do DataFrame, ápos as adequaçãoes nas colunas
df.printSchema()

root
 |-- Rua: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cidade: string (nullable = true)
 |-- Area: float (nullable = true)
 |-- Quartos: integer (nullable = true)
 |-- Banheiros: integer (nullable = true)
 |-- Vagas: integer (nullable = true)
 |-- Preco: float (nullable = true)
 |-- Aluguel: boolean (nullable = true)



In [None]:
# Exibindo o DataFrame
df.show()

+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|             Bairro|   Cidade| Area|Quartos|Banheiros|Vagas|    Preco|Aluguel|
+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+
|     Avenida Itacira|  Planalto Paulista|São Paulo|480.0|      4|        8|    6|3196250.0|  false|
|Rua Aurelia Perez...| Jardim dos Estados|São Paulo|496.5|      4|        4|    4|3700000.0|  false|
|      Jardim Morumbi|     Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 685000.0|  false|
|  Rua Tobias Barreto|              Mooca|São Paulo|100.0|      3|        2|    2| 540000.0|  false|
|    Rua Graham Bell |        Santo Amaro|São Paulo|440.0|      4|        4|    6|1980000.0|  false|
| Rua Francisco Paulo|  Cidade Mãe do Céu|São Paulo|145.0|      4|        4|    2| 850000.0|  false|
|         Rua Tapaji |        Vila Alpina|São Paulo|114.0|      3|        3|    2| 585000.0

### Adicionar colunas derivadas

Adicionando uma nova coluna chamada 'Valor m2' ao DataFrame `df`, que é derivada da divisão da coluna 'Preco' pela coluna 'Area'. O resultado é arredondado para duas casas decimais usando a função `round()`. O método `show()` é então utilizado para exibir o DataFrame resultante, mostrando a coluna recém-adicionada 'Valor m2' juntamente com as outras colunas existentes:

In [None]:
df.withColumn('Valor m2',  round(col('Preco') / col('Area'), 2)).show()

+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+--------+
|                 Rua|             Bairro|   Cidade| Area|Quartos|Banheiros|Vagas|    Preco|Aluguel|Valor m2|
+--------------------+-------------------+---------+-----+-------+---------+-----+---------+-------+--------+
|     Avenida Itacira|  Planalto Paulista|São Paulo|480.0|      4|        8|    6|3196250.0|  false| 6658.85|
|Rua Aurelia Perez...| Jardim dos Estados|São Paulo|496.5|      4|        4|    4|3700000.0|  false| 7452.17|
|      Jardim Morumbi|     Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 685000.0|  false| 2209.68|
|  Rua Tobias Barreto|              Mooca|São Paulo|100.0|      3|        2|    2| 540000.0|  false|  5400.0|
|    Rua Graham Bell |        Santo Amaro|São Paulo|440.0|      4|        4|    6|1980000.0|  false|  4500.0|
| Rua Francisco Paulo|  Cidade Mãe do Céu|São Paulo|145.0|      4|        4|    2| 850000.0|  false| 5862.07|
|         

## Operações de Agregação

### Agrupar dados
Com o código `df.groupBy('Bairro').count().show()` realizaremos as seguintes operações:

1. **`groupBy('Bairro')`**: Agrupa os dados do DataFrame `df` pela coluna 'Bairro', ou seja, cria grupos onde os valores da coluna 'Bairro' são iguais.

2. **`count()`**: Conta o número de registros em cada grupo criado pela operação de agrupamento. Isso resulta em um novo DataFrame com duas colunas: 'Bairro' e 'count', onde 'count' representa a contagem de registros em cada grupo.

3. **`show()`**: Exibe o conteúdo do DataFrame resultante na saída padrão, mostrando os resultados da contagem de registros para cada valor único na coluna 'Bairro'.

In [None]:
# Agrupando dados, verificando a frequência
df.groupBy('Bairro').count().show()

+--------------------+-----+
|              Bairro|count|
+--------------------+-----+
|Vila Califórnia(Z...|    3|
|      Vila Antonieta|    6|
|      Vila Guilherme|    5|
|           Água Rasa|    6|
|     Jardim Cordeiro|   23|
|        Vila Olímpia|   58|
|       Jardim Hípico|    7|
|     Vila Alexandria|    5|
|    Vila São Geraldo|    5|
|       Vila Paulista|    8|
|         Vila Romano|    7|
|Jardim das Vertentes|    9|
|         Rio Pequeno|   12|
|         Vila Santos|    8|
|Vila Guarani (Z Sul)|   10|
|     Parque da Mooca|    5|
|         Vila Guedes|    4|
|         Jardim Peri|   11|
|  Jardim Bonfiglioli|   11|
|             Butantã|   45|
+--------------------+-----+
only showing top 20 rows



### Realizando agregações personalizadas
Agora com agregações personalizadas permitindo definir parâmetros para o agrupamento, realiza o seguinte:

1. **`groupBy('Bairro')`**: Agrupa os dados do DataFrame `df` pela coluna 'Bairro'.

2. **`agg(round(mean('Preco'), 2).alias('Media Preco'))`**: Aplica uma agregação aos dados agrupados. Dentro desta agregação:
   - `mean('Preco')`: Calcula a média dos valores da coluna 'Preco' para cada grupo de bairro.
   - `round(...)`: Arredonda o resultado da média para duas casas decimais.
   - `alias('Media Preco')`: Renomeia a coluna resultante da média para 'Media Preco'.

3. **`show()`**: Exibe o conteúdo do DataFrame resultante na saída padrão, mostrando a média de preço para cada bairro.

In [None]:
# Calcula a média do preço por bairro
media_bairro = df.groupBy('Bairro').agg(round(mean('Preco'), 2).alias('Media Preco'))
media_bairro.show()

+--------------------+-----------+
|              Bairro|Media Preco|
+--------------------+-----------+
|Vila Califórnia(Z...|   850000.0|
|      Vila Antonieta|  620002.17|
|      Vila Guilherme|   472000.0|
|           Água Rasa|  716166.67|
|     Jardim Cordeiro| 4195326.09|
|        Vila Olímpia| 2332844.83|
|       Jardim Hípico| 1598571.43|
|     Vila Alexandria|   636000.0|
|    Vila São Geraldo|   674400.0|
|       Vila Paulista|   767037.5|
|         Vila Romano|  782857.14|
|Jardim das Vertentes|  770555.56|
|         Rio Pequeno|  534583.33|
|         Vila Santos|   510250.0|
|Vila Guarani (Z Sul)|   668700.0|
|     Parque da Mooca|  1296000.0|
|         Vila Guedes|   579250.0|
|         Jardim Peri|  1720000.0|
|  Jardim Bonfiglioli|   900000.0|
|             Butantã| 2394541.53|
+--------------------+-----------+
only showing top 20 rows



## Junção de DataFrames

### Join
O método `join` é utilizado para realizar junções de forma semelhante ao SQL, onde você especifica as colunas de junção e o tipo de junção (por exemplo, inner, outer, left, right).

Exemplo:

In [None]:
# Exibindo DataFrame antes da aplicação do método "join"
df.show(5)

+--------------------+------------------+---------+-----+-------+---------+-----+---------+-------+
|                 Rua|            Bairro|   Cidade| Area|Quartos|Banheiros|Vagas|    Preco|Aluguel|
+--------------------+------------------+---------+-----+-------+---------+-----+---------+-------+
|     Avenida Itacira| Planalto Paulista|São Paulo|480.0|      4|        8|    6|3196250.0|  false|
|Rua Aurelia Perez...|Jardim dos Estados|São Paulo|496.5|      4|        4|    4|3700000.0|  false|
|      Jardim Morumbi|    Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 685000.0|  false|
|  Rua Tobias Barreto|             Mooca|São Paulo|100.0|      3|        2|    2| 540000.0|  false|
|    Rua Graham Bell |       Santo Amaro|São Paulo|440.0|      4|        4|    6|1980000.0|  false|
+--------------------+------------------+---------+-----+-------+---------+-----+---------+-------+
only showing top 5 rows



In [None]:
# Aplicando o método "join"
df_juncao = df.join(media_bairro, 'Bairro', how='inner')
df_juncao.show()

+-------------------+--------------------+---------+-----+-------+---------+-----+---------+-------+-----------+
|             Bairro|                 Rua|   Cidade| Area|Quartos|Banheiros|Vagas|    Preco|Aluguel|Media Preco|
+-------------------+--------------------+---------+-----+-------+---------+-----+---------+-------+-----------+
|  Planalto Paulista|     Avenida Itacira|São Paulo|480.0|      4|        8|    6|3196250.0|  false| 1943378.61|
| Jardim dos Estados|Rua Aurelia Perez...|São Paulo|496.5|      4|        4|    4|3700000.0|  false| 3851392.11|
|     Jardim Morumbi|      Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 685000.0|  false| 2571230.95|
|              Mooca|  Rua Tobias Barreto|São Paulo|100.0|      3|        2|    2| 540000.0|  false|  885613.64|
|        Santo Amaro|    Rua Graham Bell |São Paulo|440.0|      4|        4|    6|1980000.0|  false| 2137403.65|
|  Cidade Mãe do Céu| Rua Francisco Paulo|São Paulo|145.0|      4|        4|    2| 850000.0|  fa

## Funções de UDF (User Defined Functions)
As Funções de UDF (User Defined Functions) permitem aos usuários definir suas próprias funções personalizadas para executar operações específicas nos dados de um DataFrame Spark. Essas funções podem ser aplicadas a cada linha ou a uma coluna inteira do DataFrame, oferecendo flexibilidade e capacidade de processamento personalizado.

No exemplo a seguir, vamos definir a função avaliacao que compara o preço de cada item com a média e atribuí-la a uma UDF (User Defined Function) registrada. Em seguida, aplicaremos essa UDF ao DataFrame para criar uma nova coluna que indica se o preço de cada item está acima, abaixo ou igual à média:

In [None]:
def avaliacao(preco_col, media_col):
    preco = preco_col
    media = media_col
    if preco == media:
        return 'No valor'
    elif preco > media:
        return 'Acima da média'
    else:
        return 'Abaixo da média'

In [None]:
# Registrando a UDF
udf_avaliacao = udf(avaliacao, StringType())

In [None]:
# Aplicando a UDF ao DataFrame
df_juncao = df_juncao.withColumn('avaliacao', udf_avaliacao(col('Preco'), col('Media Preco')))
df_juncao.show()

+-------------------+--------------------+---------+-----+-------+---------+-----+---------+-------+-----------+---------------+
|             Bairro|                 Rua|   Cidade| Area|Quartos|Banheiros|Vagas|    Preco|Aluguel|Media Preco|      avaliacao|
+-------------------+--------------------+---------+-----+-------+---------+-----+---------+-------+-----------+---------------+
|  Planalto Paulista|     Avenida Itacira|São Paulo|480.0|      4|        8|    6|3196250.0|  false| 1943378.61| Acima da média|
| Jardim dos Estados|Rua Aurelia Perez...|São Paulo|496.5|      4|        4|    4|3700000.0|  false| 3851392.11|Abaixo da média|
|     Jardim Morumbi|      Jardim Morumbi|São Paulo|310.0|      3|        2|    4| 685000.0|  false| 2571230.95|Abaixo da média|
|              Mooca|  Rua Tobias Barreto|São Paulo|100.0|      3|        2|    2| 540000.0|  false|  885613.64|Abaixo da média|
|        Santo Amaro|    Rua Graham Bell |São Paulo|440.0|      4|        4|    6|1980000.0|  fal

##  Particionamento
O particionamento é uma técnica utilizada para dividir os dados em conjuntos menores com base nos valores de uma ou mais colunas específicas.

Isso permite que os dados sejam armazenados de forma mais eficiente e distribuídos em diferentes diretórios ou blocos de armazenamento, o que pode melhorar significativamente o desempenho de consultas e operações de leitura.

No Spark, o particionamento pode ser feito durante a escrita dos dados, dividindo-os em diretórios separados com base nos valores de uma coluna escolhida. Isso facilita a execução de consultas mais eficientes, pois o Spark pode acessar apenas os dados relevantes para a operação, minimizando a quantidade de dados a serem processados.

In [None]:
# Obtém o número de partições do DataFrame
n_particoes = df_juncao.rdd.getNumPartitions()
print('O número de partições é: ', n_particoes)

O número de partições é:  1


In [None]:
# descobrindo qual maquina estou utilizando e verificando a quantidade de CPU(s)
!lscpu

Architecture:            x86_64
  CPU op-mode(s):        32-bit, 64-bit
  Address sizes:         46 bits physical, 48 bits virtual
  Byte Order:            Little Endian
CPU(s):                  2
  On-line CPU(s) list:   0,1
Vendor ID:               GenuineIntel
  Model name:            Intel(R) Xeon(R) CPU @ 2.20GHz
    CPU family:          6
    Model:               79
    Thread(s) per core:  2
    Core(s) per socket:  1
    Socket(s):           1
    Stepping:            0
    BogoMIPS:            4400.44
    Flags:               fpu vme de pse tsc msr pae mce cx8 apic sep mtrr pge mca cmov pat pse36 clf
                         lush mmx fxsr sse sse2 ss ht syscall nx pdpe1gb rdtscp lm constant_tsc rep_
                         good nopl xtopology nonstop_tsc cpuid tsc_known_freq pni pclmulqdq ssse3 fm
                         a cx16 pcid sse4_1 sse4_2 x2apic movbe popcnt aes xsave avx f16c rdrand hyp
                         ervisor lahf_lm abm 3dnowprefetch invpcid_single ssbd i

A função `repartition()` é usada para redistribuir os dados de um DataFrame Spark em um número específico de partições. Isso é útil para melhorar o desempenho de operações subsequentes, especialmente durante operações de join e agregação, onde a distribuição dos dados entre as partições pode afetar significativamente o desempenho.

Quando você chama `repartition(n)`, o Spark redistribuirá os dados em exatamente **n** partições. O Spark irá particionar os dados de maneira uniforme, o que significa que ele tentará distribuir igualmente os dados entre as partições.

Aqui está um exemplo de como usar a função `repartition()`:

`

In [None]:
# Reparticionar os dados em 4 partições
df_repartitioned = df_juncao.repartition(4)

In [None]:
# Verificando o número de partições do DataFrame
n_particoes = df_repartitioned.rdd.getNumPartitions()
print('O número de partições, após o reparticionamento: ', n_particoes)

O número de partições, após o reparticionamento:  4


A função `coalesce()` é usada para reduzir o número de partições de um DataFrame Spark para um valor específico. Ao contrário da função `repartition()`, que redistribui os dados entre todas as partições, a função `coalesce()` apenas mescla as partições adjacentes, minimizando a movimentação de dados entre as partições.

Quando você chama `coalesce(n)`, o Spark tentará mesclar as partições existentes em exatamente **n** partições. Se o número de partições for reduzido, o Spark tentará mesclar as partições de forma a manter a distribuição dos dados o mais uniforme possível.

Aqui está um exemplo de como usar a função `coalesce()`:

In [None]:
# Coalescer os dados em 2 partições
df_coalesced = df_repartitioned.coalesce(2)

In [None]:
# Obtém o número de partições do DataFrame
n_particoes = df_coalesced.rdd.getNumPartitions()
print('O número de partições, após coalecer os dados: ', n_particoes)

O número de partições, após coalecer os dados:  2


Uma outra alternativa de particionamento é com o uso do `partitionBy()`, citada no topico "Escrita de Dados",

## Escrita de Dados e particionamento

A escrita de dados em Apache Spark é uma etapa crucial no processamento de grandes conjuntos de dados distribuídos. Permite que você armazene o resultado do processamento realizado em um DataFrame em diferentes formatos de arquivo, como JSON, CSV, Parquet, Avro, entre outros. Vamos explorar alguns conceitos importantes relacionados à escrita de dados:

- **write()**: Este método é utilizado para iniciar o processo de escrita dos dados contidos em um DataFrame para um local específico. Ele é aplicado ao DataFrame e configura as opções necessárias para a escrita, como o formato do arquivo, o modo de escrita, as opções de compressão, entre outros.

- **mode**: O parâmetro `mode` especifica o modo de escrita dos dados. Existem diferentes modos disponíveis:
    - **append**: Adiciona os dados ao final do arquivo existente, se houver.
    - **overwrite**: Sobrescreve o arquivo existente com os novos dados, se o arquivo já existir.
    - **ignore**: Ignora a operação de escrita se o arquivo já existir, sem lançar erros.
    - **error/errorifexists**: Lança um erro se o arquivo já existir, interrompendo o processo de escrita.
    O modo padrão é geralmente `error` ou `errorifexists`.

- **partitionBy**: Este parâmetro é opcional e permite particionar os dados escritos com base nos valores de uma ou mais colunas do DataFrame. A particionamento é uma técnica importante para organizar os dados em subdiretórios com base em valores específicos de colunas, facilitando a consulta e a recuperação de dados posteriormente.

- **Parquet**: É um formato de arquivo colunar altamente eficiente para armazenamento e processamento de dados no ecossistema do Hadoop. Ele oferece compressão eficiente, esquema de dados embutido e suporte para operações de leitura e gravação paralelas. O Parquet é amplamente utilizado no Apache Spark devido ao seu desempenho e interoperabilidade com outras ferramentas do ecossistema Hadoop.

Ao usar o método `write()` em um DataFrame no Spark, você pode especificar o formato de arquivo desejado, configurar opções de escrita relevantes, como o modo de escrita e o particionamento, e salvar os dados processados em disco para uso futuro. Isso é fundamental em pipelines de dados para armazenar resultados intermediários ou finais de forma eficiente e escalável.

In [None]:
# Criando o arquivo a ser escrito
df_escrito = df_juncao

In [None]:
# Escrevendo o DataFrame no formato Parquet com modo 'overwrite', particionando por 'Vagas' e salvando em "output/parquet/base"
df_escrito.write.mode('overwrite').partitionBy('Vagas').parquet("output/parquet/base")