In [0]:
from pyspark.sql.functions import col, count, desc, lit, when, datediff, sum, round, avg, regexp_replace, to_date, to_timestamp, hour, struct, collect_list, to_json
import requests
import json

In [0]:

def spark_df_to_json_serializable(df, limit=100):
    """
    Converte um DataFrame Spark em uma string JSON que pode ser enviada via API.
    Retorna uma lista de dicionários Python.
    """
    # Limita o DataFrame para não enviar dados excessivos
    limited_df = df.limit(limit)
    
    # Usa o método nativo do Spark para converter para uma string JSON de um array de objetos
    json_string = limited_df.agg(
        to_json(collect_list(struct(*limited_df.columns)))
    ).first()[0]
    
    # Se o resultado for nulo ou vazio, retorna uma lista vazia
    if not json_string:
        return []
        
    # Converte a string JSON de volta para um objeto Python (lista de dicionários)
    return json.loads(json_string)

In [0]:
df_pedidos_ = spark.read.csv(
    "/Volumes/transacional/case_gocase/tmp/Case Dados - Pedidos.csv",
    header=True,
    inferSchema=True
)
df_itens = spark.read.csv(
    "/Volumes/transacional/case_gocase/tmp/Business Case Dados Itens.csv",
    header=True,
    inferSchema=True
)
df_supply = spark.read.csv(
    "/Volumes/transacional/case_gocase/tmp/Business Case Dados Supply.csv",
    header=True,
    inferSchema=True
)
# display(df_pedidos.limit(5))
# display(df_itens.limit(5))
# display(df_supply.limit(5))

In [0]:


# --- 1. Renomear colunas (sem alterações) ---
df_pedidos = (df_pedidos_
    .withColumnRenamed("CÃ³digo de Rastreio", "codigo_rastreio")
    .withColumnRenamed("Valor de NF (R$)", "valor_nf")
    .withColumnRenamed("Frete Cobrado do Cliente (R$)", "frete_cliente")
    .withColumnRenamed("Frete cobrado pela transportadora (R$)", "frete_transportadora")
    .withColumnRenamed("NÃºmero da NF", "numero_nf")
    .withColumnRenamed("Status do Pedido", "status_pedido")
    .withColumnRenamed("Prazo para Sair do CD", "prazo_saida_cd")
    .withColumnRenamed("Enviado em:", "data_envio")
    .withColumnRenamed("Entregue para o cliente em:", "data_entrega")
    .withColumnRenamed("Prazo a transportadora entregar no cliente", "prazo_entrega_transportadora")
    .withColumnRenamed("NÃºmero de Itens no Pedido", "num_itens")
    .withColumnRenamed("Peso (kg)", "peso_kg")
    .withColumnRenamed("id", "order_id")
)

# --- 2. Converter colunas numéricas (sem alterações) ---
cols_to_convert_pedidos = ["valor_nf", "frete_cliente", "frete_transportadora", "peso_kg"]
for column in cols_to_convert_pedidos:
    df_pedidos = df_pedidos.withColumn(column, regexp_replace(regexp_replace(col(column), "\\.", ""), ",", ".").cast("double"))

cols_to_convert_itens = ["material_weight_kg", "price"]
for column in cols_to_convert_itens:
    df_itens = df_itens.withColumn(column, regexp_replace(regexp_replace(col(column), "\\.", ""), ",", ".").cast("double"))

df_supply = df_supply.withColumn("quantity", regexp_replace(regexp_replace(col("quantity"), "\\.", ""), ",", ".").cast("double"))

# --- 3. NOVA ABORDAGEM PARA DATAS ---

# Dicionário para "traduzir" os meses, será usado para ambos os formatos
month_map = {
    "jan.,": "Jan", "fev.,": "Feb", "mar.,": "Mar", "abr.,": "Apr",
    "mai.,": "May", "jun.,": "Jun", "jul.,": "Jul", "ago.,": "Aug",
    "set.,": "Sep", "out.,": "Oct", "nov.,": "Nov", "dez.,": "Dec"
}

# 3.1: Tratar colunas que SÓ TÊM DATA (formato "10 mar., 2025")
date_only_cols = ["prazo_saida_cd", "data_envio", "data_entrega", "prazo_entrega_transportadora"]
format_date_only = "d MMM yyyy"

for date_col in date_only_cols:
    # Cria uma expressão inicial para a coluna
    expr = col(date_col)
    # Aplica a substituição de todos os meses
    for pt_month, en_month in month_map.items():
        expr = regexp_replace(expr, pt_month, en_month)
    
    # Atualiza a coluna no DataFrame, convertendo a string tratada para o tipo Date
    df_pedidos = df_pedidos.withColumn(date_col, to_date(expr, format_date_only))


# 3.2: Tratar a coluna `created_at` (formato "28 fev., 2025, 23:42")
format_datetime = "d MMM yyyy, HH:mm"

# Cria a expressão inicial para a coluna created_at
expr_ts = col("created_at")
# Aplica a substituição de todos os meses
for pt_month, en_month in month_map.items():
    expr_ts = regexp_replace(expr_ts, pt_month, en_month)

# Aplica as transformações em cadeia para criar as novas colunas
df_pedidos = (df_pedidos
    # 1. Cria uma coluna temporária com o timestamp completo
    .withColumn("created_at_ts", to_timestamp(expr_ts, format_datetime))
    # 2. Cria a coluna de DATA a partir do timestamp
    .withColumn("created_at_data", to_date(col("created_at_ts")))
    # 3. Cria a coluna de HORA a partir do timestamp
    .withColumn("created_at_hour", hour(col("created_at_ts")))
    # 4. Remove a coluna original e a temporária
    .drop("created_at", "created_at_ts")
)


# --- 4. Criar DataFrame base para análise ---
df_vendas = df_pedidos.join(df_itens, "order_id", "inner")


# --- 5. Verificação Final ---


# df_vendas.printSchema()


# display(df_pedidos.limit(5))


# display(df_vendas.limit(5))

In [0]:


# --- 1. Execute e colete os resultados de cada análise ---
print("Coletando resultados...")

# Análise 1.1: Oscilações nas Vendas e Impacto de Promoções
vendas_diarias_resultado = spark_df_to_json_serializable(vendas_diarias, limit=31)
print("-> Resultados de vendas diárias coletados.")

# Análise 1.2: Impacto da Promoção de Frete Grátis
analise_frete_resultado = spark_df_to_json_serializable(analise_frete, limit=10)
print("-> Resultados de desempenho por transportadora coletados.")

# Análise 1.3: Quais categorias ou produtos possuem maior impacto no faturamento?
faturamento_por_categoria_resultado = spark_df_to_json_serializable(faturamento_por_categoria, limit=50)
faturamento_por_produto_resultado = spark_df_to_json_serializable(faturamento_por_produto, limit=50)
print("-> Resultados da análise de cancelamento coletados.")

# Análise 2.1: Identificar Produtos Críticos em Ruptura
total_itens_unicos_resultado = spark_df_to_json_serializable(total_itens_unicos, limit=10)
itens_unicos_em_ruptura_resultado = spark_df_to_json_serializable(itens_unicos_em_ruptura, limit=10)
percentual_ruptura_resultado = spark_df_to_json_serializable(percentual_ruptura, limit=10)

# Análise 2.2: Priorizar a Reposição: Cruzando Ruptura com Demanda
ruptura_critica_resultado = spark_df_to_json_serializable(ruptura_critica_top20, limit=20)
ruptura_critica_faturamento_resultado = spark_df_to_json_serializable(ruptura_critica_faturamento_top20, limit=20)

# Análise 3.1: Calcular Tempos do Ciclo de Entrega
medias_logisticas_resultado = spark_df_to_json_serializable(medias_logisticas, limit=10)

# Analise 3.2: Análise de Atrasos e Desempenho por Transportadora
desempenho_transportadoras_resultado = spark_df_to_json_serializable(desempenho_transportadoras, limit=10)

# --- 2. Crie o "pacote" de dados (dicionário mestre) ---
# Cada chave do dicionário representa um relatório diferente.
master_payload = {
    "relatorio_vendas_diarias": vendas_diarias_resultado,
    "relatorio_analise_fretes": analise_frete_resultado,
    "faturamento_por_categoria_resultado": faturamento_por_categoria_resultado
    "faturamento_por_produto_resultado": faturamento_por_produto_resultado
    "total_itens_unicos_resultado": total_itens_unicos_resultado,
    "itens_unicos_em_ruptura_resultado": itens_unicos_em_ruptura_resultado,
    "percentual_ruptura_resultado": percentual_ruptura_resultado,
    "ruptura_critica_resultado": ruptura_critica_resultado,
    "ruptura_critica_faturamento_resultado": ruptura_critica_faturamento_resultado,
    "medias_logisticas_resultado": medias_logisticas_resultado,
    "desempenho_transportadoras_resultado": desempenho_transportadoras_resultado
}

# --- 3. Envie o pacote completo para o n8n ---
n8n_webhook_url = "https://emanoeljavier.app.n8n.cloud/webhook-test/2c0caced-9c0d-40b0-838f-04daab4f62ca" # <-- USE A URL DE PRODUÇÃO
headers = {'Content-Type': 'application/json'}

try:
    print("\nEnviando pacote de dados completo para o n8n...")
    response = requests.post(n8n_webhook_url, data=json.dumps(master_payload), headers=headers)
    response.raise_for_status()
    print("✅ Pacote de dados enviado com sucesso para o n8n!")
    print(f"Resposta do n8n: {response.json()}")

except Exception as e:
    print(f"❌ Falha ao enviar dados para o n8n: {e}")