# Mini projeto de ETL para consumo de dados históricos da B3

# Extraction Data

O arquivo com os dados históricos da B3 não possui delimitações (|,:, ;, etc). Na verdade ele é um arquivo posicional, ou seja, ele possui uma string gigantesca com os dados da série da bovespa.

Para poder manusear estes dados, é necessário ter um documento para entender com funciona sua estrutura, sendo assim, a bolsa de valores B3 disponibiliza um documento em PDF chamado `SeriesHistoricas_Layout.pdf`.

Link Series Históricas Layout PDF: https://www.b3.com.br/data/files/C8/F3/08/B4/297BE410F816C9E492D828A8/SeriesHistoricas_Layout.pdf

O arquivo COTAHIST.AAAA.TXT contém as informações das cotações históricas relativas à negociação de todos os papéis-mercado no período de um ano, classificado pelos campos Tipo de registro, Data do pregão, Código de BDI, Nome da empresa e Código de Negociação. Esta divisão não impede que o usuário o classifique de acordo com as suas necessidades, segundo o equipamento e software a serem usados.

In [116]:
# importing the libraries

import pandas as pd

In [117]:
# LAYOUT
# DATA_DO_PREGAO = 2 - 10
# CODIGO_DO_BDI = 10 - 12
# CODIGO_DO_PAPEL = 12 - 24
# NOME_DA_ACAO = 27 - 39
# PRECO_DE_ABERTURA = 56 - 69
# PRECO_MAXIMO = 69 - 82
# PRECO_MINIMO = 82 - 95
# PRECO_FECHAMENTO = 108 - 121

def read_files(path, name_file, year_date, type_file):
    
    _file = f"{path}/{name_file}{year_date}.{type_file}"

    colspecs = [(2,10),
                (10,12),
                (12,24),
                (27,39),
                (56,69),
                (69,82),
                (82,95),
                (108,121),
                (152,170),
                (170,188)
               ]

    # column names
    names = ["data_pregao", "codbdi", "sigla_acao", "nome_acao", "preco_abertura", "preco_maximo", "preco_minimo", "preco_fechamento",
            "qtd_negocios", "volume_negocios"]

    dataframe = pd.read_fwf(_file, colspecs = colspecs, names = names, skiprows=1)
    return dataframe

# Transformation Data

Ao visualizar a estrutura dos dados, precisamos formatar alguns e descartar outros. Neste caso, segue a lista de campos que precisam passar por uma transformação em suas estruturas:

`data_pregao` = precisa estar formatado em tipo `datetime` <br>
`preco_max` = precisa estar em formato `float` <br>
`preco_min` = precisa estar em formato `float` <br>
`preco_fechamento` = precisa estar em formato `float` <br>
`preco_abertura` = precisa estar em formato `float` <br>

Ademais, estabelecer o lote padrão igual a 2 (neste caso o codbdi igual a 2), descartando os outros lotes.

In [118]:
# filtering stocks

def filter_stocks(dataframe, ):
    # filtering lots equal to 2
    dataframe = dataframe[dataframe['codbdi'] == 2]
    # discarding the codbdi column, it will no longer be necessary
    dataframe = dataframe.drop(['codbdi'], axis=1)
    return dataframe


In [119]:
# adjusting date field

def parse_date(dataframe):
    dataframe['data_pregao'] = pd.to_datetime(dataframe['data_pregao'], format="%Y%m%d")
    return dataframe

In [120]:
# adjustment of numeric fields

def parse_values(dataframe):
    dataframe['preco_abertura'] = (dataframe['preco_abertura'] / 100).astype(float)
    dataframe['preco_maximo'] = (dataframe['preco_maximo'] / 100).astype(float)
    dataframe['preco_minimo'] = (dataframe['preco_minimo'] / 100).astype(float)
    dataframe['preco_fechamento'] = (dataframe['preco_fechamento'] / 100).astype(float)
    return dataframe

# Load Data

Finalmente, inicia o processo de carregar estes dados uma vez tratados durante o o pipeline. Neste caso, foi feito o uso de uma automação que executa todos os processos da ETL em uma sequência assíncrona. 

Por final, salvamos os dados em um arquivo do tipo `.csv` - chamado de `all_bovespa`. Disponibilizando uma estrutura automatizada para consumo de dados tratados para analistas e cientistas de dados em um equipe.

In [121]:
# concatenating the files

# If the user wants to pull several historical series from B3 (because the historical data starts from 1986),
# they can iterate several files and go through ETL processing automatically.

def concat_files(path, name_file, year_date, type_file, final_file):
    
    for i, y in enumerate(year_date):
        # Processing steps (filtering, parsing dates, parsing float values)
        dataframe = read_files(path, name_file, y, type_file)
        dataframe = filter_stocks(dataframe)
        dataframe = parse_date(dataframe)
        dataframe = parse_values(dataframe)
        
        if i == 0:
            dataframe_final = dataframe
        else:
            dataframe_final = pd.concat([dataframe_final, dataframe])
            
    dataframe_final.to_csv(f'{path}/{final_file}', index=False)

In [131]:
# Running ETL program

# list of years (files you have)
year_date_list = ['2024']
# file path where the downloaded files are located
path = "/home/gabriel/Documentos/Jupyter Notebooks"
name_file = "COTAHIST_A"
type_file = "TXT"
# final file name
final_file = "all_bovespa.csv"

concat_files(path, name_file, year_date_list, type_file, final_file)