In [2]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder.appName("spark_dataframe_api").master("local[*]").getOrCreate()

In [6]:
df = (
    spark
    .read
    .option(key="delimiter", value=";")
    .option(key="header", value="true")
    .option(key="inferSchema", value="true")
    .option(key="encoding", value="ISO-8859-1")
    .csv("./data/precos-gasolina-etanol-3.csv")
)

In [8]:
df.printSchema()

root
 |-- Regiao - Sigla: string (nullable = true)
 |-- Estado - Sigla: string (nullable = true)
 |-- Municipio: string (nullable = true)
 |-- Revenda: string (nullable = true)
 |-- CNPJ da Revenda: string (nullable = true)
 |-- Nome da Rua: string (nullable = true)
 |-- Numero Rua: string (nullable = true)
 |-- Complemento: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cep: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Data da Coleta: string (nullable = true)
 |-- Valor de Venda: string (nullable = true)
 |-- Valor de Compra: string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)
 |-- Bandeira: string (nullable = true)



In [13]:
df_precos = df.select(
    'Estado - Sigla',
    'Produto',
    'Valor de Compra',
    'Valor de Venda',
    'Unidade de Medida'
)

In [14]:
df_precos.show(5)

+--------------+------------------+---------------+--------------+-----------------+
|Estado - Sigla|           Produto|Valor de Compra|Valor de Venda|Unidade de Medida|
+--------------+------------------+---------------+--------------+-----------------+
|            PB|          GASOLINA|           NULL|          5,59|       R$ / litro|
|            PB|GASOLINA ADITIVADA|           NULL|          5,63|       R$ / litro|
|            PB|            ETANOL|           NULL|          3,99|       R$ / litro|
|            PB|          GASOLINA|           NULL|          5,59|       R$ / litro|
|            PB|GASOLINA ADITIVADA|           NULL|          5,79|       R$ / litro|
+--------------+------------------+---------------+--------------+-----------------+
only showing top 5 rows



In [15]:
# verifying if field "Valor de Compra" is always null
df_precos.where(
    F.col('Valor de Compra').isNotNull()
).show(5)

+--------------+-------+---------------+--------------+-----------------+
|Estado - Sigla|Produto|Valor de Compra|Valor de Venda|Unidade de Medida|
+--------------+-------+---------------+--------------+-----------------+
+--------------+-------+---------------+--------------+-----------------+



In [23]:
# removing "Valor de Compra" since it is always null and casting "Valor de Compra"
df_precos = df.select(
    'Estado - Sigla',
    'Produto',
    'Valor de Venda',
    'Unidade de Medida'
).withColumn(
    "Valor de Venda",
    F.regexp_replace(string="Valor de Venda", pattern=",", replacement=".").cast("float")
)

In [30]:
df_precos_analise = df_precos.groupBy(
    F.col('Estado - Sigla'),
    F.col('Produto'),
    F.col('Unidade de Medida')
).agg(
    F.min(F.col('Valor de Venda')).alias("menor_valor"),
    F.max(F.col('Valor de Venda')).alias("maior_valor")
).withColumn(
    "diferenca",
    F.col("maior_valor") - F.col("menor_valor")
).orderBy("diferenca", ascending=False)