In [1]:
import yfinance as yf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from dotenv import load_dotenv
import os
import requests
from transformers import AutoTokenizer, BertForSequenceClassification
import torch
import PyPDF2
import json

# Coleta de Dados

In [2]:
folders = ["dataset", "dataset/prices", "dataset/prices_processed"]

# Verifica se as pastas existem, se não, cria-as
for folder in folders:
    if not os.path.exists(folder):
        os.makedirs(folder)
        print(f"Pasta '{folder}' foi criada.")
    else:
        print(f"Pasta '{folder}' já existe.")

Pasta 'dataset' já existe.
Pasta 'dataset/prices' já existe.
Pasta 'dataset/prices_processed' já existe.


In [15]:
# API YahooFinance para baixar os dados historicos
def HistoricalData(ticker, startDate, endDate, path2save = ''):

  """
  ticker: Simbolo ação. Ex: VALE
  startDate: Data inicial. Ex: 2010-01-01
  endDate: Data final. Ex: 2020-12-31
  path2save: Caminho para salvar o dataframe.
  """
  data = yf.download(ticker, start=startDate, end=endDate)
  df = pd.DataFrame(data)

  if path2save != '':
    df.to_csv(path2save)

  return df


dados = HistoricalData("ANIM3.SA", startDate='2010-01-01', endDate='2023-01-01')
dados

[*********************100%***********************]  1 of 1 completed


Unnamed: 0_level_0,Open,High,Low,Close,Adj Close,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
2013-10-28,6.500000,6.500000,6.353333,6.403333,5.971816,12658800
2013-10-29,6.400000,6.416666,6.283333,6.310000,5.884772,1794300
2013-10-30,6.266666,6.266666,6.170000,6.170000,5.754208,1532100
2013-10-31,6.116666,6.436666,6.116666,6.393333,5.962490,2972700
2013-11-01,6.323333,6.656666,6.296666,6.633333,6.186316,539700
...,...,...,...,...,...,...
2022-12-23,3.990000,4.190000,3.920000,4.180000,4.180000,4693300
2022-12-26,4.170000,4.260000,4.060000,4.160000,4.160000,1076000
2022-12-27,4.170000,4.230000,3.830000,3.850000,3.850000,4134200
2022-12-28,3.880000,4.110000,3.820000,4.110000,4.110000,5200300


In [3]:
# Criando ferramenta para adicionar novos campos para o dataframe
def catalog_return(row, x, name_return):
    if row[name_return] > x * row[f'Cumulative_std_{name_return}']:
        return 1
    elif row[name_return] < -x * row[f'Cumulative_std_{name_return}']:
        return -1
    else:
        return 0


class DataProcessing:
    def __init__(self, data):
        self.dataframe = data
        self.dataframe['Date'] = pd.to_datetime(self.dataframe['Date'])
        self.dataframe = self.dataframe.sort_values(by='Date')

    def get_by_date_range(self, start_date, end_date):
        mask = ((self.dataframe['Date'] >= start_date) & (self.dataframe['Date'] <= end_date))
        return self.dataframe.loc[mask]

    def get_by_date(self, date):
        return self.dataframe.loc[(self.dataframe['Date'] == date)]

    def create_return_by_period(self, name_return, period, remove_nan=False):
        self.dataframe[f'{name_return}'] = np.log(
            self.dataframe['Close'] / self.dataframe['Close'].shift(period))
        if remove_nan:
            self.dataframe = self.dataframe.dropna()

    def create_cumulative_std(self, name_return):
        self.dataframe[f'Cumulative_std_{name_return}'] = self.dataframe[name_return].expanding().std()

    def create_indicator(self, name_return, factor):
        self.dataframe[f'Indicator_{name_return}'] = self.dataframe.apply(lambda row:
                                                                          catalog_return(row, factor, name_return),
                                                                          axis=1)

In [16]:
# Baixar precos de varias empresas
empresas = ["ANIM3.SA", "AZUL4.SA", "BBAS3.SA", "MGLU3.SA"]
for emp in empresas:
  HistoricalData(ticker=emp, startDate="2010-01-01", endDate="2024-01-01", path2save=f"dataset/prices/{emp}.csv")

[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed
[*********************100%***********************]  1 of 1 completed


In [42]:
files = os.listdir('dataset/prices')
for file in files:
    data_processed = DataProcessing(pd.read_csv(f'dataset/prices/{file}'))
    data_processed.create_return_by_period(name_return='Daily_Return', period=1, remove_nan=False)
    data_processed.create_return_by_period(name_return='Week_Return', period=6, remove_nan=False)
    data_processed.create_return_by_period(name_return='Month_Return', period=22, remove_nan=False)
    data_processed.create_cumulative_std(name_return='Daily_Return')
    data_processed.create_cumulative_std(name_return='Week_Return')
    data_processed.create_cumulative_std(name_return='Month_Return')
    data_processed.create_indicator(name_return='Daily_Return', factor=0.1)
    data_processed.create_indicator(name_return='Week_Return', factor=0.1)
    data_processed.create_indicator(name_return='Month_Return', factor=0.1)
    data_processed.dataframe.to_csv(f'dataset/prices_processed/{file}', index_label=False)
    print(f'File {file} created and save in dataset/prices_processed/{file}')

In [39]:
load_dotenv()
userName = os.getenv("USERNAME")
password = os.getenv("PASSWORD")

In [43]:
def EventsDate(ticker, userName=userName, password=password, startDate='01012010', endDate="01012024"):
  url = "https://www.comdinheiro.com.br/Clientes/API/EndPoint001.php"
  querystring = {"code":"import_data"}
  payload = f"username={userName}&password={password}&URL=HistoricoIndicadoresFundamentalistas001.php%3F%26data_ini%3D{startDate}%26data_fim%3D{endDate}%26trailing%3D12%26conv%3DMIXED%26moeda%3DMOEDA_ORIGINAL%26c_c%3Dconsolidado%26m_m%3D1000000%26n_c%3D5%26f_v%3D1%26papel%3D{ticker}%26indic%3DNOME_EMPRESA%2BRL%2BLL%2BEBITDA%2BDATA_PUBLICACAO%2BPRECO_ABERTURA%2BPRECO_FECHAMENTO%26periodicidade%3Dtri%26graf_tab%3Dtabela%26desloc_data_analise%3D1%26flag_transpor%3D0%26c_d%3Dd%26enviar_email%3D0%26enviar_email_log%3D0%26cabecalho_excel%3Dmodo1%26relat_alias_automatico%3Dcmd_alias_01&format=json3"
  headers = {'Content-Type': 'application/x-www-form-urlencoded'}
  response = requests.request("POST", url, data=payload, headers=headers, params=querystring)
  data = json.loads(response.text)
  df = pd.DataFrame(data["tables"]["tab0"]).T
  novas_colunas = ["Data", "Empresa", "Receita", "Lucro", "EBITDA", "Data_Publicacao", "Preco_Abertura", "Preco_fechamento", "Consolidado", "Convencao", "Moeda", "Data_Demonstracao", "Meses", "Data_Analise"]
  df.columns = novas_colunas
  df = df.drop("lin0")
  df['Data_Publicacao'] = pd.to_datetime(df['Data_Publicacao'], errors = 'coerce', format='%d/%m/%Y')
  df.reset_index(drop=True, inplace=True)
  df['Data_Publicacao'] = pd.to_datetime(df['Data_Publicacao'], format='%d/%m/%Y').dt.strftime('%Y-%m-%d')
  return df


In [41]:
files = os.listdir('dataset/prices_processed')
for emp in files:
  price_p = pd.read_csv(f"dataset/prices_processed/{emp}")
  date_df = EventsDate(ticker= emp[0:-7])
  price_p.insert(1, 'event', price_p['Date'].apply(lambda date: 1 if date in date_df['Data_Publicacao'].values else 0))
  price_p.to_csv(f"dataset/prices_processed/{emp}", index=False)

In [9]:
df_processed = pd.read_csv("dataset/prices_processed/ITUB4.SA.csv")
df_processed

Unnamed: 0,Date,event,Open,High,Low,Close,Adj Close,Volume,Daily_Return,Week_Return,Month_Return,Cumulative_std_Daily_Return,Cumulative_std_Week_Return,Cumulative_std_Month_Return,Indicator_Daily_Return,Indicator_Week_Return,Indicator_Month_Return
0,2010-01-04,0,17.708261,18.436810,17.708261,18.268333,10.518468,11843397,,,,,,,0,0,0
1,2010-01-05,0,18.313868,18.386723,18.168158,18.386723,10.586637,8593315,0.006460,,,,,,0,0,0
2,2010-01-06,0,18.327526,18.436810,18.077089,18.227352,10.494876,10602572,-0.008706,,,0.010723,,,-1,0,0
3,2010-01-07,0,18.099855,18.236460,18.008787,18.040663,10.387384,9966567,-0.010295,,,0.009249,,,-1,0,0
4,2010-01-08,0,18.113516,18.113516,17.721922,17.767456,10.230077,9748709,-0.015260,,,0.009366,,,-1,0,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3220,2022-12-23,0,25.030001,25.510000,24.870001,25.190001,24.271969,25327300,0.015604,0.072857,-0.032034,0.020074,0.046628,0.089398,1,1,-1
3221,2022-12-26,0,24.950001,25.180000,24.549999,24.690001,23.790190,15730800,-0.020049,0.041768,-0.087214,0.020074,0.046627,0.089398,-1,1,-1
3222,2022-12-27,0,24.660000,24.730000,24.270000,24.549999,23.655294,17203600,-0.005687,0.006539,-0.057000,0.020071,0.046620,0.089390,-1,1,-1
3223,2022-12-28,0,24.750000,25.200001,24.520000,25.049999,24.137072,22696400,0.020162,0.016097,-0.018589,0.020071,0.046613,0.089377,1,1,-1


In [None]:
def separate_returns(final_data):
    # Inicializar colunas se não existirem
    if 'return_daily' not in final_data.columns:
        final_data['return_daily'] = np.nan
    if 'return_week' not in final_data.columns:
        final_data['return_week'] = np.nan
    if 'return_month' not in final_data.columns:
        final_data['return_month'] = np.nan

    first_return_daily_list = []
    remaining_return_daily_list = []
    first_return_week_list = []
    remaining_return_week_list = []
    first_return_month_list = []
    remaining_return_month_list = []

    start_idx = 0

    while start_idx < len(final_data):
        # Encontra o evento
        if 1 in final_data[start_idx:]['event'].values:
            event_idx = final_data[start_idx:]['event'].eq(1).idxmax()
        else:
            break
        # Encontrar o próximo evento
        if 1 in final_data[event_idx+1:]['event'].values:
            prox_event_idx = final_data[event_idx+1:]['event'].eq(1).idxmax()
        else:
            prox_event_idx = len(final_data)

        # Calcular o primeiro retorno diario logo após o evento
        if event_idx + 1 < len(final_data):
            final_data.loc[event_idx + 1, 'return_daily'] = np.log(final_data.loc[event_idx + 1, 'Close'] / final_data.loc[event_idx, 'Close'])
            first_return_daily_list.append(final_data.iloc[event_idx + 1])

        # Calcular os retornos diarios restantes até o próximo evento
        for i in range(event_idx + 2, prox_event_idx, 1):
            final_data.loc[i, 'return_daily'] = np.log(final_data.loc[i, 'Close'] / final_data.loc[i-1, 'Close'])
            remaining_return_daily_list.append(final_data.iloc[i])

        # Calcular o primeiro retorno semanal logo após o evento
        if event_idx + 5 < len(final_data):
            final_data.loc[event_idx + 5, 'return_week'] = np.log(final_data.loc[event_idx + 5, 'Close'] / final_data.loc[event_idx, 'Close'])
            first_return_week_list.append(final_data.iloc[event_idx + 5])

        # Calcular os retornos semanais restantes até o próximo evento
        for i in range(event_idx + 10, prox_event_idx, 5):
            final_data.loc[i, 'return_week'] = np.log(final_data.loc[i, 'Close'] / final_data.loc[i-5, 'Close'])
            remaining_return_week_list.append(final_data.iloc[i])


        # Calcular o primeiro retorno mensal logo após o evento
        if event_idx + 21 < len(final_data):
            final_data.loc[event_idx + 21, 'return_month'] = np.log(final_data.loc[event_idx + 21, 'Close'] / final_data.loc[event_idx, 'Close'])
            first_return_month_list.append(final_data.iloc[event_idx + 21])

        # Calcular os retornos mensais restantes até o próximo evento
        for i in range(event_idx + 42, prox_event_idx, 22):
            final_data.loc[i, 'return_month'] = np.log(final_data.loc[i, 'Close'] / final_data.loc[i-21, 'Close'])
            remaining_return_month_list.append(final_data.iloc[i])

        # Reinicia após o evento
        start_idx = event_idx + 1

    # Criar DataFrames para o primeiro e os demais retornos semanais e mensais
    first_return_daily_df = pd.DataFrame(first_return_daily_list).reset_index()[['index',
    'Close', 'Date'
, 'event', 'return_daily']]
    remaining_return_daily_df = pd.DataFrame(remaining_return_daily_list).reset_index()[['index',
 'Close', 'Date'
, 'event', 'return_daily']]
    first_return_week_df = pd.DataFrame(first_return_week_list).reset_index()[['index',
 'Close', 'Date'
, 'event', 'return_week']]
    remaining_return_week_df = pd.DataFrame(remaining_return_week_list).reset_index()[['index',
 'Close', 'Date'
, 'event', 'return_week']]
    first_return_month_df = pd.DataFrame(first_return_month_list).reset_index()[['index',
 'Close', 'Date'
, 'event', 'return_month']]
    remaining_return_month_df = pd.DataFrame(remaining_return_month_list).reset_index()[['index',
 'Close', 'Date'
, 'event', 'return_month']]

    first_return_daily_df.rename(columns={'return_daily': 'return'}, inplace=True)
    remaining_return_daily_df.rename(columns={'return_daily': 'return'}, inplace=True)
    first_return_week_df.rename(columns={'return_week': 'return'}, inplace=True)
    remaining_return_week_df.rename(columns={'return_week': 'return'}, inplace=True)
    first_return_month_df.rename(columns={'return_month': 'return'}, inplace=True)
    remaining_return_month_df.rename(columns={'return_month': 'return'}, inplace=True)

    return first_return_daily_df, remaining_return_daily_df, first_return_week_df, remaining_return_week_df, first_return_month_df, remaining_return_month_df


first_return_daily_df, remaining_return_daily_df, first_return_week_df, remaining_return_week_df, first_return_month_df, remaining_return_month_df = separate_returns(df_processed)

# Análise de Sentimento

In [None]:
name_prices = {
    'azul': 'AZUL4.SA',
    'BB': 'BBAS3.SA',
    'bradesco': 'BBDC4.SA',
    'brf': 'BRFS3.SA',
    'ccr': 'CCRO3.SA',
    'cosan': 'CSAN3.SA',
    'cpfl_energia': 'CPFE3.SA',
    'dasa': 'DASA3.SA',
    'fleury': 'FLRY3.SA',
    'gol': 'GOLL4.SA',
    'hapvida': 'HAPV3.SA',
    'itau': 'ITUB4.SA',
    'locaweb': 'LWSA3.SA',
    'magazine_luiza': 'MGLU3.SA',
    'mrv_engenharia': 'MRVE3.SA',
    'natura': 'NTCO3.SA',
    'petrobras': 'PETR4.SA',
    'santander': 'SANB11.SA',
    'sul_america': 'SULA11.SA',
    'totvs': 'TOTS3.SA',
    'vale': 'VALE3.SA',
    'via_varejo': 'BHIA3.SA'

}


def contar_indicadores(dataframe1, dataframe2, indicador, tipo=1):
    contagem = []
    for i, row in dataframe1.iterrows():
        inicio = row['data']
        fim = row['prox_reuniao']
        if not pd.isna(fim):
            filtro = dataframe2[(dataframe2['Date'] >= inicio) & (dataframe2['Date'] <= fim) & (dataframe2[f'Indicator_{indicador}'] == tipo)]
        else:
            fim = inicio + pd.DateOffset(months=3)
            filtro = dataframe2[(dataframe2['Date'] >= inicio) & (dataframe2['Date'] <= fim) & (dataframe2[f'Indicator_{indicador}'] == tipo)]
        contagem.append(len(filtro))
    return contagem


def extract_text_from_pdf(pdf_path):
    # Abrir o arquivo PDF
    with open(pdf_path, 'rb') as file:
        # Criar um objeto PDF Reader
        reader = PyPDF2.PdfReader(file)

        # Inicializar uma variável para armazenar o texto
        all_text = ""

        # Iterar sobre cada página e extrair o texto
        for page_num in range(len(reader.pages)):
            page = reader.pages[page_num]
            all_text += page.extract_text()
    # Remover os caracteres de nova linha
    all_text = all_text.replace('\n', ' ')
    # Segmentar o texto usando "." como separador
    segments = all_text.split('. ')

    # Remover espaços em branco desnecessários e segmentos vazios
    segments = [segment.strip() for segment in segments if segment.strip() and len(segment.strip()) >= 15]

    return segments


In [None]:
file_data_transcripts = os.listdir('dataset/transcripts')
print(file_data_transcripts)
for file in file_data_transcripts:
    if file not in name_prices:
        print(f"Skipping file: {file}")
        continue
    if os.path.exists(f'dataset/transcripts_and_returns/{name_prices[file]}.csv'):
        print("Transcript and returns file already exists for: " + name_prices[file])
        continue
    if not os.path.exists(f'dataset/transcripts/{file}/datas.csv'):
        print("Data file does not exist: "+file)
        continue
    df = pd.read_csv(f'dataset/transcripts/{file}/datas.csv')
    df = df.sort_values(by=['data'], ignore_index=True)
    df['data'] = pd.to_datetime(df['data'])
    df['prox_reuniao'] = df['data'].shift(-1)
    df['prox_reuniao'] = pd.to_datetime(df['prox_reuniao'])
    prices = pd.read_csv(f'dataset/prices_processed/{name_prices[file]}.csv')
    prices['Date'] = pd.to_datetime(prices['Date'])
    df['Daily_Return_Positive'] = contar_indicadores(df, prices, 'Daily_Return', tipo=1)
    df['Daily_Return_Neutral'] = contar_indicadores(df, prices, 'Daily_Return', tipo=0)
    df['Daily_Return_Negative'] = contar_indicadores(df, prices, 'Daily_Return', tipo=-1)
    df['Daily_Return_Total'] = df['Daily_Return_Positive'] + df['Daily_Return_Neutral'] + df['Daily_Return_Negative']
    df['Week_Return_Positive'] = contar_indicadores(df, prices, 'Week_Return', tipo=1)
    df['Week_Return_Neutral'] = contar_indicadores(df, prices, 'Week_Return', tipo=0)
    df['Week_Return_Negative'] = contar_indicadores(df, prices, 'Week_Return', tipo=-1)
    df['Week_Return_Total'] = df['Week_Return_Positive'] + df['Week_Return_Neutral'] + df['Week_Return_Negative']
    df['Month_Return_Positive'] = contar_indicadores(df, prices, 'Month_Return', tipo=1)
    df['Month_Return_Neutral'] = contar_indicadores(df, prices, 'Month_Return', tipo=0)
    df['Month_Return_Negative'] = contar_indicadores(df, prices, 'Month_Return', tipo=-1)
    df['Month_Return_Total'] = df['Month_Return_Positive'] + df['Month_Return_Neutral'] + df['Month_Return_Negative']
    df.to_csv(f'dataset/transcripts_and_returns/{name_prices[file]}.csv', index=False)
    print(f"Transcript and return created in: {name_prices[file]}.csv")

['.ipynb_checkpoints', 'magazine_luiza']
Skipping file: .ipynb_checkpoints
Transcript and returns file already exists for: MGLU3.SA


In [None]:
pred_mapper = {
    0: 1,
    1: -1,
    2: 0
  }
tokenizer = AutoTokenizer.from_pretrained("lucas-leme/FinBERT-PT-BR")
finbertptbr = BertForSequenceClassification.from_pretrained("lucas-leme/FinBERT-PT-BR")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [None]:
files = ['magazine_luiza']  # os.listdir("../../dataset/transcripts")
for file in files:
    try:
        df = pd.read_csv(f"dataset/transcripts_and_returns/{name_prices[file]}.csv")
        for i, row in df.iterrows():
            # if not pd.isna(row['positive_sentiment']):
            #     print(row['positive_sentiment'])
            #     continue
            text_extract_name = row['trimestre']
            text = extract_text_from_pdf(f'dataset/transcripts/{file}/{text_extract_name}.pdf')
            tokens = tokenizer(text, return_tensors="pt",
                                   padding=True, truncation=True, max_length=512)

            finbertptbr_outputs = finbertptbr(**tokens)
            preds = [pred_mapper[np.argmax(pred)] for pred in finbertptbr_outputs.logits.cpu().detach().numpy()]
            posi, neut, nega = preds.count(1), preds.count(0), preds.count(-1)
            df.loc[i, 'positive_sentiment'] = posi
            df.loc[i, 'neutral_sentiment'] = neut
            df.loc[i, 'negative_sentiment'] = nega
            df.to_csv(f"dataset/transcripts_and_returns/{name_prices[file]}.csv", index=False)

            del tokens, finbertptbr_outputs, preds
            torch.cuda.empty_cache()

    except FileNotFoundError as e:
        print(f"Erro de arquivo não encontrado: {e}")
    except Exception as e:
        print(f"Ocorreu um erro: {e}")

Ocorreu um erro: name 'pd' is not defined


# Previsão de Preços

In [12]:
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error

In [19]:
# Criar um DataFrame
df = pd.read_csv('dataset/prices/AZUL4.SA.csv')

# Converter a coluna 'Date' para o formato datetime
df['Date'] = pd.to_datetime(df['Date'])

# Extrair recursos da data (Ano, Mês, Dia) para usar como variáveis independentes
df['Ano'] = df['Date'].dt.year
df['Mes'] = df['Date'].dt.month
df['Dia'] = df['Date'].dt.day

# Definir variáveis independentes (X) e dependentes (y)
X = df[['Ano', 'Mes', 'Dia']]  # variáveis independentes
y = df['Close']  # variável dependente

# Dividir os dados em conjuntos de treino e teste (80% treino, 20% teste)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Criar o modelo Random Forest
modelo = RandomForestRegressor(n_estimators=100, random_state=42)

# Treinar o modelo
modelo.fit(X_train, y_train)

# Fazer previsões no conjunto de teste
y_pred = modelo.predict(X_test)

# Avaliar o modelo usando o erro quadrático médio (MSE)
mse = mean_squared_error(y_test, y_pred)
print(f'Erro Quadrático Médio: {mse}')

# Prever o preço para uma nova data (ex: 2024-08-11)
nova_data = pd.to_datetime('2024-08-11')
ano = nova_data.year
mes = nova_data.month
dia = nova_data.day

novo_dia = np.array([[ano, mes, dia]])
preco_previsto = modelo.predict(novo_dia)
print(f'Preço previsto para 2024-08-11: {preco_previsto[0]}')

Erro Quadrático Médio: 0.8151249761458873
Preço previsto para 2024-08-11: 17.2836993598938


