# State tax (ICMS) collection forecast using the LSTM (Long Short-Term Memory) recurrent neural network

### By Carlos Henrique C. Jr.
### Rio de Janeiro, Brazil

Importação das bibliotecas Pandas e Numpy 

In [None]:
import pandas as pd
import numpy as np

Carregamento da planilha 

In [None]:
df = pd.read_excel('ICMS_1999-2023.xlsx')
df

Média Móvel Simples dos últimos 12, 6, 3 e 2 períodos 

In [None]:
df['SMA(12)'] = df.ICMS.rolling(12).mean()
df['SMA(6)'] = df.ICMS.rolling(6).mean()
df['SMA(3)'] = df.ICMS.rolling(3).mean()
df['SMA(2)'] = df.ICMS.rolling(2).mean()
df.head(14)

In [None]:
df['lag(12)'] = df.ICMS.shift(12)
df['lag(6)'] = df.ICMS.shift(6)
df['lag(4)'] = df.ICMS.shift(4)
df['lag(3)'] = df.ICMS.shift(3)
df['lag(2)'] = df.ICMS.shift(2)
df['lag(1)'] = df.ICMS.shift(1)
df = df.dropna()

In [None]:
df

Importação da biblioteca Datetime, que fornece as classes para manipulação de datas e horas (para retornar o ordinal gregoriano proléptico de uma data).

In [None]:
import datetime as dt

In [None]:
df.head()

In [None]:
df['DATA'] = pd.to_datetime(df['DATA'])
df['DATA'] = df['DATA'].map(dt.datetime.toordinal)

In [None]:
df.head()

Divisão da base de dados em uma base de treino, com os dados 75% iniciais, e uma base de testes com os 25% restantes, mais recentes:

In [None]:
train_size = int(len(df) * 0.75)

In [None]:
train_dataset, test_dataset = df.iloc[:train_size], df.iloc[train_size:]

In [None]:
test_dataset['ICMS'].min()

Importação da Matplotlib, biblioteca Python de plotagem 2d, que auxilia a biblioteca matemática NumPy

In [None]:
import matplotlib.pyplot as plt

In [None]:
plt.figure(figsize = (10, 6))
plt.plot(train_dataset.ICMS)
plt.plot(test_dataset.ICMS)
plt.xlabel('Mês')
plt.ylabel('ICMS')
plt.legend(['Treino', 'Teste'], loc = 'upper left')
print('Dimensão do dado de treino: ', train_dataset.shape)
print('Dimensão do dado de teste: ', test_dataset.shape)

In [None]:
train_dataset

Por volta do mês 255, que equivale a abril, maio, junho e julho de 2020, percebe-se uma queda expressiva de arrecadação de ICMS, depois de um pico sem precedentes em novembro de 2018.

Separação da variável dependente das independentes, tanto na base de treino, quanto na de testes:

In [None]:
X_train = train_dataset.drop('ICMS', axis = 1)
y_train = train_dataset.loc[:, ['ICMS']]
X_test = test_dataset.drop('ICMS', axis = 1)
y_test = test_dataset.loc[:, ['ICMS']]

In [None]:
print('X_train.shape: ', X_train.shape)
print('y_train.shape: ', y_train.shape)
print('X_test.shape: ', X_test.shape)
print('y_test.shape: ', y_test.shape)

Importação da ferramenta de pré-processamento de dados MinMaxScaler da biblioteca scikit-learn. O MinMaxScaler dimensiona e traduz cada característica individualmente de modo que esteja na faixa dada no conjunto de treinamento, entre zero e um. Essa transformação é útil porque muitos algoritmos de aprendizado de máquina funcionam melhor quando os dados estão em uma escala comum.

In [None]:
from sklearn.preprocessing import MinMaxScaler

In [None]:
scaler_x = MinMaxScaler(feature_range = (0, 1))
scaler_y = MinMaxScaler(feature_range = (0, 1))

In [None]:
input_scaler = scaler_x.fit(X_train)
output_scaler = scaler_y.fit(y_train)

In [None]:
train_y_norm = output_scaler.transform(y_train)
train_x_norm = input_scaler.transform(X_train)

In [None]:
test_y_norm = output_scaler.transform(y_test)
test_x_norm = input_scaler.transform(X_test)

In [None]:
print('train_y_norm.shape: ', train_y_norm.shape)
print('train_x_norm.shape: ', train_x_norm.shape)
print('test_y_norm.shape: ', test_y_norm.shape)
print('test_x_norm.shape: ', test_x_norm.shape)

Redimensionamento da matriz para três dimensões. Sempre deve ser fornecida uma matriz tridimensional como uma entrada para uma rede LSTM. A primeira dimensão representa o tamanho do lote, a segunda dimensão representa o número de etapas de tempo com que se está alimentando uma sequência e a terceira dimensão representa o número de unidades em uma sequência de entrada.

In [None]:
X_test = test_x_norm.reshape(70, 1, 11)
X_train = train_x_norm.reshape(209, 1, 11)
y_test = test_y_norm.reshape(70, 1)
y_train = train_y_norm.reshape(209, 1)

Importação da TensorFlow, plataforma completa de código aberto para machine learning, além do numpy.mean, para computar média aritmética, numpy.std, para computar o desvio padrão.

In [None]:
import tensorflow as tf
from numpy import mean
from numpy import std
from matplotlib import pyplot

Ajuste do modelo com 32, 64 e 128 neurônios para verificação do mais adequado para a aplicação. É buscado o parâmetro que retorne o menor erro, com a menor variação.

In [None]:
# Ajustando e validando um modelo
def evaluate_model(X_train, y_train, X_test, y_test, neurons):
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(units = neurons, return_sequences = True, 
                                  input_shape = [X_train.shape[1], X_train.shape[2]]))
    model.add(tf.keras.layers.Dropout(0.1))
    model.add(tf.keras.layers.LSTM(units = neurons))
    model.add(tf.keras.layers.Dropout(0.1))
    model.add(tf.keras.layers.Dense(units = 1))
    
    # Compile model
    model.compile(loss = 'mse', optimizer = 'adam')
    
    # Ajustando a rede
    model.fit(X_train, y_train, epochs = 100, validation_split = 0.2, 
             batch_size = 4, shuffle = False)
    
    # Validando o modelo
    loss = model.evaluate(X_test, y_test, batch_size = 4, verbose = 0)
    return loss

# Resumindo as pontuações
def summarize_results(scores, params):
    print(scores, params)
    # Resumindo média e desvio padrão
    for i in range(len(scores)):
        m = np.mean(scores[i])
        s = np.std(scores[i])
        print('Param=%d: %.3f%% (+/-%.3f)' % (params[i], m , s))
        
    # Boxplot das pontuações 
    pyplot.boxplot(scores, labels = params)
    pyplot.savefig('figura[0].png')

# Rodando um experimento 
def run_experiment(params, repeats = 10):
    # Testando cada parâmetro
    all_scores = list()
    for p in params:
        # Repetindo experimento
        scores = list()
        for r in range(repeats):
            score = evaluate_model(X_train, y_train, X_test, y_test, p)
            print('>p=%d #%d: %.3f' % (p, r+1, score))
            scores.append(score)
        all_scores.append(scores)
        
    # Resumindo resultados
    summarize_results(all_scores, params)
    
# Rodando o experimento
n_params = [32, 64, 128]
run_experiment(n_params)     


In [None]:
# Ajustando e validando um modelo
def evaluate_model(X_train, y_train, X_test, y_test, neurons):
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(units = neurons, return_sequences = True, 
                                  input_shape = [X_train.shape[1], X_train.shape[2]]))
    model.add(tf.keras.layers.Dropout(0.1))
    model.add(tf.keras.layers.LSTM(units = neurons))
    model.add(tf.keras.layers.Dropout(0.1))
    model.add(tf.keras.layers.Dense(units = 1))
    
    # Compile model
    model.compile(loss = 'mse', optimizer = 'adam')
    
    # Ajustando a rede
    model.fit(X_train, y_train, epochs = 50, validation_split = 0.2, batch_size = 4, shuffle = False)
    
    # Validando o modelo
    loss = model.evaluate(X_test, y_test, batch_size = 4, verbose = 0)
    return loss

# Resumindo as pontuações
def summarize_results(scores, params):
    print(scores, params)
    
    # Resumindo média e desvio padrão
    for i in range(len(scores)):
        m, s = mean(scores[i]), std(scores[i])
        print('Param=%d: %.3f%% (+/-%.3f)' % (params[i], m, s))
        
    # Boxplot das pontuações
    pyplot.boxplot(scores, labels = params)
    pyplot.savefig('figura[0].png')

# Rodando um experimento
def run_experiment(params, repeats = 10):
    # Testando cada parâmetro
    all_scores = list()
    for p in params:
        # Repetindo experimento
        scores = list()
        for r in range(repeats):
            score = evaluate_model(X_train, y_train, X_test, y_test, p)
            print('>p=%d #%d: %.3f' % (p, r+1, score))
            scores.append(score)
        all_scores.append(scores)
        
    # Resumindo resultados
    summarize_results(all_scores, params)

# Rodando o experimento
n_params = [10, 15, 20, 25, 30]
run_experiment(n_params)

Ajuste de modelo com Tamanho do Lote 1, 2, 3, 4, 5, 6, 7 e 8 para verificação do mais adequado para a aplicação. Tamanho do lote (Batch Size) é um termo usado em aprendizado de máquina e refere-se ao número de exemplos de treinamento usados em iteração.

In [None]:
# Ajustando e validando um modelo

# Ajustando e validando o modelo
def evaluate_model(X_train, y_train, X_test, y_test, batch_size):
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(units = 32, return_sequences = True, 
                                   input_shape = [X_train.shape[1], X_train.shape[2]]))
    model.add(tf.keras.layers.Dropout(0.1))
    model.add(tf.keras.layers.LSTM(units = 32))
    model.add(tf.keras.layers.Dropout(0.1))
    model.add(tf.keras.layers.Dense(units = 1))
    
    # Compile model
    model.compile(loss = 'mse', optimizer = 'adam')
    # Ajustando a rede
    model.fit(X_train, y_train, epochs = 50, validation_split = 0.2, batch_size = batch_size, shuffle = False)
    # Validando o modelo
    loss = model.evaluate(X_test, y_test, batch_size = batch_size, verbose = 0)
    return loss

#Resumindo as pontuações
def summarize_results(scores, params):
    print(scores, params)
    # Resumindo média e desvio padrão
    for i in range(len(scores)):
        m, s = mean(scores[i]), std(scores[i])
        print('Param=%d: %.3f%% (+/-%.3f)' % (params[i], m, s))
        
    # Boxplot das pontuações
    pyplot.boxplot(scores, labels = params)
    pyplot.savefig('figura[0].png')

# Rodando um experimento
def run_experiment(params, repeats = 10):
    # Testando cada parâmetro
    all_scores = list()
    for p in params:
        # Repetindo experimento
        scores = list()
        for r in range(repeats):
            score = evaluate_model(X_train, y_train, X_test, y_test, p)
            print('>p=%d #%d: %.3f' % (p, r+1, score))
            scores.append(score)
        all_scores.append(scores)
        
    # Resumindo resultados
    summarize_results(all_scores, params)
    
# Rodando o experimento
n_params = [1, 2, 3, 4, 5, 6, 7, 8]
run_experiment(n_params)

Ajuste de modelo com diluição (Dropout) 0.05, 0.1, 0.2, 0.3 para verificação do mais adequado para a aplicação. A diluição é uma técnica de regularização para reduzir o sobreajuste em redes neurais artificiais, impedindo co-adaptações complexas nos dados de treinamento. 

In [None]:
# Ajustando e validando o modelo
def evaluate_model(X_train, y_train, X_test, y_test, dropout):
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(units = 32, return_sequences = True, 
                                   input_shape = [X_train.shape[1], X_train.shape[2]]))
    model.add(tf.keras.layers.Dropout(dropout))
    model.add(tf.keras.layers.LSTM(units = 32))
    model.add(tf.keras.layers.Dropout(dropout))
    model.add(tf.keras.layers.Dense(units = 1))
    
    # Compile model
    model.compile(loss = 'mse', optimizer = 'adam')
    # Ajustando a rede
    model.fit(X_train, y_train, epochs = 50, validation_split = 0.2, batch_size = 1, shuffle = False)
    # Validando o modelo
    loss = model.evaluate(X_test, y_test, batch_size = 1, verbose = 0)
    return loss

#Resumindo as pontuações
def summarize_results(scores, params):
    print(scores, params)
    # Resumindo média e desvio padrão
    for i in range(len(scores)):
        m, s = mean(scores[i]), std(scores[i])
        print('Param=%d: %.3f%% (+/-%.3f)' % (params[i], m, s))
        
    # Boxplot das pontuações
    pyplot.boxplot(scores, labels = params)
    pyplot.savefig('figura[0].png')

# Rodando um experimento
def run_experiment(params, repeats = 10):
    # Testando cada parâmetro
    all_scores = list()
    for p in params:
        # Repetindo experimento
        scores = list()
        for r in range(repeats):
            score = evaluate_model(X_train, y_train, X_test, y_test, p)
            print('>p=%d #%d: %.3f' % (p, r+1, score))
            scores.append(score)
        all_scores.append(scores)
        
    # Resumindo resultados
    summarize_results(all_scores, params)
    
# Rodando o experimento
n_params = [0.05, 0.1, 0.2, 0.3]
run_experiment(n_params)
    

Ajuste do modelo com os parâmetros mais adequados nos testes anteriores:

In [None]:
def evaluate_model(X_train, y_train, X_test, y_test):
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(units = 32, return_sequences = True, 
                                   input_shape = [X_train.shape[1], 
                                                X_train.shape[2]]))
    model.add(tf.keras.layers.Dropout(0.05))
    model.add(tf.keras.layers.LSTM(units = 32))
    model.add(tf.keras.layers.Dropout(0.05))
    model.add(tf.keras.layers.Dense(units = 1))
    
    # Compile model
    model.compile(loss = 'mse', optimizer = 'adam')
    # Ajustando a rede
    model.fit(X_train, y_train, epochs = 150, validation_split = 0.2, 
             batch_size = 1, shuffle = False)
    
    # Validando o modelo
    loss = model.evaluate(X_test, y_test, batch_size = 1, verbose = 0)
    return loss

# Resumindo as pontuações 
def summarize_results(scores):
    print(scores)
    m, s = mean(scores), std(scores)
    print('Loss: %.3f%% (+/-%.3f)' % (m, s))
    
# Rodando um experimento 
def run_experiment(repeats = 10):
    # Repetindo o experimento
    scores = list()
    for r in range(repeats):
        score = evaluate_model(X_train, y_train, X_test, y_test)
        score = score
        print('>#%d: %.3f' % (r + 1, score))
        scores.append(score)
        
    # Resumindo os resultados
    summarize_results(scores)
    
# Rodando o experimento
run_experiment()

In [None]:
def create_model(units, dropout):
    model = tf.keras.models.Sequential()
    model.add(tf.keras.layers.LSTM(units = units, return_sequences = True, input_shape = 
                                   [X_train.shape[1], X_train.shape[2]]))
    model.add(tf.keras.layers.Dropout(dropout))
    model.add(tf.keras.layers.LSTM(units = units))
    model.add(tf.keras.layers.Dropout(dropout))
    model.add(tf.keras.layers.Dense(units = 1))
    
    # Compile model
    model.compile(loss = 'mse', optimizer = 'adam')
    return model

In [None]:
model_lstm = create_model(32, 0.05)

In [None]:
def fit_model(model):
    early_stop = tf.keras.callbacks.EarlyStopping(monitor = 'val_loss', patience = 10)
    history = model.fit(X_train, y_train, epochs = 150, 
                        validation_split = 0.2, batch_size = 1, shuffle = False, callbacks = [early_stop])
    return history

In [None]:
history_lstm = fit_model(model_lstm)
model_lstm.save('TCC-MBA')

Salvamento do modelo para posterior uso:

In [None]:
model_lstm = tf.keras.models.load_model('TCC-MBA')

In [None]:
y_test = scaler_y.inverse_transform(y_test)
y_train = scaler_y.inverse_transform(y_train)

In [None]:
def prediction(model):
    prediction = model.predict(X_test)
    prediction = scaler_y.inverse_transform(prediction)
    return prediction

In [None]:
prediction_lstm = prediction(model_lstm)

In [None]:
def plot_future(prediction, y_test):
    plt.figure(figsize = (10, 6))
    range_future = len(prediction)
    plt.plot(np.arange(range_future), np.array(y_test), label = 'True Future')
    plt.plot(np.arange(range_future), np.array(prediction), label = 'Prediction')
    plt.legend(loc = 'upper left')
    plt.xlabel('Mês')
    plt.ylabel('ICMS')

In [None]:
plot_future(prediction_lstm, y_test)

Cálculo do Erro Médio Absoluto e Raiz Quadrática Média:

In [None]:
def evaluate_prediction(predictions, actual, model_name):
    errors = predictions - actual
    mse = np.square(errors).mean()
    rmse = np.sqrt(mse)
    mae = np.abs(errors).mean()
    print('Mean Absolute Error: {:.4f}'.format(mae))
    print('Root Mean Square Error: {:.4f}'.format(rmse))
    print('')

In [None]:
evaluate_prediction(prediction_lstm, y_test, 'LSTM')

Pré-processamento dos dados e aplicação do modelo em toda a base de dados (treino + teste):

In [None]:
X = df.drop('ICMS', axis = 1)
y = df.loc[:, ['ICMS']]

In [None]:
y_norm = output_scaler.transform(y)
x_norm = input_scaler.transform(X)

In [None]:
X = x_norm.reshape(279, 1, 11)
y = y_norm.reshape(279, 1)

In [None]:
y = scaler_y.inverse_transform(y)

In [None]:
def prediction(model):
    prediction = model.predict(X)
    prediction = scaler_y.inverse_transform(prediction)
    return prediction

In [None]:
prediction_lstm = prediction(model_lstm)

In [None]:
def plot_future(prediction, y):
    plt.figure(figsize = (10, 6))
    range_future = len(prediction)
    plt.plot(np.arange(range_future), np.array(y), label = 'Real')
    plt.plot(np.arange(range_future), np.array(prediction), label = 'Previsto')
    plt.legend(loc = 'upper left')
    plt.xlabel('Mês')
    plt.ylabel('ICMS-RJ (Bilhões de Reais), 1999-2023')

In [None]:
plot_future(prediction_lstm, y)

In [None]:
real = y.flatten()
previsto = prediction_lstm.flatten()

In [None]:
tabela = pd.DataFrame([real, previsto]).T

In [None]:
tabela = tabela.rename(columns={0: "Real", 1: "Previsto"})

In [None]:
tabela['Diferença'] = 1 - (tabela['Real'] / tabela['Previsto'])

In [None]:
print(tabela['Diferença'].mean())

In [None]:
tabela

Distribuição dos erros

In [None]:
import seaborn as sns
sns.histplot(data = tabela, x = "Diferença", kde = True)

In [None]:
previsao = df

In [None]:
periodo = 1 # períodos adiante
for i in range(periodo):
    row = pd.DataFrame([], columns = previsao.columns)
    row.loc[0, 'SMA(12)'] = previsao.ICMS.iloc[-12:].mean()
    row.loc[0, 'SMA(6)'] = previsao.ICMS.iloc[-6:].mean()
    row.loc[0, 'SMA(3)'] = previsao.ICMS.iloc[-3:].mean()
    row.loc[0, 'SMA(2)'] = previsao.ICMS.iloc[-2:].mean()
    row.loc[0, 'lag(12)'] = previsao.ICMS.iloc[-12]
    row.loc[0, 'lag(6)'] = previsao.ICMS.iloc[-6]
    row.loc[0, 'lag(4)'] = previsao.ICMS.iloc[-4]
    row.loc[0, 'lag(3)'] = previsao.ICMS.iloc[-3]
    row.loc[0, 'lag(2)'] = previsao.ICMS.iloc[-2]
    row.loc[0, 'lag(1)'] = previsao.ICMS.iloc[-1]
    row.loc[0, 'DATA'] = previsao.DATA.iloc[-1]+1
    row = row.drop(['ICMS'], axis = 1)
    row = np.array(row.iloc[-1]).reshape(1, -1)
    row_norm = input_scaler.transform(row)
    to_prev = row_norm.reshape(1, 1, 11)
    prev = model_lstm.predict(to_prev)
    prev = scaler_y.inverse_transform(prev)
    row_ = pd.DataFrame(row, columns = ['DATA', 'SMA(12)', 'SMA(6)', 'SMA(3)', 
                                        'SMA(2)', 'lag(12)',  'lag(6)', 'lag(4)', 
                                        'lag(3)', 'lag(2)', 'lag(1)'])
    row_.loc[0, 'ICMS'] = prev[0]
    previsao = previsao.append(row_)

In [None]:
jupyter nbconvert --to slides presentation.ipynb