### Configuração do Ambiente e Leitura dos Dados

In [2]:
# Importações padrão
import os
import json

# Importações Análise de Dados
import pandas as pd
import numpy as np

# Importações de visualização
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
from matplotlib import font_manager
import seaborn as sns

# Importações Spark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Importações Machine Learning Spark
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.clustering import KMeans
from pyspark.ml.classification import LogisticRegression

# Importações Previsão de Séries Temporais
from prophet import Prophet
from prophet.plot import plot_plotly, plot_components_plotly

# Importações MongoDB
from pymongo import MongoClient

In [42]:
# Configurar sessão Spark
spark = SparkSession.builder \
    .appName("Integracao PySpark MongoDB") \
    .getOrCreate()

# Obter URI do MongoDB das variáveis de ambiente
mongo_uri = os.getenv("MONGO_URI")

# Configurar cliente MongoDB
client = MongoClient(mongo_uri)

# Selecionar banco de dados
db = client.get_database('SparkDB')  # Substitua 'test' pelo nome do seu banco de dados

# Criar a coleção de séries temporais com as opções especificadas
db.create_collection(
    'spark_collection',  # Substitua 'your_collection_name' pelo nome da sua coleção
    timeseries={
        'timeField': 'timestamp',
        'metaField': 'metadata',
        'granularity': 'seconds'
    },
    expireAfterSeconds=86400
)

# Selecionar a coleção criada
collection = db.get_collection('spark_collection')

CollectionInvalid: collection spark_collection already exists

In [43]:
# Lê o arquivo CSV
df = (
    spark
    .read 
    .option('delimiter', ';')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .option('encoding', 'ISO-8859-1')
    .csv('./src/Preços semestrais - AUTOMOTIVOS_2023.02.csv')
)

In [44]:
# Exibe o esquema do dataframe
df.printSchema()

root
 |-- Regiao - Sigla: string (nullable = true)
 |-- Estado - Sigla: string (nullable = true)
 |-- Municipio: string (nullable = true)
 |-- Revenda: string (nullable = true)
 |-- CNPJ da Revenda: string (nullable = true)
 |-- Nome da Rua: string (nullable = true)
 |-- Numero Rua: string (nullable = true)
 |-- Complemento: string (nullable = true)
 |-- Bairro: string (nullable = true)
 |-- Cep: string (nullable = true)
 |-- Produto: string (nullable = true)
 |-- Data da Coleta: string (nullable = true)
 |-- Valor de Venda: string (nullable = true)
 |-- Valor de Compra: string (nullable = true)
 |-- Unidade de Medida: string (nullable = true)
 |-- Bandeira: string (nullable = true)



### Tratamento e Exploração de Dados

In [45]:
# Seleciona as colunas relevantes
df_precos = (
    df
    .select('Estado - Sigla', 'Produto', 'Valor de Compra', 'Valor de Venda', 'Unidade de Medida', 'Data da Coleta')
)

In [46]:
# Verificando registros iniciais
df_precos.show(5)

+--------------+----------+---------------+--------------+-----------------+--------------+
|Estado - Sigla|   Produto|Valor de Compra|Valor de Venda|Unidade de Medida|Data da Coleta|
+--------------+----------+---------------+--------------+-----------------+--------------+
|            SP|  GASOLINA|           NULL|          4,87|       R$ / litro|    03/07/2023|
|            SP|DIESEL S10|           NULL|          4,88|       R$ / litro|    03/07/2023|
|            SP|    ETANOL|           NULL|          3,27|       R$ / litro|    03/07/2023|
|            AC|  GASOLINA|           NULL|          6,95|       R$ / litro|    03/07/2023|
|            AC|DIESEL S10|           NULL|          6,85|       R$ / litro|    03/07/2023|
+--------------+----------+---------------+--------------+-----------------+--------------+
only showing top 5 rows



In [47]:
# Filtra e mostra as linhas onde o valor da coluna 'Valor de Compra' não é nulo
(
    df_precos
    .where(
        F.col('Valor de Compra').isNotNull()
    )
    .show()
)

+--------------+-------+---------------+--------------+-----------------+--------------+
|Estado - Sigla|Produto|Valor de Compra|Valor de Venda|Unidade de Medida|Data da Coleta|
+--------------+-------+---------------+--------------+-----------------+--------------+
+--------------+-------+---------------+--------------+-----------------+--------------+



In [48]:
# Verificação de valores nulos em cada coluna
print("\nValores nulos por coluna:")
df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in df.columns]).show()


Valores nulos por coluna:
+--------------+--------------+---------+-------+---------------+-----------+----------+-----------+------+---+-------+--------------+--------------+---------------+-----------------+--------+
|Regiao - Sigla|Estado - Sigla|Municipio|Revenda|CNPJ da Revenda|Nome da Rua|Numero Rua|Complemento|Bairro|Cep|Produto|Data da Coleta|Valor de Venda|Valor de Compra|Unidade de Medida|Bandeira|
+--------------+--------------+---------+-------+---------------+-----------+----------+-----------+------+---+-------+--------------+--------------+---------------+-----------------+--------+
|             0|             0|        0|      0|              0|          0|       140|     362579|   854|  0|      0|             0|             0|         472424|                0|       0|
+--------------+--------------+---------+-------+---------------+-----------+----------+-----------+------+---+-------+--------------+--------------+---------------+-----------------+--------+



In [49]:
# Quantidade de registros e variáveis disponíveis
print(f"Quantidade de registros: {df.count()}")
print(f"Variáveis disponíveis: {len(df.columns)}")

Quantidade de registros: 472424
Variáveis disponíveis: 16


In [50]:
# Seleção e transformação de colunas
df_precos = (
    df
    .select('Estado - Sigla', 'Produto', 'Valor de Venda', 'Unidade de Medida')
    .withColumn(
        "Valor de Venda",
        F.regexp_replace(F.col("Valor de Venda"), ",", ".")
        .cast("float")
    )
)

In [51]:
# Análise estatística dos preços por Estado, Produto e Unidade de Medida
df_precos_analise = (
    df_precos
    .groupBy(
        F.col('Estado - Sigla'),
        F.col('Produto'),
        F.col('Unidade de Medida')
    )
    .agg(
        F.min(F.col("Valor de Venda")).alias('menor_valor'),
        F.max(F.col("Valor de Venda")).alias('maior_valor')
    )
    .withColumn(
        "diferenca",
        F.col('maior_valor') - F.col('menor_valor')
    )
    .orderBy('diferenca', ascending=False)
)

In [52]:
# Exibição dos 10 primeiros registros da análise de preços
df_precos_analise.show(10)

+--------------+------------------+-----------------+-----------+-----------+---------+
|Estado - Sigla|           Produto|Unidade de Medida|menor_valor|maior_valor|diferenca|
+--------------+------------------+-----------------+-----------+-----------+---------+
|            RS|GASOLINA ADITIVADA|       R$ / litro|       4.89|       9.79|      4.9|
|            SP|        DIESEL S10|       R$ / litro|       4.27|        9.0|     4.73|
|            RJ|        DIESEL S10|       R$ / litro|       4.48|       8.89|4.4100003|
|            AM|        DIESEL S10|       R$ / litro|       4.79|       8.89|4.1000004|
|            BA|GASOLINA ADITIVADA|       R$ / litro|       4.89|       8.94|4.0499997|
|            SP|GASOLINA ADITIVADA|       R$ / litro|       4.69|       8.69|3.9999995|
|            PE|        DIESEL S10|       R$ / litro|       4.34|       8.19|3.8499994|
|            PR|GASOLINA ADITIVADA|       R$ / litro|       4.79|       8.49|3.6999998|
|            SP|            ETAN

In [53]:
# Análise de Preços por Estado
df_precos_analise = (
    df_precos
    .groupBy('Estado - Sigla', 'Produto', 'Unidade de Medida')
    .agg(
        F.min(F.col("Valor de Venda")).alias('menor_valor'),
        F.max(F.col("Valor de Venda")).alias('maior_valor'),
        F.mean(F.col("Valor de Venda")).alias('media_valor'),
        F.stddev(F.col("Valor de Venda")).alias('desvio_padrao_valor'),
        F.count(F.col("Valor de Venda")).alias('contagem')
    )
    .withColumn("diferenca", F.col('maior_valor') - F.col('menor_valor'))
    .orderBy('diferenca', ascending=False)
)

In [54]:
# Exibe as primeiras linhas do dataframe resultante
df_precos_analise.show(10)

+--------------+------------------+-----------------+-----------+-----------+-----------------+-------------------+--------+---------+
|Estado - Sigla|           Produto|Unidade de Medida|menor_valor|maior_valor|      media_valor|desvio_padrao_valor|contagem|diferenca|
+--------------+------------------+-----------------+-----------+-----------+-----------------+-------------------+--------+---------+
|            RS|GASOLINA ADITIVADA|       R$ / litro|       4.89|       9.79|5.853811392165644|0.31784309071244626|    9036|      4.9|
|            SP|        DIESEL S10|       R$ / litro|       4.27|        9.0| 5.87926028785255| 0.6079625454624801|   22267|     4.73|
|            RJ|        DIESEL S10|       R$ / litro|       4.48|       8.89|5.862711620882171| 0.5228997434652143|    5362|4.4100003|
|            AM|        DIESEL S10|       R$ / litro|       4.79|       8.89|6.334521780078043| 0.7730660228625731|    1192|4.1000004|
|            BA|GASOLINA ADITIVADA|       R$ / litro|  

In [55]:
# Identificação de Outliers (primeiro calculando a média e desvio padrão)
df_outliers_stats = (
    df_precos
    .groupBy('Produto')
    .agg(
        F.mean(F.col("Valor de Venda")).alias('media_valor'),
        F.stddev(F.col("Valor de Venda")).alias('desvio_padrao_valor')
    )
)

In [56]:
# Juntar as estatísticas de volta ao dataframe original
df_precos = df_precos.join(df_outliers_stats, on="Produto", how="left")

In [None]:
# Identificação de Outliers
df_outliers = (
    df_precos
    .withColumn(
        "outlier",
        F.when(
            (F.col("Valor de Venda") > F.col("media_valor") + 3 * F.col("desvio_padrao_valor")) |
            (F.col("Valor de Venda") < F.col("media_valor") - 3 * F.col("desvio_padrao_valor")),
            1
        ).otherwise(0)
    )
)

In [57]:
# Exibe os outliers identificados
df_outliers.show()

+------------------+--------------+--------------+-----------------+------------------+-------------------+-------+
|           Produto|Estado - Sigla|Valor de Venda|Unidade de Medida|       media_valor|desvio_padrao_valor|outlier|
+------------------+--------------+--------------+-----------------+------------------+-------------------+-------+
|        DIESEL S10|            SP|          4.88|       R$ / litro| 5.897939561361159| 0.6014171589611345|      0|
|        DIESEL S10|            AC|          6.85|       R$ / litro| 5.897939561361159| 0.6014171589611345|      0|
|        DIESEL S10|            AC|          6.84|       R$ / litro| 5.897939561361159| 0.6014171589611345|      0|
|        DIESEL S10|            AC|          5.99|       R$ / litro| 5.897939561361159| 0.6014171589611345|      0|
|        DIESEL S10|            AC|          5.85|       R$ / litro| 5.897939561361159| 0.6014171589611345|      0|
|            DIESEL|            AC|          6.82|       R$ / litro| 5.7

In [58]:
# Contagem das bandeiras em ordem crescente
df_bandeiras = (
    df
    .groupBy('Bandeira')
    .count()
    .orderBy('count', ascending=True)
)

In [59]:
# Exibe as bandeiras em ordem crescente de contagem
df_bandeiras.show(23)

+-------------------+-----+
|           Bandeira|count|
+-------------------+-----+
|        AMERICANOIL|   18|
|            REJAILE|   20|
|        PETROBRASIL|   23|
|          ROYAL FIC|   30|
|            ESTRADA|   40|
|             AIR BP|   45|
|  RZD DISTRIBUIDORA|   48|
|                UNI|   77|
|          SIMARELLI|   79|
|        WALENDOWSKY|   80|
|           PELIKANO|   95|
|  SUL COMBUSTÃVEIS|   95|
|           ON PETRO|   99|
|            DIBRAPE|  108|
|    FEDERAL ENERGIA|  127|
|             TEMAPE|  143|
|SETTA DISTRIBUIDORA|  178|
|  TDC DISTRIBUIDORA|  216|
|             D`MAIS|  240|
|              IDAZA|  246|
|         RIO BRANCO|  311|
|MASUT DISTRIBUIDORA|  323|
|              LARCO|  329|
+-------------------+-----+
only showing top 23 rows



In [60]:
# Estatísticas de preços para cada bandeira
df_bandeira_precos = (
    df
    .withColumn(
        "Valor de Venda",
        F.regexp_replace(F.col("Valor de Venda"), ",", ".").cast("float")
    )
    .groupBy('Bandeira')
    .agg(
        F.mean(F.col("Valor de Venda")).alias('preco_medio'),
        F.min(F.col("Valor de Venda")).alias('menor_preco'),
        F.max(F.col("Valor de Venda")).alias('maior_preco')
    )
    .orderBy('preco_medio')
)

In [61]:
# Exibe as estatísticas de preços por bandeira
df_bandeira_precos.show(23)

+-------------------+------------------+-----------+-----------+
|           Bandeira|       preco_medio|menor_preco|maior_preco|
+-------------------+------------------+-----------+-----------+
|        PETROBRASIL| 4.268695665442425|       2.99|       5.19|
|    FEDERAL ENERGIA| 4.954015713038407|       3.39|       6.19|
|             D`MAIS| 5.028666603565216|       2.99|       5.99|
|          SIMARELLI|  5.04645566698871|       3.32|       6.75|
|      TOTALENERGIES| 5.102309747234635|       2.99|       6.49|
|           CIAPETRO| 5.176007434058545|       2.99|       6.59|
|  TDC DISTRIBUIDORA| 5.206666647284119|       3.53|       6.19|
|          ROYAL FIC|  5.21233328183492|       3.39|       6.39|
|              STANG| 5.216778752445119|       3.39|       6.31|
|             BRANCA| 5.242196665507463|       2.69|       8.89|
|      VIBRA ENERGIA| 5.270136823919084|       3.25|       8.39|
|MASUT DISTRIBUIDORA| 5.279969014619526|       3.19|       6.59|
|             TAURUS| 5.3

In [62]:
# Região com Maior Volume de Vendas
df_regiao_vendas = df.groupBy("Regiao - Sigla") \
                   .agg(F.sum("Valor de Venda").alias("Total_Vendas")) \
                   .orderBy("Total_Vendas", ascending=False) \
                   .first()

print(f"Região com maior volume de vendas: {df_regiao_vendas['Regiao - Sigla']}")

Região com maior volume de vendas: N


In [63]:
# Ticket Médio por Região
df_regiao_ticket_medio = df.groupBy("Regiao - Sigla") \
                           .agg(F.avg("Valor de Venda").alias("Ticket_Medio")) \
                           .orderBy("Ticket_Medio", ascending=False)

df_regiao_ticket_medio.show(5)

+--------------+-----------------+
|Regiao - Sigla|     Ticket_Medio|
+--------------+-----------------+
|             N|6.042918454935623|
|            SE|5.763513513513513|
|             S|5.756756756756757|
|            NE|5.432160804020101|
|            CO|              4.8|
+--------------+-----------------+



In [64]:
# Produto mais vendido
df_produto_mais_vendido = df.groupBy("Produto") \
                          .agg(F.sum("Valor de Venda").alias("Total_Vendas")) \
                          .orderBy("Total_Vendas", ascending=False) \
                          .first()

print(f"Produto mais vendido: {df_produto_mais_vendido['Produto']}")

Produto mais vendido: DIESEL


In [65]:
# Ticket Médio por Produto
df_produto_ticket_medio = df.groupBy("Produto") \
                           .agg(F.avg("Valor de Venda").alias("Ticket_Medio")) \
                           .orderBy("Ticket_Medio", ascending=False)

df_produto_ticket_medio.show(6)

+------------------+-----------------+
|           Produto|     Ticket_Medio|
+------------------+-----------------+
|        DIESEL S10|6.283687943262412|
|            DIESEL|              6.2|
|GASOLINA ADITIVADA|5.880281690140845|
|          GASOLINA|5.858974358974359|
|            ETANOL|4.212765957446808|
|               GNV|             NULL|
+------------------+-----------------+



In [66]:
# Filtrar valores nulos na coluna 'Valor de Venda'
df_precos_filtrado = df_precos.filter(df_precos['Valor de Venda'].isNotNull())

In [67]:
# Análise de preços por estado e produto, removendo dados nulos de media_valor e desvio_padrao_valor
df_precos_estado_produto = (
    df_precos_filtrado
    .groupBy('Estado - Sigla', 'Produto')
    .agg(
        F.mean(F.col("Valor de Venda")).alias('media_valor'),
        F.stddev(F.col("Valor de Venda")).alias('desvio_padrao_valor')
    )
    .filter((F.col("media_valor").isNotNull()) & (F.col("desvio_padrao_valor").isNotNull()))  # Filtra valores nulos
    .orderBy('Estado - Sigla', 'Produto')
)

In [68]:
# Exibe as linhas do dataframe resultante
df_precos_estado_produto.show()

+--------------+------------------+------------------+-------------------+
|Estado - Sigla|           Produto|       media_valor|desvio_padrao_valor|
+--------------+------------------+------------------+-------------------+
|            AC|            DIESEL| 6.908141144023222| 0.6137181922203635|
|            AC|        DIESEL S10| 6.993102643474843| 0.6160033272184371|
|            AC|            ETANOL| 4.826099615374047|0.35412526689469315|
|            AC|          GASOLINA|  6.80286260506579| 0.4157144992728159|
|            AC|GASOLINA ADITIVADA| 6.743036654607164| 0.4062977990776531|
|            AL|            DIESEL| 5.952175704338893| 0.6496441001040734|
|            AL|        DIESEL S10|  5.95028788966858| 0.6187064842524237|
|            AL|            ETANOL|4.5055818808706185| 0.4408193417309498|
|            AL|          GASOLINA|  5.88424327181286|  0.355004429800144|
|            AL|GASOLINA ADITIVADA|5.9905605669694175| 0.3309449829136531|
|            AL|         

In [69]:
# Filtra os valores nulos na coluna 'Valor de Venda' e converte para o formato adequado
df_precos_filtrado = df.filter(df['Valor de Venda'].isNotNull()) \
    .withColumn('Valor de Venda', F.regexp_replace(F.col('Valor de Venda'), ',', '.').cast('float'))

In [70]:
# Análise estatística por estado e produto, com contagem dos registros válidos
df_precos_estado_produto = df_precos_filtrado.groupBy('Estado - Sigla', 'Produto') \
    .agg(
        F.mean('Valor de Venda').alias('media_valor'),
        F.stddev('Valor de Venda').alias('desvio_padrao_valor'),
        F.count('Valor de Venda').alias('contagem')
    ) \
    .filter((F.col('media_valor').isNotNull()) & (F.col('desvio_padrao_valor').isNotNull())) \
    .orderBy('Estado - Sigla', 'Produto')

In [71]:
# Exibe os registros do dataframe resultante
df_precos_estado_produto.show()

+--------------+------------------+------------------+-------------------+--------+
|Estado - Sigla|           Produto|       media_valor|desvio_padrao_valor|contagem|
+--------------+------------------+------------------+-------------------+--------+
|            AC|            DIESEL| 6.908141144023222| 0.6137181922203635|     425|
|            AC|        DIESEL S10| 6.993102643474843| 0.6160033272184371|     477|
|            AC|            ETANOL| 4.826099615374047|0.35412526689469315|     241|
|            AC|          GASOLINA|  6.80286260506579| 0.4157144992728159|     524|
|            AC|GASOLINA ADITIVADA| 6.743036654607164| 0.4062977990776531|     382|
|            AL|            DIESEL| 5.952175704338893| 0.6496441001040734|     717|
|            AL|        DIESEL S10|  5.95028788966858| 0.6187064842524237|     903|
|            AL|            ETANOL|4.5055818808706185| 0.4408193417309498|    1349|
|            AL|          GASOLINA|  5.88424327181286|  0.355004429800144|  

### Armazenar Dados no MongoDB

In [3]:
# Armazenar Dados no MongoDB
mongo_uri = os.getenv("MONGO_URI")
if not mongo_uri:
    raise ValueError("MONGO_URI não foi configurado corretamente.")

# Conectar ao MongoDB
client = MongoClient(mongo_uri)
db = client.get_database('sparkGLP')
collection = db.get_collection('spark_collection')

# Salvar dados no MongoDB
try:
    df.write.format("com.mongodb.spark.sql.DefaultSource") \
        .option("uri", mongo_uri) \
        .mode("append") \
        .save()
    print("Dados salvos com sucesso no MongoDB.")
except Exception as e:
    print(f"Erro ao salvar dados no MongoDB: {e}")

ValueError: MONGO_URI não foi configurado corretamente.