In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = ( 
 SparkSession
 .builder
    .master("local[*]")
 .appName('spark_sql')
 .getOrCreate()
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/10/22 11:24:58 WARN Utils: Your hostname, GTI-DADOS, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/10/22 11:24:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/22 11:24:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = (
    spark
    .read
    .option('delimiter', ';')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('enconding', 'ISO-8859-1')
    .csv('../data/raw/precos-gasolina-etanol-09.csv')
)

In [4]:
df.printSchema()

root
 |-- Regiao - Sigla: string (nullable = true)
 |-- Estado - Sigla: string (nullable = true)
 |-- Municipio: string (nullable = true)
 |-- Revenda: string (nullable = true)
 |-- CNPJ da Revenda: string (nullable = true)
 |-- Nome da Rua: string (nullable = true)
 |-- Numero Rua: string (nullable = true)
 |-- Complemento: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cep: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Data da Coleta: string (nullable = true)
 |-- Valor de Venda: string (nullable = true)
 |-- Valor de Compra: string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)
 |-- Bandeira: string (nullable = true)



In [5]:
df.createOrReplaceTempView('combustiveis')

In [6]:
spark.sql("""
    select * from combustiveis where `Valor de Compra` IS NOT NULL
""").show()

+--------------+--------------+---------+-------+---------------+-----------+----------+-----------+------+---+-------+--------------+--------------+---------------+-----------------+--------+
|Regiao - Sigla|Estado - Sigla|Municipio|Revenda|CNPJ da Revenda|Nome da Rua|Numero Rua|Complemento|Bairro|Cep|Produto|Data da Coleta|Valor de Venda|Valor de Compra|Unidade de Medida|Bandeira|
+--------------+--------------+---------+-------+---------------+-----------+----------+-----------+------+---+-------+--------------+--------------+---------------+-----------------+--------+
+--------------+--------------+---------+-------+---------------+-----------+----------+-----------+------+---+-------+--------------+--------------+---------------+-----------------+--------+



In [7]:
view_precos = spark.sql("""
    select 
        `Estado - Sigla`, 
        `Produto`, 
        regexp_replace(`Valor de Venda`, ",", ".") as `Valor de Venda`,
        `Unidade de Medida`
    from combustiveis
""")

In [8]:
view_precos.createOrReplaceTempView('view_precos')
view_precos.show(5)

+--------------+--------+--------------+-----------------+
|Estado - Sigla| Produto|Valor de Venda|Unidade de Medida|
+--------------+--------+--------------+-----------------+
|            SP|GASOLINA|          5.59|       R$ / litro|
|            SP|  ETANOL|          3.69|       R$ / litro|
|            AP|GASOLINA|          5.95|       R$ / litro|
|            AP|GASOLINA|          5.95|       R$ / litro|
|            AP|GASOLINA|          5.95|       R$ / litro|
+--------------+--------+--------------+-----------------+
only showing top 5 rows


In [11]:
view_precos.printSchema()

root
 |-- Estado - Sigla: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Valor de Venda: string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)



In [12]:
df_diferenca_precos = spark.sql("""
    SELECT
    `Estado - Sigla`,
    `Produto`,
    `Unidade de Medida`,
    MIN(CAST(`Valor de Venda` AS DECIMAL(10, 3))) AS menor_valor, -- Cast within MIN/MAX
    MAX(CAST(`Valor de Venda` AS DECIMAL(10, 3))) AS maior_valor, -- Cast within MIN/MAX
    MAX(CAST(`Valor de Venda` AS DECIMAL(10, 3))) - MIN(CAST(`Valor de Venda` AS DECIMAL(10, 3))) AS diferenca -- Re-calculate the difference
FROM view_precos
GROUP BY ALL
ORDER BY diferenca DESC
""")
df_diferenca_precos.show(5)

+--------------+------------------+-----------------+-----------+-----------+---------+
|Estado - Sigla|           Produto|Unidade de Medida|menor_valor|maior_valor|diferenca|
+--------------+------------------+-----------------+-----------+-----------+---------+
|            SP|GASOLINA ADITIVADA|       R$ / litro|      5.190|      9.590|    4.400|
|            SP|          GASOLINA|       R$ / litro|      4.990|      9.190|    4.200|
|            SP|            ETANOL|       R$ / litro|      3.190|      6.390|    3.200|
|            BA|GASOLINA ADITIVADA|       R$ / litro|      5.440|      8.490|    3.050|
|            RJ|          GASOLINA|       R$ / litro|      5.390|      7.790|    2.400|
+--------------+------------------+-----------------+-----------+-----------+---------+
only showing top 5 rows
