In [18]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = ( 
 SparkSession
 .builder
    .master("local[*]")
 .appName('spark_dataframe_api')
 .getOrCreate()
)

In [19]:
df = (
    spark
    .read
    .option('delimiter', ';')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('enconding', 'ISO-8859-1')
    .csv('./dados/precos-gasolina-etanol-08.csv')
)

df.printSchema()

root
 |-- Regiao - Sigla: string (nullable = true)
 |-- Estado - Sigla: string (nullable = true)
 |-- Municipio: string (nullable = true)
 |-- Revenda: string (nullable = true)
 |-- CNPJ da Revenda: string (nullable = true)
 |-- Nome da Rua: string (nullable = true)
 |-- Numero Rua: string (nullable = true)
 |-- Complemento: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cep: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Data da Coleta: string (nullable = true)
 |-- Valor de Venda: string (nullable = true)
 |-- Valor de Compra: string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)
 |-- Bandeira: string (nullable = true)



In [20]:
df.show(truncate=False)

+--------------+--------------+---------+-------------------------------------------------------+-------------------+-----------------------------------------+----------+--------------+----------------+---------+------------------+--------------+--------------+---------------+-----------------+--------+
|Regiao - Sigla|Estado - Sigla|Municipio|Revenda                                                |CNPJ da Revenda    |Nome da Rua                              |Numero Rua|Complemento   |Bairro          |Cep      |Produto           |Data da Coleta|Valor de Venda|Valor de Compra|Unidade de Medida|Bandeira|
+--------------+--------------+---------+-------------------------------------------------------+-------------------+-----------------------------------------+----------+--------------+----------------+---------+------------------+--------------+--------------+---------------+-----------------+--------+
|N             |AM            |PARINTINS|F.J. COMERCIO DE COMBUSTIVEIS LTDA          

In [21]:
df_precos = (
    df
    .select('Estado - Sigla', 'Produto', 'Valor de Compra', 'Valor de Venda', 'Unidade de Medida')
)

In [22]:
df_precos.show(5)

+--------------+------------------+---------------+--------------+-----------------+
|Estado - Sigla|           Produto|Valor de Compra|Valor de Venda|Unidade de Medida|
+--------------+------------------+---------------+--------------+-----------------+
|            AM|          GASOLINA|           null|          7,68|       R$ / litro|
|            AM|GASOLINA ADITIVADA|           null|          7,68|       R$ / litro|
|            CE|          GASOLINA|           null|          6,77|       R$ / litro|
|            CE|GASOLINA ADITIVADA|           null|          6,78|       R$ / litro|
|            CE|            ETANOL|           null|          5,24|       R$ / litro|
+--------------+------------------+---------------+--------------+-----------------+
only showing top 5 rows



In [23]:
# verificar se no dataframe a coluna "Valor de Compra' tem valor nulos
(
    df_precos
    .where(
        F.col('Valor de Compra').isNotNull()
    )
    .show()
)

+--------------+-------+---------------+--------------+-----------------+
|Estado - Sigla|Produto|Valor de Compra|Valor de Venda|Unidade de Medida|
+--------------+-------+---------------+--------------+-----------------+
+--------------+-------+---------------+--------------+-----------------+



In [24]:
# Formatar Valor de Venda para FLOAT
df_precos = (
    df
    .select('Estado - Sigla', 'Produto', 'Valor de Venda', 'Unidade de Medida')
    .withColumn(
        "Valor de Venda",
        F.regexp_replace(F.col("Valor de Venda"), ",", ".")
        .cast("Float")    
    )
)


In [25]:
## Calcular o Valor de MIN e MAX e verificar a DIFERENCA
# Realizar agregação para obter as medidas
df_precos_analise = (
    df_precos
    .groupBy(
        F.col('Estado - Sigla'),
        F.col('Produto'),
        F.col('Unidade de Medida')
    )
    .agg(
        F.min(F.col("Valor de Venda")).alias('menor_valor'),
        F.max(F.col("Valor de Venda")).alias('maior_valor')
    )
    .withColumn(
        "diferenca",
       F.col('maior_valor') - F.col('menor_valor') 
    )
    .orderBy('diferenca', ascending=False)
)

In [26]:
df_precos_analise.show(10)

+--------------+------------------+-----------------+-----------+-----------+---------+
|Estado - Sigla|           Produto|Unidade de Medida|menor_valor|maior_valor|diferenca|
+--------------+------------------+-----------------+-----------+-----------+---------+
|            SP|GASOLINA ADITIVADA|       R$ / litro|       5.09|       8.89|3.8000002|
|            SP|          GASOLINA|       R$ / litro|       4.99|       7.99|      3.0|
|            SP|            ETANOL|       R$ / litro|       3.29|       5.99|2.6999998|
|            RJ|GASOLINA ADITIVADA|       R$ / litro|       5.49|       7.99|      2.5|
|            GO|            ETANOL|       R$ / litro|       3.39|       5.89|2.4999998|
|            RJ|          GASOLINA|       R$ / litro|       5.39|       7.79|      2.4|
|            PE|            ETANOL|       R$ / litro|       3.99|       5.99|1.9999998|
|            RS|            ETANOL|       R$ / litro|       4.15|       6.14|1.9899998|
|            SC|            ETAN