<a href="https://colab.research.google.com/github/GuilhermeMCP/Stock-Prediction/blob/main/MLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [17]:
# Import the required packages
import pandas as pd
import matplotlib as mpl
import matplotlib.pyplot as plt
import tensorflow as tf
import numpy as np
import math
import warnings
from google.colab import files

from keras.regularizers import L1L2
from keras.preprocessing.sequence import TimeseriesGenerator
from keras.models import Sequential
from keras.layers import Dense, LSTM, Dropout, LeakyReLU
from keras.optimizers import Adam
from keras.callbacks import EarlyStopping, ReduceLROnPlateau, ModelCheckpoint, TensorBoard

from sklearn.metrics import r2_score, mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler

# Notebook-wide plotting defaults: wide figures, no background grid.
mpl.rcParams.update({
    'figure.figsize': (20, 8),
    'axes.grid': False,
})

In [95]:
# -------------------- VARIABLES --------------------------

cols = ['Data', 'LNCfech', 'LNCab', 'CFech', 'Cab', 'Ibov']

df = pd.read_csv('TesteBBDC.csv', thousands=',')
# Skip the first 2000 rows of the file.
df = df[2000:]
df.dropna(inplace=True)
# Rows whose quote is the '-' placeholder cannot be converted to numbers.
df = df.drop(df[(df.LNCfech == '-') | (df.Ibov == '-')].index)
df = df.reset_index(drop=True)

# Remove any thousands separators left in the textual columns. This is done
# column-wise: element-wise chained assignment (df[col][row] = ...) is slow and
# is not guaranteed to write back into the frame.
df = df[cols].astype(str)
for col in cols:
  df[col] = df[col].str.replace(',', '', regex=False)

# Every column except the date becomes float32.
numeric_cols = [col for col in df.columns if col not in ("Data", "date")]
df = df.astype({col: "float32" for col in numeric_cols})


# ----------------- CORRELATION ANALYSIS ----------------------------
# shift is the correlation of the price with the value x days earlier
# shift = 0
# correlations = correlation_calc(cols[2], cols[1:], df, shift)
# correlations
#--------------------------------------------------------------------

# Day-over-day change of the Ibovespa index (NaN on the first row).
df['IbovChange'] = df['Ibov'].diff()

# MM10: for each row k >= 10, the mean of (CFech[k-10+step] - CFech[k-10]) for
# step = 1..10. The first 10 rows have no complete window and stay NaN.
# The terms are accumulated in the same order as a plain running sum.
close = df['CFech']
window_start = close.shift(10)
df['MM10'] = sum(close.shift(10 - step) - window_start for step in range(1, 11)) / 10

# Drop the leading rows for which the derived features are undefined.
df = df.dropna(subset=['MM10', 'IbovChange'])
# The previous positional index is kept as an 'index' column.
df = df.reset_index()

cols2 = ['LNCfech', 'LNCab', 'MM10', 'IbovChange']

targetColumn = 'LNCab'
predTargetColumn = targetColumn + '_Pred'
# Position of the target inside cols2 (and therefore inside the scaled matrix).
targetColumnIndex = cols2.index(targetColumn)

df_input = df[cols2]
scaler = MinMaxScaler()
# Fit and scale in float64 so the scaled matrix keeps double precision.
data_scaled = scaler.fit_transform(df_input.astype('float64'))
features = data_scaled
target = data_scaled[:, targetColumnIndex]


# Extra shifts (in rows) applied between inputs and targets by the training loop.
horizonte_de_predicao = [0, 1, 2, 3, 4]


In [None]:
warnings.filterwarnings('ignore')
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

# Settings shared by every run; they do not change inside the loops below.
win_length = 1
batch_size = 256
epochs = 100
num_features = len(df_input.columns)

for horizonte in horizonte_de_predicao:

  # Chronological split (no shuffling): the test set is the tail of the data.
  x_train, x_test, y_train, y_test = train_test_split(features, target, test_size=0.30, random_state=123, shuffle=False)

  # Shift the targets relative to the inputs according to the prediction horizon
  if horizonte > 0:
    y_train = y_train[horizonte:]
    x_train = x_train[:-horizonte]
    y_test = y_test[horizonte:]
    x_test = x_test[:-horizonte]

  # This loop repeats the same experiment several times
  for _ in range(5):

    train_generator = TimeseriesGenerator(x_train, y_train, length=win_length, sampling_rate=1, batch_size=batch_size)
    test_generator = TimeseriesGenerator(x_test, y_test, length=win_length, sampling_rate=1, batch_size=batch_size)

    model = build_neural_network(win_length=win_length, epochs=epochs, batch_size=batch_size,
                                 num_features=num_features, data=train_generator, evaluate_data=test_generator,
                                 learning_rate=0.1)

    #-------------------------------------- PREDICTIONS -----------------------------------------------------
    # Model.predict accepts the generator directly (predict_generator is deprecated).
    predictions_TEST = model.predict(test_generator, verbose=0)
    predictions_TEST = predictions_TEST[:, 0].reshape(-1)

    # Undo the scaling with the parameters of the TARGET column: the prediction
    # must sit in column `targetColumnIndex`, otherwise it would be un-scaled
    # with another feature's min/max. The scaler works column by column, so
    # the remaining columns can be left at zero.
    scaled_predictions = np.zeros((predictions_TEST.shape[0], num_features))
    scaled_predictions[:, targetColumnIndex] = predictions_TEST
    rev_trans_TEST = scaler.inverse_transform(scaled_predictions)

    # Compare against the last rows of the input frame; copy the slice so that
    # adding the prediction column does not write into a view of df_input.
    df_final_TEST = df_input[predictions_TEST.shape[0]*-1:].copy()
    df_final_TEST[predTargetColumn] = rev_trans_TEST[:, targetColumnIndex]

    # Evaluate the result
    pocid_TEST = pocid_calculate(df_final_TEST[targetColumn], df_final_TEST[predTargetColumn], 1)
    # Naive baseline: yesterday's value used as today's forecast.
    pocid_ingenuo = pocid_calculate(df_final_TEST[targetColumn][1:], df_final_TEST[targetColumn][:-1], 1)
    theil_TEST = theil_calculate(df_final_TEST[targetColumn], df_final_TEST[predTargetColumn], 1)
    emq_TEST = emq_calculate(df_final_TEST[targetColumn], df_final_TEST[predTargetColumn])
    r2_TEST = r2_calculate(df_final_TEST[targetColumn], df_final_TEST[predTargetColumn])

    print("d+ (days future)",horizonte+1)
    print("POCID ingenuo (%, bom alto) = ",pocid_ingenuo)
    print("POCID TEST (%, bom alto) = ",pocid_TEST)
    print("THEIL TEST (bom < 1) = ",theil_TEST)
    print("R2 TEST (bom prox 1) = ",r2_TEST)
    print("EMQ TEST (quanto menor melhor) = ",emq_TEST)
    print("--------------------------------------------------")

  print("\n\n")
  print("---------------------------------------------------------------------")



In [77]:

def emq_calculate(real, pred):
  """Return the mean squared error between two positionally indexed series.

  Args:
    real: observed values, read through ``.iloc``.
    pred: predicted values, read through ``.iloc``; its length sets the
      number of terms.

  Returns:
    The sum of squared differences divided by ``len(pred)``.
  """
  n_obs = len(pred)
  squared_errors = ((real.iloc[k] - pred.iloc[k])**2 for k in range(n_obs))
  acc = 0
  for err in squared_errors:
    acc = acc + err
  return acc / n_obs


def r2_calculate(real, pred):
    """Return the R² score of ``pred`` against ``real`` (thin wrapper over ``r2_score``)."""
    return r2_score(real, pred)


def pocid_calculate(Cfech, Cfech_pred, horizonte_de_predicao):
  """Return the percentage of correctly predicted directions of change.

  For every position ``i`` with a valid reference ``i - horizonte_de_predicao``,
  the real move and the predicted move over that lag are compared; a hit is
  counted when both have the same (non-zero) sign.

  Args:
    Cfech: observed series, read positionally through ``.iloc``.
    Cfech_pred: predicted series, aligned position by position with ``Cfech``.
    horizonte_de_predicao: lag, in rows, used to measure each move.

  Returns:
    Hits as a percentage of the number of comparisons, between 0 and 100.

  Raises:
    ValueError: if the series is not longer than the lag, so that no
      comparison can be made.
  """
  n_comparisons = len(Cfech) - horizonte_de_predicao
  if n_comparisons <= 0:
    raise ValueError("series must be longer than horizonte_de_predicao")

  hits = 0
  # Start at the lag: a smaller index would make iloc wrap around to the end
  # of the series and compare unrelated points.
  for i in range(horizonte_de_predicao, len(Cfech)):
    real_move = Cfech.iloc[i] - Cfech.iloc[i - horizonte_de_predicao]
    pred_move = Cfech_pred.iloc[i] - Cfech_pred.iloc[i - horizonte_de_predicao]
    if real_move * pred_move > 0:
      hits += 1
  # Divide by the number of comparisons actually made so the result cannot
  # exceed 100%.
  return (hits * 100) / n_comparisons


def theil_calculate(Cfech, Cfech_pred, horizonte_de_predicao):
  """Return Theil's U: model error relative to a naive lagged forecast.

  The numerator is the root of the summed squared errors of ``Cfech_pred``;
  the denominator is the same quantity for the naive forecast that repeats
  the value observed ``horizonte_de_predicao`` rows earlier.

  Args:
    Cfech: observed series, read positionally through ``.iloc``.
    Cfech_pred: predicted series, aligned position by position with ``Cfech``.
    horizonte_de_predicao: lag, in rows, of the naive forecast.

  Returns:
    The ratio of the two root sums; values below 1 mean the model beats the
    naive forecast.

  Raises:
    ZeroDivisionError: if the naive forecast has zero error (for example a
      constant series, or no position to compare).
  """
  model_sq_error = 0
  naive_sq_error = 0
  # Start at the lag: a smaller index would make iloc wrap around to the end
  # of the series and compare unrelated points.
  for i in range(horizonte_de_predicao, len(Cfech)):
    model_sq_error = model_sq_error + (Cfech.iloc[i] - Cfech_pred.iloc[i])**2
    naive_sq_error = naive_sq_error + (Cfech.iloc[i] - Cfech.iloc[i - horizonte_de_predicao])**2
  return math.sqrt(model_sq_error)/math.sqrt(naive_sq_error)

def correlation_calc(principal, lista, dataFrame, shift = 0):
  """Correlate one column, advanced by ``shift`` rows, with a list of columns.

  Args:
    principal: name of the reference column in ``dataFrame``.
    lista: names of the columns to correlate with ``principal``.
    dataFrame: frame holding all the columns involved.
    shift: number of leading rows dropped from the reference column, so that
      row ``k`` of each listed column is paired with row ``k + shift`` of
      ``principal``.

  Returns:
    A list of strings of the form ``"<column> = <correlation>"``, one per
    entry of ``lista`` and in the same order.
  """
  # Shift the frame that was passed in, not a module-level global.
  shifted = dataFrame.drop(dataFrame.index[:shift]).reset_index(drop=True)
  reference = shifted[principal]
  return [
    name + " = " + str(reference.corr(dataFrame[name]))
    for name in lista
  ]


def build_neural_network(win_length, epochs, batch_size, num_features, data, evaluate_data, learning_rate):
  """Build, train and return a small fully connected regression network.

  Args:
    win_length: unused here; kept for interface compatibility.
    epochs: maximum number of training epochs.
    batch_size: unused here; batching is done by the generators passed in
      ``data`` and ``evaluate_data``.
    num_features: number of input features (``input_dim`` of the first layer).
    data: training batches passed to ``model.fit``.
    evaluate_data: batches used as validation data during training and for the
      final evaluation.
    learning_rate: learning rate of the Adam optimizer.

  Returns:
    The trained model.
  """
  model = tf.keras.Sequential()
  model.add(Dense(6, input_dim=num_features, activation='sigmoid'))
  model.add(Dense(3,  activation='tanh'))
  model.add(Dense(3,  activation='linear'))
  model.add(Dense(1))

  model.compile(optimizer = Adam(learning_rate=learning_rate), loss='mean_squared_error', metrics=[tf.metrics.MeanAbsoluteError()])

  # Monitor the validation loss ('val_loss'); a misspelt metric name is never
  # found in the training logs, which silently disables early stopping.
  early_stopping = EarlyStopping(monitor='val_loss', patience=2, mode='min')

  # batch_size is not forwarded: the input already yields batches.
  # NOTE(review): the validation data is the same data later used for testing
  # by the caller; confirm this is intended.
  model.fit(data, epochs=epochs, validation_data=evaluate_data, shuffle=False, callbacks=[early_stopping], verbose=0)

  # Model.evaluate accepts generators directly (evaluate_generator is deprecated).
  model.evaluate(evaluate_data, verbose=0)

  return model
