In [105]:
import matplotlib as plt
import csv
from statistics import pvariance
import findspark
findspark.init()

import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql.types import FloatType
from pyspark.sql.functions import to_date, mean, col, lit, concat, unix_timestamp, month, year, regexp_replace, round, max, min, udf, concat_ws
spark = SparkSession.builder.getOrCreate()

In [106]:
df =  spark.read.csv(
      path="SEMANAL_MUNICIPIOS-2019.csv",
      sep=",",
      header=True,
      quote='"',
      inferSchema=True
)
df = df.withColumn('PREÇO MÉDIO REVENDA', regexp_replace('PREÇO MÉDIO REVENDA', ',', '.').cast('float'))
df = df.withColumn('PREÇO MÁXIMO REVENDA', regexp_replace('PREÇO MÁXIMO REVENDA', ',', '.').cast('float'))
df = df.withColumn('PREÇO MÍNIMO REVENDA', regexp_replace('PREÇO MÍNIMO REVENDA', ',', '.').cast('float'))
df.head()

Row(DATA INICIAL='30/12/2018', DATA FINAL='5/1/2019', REGIÃO='NORTE', ESTADO='PARA', MUNICÍPIO='ABAETETUBA', PRODUTO='ETANOL HIDRATADO', NÚMERO DE POSTOS PESQUISADOS=1, UNIDADE DE MEDIDA='R$/l', PREÇO MÉDIO REVENDA=4.650000095367432, DESVIO PADRÃO REVENDA='0,000', PREÇO MÍNIMO REVENDA=4.650000095367432, PREÇO MÁXIMO REVENDA=4.650000095367432, MARGEM MÉDIA REVENDA='-', COEF DE VARIAÇÃO REVENDA='0,000', PREÇO MÉDIO DISTRIBUIÇÃO='-', DESVIO PADRÃO DISTRIBUIÇÃO='-', PREÇO MÍNIMO DISTRIBUIÇÃO='-', PREÇO MÁXIMO DISTRIBUIÇÃO='-', COEF DE VARIAÇÃO DISTRIBUIÇÃO='-')

In [107]:
spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")
df = df.orderBy('DATA INICIAL')

In [108]:
df = (df.withColumn('DATA INICIAL', to_date(unix_timestamp('DATA INICIAL', 'dd/MM/yyyy').cast('timestamp'))))
df = (df.withColumn('Mês', month('DATA INICIAL')))
df = (df.withColumn('Ano', year('DATA INICIAL')))
df.describe()

DataFrame[summary: string, DATA FINAL: string, REGIÃO: string, ESTADO: string, MUNICÍPIO: string, PRODUTO: string, NÚMERO DE POSTOS PESQUISADOS: string, UNIDADE DE MEDIDA: string, PREÇO MÉDIO REVENDA: string, DESVIO PADRÃO REVENDA: string, PREÇO MÍNIMO REVENDA: string, PREÇO MÁXIMO REVENDA: string, MARGEM MÉDIA REVENDA: string, COEF DE VARIAÇÃO REVENDA: string, PREÇO MÉDIO DISTRIBUIÇÃO: string, DESVIO PADRÃO DISTRIBUIÇÃO: string, PREÇO MÍNIMO DISTRIBUIÇÃO: string, PREÇO MÁXIMO DISTRIBUIÇÃO: string, COEF DE VARIAÇÃO DISTRIBUIÇÃO: string, Mês: string, Ano: string]

In [109]:
df = df.orderBy("Ano", "Mês", "Município", "Produto")
exe1 = (df.groupBy("Ano","Mês","Município","Produto","Unidade de Medida").agg(round(mean("PREÇO MÉDIO REVENDA"),3).alias("Preço médio")))
exe1.show()

+----+---+--------------------+----------------+-----------------+-----------+
| Ano|Mês|           Município|         Produto|Unidade de Medida|Preço médio|
+----+---+--------------------+----------------+-----------------+-----------+
|2018| 12|          ABAETETUBA|ETANOL HIDRATADO|             R$/l|       4.65|
|2018| 12|          ABAETETUBA|  GASOLINA COMUM|             R$/l|      4.955|
|2018| 12|          ABAETETUBA|             GLP|          R$/13Kg|     70.667|
|2018| 12|          ABAETETUBA|     ÓLEO DIESEL|             R$/l|      4.088|
|2018| 12|          ABAETETUBA| ÓLEO DIESEL S10|             R$/l|       4.12|
|2018| 12|          ACAILANDIA|ETANOL HIDRATADO|             R$/l|      3.524|
|2018| 12|          ACAILANDIA|  GASOLINA COMUM|             R$/l|      4.189|
|2018| 12|          ACAILANDIA|             GLP|          R$/13Kg|       74.0|
|2018| 12|          ACAILANDIA|     ÓLEO DIESEL|             R$/l|      3.538|
|2018| 12|          ACAILANDIA| ÓLEO DIESEL S10|    

In [110]:
df = df.orderBy("Estado","Região")
exe2 =(df.groupBy("Estado","Região",).agg(round(mean("PREÇO MÉDIO REVENDA"),3).alias("Preço médio")))
exe2.show()

+-------------------+------------+-----------+
|             Estado|      Região|Preço médio|
+-------------------+------------+-----------+
|               ACRE|       NORTE|     19.451|
|            ALAGOAS|    NORDESTE|     15.369|
|              AMAPA|       NORTE|     21.808|
|           AMAZONAS|       NORTE|     20.866|
|              BAHIA|    NORDESTE|     15.238|
|              CEARA|    NORDESTE|     17.457|
|   DISTRITO FEDERAL|CENTRO OESTE|     17.239|
|     ESPIRITO SANTO|     SUDESTE|     14.959|
|              GOIAS|CENTRO OESTE|     17.045|
|           MARANHAO|    NORDESTE|     18.291|
|        MATO GROSSO|CENTRO OESTE|     22.364|
| MATO GROSSO DO SUL|CENTRO OESTE|     17.522|
|       MINAS GERAIS|     SUDESTE|     16.962|
|               PARA|       NORTE|     20.842|
|            PARAIBA|    NORDESTE|     15.817|
|             PARANA|         SUL|     16.837|
|         PERNAMBUCO|    NORDESTE|     17.882|
|              PIAUI|    NORDESTE|      18.08|
|     RIO DE 

In [111]:
def calculate(row):
    data = [float(x.strip()) for x in row.split(",")]
    return pvariance(data)

df = df.orderBy("Ano","Mês")
t_exe3= df.groupBy("Ano","Mês","Município").agg(max(round(col("PREÇO MÉDIO REVENDA"),3)).alias("Preço Máximo"),min(round(col("PREÇO MÉDIO REVENDA"),3)).alias("Preço Mínimo"))

varUDF = udf(calculate,FloatType())
t_exe3 = t_exe3.withColumn("Variância",varUDF(concat_ws(",", round(col("Preço máximo"),3), round(col("Preço Mínimo"),3))))
t_exe3 = t_exe3.withColumn("Variação Absoluta", round(( t_exe3["Preço máximo"] - t_exe3["Preço Mínimo"] ),3))
columns_to_drop = ['Preço máximo', 'Preço Mínimo']
exe3 = t_exe3.drop(*columns_to_drop)
exe3.show()

+----+---+--------------------+----------+-----------------+
| Ano|Mês|           Município| Variância|Variação Absoluta|
+----+---+--------------------+----------+-----------------+
|2018| 12|          ABAETETUBA| 1108.1908|           66.579|
|2018| 12|          ACAILANDIA| 1241.7167|           70.476|
|2018| 12|          ADAMANTINA| 1100.6138|           66.351|
|2018| 12|AGUAS LINDAS DE G...|  832.8707|           57.719|
|2018| 12|          ALAGOINHAS|   974.907|           62.447|
|2018| 12|            ALEGRETE| 1213.3031|           69.665|
|2018| 12|             ALFENAS|  994.0463|           63.057|
|2018| 12|       ALTA FLORESTA| 2654.2073|          103.038|
|2018| 12|            ALTAMIRA| 1919.2723|           87.619|
|2018| 12|            ALVORADA| 1067.7537|           65.353|
|2018| 12|           AMERICANA|   986.117|           62.805|
|2018| 12|              AMPARO| 1209.6136|           69.559|
|2018| 12|          ANANINDEUA| 1228.0469|           70.087|
|2018| 12|            AN

In [112]:
t_exe4 = df.groupBy("Município").agg(max(round(col("PREÇO MÁXIMO REVENDA"),3)).alias("Preço máximo"),min(round(col("PREÇO MÍNIMO REVENDA"),3)).alias("Preço Mínimo"))
t_exe4 = t_exe4.withColumn("Resultado", round(( t_exe4["Preço máximo"] - t_exe4["Preço Mínimo"] ),3))
exe4   = (t_exe4.orderBy(col("Resultado")).sort(col("Resultado").desc()).limit(5))
columns_to_drop = ['Preço máximo', 'Preço Mínimo']
exe4 = exe4.drop(*columns_to_drop)
exe4.show()

+-------------+---------+
|    Município|Resultado|
+-------------+---------+
|      SORRISO|   117.46|
|       CUIABA|  112.883|
|        SINOP|  107.611|
| RONDONOPOLIS|  107.601|
|ALTA FLORESTA|   107.34|
+-------------+---------+



In [113]:
exe1.toPandas().to_csv('exe1.csv')
exe2.toPandas().to_csv('exe2.csv')
exe3.toPandas().to_csv('exe3.csv')
exe4.toPandas().to_csv('exe4.csv')