In [1]:
import findspark
findspark.init()
import pyspark

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import dayofweek, hour, col, sum as spark_sum
from pyspark.sql.window import Window
from pyspark.sql.functions import percent_rank
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark import SparkContext
from utils import preprocessing_data, create_features

In [3]:
# Adiciona o pacote spark-excel ao iniciar a sessão Spark
spark = SparkSession.builder \
    .appName("SalesPrediction") \
    .config("spark.jars.packages", "com.crealytics:spark-excel_2.12:0.13.5") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .config("spark.driver.maxResultSize", "2g") \
    .getOrCreate()

24/07/29 21:15:53 WARN Utils: Your hostname, MacBook-Pro-de-Julia.local resolves to a loopback address: 127.0.0.1; using 192.168.1.8 instead (on interface en0)
24/07/29 21:15:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/kikuye/.ivy2/cache
The jars for the packages stored in: /Users/kikuye/.ivy2/jars
com.crealytics#spark-excel_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-10524317-c999-4e50-be1f-83ae55dc8734;1.0
	confs: [default]


:: loading settings :: url = jar:file:/opt/spark-3.5.1/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found com.crealytics#spark-excel_2.12;0.13.5 in central
	found org.apache.poi#poi;4.1.2 in central
	found commons-codec#commons-codec;1.13 in central
	found org.apache.commons#commons-collections4;4.4 in central
	found org.apache.commons#commons-math3;3.6.1 in central
	found com.zaxxer#SparseBitSet;1.2 in central
	found org.apache.poi#poi-ooxml;4.1.2 in central
	found org.apache.poi#poi-ooxml-schemas;4.1.2 in central
	found org.apache.xmlbeans#xmlbeans;3.1.0 in central
	found com.github.virtuald#curvesapi;1.06 in central
	found com.norbitltd#spoiwo_2.12;1.7.0 in central
	found org.scala-lang.modules#scala-xml_2.12;1.2.0 in central
	found com.github.pjfanning#excel-streaming-reader;2.3.4 in central
	found com.github.pjfanning#poi-shared-strings;1.0.4 in central
	found com.h2database#h2;1.4.200 in central
	found org.apache.commons#commons-text;1.8 in central
	found org.apache.commons#commons-lang3;3.9 in central
	found xml-apis#xml-apis;1.4.01 in central
	found org.slf4j#slf4j-api;1.7.3

In [4]:
# Carrega o arquivo Excel diretamente para um DataFrame do Spark
file_path = '../data/Coffee Shop Sales.xlsx'
spark_df = spark.read \
    .format("com.crealytics.spark.excel") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(file_path)

                                                                                

### Etapa 1: Preprocessamento dos Dados

In [5]:
# Preprocessamento dos dados
# correção de tipos e criação de novas colunas
spark_df = preprocessing_data(spark_df)

In [6]:
spark_df.limit(5).toPandas()

24/07/29 21:16:10 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
24/07/29 21:16:15 WARN TaskSetManager: Stage 1 contains a task of very large size (2706 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Unnamed: 0,transaction_id,transaction_date,transaction_time,transaction_qty,store_id,store_location,product_id,unit_price,product_category,product_type,product_detail,day_of_week,hour_of_day,hour_day_interaction
0,1.0,2023-01-01,07:06:11,2.0,5.0,Lower Manhattan,32.0,3.0,Coffee,Gourmet brewed coffee,Ethiopia Rg,1,7,7
1,2.0,2023-01-01,07:08:56,2.0,5.0,Lower Manhattan,57.0,3.1,Tea,Brewed Chai tea,Spicy Eye Opener Chai Lg,1,7,7
2,3.0,2023-01-01,07:14:04,2.0,5.0,Lower Manhattan,59.0,4.5,Drinking Chocolate,Hot chocolate,Dark chocolate Lg,1,7,7
3,4.0,2023-01-01,07:20:24,1.0,5.0,Lower Manhattan,22.0,2.0,Coffee,Drip coffee,Our Old Time Diner Blend Sm,1,7,7
4,5.0,2023-01-01,07:22:41,2.0,5.0,Lower Manhattan,57.0,3.1,Tea,Brewed Chai tea,Spicy Eye Opener Chai Lg,1,7,7


In [8]:
# Como se quer prever o volume de vendas para produtos individuais em diferentes lojas, deve-se agrupar os dados

# Agrupa os dados por product_id, store_id, dia da semana, hora do dia, e outros, somando a quantidade de produtos
grouped_df = spark_df.groupBy(
    'day_of_week', 
    'hour_of_day', 
    'hour_day_interaction',
    'store_id',
    'product_id',
    'product_category',
    'product_type',
    'product_detail',
    ).agg(
        spark_sum('transaction_qty').alias('total_qty')
    )

### Etapa 2: Criação de Features

In [9]:
categorical_cols = ['product_id', 'store_id', 'day_of_week', 'product_category', 'product_type', 'product_detail']
num_cols = ['hour_of_day', 'hour_day_interaction']
feature_df = create_features(
    grouped_df, 
    categorical_cols=categorical_cols, 
    num_cols=num_cols
)

24/07/29 21:16:31 WARN TaskSetManager: Stage 2 contains a task of very large size (1851 KiB). The maximum recommended task size is 1000 KiB.
24/07/29 21:16:41 WARN TaskSetManager: Stage 8 contains a task of very large size (1851 KiB). The maximum recommended task size is 1000 KiB.
24/07/29 21:16:56 WARN TaskSetManager: Stage 14 contains a task of very large size (1851 KiB). The maximum recommended task size is 1000 KiB.
24/07/29 21:17:04 WARN TaskSetManager: Stage 20 contains a task of very large size (1851 KiB). The maximum recommended task size is 1000 KiB.
24/07/29 21:17:10 WARN TaskSetManager: Stage 26 contains a task of very large size (1851 KiB). The maximum recommended task size is 1000 KiB.
24/07/29 21:17:19 WARN TaskSetManager: Stage 32 contains a task of very large size (1851 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [10]:
feature_df.limit(5).toPandas()

24/07/29 21:17:27 WARN TaskSetManager: Stage 38 contains a task of very large size (2106 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Unnamed: 0,day_of_week,hour_of_day,hour_day_interaction,store_id,product_id,product_category,product_type,product_detail,total_qty,features
0,1,13,13,5.0,24.0,Coffee,Drip coffee,Our Old Time Diner Blend Lg,7.0,"(0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
1,1,15,15,3.0,26.0,Coffee,Organic brewed coffee,Brazilian Rg,16.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
2,2,11,22,3.0,79.0,Bakery,Scone,Jumbo Savory Scone,7.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
3,2,12,24,3.0,50.0,Tea,Brewed Black tea,Earl Grey Rg,14.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."
4,2,15,30,5.0,69.0,Bakery,Biscotti,Hazelnut Biscotti,6.0,"(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ..."


### Etapa 3: Repartição dos dados em Treino e Teste

In [11]:
# Reparticiona o DataFrame para reduzir o tamanho das tasks
feature_df = feature_df.repartition(200)

# Renomea a coluna transaction_qty para label
feature_df = feature_df.withColumnRenamed('total_qty', 'label')

In [12]:
# Divide os dados em conjunto de treinamento e teste
train_data, test_data = feature_df.randomSplit([0.8, 0.2], seed=123)

# Para o conjunto de treinamento só pega a coluna `features`
train_data = train_data.select('features', 'label')

### Etapa 4: Avaliação de Diferentes Modelos

In [13]:
# Define os modelos
regressors = {
    "Linear Regression": LinearRegression(featuresCol='features', labelCol='label'),
    "Decision Tree Regressor": DecisionTreeRegressor(featuresCol='features', labelCol='label', seed=123),
    "Gradient Boosting Regressor": GBTRegressor(featuresCol='features', labelCol='label', seed=123),
    "Random Forest Regressor": RandomForestRegressor(featuresCol='features', labelCol='label', seed=123),
}

In [14]:
# Função para treinar e avaliar os modelos
def train_and_evaluate(regressor, train_data, test_data):
    model = regressor.fit(train_data)
    predictions = model.transform(test_data)
    evaluator_rmse = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='rmse')
    evaluator_r2 = RegressionEvaluator(labelCol='label', predictionCol='prediction', metricName='r2')
    rmse = evaluator_rmse.evaluate(predictions)
    r2 = evaluator_r2.evaluate(predictions)
    return rmse, r2, model

In [15]:
# Treina e avalia cada modelo
results = {}
for name, regressor in regressors.items():
    rmse, r2, model = train_and_evaluate(regressor, train_data, test_data)
    results[name] = {"RMSE": rmse, "R2": r2, "model": model}

24/07/29 21:18:01 WARN TaskSetManager: Stage 41 contains a task of very large size (2106 KiB). The maximum recommended task size is 1000 KiB.
24/07/29 21:18:10 WARN TaskSetManager: Stage 44 contains a task of very large size (2106 KiB). The maximum recommended task size is 1000 KiB.
24/07/29 21:18:12 WARN Instrumentation: [37f3bceb] regParam is zero, which might cause numerical instability and overfitting.
24/07/29 21:18:12 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/07/29 21:18:12 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/07/29 21:18:14 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
24/07/29 21:18:14 WARN Instrumentation: [37f3bceb] Cholesky solver failed due to singular covariance matrix. Retrying with Quasi-Newton solver.
24/07/29 21:18:29 WARN TaskSetManager: Stage 51 contains a task of very large size (2106 KiB). The maximum recommended

In [16]:
# Resultado de cada modelo

best_rmse = float('inf')
best_model = None
best_model_name = None
for name, metrics in results.items():
    print(f"{name} - RMSE: {metrics['RMSE']}, R2: {metrics['R2']}")
    if metrics['RMSE'] < best_rmse:
        best_rmse = rmse
        best_model = model
        best_model_name = name
print("="*20)
print(f"Best model: {best_model_name}")

Linear Regression - RMSE: 6.357939970842916, R2: 0.45728439445885105
Decision Tree Regressor - RMSE: 5.879814511897173, R2: 0.5358410650444688
Gradient Boosting Regressor - RMSE: 4.724532442376335, R2: 0.7003204316626612
Random Forest Regressor - RMSE: 5.9876950554543, R2: 0.51865239876116
Best model: Gradient Boosting Regressor


#### Observação:
- As métricas utilizadas para avaliar os modelos foram:
  - RMSE (raiz quadrada do erro médio): que mede o quão bem o modelo está prevendo os valores reais;
  - R2 : mede o quão bem as variáveis independentes explicam a variabilidade da variável dependente, onde valores próximo de 1 indicam um bom ajuste do modelo
- Com base nessas 2 métricas, o modelo **Gradient Boosting Regressor** foi o que apresentou melhor desempenho.

### Etapa 5: Salva o Modelo Treinado

In [18]:
# Salva o modelo treinado
best_model.write().overwrite().save("../model")

                                                                                

### Overview:

- A partir das colunas `transaction_time` e `transaction_date` foram criadas as colunas `day_of_week`, `hour_of_day` e 
`hour_day_interaction`;
- Os dados foram agrupados com base nas colunas `day_of_week`, `hour_of_day`, `hour_day_interaction`, `store_id`, `product_id`, `product_category`, `product_type`, `product_detail`, e o `transaction_qty` do grupo foi somado.
- `transaction_qty` é o target que o modelo visa prever;
- Desta forma, informando `transaction_time`, `transaction_date`, `store_id`, `product_id`, `product_category`, `product_type`, `product_detail`, o modelo é capaz de prever a quantidade de vendas daquele produto para aquela loja naquele tempo;
- O modelo treinado foi persistido podendo ser usado mais tarde sem precisar re-treiná-lo.

### Como o modelo pode otimizar os níveis de estoque e minimizar desperdícios
- Antecipação de demandas: utilizando o modelo pode-se prever a quantidade de produtos que será vendida em diferentes horários e dias da semana para cada loja, com isso, pode-se garantir que haja estoque suficiente;
- Redução de desperdício: sabendo a quantidade de demanda, consegue-se reduzir o desperdício de produtos que não são vendidos e acabam sendo descartados.


### Trabalho Adicional / Próximos Passos
- Me pararece que há espaço ainda de evolução para o modelo: novas features poderiam ser criadas, ajustes de hiperparâmetros (grid search);
- Gostaria de entender (simular) se uma separação de dados de treinamento e teste que leve em consideração a ordem temporal dos dados seria mais correta;


### Etapas para aplicação de técnicas de MLOps
- Aqui estou entendendo as etapas necessárias para colocar o modelo em produção e disponibilizá-lo;
- Como é um modelo supervisionado, ele precisa ser re-treinado com alguma frequencia (semanal, mensal, trimestral, etc), a frequencia ideal vale um estudo a parte;
- Para (re-)treinar o modelo, ele precisa consumir esses dados de algum lugar, seja de um banco, do hdfs, etc, e esses dados transacionais devem ser enviados pelas lojas;
- Como o modelo foi feito utilizando PySpark, creio que não vai precisar fazer alteração para colocar em produção; 
- Uma vez re-treinado o modelo, pode-se:
  - ou salvar o modelo (etapa offline), e criar uma API (online) que quando fazem uma requisição informando `transaction_time`, `transaction_date`, `store_id`, `product_id`, `product_category`, `product_type`, `product_detail`, retorna a quandidade prevista daquele produto para aquela loja naquele tempo;
  - ou já executar o modelo e salvar os resultados das predições do proximo periodo em um banco (etapa offline), e a API (online) só consultaria o resultado já computado.