# Preparação do ambiente

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
!pip install -q findspark pyspark

In [5]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("previsaodemanda") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()
print("Spark configurado corretamente!")

Spark configurado corretamente!


# Importando as bibliotecas

In [6]:
import os
import sys
import pytz
import numpy as np
import pandas as pd
import datetime
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import udf, lpad, translate
from datetime import datetime
from datetime import timedelta
from datetime import date
from dateutil.relativedelta import relativedelta
from pyspark.sql.types import *
from pyspark.sql.functions import count, avg, col, to_date, trim, when,sum, min, max,lit,to_timestamp, year, date_format,split, lpad, concat_ws
from pyspark.sql import functions as F
import builtins
import pyspark.sql.types as T

# Leitura e Governança dos Dados

In [None]:
tz       = pytz.timezone("Europe/Lisbon")
agora    = datetime.now(tz)
dthproc  = agora.strftime("%Y%m%d%H%M%S")
file     = f'previsao_demanda_{dthproc}.csv'
ts_file_generation = file.split("_")[-1].replace('.csv','') + '00'
print("ts_file_generation =", ts_file_generation)

In [None]:
base_dir        = "/content/drive/MyDrive/PREVISAO_DEMANDA/INGESTION"
lake_dir        = "/content/drive/MyDrive/PREVISAO_DEMANDA/RAW"
controle_dir    = os.path.join(lake_dir, "CONTROLE_RAW", f"raw_tb_controle_procesamento_{dthproc}")
metadados_dir   = os.path.join(lake_dir, "METADADOS_RAW")

In [None]:
path_input = os.path.join(base_dir, "Historical_Product_Demand.csv")
abt_demanda = spark.read.csv(
    path_input,
    header=True,
    inferSchema=True
)
abt_demanda.createOrReplaceTempView("abt_demanda")
abt_demanda.cache()
qtd = abt_demanda.count()
print("records abt_demanda:", qtd)

In [None]:
controle = spark.sql(f"""
  SELECT
    '{file}'               AS name_file,
    {qtd}                  AS qtd_records,
    '{dthproc}'            AS ts_proc,
    '{ts_file_generation}' AS ts_file_generation
""")

controle.write \
    .mode("append") \
    .partitionBy("ts_proc") \
    .parquet(controle_dir)
print("Controle gravado em:", controle_dir)

# EDA

### Visualização dos dados:

In [None]:
abt_demanda.show()

# Metadados

In [None]:
def generate_metadados(dataframe):
    # Nome das colunas
    column_names = dataframe.columns

    # Tipo de dados das colunas
    dtypes = [dtype for _, dtype in dataframe.dtypes]

    # Contagem de valores nulos
    null_counts = [dataframe.filter(F.col(c).isNull()).count() for c in column_names]

    # Percentual de valores nulos
    total_count = dataframe.count()
    null_percents = [round((null_count / total_count) * 100, 2) for null_count in null_counts]

    # Cardinalidade (número de valores únicos)
    cardinality = [dataframe.select(c).distinct().count() for c in column_names]

    # Criar um DataFrame PySpark com as informações
    metadata = spark.createDataFrame(
        zip(column_names, dtypes, null_counts, null_percents, cardinality),
        schema=["nome_variavel", "tipo", "qt_nulos", "percent_nulos", "cardinalidade"]
    )

    # Ordenar pelo tipo
    metadata = metadata.orderBy(F.desc("percent_nulos"))

    return metadata


In [None]:
metadata_df = generate_metadados(abt_demanda)
metadata_df.show(10, truncate=False)

In [None]:
path_metadados = os.path.join(metadados_dir, f"tb_metadados_inicial_{dthproc}")

md = metadata_df.withColumn("dthproc", lit(dthproc))

md.write \
    .mode("append") \
    .partitionBy("dthproc") \
    .parquet(path_metadados)
print("Metadados gravados em:", path_metadados)

## Correção do dtype da variável Order_Demand, e o formato da data conforme visto no metadados:
* Extração do ano e ano/mês para realizarmos a análise dos dados.


In [None]:
abt_demanda.createOrReplaceTempView("abt_demanda")


In [None]:
train_abt_demanda = abt_demanda.withColumn(
    "iso_date",
    concat_ws("-",
        split(col("Date"), "/")[0],
        lpad(split(col("Date"), "/")[1], 2, "0"),
        lpad(split(col("Date"), "/")[2], 2, "0")
    )
)

In [None]:
train_abt_demanda.show()

In [None]:
train_abt_demanda = train_abt_demanda \
    .withColumn("date", to_date(col("iso_date"), "yyyy-MM-dd")) \
    .withColumn("year", year(col("date"))) \
    .withColumn("month_reference", date_format(col("date"), "yyyyMM")) \
    .withColumn("Order_Demand", col("Order_Demand").cast("integer")) \
    .drop("Date")

train_abt_demanda.printSchema()
train_abt_demanda.show(truncate=False)
train_abt_demanda.createOrReplaceTempView('train_abt_demanda')

* Corringindo o dtype da variável iso_date e renomeando.

In [None]:
train_abt_demanda_00 = train_abt_demanda \
    .withColumn("Date", to_date(col("iso_date"), "yyyy-MM-dd")) \
    .drop("iso_date")

train_abt_demanda_00.show(truncate=False)
train_abt_demanda_00.createOrReplaceTempView('train_abt_demanda_00')

## Contagem dos nulos na variável Date e Order_Demand por Armazém .

In [None]:
spark.sql("""
  SELECT
    Warehouse,
    COUNT(*)  AS total_linhas,
    SUM(CASE WHEN Date IS NULL THEN 1 ELSE 0 END) AS nulls_date,
    SUM(CASE WHEN Date = 'NA' THEN 1 ELSE 0 END) AS nulls_na,
    SUM(CASE WHEN Order_Demand IS NULL THEN 1 ELSE 0 END) AS nulls_order_demand,
    ROUND(100.0 * SUM(CASE WHEN Date IS NULL THEN 1 ELSE 0 END) / COUNT(*), 2) AS pct_nulls,
    ROUND(100.0 * SUM(CASE WHEN Date = 'NA' THEN 1 ELSE 0 END) / COUNT(*), 2) AS pct_na,
    ROUND(100.0 * SUM(CASE WHEN Order_Demand IS NULL THEN 1 ELSE 0 END) / COUNT(*), 2) AS pct_nulls_order_demand
  FROM train_abt_demanda_00
  GROUP BY Warehouse
  ORDER BY nulls_date DESC
""").show(truncate=False)

* O Armazém Whse_A, é O único que contém datas null, que corresponde a 7,32%, ou seja, um percentual baixo para um total de 153574 linhas de registros. E Possui null também na variável nulls_order_demand, 4,16%, percentual baixo também.
Vamos ver a data mínima e máxima que estão no DF.

In [None]:
train_abt_demanda_00.agg(
    min("Date").alias("min_date"),
    max("Date").alias("max_date")
).show()

# Visualização por gráficos:

In [None]:
train_pandas = train_abt_demanda_00.toPandas()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Mantém o estilo seaborn
sns.set(style="whitegrid")

# 1) Agrega: conta quantas linhas há em cada mês
df_monthly = (
    train_pandas
      .groupby('month_reference')
      .size()
      .reset_index(name='total_products')
)

# 2) Garante que o mês está em ordem cronológica
df_monthly = df_monthly.sort_values('month_reference')

# 3) Plot
plt.figure(figsize=(18, 8))
plt.plot(
    df_monthly['month_reference'].astype(str),
    df_monthly['total_products'],
    marker='o',
    linestyle='-',
    linewidth=2,
    markersize=6
)
plt.xlabel('Mês de Referência', fontsize=12)
plt.ylabel('Total de Produtos', fontsize=12)
plt.title('Total de Produtos por Mês de Referência', fontsize=16)
plt.xticks(rotation=90, fontsize=10)
plt.yticks(fontsize=10)
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()

# Anotações a cada 3 meses, por exemplo
for i in range(0, len(df_monthly), 3):
    plt.annotate(
        df_monthly['total_products'].iloc[i],
        (df_monthly['month_reference'].astype(str).iloc[i],
         df_monthly['total_products'].iloc[i]),
        textcoords="offset points",
        xytext=(0, 8),
        ha='center',
        fontsize=9
    )

plt.show()

## Gráfico 1 - Observações Produtos por safra:

* Os meses do ano de 2011, não possuem um histórico,podemos retirar do df.

* A partir de 01/2012 que realmente começa inicia a grande movimentação.

* Houve um pico em 10/2013 onde chegou atingir quase 25.000 produtos, porém, retornou ao padrão.

* A partir de 08/2015 podemos enxergar uma leve queda no total de produtos que  mantem-se até o final do dataset.

In [None]:
sns.set(style="whitegrid")

# Agrega e conta quantos registros existem por mês e warehouse
df_ref_wh = (
    train_pandas
      .groupby(['month_reference', 'Warehouse'])
      .size()
      .reset_index(name='Total_produtos')
)

# Ordena os meses cronologicamente
df_ref_wh = df_ref_wh.sort_values('month_reference')

# Plota
plt.figure(figsize=(14, 7))

for wh in df_ref_wh['Warehouse'].unique():
    wh_data = df_ref_wh[df_ref_wh['Warehouse'] == wh]
    plt.plot(
        wh_data['month_reference'],
        wh_data['Total_produtos'],
        marker='o',
        linestyle='-',
        linewidth=2,
        label=f'Warehouse {wh}'
    )

plt.xlabel('Mês de Referência', fontsize=12)
plt.ylabel('Total de Produtos', fontsize=12)
plt.title('Total de Produtos por Mês de Referência e Warehouse', fontsize=16)
plt.xticks(df_ref_wh['month_reference'].unique(), rotation=90, fontsize=10)
plt.yticks(fontsize=10)
plt.legend(title='Warehouse')
plt.grid(True, linestyle='--', alpha=0.7)
plt.tight_layout()

plt.show()


## Gráfico 2 - Observações Produtos por safra por armazém:

* Claramente podemos enxergar que o armazém Whse_J movimenta, em média, de 11 000 a 15 000 produtos por mês, muito acima dos demais. Isso indica que ele serve como hub principal de distribuição.

* Existe um padrão sazonal, em 05/ e 08/ de cada ano (2014–2016) observam-se quedas acentuadas em todos (ou na maioria) dos armazéns, sugerindo um efeito de safra, férias ou restrições sazonais que afetam o fluxo de produtos.

* Agrupamento de comportamentos, Whse_A e Whse_S seguem movimentos bastante próximos, especialmente até 2015. A partir de 2016, o Whse_S apresenta um decréscimo mais forte, distanciando-se do Whse_A.
Whse_C exibe um padrão de baixa amplitude e estabilidade, movimentando sempre menos de 1 000 produtos por mês.

In [None]:
sns.set(style="whitegrid")

# Agrega registros (Product_Code) em cada Product_Category
df_category_counts = (
    train_pandas
      .groupby('Product_Category')['Product_Code']
      .count()
      .reset_index(name='count')
)

# 2) Ordena categorias por contagem decrescente
df_category_counts = df_category_counts.sort_values('count', ascending=False)

# 3) Plot de barras
plt.figure(figsize=(14, 7))
sns.barplot(
    data=df_category_counts,
    x='Product_Category',
    y='count'
)

plt.xlabel('Categorias', fontsize=12)
plt.ylabel('Quantidade de Produtos', fontsize=12)
plt.title('Quantidade de Produtos por Categoria', fontsize=16)
plt.xticks(rotation=90, fontsize=10)
plt.grid(True, linestyle='--', alpha=0.7, axis='y')
plt.tight_layout()
plt.show()

## Gráfico 3 - Observações por categoria:

* Claramente a catgria 019 lidera, disparado, das outras, ou seja ela impacta bastante, mas não sabemos se todos os armazéns.
* Depois temos 2 categorias, 005 e 001, na mesma faixa, a categoria 007 um pouco abaixo das duas e a 021 já na metade das duas.
* As demais vão caindo gradualmente.


# Vamos olhar por armazém:

In [None]:
# DataFrame pandas train_pandas
sns.set(style="whitegrid")

# 2) Agrupa por Warehouse e Product_Category para contar Product_Code
df_cat_wh_counts = (
    train_pandas
      .groupby(['Warehouse', 'Product_Category'])['Product_Code']
      .count()
      .reset_index(name='count')
)

# Lista de warehouses únicas
warehouses = df_cat_wh_counts['Warehouse'].unique()

# Cria subplots 2x2
fig, axes = plt.subplots(2, 2, figsize=(16, 12))
axes = axes.flatten()

# Para cada warehouse, filtra os dados e plota o barplot
for ax, wh in zip(axes, warehouses):
    data_wh = (
        df_cat_wh_counts
          .loc[df_cat_wh_counts['Warehouse'] == wh]
          .sort_values('count', ascending=False)
    )
    sns.barplot(
        data=data_wh,
        x='Product_Category',
        y='count',
        ax=ax
    )
    ax.set_title(f'Warehouse {wh}', fontsize=14)
    ax.set_xlabel('Categoria', fontsize=12)
    ax.set_ylabel('Quantidade de Produtos', fontsize=12)
    ax.tick_params(axis='x', rotation=90, labelsize=10)
    ax.grid(True, linestyle='--', alpha=0.7, axis='y')

# Remove eixos extras
for idx in range(len(warehouses), len(axes)):
    fig.delaxes(axes[idx])

plt.tight_layout()
plt.show()


## Gráfico 4 - Distribuição de Volume por Categoria em Cada Armazém:

Em todos os quatro armazéns, a Categoria_019 aparece isoladamente no topo, representando de 30 % a quase 80 % do total de volume em cada local. Ilustrando uma distribuição do tipo Pareto (80/20), em que uma única categoria domina o fluxo de estoque.

Embora a categoria de maior peso seja a mesma, a segunda, terceira e demais posições variam significativamente de armazém para armazém. Isso reflete diferentes padrões regionais de demanda ou mix de clientes atendidos por cada depósito.

# Vamos observar quantos produtos possui em cada categoria, vamos ver o mix:

In [None]:
spark.sql("""
  SELECT
    Product_Category,
    COUNT(DISTINCT Product_Code) AS distinct_product_count
  FROM train_abt_demanda_00
  GROUP BY Product_Category
  ORDER BY distinct_product_count DESC
""").show()

* A Category_019 concentra 534 SKUs, mais que o dobro da segunda colocada (Category_001, com 278 SKUs), e quase triplica diversas outras categorias. Esse grau de concentração implica que qualquer variação na performance ou no abastecimento dessa categoria terá impacto desproporcional no resultado geral.

Vou confirmar se podem repetir SKUs em categorias diferentes:

In [None]:
spark.sql("""
  SELECT
    Product_Code,
    COUNT(DISTINCT Product_Category) AS num_categorias,
    COLLECT_SET(Product_Category)   AS categorias
  FROM train_abt_demanda_00
  GROUP BY Product_Code
  HAVING num_categorias > 1
  ORDER BY num_categorias DESC, Product_Code
""").show(truncate=False)

### Não há um mesmo SKU cadastrado em categorias diferentes, são únicos.

In [None]:
spark.sql("""
WITH prod_counts AS (
  SELECT
    Product_Category,
    Product_Code,
    COUNT(*) AS total_ocorrencias
  FROM train_abt_demanda_00
  WHERE Product_Category IN (
    'Category_019','Category_001','Category_005','Category_021',
    'Category_007','Category_006','Category_009','Category_030'
  )
  GROUP BY Product_Category, Product_Code
),
ranked AS (
  SELECT
    Product_Category,
    Product_Code,
    total_ocorrencias,
    ROW_NUMBER() OVER (
      PARTITION BY Product_Category
      ORDER BY total_ocorrencias DESC
    ) AS rn
  FROM prod_counts
)
SELECT
  rn
  /* Category 019 */
  , MAX(CASE WHEN Product_Category = 'Category_019' THEN Product_Code      END) AS Category_019_Product
  , MAX(CASE WHEN Product_Category = 'Category_019' THEN total_ocorrencias END) AS Category_019_Count
  /* Category 001 */
  , MAX(CASE WHEN Product_Category = 'Category_001' THEN Product_Code      END) AS Category_001_Product
  , MAX(CASE WHEN Product_Category = 'Category_001' THEN total_ocorrencias END) AS Category_001_Count
  /* Category 005 */
  , MAX(CASE WHEN Product_Category = 'Category_005' THEN Product_Code      END) AS Category_005_Product
  , MAX(CASE WHEN Product_Category = 'Category_005' THEN total_ocorrencias END) AS Category_005_Count
  /* Category 021 */
  , MAX(CASE WHEN Product_Category = 'Category_021' THEN Product_Code      END) AS Category_021_Product
  , MAX(CASE WHEN Product_Category = 'Category_021' THEN total_ocorrencias END) AS Category_021_Count
  /* Category 007 */
  , MAX(CASE WHEN Product_Category = 'Category_007' THEN Product_Code      END) AS Category_007_Product
  , MAX(CASE WHEN Product_Category = 'Category_007' THEN total_ocorrencias END) AS Category_007_Count
  /* Category 006 */
  , MAX(CASE WHEN Product_Category = 'Category_006' THEN Product_Code      END) AS Category_006_Product
  , MAX(CASE WHEN Product_Category = 'Category_006' THEN total_ocorrencias END) AS Category_006_Count
  /* Category 009 */
  , MAX(CASE WHEN Product_Category = 'Category_009' THEN Product_Code      END) AS Category_009_Product
  , MAX(CASE WHEN Product_Category = 'Category_009' THEN total_ocorrencias END) AS Category_009_Count
  /* Category 030 */
  , MAX(CASE WHEN Product_Category = 'Category_030' THEN Product_Code      END) AS Category_030_Product
  , MAX(CASE WHEN Product_Category = 'Category_030' THEN total_ocorrencias END) AS Category_030_Count
FROM ranked
WHERE rn <= 15
GROUP BY rn
ORDER BY rn
""").show(15, truncate = False)


# Ranking de produtos por categoria:

* Com esse janela, consegui identificar quais os produtos com mais ocorrência  em cada categoria (rn 1–15), o que é de suma importância, pois já pensando na modelagem: é crucial conferir se os produtos mais demandados estão representados nas features.

* Serve de baseline para comparar com as variáveis de entrada do modelo.

* Permite detectar data drift: mudanças nos top-N podem sinalizar desvios na demanda ou inconsistências na ingestão.

* Ajuda a priorizar features de “popularidade” e validar cobertura de produtos críticos no treinamento.

* Apoio a auditoria contínua: caso um top product deixe de aparecer,sinal de possível quebra de padrão ou erro nos dados.




### Retirando o ano de 2011, e salvando a nova tabela de dados.

In [None]:
df_train= spark.sql("""
  SELECT *
    FROM train_abt_demanda_00
   WHERE COALESCE(year, 0) <> 2011
""")

df_train.show()

# Check de sanidade!

In [None]:
df_train.agg(
    min("Date").alias("min_date"),
    max("Date").alias("max_date")
).show()
qtd = df_train.count()
print("records df_train:", qtd)

# Metadados

In [None]:
metadata_df = generate_metadados(df_train)
metadata_df.show(10, truncate=False)

* Olhando esse metadados, conseguimos ver a redução de nulos na variável Order_Demand, antes o total era de 5.37% após fazer a retirada do ano de 2011 resultou em 1% no dataframe.
Podemos retirar tambem, nesse caso, os registros que estão com data null, pois os graficos já nos trouxeram uma visualização sem esses registros, e para nossa previsão a data é um dado chave, sem ele não conseguimos utilizar os registros.

# Retirando do DF datas null.

In [None]:
df_train= spark.sql("""
  SELECT *
    FROM train_abt_demanda_00
   WHERE year <> 2011
""")

df_train.show()

# Check de sanidade!

In [None]:
qtd = df_train.count()
print("records df_train:", qtd)

# Removendo os Sufixos

* Agora vamos retirar os sufixos das váriaveis: Product_Code , Warehouse e Product_Category, para que nosso DF fique somente com números, que é muiro importante para a modelagem, e nesse caso podemos retirar o sufixo sem alterar a qualidade, o conteúdo e a explicabilidade dos dados.
E corrigir o dtype das mesmas.

In [None]:
from pyspark.sql.functions import regexp_replace

df_train = df_train \
    .withColumn(
        "Product_Code",
        regexp_replace(col("Product_Code"), "^Product_", "")
    ) \
    .withColumn(
        "Warehouse",
        regexp_replace(col("Warehouse"), "^Whse_", "")
    ) \
    .withColumn(
        "Product_Category",
        regexp_replace(col("Product_Category"), "^Category_", "")
    )

df_train.show(truncate=False)


In [None]:
df_train = df_train \
    .withColumn("Product_Code", col("Product_Code").cast("int")) \
    .withColumn("Product_Category", col("Product_Category").cast("int"))

# Check de sanidade!

In [None]:
qtd = df_train.count()
print("records df_train:", qtd)

### Salvando o metadados final.

In [None]:
metadata_df = generate_metadados(df_train)
metadata_df.show(10, truncate=False)

Agora sim vamos salvar os metadados finais, com a variável Date sem nulos.

In [None]:
path_metadados = os.path.join(metadados_dir, f"tb_metadados_final_{dthproc}")

md = metadata_df.withColumn("dthproc", lit(dthproc))

md.write \
    .mode("append") \
    .partitionBy("dthproc") \
    .parquet(path_metadados)
print("Metadados gravados em:", path_metadados)

# Salvando o DF pronto para iniciar a feature engeneering.

In [None]:
base_dados    = "/content/drive/MyDrive/PREVISAO_DEMANDA/RAW/DADOS"
df_train.write \
    .mode("overwrite") \
    .parquet(base_dados)

print(f"Gravado com sucesso em: {base_dados}")