In [0]:
# ===========================================================
# IMPORTAÇÕES
# ===========================================================
import requests
import zipfile
import pandas as pd
import unicodedata
import io
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
import matplotlib.pyplot as plt
import seaborn as sns
sns.set(style="whitegrid")

# ===========================================================
# BAIXAR E CARREGAR OS DADOS DA ANP (ZIP → CSV)
# ===========================================================
url_zip = "https://www.gov.br/anp/pt-br/centrais-de-conteudo/dados-abertos/arquivos/shpc/dsas/ca/ca-2025-01.zip"

response = requests.get(url_zip)
response.raise_for_status()
zip_bytes = io.BytesIO(response.content)

with zipfile.ZipFile(zip_bytes) as zip_ref:
    csv_name = [n for n in zip_ref.namelist() if n.lower().endswith(".csv")][0]
    csv_bytes = zip_ref.read(csv_name)

# ===========================================================
# LER CSV E NORMALIZAR COLUNAS
# ===========================================================
df = pd.read_csv(io.BytesIO(csv_bytes), sep=";", encoding="latin1", low_memory=False)

def normalize_column(col):
    col = col.lower()
    col = ''.join(c for c in unicodedata.normalize('NFD', col) if unicodedata.category(c) != 'Mn')
    col = col.replace(" ", "_")
    return col

df.columns = [normalize_column(c) for c in df.columns]
df.columns = [c.replace("ï»¿", "") for c in df.columns]  # remover BOM

# ===========================================================
# CONVERTER VALORES NUMÉRICOS
# ===========================================================
for coluna in ["valor_de_venda", "valor_de_compra"]:
    df[coluna] = df[coluna].astype(str).str.replace(",", ".", regex=False)
    df[coluna] = pd.to_numeric(df[coluna], errors="coerce")

# ===========================================================
# CRIAR DATAFRAME SPARK
# ===========================================================
spark_df = spark.createDataFrame(df)

spark_df = spark_df.withColumn("valor_de_venda",
    F.regexp_replace("valor_de_venda", ",", ".").cast(DoubleType())
)
spark_df = spark_df.withColumn("valor_de_compra",
    F.regexp_replace("valor_de_compra", ",", ".").cast(DoubleType())
)
spark_df = spark_df.withColumn("data_da_coleta",
    F.to_date("data_da_coleta", "dd/MM/yyyy")
)

# ===========================================================
# CORRIGIR NOMES DE COLUNAS
# ===========================================================
for col_name in spark_df.columns:
    if "regiao" in col_name:
        spark_df = spark_df.withColumnRenamed(col_name, "regiao_sigla")
    if "estado" in col_name:
        spark_df = spark_df.withColumnRenamed(col_name, "estado_sigla")

# ===========================================================
# LIMPEZA DE DADOS
# ===========================================================
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

window_state = Window.partitionBy("estado_sigla").orderBy("data_da_coleta")
spark_df = spark_df.withColumn("row_num", row_number().over(window_state))
spark_df = spark_df.filter(col("row_num") == 1).drop("row_num")

spark_df = spark_df.dropDuplicates()
spark_df = spark_df.withColumn("valor_de_compra", col("valor_de_venda") / 3)

# ===========================================================
# DIMENSÕES
# ===========================================================
dim_tempo = (
    spark_df
    .select("data_da_coleta").distinct()
    .withColumn("ano", F.year("data_da_coleta"))
    .withColumn("mes", F.month("data_da_coleta"))
    .withColumn("dia", F.dayofmonth("data_da_coleta"))
    .withColumnRenamed("data_da_coleta", "data")
    .withColumn("tempo_id", F.monotonically_increasing_id())
)
dim_tempo.createOrReplaceTempView("dim_tempo")

dim_produto = (
    spark_df
    .select("produto", "unidade_de_medida").distinct()
    .withColumn("produto_id", F.monotonically_increasing_id())
)
dim_produto.createOrReplaceTempView("dim_produto")

dim_posto = (
    spark_df.select(
        "cnpj_da_revenda", "revenda", "nome_da_rua", "numero_rua",
        "complemento", "bairro", "cep", "municipio",
        "estado_sigla", "regiao_sigla", "bandeira"
    )
    .distinct()
    .withColumnRenamed("cnpj_da_revenda", "posto_id")
)
dim_posto.createOrReplaceTempView("dim_posto")

# ===========================================================
# FATO PREÇOS
# ===========================================================
fato_precos = (
    spark_df.alias("df")
    .join(dim_posto.alias("po"), F.col("df.cnpj_da_revenda") == F.col("po.posto_id"), "left")
    .join(dim_produto.alias("pr"), F.col("df.produto") == F.col("pr.produto"), "left")
    .join(dim_tempo.alias("t"), F.col("df.data_da_coleta") == F.col("t.data"), "left")
    .select(
        F.col("po.posto_id"),
        F.col("pr.produto_id"),
        F.col("t.tempo_id"),
        F.col("df.valor_de_venda"),
        F.col("df.valor_de_compra")
    )
)
fato_precos.createOrReplaceTempView("fato_precos")

# ===========================================================
# ANALISES E GRÁFICOS (POSTOS, ESTADOS, BAIRROS, BANDEIRA, PRODUTOS, VARIAÇÃO)
# ===========================================================

# Top 10 postos mais rentáveis
query1 = spark.sql("""
    SELECT po.revenda,
           SUM(COALESCE(f.valor_de_venda,0) - COALESCE(f.valor_de_compra,0)) AS lucro_total,
           AVG(COALESCE(f.valor_de_venda,0) - COALESCE(f.valor_de_compra,0)) AS margem_media
    FROM fato_precos f
    JOIN dim_posto po ON f.posto_id = po.posto_id
    GROUP BY po.revenda
""")
top_postos = query1.toPandas().sort_values("lucro_total", ascending=False).head(10)
plt.figure(figsize=(10,6))
sns.barplot(data=top_postos, x="lucro_total", y="revenda", palette="viridis")
plt.title("Top 10 Postos Mais Rentáveis")
plt.show()

# Estado mais lucrativo
query_estado = spark.sql("""
    SELECT po.estado_sigla AS estado,
           SUM(COALESCE(f.valor_de_venda,0) - COALESCE(f.valor_de_compra,0)) AS lucro_total
    FROM fato_precos f
    JOIN dim_posto po ON f.posto_id = po.posto_id
    GROUP BY po.estado_sigla
    ORDER BY lucro_total DESC
    LIMIT 1
""")
estado_selecionado = query_estado.toPandas().loc[0, "estado"]
print("Estado mais lucrativo:", estado_selecionado)

# Top 10 bairros mais rentáveis
query_bairro = spark.sql(f"""
    SELECT po.bairro,
           SUM(COALESCE(f.valor_de_venda,0) - COALESCE(f.valor_de_compra,0)) AS lucro_total,
           AVG(COALESCE(f.valor_de_venda,0) - COALESCE(f.valor_de_compra,0)) AS margem_media
    FROM fato_precos f
    JOIN dim_posto po ON f.posto_id = po.posto_id
    WHERE po.estado_sigla = '{estado_selecionado}'
    GROUP BY po.bairro
    ORDER BY lucro_total DESC
    LIMIT 10
""")
bairros_lucrativos = query_bairro.toPandas()
plt.figure(figsize=(10,6))
sns.barplot(data=bairros_lucrativos, x="lucro_total", y="bairro", palette="viridis")
plt.title(f"Bairros Mais Rentáveis em {estado_selecionado}")
plt.show()

# Lucro por bandeira
query4 = spark.sql("""
    SELECT po.bandeira,
           SUM(COALESCE(f.valor_de_venda,0) - COALESCE(f.valor_de_compra,0)) AS lucro_total
    FROM fato_precos f
    JOIN dim_posto po ON f.posto_id = po.posto_id
    GROUP BY po.bandeira
""")
bandeiras = query4.toPandas().sort_values("lucro_total", ascending=False)
plt.figure(figsize=(8,5))
sns.barplot(data=bandeiras, x="lucro_total", y="bandeira", palette="coolwarm")
plt.title("Lucro Total por Bandeira")
plt.show()

# Produtos — preço médio x margem média
query5 = spark.sql("""
    SELECT p.produto,
           AVG(COALESCE(f.valor_de_venda,0)) AS preco_medio,
           AVG(COALESCE(f.valor_de_venda,0) - COALESCE(f.valor_de_compra,0)) AS margem_media
    FROM fato_precos f
    JOIN dim_produto p ON f.produto_id = p.produto_id
    GROUP BY p.produto
""")
produtos = query5.toPandas()
plt.figure(figsize=(8,6))
sns.scatterplot(data=produtos, x="preco_medio", y="margem_media", hue="produto", s=120)
plt.title("Preço Médio x Margem Média por Produto")
plt.show()

# Maiores aumentos percentuais de preço
query6 = spark.sql("""
    SELECT po.revenda,
           p.produto,
           MIN(COALESCE(f.valor_de_venda,0)) AS preco_min,
           MAX(COALESCE(f.valor_de_venda,0)) AS preco_max,
           ( (MAX(COALESCE(f.valor_de_venda,0)) - MIN(COALESCE(f.valor_de_venda,0)))
            / NULLIF(MIN(COALESCE(f.valor_de_venda,0)), 0) ) * 100 AS variacao_percentual
    FROM fato_precos f
    JOIN dim_posto po ON f.posto_id = po.posto_id
    JOIN dim_produto p ON f.produto_id = p.produto_id
    GROUP BY po.revenda, p.produto
""")
variacoes = query6.toPandas().sort_values("variacao_percentual", ascending=False).head(10)
plt.figure(figsize=(10,6))
sns.barplot(data=variacoes, x="variacao_percentual", y="revenda", hue="produto", dodge=False)
plt.title("Top 10 Maiores Aumentos de Preço")
plt.show()

# Margem negativa
query7 = spark.sql("""
    SELECT po.revenda,
           p.produto,
           AVG(COALESCE(f.valor_de_venda,0)) AS preco_medio,
           AVG(COALESCE(f.valor_de_compra,0)) AS custo_medio,
           AVG(COALESCE(f.valor_de_venda,0) - COALESCE(f.valor_de_compra,0)) AS margem_media
    FROM fato_precos f
    JOIN dim_posto po ON f.posto_id = po.posto_id
    JOIN dim_produto p ON f.produto_id = p.produto_id
    GROUP BY po.revenda, p.produto
    HAVING AVG(COALESCE(f.valor_de_venda,0) - COALESCE(f.valor_de_compra,0)) < 0
""")
margem_negativa = query7.toPandas()
if not margem_negativa.empty:
    plt.figure(figsize=(10,6))
    sns.barplot(data=margem_negativa, x="margem_media", y="revenda", hue="produto", palette="Reds_r", dodge=False)
    plt.title("Produtos/Postos com Margem Negativa")
    plt.show()
else:
    print("Nenhum produto/posto com margem negativa encontrado.")
