## Importação das bibliotecas

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

## Criação do Dataframe com Spark

In [3]:
spark = ( 
 SparkSession
 .builder
 .master("local[*]")
 .appName('spark_dataframe_api')
 .getOrCreate()
)

df = (
    spark
    .read
    .option('delimiter', ';')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('enconding', 'ISO-8859-1')
    .csv('./data/precos-gasolina-etanol-01.csv')
)

df.printSchema()

your 131072x1 screen size is bogus. expect trouble
25/03/20 20:34:24 WARN Utils: Your hostname, denis resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
25/03/20 20:34:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/20 20:34:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/20 20:34:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/03/20 20:34:25 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/03/20 20:34:25 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


root
 |-- Regiao - Sigla: string (nullable = true)
 |-- Estado - Sigla: string (nullable = true)
 |-- Municipio: string (nullable = true)
 |-- Revenda: string (nullable = true)
 |-- CNPJ da Revenda: string (nullable = true)
 |-- Nome da Rua: string (nullable = true)
 |-- Numero Rua: string (nullable = true)
 |-- Complemento: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cep: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Data da Coleta: string (nullable = true)
 |-- Valor de Venda: string (nullable = true)
 |-- Valor de Compra: string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)
 |-- Bandeira: string (nullable = true)



## Criação de uma View

In [4]:
df.createOrReplaceTempView('combustiveis')

## Filtrar Colunas

In [5]:
columns = "`Estado - Sigla`, `Produto`, `Valor de Compra`, `Valor de Venda`, `Unidade de Medida`"

In [8]:
df = spark.sql(f"""
    select {columns} from combustiveis
""")

df.show(5)

+--------------+------------------+---------------+--------------+-----------------+
|Estado - Sigla|           Produto|Valor de Compra|Valor de Venda|Unidade de Medida|
+--------------+------------------+---------------+--------------+-----------------+
|            AC|          GASOLINA|           NULL|          6,99|       R$ / litro|
|            AC|            ETANOL|           NULL|          5,99|       R$ / litro|
|            AC|GASOLINA ADITIVADA|           NULL|          7,05|       R$ / litro|
|            AC|          GASOLINA|           NULL|          6,99|       R$ / litro|
|            AC|            ETANOL|           NULL|          6,12|       R$ / litro|
+--------------+------------------+---------------+--------------+-----------------+
only showing top 5 rows



## Removendo Coluna "Valor de Compra"

In [9]:
df_filtered = spark.sql(f"""
    select {columns} from combustiveis where `Valor de Compra` IS NOT NULL
""")
                        
df_filtered.show()

+--------------+-------+---------------+--------------+-----------------+
|Estado - Sigla|Produto|Valor de Compra|Valor de Venda|Unidade de Medida|
+--------------+-------+---------------+--------------+-----------------+
+--------------+-------+---------------+--------------+-----------------+



In [10]:
df_filtered = df_filtered.drop('Valor de Compra')
df_filtered.show()

+--------------+-------+--------------+-----------------+
|Estado - Sigla|Produto|Valor de Venda|Unidade de Medida|
+--------------+-------+--------------+-----------------+
+--------------+-------+--------------+-----------------+



## Convertendo coluna

In [None]:
view_prices = spark.sql("""
    select 
        `Estado - Sigla`, 
        `Produto`, 
        regexp_replace(`Valor de Venda`, ",", ".") as `Valor de Venda`,
        `Unidade de Medida`
    from combustiveis
""")

In [14]:
view_prices.createOrReplaceTempView('view_precos')
view_prices.show(5)

view_prices.printSchema()

+--------------+------------------+--------------+-----------------+
|Estado - Sigla|           Produto|Valor de Venda|Unidade de Medida|
+--------------+------------------+--------------+-----------------+
|            AC|          GASOLINA|          6.99|       R$ / litro|
|            AC|            ETANOL|          5.99|       R$ / litro|
|            AC|GASOLINA ADITIVADA|          7.05|       R$ / litro|
|            AC|          GASOLINA|          6.99|       R$ / litro|
|            AC|            ETANOL|          6.12|       R$ / litro|
+--------------+------------------+--------------+-----------------+
only showing top 5 rows

root
 |-- Estado - Sigla: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Valor de Venda: string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)



## Analise sobre o "Valor de Venda"

In [15]:
df_diferenca_precos = spark.sql("""
    select 
        `Estado - Sigla`,
        `Produto`,
        `Unidade de Medida`,
        MIN(`Valor de Venda`) as menor_valor,
        MAX(`Valor de Venda`) as maior_valor,
        MAX(`Valor de Venda`) - MIN(`Valor de Venda`) as diferenca
   from view_precos
   group by all
   order by diferenca DESC
""")
df_diferenca_precos.show(5)

+--------------+------------------+-----------------+-----------+-----------+------------------+
|Estado - Sigla|           Produto|Unidade de Medida|menor_valor|maior_valor|         diferenca|
+--------------+------------------+-----------------+-----------+-----------+------------------+
|            SP|GASOLINA ADITIVADA|       R$ / litro|      5.789|      8.699|              2.91|
|            RS|            ETANOL|       R$ / litro|      5.199|      7.797|             2.598|
|            RJ|GASOLINA ADITIVADA|       R$ / litro|      5.799|      8.209|2.4099999999999993|
|            MG|            ETANOL|       R$ / litro|       4.59|      6.999|             2.409|
|            PR|            ETANOL|       R$ / litro|      4.669|       6.99|2.3210000000000006|
+--------------+------------------+-----------------+-----------+-----------+------------------+
only showing top 5 rows

