# Manipulação de Dados com Spark - Pyspark
### Cinthia Santos - Engenheira de Dados
[Linkedin](https://www.linkedin.com/in/cinthialpsantos/)

[Github](https://github.com/cinthialet)

cithsantos@gmail.com

# Objetivo
Ao longo da minha jornada no mundo dos dados, percebi a crescente demanda e eficiência de plataformas distribuídas como o Spark no tratamento de grandes volumes de dados - Big Data. Assim, busquei aprimorar minhas habilidades em PySpark, alinhando-as às melhores práticas de processamento distribuído.

### Neste notebook, pretendo compartilhar minha experiência de refatoração do projeto anterior de manipulação de dados com Python, aplicando o mesmo dataset e as mesmas demandas de negócio, porém, utilizando a robustez e escalabilidade do PySpark.

Meu objetivo não é apenas demonstrar minha evolução e proficiência nesse domínio, mas também oferecer um guia prático e instrutivo para aqueles que desejam fazer uma transição semelhante ou aprimorar seus conhecimentos em PySpark.

> **Sugestão : Acompanhe esse projeto junto com o meu outro projeto de [Manipulação de Dados com Python - Pandas](https://github.com/cinthialet/python-manipulacao-dados)**, que pode auxiliar a entender a diferença, vantagens e desafios de cada uma dessas ferramentas.

In [1]:
# Instalando a biblioteca pyspark no ambiente Google Colab.
# O '!' antes do 'pip' é usado para executar comandos do sistema no notebook.
!pip install pyspark



In [2]:
# Importando os pacotes necessários
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, monotonically_increasing_id, concat, current_date, date_sub , to_date, datediff , sum, when, col , max , min
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window


# Importações e Descrições:

Este bloco descreve e importa todos os módulos e funções essenciais para as operações de processamento de dados com o Spark no projeto.

- **from pyspark.sql import SparkSession**:
  - Importa a `SparkSession`, que é o ponto de entrada para utilizar funcionalidades do Spark com o dataframes API.

- **from pyspark.sql import functions as F**:
  - Importa todas as funções disponíveis para operações em DataFrames do Spark, permitindo o acesso via um alias (apelido) `F`.

- **from pyspark.sql.functions import lit, monotonically_increasing_id, concat, current_date, date_sub, to_date, datediff, sum, when, col, max, min**:
  - Importa funções específicas para manipular e operar sobre colunas em dataframes. Alguns exemplos:
    - `lit`: Cria uma coluna de valores constantes.
    - `monotonically_increasing_id`: Gera IDs únicos para cada linha, garantindo que eles sejam sempre crescentes.
    - `concat`: Concatena múltiplas colunas em uma.
    - `current_date`: Retorna a data atual.
    - `date_sub`: Subtrai um número de dias de uma data.
    - `to_date`: Converte uma string para uma data.
    - `datediff`: Calcula a diferença entre duas datas.
    - `sum, max, min`: Funções agregadas para somar, obter o valor máximo e mínimo.
    - `when`: Função condicional para operar sobre colunas.
    - `col`: Retorna uma coluna com base em um nome de coluna.

- **from pyspark.sql.types import FloatType**:
  - Importa o tipo de dado `FloatType`, utilizado para definir ou converter colunas para o tipo float em dataframes.

- **from pyspark.sql.window import Window**:
  - Importa a classe `Window`, utilizada para definir janelas sobre as quais operações como funções de janela (por exemplo, cálculos de lag ou lead) podem ser aplicadas em dataframes.



# Sobre o Spark
O Apache Spark é uma plataforma poderosa para processamento distribuído de grandes conjuntos de dados. Para começar a trabalhar com o Spark usando a API PySpark, é necessário um ponto de entrada principal: a SparkSession.


###SparkSession: O que é e por quê?

SparkSession é o ponto de entrada para trabalhar com a Spark no PySpark.
Ela fornece uma interface para a interação com as funcionalidades do Spark
- Sem iniciar uma SparkSession, você não poderá executar operações Spark.

**builder**: Método para criar uma nova SparkSession.

**appName(...)**: Define um nome para a sessão, útil para identificar sua aplicação no Spark UI.

**getOrCreate()**: Se uma SparkSession com esse nome já existe, ela a recupera; caso contrário, cria uma nova.

In [3]:
# Iniciando a SparkSession com o nome "Manipulacao de dados com Spark".
spark = SparkSession.builder.appName("Manipulacao de dados com Spark").getOrCreate()

# Leitura de Arquivos CSV

In [4]:
# Carregar CSV em DataFrame
df_csv = spark.read.csv("/content/dados/GasPricesinBrazil_2004-2019.csv")

# Checar o resultado : .show() corresponde a .head(20) no Pandas
df_csv.show()

+--------------------+
|                 _c0|
+--------------------+
|Unnamed: 0;DATA I...|
|0;2004-05-09;2004...|
|1;2004-05-09;2004...|
|2;2004-05-09;2004...|
|3;2004-05-09;2004...|
|4;2004-05-09;2004...|
|5;2004-05-09;2004...|
|6;2004-05-09;2004...|
|7;2004-05-09;2004...|
|8;2004-05-09;2004...|
|9;2004-05-09;2004...|
|10;2004-05-09;200...|
|11;2004-05-09;200...|
|12;2004-05-09;200...|
|13;2004-05-09;200...|
|14;2004-05-09;200...|
|15;2004-05-09;200...|
|16;2004-05-09;200...|
|17;2004-05-09;200...|
|18;2004-05-09;200...|
+--------------------+
only showing top 20 rows



## Separador
Assim como na manipulação de dados com python, a leitura do arquivo padrão usa vírgula como separador, e o arquivo em questão usa ponto-e-vírgula. No comando a seguir, definirei o separador certo para a correta leitura.
- Também é necessário setar header=True , para informar que os dados possuem a primeira linha com os nomes das colunas

In [5]:
df_csv = spark.read.csv("/content/dados/GasPricesinBrazil_2004-2019.csv", sep=";",header=True)
df_csv.show()

+----------+------------+----------+------------+-------------------+----------------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+---+----+
|Unnamed: 0|DATA INICIAL|DATA FINAL|      REGIÃO|             ESTADO|         PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO|MÊS| ANO|
+----------+------------+----------+------------+-------------------+----------------+----------------------------+-----------------+-------------------+---------------------+-----------

## Otimização pela conversão de arquivos

## Parquet vs CSV no Spark

### CSV:
- **Formato:** Texto simples, com valores separados por delimitadores (geralmente vírgulas).
- **Uso:** Portátil e amplamente utilizado.
- **Legibilidade:** Diretamente legível por humanos, facilitando a visualização e o entendimento.
- **Desempenho:** Mais lento para ler e escrever em comparação com formatos binários em grandes conjuntos de dados.

### Parquet:
- **Formato:** Colunar e binário, otimizado para processamento de dados.
- **Uso:** Oferece compressão eficiente e leitura rápida.
- **Legibilidade:** Não é diretamente legível por humanos, sendo otimizado para máquinas.
- **Desempenho:** Ideal para o Spark devido à sua eficiência e integração.

**Recomendação:** Converta dados CSV em Parquet ao trabalhar com grandes volumes no Spark para maximizar a eficiência.


### Salvando o dataframe em um arquivo já existente
Quando você tenta salvar um DataFrame como um arquivo Parquet em um caminho que já existe, o Spark não permitirá essa operação por padrão. Isso é uma medida de segurança para evitar a sobreposição acidental de dados valiosos.

Se sua intenção for substituir o arquivo ou diretório Parquet existente, você deve especificar explicitamente o modo de gravação como "overwrite". Isso instrui o Spark a sobrescrever os dados existentes.

In [6]:
# Salvar DataFrame como Parquet
df_csv.write.mode("overwrite").parquet("manipulacao_dos_dados.parquet")

In [7]:
# Carregar dados Parquet em DataFrame
df = spark.read.parquet("manipulacao_dos_dados.parquet")
df.show()

+----------+------------+----------+------------+-------------------+----------------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+---+----+
|Unnamed: 0|DATA INICIAL|DATA FINAL|      REGIÃO|             ESTADO|         PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO|MÊS| ANO|
+----------+------------+----------+------------+-------------------+----------------+----------------------------+-----------------+-------------------+---------------------+-----------

In [8]:
# Perceba que temos um dataframe spark, ao invés de um dataframe pandas
type(df)

pyspark.sql.dataframe.DataFrame

### Insight
Assim como no caso do pandas, a coluna 'Unnamed: 0' originou-se do uso de um separador na importação e também não tem valor informativo relevante. Para deletar essa coluna no pyspark, usarei o .drop() .

In [9]:
df = df.drop('Unnamed: 0')

### Informações básicas dos dados
No PySpark, não há um método direto equivalente ao .info() do pandas, mas é possível obter informações semelhantes combinando alguns métodos e propriedades do DataFrame Spark. Nesse caso,simularei o .info com o seguinte:

- Número de Linhas: Use df.count().
- Nomes de Colunas: Use df.columns.
- Tipos de Dados por coluna: Use df.dtypes.

In [10]:
# Número de linhas e colunas - equivale a .shape no pandas
num_linhas = df.count()
num_cols = len(df.columns)

print(f"Total de linhas: {num_linhas}")
print(f"Total de cols: {num_cols}")

# Checando as colunas do df spark e seus respectivos tipos
df.dtypes

Total de linhas: 106823
Total de cols: 20


[('DATA INICIAL', 'string'),
 ('DATA FINAL', 'string'),
 ('REGIÃO', 'string'),
 ('ESTADO', 'string'),
 ('PRODUTO', 'string'),
 ('NÚMERO DE POSTOS PESQUISADOS', 'string'),
 ('UNIDADE DE MEDIDA', 'string'),
 ('PREÇO MÉDIO REVENDA', 'string'),
 ('DESVIO PADRÃO REVENDA', 'string'),
 ('PREÇO MÍNIMO REVENDA', 'string'),
 ('PREÇO MÁXIMO REVENDA', 'string'),
 ('MARGEM MÉDIA REVENDA', 'string'),
 ('COEF DE VARIAÇÃO REVENDA', 'string'),
 ('PREÇO MÉDIO DISTRIBUIÇÃO', 'string'),
 ('DESVIO PADRÃO DISTRIBUIÇÃO', 'string'),
 ('PREÇO MÍNIMO DISTRIBUIÇÃO', 'string'),
 ('PREÇO MÁXIMO DISTRIBUIÇÃO', 'string'),
 ('COEF DE VARIAÇÃO DISTRIBUIÇÃO', 'string'),
 ('MÊS', 'string'),
 ('ANO', 'string')]

In [11]:
# Outro método interessante é o printSchema()
df.printSchema()

root
 |-- DATA INICIAL: string (nullable = true)
 |-- DATA FINAL: string (nullable = true)
 |-- REGIÃO: string (nullable = true)
 |-- ESTADO: string (nullable = true)
 |-- PRODUTO: string (nullable = true)
 |-- NÚMERO DE POSTOS PESQUISADOS: string (nullable = true)
 |-- UNIDADE DE MEDIDA: string (nullable = true)
 |-- PREÇO MÉDIO REVENDA: string (nullable = true)
 |-- DESVIO PADRÃO REVENDA: string (nullable = true)
 |-- PREÇO MÍNIMO REVENDA: string (nullable = true)
 |-- PREÇO MÁXIMO REVENDA: string (nullable = true)
 |-- MARGEM MÉDIA REVENDA: string (nullable = true)
 |-- COEF DE VARIAÇÃO REVENDA: string (nullable = true)
 |-- PREÇO MÉDIO DISTRIBUIÇÃO: string (nullable = true)
 |-- DESVIO PADRÃO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÍNIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÁXIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- COEF DE VARIAÇÃO DISTRIBUIÇÃO: string (nullable = true)
 |-- MÊS: string (nullable = true)
 |-- ANO: string (nullable = true)



## Atribuindo valor constante à uma coluna existente
Agora irei replicar a lógica feita em pandas para atribuir um valor constante para pyspark.

1. Criar um novo df (df2) que sofrerá as alterações, mantendo assim o df original intacto
- Atribuir à uma nova variável

2. Modificar uma coluna
- O método withColumn do DataFrame é usado para substituir/modificar valores das colunas.

3. Atribuir o valor constante com a Função `lit`

- `lit` é uma função do PySpark cujo nome é uma abreviação de "literal". Ela é utilizada para criar uma coluna de valores constantes em um DataFrame.
  - Funciona também com valores numéricos
- **Objetivo**: Converter um valor literal constante em uma coluna de valores constantes.

In [12]:
# Copiando o conteúdo de df para df2
df2 = df # atribuir a nova variável
df2.printSchema()

root
 |-- DATA INICIAL: string (nullable = true)
 |-- DATA FINAL: string (nullable = true)
 |-- REGIÃO: string (nullable = true)
 |-- ESTADO: string (nullable = true)
 |-- PRODUTO: string (nullable = true)
 |-- NÚMERO DE POSTOS PESQUISADOS: string (nullable = true)
 |-- UNIDADE DE MEDIDA: string (nullable = true)
 |-- PREÇO MÉDIO REVENDA: string (nullable = true)
 |-- DESVIO PADRÃO REVENDA: string (nullable = true)
 |-- PREÇO MÍNIMO REVENDA: string (nullable = true)
 |-- PREÇO MÁXIMO REVENDA: string (nullable = true)
 |-- MARGEM MÉDIA REVENDA: string (nullable = true)
 |-- COEF DE VARIAÇÃO REVENDA: string (nullable = true)
 |-- PREÇO MÉDIO DISTRIBUIÇÃO: string (nullable = true)
 |-- DESVIO PADRÃO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÍNIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÁXIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- COEF DE VARIAÇÃO DISTRIBUIÇÃO: string (nullable = true)
 |-- MÊS: string (nullable = true)
 |-- ANO: string (nullable = true)



In [13]:
# Atribuindo o valor "Combustível" para toda a coluna "PRODUTO" do df2 e salvando o resultado no df2
df2 = df2.withColumn("PRODUTO", lit("Combustível"))

# Visualizando as 15 primeiras linhas para conferência
df2.show(15)


+------------+----------+------------+-------------------+-----------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+---+----+
|DATA INICIAL|DATA FINAL|      REGIÃO|             ESTADO|    PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO|MÊS| ANO|
+------------+----------+------------+-------------------+-----------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+-----------------

In [14]:
#Visualizando as 15 ultimas linhas para conferência
# No Pyspark não tem um método direto equivalente ao tail() do Pandas.
# No entanto, pode-se usar uma combinação de count() e orderBy() para obter um resultado similar, usando uma col de referência.
# A col de referencia usada foi a DATA INICIAL, pois o df já está naturalmente ordenado por essa col.
df2.orderBy("DATA INICIAL", ascending=False).show(15)

+------------+----------+------------+------------------+-----------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+---+----+
|DATA INICIAL|DATA FINAL|      REGIÃO|            ESTADO|    PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO|MÊS| ANO|
+------------+----------+------------+------------------+-----------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------

In [15]:
# Conferindo que o df original não foi modificado - checando primeiras 100
df.select('PRODUTO').show(100)

+----------------+
|         PRODUTO|
+----------------+
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDR

In [16]:
# Conferindo que o df original não foi modificado - checando ultimas 100
df.select('PRODUTO').orderBy("DATA INICIAL", ascending=False).show(100)

+----------------+
|         PRODUTO|
+----------------+
|ETANOL HIDRATADO|
|  GASOLINA COMUM|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|ETANOL HIDRATADO|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO DIESEL|
|     ÓLEO D

### Checando o resultado de atribuição de valor constante

1. value_counts()

No python, usei value_counts() na col PRODUTOS para fazer essa checagem.
No Pyspark, não tem essa função. Devemos, como no SQL, primeiro agrupar os valores (.groupBy) da coluna alvo e então fazer a contagem (.count()) .


####**Para mostrar o resultado de um dataframe, é sempre necessário usar o .show()**


In [17]:
df2.groupBy("PRODUTO").count().show()

+-----------+------+
|    PRODUTO| count|
+-----------+------+
|Combustível|106823|
+-----------+------+



2. nunique()

No python, usei nunique() na col PRODUTOS para fazer essa checagem de valores únicos/distintos.

No PySpark, para verificar a quantidade de valores distintos em uma coluna (equivalente ao nunique() do Pandas), pode-se usar o método distinct() seguido de count().

####**Nesse caso, como o resultado não é um dataframe, não é necessário o .show(). Inclusive, dará erro se usar.**


In [18]:
df2.select("PRODUTO").distinct().count()


1

### Verificando valores distintos em todas as cols de um df spark
Para verificar o número de valores únicos em cada coluna do DataFrame Pandas, bastava usar df2.nunique().

No PySpark, é necessário usar um loop 'for' para iterar sobre as colunas e aplicar 'distinct().count()' em cada uma delas.

O resultado será um um dicionário onde as chaves serão os nomes das colunas, e os respectivos valores serão a quantidade de valores distintos.

In [19]:
# iniciando um dicionário vazio
contagem_valores_unicos_na_coluna = {}

#iterando nas colunas do df2
for col in df2.columns:
    # colocando no dict a contagem dos val distintos
    contagem_valores_unicos_na_coluna[col] = df2.select(col).distinct().count()

# printando o dicionário com o resultado
print(contagem_valores_unicos_na_coluna)

{'DATA INICIAL': 785, 'DATA FINAL': 785, 'REGIÃO': 5, 'ESTADO': 27, 'PRODUTO': 1, 'NÚMERO DE POSTOS PESQUISADOS': 2219, 'UNIDADE DE MEDIDA': 3, 'PREÇO MÉDIO REVENDA': 18612, 'DESVIO PADRÃO REVENDA': 6341, 'PREÇO MÍNIMO REVENDA': 2047, 'PREÇO MÁXIMO REVENDA': 2078, 'MARGEM MÉDIA REVENDA': 11930, 'COEF DE VARIAÇÃO REVENDA': 210, 'PREÇO MÉDIO DISTRIBUIÇÃO': 15997, 'DESVIO PADRÃO DISTRIBUIÇÃO': 5858, 'PREÇO MÍNIMO DISTRIBUIÇÃO': 21620, 'PREÇO MÁXIMO DISTRIBUIÇÃO': 22576, 'COEF DE VARIAÇÃO DISTRIBUIÇÃO': 397, 'MÊS': 12, 'ANO': 16}


# Demanda 1
No df2, criar uma coluna que seja um identificador único de cada registro da tabela
- Atribuindo valor variável à uma nova coluna

#### No pyspark, já existe a função monotonically_increasing_id(), que gera um identificador único e crescente para cada linha do DataFrame.

In [20]:

# 1. Adicionando uma coluna de identificador único:
df2 = df2.withColumn("ID_REGISTRO", monotonically_increasing_id())

# 2. Mostrando o resultado:
df2.select("ID_REGISTRO").show(10)

+-----------+
|ID_REGISTRO|
+-----------+
|          0|
|          1|
|          2|
|          3|
|          4|
|          5|
|          6|
|          7|
|          8|
|          9|
+-----------+
only showing top 10 rows



##Nota
A solução acima é suficiente para atender à demanda 1. Porém, para ficar igual ao que executei em Python, irei concatenar a palavra "Registro" em cada linha da coluna "ID_REGISTRO". Será necessário converter os numeros do resultado anterior em string, para fazer a concatenaão com a palavra "Registro", e isso será feito pelo cast().

- df2.withColumn("ID_REGISTRO", ): Permite adicionar uma nova coluna ou substituir uma coluna existente. Aqui, estamos substituindo a coluna "ID_REGISTRO" existente com os novos valores que formaremos.

- concat(): Concatena duas ou mais colunas em uma única coluna. Ela aceita um número variável de argumentos, e esses argumentos devem ser colunas ou valores literais que queremos concatenar.

- lit("Registro "): lit cria uma coluna de valores constantes. Aqui, estamos criando uma coluna em que todos os valores são a string "Registro  ", com um espaço em branco após a palavra.

- df2["ID_REGISTRO"].cast("string"): Aqui, estamos pegando a coluna "ID_REGISTRO" do DataFrame df2 e convertendo (ou "casting") todos os seus valores para o tipo string. Isso é necessário porque, originalmente, os valores dessa coluna são números inteiros (IDs) e, para concatená-los com uma string, precisamos que ambos os lados da concatenação sejam strings.

In [21]:
# Convertendo o ID para o formato "Registro X":
df2 = df2.withColumn("ID_REGISTRO", concat(lit("Registro "), df2["ID_REGISTRO"].cast("string")))


In [22]:
# checando pela contagem de valores distintos da coluna criada
df2.select("ID_REGISTRO").distinct().count()

106823

In [23]:
# Visualizando as primeiras 10 linhas da coluna criada com o formato final
df2.select("ID_REGISTRO").show()

+-----------+
|ID_REGISTRO|
+-----------+
| Registro 0|
| Registro 1|
| Registro 2|
| Registro 3|
| Registro 4|
| Registro 5|
| Registro 6|
| Registro 7|
| Registro 8|
| Registro 9|
|Registro 10|
|Registro 11|
|Registro 12|
|Registro 13|
|Registro 14|
|Registro 15|
|Registro 16|
|Registro 17|
|Registro 18|
|Registro 19|
+-----------+
only showing top 20 rows



# Demanda 2
Criar coluna com 'PREÇO MÉDIO REVENDA' convertido em dólares, considerando taxa cambial de 1:5

In [24]:
# Usando o .withColumn para criar a nova coluna, usando a col PREÇO MÉDIO REVENDA
# dividida por 5
df2 = df2.withColumn("PREÇO MÉDIO REVENDA(DÓLAR)", df2["PREÇO MÉDIO REVENDA"] / 5)

# Checando a col original e a criada
df2.select("PREÇO MÉDIO REVENDA", "PREÇO MÉDIO REVENDA(DÓLAR)").show()

+-------------------+--------------------------+
|PREÇO MÉDIO REVENDA|PREÇO MÉDIO REVENDA(DÓLAR)|
+-------------------+--------------------------+
|              1.288|                    0.2576|
|              1.162|                    0.2324|
|              1.389|                    0.2778|
|              1.262|                    0.2524|
|              1.181|       0.23620000000000002|
|              1.383|                    0.2766|
|              1.453|                    0.2906|
|              1.631|                    0.3262|
|              1.284|       0.25680000000000003|
|              1.224|                    0.2448|
|              1.573|                    0.3146|
|              1.276|                    0.2552|
|              1.301|                    0.2602|
|              1.805|                     0.361|
|              1.785|                     0.357|
|              1.578|                    0.3156|
|              1.914|                    0.3828|
|              1.545

### Nota : ìndices (index)
No Spark, os DataFrames não possuem um "índice" no mesmo sentido que os DataFrames do pandas.

## Salvando um Data Frame Spark em arquivo
- Como o Spark é usado para Big Data, faz sentido exportar os dados como .parquet , para otimizar questão de armazenamento.

- O Spark é uma ferramenta de processamento distribuído e, quando você salva um DataFrame, ele é dividido em várias partes (partições) e salvo. Cada partição é salva como um arquivo separado. No caso, o salvamento foi feito em duas partições (00000 e 00001), em uma pasta com o nome definido no comando (nome-definido.parquet)
  - A razão para isso é permitir que o Spark processe e escreva dados em paralelo, utilizando múltiplos nós em um cluster.  
  - Após salvar, além do arquivo CSV desejado, será criado também um arquivo _SUCCESS que indica que os dados foram escritos com sucesso.
  - Quando salvamos em formato Parquet, não precisa se preocupar com parâmetros como header e sep que são típicos de arquivos CSV.

O formato Parquet é binário e mantém a estrutura e os metadados do schema.

In [25]:
## Salvando como arquivo parquet, modo overwrite para sobrescrever caso já exista
# Sem o movo overwrite, não será possível salvar novamente com o mesmo nome, no mesmo caminho.
df2.write.mode("overwrite").parquet('/content/output/resultado-em-parquet-particionado.parquet')


# Demanda 3
Mostrar todos os estados cujos os preços dos combustíveis foram aferidos

In [26]:
# Selecionar os valores únicos da coluna
df2.select("ESTADO").distinct().show(100)

+-------------------+
|             ESTADO|
+-------------------+
|     RIO DE JANEIRO|
|  RIO GRANDE DO SUL|
|           RONDONIA|
|        MATO GROSSO|
|     ESPIRITO SANTO|
|            SERGIPE|
|           AMAZONAS|
|RIO GRANDE DO NORTE|
|     SANTA CATARINA|
|            PARAIBA|
|              GOIAS|
|       MINAS GERAIS|
|          SAO PAULO|
|             PARANA|
|            ALAGOAS|
|           MARANHAO|
|   DISTRITO FEDERAL|
|              PIAUI|
|               ACRE|
|               PARA|
| MATO GROSSO DO SUL|
|              BAHIA|
|         PERNAMBUCO|
|              CEARA|
|              AMAPA|
|          TOCANTINS|
|            RORAIMA|
+-------------------+



### Coloquei 100, pois inserindo nenhuma informação, o spark retonas apenas 20 resultados, o que não é suficiente para minha demanda(São 27 estados)

# Demanda 4
Quantos registros há no dataset por estado? Ordenar do Maior para o menor

In [27]:
df2.groupBy("ESTADO").count().orderBy("count", ascending=False).show()

# O DataFrame foi agrupado pelo campo "ESTADO" e foi contado o número de
# ocorrências para cada estado. O orderBy é usado para ordenar os estados
# pela contagem em ordem decrescente (ou seja, ascending = False)

+-------------------+-----+
|             ESTADO|count|
+-------------------+-----+
|     RIO DE JANEIRO| 4263|
|  RIO GRANDE DO SUL| 4263|
|            PARAIBA| 4263|
|          SAO PAULO| 4263|
|              BAHIA| 4263|
|              CEARA| 4263|
|RIO GRANDE DO NORTE| 4262|
|     SANTA CATARINA| 4262|
|         PERNAMBUCO| 4262|
|       MINAS GERAIS| 4261|
|     ESPIRITO SANTO| 4260|
|            SERGIPE| 4259|
|             PARANA| 4251|
|            ALAGOAS| 4250|
| MATO GROSSO DO SUL| 4246|
|        MATO GROSSO| 4037|
|           AMAZONAS| 3889|
|              GOIAS| 3663|
|              PIAUI| 3548|
|           MARANHAO| 3499|
+-------------------+-----+
only showing top 20 rows



# Demanda 5
Selecionar os preços dos Postos de São Paulo


### Filtros - Filtrando amostras com Seleção Condicional
No PySpark, para filtrar linhas com base em uma condição, você usa o método 'filter()'

In [28]:
df_sp = df2.filter(df2['ESTADO'] == 'SAO PAULO')
df_sp.show()

+------------+----------+-------+---------+-----------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+---+----+------------+--------------------------+
|DATA INICIAL|DATA FINAL| REGIÃO|   ESTADO|    PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO|MÊS| ANO| ID_REGISTRO|PREÇO MÉDIO REVENDA(DÓLAR)|
+------------+----------+-------+---------+-----------+----------------------------+-----------------+-------------------+---------------------+--------------------+---

In [29]:
# validando o resultado
df_sp.groupBy("ESTADO").count().show()

+---------+-----+
|   ESTADO|count|
+---------+-----+
|SAO PAULO| 4263|
+---------+-----+



# Demanda 6
Selecionar os preços dos Postos de São Paulo, Rio de Janeiro e Maranhao

In [30]:
df_estados = df2.filter(df2['ESTADO'].isin(['SAO PAULO', 'RIO DE JANEIRO', 'MARANHAO']))
df_estados.show()

+------------+----------+--------+--------------+-----------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+---+----+------------+--------------------------+
|DATA INICIAL|DATA FINAL|  REGIÃO|        ESTADO|    PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO|MÊS| ANO| ID_REGISTRO|PREÇO MÉDIO REVENDA(DÓLAR)|
+------------+----------+--------+--------------+-----------+----------------------------+-----------------+-------------------+---------------------+------

In [31]:
# validando o resultado
df_estados.groupBy("ESTADO").count().show()

+--------------+-----+
|        ESTADO|count|
+--------------+-----+
|RIO DE JANEIRO| 4263|
|     SAO PAULO| 4263|
|      MARANHAO| 3499|
+--------------+-----+



# Demanda 7 - Múltiplas condições
Selecionar: registros de postos do Rio de Janeiro com Preços Médio de Revenda acima de 2 reais

 No Pyspark, permanecem os mesmos símbolos de condicionais que o Pandas.

In [32]:
df_rj_acima_2_reais = df2.filter((df2['ESTADO'] == 'RIO DE JANEIRO') & (df2['PREÇO MÉDIO REVENDA'] > 2))
df_rj_acima_2_reais.show()

+------------+----------+-------+--------------+-----------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+---+----+--------------+--------------------------+
|DATA INICIAL|DATA FINAL| REGIÃO|        ESTADO|    PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO|MÊS| ANO|   ID_REGISTRO|PREÇO MÉDIO REVENDA(DÓLAR)|
+------------+----------+-------+--------------+-----------+----------------------------+-----------------+-------------------+---------------------+-----

In [33]:
## Checando condição1 : Apenas estado do RJ
df_rj_acima_2_reais.groupBy("ESTADO").count().show()

+--------------+-----+
|        ESTADO|count|
+--------------+-----+
|RIO DE JANEIRO| 1694|
+--------------+-----+



# Nota importante
### Aqui, detectei uma divergência entre os valores encontrados no Pyspark e no Pandas. Investigando, cheguei nessa conclusão :

a) Pandas e String:

Pandas consegue realizar operações numéricas em strings que se parecem com números sem lançar um erro. Por exemplo, '2' > 1 retorna True em Python (pois o Python compara a string com o número e, implicitamente, converte o número em uma string para a comparação). Mas esta não é uma boa prática e pode levar a comportamentos inesperados em casos mais complicados.

b) Pyspark e string :

PySpark não tentará implicitamente converter tipos de dados incompatíveis, como Pandas às vezes faz. Se tentarmos comparar uma string com um número em PySpark, ele não tentará converter a string em um número, mas sim comparar como duas strings. Neste caso, a comparação '10' > 2 será verdadeira em Pandas/Python, mas em PySpark isso não acontece.


In [34]:
## Primeira tentativa de soluçao - converter a coluna PREÇO MÉDIO DISTRIBUIÇÃO de string para float
## Mais detalhes sobre conversão na demanda 13
## Usando um novo df (df3) para teste

df3 = df2.withColumn("PREÇO MÉDIO DISTRIBUIÇÃO", df2["PREÇO MÉDIO DISTRIBUIÇÃO"].cast(FloatType()))
df3.printSchema()


root
 |-- DATA INICIAL: string (nullable = true)
 |-- DATA FINAL: string (nullable = true)
 |-- REGIÃO: string (nullable = true)
 |-- ESTADO: string (nullable = true)
 |-- PRODUTO: string (nullable = false)
 |-- NÚMERO DE POSTOS PESQUISADOS: string (nullable = true)
 |-- UNIDADE DE MEDIDA: string (nullable = true)
 |-- PREÇO MÉDIO REVENDA: string (nullable = true)
 |-- DESVIO PADRÃO REVENDA: string (nullable = true)
 |-- PREÇO MÍNIMO REVENDA: string (nullable = true)
 |-- PREÇO MÁXIMO REVENDA: string (nullable = true)
 |-- MARGEM MÉDIA REVENDA: string (nullable = true)
 |-- COEF DE VARIAÇÃO REVENDA: string (nullable = true)
 |-- PREÇO MÉDIO DISTRIBUIÇÃO: float (nullable = true)
 |-- DESVIO PADRÃO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÍNIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÁXIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- COEF DE VARIAÇÃO DISTRIBUIÇÃO: string (nullable = true)
 |-- MÊS: string (nullable = true)
 |-- ANO: string (nullable = true)
 |-- ID_RE

In [35]:
df_rj_acima_2_reais = df3.filter((df3['ESTADO'] == 'RIO DE JANEIRO') & (df3['PREÇO MÉDIO REVENDA'] > 2))
df_rj_acima_2_reais.groupBy("ESTADO").count().show()

+--------------+-----+
|        ESTADO|count|
+--------------+-----+
|RIO DE JANEIRO| 1694|
+--------------+-----+



### Tentativa 1 falhou
Converter a coluna para a tipagem certa não resolveu o problema

In [36]:
## Segunda tentativa de soluçao - sem converter a coluna, deixar explícito no filtro que a comparação é com dado float
# Isso será feito colocando '2.0' no lugar de apenas '2' no filtro, como feito anteriormente.
## Mais detalhes sobre conversão na demanda 13
df_rj_acima_2_reais = df2.filter((df2['ESTADO'] == 'RIO DE JANEIRO') & (df2['PREÇO MÉDIO REVENDA'] > 2.0))
df_rj_acima_2_reais.groupBy("ESTADO").count().show()

+--------------+-----+
|        ESTADO|count|
+--------------+-----+
|RIO DE JANEIRO| 3054|
+--------------+-----+



### Tentativa 2 deu certo, sendo então a soluçao aplicada.
Spark necessita que o dado seja bem explícito sobre o tipo, na filtragem

## Checando condição 2
Checando se o valor mínimo é acima de 2 reais.

> .agg(F.min("PREÇO MÉDIO REVENDA"))

O método agg vai agregar os dados. O F.min usa a função 'min' do módulo 'pyspark.sql.functions' (geralmente importado como F) para calcular o valor mínimo da coluna especificada.


> .collect()

No PySpark, as transformações (como filtros e agregações) são lazy ("preguiçosa"), o que significa que elas não são executadas até que uma ação seja chamada. '.collect()' é uma dessas ações e irá coletar os resultados das transformações e retorná-los como uma lista de linhas.
Algumas outras ações :
- .collect(): Retorna todos os registros do DataFrame ou RDD como uma lista de linhas.
- .count(): Retorna a contagem de registros no DataFrame ou RDD.
- .first(): Retorna a primeira linha do DataFrame ou RDD.
- .take(n): Retorna as primeiras n linhas do DataFrame ou RDD como uma lista.
- .show(n): Exibe as primeiras n linhas do DataFrame em um formato tabular. Útil para visualização rápida.

> [0][0]

Quando executamos agregação como .agg(F.min("PREÇO MÉDIO REVENDA")), o resultado é um DataFrame com apenas uma coluna contendo o valor mínimo.
Depois de coletar os resultados, preciso acessar o valor real dentro desse Dataframe. Como o resultado é uma lista de linhas e cada linha é uma lista de valores, usamos [0][0] para acessar a primeira linha e o primeiro valor dessa linha, que neste caso é o valor mínimo que quero buscar.



In [37]:
#Sem o [0][0], o print mostra uma mensagem grande.

print(df_rj_acima_2_reais.agg(F.min("PREÇO MÉDIO REVENDA")).collect())

[Row(min(PREÇO MÉDIO REVENDA)='2.001')]


In [38]:
## Checando condição2: Apenas PREÇO MÉDIO REVENDA > 2 reais
##  Checando se o valor mínimo é acima de 2 reais
##  Irei usar o [0][0] para capturar apenas o valor
df_rj_acima_2_reais.agg(F.min("PREÇO MÉDIO REVENDA")).collect()[0][0]

'2.001'

#Demanda 8
Selecionar os registros de postos do Rio de Janeiro OU com Preços Médio de Revenda acima de 2 reais

In [39]:
# Seguindo como aplicado na demanda anteiror coloquei o 2.0 também
df_rj_ou_acima_2_reais = df2.filter(
    (df2['ESTADO'] == 'RIO DE JANEIRO') |
    (df2['PREÇO MÉDIO REVENDA'] > 2.0)
)

### Farei um count da ocorrência de 'RIO DE JANEIRO' no meu resultado acima. Sendo um valor diferente de zero, o filtro fica validado para esse caso.

In [40]:
print(df_rj_ou_acima_2_reais.filter(df_rj_ou_acima_2_reais['ESTADO'] == 'RIO DE JANEIRO').count())

4263


### Semelhante ao caso anterior, sendo um valor diferente de zero, o filtro fica validado para esse caso.

In [41]:
print(df_rj_ou_acima_2_reais.filter(df_rj_ou_acima_2_reais['PREÇO MÉDIO REVENDA'] > 2).count())

43377


# Demanda 9
Selecionar os registros de postos do Rio de Janeiro OU do Maranhão com Preços Médio de Revenda entre 1 e 2 reais (incluso)

In [42]:
# Filtrando os registros
df_filtrado = df2.filter(
    ((df2['ESTADO'] == 'RIO DE JANEIRO') | (df2['ESTADO'] == 'MARANHAO')) &
    (F.col('PREÇO MÉDIO REVENDA') >= 1.0) &  # Lembrando de sinalizar o Float no filtro colocando ".0"
    (F.col('PREÇO MÉDIO REVENDA') <= 2.0)    # Lembrando de sinalizar o Float no filtro colocando ".0"
)

df_filtrado.show()


+------------+----------+--------+--------------+-----------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+---+----+------------+--------------------------+
|DATA INICIAL|DATA FINAL|  REGIÃO|        ESTADO|    PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO|MÊS| ANO| ID_REGISTRO|PREÇO MÉDIO REVENDA(DÓLAR)|
+------------+----------+--------+--------------+-----------+----------------------------+-----------------+-------------------+---------------------+------

In [43]:
# Testar os estados
df_filtrado.groupby('ESTADO').count().show()

+--------------+-----+
|        ESTADO|count|
+--------------+-----+
|RIO DE JANEIRO| 1209|
|      MARANHAO|  598|
+--------------+-----+



In [44]:
# Calculando o mínimo e máximo do preço médio de revenda para testar
min_preco = df_filtrado.agg(F.min('PREÇO MÉDIO REVENDA')).collect()[0][0]
max_preco = df_filtrado.agg(F.max('PREÇO MÉDIO REVENDA')).collect()[0][0]

print(f"O menor preço médio de revenda é: {min_preco}")
print(f"O maior preço médio de revenda é: {max_preco}")

O menor preço médio de revenda é: 1.036
O maior preço médio de revenda é: 2.0


# Demanda 10
Selecionar os registros de postos de São Paulo ou do Rio de Janeiro com Gasolina Comum acima de 2 reais

In [45]:
# 1. Criar o filtro composto por cada condição em pyspark
selecao_1 = (df['ESTADO'] == 'SAO PAULO') | (df['ESTADO'] == 'RIO DE JANEIRO')
selecao_2 = df['PRODUTO'] == 'GASOLINA COMUM'
selecao_3 = df['PREÇO MÉDIO REVENDA'] > 2.0 # Use 2.0 para garantir comparação como float



In [46]:
# 2. Aplicar as condições juntas
df_resultado = df.filter(selecao_1 & selecao_2 & selecao_3)
df_resultado.show()

+------------+----------+-------+--------------+--------------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+---+----+
|DATA INICIAL|DATA FINAL| REGIÃO|        ESTADO|       PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO|MÊS| ANO|
+------------+----------+-------+--------------+--------------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+-----------------

In [47]:
# Garantindo que só tem os estados estipulados na condição
print(df_resultado.select('ESTADO').distinct().collect())

# Garantindo que só tem o produto estipulado na condição
print(df_resultado.select('PRODUTO').distinct().collect())

# Garantindo que o menor valor presente no resultado é igual ou maior que 2
min_preco = df_resultado.agg(F.min('PREÇO MÉDIO REVENDA')).collect()[0][0]
print(min_preco)

[Row(ESTADO='RIO DE JANEIRO'), Row(ESTADO='SAO PAULO')]
[Row(PRODUTO='GASOLINA COMUM')]
2.028


# Demanda 11
Achar a data referente às últimas 2 semanas , considerando que a data atual sempre mudará a cada dia diferente de processamento.


In [48]:
# Determinando a data de hoje e a data de 2 semanas atrás


# calculando a data de hoje - muda de acordo com o dia em que for processado
data_hoje = current_date()


# o data_sub calcula a diferença entre duas datas em dias
data_duas_semanas = date_sub(data_hoje, 14)

# Criando um DataFrame com uma linha qualquer para armazenar as datas depois
df_temp = spark.createDataFrame([(1,)], ["dummy"])

# Adicionando colunas com a data atual e a data de duas semanas atrás
df_temp = df_temp.withColumn("data_atual", current_date()) \
                 .withColumn("data_duas_semanas_atras", date_sub(current_date(), 14))

# Verificando
df_temp.show()


+-----+----------+-----------------------+
|dummy|data_atual|data_duas_semanas_atras|
+-----+----------+-----------------------+
|    1|2023-10-29|             2023-10-15|
+-----+----------+-----------------------+



# Demanda 12
Criar uma coluna com os os dias que se passaram entre a data final e a data inicial

In [49]:
# Checando o tipo das datas
df2.printSchema()

root
 |-- DATA INICIAL: string (nullable = true)
 |-- DATA FINAL: string (nullable = true)
 |-- REGIÃO: string (nullable = true)
 |-- ESTADO: string (nullable = true)
 |-- PRODUTO: string (nullable = false)
 |-- NÚMERO DE POSTOS PESQUISADOS: string (nullable = true)
 |-- UNIDADE DE MEDIDA: string (nullable = true)
 |-- PREÇO MÉDIO REVENDA: string (nullable = true)
 |-- DESVIO PADRÃO REVENDA: string (nullable = true)
 |-- PREÇO MÍNIMO REVENDA: string (nullable = true)
 |-- PREÇO MÁXIMO REVENDA: string (nullable = true)
 |-- MARGEM MÉDIA REVENDA: string (nullable = true)
 |-- COEF DE VARIAÇÃO REVENDA: string (nullable = true)
 |-- PREÇO MÉDIO DISTRIBUIÇÃO: string (nullable = true)
 |-- DESVIO PADRÃO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÍNIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÁXIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- COEF DE VARIAÇÃO DISTRIBUIÇÃO: string (nullable = true)
 |-- MÊS: string (nullable = true)
 |-- ANO: string (nullable = true)
 |-- ID_R

In [50]:
# Convertendo strings para datas no formato "yyyy-MM-dd" com to_date
df2 = df2.withColumn("DATA FINAL", to_date(df2["DATA FINAL"], 'yyyy-MM-dd'))
df2 = df2.withColumn("DATA INICIAL", to_date(df2["DATA INICIAL"], 'yyyy-MM-dd'))

# Criar nova coluna com a diferença entre as datas com datediff (em dias)
df2 = df2.withColumn("dias_passados", datediff(df2["DATA FINAL"], df2["DATA INICIAL"]))

# Mostrar o resultado
df2.select("DATA FINAL", "DATA INICIAL", "dias_passados").show()

+----------+------------+-------------+
|DATA FINAL|DATA INICIAL|dias_passados|
+----------+------------+-------------+
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
|2004-05-15|  2004-05-09|            6|
+----------+------------+-------------+
only showing top 20 rows



# Demanda 13
Corrigir todos os dados numéricos que estão com tipagem object, para o correto.

In [51]:
# 1) Checando os tipos dados
df2.printSchema()

root
 |-- DATA INICIAL: date (nullable = true)
 |-- DATA FINAL: date (nullable = true)
 |-- REGIÃO: string (nullable = true)
 |-- ESTADO: string (nullable = true)
 |-- PRODUTO: string (nullable = false)
 |-- NÚMERO DE POSTOS PESQUISADOS: string (nullable = true)
 |-- UNIDADE DE MEDIDA: string (nullable = true)
 |-- PREÇO MÉDIO REVENDA: string (nullable = true)
 |-- DESVIO PADRÃO REVENDA: string (nullable = true)
 |-- PREÇO MÍNIMO REVENDA: string (nullable = true)
 |-- PREÇO MÁXIMO REVENDA: string (nullable = true)
 |-- MARGEM MÉDIA REVENDA: string (nullable = true)
 |-- COEF DE VARIAÇÃO REVENDA: string (nullable = true)
 |-- PREÇO MÉDIO DISTRIBUIÇÃO: string (nullable = true)
 |-- DESVIO PADRÃO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÍNIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÁXIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- COEF DE VARIAÇÃO DISTRIBUIÇÃO: string (nullable = true)
 |-- MÊS: string (nullable = true)
 |-- ANO: string (nullable = true)
 |-- ID_REGIS

In [52]:
# 2. Verificando possíveis valores anômalos em uma das colunas que será transformada
df2.groupBy("PREÇO MÉDIO DISTRIBUIÇÃO").count().orderBy("count", ascending=False).show()

# identificados 3400 valores anômalos como '-'

+------------------------+-----+
|PREÇO MÉDIO DISTRIBUIÇÃO|count|
+------------------------+-----+
|                       -| 3400|
|                   1.283|   96|
|                   1.681|   92|
|                   1.684|   92|
|                   1.683|   90|
|                   1.682|   88|
|                    1.25|   88|
|                   2.183|   85|
|                   1.688|   85|
|                   2.173|   81|
|                   1.685|   80|
|                   2.278|   80|
|                   1.785|   80|
|                    1.68|   79|
|                   2.252|   79|
|                   1.706|   78|
|                    2.27|   78|
|                    1.93|   78|
|                   2.272|   78|
|                   1.705|   77|
+------------------------+-----+
only showing top 20 rows



In [53]:
# 3) Convertendo as colunas

cols_convert = [
   'MARGEM MÉDIA REVENDA',
    'PREÇO MÉDIO DISTRIBUIÇÃO',
    'DESVIO PADRÃO DISTRIBUIÇÃO',
    'PREÇO MÍNIMO DISTRIBUIÇÃO',
    'PREÇO MÁXIMO DISTRIBUIÇÃO',
    'COEF DE VARIAÇÃO DISTRIBUIÇÃO'
]

for coluna in cols_convert:
    df2 = df2.withColumn(coluna, df2[coluna].cast(FloatType()))


## Nota sobre conversão e nulls no pyspark
#### O método cast é usado para converter o tipo de coluna no PySpark. Neste caso, estou convertendo as colunas especificadas para FloatType. Se algum valor não puder ser convertido (por exemplo, se houver uma string que não pode ser transformada em float, como no caso do valor anômalo encontrado), esse valor será substituído por null no PySpark diretamente, não sendo necessário um tratamento para isso , como no Python com coerce.

In [54]:
# Checando tipagem após transformação
df2.printSchema()

root
 |-- DATA INICIAL: date (nullable = true)
 |-- DATA FINAL: date (nullable = true)
 |-- REGIÃO: string (nullable = true)
 |-- ESTADO: string (nullable = true)
 |-- PRODUTO: string (nullable = false)
 |-- NÚMERO DE POSTOS PESQUISADOS: string (nullable = true)
 |-- UNIDADE DE MEDIDA: string (nullable = true)
 |-- PREÇO MÉDIO REVENDA: string (nullable = true)
 |-- DESVIO PADRÃO REVENDA: string (nullable = true)
 |-- PREÇO MÍNIMO REVENDA: string (nullable = true)
 |-- PREÇO MÁXIMO REVENDA: string (nullable = true)
 |-- MARGEM MÉDIA REVENDA: float (nullable = true)
 |-- COEF DE VARIAÇÃO REVENDA: string (nullable = true)
 |-- PREÇO MÉDIO DISTRIBUIÇÃO: float (nullable = true)
 |-- DESVIO PADRÃO DISTRIBUIÇÃO: float (nullable = true)
 |-- PREÇO MÍNIMO DISTRIBUIÇÃO: float (nullable = true)
 |-- PREÇO MÁXIMO DISTRIBUIÇÃO: float (nullable = true)
 |-- COEF DE VARIAÇÃO DISTRIBUIÇÃO: float (nullable = true)
 |-- MÊS: string (nullable = true)
 |-- ANO: string (nullable = true)
 |-- ID_REGISTRO: s

## Contagem de valores Nulos no Pyspark

#### Funções
- `sum`: Função agregada que retorna a soma dos valores em uma coluna.
- `when`: Função condicional que permite especificar ações com base em uma condição. É frequentemente usada em conjunto com outras funções para definir transformações condicionais.


Com essas funções importadas, consigo criar uma estrutura para contar os valores nulos:


O processo consiste em:
- **Checagem da Condição Nula**: `F.col(c).isNull()` Checa se o valor na coluna `c` é nulo.
- **Atribuição de Valores com base na Condição**: `F.when(col(c).isNull(), 1)` usa o `when` para retornar 1 quando o valor na coluna `c` é nulo. Se não for nulo, não retorna nada .
- **Agregação**: `F.sum(...)` soma todos os 1s (que representam valores nulos) na coluna.
- **Manter o nome original das colunas** com o `alias`: Após a agregação para contar valores nulos, uso a função `.alias(c)` para (re)nomear o resultado. No contexto do código, ele renomeia a coluna resultante da contagem agregada para ter o mesmo nome que a coluna original.
- **Iteração por todas as Colunas**: `[F.sum(F.when(F.col(c).isNull(), 1)).alias(c) for c in df2.columns]` usa uma iteração pelas colunas para aplicar o processo acima a todas as colunas em `df2`.
- **Seleção dos Resultados**: `df2.select(...)` seleciona os valores calculados para criar um novo DataFrame que contém a contagem de valores nulos para cada coluna.

In [55]:
# Calculando os nulos por coluna
nulos_por_coluna = df2.select([F.sum(F.when(F.col(c).isNull(),1)).alias(c) for c in df2.columns])

nulos_por_coluna.show()


+------------+----------+------+------+-------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+----+----+-----------+--------------------------+-------------+
|DATA INICIAL|DATA FINAL|REGIÃO|ESTADO|PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO| MÊS| ANO|ID_REGISTRO|PREÇO MÉDIO REVENDA(DÓLAR)|dias_passados|
+------------+----------+------+------+-------+----------------------------+-----------------+-------------------+---------------------+--------------------

Foram identificadas as seugintes colunas contendo valores NULL : MARGEM MÉDIA REVENDA , PREÇO MÉDIO DISTRIBUIÇÃO, DESVIO PADRÃO DISTRIBUIÇÃO, PREÇO MÍNIMO DISTRIBUIÇÃO , PREÇO MÁXIMO DISTRIBUIÇÃO, COEF DE VARIAÇÃO DISTRIBUIÇÃO.

A pŕoxima etapa é realizar a limpeza e tratamento desses dados.

### Limpeza de dados

In [56]:

# 1) Visualização dos registros com valores nulos
## Checando os valores nulos de uma coluna especifica convertida

df_nulos = df2.filter(F.col('PREÇO MÉDIO DISTRIBUIÇÃO').isNull())

## Mostrando os registros filtrados
df_nulos.show()

+------------+----------+--------+--------+-----------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+---+----+-------------+--------------------------+-------------+
|DATA INICIAL|DATA FINAL|  REGIÃO|  ESTADO|    PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO|MÊS| ANO|  ID_REGISTRO|PREÇO MÉDIO REVENDA(DÓLAR)|dias_passados|
+------------+----------+--------+--------+-----------+----------------------------+-----------------+-------------------+----------------

# Demanda 14 - Tratamento de NULL
Preencher os valores numéricos nulos por zero
Preencher os valores nulos por zero é uma das estratégias, mas não a única e nem recomendada para algumas das colunas (ex: a MARGEM MÉDIA REVENDA seria melhor preencher com a média dos valores). Para o exercício, seguirei com o proposto na demanda.

In [57]:
# Lista de colunas que você deseja substituir os nulos por zero
col_nulos =  ['MARGEM MÉDIA REVENDA',
              'PREÇO MÉDIO DISTRIBUIÇÃO',
              'DESVIO PADRÃO DISTRIBUIÇÃO',
              'PREÇO MÍNIMO DISTRIBUIÇÃO',
              'PREÇO MÁXIMO DISTRIBUIÇÃO',
              'COEF DE VARIAÇÃO DISTRIBUIÇÃO']

# Usando fillna para preencher os NaN com zero - 'fillna' similar ao pandas
df2 = df2.fillna({coluna:0 for coluna in col_nulos})

In [58]:
# Checagem
# Calculando e mostrando a quantidade de nulos por coluna - espera-se ver NULL em todas as cols agora, pois NULL foi substituído por zero
nulos_por_coluna = df2.select([F.sum(F.when(F.col(c).isNull(), 1)).alias(c) for c in df2.columns])
nulos_por_coluna.show()

+------------+----------+------+------+-------+----------------------------+-----------------+-------------------+---------------------+--------------------+--------------------+--------------------+------------------------+------------------------+--------------------------+-------------------------+-------------------------+-----------------------------+----+----+-----------+--------------------------+-------------+
|DATA INICIAL|DATA FINAL|REGIÃO|ESTADO|PRODUTO|NÚMERO DE POSTOS PESQUISADOS|UNIDADE DE MEDIDA|PREÇO MÉDIO REVENDA|DESVIO PADRÃO REVENDA|PREÇO MÍNIMO REVENDA|PREÇO MÁXIMO REVENDA|MARGEM MÉDIA REVENDA|COEF DE VARIAÇÃO REVENDA|PREÇO MÉDIO DISTRIBUIÇÃO|DESVIO PADRÃO DISTRIBUIÇÃO|PREÇO MÍNIMO DISTRIBUIÇÃO|PREÇO MÁXIMO DISTRIBUIÇÃO|COEF DE VARIAÇÃO DISTRIBUIÇÃO| MÊS| ANO|ID_REGISTRO|PREÇO MÉDIO REVENDA(DÓLAR)|dias_passados|
+------------+----------+------+------+-------+----------------------------+-----------------+-------------------+---------------------+--------------------

## Demandas 15 e 16 puladas, não fazem sentido para o spark

# Demanda 17
Identificar o maior preço médio de revenda para cada região

In [59]:
# Convertendo a coluna de string para float
df2 = df2.withColumn('PREÇO MÉDIO REVENDA', df2['PREÇO MÉDIO REVENDA'].cast(FloatType()))



In [60]:
# Agrupando as linhas do df de acordo com as regiões pelo critério de MAX da col 'PREÇO MÉDIO REVENDA'
# Coloquei também o 'alias' para renomear a coluna resultante
# Agrupando as linhas do dataframe de acordo com as regiões pelo critério de MAX da coluna 'PREÇO MÉDIO REVENDA'
max_revenda_regiao = df2.groupBy('REGIÃO').agg(F.max('PREÇO MÉDIO REVENDA').alias('Máximo PREÇO MÉDIO REVENDA'))

# Exibindo o resultado
max_revenda_regiao.show()

+------------+--------------------------+
|      REGIÃO|Máximo PREÇO MÉDIO REVENDA|
+------------+--------------------------+
|CENTRO OESTE|                    99.357|
|       NORTE|                     87.27|
|    NORDESTE|                     79.63|
|         SUL|                    72.088|
|     SUDESTE|                    79.192|
+------------+--------------------------+



# Demanda 18
Identificar o menor preço médio de revenda por região e por data final

In [61]:
# Utilizei pyspark_min para referir à função min() do PySpark, evitando assim o conflito com a função nativa min() do Python.
# Agrupando o df2 pelas colunas 'REGIÃO' e 'DATA FINAL'
# Agregando o menor valor da coluna 'PREÇO MÉDIO REVENDA' para cada grupo
# Renomeando o resultado da agregação para 'Menor PREÇO MÉDIO REVENDA'
agrupamento = (df3.groupBy('REGIÃO', 'DATA FINAL')
               .agg(F.min('PREÇO MÉDIO REVENDA').alias('Menor PREÇO MÉDIO REVENDA'))
               .orderBy('REGIÃO', 'DATA FINAL'))

agrupamento.show()

+------------+----------+-------------------------+
|      REGIÃO|DATA FINAL|Menor PREÇO MÉDIO REVENDA|
+------------+----------+-------------------------+
|CENTRO OESTE|2004-05-15|                    1.099|
|CENTRO OESTE|2004-05-22|                    1.099|
|CENTRO OESTE|2004-05-29|                    1.099|
|CENTRO OESTE|2004-06-05|                    1.099|
|CENTRO OESTE|2004-06-12|                    1.099|
|CENTRO OESTE|2004-06-19|                    1.099|
|CENTRO OESTE|2004-06-26|                    1.097|
|CENTRO OESTE|2004-07-03|                    1.099|
|CENTRO OESTE|2004-07-10|                    1.099|
|CENTRO OESTE|2004-07-17|                    1.144|
|CENTRO OESTE|2004-07-24|                    1.144|
|CENTRO OESTE|2004-07-31|                    1.144|
|CENTRO OESTE|2004-08-07|                    1.144|
|CENTRO OESTE|2004-08-14|                    1.144|
|CENTRO OESTE|2004-08-21|                    1.144|
|CENTRO OESTE|2004-08-28|                    1.144|
|CENTRO OEST

In [62]:
# Checando se todas regiões estão presentes
agrupamento.groupBy('REGIÃO').count().show()

+------------+-----+
|      REGIÃO|count|
+------------+-----+
|CENTRO OESTE|  785|
|       NORTE|  785|
|    NORDESTE|  785|
|         SUL|  785|
|     SUDESTE|  785|
+------------+-----+



# Demanda 19
Demonstrar o min e max do 'PREÇO MÉDIO REVENDA' para cada região

In [63]:
# Agrupando pelo 'REGIÃO' e calculando o menor e maior valor de 'PREÇO MÉDIO REVENDA'
min_max_pmr = df.groupBy('REGIÃO').agg(
    F.min('PREÇO MÉDIO REVENDA').alias('min_PREÇO MÉDIO REVENDA'),
    F.max('PREÇO MÉDIO REVENDA').alias('max_PREÇO MÉDIO REVENDA')
)

# Exibindo o resultado
min_max_pmr.show()

+------------+-----------------------+-----------------------+
|      REGIÃO|min_PREÇO MÉDIO REVENDA|max_PREÇO MÉDIO REVENDA|
+------------+-----------------------+-----------------------+
|CENTRO OESTE|                  1.097|                 99.357|
|    NORDESTE|                  0.975|                  79.63|
|       NORTE|                  1.219|                  87.27|
|     SUDESTE|      0.765999999999999|                 79.192|
|         SUL|                  0.922|                 72.088|
+------------+-----------------------+-----------------------+



# Demanda 20
Criar um ranking das regiões com maior preço médio de revenda.


In [64]:
# 1. Calcular o Máximo 'preço médio' de revenda por região.
# 2. Ordenar valores pelo valor agrupado em ordem descendente
max_pmv_regiao = (df2.groupBy('REGIÃO')
                  .agg(F.max('PREÇO MÉDIO REVENDA').alias('MAX_PREÇO MÉDIO REVENDA'))
                  .orderBy(F.col('MAX_PREÇO MÉDIO REVENDA').desc()))

# Exibindo o resultado
max_pmv_regiao.show()

+------------+-----------------------+
|      REGIÃO|MAX_PREÇO MÉDIO REVENDA|
+------------+-----------------------+
|CENTRO OESTE|                 99.357|
|       NORTE|                  87.27|
|    NORDESTE|                  79.63|
|     SUDESTE|                 79.192|
|         SUL|                 72.088|
+------------+-----------------------+



### Adicionando Ranking usando monotonically_increasing_id()

Para criar um ranking mais simplificado, usarei a função `monotonically_increasing_id()`

O processo é:

1. **Ordenação**: Primeiro, ordenei  df2 de acordo com o critério desejado. No caso, ordenei pelo 'PREÇO MÉDIO REVENDA' em ordem decrescente.
2. **Adicionar IDs Monotonicamente Crescentes**: Após a ordenação, usei `monotonically_increasing_id()` para atribuir IDs. Como já foi ordenado os dados anteriormente, esses IDs servirão como o ranking.
3. **Exibição do Resultado**: Por fim, mostro o df2 resultante com a coluna de ranking adicionada.

In [65]:
# Ordenando o DataFrame
max_pmv_regiao = max_pmv_regiao.orderBy(F.col('MAX_PREÇO MÉDIO REVENDA').desc())

# Adicionando coluna de ranking usando monotonically_increasing_id
# Adicionado o critério "+1", pois um ranking não pode ser inicado de zero
max_pmv_regiao = max_pmv_regiao.withColumn("RANKING", monotonically_increasing_id() + 1)

# Exibindo o resultado
max_pmv_regiao.show()


+------------+-----------------------+-------+
|      REGIÃO|MAX_PREÇO MÉDIO REVENDA|RANKING|
+------------+-----------------------+-------+
|CENTRO OESTE|                 99.357|      1|
|       NORTE|                  87.27|      2|
|    NORDESTE|                  79.63|      3|
|     SUDESTE|                 79.192|      4|
|         SUL|                 72.088|      5|
+------------+-----------------------+-------+



# Demanda 21
Para as análises futuras, a área de negócio precisa de dados que contemplem apenas anos cheios (Jan-Dez). Exclua os anos que não corresponderem a esse critério

In [66]:
# 1. Obtendo a contagem de meses únicos por ano
meses_por_ano = df.groupBy("ANO").agg(F.countDistinct("MÊS").alias("Quantidade Meses"))
meses_por_ano.show()

+----+----------------+
| ANO|Quantidade Meses|
+----+----------------+
|2016|              12|
|2012|              12|
|2019|               6|
|2017|              12|
|2014|              12|
|2013|              12|
|2005|              12|
|2009|              12|
|2018|              12|
|2006|              12|
|2004|               8|
|2011|              12|
|2008|              12|
|2007|              12|
|2015|              12|
|2010|              12|
+----+----------------+



In [67]:
# 2. Removendo os dados da analise dos anos de 2004 e 2019, que não atendem a demanda
df = df.filter(~F.col("ANO").isin([2019, 2004]))


In [68]:
# Checando
df.groupBy('ANO').count().orderBy('ANO').show()

+----+-----+
| ANO|count|
+----+-----+
|2005| 6186|
|2006| 6598|
|2007| 6475|
|2008| 6453|
|2009| 6232|
|2010| 6489|
|2011| 6487|
|2012| 6634|
|2013| 7921|
|2014| 7932|
|2015| 7771|
|2016| 7873|
|2017| 7990|
|2018| 7823|
+----+-----+



# Demanda 22
Qual a proporção de postos pesquisados para cada tipo de combustível, em cada região ?

In [69]:
# 1. Agrupando por 'REGIÃO' e somando o número de postos pesquisados [TOTAL]
total_postos_por_regiao = df.groupBy("REGIÃO").agg(F.sum("NÚMERO DE POSTOS PESQUISADOS").alias("Total Postos"))
total_postos_por_regiao.show()


# O resultado foi diferente do python, porque não formam removidas as linhas que contínham zero

+------------+------------+
|      REGIÃO|Total Postos|
+------------+------------+
|CENTRO OESTE|   1948303.0|
|       NORTE|   1521359.0|
|    NORDESTE|   4199760.0|
|         SUL|   4166977.0|
|     SUDESTE| 1.1102769E7|
+------------+------------+



In [70]:

# 2. Agrupando por 'REGIÃO' e 'PRODUTO' e somando o número de postos pesquisados [PARTE]
postos_por_regiao_produto = df.groupBy("REGIÃO", "PRODUTO") \
                             .agg(F.sum("NÚMERO DE POSTOS PESQUISADOS").alias("Parte Postos")) \
                             .orderBy("REGIÃO")

postos_por_regiao_produto.show(100)  # coloquei 100 somente para aparecerem todas as regiões

+------------+----------------+------------+
|      REGIÃO|         PRODUTO|Parte Postos|
+------------+----------------+------------+
|CENTRO OESTE|             GLP|    593192.0|
|CENTRO OESTE|  GASOLINA COMUM|    443923.0|
|CENTRO OESTE|     ÓLEO DIESEL|    377377.0|
|CENTRO OESTE| ÓLEO DIESEL S10|     86085.0|
|CENTRO OESTE|ETANOL HIDRATADO|    443547.0|
|CENTRO OESTE|             GNV|      4179.0|
|    NORDESTE|             GNV|     87446.0|
|    NORDESTE| ÓLEO DIESEL S10|    258937.0|
|    NORDESTE|     ÓLEO DIESEL|    882007.0|
|    NORDESTE|ETANOL HIDRATADO|   1004012.0|
|    NORDESTE|             GLP|    835940.0|
|    NORDESTE|  GASOLINA COMUM|   1131418.0|
|       NORTE| ÓLEO DIESEL S10|     86258.0|
|       NORTE|  GASOLINA COMUM|    393678.0|
|       NORTE|             GNV|       564.0|
|       NORTE|ETANOL HIDRATADO|    231449.0|
|       NORTE|     ÓLEO DIESEL|    344935.0|
|       NORTE|             GLP|    464475.0|
|     SUDESTE|  GASOLINA COMUM|   3002336.0|
|     SUDE

### Passo 1: Primeiro, fiz um join entre dois conjuntos de dados:

- postos_por_regiao_produto, que contém a quantidade de postos por região e tipo de produto.
- total_postos_por_regiao, que traz o total de postos em cada região.

O método join é uma operação de álgebra relacional usada para combinar registros de duas ou mais tabelas em um banco de dados relacional ou DataFrames em plataformas de processamento de dados como o PySpark. A combinação é feita com base em uma ou mais colunas-chave comuns no caso "REGIÃO"

### Passo 2: Criei uma nova coluna chamada "Proporção". Ela divide o número de postos de um tipo específico de produto pelo total de postos daquela região.

### Passo 3: Depois de fazer essa conta, organizei os resultados e mostrei apenas as informações mais importantes. Para isso, usei o método .select.

In [71]:
# Calculando a proporção total diretamente após o join
proporcao_postos = postos_por_regiao_produto.join(total_postos_por_regiao, "REGIÃO") \
                                            .withColumn("Proporção", F.col("Parte Postos") / F.col("Total Postos")) \
                                            .select("REGIÃO", "PRODUTO", "Proporção")

proporcao_postos.show(100)


+------------+----------------+--------------------+
|      REGIÃO|         PRODUTO|           Proporção|
+------------+----------------+--------------------+
|CENTRO OESTE|             GNV|0.002144943573971...|
|CENTRO OESTE|ETANOL HIDRATADO| 0.22765812093909418|
|CENTRO OESTE| ÓLEO DIESEL S10| 0.04418460578257078|
|CENTRO OESTE|     ÓLEO DIESEL| 0.19369523118323997|
|CENTRO OESTE|  GASOLINA COMUM| 0.22785110940136108|
|CENTRO OESTE|             GLP|  0.3044659891197622|
|       NORTE|             GLP| 0.30530269318418596|
|       NORTE|     ÓLEO DIESEL| 0.22672820813496355|
|       NORTE|ETANOL HIDRATADO| 0.15213305998124046|
|       NORTE|             GNV|3.707211775787306E-4|
|       NORTE|  GASOLINA COMUM| 0.25876732579226863|
|       NORTE| ÓLEO DIESEL S10| 0.05669799172976266|
|    NORDESTE|  GASOLINA COMUM|  0.2694006324170905|
|    NORDESTE|             GLP| 0.19904470731660856|
|    NORDESTE|ETANOL HIDRATADO| 0.23906413699830467|
|    NORDESTE|     ÓLEO DIESEL| 0.210013667447

# Demanda 23
Como os preços da Gasolina Comum em São Paulo variaram em 2018?

In [72]:
# 1. Filtrando o DataFrame
filtro_gasolina_sp_2018 = (
    (F.col('PRODUTO') == 'GASOLINA COMUM') &
    (F.col('ESTADO') == 'SAO PAULO') &
    (F.col('ANO') == 2018)
)

gasolina_sp_2018 = df.filter(filtro_gasolina_sp_2018)

In [73]:
# 2. Ordenando os dados pela 'DATA FINAL' para garantir a sequência correta das datas
gasolina_sp_2018 = gasolina_sp_2018.orderBy('DATA FINAL')


In [74]:
# 3. Atualizando o df `gasolina_sp_2018`
# subtraindo o 'PMR' atual pela função `lag`, que pega o valor do 'PMR' da linha anterior.
# O`over` define como a função `lag` deve se comportar, no caso, ORDENEI pela 'DATA FINAL'
# para garantir que estou pegando o 'PREÇO MÉDIO REVENDA' da data anterior correta.

gasolina_sp_2018 = gasolina_sp_2018 \
.withColumn("VARIAÇÃO",
F.col('PREÇO MÉDIO REVENDA') - F.lag('PREÇO MÉDIO REVENDA').over(Window.orderBy('DATA FINAL'))
)

In [75]:
# Ajuste, selecionando as colunas relevantes
df_variacao_sp_2018 = gasolina_sp_2018.select(
    'ESTADO', 'PRODUTO', 'ANO', 'DATA FINAL', 'PREÇO MÉDIO REVENDA', 'VARIAÇÃO'
)
df_variacao_sp_2018.show()

+---------+--------------+----+----------+-------------------+--------------------+
|   ESTADO|       PRODUTO| ANO|DATA FINAL|PREÇO MÉDIO REVENDA|            VARIAÇÃO|
+---------+--------------+----+----------+-------------------+--------------------+
|SAO PAULO|GASOLINA COMUM|2018|2018-01-13|              3.988|                NULL|
|SAO PAULO|GASOLINA COMUM|2018|2018-01-20|              4.002| 0.01399999999999979|
|SAO PAULO|GASOLINA COMUM|2018|2018-01-27|              4.006|0.004000000000000448|
|SAO PAULO|GASOLINA COMUM|2018|2018-02-03|              4.019|  0.0129999999999999|
|SAO PAULO|GASOLINA COMUM|2018|2018-02-10|              4.004|-0.01500000000000...|
|SAO PAULO|GASOLINA COMUM|2018|2018-02-17|              4.014|0.010000000000000675|
|SAO PAULO|GASOLINA COMUM|2018|2018-02-24|              4.008|-0.00600000000000...|
|SAO PAULO|GASOLINA COMUM|2018|2018-03-03|              3.992|-0.01600000000000...|
|SAO PAULO|GASOLINA COMUM|2018|2018-03-10|               3.97|-0.02199999999

# Demanda 24
Como os preços da Gasolina Comum e do Etanol em São Paulo variaram em 2018?

In [76]:
# Filtrando o df para Gasolina Comum e Etanol em São Paulo no ano de 2018
filtro_combustiveis_sp_2018 = (
    df['PRODUTO'].isin(['GASOLINA COMUM', 'ETANOL HIDRATADO']) &
    (df['ESTADO'] == 'SAO PAULO') &
    (df['ANO'] == 2018)
)
combustiveis_sp_2018 = df.filter(filtro_combustiveis_sp_2018)

In [77]:
# Particionando os dados por produtos, para calcular a variação de cada um de acordo
# com suas respectivas datas finais ordenadas
window = Window.partitionBy('PRODUTO').orderBy('DATA FINAL')

# Criando a variação do preço em relação ao preço do dia anterior, para cada produto
combustiveis_sp_2018 = combustiveis_sp_2018.withColumn("VARIAÇÃO",
    F.col('PREÇO MÉDIO REVENDA') - F.lag('PREÇO MÉDIO REVENDA').over(window)
)

In [78]:
#Exibir resultados.
combustiveis_sp_2018.select(['ESTADO', 'PRODUTO', 'ANO', 'DATA FINAL', 'PREÇO MÉDIO REVENDA', 'VARIAÇÃO']).show()

+---------+----------------+----+----------+-------------------+--------------------+
|   ESTADO|         PRODUTO| ANO|DATA FINAL|PREÇO MÉDIO REVENDA|            VARIAÇÃO|
+---------+----------------+----+----------+-------------------+--------------------+
|SAO PAULO|ETANOL HIDRATADO|2018|2018-01-13|              2.833|                NULL|
|SAO PAULO|ETANOL HIDRATADO|2018|2018-01-20|              2.868|  0.0349999999999997|
|SAO PAULO|ETANOL HIDRATADO|2018|2018-01-27|              2.874|0.006000000000000227|
|SAO PAULO|ETANOL HIDRATADO|2018|2018-02-03|              2.891|0.016999999999999904|
|SAO PAULO|ETANOL HIDRATADO|2018|2018-02-10|              2.884|-0.00700000000000...|
|SAO PAULO|ETANOL HIDRATADO|2018|2018-02-17|              2.892|0.008000000000000007|
|SAO PAULO|ETANOL HIDRATADO|2018|2018-02-24|              2.886|-0.00599999999999...|
|SAO PAULO|ETANOL HIDRATADO|2018|2018-03-03|              2.874|-0.01200000000000001|
|SAO PAULO|ETANOL HIDRATADO|2018|2018-03-10|          

In [79]:
#Checando
combustiveis_sp_2018.groupBy('PRODUTO').count().show()

+----------------+-----+
|         PRODUTO|count|
+----------------+-----+
|ETANOL HIDRATADO|   52|
|  GASOLINA COMUM|   52|
+----------------+-----+

