# Desafio Técnico – Hackathon Forecast Big Data 2025

# Previsão de vendas para as 4 semanas de 2023

In [2]:
# Importando as bibliotecas necessárias
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql import Window
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import subprocess

import warnings
warnings.filterwarnings("ignore")

## Iniciando a sessão do Spark

In [4]:
spark = (SparkSession.builder
         .appName("Desafio Técnico - Hackathon Forecast Big Data 2025")
         .getOrCreate())

## lendo o dataset

In [5]:
# Declarando o caminho do arquivo 1
arquivo = '/content/part-00000-5fcd6792-9caf-4390-a519-6be5c418e9c3-c000.snappy.parquet'

# Lendo o arquivo 1
df_analise = spark.read.parquet(arquivo)

In [6]:
# Visualizando o Schema
df_analise.printSchema()

root
 |-- pdv: long (nullable = true)
 |-- premise: string (nullable = true)
 |-- categoria_pdv: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- Semanal: timestamp (nullable = true)
 |-- distributor_id: string (nullable = true)
 |-- qtd_total: double (nullable = true)
 |-- valor_total: double (nullable = true)
 |-- liquido_total: double (nullable = true)
 |-- lucro_bruto: double (nullable = true)
 |-- desconto: double (nullable = true)
 |-- impostos: double (nullable = true)
 |-- produto: long (nullable = true)
 |-- categoria: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- tipos: string (nullable = true)
 |-- label: string (nullable = true)
 |-- subcategoria: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- fabricante: string (nullable = true)



## Criação de Features Engineerings

In [7]:
# Extrair número da semana do ano
df_analise = df_analise.withColumn("semana", F.weekofyear("Semanal"))

# Extrair número do mês
df_analise = df_analise.withColumn("mes", F.month("Semanal"))

In [8]:
# Criando feature ticket_medio
df_analise = df_analise.withColumn(
    "ticket_medio",
    (F.col("valor_total") / F.col("qtd_total")).cast("double")
)

In [9]:
# Criando feature Margem percentual
df_analise = df_analise.withColumn(
    "margem_%",
    (F.col("lucro_bruto") / F.col("valor_total")).cast("double")
)

In [10]:
# Criando feature Desconto percentual
df_analise = df_analise.withColumn(
    "desconto_%",
    (F.col("desconto") / F.col("valor_total")).cast("double")
)

In [11]:
# Criando feature Imposto percentual
df_analise = df_analise.withColumn(
    "impostos_%",
    (F.col("impostos") / F.col("valor_total")).cast("double")
)

In [12]:
# Criação de features Lags de vendas semanais (qtd_total defasado por produto e pdv)

window_spec = Window.partitionBy("pdv", "produto").orderBy("Semanal")

df_analise = (
    df_analise
    .withColumn("qtd_lag_1", F.lag("qtd_total", 1).over(window_spec))
    .withColumn("qtd_lag_2", F.lag("qtd_total", 2).over(window_spec))
    .withColumn("qtd_lag_3", F.lag("qtd_total", 3).over(window_spec))
    .withColumn("qtd_lag_3", F.lag("qtd_total", 4).over(window_spec))
)

In [13]:
# Criando a feature de Médias móveis de 2 semanas
df_analise = df_analise.withColumn(
    "qtd_media_2s",
    F.avg("qtd_total").over(window_spec.rowsBetween(-2, -1))
)

In [14]:
# Criando a feature de Médias móveis de 3 semanas
df_analise = df_analise.withColumn(
    "qtd_media_3s",
    F.avg("qtd_total").over(window_spec.rowsBetween(-3, -1))
)

In [15]:
# Criando a feature de Médias móveis de 4 semanas
df_analise = df_analise.withColumn(
    "qtd_media_4s",
    F.avg("qtd_total").over(window_spec.rowsBetween(-4, -1))
)

In [16]:
# Criando a feature de variância de vendas de 2 semanas
df_analise = df_analise.withColumn(
    "qtd_var_2s",
    F.variance("qtd_total").over(window_spec.rowsBetween(-2, -1))
)

In [17]:
# Criando a feature de variância de vendas de 3 semanas
df_analise = df_analise.withColumn(
    "qtd_var_3s",
    F.variance("qtd_total").over(window_spec.rowsBetween(-3, -1))
)

In [18]:
# Criando a feature de variância de vendas de 4 semanas
df_analise = df_analise.withColumn(
    "qtd_var_4s",
    F.variance("qtd_total").over(window_spec.rowsBetween(-4, -1))
)

In [19]:
# Visualizando o Schema do dataframe
df_analise.printSchema()

root
 |-- pdv: long (nullable = true)
 |-- premise: string (nullable = true)
 |-- categoria_pdv: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- Semanal: timestamp (nullable = true)
 |-- distributor_id: string (nullable = true)
 |-- qtd_total: double (nullable = true)
 |-- valor_total: double (nullable = true)
 |-- liquido_total: double (nullable = true)
 |-- lucro_bruto: double (nullable = true)
 |-- desconto: double (nullable = true)
 |-- impostos: double (nullable = true)
 |-- produto: long (nullable = true)
 |-- categoria: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- tipos: string (nullable = true)
 |-- label: string (nullable = true)
 |-- subcategoria: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- fabricante: string (nullable = true)
 |-- semana: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- ticket_medio: double (nullable = true)
 |-- margem_%: double (nullable = true)
 |-- desconto_%: double

In [20]:
# Criando a lista de colunas para eliminar dados nulos
colunas = [
    "qtd_lag_1", "qtd_lag_2", "qtd_lag_3",
    "qtd_media_2s", "qtd_media_3s", "qtd_media_4s",
    "qtd_var_2s", "qtd_var_3s", "qtd_var_4s",
    "ticket_medio",
    "margem_%",
    "desconto_%",
    "impostos_%"
]

# Aplica o drop apenas nessas colunas
df_analise = df_analise.na.drop(subset=colunas)

## Transformação dos dados

In [21]:
# Convertendo os ids para strings

# Criando uma lista de colunas para converter
colunas_para_converter = ["pdv", "produto"]

# Cria o loop para converter numéricas em string
for coluna in colunas_para_converter:
    df_analise = df_analise.withColumn(coluna, col(coluna).cast("string"))

In [22]:
# Cria uma lista de colunas categóricas
colunas_categoricas = [ "pdv", "distributor_id", "produto",  "premise",  "categoria_pdv", "tipos", "categoria", "subcategoria", "marca", "fabricante"]

In [23]:
# Cria indexadores ou label encoding
indexers = [
    StringIndexer(inputCol=col, outputCol=col + "_idx", handleInvalid="keep")
    for col in colunas_categoricas
]

In [24]:
# Aplicando OneHotEncoder
encoder = OneHotEncoder(
    inputCols=[col + "_idx" for col in colunas_categoricas],
    outputCols=[col + "_ohe" for col in colunas_categoricas]
)

In [25]:
# Criando a lista de colunas numéricas
colunas_numericas = [
    # valores de vendas
    "qtd_total",
    "valor_total",
    "liquido_total",
    "lucro_bruto",
    "desconto",
    "impostos",

    # features de tempo
    "semana",
    "mes",

    # features derivadas
    "ticket_medio",
    "margem_%",
    "desconto_%",
    "impostos_%",

    # lags e médias móveis
    "qtd_lag_1",
    "qtd_lag_2",
    "qtd_lag_3",
    "qtd_media_2s",
    "qtd_media_3s",
    "qtd_media_4s",

    # variações semanais
    "qtd_var_2s",
    "qtd_var_3s",
    "qtd_var_4s"
]

In [26]:
# Aplica o VectorAssembler para as colunas numéricas
assembler_num = VectorAssembler(
    inputCols=colunas_numericas,
    outputCol="num_features"
)

In [27]:
# Aplica o StandScaler para normalizar as colunas numéricas
scaler = StandardScaler(
    inputCol="num_features",
    outputCol="scaled_num_features",
    withMean=True,
    withStd=True
)

In [28]:
# Aplica o VectorAssembler final
assembler_final = VectorAssembler(
    inputCols=["scaled_num_features"] + [col + "_ohe" for col in colunas_categoricas],
    outputCol="features"
)

In [29]:
# Montando pipeline
pipeline = Pipeline(stages=indexers + [encoder, assembler_num, scaler, assembler_final])

In [30]:
# Ajustar e transformar o DataFrame
df_encoded = pipeline.fit(df_analise).transform(df_analise)

In [31]:
# Ver algumas colunas transformadas
df_encoded.select(
    "premise", "premise_idx", "premise_ohe",
    "categoria_pdv", "categoria_pdv_idx", "categoria_pdv_ohe"
).show(5, truncate=False)

+-----------+-----------+-------------+--------------+-----------------+-----------------+
|premise    |premise_idx|premise_ohe  |categoria_pdv |categoria_pdv_idx|categoria_pdv_ohe|
+-----------+-----------+-------------+--------------+-----------------+-----------------+
|Off Premise|0.0        |(2,[0],[1.0])|Package/Liquor|1.0              |(51,[1],[1.0])   |
|Off Premise|0.0        |(2,[0],[1.0])|Package/Liquor|1.0              |(51,[1],[1.0])   |
|Off Premise|0.0        |(2,[0],[1.0])|Package/Liquor|1.0              |(51,[1],[1.0])   |
|Off Premise|0.0        |(2,[0],[1.0])|Package/Liquor|1.0              |(51,[1],[1.0])   |
|Off Premise|0.0        |(2,[0],[1.0])|Package/Liquor|1.0              |(51,[1],[1.0])   |
+-----------+-----------+-------------+--------------+-----------------+-----------------+
only showing top 5 rows



In [32]:
# Visualizando o Schema
df_encoded.printSchema()

root
 |-- pdv: string (nullable = true)
 |-- premise: string (nullable = true)
 |-- categoria_pdv: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- Semanal: timestamp (nullable = true)
 |-- distributor_id: string (nullable = true)
 |-- qtd_total: double (nullable = true)
 |-- valor_total: double (nullable = true)
 |-- liquido_total: double (nullable = true)
 |-- lucro_bruto: double (nullable = true)
 |-- desconto: double (nullable = true)
 |-- impostos: double (nullable = true)
 |-- produto: string (nullable = true)
 |-- categoria: string (nullable = true)
 |-- descricao: string (nullable = true)
 |-- tipos: string (nullable = true)
 |-- label: string (nullable = true)
 |-- subcategoria: string (nullable = true)
 |-- marca: string (nullable = true)
 |-- fabricante: string (nullable = true)
 |-- semana: integer (nullable = true)
 |-- mes: integer (nullable = true)
 |-- ticket_medio: double (nullable = true)
 |-- margem_%: double (nullable = true)
 |-- desconto_%: do

## Preparação de dados para treino e teste

In [33]:
# Dividindo em treino (80%) e teste (20%)
train_df = df_encoded.filter(col("semana") <= 42)
test_df  = df_encoded.filter(col("semana") > 42)

In [None]:
# Visualizando a quantidade de linhas
print("Quantidade de linhas no conjunto de treino:", train_df.count())
print("Quantidade de linhas no conjunto de teste:", test_df.count())

Quantidade de linhas no conjunto de treino: 2721967
Quantidade de linhas no conjunto de teste: 908728


## Modelo

### Regressão Linear

In [34]:
# Instancia o modelo de Regressão Linear
lr = LinearRegression(featuresCol="features", labelCol="qtd_total")

# Ajusta o dados de treino
lr_model = lr.fit(train_df)

In [35]:
# Prevendo
y_pred = lr_model.transform(test_df)

In [36]:
# Imprimindo o RMSE
print("Root Mean Squared Error - RMSE:", lr_model.summary.rootMeanSquaredError)

Root Mean Squared Error - RMSE: 0.002120275927114928


OBS: Eu não possui capacidade computacional suficiente para rodar modelos de Random Forest, Gradient Boosted Trees, CatBoost, XGBoost e LightGBM. Também é impossível rodar no ambiente colab, pois há limitações na conta free.

## Previsão de vendas para próximas semanas de 2023

In [37]:
# Ordena a coluna 'semana'
dados_futuro = test_df.orderBy("semana")

In [38]:
# Obtendo os dados das 4 ultimas semanas do ano de 2022
dados_futuro = dados_futuro.filter(col("semana") > 48)

In [39]:
# Realiza a previsão
y_pred_futuro = lr_model.transform(dados_futuro)

In [40]:
# Mapeando a semana
semana_mapeada = {49: 1, 50: 2, 51: 3, 52: 4}
mapping_expressao = F.create_map([F.lit(x) for x in sum(semana_mapeada.items(), ())])


# Aplica o mapeamento
y_pred_futuro = y_pred_futuro.withColumn("semana", mapping_expressao[F.col("semana")])

In [41]:
# Seleciona colunas que foi solicitado
y_pred_final = (
    y_pred_futuro
    .select("semana", "pdv", "produto", F.col("prediction").alias("quantidade"))
    .withColumn("quantidade", F.round("quantidade").cast("int"))
    .orderBy("semana")
)

In [42]:
# Visualizando o dataframe
y_pred_final.show(10)

+------+------------------+-------------------+----------+
|semana|               pdv|            produto|quantidade|
+------+------------------+-------------------+----------+
|     1|  6142382594066562|1938760505411922162|         1|
|     1|  8918984701561076| 637448570435263100|         3|
|     1| 40877388633024666|4797439216678436447|         2|
|     1| 10500207490061039|1155871899594394627|         1|
|     1|  7448253038834221|9158358945329890575|         7|
|     1|  8918984701561076| 802532509492578586|         1|
|     1|119393926838519218|8174625658473329985|         1|
|     1|  5075820835119585|3840201750077926393|         2|
|     1|  8918984701561076|2258747115919456585|         7|
|     1|  8918984701561076|3262679882836704514|         4|
+------+------------------+-------------------+----------+
only showing top 10 rows



## Salvando o arquivo csv e Requerements

In [43]:
# Salvando em CSV
y_pred_final.coalesce(1) \
    .write.mode("overwrite") \
    .option("header", True) \
    .csv("/content/previsao_4_semanas.csv")

### Requirements

In [44]:
# Criando o nome do arquivo
arquivo = 'requirements.txt'

In [45]:
# Criando o arquivo e salvando
with open(arquivo, 'w') as f:
    subprocess.check_call(['pip', 'freeze'], stdout=f)

# Imprimindo a mensagem
print(f"Arquivo '{arquivo}' criado com sucesso!")

Arquivo 'requirements.txt' criado com sucesso!
