In [13]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [15]:
spark = ( 
 SparkSession
 .builder
    .master("local[*]")
 .appName('spark_dataframe_api')
 .getOrCreate()
)

In [23]:
df = (
    spark
    .read
    .option('delimiter', ';')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('enconding', 'ISO-8859-1')
    .csv('./dados/precos-gasolina-etanol-09.csv')
)

df.printSchema()

root
 |-- Regiao - Sigla: string (nullable = true)
 |-- Estado - Sigla: string (nullable = true)
 |-- Municipio: string (nullable = true)
 |-- Revenda: string (nullable = true)
 |-- CNPJ da Revenda: string (nullable = true)
 |-- Nome da Rua: string (nullable = true)
 |-- Numero Rua: string (nullable = true)
 |-- Complemento: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cep: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Data da Coleta: string (nullable = true)
 |-- Valor de Venda: string (nullable = true)
 |-- Valor de Compra: string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)
 |-- Bandeira: string (nullable = true)



In [24]:
df_precos = (
    df
    .select('Estado - Sigla', 'Produto', 'Valor de Venda', 'Unidade de Medida')
    .withColumn(
        "Valor de Venda",
        F.regexp_replace(F.col("Valor de Venda"), ",", ".")
        .cast("float")
    )
)

In [25]:
(
    df_precos
    .where(
        F.col('Valor de Compra').isNotNull()
    )
    .show()
)

+--------------+-------+--------------+-----------------+
|Estado - Sigla|Produto|Valor de Venda|Unidade de Medida|
+--------------+-------+--------------+-----------------+
+--------------+-------+--------------+-----------------+



In [28]:
df_precos.show()

+--------------+------------------+--------------+-----------------+
|Estado - Sigla|           Produto|Valor de Venda|Unidade de Medida|
+--------------+------------------+--------------+-----------------+
|            SP|          GASOLINA|          5.39|       R$ / litro|
|            SP|            ETANOL|          3.49|       R$ / litro|
|            AC|          GASOLINA|          7.17|       R$ / litro|
|            AC|GASOLINA ADITIVADA|          7.22|       R$ / litro|
|            AC|            ETANOL|          4.96|       R$ / litro|
|            AL|          GASOLINA|          5.89|       R$ / litro|
|            AL|GASOLINA ADITIVADA|          6.29|       R$ / litro|
|            AL|            ETANOL|          4.99|       R$ / litro|
|            AL|          GASOLINA|          5.89|       R$ / litro|
|            AL|GASOLINA ADITIVADA|          5.99|       R$ / litro|
|            AL|            ETANOL|          4.99|       R$ / litro|
|            AL|          GASOLINA

In [29]:
(
    df_precos
    .where(
        F.col('Valor de Compra').isNotNull()
    )
    .show()
)

+--------------+-------+--------------+-----------------+
|Estado - Sigla|Produto|Valor de Venda|Unidade de Medida|
+--------------+-------+--------------+-----------------+
+--------------+-------+--------------+-----------------+



In [30]:
df_precos = (
    df
    .select('Estado - Sigla', 'Produto', 'Valor de Venda', 'Unidade de Medida')
    .withColumn(
        "Valor de Venda",
        F.regexp_replace(F.col("Valor de Venda"), ",", ".")
        .cast("float")
    )
)

In [41]:
df_precos_analise = (
    df_precos
    .groupBy(
        F.col('Estado - Sigla'),
        F.col('Produto'),
        F.col('Unidade de Medida')
    )
    .agg(
        F.min(F.col("Valor de Venda")).alias('menor_valor'),
        F.max(F.col("Valor de Venda")).alias('maior_valor')
    )
    .withColumn(
        "diferenca",
        F.col('maior_valor') - F.col('menor_valor')
    )
    .orderBy('diferenca', ascending=False)
)

In [42]:
df_precos_analise.show(df_precos_analise.count(), truncate=False)

+--------------+------------------+-----------------+-----------+-----------+-----------+
|Estado - Sigla|Produto           |Unidade de Medida|menor_valor|maior_valor|diferenca  |
+--------------+------------------+-----------------+-----------+-----------+-----------+
|RS            |GASOLINA ADITIVADA|R$ / litro       |5.39       |9.79       |4.4        |
|SP            |GASOLINA ADITIVADA|R$ / litro       |5.29       |8.89       |3.6000004  |
|SP            |GASOLINA          |R$ / litro       |4.99       |7.99       |3.0        |
|SP            |ETANOL            |R$ / litro       |3.19       |5.99       |2.7999997  |
|RJ            |GASOLINA ADITIVADA|R$ / litro       |5.49       |7.99       |2.5        |
|RJ            |GASOLINA          |R$ / litro       |5.39       |7.79       |2.4        |
|RS            |GASOLINA          |R$ / litro       |5.19       |7.33       |2.1399999  |
|SC            |ETANOL            |R$ / litro       |3.95       |6.06       |2.11       |
|PE       

In [40]:
df_filtrado = df_precos_analise.filter(df_precos_analise["Estado - Sigla"] == "ES")
df_filtrado.show()

+--------------+------------------+-----------------+-----------+-----------+----------+
|Estado - Sigla|           Produto|Unidade de Medida|menor_valor|maior_valor| diferenca|
+--------------+------------------+-----------------+-----------+-----------+----------+
|            ES|GASOLINA ADITIVADA|       R$ / litro|       5.87|       7.17| 1.3000002|
|            ES|          GASOLINA|       R$ / litro|       5.59|       6.89| 1.2999997|
|            ES|            ETANOL|       R$ / litro|       3.99|       4.99|0.99999976|
+--------------+------------------+-----------------+-----------+-----------+----------+

