In [0]:
# === 1) (Re)Criar a view 'dados_ml' === 
spark.sql("""
CREATE OR REPLACE TEMP VIEW dados_ml AS
WITH vendas_mensais AS (
    SELECT
        dp.product_key,
        dp.product_name,
        dp.product_category_name,
        dp.product_subcategory_name,
        fs.customer_key,
        dc.territory_id,
        fs.territory_name,
        fs.country_region_code,
        YEAR(fs.order_date) AS ano,
        MONTH(fs.order_date) AS mes,
        QUARTER(fs.order_date) AS trimestre,
        DATE_TRUNC('MONTH', fs.order_date) AS data_referencia,
        SUM(fs.order_quantity) AS quantidade_vendida,
        SUM(fs.net_amount) AS total_vendas_liquidas
    FROM ted_dev.dev_guilherme_sobrinho_marts.fact_sales fs
    JOIN ted_dev.dev_guilherme_sobrinho_marts.dim_product dp
        ON fs.product_key = dp.product_key
    JOIN ted_dev.dev_guilherme_sobrinho_marts.dim_customer dc
        ON fs.customer_key = dc.customer_key
    GROUP BY
        dp.product_key, dp.product_name, dp.product_category_name, dp.product_subcategory_name,
        fs.customer_key, dc.territory_id, fs.territory_name, fs.country_region_code,
        YEAR(fs.order_date), MONTH(fs.order_date), QUARTER(fs.order_date),
        DATE_TRUNC('MONTH', fs.order_date)
),
com_lags AS (
    SELECT
        vm.*,
        COALESCE(LAG(vm.quantidade_vendida, 1) OVER (PARTITION BY vm.product_key, vm.territory_id ORDER BY vm.data_referencia), 0) AS lag_1,
        COALESCE(LAG(vm.quantidade_vendida, 2) OVER (PARTITION BY vm.product_key, vm.territory_id ORDER BY vm.data_referencia), 0) AS lag_2,
        COALESCE(LAG(vm.quantidade_vendida, 3) OVER (PARTITION BY vm.product_key, vm.territory_id ORDER BY vm.data_referencia), 0) AS lag_3,
        COALESCE(LAG(vm.quantidade_vendida, 12) OVER (PARTITION BY vm.product_key, vm.territory_id ORDER BY vm.data_referencia), 0) AS lag_12
    FROM vendas_mensais vm
)
SELECT
    *,
    (lag_1 + lag_2 + lag_3) / 3.0 AS media_movel_3m
FROM com_lags
""")
print("View 'dados_ml' (re)criada com sucesso.")

# === 2) Checar se view existe e mostrar amostra ===
try:
    df_sample = spark.sql("SELECT product_key, product_name, territory_id, data_referencia, quantidade_vendida, lag_1, lag_2, lag_3, lag_12, media_movel_3m FROM dados_ml LIMIT 5")
    display(df_sample)
    print("A view 'dados_ml' está pronta e com amostra exibida acima.")
except Exception as e:
    print("Erro ao acessar a view 'dados_ml':", e)
    raise

# === 3) Contador rápido pra confirmar registros ===
count = spark.sql("SELECT count(*) AS cnt FROM dados_ml").collect()[0]["cnt"]
print(f"Total de linhas em dados_ml: {count}")


Checkpoint 5 - Script completo para Databricks (AdventureWorks)

 - Previsões 3 meses por produto x loja (RF, XGB, LGB + média móvel)
 - Comparação de modelos de regressão (Linear, RF, GBoost, LightGBM)
 - Análise de sazonalidade para 1 produto
 - Crescimento por centro de distribuição (EUA vs Resto do Mundo) (baseline)
 - Estimativa de zíperes para luvas (2 por par)


In [0]:
# descomentar se necessário
%pip install xgboost
%pip install lightgbm
# %pip install prophet   # opcional

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, DoubleType, StringType
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings("ignore")

# Modelagem / ML
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.linear_model import LinearRegression
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score, mean_absolute_percentage_error
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
import sklearn
import matplotlib.pyplot as plt
import seaborn as sns


In [0]:
# -------------------------
# 0) Verificar existência da view 'dados_ml'
# -------------------------
try:
    display(spark.sql("SELECT * FROM dados_ml LIMIT 3"))
except Exception as e:
    raise RuntimeError("A view 'dados_ml' não existe. Execute seu SQL de preparação antes deste script.") from e

# -------------------------
# 1) Preparar DataFrame Spark para forecast granular
# -------------------------
df_ml_spark = spark.table("dados_ml")

df_for_forecast = df_ml_spark.select(
    F.col("product_name").alias("produto"),
    F.col("territory_id").cast("string").alias("id_loja"),
    F.col("data_referencia").alias("ano_mes"),
    F.col("quantidade_vendida").alias("qtde"),
    F.col("media_movel_3m").alias("media_movel_3m"),
    F.col("product_category_name"),
    F.col("product_subcategory_name"),
    F.col("country_region_code"),
    F.col("product_key")
)

# Filtrar nulos / qtde <= 0
df_for_forecast = df_for_forecast.filter(F.col("id_loja").isNotNull() & (F.col("qtde") > 0))
df_for_forecast = df_for_forecast.repartition("produto", "id_loja")
print("Registros preparados para forecast (spark):", df_for_forecast.count())

In [0]:
# -------------------------
# 2) Função de forecast (applyInPandas)
# -------------------------
def forecast_3meses_com_metrica(pdf):
    produto = pdf['produto'].iloc[0]
    id_loja = str(pdf['id_loja'].iloc[0])

    pdf = pdf.sort_values('ano_mes').copy()
    pdf['ano_mes'] = pd.to_datetime(pdf['ano_mes'])
    pdf = pdf.reset_index(drop=True)

    # fallback quando poucos dados
    if len(pdf) < 6 or pdf['qtde'].sum() == 0:
        ma_pred = float(pdf['qtde'].mean() * 3) if len(pdf) > 0 else 0.0
        return pd.DataFrame({
            'produto': [produto],
            'id_loja': [id_loja],
            'rf': [ma_pred], 'xgb': [ma_pred], 'lgb': [ma_pred], 'ma': [ma_pred],
            'rf_mae': [None], 'rf_rmse': [None], 'rf_r2': [None],
            'xgb_mae': [None], 'xgb_rmse': [None], 'xgb_r2': [None],
            'lgb_mae': [None], 'lgb_rmse': [None], 'lgb_r2': [None]
        })

    pdf['ano'] = pdf['ano_mes'].dt.year
    pdf['mes'] = pdf['ano_mes'].dt.month
    X = pdf[['ano', 'mes']]
    y = pdf['qtde'].values

    if len(X) <= 3:
        ma_pred = float(pdf['qtde'].rolling(3, min_periods=1).mean().iloc[-1] * 3)
        return pd.DataFrame({
            'produto':[produto],'id_loja':[id_loja],
            'rf':[ma_pred], 'xgb':[ma_pred], 'lgb':[ma_pred], 'ma':[ma_pred],
            'rf_mae':[None],'rf_rmse':[None],'rf_r2':[None],
            'xgb_mae':[None],'xgb_rmse':[None],'xgb_r2':[None],
            'lgb_mae':[None],'lgb_rmse':[None],'lgb_r2':[None]
        })

    X_train, X_test = X[:-3].values, X[-3:].values
    y_train, y_test = y[:-3], y[-3:]

    # Modelos (valores de n_estimators ajustáveis)
    rf_model = RandomForestRegressor(n_estimators=50, random_state=42)
    rf_model.fit(X_train, y_train)

    xgb_model = XGBRegressor(n_estimators=50, n_jobs=1, random_state=42, verbosity=0)
    xgb_model.fit(X_train, y_train)

    lgb_model = LGBMRegressor(n_estimators=50, n_jobs=1, random_state=42, verbose=-1)
    lgb_model.fit(X_train, y_train)

    rf_test_pred = rf_model.predict(X_test)
    xgb_test_pred = xgb_model.predict(X_test)
    lgb_test_pred = lgb_model.predict(X_test)

    # Métricas com try/except (segurança)
    try:
        rf_mae = float(mean_absolute_error(y_test, rf_test_pred))
        rf_rmse = float(mean_squared_error(y_test, rf_test_pred, squared=False))
        rf_r2 = float(r2_score(y_test, rf_test_pred))
    except:
        rf_mae = rf_rmse = rf_r2 = None

    try:
        xgb_mae = float(mean_absolute_error(y_test, xgb_test_pred))
        xgb_rmse = float(mean_squared_error(y_test, xgb_test_pred, squared=False))
        xgb_r2 = float(r2_score(y_test, xgb_test_pred))
    except:
        xgb_mae = xgb_rmse = xgb_r2 = None

    try:
        lgb_mae = float(mean_absolute_error(y_test, lgb_test_pred))
        lgb_rmse = float(mean_squared_error(y_test, lgb_test_pred, squared=False))
        lgb_r2 = float(r2_score(y_test, lgb_test_pred))
    except:
        lgb_mae = lgb_rmse = lgb_r2 = None

    # Prever próximos 3 meses
    last_date = pdf['ano_mes'].max()
    future_dates = pd.date_range(start=last_date + pd.offsets.MonthBegin(1), periods=3, freq='MS')
    future_df = pd.DataFrame({'ano': future_dates.year, 'mes': future_dates.month})

    try:
        rf_pred = float(np.clip(rf_model.predict(future_df[['ano','mes']]), 0, None).sum())
    except:
        rf_pred = float('nan')
    try:
        xgb_pred = float(np.clip(xgb_model.predict(future_df[['ano','mes']]), 0, None).sum())
    except:
        xgb_pred = float('nan')
    try:
        lgb_pred = float(np.clip(lgb_model.predict(future_df[['ano','mes']]), 0, None).sum())
    except:
        lgb_pred = float('nan')

    ma_pred = float(pdf['qtde'].rolling(3, min_periods=1).mean().iloc[-1] * 3)

    return pd.DataFrame({
        'produto':[produto],
        'id_loja':[id_loja],
        'rf':[rf_pred], 'xgb':[xgb_pred], 'lgb':[lgb_pred], 'ma':[ma_pred],
        'rf_mae':[rf_mae], 'rf_rmse':[rf_rmse], 'rf_r2':[rf_r2],
        'xgb_mae':[xgb_mae], 'xgb_rmse':[xgb_rmse], 'xgb_r2':[xgb_r2],
        'lgb_mae':[lgb_mae], 'lgb_rmse':[lgb_rmse], 'lgb_r2':[lgb_r2]
    })

# Schema resultado
schema = StructType([
    StructField("produto", StringType()),
    StructField("id_loja", StringType()),
    StructField("rf", DoubleType()),
    StructField("xgb", DoubleType()),
    StructField("lgb", DoubleType()),
    StructField("ma", DoubleType()),
    StructField("rf_mae", DoubleType()),
    StructField("rf_rmse", DoubleType()),
    StructField("rf_r2", DoubleType()),
    StructField("xgb_mae", DoubleType()),
    StructField("xgb_rmse", DoubleType()),
    StructField("xgb_r2", DoubleType()),
    StructField("lgb_mae", DoubleType()),
    StructField("lgb_rmse", DoubleType()),
    StructField("lgb_r2", DoubleType())
])

# -------------------------
# 3) Aplicar applyInPandas (forecast por produto x loja)
# -------------------------
print("Iniciando applyInPandas para gerar previsões por produto x loja...")
df_forecast = df_for_forecast.groupBy("produto", "id_loja").applyInPandas(
    forecast_3meses_com_metrica,
    schema=schema
)
print("Forecast gerado. Exibindo amostra:")
display(df_forecast.limit(50))


In [0]:
# -------------------------
# 4) Converter view 'dados_ml' para Pandas (modelagem/regressão)
# -------------------------
print("Convertendo view 'dados_ml' para Pandas (para modelagem/regressão)...")
dados_ml_pd = spark.table("dados_ml").toPandas()
print("Registros em pandas:", len(dados_ml_pd))

if 'data_referencia' not in dados_ml_pd.columns:
    raise RuntimeError("Coluna 'data_referencia' não encontrada no dataframe pandas.")

dados_ml_pd['data_referencia'] = pd.to_datetime(dados_ml_pd['data_referencia'])


In [0]:

# -------------------------
# 5) Análise de sazonalidade (produto exemplo)
# -------------------------
product_escolhido = None
if product_escolhido is None:
    top_prod = dados_ml_pd.groupby('product_name')['quantidade_vendida'].sum().sort_values(ascending=False).head(1)
    if len(top_prod) == 0:
        raise RuntimeError("Não há dados suficientes para análise de sazonalidade.")
    product_escolhido = top_prod.index[0]

print(f"Analisando sazonalidade para o produto: {product_escolhido}")

df_prod_saz = dados_ml_pd[dados_ml_pd['product_name'] == product_escolhido].copy()
df_prod_saz = df_prod_saz.groupby(df_prod_saz['data_referencia'].dt.to_period('M'))['quantidade_vendida'].sum().reset_index()
df_prod_saz['ano_mes'] = df_prod_saz['data_referencia'].dt.to_timestamp()

plt.figure(figsize=(12,5))
sns.lineplot(data=df_prod_saz, x='ano_mes', y='quantidade_vendida', marker="o")
plt.title(f"Sazonalidade: vendas mensais para '{product_escolhido}'")
plt.xlabel("Mês")
plt.ylabel("Quantidade vendida")
plt.xticks(rotation=30)
plt.grid(True)
plt.show()


In [0]:

# -------------------------
# 6) Comparação de Modelos de Regressão
# -------------------------
print("\nComparando modelos de regressão usando features: lags + time + categorias")

# Garantir colunas de lag existem
for lag in ['lag_1', 'lag_2', 'lag_3', 'lag_12']:
    if lag not in dados_ml_pd.columns:
        dados_ml_pd[lag] = 0
    else:
        dados_ml_pd[lag] = dados_ml_pd[lag].fillna(0)

# Garantir período
dados_ml_pd['mes'] = dados_ml_pd['data_referencia'].dt.month
dados_ml_pd['trimestre'] = dados_ml_pd['data_referencia'].dt.quarter

features = ['lag_1', 'lag_2', 'lag_3', 'lag_12', 'mes', 'trimestre', 'product_category_name', 'product_subcategory_name']
target = 'quantidade_vendida'
df_features = dados_ml_pd[dados_ml_pd[target].notnull()].copy()

max_date = df_features['data_referencia'].max()
corte = max_date - pd.DateOffset(months=3)
train_df = df_features[df_features['data_referencia'] <= corte].copy()
test_df = df_features[df_features['data_referencia'] > corte].copy()
print(f"Treino: {train_df.shape[0]} linhas, Teste: {test_df.shape[0]} linhas")

X_train = train_df[features].copy()
y_train = train_df[target].values
X_test = test_df[features].copy()
y_test = test_df[target].values

categorical_features = ['product_category_name', 'product_subcategory_name']
preprocessor = ColumnTransformer(
    transformers=[
        ("ohe", OneHotEncoder(handle_unknown="ignore", sparse_output=False), categorical_features)
    ],
    remainder='passthrough'
)

models = {
    "LinearRegression": LinearRegression(),
    "RandomForest": RandomForestRegressor(n_estimators=100, random_state=42),
    "GradientBoosting": GradientBoostingRegressor(n_estimators=100, random_state=42),
    "LightGBM": LGBMRegressor(n_estimators=100, random_state=42)
}

results = {}
for name, model in models.items():
    try:
        pipe = Pipeline([("pre", preprocessor), ("model", model)])
        pipe.fit(X_train, y_train)
        preds = pipe.predict(X_test)
        mae = mean_absolute_error(y_test, preds)
        rmse = np.sqrt(mean_squared_error(y_test, preds))
        mape = mean_absolute_percentage_error(y_test, preds)
        results[name] = {"MAE": mae, "RMSE": rmse, "MAPE": mape}
    except Exception as e:
        results[name] = {"MAE": None, "RMSE": None, "MAPE": None}
        print(f"Erro treinando {name}: {e}")

print("\nMétricas de comparação dos modelos (test set):")
for name, mets in results.items():
    mae = f"{mets['MAE']:.2f}" if mets['MAE'] is not None else "NA"
    rmse = f"{mets['RMSE']:.2f}" if mets['RMSE'] is not None else "NA"
    mape = f"{mets['MAPE']:.2f}" if mets['MAPE'] is not None else "NA"
    print(f"- {name}: MAE={mae} | RMSE={rmse} | MAPE={mape}")

valid_results = {k:v for k,v in results.items() if v['RMSE'] is not None}
if valid_results:
    best = min(valid_results.items(), key=lambda x: x[1]['RMSE'])
    print(f"\nMelhor modelo (menor RMSE): {best[0]} (RMSE={best[1]['RMSE']:.2f})")
else:
    print("\nNão foi possível determinar o melhor modelo (faltam métricas).")

In [0]:

# -------------------------
# 7) Crescimento por Centro de Distribuição (baseline média móvel)
# -------------------------

print("\nAnalisando crescimento por centro de distribuição (EUA vs Resto do Mundo) usando baseline média móvel")

df_pred_baseline = df_ml_spark.select(
    F.col("product_key"),
    F.col("product_name"),
    F.col("territory_id"),
    F.col("country_region_code"),
    F.col("media_movel_3m").cast("double").alias("media_movel_3m")
).toPandas()

df_pred_baseline['media_movel_3m'] = df_pred_baseline['media_movel_3m'].fillna(0.0)
df_pred_baseline['previsao_3m_baseline'] = df_pred_baseline['media_movel_3m'] * 3.0

df_pred_baseline['territorio_grupo'] = df_pred_baseline['country_region_code'].apply(lambda x: 'EUA' if str(x).upper() == 'US' else 'Resto do Mundo')

crescimento = df_pred_baseline.groupby('territorio_grupo')['previsao_3m_baseline'].sum().reset_index()
print("\nPrevisão (baseline média móvel) - Próximos 3 meses por grupo de territórios:")
display(crescimento)

if crescimento.loc[crescimento['previsao_3m_baseline'].idxmax(),'territorio_grupo'] == 'EUA':
    print("👉 O grupo com maior crescimento previsto é: EUA")
else:
    print("👉 O grupo com maior crescimento previsto é: Resto do Mundo")

In [0]:
# -------------------------
# 8) Estimativa de zíperes para luvas (2 por par)
# -------------------------
print("\nEstimando zíperes necessários para luvas (2 por par) usando baseline média móvel")

# Pegar info de categoria/subcategoria para cada product_key
prod_info = df_ml_spark.select("product_key","product_name","product_category_name","product_subcategory_name").distinct().toPandas()
df_luvas = df_pred_baseline.merge(prod_info, on="product_key", how="left")

if 'product_category_name' in df_luvas.columns and 'product_subcategory_name' in df_luvas.columns:
    df_luvas_filtered = df_luvas[
        (df_luvas['product_category_name'].str.lower() == 'clothing') &
        (df_luvas['product_subcategory_name'].str.lower() == 'gloves')
    ]
else:
    df_luvas_filtered = df_luvas[df_luvas['product_name'].str.contains('glove|gloves', case=False, na=False)]

total_luvas_prev = df_luvas_filtered['previsao_3m_baseline'].sum()
zippers_needed = int(np.round(total_luvas_prev * 2))

print(f"Previsão total de pares de luvas (próx 3 meses, baseline): {total_luvas_prev:.2f}")
print(f"Estimativa de zíperes necessária (2 por par): {zippers_needed}")

In [0]:
# -------------------------
# 9) Outputs / Salvamentos finais (opcionais)
# -------------------------
print("\nExemplos de saída e salvamento (comente/descomente conforme desejar):")
display(df_forecast.orderBy(F.desc("ma")).limit(20))

# Exemplos de escrita
# df_forecast.write.format("delta").mode("overwrite").saveAsTable("dev_guilherme.forecast_produto_loja_3m")
# df_pred_baseline[['product_key','product_name','territory_id','country_region_code','previsao_3m_baseline']].to_csv("/dbfs/tmp/previsao_baseline_3m.csv", index=False)

print("\nScript finalizado com sucesso.")
