In [None]:
import pandas as pd
import numpy as np
import scipy
import requests
import datetime
from tqdm import tqdm
import re
import plotly.express as px
import plotly.graph_objects as go

In [None]:
lib_path = f"technical_analysis.py"
url_lib = f"https://raw.githubusercontent.com/Genicleito/market_trading_analysis/master/lib/technical_analysis/__init__.py"
with open(lib_path, 'wb+') as f:
    f.write(requests.get(url_lib).text.encode('utf-8'))

import technical_analysis

### Leitura e configuração do DataSet

In [None]:
now = lambda: datetime.datetime.now()
read_path = f"https://github.com/Genicleito/analise-mercado-financeiro/raw/main/data/raw/hist_market_trading_yfinance.csv.zip"

In [None]:
df_raw = pd.read_csv(read_path)
df_raw.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 421366 entries, 0 to 421365
Data columns (total 40 columns):
 #   Column                  Non-Null Count   Dtype  
---  ------                  --------------   -----  
 0   date                    421366 non-null  object 
 1   ticker                  421366 non-null  object 
 2   open                    421366 non-null  float64
 3   high                    421366 non-null  float64
 4   low                     421366 non-null  float64
 5   close                   421366 non-null  float64
 6   adj_close               421366 non-null  float64
 7   volume                  421366 non-null  float64
 8   close_ema8              420603 non-null  float64
 9   close_ema20             419295 non-null  float64
 10  volume_ema20            419295 non-null  float64
 11  close_ema72             413627 non-null  float64
 12  close_ema200            399675 non-null  float64
 13  ind_volume              421366 non-null  int64  
 14  macd                

In [None]:
print(f"Quantidade de ativos: {len(df_raw['ticker'].unique())} | Quantidade de linhas: {df_raw.shape[0]} | Quantidade de colunas: {df_raw.shape[1]}")

Quantidade de ativos: 109 | Quantidade de linhas: 421366 | Quantidade de colunas: 40


In [None]:
df_raw.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 421366 entries, 0 to 421365
Data columns (total 40 columns):
 #   Column                  Non-Null Count   Dtype  
---  ------                  --------------   -----  
 0   date                    421366 non-null  object 
 1   ticker                  421366 non-null  object 
 2   open                    421366 non-null  float64
 3   high                    421366 non-null  float64
 4   low                     421366 non-null  float64
 5   close                   421366 non-null  float64
 6   adj_close               421366 non-null  float64
 7   volume                  421366 non-null  float64
 8   close_ema8              420603 non-null  float64
 9   close_ema20             419295 non-null  float64
 10  volume_ema20            419295 non-null  float64
 11  close_ema72             413627 non-null  float64
 12  close_ema200            399675 non-null  float64
 13  ind_volume              421366 non-null  int64  
 14  macd                

### Teste de Tendência `Cox-Stuart`

In [None]:
# Definição de alguns valores
ticker = 'ABEV3'
periods = 200
p = 0.05
# if not periods: periods = len(X)

df = pd.read_csv(read_path).query(f"ticker == '{ticker}'").sort_values('date').copy()

# # Unidimensional
# X = df['close'] # .to_numpy()
# # Considera todos os dados caso nenhum período tenha sido selecionado
# if not periods: periods = len(X)
# X = X[-periods:]

In [None]:
# Filtra por ativos da Petrobrás
df = df_raw.query(f"ticker == '{ticker}'").sort_values('date')

# Criar médias móveis curtas e longas
df = df.assign(**{
    'Média Móvel (8p)': df['close'].rolling(window=8).mean().round(4),
    'Média Móvel (20p)': df['close'].rolling(window=20).mean().round(4),
    'Média Móvel (72p)': df['close'].rolling(window=72).mean().round(4),
    'Média Móvel (200p)': df['close'].rolling(window=200).mean().round(4),
}).iloc[-periods:].reset_index(drop=True)

# Cria o vetor X com os preços de fechamento
X = df['close'] # .to_numpy()

In [None]:
#trend_type = "d" --> "decreasing trend" : "l"
#trend_type = "i" --> "increasing trend" : "r"

def simpleCS (x, trend_type = "l", p = 0.05):
    n0 = len(x) % 2
    if n0 == 1:
        remover = len(x) // 2
        x = np.delete(x, int((len(x))/2))

    half = len(x) // 2

    x1 = x[np.arange(0, half, dtype=int)]
    x2 = x[np.arange(half, len(x), dtype=int)]

    n = np.sum((x2 - x1) != 0)
    t = np.sum(x1 < x2)

    if trend_type == "d":
        return scipy.stats.binom.cdf(t, n, p) # p-value
    return 1 - scipy.stats.binom.cdf(t - 1, n, p) # p-value

In [None]:
############ Teste de Tendência ############
p_value = simpleCS(X.to_numpy(), trend_type='d', p=p)
print(f'Não há tendencia de baixa! \t [P-Value ({p_value}) >= p ({p}): Rejeita H0]' if p_value >= p else f'Há tendencia de baixa! \t\t [P-Value ({p_value}) < p ({p}): Aceita H0]')

p_value = simpleCS(X.to_numpy(), trend_type='i', p=p)
print(f'Não há tendencia de alta! \t [P-Value ({p_value}) >= p ({p}): Rejeita H0]' if p_value >= p else f'Há tendencia de alta! \t\t [P-Value ({p_value}) < p ({p}): Aceita H0]')

Há tendencia de baixa! 		 [P-Value (0.037081209327355036) < p (0.05): Aceita H0]
Não há tendencia de alta! 	 [P-Value (0.994079470779666) >= p (0.05): Rejeita H0]


### Análise da série

In [None]:
print(f"Valores nulos no preço de fechamento: {len(df['close'][df['close'].isna()])}")

Valores nulos no preço de fechamento: 0


In [None]:
print(f"Dados do último pregão:")
df.tail(1).reset_index(drop=True).T.rename(columns={0: 'Infos'})

Dados do último pregão:


Unnamed: 0,Infos
date,2024-02-26
ticker,ABEV3
open,12.83
high,13.1
low,12.83
close,13.04
adj_close,13.04
volume,12942000.0
close_ema8,12.88
close_ema20,13.0


In [None]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        y=X,
        mode='lines',
        name='Preço de Fechamento',
        line = dict(width=2.5)
    )
)

fig.add_trace(
    go.Scatter(
        y=df['Média Móvel (8p)'],
        mode='lines',
        name='Média Móvel (8p)',
        line = dict(color='#cc66cc', width=1.25) # , dash='dash')
    )
)

fig.add_trace(
    go.Scatter(
        y=df['Média Móvel (20p)'],
        mode='lines',
        name='Média Móvel (20p)',
        line = dict(color='#000000', width=1.25) # , dash='dash')
    )
)

fig.add_trace(
    go.Scatter(
        y=df['Média Móvel (72p)'],
        mode='lines',
        name='Média Móvel (72p)',
        line = dict(color='#99994d', width=1.25) # , dash='dash')
    )
)

# fig.add_trace(
#     go.Scatter(
#         y=df['Média Móvel (200p)'],
#         mode='lines',
#         name='Média Móvel (200p)',
#         line = dict(color='#5c5c8a', width=2, dash='dash')
#     )
# )

fig.update_layout(
    title=f"Últimos {periods} pregões na Bolsa de Valores da ação {ticker} [Último registro: {df['date'].max()}]",
    xaxis_title="Períodos (dias)",
    yaxis_title="Preço de fechamento (R$)",
    legend_title="Série e médias móveis",
    # font=dict(
    #     family="Courier New, monospace",
    #     size=18,
    #     color="RebeccaPurple"
    # )
)

annotations = []
annotations.append(
    dict(
        xref='paper', yref='paper', x=0.9, y=-0.1,
        xanchor='center', yanchor='top', text=f"Último registro: {df['date'].max()}",
        # font=dict(
        #     family='Arial',
        #     size=12,
        #     color='rgb(150,150,150)'
        # ),
        showarrow=False
    )
)

fig.update_layout(annotations=annotations)

fig.show()

---

### Algoritmo de `Wald-Wolfowitz` (Teste de sazonalidade)

In [None]:
def simpleKW(y, freq=12):
    from scipy.stats import chi2
    Rank = np.array(pd.Series(y).rank(method='average', na_option='keep'))
    extra = freq - len(Rank) % freq
    dat = np.concatenate((np.repeat(np.nan, extra), Rank))
    yMAT = dat.reshape((int(len(dat) / freq), freq))
    Nobs = np.apply_along_axis(lambda x: np.count_nonzero(~np.isnan(x)), 0, yMAT)
    R2n = np.power(np.apply_along_axis(np.nansum, 0, yMAT), 2) / Nobs
    H = 12 / (sum(Nobs) * (sum(Nobs) + 1)) * sum(R2n) - 3 * (sum(Nobs) + 1)

    if sum(np.unique(Rank, return_counts=True)[1] > 1) > 0:
        valor = np.unique(Rank, return_counts=True)[1]
        valor = valor[valor > 1]
        sumT = sum(np.power(valor, 3) - valor)
        Correction = 1 - sumT/(np.power(len(y),3) - len(y))
        H = H / Correction

    return 1 - chi2.cdf(H, freq - 1)

In [None]:
for i in np.arange(2, 20):
    print(f"freq = {i} | p-value = {simpleKW(X, i)} \t => {'Não indica sazonalidade' if simpleKW(X, i) >= 0.05 else 'Indica sazonalidade'}")

freq = 2 | p-value = 0.9201994581777773 	 => Não indica sazonalidade
freq = 3 | p-value = 0.9866909097275774 	 => Não indica sazonalidade
freq = 4 | p-value = 0.9915802843499675 	 => Não indica sazonalidade
freq = 5 | p-value = 0.9999541648267832 	 => Não indica sazonalidade
freq = 6 | p-value = 0.9999399845211147 	 => Não indica sazonalidade
freq = 7 | p-value = 0.9998099680762884 	 => Não indica sazonalidade
freq = 8 | p-value = 0.9999932142075726 	 => Não indica sazonalidade
freq = 9 | p-value = 0.999999433483589 	 => Não indica sazonalidade
freq = 10 | p-value = 0.9999428381245381 	 => Não indica sazonalidade
freq = 11 | p-value = 0.9999997499297593 	 => Não indica sazonalidade
freq = 12 | p-value = 0.9999972575141687 	 => Não indica sazonalidade
freq = 13 | p-value = 0.9999998989703917 	 => Não indica sazonalidade
freq = 14 | p-value = 0.9999999484812143 	 => Não indica sazonalidade
freq = 15 | p-value = 0.9999999965043911 	 => Não indica sazonalidade
freq = 16 | p-value = 0.99999

#### Resultado da análise
- **`> Não indica sazonalidade`**

---

### Normalizar e suavizar dados da série (**Não necessário**)
* Não é necessário normalizar os dados pois a série tem um comportamento homogêneo com as médias móveis e não possui discrepâncias entre cada observação
    * Cada observação é um dado do pregão na bolsa de valores e portanto, também, não há dados faltantes pois os pregões acontecem e são registrados em todos os dias úteis do ano
* Também não é preciso suavizar a série, pelos mesmos motivos anteriores e por não haver ruídos nos dados registrados em cada pregão da bolsa de valores

---

### Modelo Holt-Winters (aditivo e/ou multiplicativo)

In [None]:
from statsmodels.tsa.holtwinters import ExponentialSmoothing
from sklearn.metrics import mean_squared_error as mse
from sklearn.metrics import mean_absolute_error as mae
from sklearn.metrics import mean_absolute_percentage_error as mape

In [None]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        y=X,
        mode='lines',
        name='Preço de Fechamento',
        line = dict(width=2.5)
    )
)

fig.update_layout(
    title=f"Últimos {periods} pregões na Bolsa de Valores da ação {ticker} [Último registro: {df['date'].max()}]",
    xaxis_title="Períodos (dias)",
    yaxis_title="Preço de fechamento (R$)",
    legend_title="Série e médias móveis",
    # font=dict(
    #     family="Courier New, monospace",
    #     size=18,
    #     color="RebeccaPurple"
    # )
)

annotations = []
annotations.append(
    dict(
        xref='paper', yref='paper', x=0.9, y=-0.1,
        xanchor='center', yanchor='top', text=f"Último registro: {df['date'].max()}",
        # font=dict(
        #     family='Arial',
        #     size=12,
        #     color='rgb(150,150,150)'
        # ),
        showarrow=False
    )
)

fig.update_layout(annotations=annotations)
fig.show()

In [None]:
periods_forecast = 20
X_train = X[:len(X) - periods_forecast]
X_test = X[-periods_forecast: ]

print(f"X train: {len(X_train)} | X test: {len(X_test)}")

X train: 180 | X test: 20


In [None]:
# Model training (Aditivo)
model_add = ExponentialSmoothing(
    X_train,
    trend="add", seasonal="add",
    seasonal_periods=25
).fit()

# Prediction
Y_add = model_add.forecast(periods_forecast)

In [None]:
# Model training (Multiplicativo)
model_mul = ExponentialSmoothing(
    X_train,
    trend="mul", seasonal="mul",
    seasonal_periods=25
).fit()

# Prediction
Y_mul = model_mul.forecast(periods_forecast)

In [None]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        y=X_train.tolist() + [None] * (len(X) - len(X_train)),
        mode='lines',
        name='Treinamento',
        line = dict(width=1.25)
    )
)

fig.add_trace(
    go.Scatter(
        y=[None] * len(X_train) + X_test.tolist(),
        mode='lines',
        name='Teste',
        line = dict(color='#cc66cc', width=1.25) # , dash='dash')
    )
)

fig.add_trace(
    go.Scatter(
        y=[None] * len(X_train) + Y_add.tolist(),
        mode='lines',
        name='Predito',
        line = dict(color='#000000', width=1.25) # , dash='dash')
    )
)

fig.update_layout(
    title=f"Predição dos valores de fechamento de {ticker} [Holt-Winters Aditivo]",
    xaxis_title="Períodos (dias)",
    yaxis_title="Preço (R$)",
    legend_title="",
    # font=dict(
    #     family="Courier New, monospace",
    #     size=18,
    #     color="RebeccaPurple"
    # )
)

annotations = []
annotations.append(
    dict(
        xref='paper', yref='paper', x=0.9, y=-0.1,
        xanchor='center', yanchor='top', text=f"Último registro: {df['date'].max()}",
        # font=dict(
        #     family='Arial',
        #     size=12,
        #     color='rgb(150,150,150)'
        # ),
        showarrow=False
    )
)

fig.update_layout(annotations=annotations)

fig.show()

In [None]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        y=X_train.tolist() + [None] * (len(X) - len(X_train)),
        mode='lines',
        name='Treinamento',
        line = dict(width=1.25)
    )
)

fig.add_trace(
    go.Scatter(
        y=[None] * len(X_train) + X_test.tolist(),
        mode='lines',
        name='Teste',
        line = dict(color='#cc66cc', width=1.25) # , dash='dash')
    )
)

fig.add_trace(
    go.Scatter(
        y=[None] * len(X_train) + Y_mul.tolist(),
        mode='lines',
        name='Predito',
        line = dict(color='#000000', width=1.25) # , dash='dash')
    )
)

fig.update_layout(
    title=f"Predição dos valores de fechamento de {ticker} [Holt-Winters Multiplicativo]",
    xaxis_title="Períodos (dias)",
    yaxis_title="Preço (R$)",
    legend_title="",
    # font=dict(
    #     family="Courier New, monospace",
    #     size=18,
    #     color="RebeccaPurple"
    # )
)

annotations = []
annotations.append(
    dict(
        xref='paper', yref='paper', x=0.9, y=-0.1,
        xanchor='center', yanchor='top', text=f"Último registro: {df['date'].max()}",
        # font=dict(
        #     family='Arial',
        #     size=12,
        #     color='rgb(150,150,150)'
        # ),
        showarrow=False
    )
)

fig.update_layout(annotations=annotations)

fig.show()

#### Validação do modelo

In [None]:
print("Validando o modelo aditivo:\n")
print(f"MSE: {mse(X_test, Y_add)}")
print(f"RMSE: {np.sqrt(mse(X_test, Y_add))}")
print(f"MAE: {mae(X_test, Y_add)}")
print(f"MAPE: {mape(X_test, Y_add)}")

Validando o modelo aditivo:

MSE: 0.011828156034165788
RMSE: 0.10875732634708242
MAE: 0.09071059542587942
MAPE: 0.006978347797610848


In [None]:
print("Validando o modelo multiplicativo:\n")
print(f"MSE: {mse(X_test, Y_mul)}")
print(f"RMSE: {np.sqrt(mse(X_test, Y_mul))}")
print(f"MAE: {mae(X_test, Y_mul)}")
print(f"MAPE: {mape(X_test, Y_mul)}")

Validando o modelo multiplicativo:

MSE: 0.009154564865494248
RMSE: 0.09567949030745433
MAE: 0.07954801040045849
MAPE: 0.006116416791152478


#### Comparação das validações dos resultados
* Modelo multiplicativo teve menores indicadores de erro

In [None]:
print(f"{'`MSE`   Aditivo = ' + str(round(mse(X_test, Y_add)), 5) if mse(X_test, Y_add) < mse(X_test, Y_mul) else 'MSE   Multiplicativo = ' + str(round(mse(X_test, Y_mul), 5))}")
print(f"{'`RMSE`  Aditivo = ' + str(round(np.sqrt(mse(X_test, Y_add))), 5) if np.sqrt(mse(X_test, Y_add)) < np.sqrt(mse(X_test, Y_mul)) else 'RMSE  Multiplicativo = ' + str(round(np.sqrt(mse(X_test, Y_mul)), 5))}")
print(f"{'`MAE`   Aditivo = ' + str(round(mae(X_test, Y_add)), 5) if mae(X_test, Y_add) < mae(X_test, Y_mul) else 'RMSE  Multiplicativo = ' + str(round(mae(X_test, Y_mul), 5))}")
print(f"{'`MAPE`  Aditivo = ' + str(round(mape(X_test, Y_add)), 5) if mape(X_test, Y_add) < mape(X_test, Y_mul) else 'RMSE  Multiplicativo = ' + str(round(mape(X_test, Y_mul), 5))}")

MSE   Multiplicativo = 0.00915
RMSE  Multiplicativo = 0.09568
RMSE  Multiplicativo = 0.07955
RMSE  Multiplicativo = 0.00612


In [None]:
### Analisando o Erro Médio Absoluto (MAE)
df[['date', 'close']].rename(columns={'close': 'test'}).loc[X_test.index].assign(**{
    'predict': Y_mul,
    'mae': (df['close'].loc[X_test.index] - Y_mul).abs()
}).sort_values('date', ascending=False)

Unnamed: 0,date,test,predict,mae
199,2024-02-26,13.04,12.960055,0.079945
198,2024-02-23,12.89,12.932503,0.042503
197,2024-02-22,12.96,12.891116,0.068884
196,2024-02-21,12.9,12.781981,0.118019
195,2024-02-20,12.85,12.80291,0.04709
194,2024-02-19,12.81,12.783181,0.026819
193,2024-02-16,12.76,12.86239,0.10239
192,2024-02-15,12.85,12.749342,0.100658
191,2024-02-14,12.92,12.732426,0.187574
190,2024-02-09,12.95,12.844021,0.105979


Site: https://trading-analysis.streamlit.app/

#### SARIMA

In [None]:
!pip install pmdarima



In [None]:
import statsmodels
import statsmodels.api as sm

from pmdarima import auto_arima
import warnings
warnings.filterwarnings('ignore')

In [None]:
stepwise_fit = auto_arima(
    X_train.to_numpy(),
    start_p=0,
    start_q=0,
    max_p=6,
    max_q=3,
    information_criterion = 'aic',
    seasonal=True,
    trace=True,
    m=25
)

Performing stepwise search to minimize aic
 ARIMA(0,1,0)(1,0,1)[25] intercept   : AIC=-100.766, Time=0.73 sec
 ARIMA(0,1,0)(0,0,0)[25] intercept   : AIC=-104.368, Time=0.07 sec
 ARIMA(1,1,0)(1,0,0)[25] intercept   : AIC=-102.656, Time=0.37 sec
 ARIMA(0,1,1)(0,0,1)[25] intercept   : AIC=-102.785, Time=0.59 sec
 ARIMA(0,1,0)(0,0,0)[25]             : AIC=-106.073, Time=0.03 sec
 ARIMA(0,1,0)(1,0,0)[25] intercept   : AIC=-102.395, Time=0.49 sec
 ARIMA(0,1,0)(0,0,1)[25] intercept   : AIC=-102.402, Time=0.28 sec
 ARIMA(1,1,0)(0,0,0)[25] intercept   : AIC=-104.626, Time=0.06 sec
 ARIMA(0,1,1)(0,0,0)[25] intercept   : AIC=-104.752, Time=0.08 sec
 ARIMA(1,1,1)(0,0,0)[25] intercept   : AIC=-102.770, Time=0.21 sec

Best model:  ARIMA(0,1,0)(0,0,0)[25]          
Total fit time: 2.947 seconds


In [None]:
stepwise_fit = auto_arima(
    X_train.to_numpy(),
    start_p=0,
    start_q=0,
    max_p=6,
    max_q=3,
    information_criterion = 'bic',
    # stepwise = False,
    seasonal=True,
    trace=True,
    m=25
)

In [None]:
stepwise_fit.summary()

In [None]:
mod = sm.tsa.statespace.SARIMAX(
    X_train.to_numpy(),
    order=(0, 1, 0),
    seasonal_order=(0, 0, 0, 25),
    enforce_stationarity=False,
    enforce_invertibility=False
)

results = mod.fit(disp=0)

In [None]:
print(f'AIC: {results.aic} | BIC: {results.bic}')

In [None]:
Y = results.predict(
    start=len(X_train),
    end=len(X_train) + len(X_test) - 1
)#.rename('Forecast')

In [None]:
fig = go.Figure()

fig.add_trace(
    go.Scatter(
        y=X_train.tolist() + [None] * (len(X) - len(X_train)),
        mode='lines',
        name='Treinamento',
        line = dict(width=1.25)
    )
)

fig.add_trace(
    go.Scatter(
        y=[None] * len(X_train) + X_test.tolist(),
        mode='lines',
        name='Teste',
        line = dict(color='#cc66cc', width=1.25) # , dash='dash')
    )
)

fig.add_trace(
    go.Scatter(
        y=[None] * len(X_train) + Y.tolist(),
        mode='lines',
        name='Predito',
        line = dict(color='#000000', width=1.25) # , dash='dash')
    )
)

fig.update_layout(
    title=f"Predição dos valores de fechamento de {ticker}",
    xaxis_title="Períodos (dias)",
    yaxis_title="Preço (R$)",
    legend_title="",
    # font=dict(
    #     family="Courier New, monospace",
    #     size=18,
    #     color="RebeccaPurple"
    # )
)

annotations = []
annotations.append(
    dict(
        xref='paper', yref='paper', x=0.9, y=-0.1,
        xanchor='center', yanchor='top', text=f"Último registro: {df['date'].max()}",
        # font=dict(
        #     family='Arial',
        #     size=12,
        #     color='rgb(150,150,150)'
        # ),
        showarrow=False
    )
)

fig.update_layout(annotations=annotations)

fig.show()