In [0]:
%pip install xgboost lightgbm


In [0]:
# =========================
# Imports
# =========================
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, DoubleType, LongType, StringType
from sklearn.ensemble import RandomForestRegressor
import xgboost as xgb
import lightgbm as lgb
import pandas as pd
import numpy as np

# =========================
# Carregar dados agregados por produto + loja + mês
# =========================
df_vendas = spark.table("ted_dev.dbt_acroccia_aw_marts.fato_pedidos_2")

# Criar coluna ano_mes e converter para date
df_vendas = df_vendas.withColumn("ano_mes", F.date_format("data_de_venda", "yyyy-MM"))
df_agg = df_vendas.groupBy("produto", "id_loja", "ano_mes") \
                  .agg(F.sum("qtde").alias("qtde"))

df_agg = df_agg.withColumn(
    "ano_mes",
    F.to_date(F.concat_ws("-", F.col("ano_mes"), F.lit("01")), "yyyy-MM-dd")
)

df_agg = df_agg.filter(F.col("id_loja").isNotNull() & (F.col("qtde") > 0))
df_agg = df_agg.repartition("produto", "id_loja")

# =========================
# Função de forecast otimizada
# =========================
def forecast_3meses(pdf):
    produto = pdf['produto'].iloc[0]
    loja = pdf['id_loja'].iloc[0]
    pdf = pdf.sort_values('ano_mes').copy()
    pdf['ano_mes'] = pd.to_datetime(pdf['ano_mes'])
    
    # Se histórico insuficiente ou vendas zeradas: usar média móvel
    if len(pdf) < 3 or pdf['qtde'].sum() == 0:
        ma_pred = pdf['qtde'].mean() * 3 if len(pdf) > 0 else 0
        return pd.DataFrame({
            'produto':[produto],
            'id_loja':[loja],
            'rf':[0],
            'xgb':[0],
            'lgb':[0],
            'ma':[ma_pred]
        })
    
    # Features de tempo
    pdf['ano'] = pdf['ano_mes'].dt.year
    pdf['mes'] = pdf['ano_mes'].dt.month
    X = pdf[['ano','mes']]
    y = pdf['qtde'].values
    
    # Treinar modelos
    rf_model = RandomForestRegressor(n_estimators=5, random_state=42)
    rf_model.fit(X, y)
    xgb_model = xgb.XGBRegressor(n_estimators=5, n_jobs=-1, random_state=42)
    xgb_model.fit(X, y)
    lgb_model = lgb.LGBMRegressor(n_estimators=5, n_jobs=-1, random_state=42)
    lgb_model.fit(X, y)
    
    # Próximos 3 meses
    last_date = pdf['ano_mes'].max()
    future_dates = pd.date_range(start=last_date + pd.offsets.MonthBegin(1), periods=3, freq='MS')
    future_df = pd.DataFrame({'ano': future_dates.year, 'mes': future_dates.month})
    
    # Previsões
    rf_pred = np.clip(rf_model.predict(future_df), 0, None).sum()
    xgb_pred = np.clip(xgb_model.predict(future_df), 0, None).sum()
    lgb_pred = np.clip(lgb_model.predict(future_df), 0, None).sum()
    ma_pred = pdf['qtde'].rolling(3, min_periods=1).mean().iloc[-1] * 3
    
    return pd.DataFrame({
        'produto':[produto],
        'id_loja':[loja],
        'rf':[rf_pred],
        'xgb':[xgb_pred],
        'lgb':[lgb_pred],
        'ma':[ma_pred]
    })

# =========================
# Schema
# =========================
# =========================
# Schema corrigido
# =========================

schema = StructType([
    StructField("produto", StringType()),   # produto como string
    StructField("id_loja", StringType()),   # id_loja como string
    StructField("rf", DoubleType()),
    StructField("xgb", DoubleType()),
    StructField("lgb", DoubleType()),
    StructField("ma", DoubleType())
])


# =========================
# Aplicar forecast em paralelo
# =========================
df_forecast = df_agg.groupBy("produto","id_loja") \
                    .applyInPandas(forecast_3meses, schema=schema)

# =========================
# Mostrar resultado rápido
# =========================
df_forecast.show(5, truncate=False)

# =========================
# Salvar em Delta
# =========================
spark.sql("CREATE SCHEMA IF NOT EXISTS ted_dev.dev_andre_silva")
df_forecast.write.mode("overwrite") \
    .format("delta") \
    .saveAsTable("ted_dev.dev_andre_silva.forecast_loja_3meses")

In [0]:
%sql
-- =========================
-- Exemplo Sazonalidade
-- =========================
SELECT
    data_de_venda,
    SUM(qtde) AS qtde_mensal
FROM ted_dev.dbt_acroccia_aw_marts.fato_pedidos_2
WHERE produto = 'AWC Logo Cap'
  AND id_loja = '1002'
GROUP BY data_de_venda
ORDER BY data_de_venda;


In [0]:
%sql
WITH previsao_luvas AS (
    SELECT 
        produto,
        id_loja,
        rf, xgb, lgb, ma,
        (rf + xgb + lgb + ma)/4 AS media_modelos
    FROM ted_dev.dev_andre_silva.forecast_loja_3meses 
    WHERE LOWER(produto) LIKE '%gloves%'
)
SELECT
    SUM(media_modelos) * 2 AS ziperes_necessarios_prox_3_meses
FROM previsao_luvas;


In [0]:
%sql
    SELECT 
        produto,
        id_loja,
        rf, xgb, lgb, ma,
        (rf + xgb + lgb + ma)/4 AS media_modelos
    FROM ted_dev.dev_andre_silva.forecast_loja_3meses 
    WHERE LOWER(produto) LIKE '%gloves%'


In [0]:
%sql
SELECT
    YEAR(data_de_venda) AS ano,
    MONTH(data_de_venda) AS mes,
    SUM(qtde) AS total_luvas_vendidas
FROM ted_dev.dbt_acroccia_aw_marts.fato_pedidos_2
WHERE LOWER(produto) LIKE '%glove%'
GROUP BY YEAR(data_de_venda), MONTH(data_de_venda)
ORDER BY ano, mes;



