In [54]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [55]:
# Aqui serão usados DataFrames pra mexer nos dados

spark = (
    SparkSession
    .builder
    .master("local[*]") # usa nosso processador com todas as threads disponíveis
    .appName('spark_dataframe_api')
    .getOrCreate()
)

In [56]:
# dados obtidos em:
# https://dados.gov.br/dados/conjuntos-dados/serie-historica-de-precos-de-combustiveis-e-de-glp?source=post_page-----4185005771e5---------------------------------------
# csv de outubro/2023

df = (
    spark
    .read
    .option('delimiter', ';') # delimitador será ;
    .option('header', 'true') # a primeira linha do csv será o cabeçalho
    .option('inferSchema', 'true') # infere o schema automaticamente (nomes das colunas e tipos de dados)
    .option('encoding', 'ISO-8859-1') # às vezes é dado, às vezes precisamos fazer testes até descobrir
    .csv('./dados/precos-gasolina-etanol-10.csv')
)

In [57]:
df.printSchema()

root
 |-- Regiao - Sigla: string (nullable = true)
 |-- Estado - Sigla: string (nullable = true)
 |-- Municipio: string (nullable = true)
 |-- Revenda: string (nullable = true)
 |-- CNPJ da Revenda: string (nullable = true)
 |-- Nome da Rua: string (nullable = true)
 |-- Numero Rua: string (nullable = true)
 |-- Complemento: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cep: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Data da Coleta: string (nullable = true)
 |-- Valor de Venda: string (nullable = true)
 |-- Valor de Compra: string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)
 |-- Bandeira: string (nullable = true)



In [58]:
# dataframe apenas com colunas que importam
df_precos = (
    df.select(
        'Estado - Sigla',
        'Produto',
        'Valor de Compra',
        'Valor de Venda',
        'Unidade de Medida'
    )
)

In [59]:
df_precos.show(5)

+--------------+------------------+---------------+--------------+-----------------+
|Estado - Sigla|           Produto|Valor de Compra|Valor de Venda|Unidade de Medida|
+--------------+------------------+---------------+--------------+-----------------+
|            AL|          GASOLINA|           NULL|          6,08|       R$ / litro|
|            AL|GASOLINA ADITIVADA|           NULL|          6,08|       R$ / litro|
|            AL|            ETANOL|           NULL|          4,78|       R$ / litro|
|            AL|          GASOLINA|           NULL|          5,79|       R$ / litro|
|            AL|            ETANOL|           NULL|          4,29|       R$ / litro|
+--------------+------------------+---------------+--------------+-----------------+
only showing top 5 rows



In [60]:
# pra verificar se há linhas não nulas na coluna Valor de Compra
(
    df_precos
    .where(
        F.col('Valor de Compra').isNotNull()
    )
    .show()
)

+--------------+-------+---------------+--------------+-----------------+
|Estado - Sigla|Produto|Valor de Compra|Valor de Venda|Unidade de Medida|
+--------------+-------+---------------+--------------+-----------------+
+--------------+-------+---------------+--------------+-----------------+



In [61]:
# como Valor de Compra não tem valores os dados de df_precos serão substituído
# substituímos , por . em Valor de Venda para que cast pra float funcione corretamente

df_precos = (
    df
    .select('Estado - Sigla', 'Produto', 'Valor de Venda', 'Unidade de Medida')
    .withColumn(         # retorna um novo df com valores de 'Valor de Venda' modificados
        'Valor de Venda',
        F.regexp_replace(F.col('Valor de Venda'), ',', '.')
        .cast('float')
    )
)

In [62]:
df_precos.show()

+--------------+------------------+--------------+-----------------+
|Estado - Sigla|           Produto|Valor de Venda|Unidade de Medida|
+--------------+------------------+--------------+-----------------+
|            AL|          GASOLINA|          6.08|       R$ / litro|
|            AL|GASOLINA ADITIVADA|          6.08|       R$ / litro|
|            AL|            ETANOL|          4.78|       R$ / litro|
|            AL|          GASOLINA|          5.79|       R$ / litro|
|            AL|            ETANOL|          4.29|       R$ / litro|
|            AL|          GASOLINA|          5.89|       R$ / litro|
|            AL|GASOLINA ADITIVADA|          6.09|       R$ / litro|
|            AL|            ETANOL|          4.49|       R$ / litro|
|            AL|          GASOLINA|          5.89|       R$ / litro|
|            AL|GASOLINA ADITIVADA|          6.09|       R$ / litro|
|            AL|            ETANOL|          4.99|       R$ / litro|
|            AL|          GASOLINA

In [63]:
df_precos_analise = (
    df_precos
    .groupBy(
        F.col('Estado - Sigla'),
        F.col('Produto'),
        F.col('Unidade de Medida'),
    )
    # com groupBy o spark define agrupamentos de linhas logicamente, mas para aplicar de fato 
    # funções de agregação (como min e max) é necessário usar agg. Ambas são transformações, e
    # por conta do Lazy Evaluation, o spark só computa esses cálculos quando uma ação é chamada
    # por exemplo um .show()
    .agg(
        F.min(F.col("Valor de Venda")).alias('menor_valor'),
        F.max(F.col("Valor de Venda")).alias('maior_valor')
    )
    .withColumn(    # retorna um novo df com adição da nova coluna 'diferenca'
        "diferenca",
        F.col('maior_valor') - F.col('menor_valor')
    )
    .orderBy('diferenca', ascending=False)
)

In [65]:
df_precos_analise.show(10)

+--------------+------------------+-----------------+-----------+-----------+---------+
|Estado - Sigla|           Produto|Unidade de Medida|menor_valor|maior_valor|diferenca|
+--------------+------------------+-----------------+-----------+-----------+---------+
|            SP|GASOLINA ADITIVADA|       R$ / litro|       4.79|       8.69|3.8999996|
|            SP|            ETANOL|       R$ / litro|       2.78|       6.19|     3.41|
|            SP|          GASOLINA|       R$ / litro|       4.59|       7.59|      3.0|
|            PA|            ETANOL|       R$ / litro|       3.85|        6.6|     2.75|
|            RS|            ETANOL|       R$ / litro|       3.88|       6.29|2.4099998|
|            BA|          GASOLINA|       R$ / litro|       4.69|       6.98|     2.29|
|            SC|            ETANOL|       R$ / litro|       3.84|       5.89|     2.05|
|            AL|            ETANOL|       R$ / litro|       3.79|        5.8|2.0100002|
|            PE|            ETAN