<a href="https://colab.research.google.com/github/MarcosVeniciu/Producao-de-cafe-MG/blob/main/LSTM_Coffee_Production_Prediction_with_Machine_Learning_in_Minas_Gerais.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# requeriment
%%capture
%pip install neuralforecast statsforecast
%pip install optuna

# Funções de suporte

In [None]:
from sklearn.metrics import r2_score, root_mean_squared_error
from sklearn.preprocessing import OneHotEncoder
from torch.optim.lr_scheduler import CyclicLR
from neuralforecast import NeuralForecast
from IPython.display import clear_output
from neuralforecast.models import LSTM
from datetime import datetime
from pathlib import Path
import pandas as pd
import numpy as np
import zipfile
import optuna
import math
import os

In [None]:
def train_model(
    train_df,
    test_df,
    exog_list,
    h: int,
    input_size: int,
    output_dir: str = './model',
    steps: int = 500,
    default_params: dict = None,
    n_trials: int = 20,
    val_df=None
):
    """
    Treina um modelo LSTM usando os parâmetros em default_params.
    - Se default_params for um dict, executa o treinamento com esses parâmetros.
    - Se default_params for None, exibe um aviso e não prossegue com o treinamento.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Verifica se os parâmetros foram fornecidos
    if default_params is None:
        print("Você deve informar os parametros a serem usados no treinamento!")
        return None

    # Branch sem Optuna: usa os parâmetros fornecidos
    model = LSTM(
        h=h,
        input_size=input_size,
        **default_params
    )
    nf = NeuralForecast(models=[model], freq="YE")
    nf.fit(df=train_df)
    nf.save(path=output_dir, overwrite=True, save_dataset=False)

    # Empacota os artefatos em um ZIP
    zip_path = Path(output_dir) / "model_bundle.zip"
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for root, _, files in os.walk(output_dir):
            for f in files:
                if f != zip_path.name:
                    zf.write(os.path.join(root, f), arcname=f)
    print(f"\nModel artifacts zipped to {zip_path}")

In [None]:
def rolling_evaluation(model, full_df: pd.DataFrame, test_df: pd.DataFrame, context_size: int, horizon: int, inteiro=True) -> pd.DataFrame:
  # --- Preparação inicial ---
  # Garantir que as datas estejam em datetime e ordenar os dataframes
  full_df = full_df.copy()
  test_df = test_df.copy()
  full_df['ds'] = pd.to_datetime(full_df['ds'])
  test_df['ds'] = pd.to_datetime(test_df['ds'])

  # Todos os valores exclusivos de datas no período de teste, ordenados
  test_dates = sorted(test_df['ds'].unique())
  # Determina quantas janelas de previsão iremos executar
  n_dates = len(test_dates)
  n_loops = math.ceil(n_dates / horizon)

  all_forecasts = []

  # --- Loop de previsão por janelas de tamanho `horizon` ---
  for j in range(n_loops):
      # Identifica o bloco de datas que compõem o horizonte atual
      date_chunk = test_dates[j * horizon : (j + 1) * horizon]
      start_date = date_chunk[0]  # Data de início dessa janela de previsão

      print(f"\nExecutando window {j+1}/{n_loops}: datas {date_chunk[0].date()} a {date_chunk[-1].date()}")
      val_df = test_df[test_df['ds'].isin(date_chunk)] # possui os dados de teste para a janela atual
      history_df = full_df[full_df['ds'] < start_date] # possui os dados historicos para a janela atual
      futr_df = val_df.drop(columns=["y"]).copy()

      # Realiza a predição para a janela atual
      forecasts_df = model.predict(
        df=history_df,         # Dados históricos (para contexto)
        futr_df=futr_df      # Valores futuros das variáveis exógenas
      )
      # Combinar previsões com valores reais do teste
      evaluation_df = forecasts_df.merge(
          val_df[["unique_id", "ds", "y"]],
          on=["unique_id", "ds"]
      )
      evaluation_df['y'] = np.expm1(evaluation_df['y'])
      evaluation_df['LSTM'] = np.expm1(evaluation_df['LSTM'])

      if inteiro:
        evaluation_df['LSTM'] = evaluation_df['LSTM'].round().astype(int)
        evaluation_df['y'] = evaluation_df['y'].round().astype(int)

      all_forecasts.append(evaluation_df)

  # --- Concatena todas as previsões obtidas ---
  forecasts_df = pd.concat(all_forecasts, ignore_index=True)
  forecasts_df = calcular_diferenca_percentual(forecasts_df)

  return forecasts_df

In [None]:
def generate_steps_options():
  # Intervalos com granularidade fina no início
  step_1 = list(range(500, 1001, 100))
  step_2 = list(range(1100, 2001, 200))
  step_3 = list(range(2200, 5001, 400))

  steps_options = step_1 + step_2 + step_3
  return sorted(steps_options)

In [None]:
## WMAPE (Weighted MAPE)
def wmape(actual, predicted):
  return np.sum(np.abs(predicted - actual)) / np.sum(np.abs(actual))

In [None]:
def calcular_diferenca_percentual(df):
    """
    Calcula a diferença percentual entre valores preditos (LSTM) e reais (y)
    e retorna um novo DataFrame com a coluna adicional 'diferença_%'

    Fórmula: ((LSTM - y) / y) * 100

    Interpretação:
    - Valor negativo: previsão subestimada (LSTM < y)
    - Valor positivo: previsão superestimada (LSTM > y)

    Parâmetros:
    df (DataFrame): DataFrame com colunas 'y' e 'LSTM'

    Retorna:
    DataFrame: Novo DataFrame com a coluna 'diferença_%' adicionada
    """
    # Cria uma cópia do DataFrame para não modificar o original
    df_novo = df.copy()

    # Calcula a diferença percentual com sinal correto
    df_novo['diferença_%'] = round(((df_novo['LSTM'] - df_novo['y']) / df_novo['y']) * 100, 2)

    return df_novo

# Preparar dataset

In [None]:
dataset = pd.read_csv('/content/dataset_v6.csv')
print(f"Total de dados: {len(dataset)}")

In [None]:
num_features = [
    'target',
    'Municipio',
    'Ano',
    'latitude',
    'longitude',
    'altitude',
    'precipitacao (mm)',
    'temperatura minima (ºC)',
    'temperatura maxima (ºC)',
]
cat_features = [
  'Mesorregião'
]
dataset = dataset[num_features + cat_features].copy()

- Log-transform em Área colhida, Quantidade (em mil toneladas) e Valor da produção.

In [None]:
log_cols = [
  'Área colhida (Hectares)',
  'target',
  'Valor da produção (Mil Reais)'
]

# Verifica quais colunas de log_cols estão presentes no dataset
existing_log_cols = [col for col in log_cols if col in dataset.columns]

# Mostra as colunas que serão transformadas
print("Colunas encontradas para aplicar log1p:", existing_log_cols)

# Aplica log1p apenas nas colunas presentes
dataset[existing_log_cols] = dataset[existing_log_cols].apply(lambda x: np.log1p(x))

In [None]:
# Renomeando as colunas para o formato esperado pelo NeuralForecast
dataset = dataset.rename(columns={
  "Municipio": "unique_id",
  "Ano": "ds",
  "target": "y"
})
dataset = dataset.sort_values(by=["unique_id", "ds"]).reset_index(drop=True)
# Converte o ano inteiro para uma string no formato 'YYYY-12-31' (último dia do ano)
dataset['ds'] = pd.to_datetime(dataset['ds'].astype(str) + '-12-31')

In [None]:
# Inicializa o OneHotEncoder com as configurações desejadas
encoder = OneHotEncoder(sparse_output=False, drop='first')

# Aplica o encoder na coluna "Mesorregião" e converte para DataFrame
encoded_data = encoder.fit_transform(dataset[["Mesorregião"]])
encoded_df = pd.DataFrame(
    encoded_data,
    columns=encoder.get_feature_names_out(["Mesorregião"])
)

# Concatena o resultado com o dataset original e remove a coluna original
dataset = pd.concat([dataset, encoded_df], axis=1).drop(columns=["Mesorregião"])

In [None]:
exog_list = [col for col in dataset.columns.tolist() if col not in ["ds", "y", "unique_id"]]
print(f"Variaveis Exogenas:")
for col in exog_list:
  print(f" - {col}")

In [None]:
ano = 2023 # ano de inicio dos testes.

train_ds = dataset[dataset['ds'].dt.year < ano]
test_ds = dataset[dataset['ds'].dt.year >= ano]

print(f"Total de dados de treino: {len(train_ds)}")
print(f"Total de dados de teste: {len(test_ds)}")

# Treinamento

## Normal

In [None]:
h = 1
input_size = 4
max_steps = 800
default_params = {
  'batch_size': 32,
  'scaler_type': 'revin',
  'encoder_dropout': 0.2,
  'encoder_n_layers': 8,
  'encoder_hidden_size': 154,
  'decoder_layers': 1,
  'decoder_hidden_size': 117,
  'futr_exog_list': exog_list,
  'learning_rate': 0.002388703885848156,
  'max_steps': max_steps,
  'loss': HuberLoss(delta=1.0),
  'lr_scheduler': CyclicLR,
  'lr_scheduler_kwargs': {
      'base_lr': 1e-4,
      'max_lr': 1e-2,
      'step_size_up': int(max_steps * 0.45),
      'mode': 'triangular',
      'cycle_momentum': False
  }
}

model_output = f"model_{max_steps}"
train_model(
    train_df=train_ds,
    test_df=test_ds,
    exog_list=exog_list,
    h=h,
    input_size=input_size,
    default_params=default_params,
    output_dir=f'./{model_output}'
)

## Finetuning

In [None]:
h= 1
input_size = 4
steps_options = generate_steps_options()

In [None]:
local = "previsoes_2023"

best_score_global = float("inf")

def objective(trial):
    global best_score_global

    # hiperparâmetros
    batch_size = trial.suggest_categorical("batch_size", [16, 32, 64])
    encoder_n_layers = trial.suggest_int("encoder_n_layers", 3, 8)
    encoder_hidden_size = trial.suggest_int("encoder_hidden_size", 64, 256)
    decoder_layers = trial.suggest_int("decoder_layers", 1, 4)
    decoder_hidden_size = trial.suggest_int("decoder_hidden_size", 64, 256)
    learning_rate = trial.suggest_float("learning_rate", 1e-4, 1e-2, log=True)
    steps = trial.suggest_categorical("steps", steps_options)

    # Instanciar o modelo LSTM com os parâmetros sugeridos
    model = LSTM(
        h=h,
        input_size=input_size,
        batch_size=batch_size,
        scaler_type='revin',
        encoder_dropout=0.3,
        encoder_n_layers=encoder_n_layers,
        encoder_hidden_size=encoder_hidden_size,
        decoder_layers=decoder_layers,
        decoder_hidden_size=decoder_hidden_size,
        futr_exog_list=exog_list,
        learning_rate=learning_rate,
        max_steps=steps,
        lr_scheduler=CyclicLR,
        lr_scheduler_kwargs={
            "base_lr": 1e-4,
            "max_lr": 1e-2,
            "step_size_up": int(steps * 0.45),
            "mode": "triangular",
            "cycle_momentum": False
        }
    )

    # Treinar o modelo usando NeuralForecast
    nf = NeuralForecast(models=[model], freq="YE")  # "MS" = início de mês, YE = Anual
    nf.fit(df=train_ds)

    combined_df = rolling_evaluation(
      model=nf,
      full_df=dataset,
      test_df=test_ds,
      context_size=input_size,
      horizon=h,
      inteiro=False
    )
    clear_output(wait=False) # Limpa os textos da saida
    # Extrair valores reais e previstos
    actual = combined_df['y']
    predicted = combined_df['LSTM']
    # Calcular o RMSE (métrica a ser minimizada)
    score = root_mean_squared_error(actual, predicted)

    # Se o score atual for melhor que o melhor registrado, salva o modelo
    if score < best_score_global:
        best_score_global = score
        combined_df.to_excel(f"./Modelos/{local}/valores_predicao.xlsx")
        nf.save(path=f"./Modelos/{local}/",
            model_index=None,
            overwrite=True,
            save_dataset=False
        )
        print(f"\nNovo melhor score: {score:.4f} - Modelo salvo!\n\n")

    return score

In [None]:
%%time
# Criação do estudo e otimização (n_trials pode ser ajustado conforme sua necessidade)
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=100)

# Exibir os melhores resultados
print("Melhor trial:")
trial = study.best_trial
print(f"  WMAPE: {trial.value}")
print("  Parâmetros:")
for key, value in trial.params.items():
  print(f"    {key}: {value}")

# Predições

O dataframe com as predições deve ter:
* treino_id: um id que permite diferenciar o treino.
* Unique_Id: O identificador de cada serie.
* ds: a data de cada registro.
* y: o valor real do registro para a data e a serie.
* y_pred: o valor predito pelo modelo, caso não tenha deve preencher com NaN.
* flag: indica se o registro foi usado no treino ou teste.
* dataset: o nome do dataset usado para treinar e avaliar o modelo.
* modelo: nome do algoritmo do modelo (LSTM, Randon Florest).
* comentario: alguma anotação que pode ser util (pode ser vazio)
* data do teino: data que o modelo foi treinado.


In [None]:
# carregue o modelo
path = "/content/modelo"
print(f"Diretório do modelo: {path}")

model = NeuralForecast.load(path=path)
input_size = 4
h = 1
predictions = rolling_evaluation(
    model=model,
    full_df=dataset,
    test_df=test_ds,
    context_size=input_size,
    horizon=h
)

In [None]:
actual = predictions['y']
predicted = predictions['LSTM']
# Calcular o WMAPE (métrica a ser minimizada)
score = wmape(actual, predicted)
score = score * 100

# Calcular R² usando sklearn
r2 = r2_score(actual, predicted)

# Calcular RMSE usando sklearn
rmse = np.sqrt(mean_squared_error(actual, predicted))

print(f"WMAPE: {score:.2f}%")
print(f"R²: {r2:.4f}")
print(f"RMSE: {rmse:.4f} Toneladas")

Salva os resultados do treino.

In [None]:
colunas = [
  'treino_id',    # Identificador do treino
  'unique_id',    # Identificador de cada série
  'ds',           # Data de cada registro
  'y',            # Valor real
  'y_pred',       # Valor predito pelo modelo
  'diferença_%',  # Valor da diferença percentual entre o valor predito e o real
  'flag',         # Indica se foi usado em treino ou teste
  'dataset',      # Nome do dataset usado
  'modelo',       # Nome do algoritmo (LSTM, Random Forest, etc)
  'comentario',   # Anotações adicionais (pode ser vazio)
  'data_treino'   # Data que o modelo foi treinado
]

# Verificar existência do arquivo
arquivo_predicoes = 'predicoes_modelos.csv'

if not os.path.exists(arquivo_predicoes):
  # Criar DataFrame vazio com as colunas especificadas
  df = pd.DataFrame(columns=colunas)

  # Salvar o DataFrame vazio como CSV
  df.to_csv(arquivo_predicoes, index=False)
  print(f"Arquivo {arquivo_predicoes} criado com DataFrame vazio.")
else:
  df = pd.read_csv(arquivo_predicoes)
  print(f"Arquivo {arquivo_predicoes} já existe. Nenhuma ação necessária.")

Prepara os dados do teste com os valore da predição.

In [None]:
dados_teste = predictions[['unique_id', 'ds', 'y', 'LSTM', 'diferença_%']].copy()
dados_teste.columns = ['unique_id', 'ds', 'y', 'y_pred', 'diferença_%']
dados_teste['flag'] = 'teste'

Prepara os dados do treino, servirão como dados historicos para fazer os graficos posteriormente.

In [None]:
dados_treino = dataset[['unique_id', 'ds', 'y']].copy()
dados_treino['y'] = round(np.expm1(dados_treino['y']),2)

inicio_teste = dados_teste['ds'].min() # identifica o periodo de inicio dos teste
print(f"Periodo de inicio dos testes: {inicio_teste}\n")
dados_treino = dados_treino[dados_treino['ds'] < inicio_teste] # pega apenas as datas anteriores ao periodo de teste
dados_treino['y_pred'] = "-"  # "- " para previsões no treino, nesse ponto elas não exitem.
dados_treino['diferença_%'] = "-"  # "-" para a diferença percentual entre predito e real, nesse ponto elas não exitem.
dados_treino['flag'] = 'treino'

Combina os dois dataframes em um unico.

In [None]:
dados_completos = pd.concat([dados_treino, dados_teste], ignore_index=True)

Adiciona as informações sobre o treino atual.

In [None]:
dados_completos['treino_id'] = "terceira execução"

In [None]:
dados_completos['comentario'] = "primeira execução"

In [None]:
dados_completos['dataset'] = "meu_dataset"
dados_completos['modelo'] = "LSTM"
dados_completos['data_treino'] = datetime.now().strftime("%Y-%m-%d")

Junta os dados do treino atual com os anteriores.

In [None]:
nova_ordem_colunas = ['treino_id', 'unique_id', 'ds', 'y', 'y_pred', 'diferença_%', 'flag', 'dataset', 'modelo', 'comentario', 'data_treino']

# Reordenar as colunas
dados_completos = dados_completos[nova_ordem_colunas]

In [None]:
df_final = pd.concat([df, dados_completos], ignore_index=True)
df_final.to_csv(arquivo_predicoes, index=False)

In [None]:
print(f"Treinamentos realizados:")
for treino in df_final['treino_id'].unique():
  print(f" - {treino} ({df_final[df_final['treino_id']==treino]['data_treino'].unique()[0]})")