In [None]:
%%writefile /content/MVP_Objetivo.md

#@title **Objetivo do Projeto**

### 1. Propósito do MVP

Este projeto tem como objetivo principal a criação de um pipeline para extração, transformação, carga, análise e previsão da movimentação intradiária dos preços de um ativo financeiro em intervalos de 5 minutos. O modelo preditivo central será baseado em redes neurais recorrentes (LSTM), mas outras abordagens serão exploradas. O MVP visa garantir previsões para embasar decisões estratégicas de day trade.

### 2. Problema a Ser Resolvido

A alta volatilidade dos mercados financeiros exige ferramentas robustas para antecipação de movimentos de preço. A dificuldade está em capturar padrões de curto prazo e projetá-los com precisão. Traders e investidores necessitam de um modelo que consiga interpretar os padrões históricos e transformá-los em previsões úteis.

### 3. Pipeline do Projeto

O projeto será estruturado em sete etapas principais:

1. **Extração e armazenamento dos dados brutos:** Coleta de dados históricos de ativos financeiros em intervalos de 15 minutos utilizando a API do Yahoo Finance (yfinance). Os dados serão armazenados em um repositório GitHub sincronizado com Google Colab, garantindo acesso remoto e backup na nuvem.

2. **Limpeza e organização dos dados:** Padronização de colunas, tratamento de dados ausentes, eliminação de duplicatas e organização cronológica. Resultado salvo como `dados_limpos.csv`.

3. **Transformação e engenharia de features:** Adição de indicadores técnicos (como médias móveis, RSI, MACD), criação de variáveis de lag e retornos. Resultado salvo como `dados_transformados.csv`.

4. **Modelagem e estruturação do banco de dados:**

   a) Organização em arquivos CSV:

   - `dados_brutos.csv`: dados originais extraídos da API.
   - `dados_limpos.csv`: após limpeza e padronização.
   - `dados_transformados.csv`: após adição de features técnicas.
   - `dados_final.csv`: versão padronizada e normalizada dos dados.

   b) Banco de dados dimensional:

   - **Fato**: `fato_precos`, contendo os valores de fechamento e chaves para dimensões.
   - **Dimensões**:
     - `dim_tempo`: atributos temporais.
     - `dim_indicadores`: indicadores técnicos.
     - `dim_lags`: variações e lags recentes.

   Um **Catálogo de Dados** será elaborado com descrição, domínio, categorias e linhagem de cada variável.

5. **Carga e Pipeline ETL:** Pipeline estruturado com as seguintes etapas:

   - **Extração:** via API do Yahoo Finance.
   - **Limpeza:** tratamento e estruturação básica.
   - **Transformação:** geração de variáveis técnicas e derivadas.
   - **Carga:** integração dos dados transformados no banco dimensional (fato e dimensões).

6. **Treinamento e ajuste do modelo:** Implementação e ajuste de modelos preditivos (LSTM como baseline), com avaliação por métricas como MSE e R².

7. **Interpretação dos resultados e resposta às perguntas:** Validação das previsões, análise de variáveis relevantes e contribuição dos resultados para decisões de trading.

### 4. Perguntas a Serem Respondidas

- É possível prever com precisão a movimentação intradiária de um ativo a cada 15 minutos?
- Os dados do dia anterior fornecem informações suficientes para a previsão do dia seguinte?
- A modelagem com LSTM captura corretamente as tendências de curto prazo?
- A previsão da movimentação intradiária também permite derivar com precisão as targets globais do dia (abertura, mínima, máxima e fechamento)?
- Quais indicadores técnicos e features são mais relevantes para melhorar a acurácia do modelo?
- Como considerar corretamente as quebras de fim de semana (exemplo: prever a segunda-feira usando os dados de sexta-feira)?
- A normalização e padronização das variáveis melhora a precisão do modelo?

### 5. Critérios de Sucesso

Para que o MVP seja considerado bem-sucedido o esperado é que:

1. O pipeline de extração, transformação e previsão funcione de forma eficiente.
2. O modelo consiga prever a movimentação dos preços com um erro médio aceitável (avaliado por MSE ou R2).
3. A previsão de targets globais (abertura, máxima, mínima, fechamento) seja consistente com os valores reais.
4. O modelo consiga lidar corretamente com fins de semana e feriados.
5. As previsões sejam suficientes para auxiliar na tomada de decisão de trading.



In [None]:
#@title ## Instalando dependências
'''
Usamos `-q` para ocultar a saída detalhada e mostrar apenas a barra de progresso

'''
!pip install -q tensorflow > /dev/null  # Framework para redes neurais e deep learning
!pip install -q keras > /dev/null  # Biblioteca de alto nível para redes neurais
!pip install -q pandas > /dev/null  # Manipulação e análise de dados
!pip install -q numpy > /dev/null  # Computação numérica eficiente
!pip install -q matplotlib > /dev/null  # Visualização de gráficos e análise exploratória
!pip install -q scikit-learn > /dev/null  # Ferramentas para pré-processamento e métricas de avaliação
!pip install -q gitpython > /dev/null  # Gerenciamento de repositórios Git via Python
!pip install -q python-dotenv > /dev/null  # Manipulação de variáveis de ambiente
!pip install -q seaborn > /dev/null  # Biblioteca de visualização estatística baseada no Matplotlib
!pip install -q yfinance > /dev/null  # Coleta de dados financeiros diretamente do Yahoo Finance
!pip install -q sqlalchemy > /dev/null  # ORM para interagir com bancos de dados relacionais
!pip install -q dotenv > /dev/null # Manipulação de variáveis de ambiente

# Importações das bibliotecas
import pandas as pd  # Manipulação de DataFrames
import numpy as np  # Cálculos numéricos e matrizes
import matplotlib.pyplot as plt  # Geração de gráficos
import sqlite3  # Integração com banco de dados SQLite

# Pré-processamento dos dados
from sklearn.preprocessing import StandardScaler, MinMaxScaler  # Normalização e padronização dos dados
from sklearn.model_selection import train_test_split  # Divisão dos dados em treino e teste
from sklearn.metrics import mean_absolute_error, mean_squared_error  # Avaliação do desempenho do modelo

# Construção do modelo preditivo
from keras.models import Sequential  # Modelo sequencial de rede neural
from keras.layers import Dense  # Camada densa para aprendizado profundo

# Controle de versão e variáveis de ambiente
import git  # Gerenciamento do repositório Git
import dotenv  # Carregamento de variáveis de ambiente

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import warnings

warnings.filterwarnings("ignore")  # Oculta todos os warnings


In [None]:
# Capturamdo todas as dependencias do ambiente nesta primeira etapa
!pip freeze > requirements.txt

In [None]:
%%writefile /content/configurar_git.py
# Configurar Git e sincronizar com GitHub
import os
from dotenv import load_dotenv

def git_config():
    """Configura o Git localmente e sincroniza com o repositório remoto no GitHub."""

    # Carregar variáveis do ambiente
    load_dotenv(dotenv_path='/content/.env')

    GITHUB_USERNAME = os.getenv('GITHUB_USERNAME')
    EMAIL = os.getenv('EMAIL')
    GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')
    PROJECT_NAME = os.getenv('PROJECT_NAME')

    if not all([GITHUB_USERNAME, EMAIL, GITHUB_TOKEN, PROJECT_NAME]):
        raise ValueError("Variáveis de ambiente faltando. Verifique o arquivo .env.")

    REPO_URL = f"https://{GITHUB_USERNAME}:{GITHUB_TOKEN}@github.com/{GITHUB_USERNAME}/{PROJECT_NAME}.git"

    # Configurações globais do Git
    os.system(f'git config --global user.name "{GITHUB_USERNAME}"')
    os.system(f'git config --global user.email "{EMAIL}"')

    if os.path.isdir(PROJECT_NAME):
        print(f"[INFO] Diretório '{PROJECT_NAME}' já existe. Sincronizando...")

        os.chdir(PROJECT_NAME)

        os.system("git init")
        os.system("git remote remove origin || true")
        os.system(f"git remote add origin {REPO_URL}")
        os.system("git fetch origin")
        os.system("git checkout -B main")
        os.system("git pull origin main --allow-unrelated-histories --no-rebase")
    else:
        print(f"[INFO] Clonando o repositório '{PROJECT_NAME}'...")

        os.system(f"git clone {REPO_URL}")
        os.chdir(PROJECT_NAME)

    print(f"[SUCESSO] Git configurado e sincronizado com: {REPO_URL}")

if __name__ == "__main__":
    git_config()


In [None]:
#@title Sincronizando repositório
!python /content/configurar_git.py


In [None]:
#@title ## Definindo estrutura de pastas inicial do projeto

"""
Estrutura inicial do repositório Piloto_Day_Trade:

|- notebooks/         → Jupyter Notebooks para exploração e análises
|- scripts/           → Funções reutilizáveis (pré-processamento, modelagem, avaliação, etc.)
   |-modelagem_machine_learning
   |-operacional
   |-pipeline
|- data/              → Dados organizados em 3 níveis:
   |- raw/            → Dados brutos extraídos diretamente de fontes externas
   |- cleaned/        → Dados limpos com tratamento básico (ex: datas, nulos, nomes de colunas)
   |- transformed/    → Dados com features criadas e prontos para modelagem
   |- prepared/          → Dados prontos para serem utilizados no modelo
|- modelagem/         → Modelagem do banco de dados.
   |- database/       → Banco de dados
   |- catalog/        → Catálogo de dados
   |- esquema/        → Esquema do banco de dados
|- .github/
   |-workflows
|- models/            → Modelos treinados
|- reports/           → Resultados, gráficos, relatórios de performance
|- MVP_Objetivo.md    → Documento explicando o objetivo do projeto
|- README.md          → Instruções gerais do projeto
|-.env                → Variáveis de ambiente e configurações sensíveis
|- requirements.txt   → Lista de dependências
|- LICENSE            → Licença do projeto
"""

# Criar estrutura de diretórios
%cd /content/

!mkdir -p /content/Piloto_Day_Trade/notebooks
!mkdir -p /content/Piloto_Day_Trade/scripts/modelagem_machine_learning
!mkdir -p /content/Piloto_Day_Trade/scripts/operacional
!mkdir -p /content/Piloto_Day_Trade/scripts/pipeline
!mkdir -p /content/Piloto_Day_Trade/data/raw
!mkdir -p /content/Piloto_Day_Trade/data/cleaned
!mkdir -p /content/Piloto_Day_Trade/data/transformed
!mkdir -p /content/Piloto_Day_Trade/data/prepared
!mkdir -p /content/Piloto_Day_Trade/modelagem/catalog
!mkdir -p /content/Piloto_Day_Trade/modelagem/esquema
!mkdir -p /content/Piloto_Day_Trade/modelagem/database
!mkdir -p /content/Piloto_Day_Trade/models
!mkdir -p /content/Piloto_Day_Trade/reports
!mkdir -p /content/Piloto_Day_Trade/.github/workflows

# Criar arquivos principais
!touch /content/Piloto_Day_Trade/.gitignore
!touch /content/Piloto_Day_Trade/README.md

# Mover arquivos existentes
!mv /content/.env /content/Piloto_Day_Trade/.env
!mv /content/configurar_git.py /content/Piloto_Day_Trade/scripts/operacional/configurar_git.py
!mv /content/requirements.txt /content/Piloto_Day_Trade/requirements.txt
!mv /content/MVP_Objetivo.md /content/Piloto_Day_Trade/MVP_Objetivo.md

# Conferir estrutura de diretórios
!apt-get install tree -y > /dev/null 2>&1 # Instala o tree e oculta a saida da instalação
!tree /content/Piloto_Day_Trade -d



In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/operacional/atualizar_repo.py

# Função operacional para atualizar o repositório remoto com segurança e validação
from git import Repo, GitCommandError

def atualizar_repo(mensagem_commit="Atualizações automáticas via Colab"):
    """
    Envia alterações locais para o repositório remoto GitHub.
    - Detecta alterações
    - Adiciona, comita e envia
    - Valida o sucesso do push
    """
    repo_path = "/content/Piloto_Day_Trade"
    try:
        repo = Repo(repo_path)

        if repo.is_dirty(untracked_files=True):
            print("[INFO] Alterações locais detectadas. Enviando para o GitHub...")

            repo.git.add(all=True)
            repo.index.commit(mensagem_commit)
            origin = repo.remote(name='origin')
            result = origin.push(refspec='main')

            for res in result:
                if res.flags & res.ERROR:
                    print(f"[ERRO] Falha ao enviar: {res.summary}")
                else:
                    print(f"[SUCESSO] Push concluído: {res.summary}")
        else:
            print("[INFO] Nenhuma alteração detectada. Nada a enviar.")

    except GitCommandError as ge:
        print(f"[ERRO] Comando Git falhou: {ge}")
    except Exception as e:
        print(f"[ERRO] Falha inesperada ao enviar alterações: {e}")

if __name__ == "__main__":
    atualizar_repo()


In [None]:
# %%writefile /content/scanner_git.py

import os
from dotenv import load_dotenv
import git

def imprimir(msg):
    print(f"[INFO] {msg}")

def erro(msg):
    print(f"[ERRO] {msg}")

def scanner_git():
    imprimir("Carregando variáveis de ambiente...")
    load_dotenv('/content/.env')

    GITHUB_USERNAME = os.getenv('GITHUB_USERNAME')
    GITHUB_TOKEN = os.getenv('GITHUB_TOKEN')
    EMAIL = os.getenv('EMAIL')
    PROJECT_NAME = os.getenv('PROJECT_NAME')

    if not all([GITHUB_USERNAME, GITHUB_TOKEN, EMAIL, PROJECT_NAME]):
        erro("Variáveis ausentes no .env. Verifique GITHUB_USERNAME, GITHUB_TOKEN, EMAIL, PROJECT_NAME.")
        return

    REPO_URL = f"https://{GITHUB_USERNAME}:{GITHUB_TOKEN}@github.com/{GITHUB_USERNAME}/{PROJECT_NAME}.git"

    imprimir(f"Usuário: {GITHUB_USERNAME}")
    imprimir(f"Email: {EMAIL}")
    imprimir(f"Repositório: {REPO_URL}")

    try:
        repo_path = f"/content/{PROJECT_NAME}"
        imprimir(f"Tentando abrir repositório em {repo_path}...")
        repo = git.Repo(repo_path)
    except Exception as e:
        erro(f"Não foi possível abrir o repositório local: {e}")
        return

    try:
        # Mostrar status do repositório
        imprimir("Verificando status do repositório local...")
        if repo.is_dirty(untracked_files=True):
            imprimir("Alterações locais detectadas.")
        else:
            imprimir("Sem alterações locais pendentes.")

        imprimir("Arquivos modificados:")
        print(repo.git.status())

        # Verificar branch atual
        branch = repo.active_branch.name
        imprimir(f"Branch atual: {branch}")

        # Remoto
        origin = repo.remote(name='origin')
        imprimir(f"Remoto configurado: {origin.url}")

        # Verificar diferenças locais vs remoto
        imprimir("Comparando local com remoto (git fetch)...")
        origin.fetch()
        behind = repo.iter_commits(f'{branch}..origin/{branch}')
        ahead = repo.iter_commits(f'origin/{branch}..{branch}')

        num_behind = sum(1 for _ in behind)
        num_ahead = sum(1 for _ in ahead)

        imprimir(f"Commits atrás do remoto: {num_behind}")
        imprimir(f"Commits à frente do remoto: {num_ahead}")

        # Pull do remoto
        imprimir("Tentando sincronizar com remoto (pull)...")
        origin.pull(branch)

        # Push (se houver commits locais)
        if num_ahead > 0 or repo.is_dirty(untracked_files=True):
            imprimir("Fazendo commit e push de alterações...")
            repo.git.add(all=True)
            repo.index.commit("Commit automático via scanner")
            push_result = origin.push(branch)

            for res in push_result:
                if res.flags & res.ERROR:
                    erro(f"Erro ao enviar: {res.summary}")
                else:
                    imprimir(f"Push realizado com sucesso: {res.summary}")
        else:
            imprimir("Nada a enviar. Tudo sincronizado.")

    except Exception as e:
        erro(f"Erro inesperado durante sincronização: {e}")

if __name__ == "__main__":
    scanner_git()


In [None]:
%cd /content/Piloto_Day_Trade/scripts/operacional
!python atualizar_repo.py

In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/pipeline/extracao_dadosvf.py

#@title  Extração de dados

import yfinance as yf
import pandas as pd
from datetime import datetime, timedelta
import os
import dotenv
import logging

logging.getLogger("yfinance").setLevel(logging.CRITICAL)
dotenv.load_dotenv()

"""
Essa função tem como objetivo extrair dados históricos de um ativo financeiro do Yahoo Finance
com controle de incremental, salvando tudo em um CSV que serve como base bruta do pipeline.
Etapas:
- Define intervalo de coleta:
- Início: hoje menos dias
- Fim: ontem às 18:10 (ajustado para o fechamento)
- Verifica se já existem dados anteriores salvos (dados_brutos.csv):
- Se sim, tenta encontrar a última data registrada válida e usa como novo início e(xtração incremental).
- Faz a requisição ao Yahoo Finance (yf.download), no intervalo necessário.
- Ajusta o fuso horário dos dados para "America/Sao_Paulo".
- Mescla os dados novos aos antigos (se houver), remove duplicatas e linhas com muitos nulos.
- Salva o conjunto atualizado no CSV.
- Retorna o conjunto de dados atualizado.

"""

def extrair_dados(ticker, dias, intervalo, dados_brutos):
    """Extrai e organiza dados do Yahoo Finance no intervalo correto."""
    df_total = pd.DataFrame()
    data_inicio = data_inicio = (datetime.today() - timedelta(days=dias)).date()
    data_fim = datetime.now().replace(hour=18, minute=10, second=0, microsecond=0) - timedelta(days=1)

    primeira_extracao = True

    if os.path.exists(dados_brutos) and os.path.getsize(dados_brutos) > 0:
        df_temp = pd.read_csv(
            dados_brutos,
            index_col=0,
            parse_dates=True,
            date_format="%Y-%m-%d %H:%M:%S"
        )
        if not df_temp.empty:
            df_temp = df_temp.iloc[2:].copy()
            df_temp.index.name = 'data'
            df_temp = df_temp.reset_index()
            df_temp['data'] = pd.to_datetime(df_temp['data']).dt.date
            ultima_data = pd.to_datetime(df_temp['data'], errors='coerce').max()
            print(f"Última data encontrada: {ultima_data}")

            if pd.notnull(ultima_data):
                data_inicio = ultima_data + timedelta(days=1)
                primeira_extracao = False

    if primeira_extracao:
        print(f"\nPrimeira extração de dados.")
        print(f"Data de início: {data_inicio.strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"Data de fim: {data_fim.strftime('%Y-%m-%d %H:%M:%S')}\n")
    else:
        print(f"\nExtração complementar a partir de {data_inicio.strftime('%Y-%m-%d %H:%M:%S')}.")
        print(f"Data de fim: {data_fim.strftime('%Y-%m-%d %H:%M:%S')}\n")

    df_novo = yf.download(
        ticker,
        start=data_inicio.strftime("%Y-%m-%d"),
        end=data_fim.strftime("%Y-%m-%d"),
        interval=intervalo,
        progress=False
    )


    if not df_novo.empty:
        df_novo.index = (
            df_novo.index.tz_convert("America/Sao_Paulo")
            if df_novo.index.tz is not None
            else df_novo.index.tz_localize("UTC").tz_convert("America/Sao_Paulo")
        )

        if os.path.exists(dados_brutos) and not primeira_extracao:
            df_antigo = pd.read_csv(dados_brutos, index_col=0, parse_dates=True)
            df_total = pd.concat([df_antigo, df_novo])
        else:
            df_total = df_novo

        df_total = (
            df_total.drop_duplicates()
            .dropna(thresh=df_total.shape[1] * 0.5)

        )

        df_total.to_csv(dados_brutos)
        print("Dados somados e salvos com sucesso.")
    else:
        print("Nenhum dado complementar foi adicionado.")

    return df_total

if __name__ == "__main__":
    ticker = "BBDC4.SA"
    intervalo = "5m"
    dias = 45
    dados_brutos = "/content/Piloto_Day_Trade/data/raw/dados_brutos.csv"
    df = extrair_dados(ticker, dias, intervalo, dados_brutos)


In [None]:
# @title Executando extração de dados
from Piloto_Day_Trade.scripts.pipeline.extracao_dadosvf import extrair_dados

ticker = "BBDC4.SA"
intervalo = "5m"
dias = 45
dados_brutos = "/content/Piloto_Day_Trade/data/raw/dados_brutos.csv"
df = extrair_dados(ticker, dias, intervalo, dados_brutos)


In [None]:
df = pd.read_csv('/content/Piloto_Day_Trade/data/raw/dados_brutos.csv', index_col=0, parse_dates=True, dayfirst=True)

In [None]:
df.info()

In [None]:
%cd /content/Piloto_Day_Trade

from scripts.operacional.atualizar_repo import atualizar_repo

atualizar_repo("Extração de dados brutos")

In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/pipeline/limpeza_dados.py

#@title ## Limpeza de dados

import pandas as pd

def limpeza_dados(df, path_dados_limpos):
    # Verificar se os dados estão corretos
    print("Dados originais:")
    print(df.head())
    print(df.info())

    # Remover as primeiras duas linhas (com 'Ticker' e 'Datetime')
    df = df.iloc[2:].copy()

    # Verificar após a remoção
    print("Após remoção das duas primeiras linhas:")
    print(df.head())

    # Garantir que o índice esteja no formato de data e hora (timezone UTC)
    df.index = pd.to_datetime(df.index, utc=True)

    # Definir o fuso horário como "America/Sao_Paulo"
    df.index = df.index.tz_convert("America/Sao_Paulo")

    # Remover a referência de fuso horário
    df.index = df.index.tz_localize(None)

    # Criar a coluna 'hora' com base no índice
    df['hora'] = df.index.strftime('%H:%M:%S')

    # Renomear o índice para 'data'
    df.index.name = 'data'

    # Resetar o índice para transformar o Datetime em uma coluna normal
    df = df.reset_index()

    # Verificar após a transformação do índice
    print("\nApós conversão de índice:")
    print(df.head())

    # Remover o horário da coluna 'data', mantendo apenas a data
    df['data'] = df['data'].dt.date

    # Mapeamento das colunas para nomes padronizados
    mapeamento_colunas = {
        'Close': 'fechamento',
        'High': 'maximo',
        'Low': 'minimo',
        'Open': 'abertura',
        'Volume': 'volume'
    }

    # Renomear as colunas
    df.rename(columns=mapeamento_colunas, inplace=True)

    # Converte e arredonda as colunas numéricas
    for col in ['abertura', 'minimo', 'maximo', 'fechamento']:
        df[col] = pd.to_numeric(df[col], errors='coerce').round(2)

    # Converte a coluna 'volume' para número inteiro
    df['volume'] = pd.to_numeric(df['volume'], errors='coerce', downcast='integer')

    # Reorganiza as colunas na ordem desejada
    df = df[['data', 'hora', 'abertura', 'minimo', 'maximo', 'fechamento', 'volume']]

    # Verificar após reorganizar as colunas
    print("\nApós reorganizar as colunas:")
    print(df.head())

    # Verificar e remover duplicatas mantendo a primeira ocorrência
    df = df.drop_duplicates(keep='first')

    # Remover as linhas com 50% ou mais de valores nulos
    df = df.dropna(thresh=df.shape[1] * 0.5)

    # Verificar após remoção de duplicatas e nulos
    print("\nApós remover duplicatas e nulos:")
    print(df.head())

    # Garantir que 'data' e 'hora' estejam no formato datetime
    df['data'] = pd.to_datetime(df['data'], format='%Y-%m-%d')
    df['hora'] = pd.to_datetime(df['hora'], format='%H:%M:%S').dt.time

    # Filtra apenas os dias úteis (segunda a sexta)
    df = df[df['data'].dt.weekday < 5]

    # Verificar após filtrar dias úteis
    print("\nApós filtrar apenas os dias úteis:")
    print(df.head())

    # Filtra apenas horários entre 09:55 e 18:05
    df = df[(df['hora'] >= pd.to_datetime('09:55:00').time()) &
            (df['hora'] <= pd.to_datetime('18:05:00').time())]

    # Verificar após filtrar o intervalo de horário
    print("\nApós filtrar o intervalo de horário (09:55-18:05):")
    print(df.head())

    # Caso o DataFrame fique vazio, informar o motivo
    if df.empty:
        print("O DataFrame ficou vazio após o filtro de horário. Verifique se os dados estão dentro do intervalo de 09:55-18:05.")
    else:
        print("\nLimpeza de dados concluída com sucesso.")

    # Ordenar os dados
    df = df.sort_values(["data", "hora"], ascending=[False, True])
    print("\nDados limpos e ordenados:")
    print(df.head(10))

    # Salva os dados limpos em CSV
    df.to_csv(path_dados_limpos, index=False)
    print(f"\nOs dados foram limpos e salvos em csv.")

    return df


if __name__ == "__main__":
    # Ler os dados brutos
    dados_brutos = pd.read_csv(f"/content/Piloto_Day_Trade/data/raw/dados_brutos.csv", index_col=0, parse_dates=True, dayfirst=True)
    path_dados_limpos = '/content/Piloto_Day_Trade/data/cleaned/dados_limpos.csv'
    # Aplicar limpeza nos dados
    df_limpo = limpeza_dados(dados_brutos, path_dados_limpos)




In [None]:
#@title Aplicando limpeza de dados
from Piloto_Day_Trade.scripts.pipeline.limpeza_dados import limpeza_dados

dados_brutos = pd.read_csv(f"/content/Piloto_Day_Trade/data/raw/dados_brutos.csv", index_col=0, parse_dates=True, dayfirst=True)
path_dados_limpos = '/content/Piloto_Day_Trade/data/cleaned/dados_limpos.csv'
df_limpo = limpeza_dados(dados_brutos, path_dados_limpos)



In [None]:
df_limpo = pd.read_csv('/content/Piloto_Day_Trade/data/cleaned/dados_limpos.csv')

In [None]:
df_limpo.info()

In [None]:
df_limpo.tail()

In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/pipeline/transformacao_dados_versao_final.py

#@title Transformação de dados

"""
Script de Transformação de Dados - Piloto Day Trade

Este script realiza o processo de transformação dos dados financeiros, incluindo o cálculo de indicadores técnicos,
adição de características temporais e diárias, além de filtrar novos dados a partir da última data registrada.
"""

import os
import pandas as pd
import numpy as np
from dotenv import load_dotenv

load_dotenv()

def carregar_dados(arquivo):
    """
    Carrega dados de um arquivo CSV ou retorna o DataFrame se já for fornecido como argumento.

    Parâmetros:
    Caminho do arquivo ou DataFrame a ser carregado.

    Retorna:
    Dados carregados a partir do arquivo ou argumento.
    """
    if isinstance(arquivo, pd.DataFrame):
        return arquivo
    if not os.path.exists(arquivo):
        print(f"O arquivo {arquivo} não existe.")
        return pd.DataFrame()
    try:
        return pd.read_csv(arquivo, parse_dates=["data"])
    except Exception as e:
        print(f"Erro ao carregar {arquivo}: {e}")
        return pd.DataFrame()

def obter_ultima_data(df):
    """
    Retorna a última data registrada no DataFrame.

    Parâmetros:
    df (pd.DataFrame): O DataFrame a ser analisado.

    Retorna:
    datetime: Última data registrada no DataFrame ou None se não houver dados.
    """
    return df["data"].max() if "data" in df.columns and not df.empty else None

def filtrar_novos_dados(df, ultima_data):
    """
    Filtra os dados para obter apenas os registros com data posterior à última data fornecida.

    Parâmetros:
    O DataFrame a ser filtrado.
    A última data registrada nos dados transformados.

    Retorna:
    pd.DataFrame: Dados filtrados com data posterior à última data.
    """
    return df[df["data"] > ultima_data] if ultima_data else df

def calcular_indicadores(df):
    """
    Calcula indicadores técnicos financeiros e adiciona novas colunas ao DataFrame.

    Parâmetros:
    O DataFrame com dados financeiros que será transformado.

    Retorna:
    DataFrame com as colunas dos indicadores calculados.
    """
    # Verifica se o DataFrame possui as colunas necessárias para cálculo dos indicadores
    if df.empty or not all(c in df.columns for c in ['data', 'hora', 'abertura', 'minimo', 'maximo', 'fechamento', 'volume']):
        return df

    # Ordena os dados por data e hora
    df = df.sort_values(by=['data', 'hora'])

    # Calcula o retorno percentual
    df['retorno'] = df['fechamento'].pct_change()

    # Calcula volatilidade, médias móveis e outros indicadores
    df['volatilidade'] = df['retorno'].rolling(20).std()
    # Substituir os primeiros NaN por 0
    df['volatilidade'] = df['volatilidade'].fillna(0)
    df['SMA_10'] = df['fechamento'].rolling(10).mean()
    df['EMA_10'] = df['fechamento'].ewm(span=10).mean()
    df['MACD'] = df['fechamento'].ewm(span=12).mean() - df['fechamento'].ewm(span=26).mean()
    df['Signal_Line'] = df['MACD'].ewm(span=9).mean()

    # Cálculo do índice de força relativa (RSI)
    ganho = df['retorno'].clip(lower=0)
    perda = -df['retorno'].clip(upper=0)
    media_ganho = ganho.ewm(span=14).mean()
    media_perda = perda.ewm(span=14).mean() + 1e-10
    df['rsi'] = 100 - (100 / (1 + media_ganho / media_perda))

    # Cálculo do On-Balance Volume (OBV)
    df['OBV'] = (df['volume'] * np.sign(df['fechamento'].diff())).fillna(0).cumsum()

    # Cálculo do ADX e outros indicadores de tendência
    delta_high = df['maximo'].diff()
    delta_low = -df['minimo'].diff()
    plus_dm = np.where((delta_high > delta_low) & (delta_high > 0), delta_high, 0)
    minus_dm = np.where((delta_low > delta_high) & (delta_low > 0), delta_low, 0)
    tr1 = df['maximo'] - df['minimo']
    tr2 = abs(df['maximo'] - df['fechamento'].shift())
    tr3 = abs(df['minimo'] - df['fechamento'].shift())
    tr = np.max([tr1, tr2, tr3], axis=0)
    atr = pd.Series(tr).rolling(14).mean()
    df['ADX'] = 100 * abs((pd.Series(plus_dm).rolling(14).mean() - pd.Series(minus_dm).rolling(14).mean()) /
                          (pd.Series(plus_dm).rolling(14).mean() + pd.Series(minus_dm).rolling(14).mean() + 1e-10)).rolling(14).mean()

    # Calcula as Bandas de Bollinger
    df['BB_MA20'] = df['fechamento'].rolling(20).mean()
    df['BB_STD20'] = df['fechamento'].rolling(20).std()
    df['BB_upper'] = df['BB_MA20'] + (2 * df['BB_STD20'])
    df['BB_lower'] = df['BB_MA20'] - (2 * df['BB_STD20'])

    # Calcula o Stochastic Oscillator (%K e %D)
    low14 = df['minimo'].rolling(14).min()
    high14 = df['maximo'].rolling(14).max()
    df['%K'] = 100 * ((df['fechamento'] - low14) / (high14 - low14 + 1e-10))
    df['%D'] = df['%K'].rolling(3).mean()

    # Calcula o Commodity Channel Index (CCI)
    tp = (df['maximo'] + df['minimo'] + df['fechamento']) / 3
    cci_ma = tp.rolling(20).mean()
    cci_std = tp.rolling(20).std()
    df['CCI'] = (tp - cci_ma) / (0.015 * cci_std + 1e-10)

    # Inclui a volatilidade (ATR)
    df['ATR'] = atr

    # Cria colunas com lags de fechamento, retorno e volume
    for lag in range(1, 4):
        df[f'fechamento_lag{lag}'] = df['fechamento'].shift(lag)
        df[f'retorno_lag{lag}'] = df['retorno'].shift(lag)
        df[f'volume_lag{lag}'] = df['volume'].shift(lag)

    return df

def adicionar_features_temporais(df):
    """
    Adiciona características temporais ao DataFrame, como o dia da semana e hora numérica.

    Parâmetros:
    df (pd.DataFrame): O DataFrame com dados financeiros.

    Retorna:
    pd.DataFrame: DataFrame com as características temporais adicionadas.
    """
    if df.empty:
        return df

    # Adiciona o dia da semana e a previsão do dia da semana
    df['data'] = pd.to_datetime(df['data'], errors='coerce')
    df['dia_da_semana_entrada'] = df['data'].dt.weekday
    df.loc[df['dia_da_semana_entrada'] == 4, 'dia_da_semana_previsao'] = (df['data'] + pd.DateOffset(days=3)).dt.weekday

    # Adiciona hora e minuto
    if 'hora' in df.columns:
        df['hora'] = pd.to_datetime(df['hora'].astype(str), format='%H:%M:%S', errors='coerce').dt.time
        df['hora_num'] = df['hora'].apply(lambda x: x.hour if pd.notnull(x) else np.nan)
        df['minuto'] = df['hora'].apply(lambda x: x.minute if pd.notnull(x) else np.nan)
    else:
        df['hora_num'] = np.nan
        df['minuto'] = np.nan

    return df

def adicionar_features_diarias(df):
    """
    Adiciona características diárias, como fechamento, volume e máximos/mínimos diários.

    Parâmetros:
    df (pd.DataFrame): O DataFrame com dados financeiros.

    Retorna:
    pd.DataFrame: DataFrame com as características diárias adicionadas.
    """
    if df.empty:
        return df

    # Adiciona características diárias, agrupadas por data
    df['fechamento_dia'] = df.groupby('data')['fechamento'].transform('last')
    df['volume_dia'] = df.groupby('data')['volume'].transform('sum')
    df['maximo_dia'] = df.groupby('data')['maximo'].transform('max')
    df['minimo_dia'] = df.groupby('data')['minimo'].transform('min')

    # Adiciona as características do dia anterior
    df['fechamento_dia_anterior'] = df['fechamento_dia'].shift(1)
    df['volume_dia_anterior'] = df['volume_dia'].shift(1).fillna(0).astype(int)
    df['maximo_dia_anterior'] = df['maximo_dia'].shift(1)
    df['minimo_dia_anterior'] = df['minimo_dia'].shift(1)

    return df

def transformar_dados(dados_limpos, dados_transformados):
    """
    Função principal para carregar, filtrar, calcular indicadores e transformar os dados financeiros.

    Parâmetros:
    dados_limpos: Caminho para os dados limpos a serem processados.
    dados_transformados: Caminho onde os dados transformados serão salvos.

    Retorna:
    DataFrame com os dados transformados.
    """
    # Carrega os dados limpos e transformados
    df_transformado = carregar_dados(dados_transformados)
    df_limpo = carregar_dados(dados_limpos)

    # Obtem a última data dos dados transformados
    ultima_data = obter_ultima_data(df_transformado)

    # Filtra os dados limpos para obter apenas os novos dados
    novos_dados = filtrar_novos_dados(df_limpo, ultima_data)

    if not novos_dados.empty:
        # Calcula os indicadores e adiciona as features temporais e diárias
        novos_dados = calcular_indicadores(novos_dados)
        novos_dados = adicionar_features_temporais(novos_dados)
        novos_dados = adicionar_features_diarias(novos_dados)

        # Concatena os novos dados com os dados transformados existentes
        df_final = pd.concat([df_transformado, novos_dados], ignore_index=True) if not df_transformado.empty else novos_dados

        # Remove registros com valores ausentes
        linhas_antes = len(df_final)
        df_final = df_final.dropna(subset=['fechamento', 'retorno', 'SMA_10', 'EMA_10', 'MACD', 'rsi'])
        linhas_depois = len(df_final)
        print(f"Linhas perdidas no dropna: {linhas_antes - linhas_depois}")

        # Salva os dados transformados
        os.makedirs(os.path.dirname(dados_transformados), exist_ok=True)
        df_final.to_csv(dados_transformados, index=False)
        print(f"Dados transformados salvos em {dados_transformados} ({len(df_final)} registros)")
        return df_final

    else:
        print("Nenhum novo dado para processar.")
        return df_transformado

if __name__ == "__main__":
    # Define os caminhos dos arquivos de dados
    path_dados_limpos = '/content/Piloto_Day_Trade/data/cleaned/dados_limpos.csv'
    path_dados_transformados = '/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv'

    # Executa a transformação de dados
    df_transformado = transformar_dados(path_dados_limpos, path_dados_transformados)


In [None]:
from Piloto_Day_Trade.scripts.pipeline.transformacao_dados_versao_final import transformar_dados

#@title Aplicando transformação de dados
path_dados_limpos = '/content/Piloto_Day_Trade/data/cleaned/dados_limpos.csv'
path_dados_transformados = '/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv'
df_transformado = transformar_dados(path_dados_limpos, path_dados_transformados)
print(df_transformado.head(5))

In [None]:
df_transformado = pd.read_csv('/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv')

In [None]:
df_transformado.info()

In [None]:
# Verificando os tipos de dados
df_transformado.dtypes

In [None]:
%%writefile /content/Piloto_Day_Trade/data/transformed/dados_transformados_doc.md

## Escolha das Features na Transformação dos Dados

O conjunto de dados transformado para o modelo de previsão intradiária de preços do ativo BBDC4 foi projetado para capturar diferentes aspectos do comportamento do mercado financeiro. Abaixo estão listadas as features selecionadas e as respectivas razões para sua inclusão:

### 1. **Retorno e Volatilidade**
- **retorno**: Variação percentual do preço de fechamento entre candles consecutivos.
- **volatilidade**: Desvio padrão do retorno em uma janela móvel, mede a instabilidade e o risco.

### 2. **Médias Móveis**
- **SMA_10**: Média móvel simples de 10 períodos do fechamento.
- **EMA_10**: Média móvel exponencial de 10 períodos, mais sensível a variações recentes.

### 3. **Indicadores de Tendência**
- **MACD**: Diferença entre a média móvel exponencial de 12 e 26 períodos.
- **Signal_Line**: Média móvel exponencial de 9 períodos do MACD.
- **ADX**: Índice direcional médio que quantifica a força da tendência.

### 4. **Indicadores de Momento**
- **RSI (Relative Strength Index)**: Índice de força relativa, indica sobrecompra ou sobrevenda.
- **CCI (Commodity Channel Index)**: Mede variações do preço em relação à média, útil para detectar desvios extremos.

### 5. **Indicadores de Volume**
- **volume**: Volume negociado no candle.
- **OBV (On-Balance Volume)**: Indicador cumulativo que relaciona variação de preço com volume.

### 6. **Bandas de Bollinger**
- **BB_upper**: Banda superior (média + 2 desvios).
- **BB_lower**: Banda inferior (média - 2 desvios).
- **BB_MA20**: Média móvel central de 20 períodos.
- **BB_STD20**: Desvio padrão usado no cálculo das bandas.

### 7. **Estocástico**
- **%K**: Oscilador estocástico baseado em máximas e mínimas.
- **%D**: Média móvel de 3 períodos de %K.

### 8. **ATR (Average True Range)**
- **ATR**: Média da faixa verdadeira, mede a amplitude de oscilação de preços.

### 9. **Lags Temporais**
- **fechamento_lag1, fechamento_lag2, fechamento_lag3**: Preços de fechamento anteriores.
- **retorno_lag1, retorno_lag2, retorno_lag3**: Retornos anteriores.
- **volume_lag1, volume_lag2, volume_lag3**: Volumes negociados anteriores.

### 10. **Variáveis Temporais**
- **dia_da_semana_entrada**: Dia da semana correspondente ao candle atual.
- **dia_da_semana_previsao**: Dia da semana do candle de previsão.
- **hora_num**: Representação numérica da hora do candle.
- **minuto**: Minuto do candle.

### 11. **Resumo Diário do Dia Anterior**
- **fechamento_dia_anterior**: Preço de fechamento do último candle do dia anterior.
- **volume_dia_anterior**: Volume total negociado no dia anterior.
- **maximo_dia_anterior**: Maior preço registrado no dia anterior.
- **minimo_dia_anterior**: Menor preço registrado no dia anterior.


In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/operacional/diagnostico_qualidade_dados.py

# diagnostico_qualidade_dados.py

import pandas as pd
import os

def diagnosticar_qualidade_dados(df, path_output):
    """
    Gera diagnóstico de qualidade dos dados:
    duplicação, nulos, valores únicos, gaps e outliers.

    Parâmetros:
    df (pd.DataFrame): Conjunto de dados analisado.
    path_output (str): Caminho para salvar o relatório CSV.

    Retorna:
    None
    """
    os.makedirs(os.path.dirname(path_output), exist_ok=True)

    relatorio = []

    # Linhas duplicadas
    relatorio.append(["Duplicatas (linhas completas)", df.duplicated().sum()])

    # Gaps de tempo (se houver colunas data e hora)
    if {'data', 'hora'}.issubset(df.columns):
        try:
            df_ordenado = df.sort_values(['data', 'hora'])
            df_ordenado['timestamp'] = pd.to_datetime(df_ordenado['data'] + ' ' + df_ordenado['hora'])
            gaps = df_ordenado['timestamp'].diff().gt(pd.Timedelta(minutes=5)).sum()
            relatorio.append(["Gaps (>5min entre registros)", int(gaps)])
        except Exception as e:
            relatorio.append(["Erro ao verificar gaps", str(e)])
    else:
        relatorio.append(["Gaps", "Colunas 'data' e 'hora' não encontradas"])

    # Valores nulos
    for col in df.columns:
        pct_null = df[col].isnull().mean() * 100
        relatorio.append([f"Nulos (%) - {col}", round(pct_null, 2)])

    # Valores únicos
    for col in df.columns:
        unicos = df[col].nunique()
        relatorio.append([f"Valores únicos - {col}", unicos])

    # Outliers (IQR)
    for col in df.select_dtypes(include=['float64', 'int64']).columns:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        outliers = df[(df[col] < Q1 - 1.5 * IQR) | (df[col] > Q3 + 1.5 * IQR)]
        relatorio.append([f"Outliers (IQR) - {col}", len(outliers)])

    # Converter para DataFrame e salvar
    relatorio_df = pd.DataFrame(relatorio, columns=["Verificação", "Resultado"])
    relatorio_df.to_csv(path_output, index=False)
    print(f"Relatório salvo em: {path_output}")


In [None]:
#@title Aplicar dignoticar dados

import sys
sys.path.append("/content/Piloto_Day_Trade")

from Piloto_Day_Trade.scripts.operacional.diagnostico_qualidade_dados import diagnosticar_qualidade_dados
# Carregar dados a serem verificados

# Definir caminhos
df = pd.read_csv('/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv')
path_output = '/content/Piloto_Day_Trade/reports/diagnostico_qualidade.csv'

# Aplicar função diagnostica
diagnosticar_qualidade_dados(df, path_output)


In [None]:
relatorio = pd.read_csv('/content/Piloto_Day_Trade/reports/diagnostico_qualidade.csv')
print(relatorio.head())

## Banco de dados

In [None]:
%%writefile /content/Piloto_Day_Trade/modelagem/esquema/esquema_dimensional.md

#@title Definição do esquema - Modelo Estrela
"""
O modelo estrela foi escolhido por sua simplicidade e clareza na organização dos dados para análise. Ele é ideal para consultas rápidas e análise preditiva. No nosso projeto, temos um único fato (preços OHLC) e múltiplas variáveis explicativas que os influenciam.

A estrutura facilita agregações temporais e análises do comportamento dos preços, sendo também eficiente para alimentar o pipeline de machine learning. Ao organizar as variáveis preditoras ao redor das medidas de preço, conseguimos isolar responsabilidades e tornar as análises mais precisas e escaláveis.
"""

## Tabela Fato: `fato_precos`
| Coluna         | Tipo   | Descrição                                   |
|----------------|--------|---------------------------------------------|
| id_fato_precos | int    | PK, identificador único da linha            |
| id_tempo       | int    | FK para a dimensão tempo                    |
| abertura       | float  | Preço de abertura                           |
| minimo         | float  | Preço mínimo                                |
| maximo         | float  | Preço máximo                                |
| fechamento     | float  | Preço de fechamento (variável alvo)         |

## Dimensão: `dim_tempo`
| Coluna                | Tipo   | Descrição                                 |
|------------------------|--------|-------------------------------------------|
| id_tempo              | int    | PK                                        |
| data                  | object | Data da observação                        |
| hora                  | object | Hora da observação                        |
| dia_da_semana_entrada | int    | Dia da semana da entrada (0=Seg, 6=Dom)   |
| hora_num              | int    | Hora como número inteiro                  |
| minuto                | int    | Minuto da observação                      |

## Dimensão: `dim_indicadores`
| Coluna       | Tipo   | Descrição                                       |
|--------------|--------|--------------------------------------------------|
| id_indicadores | int  | PK                                               |
| id_tempo     | int    | FK para a dimensão tempo                        |
| SMA_10       | float  | Média móvel simples de 10 períodos              |
| EMA_10       | float  | Média móvel exponencial de 10 períodos          |
| MACD         | float  | Moving Average Convergence Divergence           |
| Signal_Line  | float  | Linha de sinal do MACD                          |
| rsi          | float  | Índice de força relativa                        |
| OBV          | float  | On-Balance Volume                               |
| CCI          | float  | Commodity Channel Index                         |
| ATR          | float  | Average True Range                              |
| retorno      | float  | Retorno do período                              |
| volatilidade | float  | Volatilidade do período                         |

## Dimensão: `dim_lags`
| Coluna          | Tipo   | Descrição                                       |
|-----------------|--------|--------------------------------------------------|
| id_lags         | int    | PK                                              |
| id_tempo        | int    | FK para a dimensão tempo                        |
| fechamento_lag1 | float  | Fechamento no candle anterior (1 lag)           |
| retorno_lag1    | float  | Retorno do candle anterior (1 lag)              |
| volume_lag1     | float  | Volume do candle anterior (1 lag)               |
| fechamento_lag2 | float  | Fechamento dois candles atrás (2 lags)          |
| retorno_lag2    | float  | Retorno dois candles atrás (2 lags)             |
| volume_lag2     | float  | Volume dois candles atrás (2 lags)              |
| fechamento_lag3 | float  | Fechamento três candles atrás (3 lags)          |
| retorno_lag3    | float  | Retorno três candles atrás (3 lags)             |
| volume_lag3     | float  | Volume três candles atrás (3 lags)              |

## Dimensão: `dim_operacional`
| Coluna                 | Tipo   | Descrição                                      |
|------------------------|--------|------------------------------------------------|
| id_operacional         | int    | PK                                             |
| id_tempo               | int    | FK para a dimensão tempo                       |
| data_previsao          | object | Data prevista para o modelo                    |
| dia_da_semana_previsao | int    | Dia da semana da previsão                      |
| hora_num               | int    | Hora como número inteiro                       |
| minuto                 | int    | Minuto da observação                           |
| mercado_aberto         | int    | Indicador binário (1=aberto, 0=fechado)        |
| fechamento_dia         | float  | Fechamento do dia (valor agregado diário)      |
| volume_dia             | float  | Volume do dia (valor agregado diário)          |
| maximo_dia             | float  | Máximo do dia                                  |
| minimo_dia             | float  | Mínimo do dia                                  |
| fechamento_dia_anterior| float  | Fechamento do dia anterior                     |
| volume_dia_anterior    | float  | Volume do dia anterior                         |
| maximo_dia_anterior    | float  | Máximo do dia anterior                         |
| minimo_dia_anterior    | float  | Mínimo do dia anterior                         |


In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/operacional/gerar_catalogo_dados_versao_final.py

#@title Script para geração do catálogo de dados

import pandas as pd
from pathlib import Path

def gerar_catalogo_dados(df: pd.DataFrame, path_output: str) -> None:
    """
    Gera o catálogo de dados a partir do DataFrame df,
    salvando em path_output no formato CSV.
    """

    # Detecta tipos das colunas
    col_types = df.dtypes.apply(lambda x: 'float' if 'float' in str(x) else ('int' if 'int' in str(x) else 'object')).to_dict()

    # Define tabelas com base nas colunas
    tabelas = {
        "fato_precos": {},
        "dim_tempo": {},
        "dim_indicadores": {},
        "dim_lags": {},
        "dim_operacional": {}
    }

    for col, tipo in col_types.items():
        if col == "id_fato_precos":
            tabelas["fato_precos"][col] = tipo
        elif col == "id_tempo":
            for t in ["dim_tempo", "dim_indicadores", "dim_lags", "dim_operacional"]:
                tabelas[t][col] = tipo
        elif "abertura" in col or "fechamento" in col or "minimo" in col or "maximo" in col:
            tabelas["fato_precos"][col] = tipo
        elif any(ind in col for ind in ["SMA", "EMA", "MACD", "Signal", "rsi", "OBV", "volatilidade", "retorno"]):
            tabelas["dim_indicadores"][col] = tipo
        elif "lag" in col:
            tabelas["dim_lags"][col] = tipo
        elif col in ["data", "hora", "dia_da_semana_entrada"]:
            tabelas["dim_tempo"][col] = tipo
        elif col in ["data_previsao", "dia_da_semana_previsao", "hora_num", "minuto", "mercado_aberto",
                     "fechamento_dia", "volume_dia", "maximo_dia", "minimo_dia",
                     "fechamento_dia_anterior", "volume_dia_anterior", "maximo_dia_anterior", "minimo_dia_anterior"]:
            tabelas["dim_operacional"][col] = tipo

    def dominio(col, tipo):
        if tipo in ["float", "int"]:
            if "retorno" in col:
                return "-0.05 a 0.05 (retorno percentual por intervalo de 5 min)"
            elif "volatilidade" in col:
                return "0 a 0.1 (desvio padrão do retorno por janela de tempo)"
            elif "abertura" in col or "fechamento" in col or "minimo" in col or "maximo" in col:
                return "10.0 a 50.0 (valores típicos para BBDC4)"
            elif "MACD" in col or "Signal" in col:
                return "-5 a 5"
            elif "rsi" in col:
                return "0 a 100"
            elif "OBV" in col:
                return "valor acumulativo, depende do ativo"
            elif "volume" in col:
                return "0 a 1.000.000 (valores inteiros positivos)"
            elif "dia_da_semana" in col:
                return "0=Segunda, ..., 6=Domingo"
            elif "mercado_aberto" in col:
                return "0=Fechado, 1=Aberto"
            else:
                return "valores numéricos contínuos"
        elif tipo == "object":
            if "data" in col:
                return "formato YYYY-MM-DD"
            elif "hora" in col:
                return "formato HH:MM:SS"
            else:
                return "texto livre"
        return "não especificado"

    def descricao(col):
        descricoes = {
            "abertura": "Preço de abertura do ativo BBDC4 no intervalo de 5 minutos",
            "minimo": "Menor preço do ativo BBDC4 no intervalo de 5 minutos",
            "maximo": "Maior preço do ativo BBDC4 no intervalo de 5 minutos",
            "fechamento": "Preço de fechamento do ativo BBDC4 no intervalo de 5 minutos",
            "retorno": "Retorno percentual do ativo no intervalo de 5 minutos",
            "volatilidade": "Volatilidade dos retornos do ativo em janela deslizante",
            "SMA_10": "Média móvel simples de 10 períodos calculada sobre os preços",
            "EMA_10": "Média móvel exponencial de 10 períodos",
            "MACD": "Moving Average Convergence Divergence, indicador técnico",
            "Signal_Line": "Linha de sinal do MACD",
            "rsi": "Índice de força relativa (RSI), oscilador técnico",
            "OBV": "On Balance Volume, indicador técnico baseado em volume",
            "hora_num": "Hora expressa como número inteiro",
            "minuto": "Minuto do intervalo de tempo",
            "mercado_aberto": "Indica se o mercado está aberto no horário (1) ou não (0)",
            "fechamento_dia": "Preço de fechamento do ativo no dia corrente",
            "volume_dia": "Volume de negociações do ativo no dia corrente",
            "maximo_dia": "Maior preço do ativo no dia corrente",
            "minimo_dia": "Menor preço do ativo no dia corrente",
            "fechamento_dia_anterior": "Preço de fechamento do ativo no dia anterior",
            "volume_dia_anterior": "Volume de negociações do ativo no dia anterior",
            "maximo_dia_anterior": "Maior preço do ativo no dia anterior",
            "minimo_dia_anterior": "Menor preço do ativo no dia anterior"
        }
        for key in descricoes:
            if key in col:
                return descricoes[key]
        if "lag" in col:
            return f"Valor defasado de {col.replace('_lag', '')}"
        if "dia_da_semana" in col:
            return "Dia da semana correspondente à data"
        if "id_" in col:
            return "Identificador único para relacionar com outras tabelas"
        return ""

    def tecnica(col):
        if any(ind in col for ind in ["SMA", "EMA", "MACD", "Signal", "rsi", "OBV"]):
            return "calculado internamente via engenharia de features técnicas"
        if "lag" in col:
            return "calculado como valor defasado (lag)"
        if col in ["data", "hora", "hora_num", "minuto", "dia_da_semana_entrada", "dia_da_semana_previsao"]:
            return "extraído de data/hora original"
        if col == "mercado_aberto":
            return "derivado da data/hora com base em calendário de mercado"
        return "cópia ou identificador"

    # Geração do catálogo
    linhas = []
    for tabela, colunas in tabelas.items():
        for col, tipo in colunas.items():
            linhas.append({
                "tabela": tabela,
                "coluna": col,
                "tipo": tipo,
                "descricao": descricao(col),
                "dominio": dominio(col, tipo),
                "tecnica": tecnica(col),
                "linhagem": "Fonte: Yahoo Finance via yfinance"
            })

    catalogo_df = pd.DataFrame(linhas)
    Path(path_output).parent.mkdir(parents=True, exist_ok=True)
    catalogo_df.to_csv(path_output, index=False)
    print(f"Catálogo de dados gerado em: {path_output}")


In [None]:
from Piloto_Day_Trade.scripts.operacional.gerar_catalogo_dados_versao_final import gerar_catalogo_dados

#@title Aplicar gerar catálogo de dados
df = pd.read_csv('/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv')
gerar_catalogo_dados(df, "/content/Piloto_Day_Trade/modelagem/catalog/catalogo_de_dados_vf.csv")
print("Catálogo de dados gerado com sucesso!")

In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/operacional/criar_banco_dimensional_vf.py
# @title Script para criar banco e tabelas

import sqlite3
import os

def criar_banco(db_path):
    os.makedirs(os.path.dirname(db_path), exist_ok=True)
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    sql_script = """
    -- Tabela Fato
    CREATE TABLE IF NOT EXISTS fato_precos (
        id_fato_precos INTEGER PRIMARY KEY,
        id_tempo INTEGER,
        abertura REAL,
        minimo REAL,
        maximo REAL,
        fechamento REAL,
        volume REAL,
        fechamento_dia REAL,  -- Novo campo de fechamento diário
        volume_dia REAL,      -- Novo campo de volume diário
        maximo_dia REAL,     -- Novo campo de máximo diário
        minimo_dia REAL,     -- Novo campo de mínimo diário
        FOREIGN KEY (id_tempo) REFERENCES dim_tempo(id_tempo)
    );

    -- Dimensão Tempo
    CREATE TABLE IF NOT EXISTS dim_tempo (
        id_tempo INTEGER PRIMARY KEY,
        data TEXT,
        hora TEXT,
        dia_da_semana_entrada INTEGER
    );

    -- Dimensão Indicadores Técnicos
    CREATE TABLE IF NOT EXISTS dim_indicadores (
        id_indicadores INTEGER PRIMARY KEY,
        id_tempo INTEGER,
        SMA_10 REAL,
        EMA_10 REAL,
        MACD REAL,
        Signal_Line REAL,
        rsi REAL,
        OBV REAL,
        retorno REAL,
        volatilidade REAL,
        fechamento_dia REAL,  -- Novo campo de fechamento diário
        volume_dia REAL,      -- Novo campo de volume diário
        maximo_dia REAL,     -- Novo campo de máximo diário
        minimo_dia REAL,     -- Novo campo de mínimo diário
        FOREIGN KEY (id_tempo) REFERENCES dim_tempo(id_tempo)
    );

    -- Dimensão Lags
    CREATE TABLE IF NOT EXISTS dim_lags (
        id_lags INTEGER PRIMARY KEY,
        id_tempo INTEGER,
        fechamento_lag1 REAL,
        retorno_lag1 REAL,
        volume_lag1 REAL,
        fechamento_lag2 REAL,
        retorno_lag2 REAL,
        volume_lag2 REAL,
        fechamento_lag3 REAL,
        retorno_lag3 REAL,
        volume_lag3 REAL,
        FOREIGN KEY (id_tempo) REFERENCES dim_tempo(id_tempo)
    );

    -- Dimensão Operacional
    CREATE TABLE IF NOT EXISTS dim_operacional (
        id_operacional INTEGER PRIMARY KEY,
        id_tempo INTEGER,
        dia_da_semana_previsao INTEGER,
        hora_num INTEGER,
        minuto INTEGER,
        mercado_aberto INTEGER,
        fechamento_dia_anterior REAL,  -- Novo campo de fechamento diário anterior
        volume_dia_anterior REAL,      -- Novo campo de volume diário anterior
        maximo_dia_anterior REAL,     -- Novo campo de máximo diário anterior
        minimo_dia_anterior REAL,     -- Novo campo de mínimo diário anterior
        FOREIGN KEY (id_tempo) REFERENCES dim_tempo(id_tempo)
    );
    """

    cursor.executescript(sql_script)
    conn.commit()
    conn.close()
    print("Banco e tabelas criados com sucesso.")

if __name__ == "__main__":
    db_path = "/content/Piloto_Day_Trade/modelagem/database/banco_dimensional_vf.db"
    criar_banco(db_path)


In [None]:
#@title Excecutar criar banco e tabelas
!python /content/Piloto_Day_Trade/scripts/operacional/criar_banco_dimensional_vf.py

In [None]:
atualizar_repo("Gerando catálogo de dados")

In [None]:
# Verificar criação do banco
import sqlite3

conn = sqlite3.connect("/content/Piloto_Day_Trade/modelagem/database/banco_dimensional.db")
cursor = conn.cursor()

cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
print(cursor.fetchall())

conn.close()


In [None]:
# Verificar colunas
import sqlite3

conn = sqlite3.connect("/content/Piloto_Day_Trade/modelagem/database/banco_dimensional_vf.db")
cursor = conn.cursor()
cursor.execute("PRAGMA table_info(dim_tempo);")
print(cursor.fetchall())

conn.close()


In [None]:
# @title Carga de dados
%%writefile /content/Piloto_Day_Trade/scripts/pipeline/carga_dados_vf.py

# @title Carga de dados

import sqlite3
import pandas as pd
import os

def carregar_dados(df: pd.DataFrame, db_path: str):
    # Lista de colunas obrigatórias que devem existir no DataFrame
    colunas_obrigatorias = [
        'data', 'hora', 'dia_da_semana_entrada',
        'SMA_10', 'EMA_10', 'MACD', 'Signal_Line', 'rsi', 'OBV', 'retorno', 'volatilidade',
        'fechamento_lag1', 'retorno_lag1', 'volume_lag1',
        'fechamento_lag2', 'retorno_lag2', 'volume_lag2',
        'fechamento_lag3', 'retorno_lag3', 'volume_lag3',
        'hora_num', 'minuto', 'abertura', 'minimo', 'maximo', 'fechamento'
    ]

    # Verifica se todas as colunas obrigatórias estão presentes no DataFrame
    colunas_faltantes = [col for col in colunas_obrigatorias if col not in df.columns]
    if colunas_faltantes:
        raise ValueError(f"Colunas ausentes no DataFrame: {colunas_faltantes}")

    # Conexão com o banco de dados SQLite
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    registros_inseridos = 0

    # Loop para inserir os dados do DataFrame nas tabelas do banco
    for _, row in df.iterrows():
        # Verifica se já existe um registro com a mesma data e hora
        cursor.execute("SELECT id_tempo FROM dim_tempo WHERE data = ? AND hora = ?", (row['data'], row['hora']))
        resultado = cursor.fetchone()
        if resultado:
            continue  # Se o registro já existir, pula para o próximo

        # Obtém o ID da tabela dim_tempo
        cursor.execute("SELECT MAX(id_tempo) FROM dim_tempo")
        max_id = cursor.fetchone()[0]
        id_tempo = 1 if max_id is None else max_id + 1

        # 1. Inserção na tabela dim_tempo (data, hora, dia da semana)
        cursor.execute("""
            INSERT INTO dim_tempo (id_tempo, data, hora, dia_da_semana_entrada)
            VALUES (?, ?, ?, ?)
        """, (id_tempo, row['data'], row['hora'], row['dia_da_semana_entrada']))

        # 2. Inserção na tabela dim_indicadores (indicadores técnicos como SMA, EMA, etc.)
        cursor.execute("""
            INSERT INTO dim_indicadores (id_indicadores, id_tempo, SMA_10, EMA_10, MACD, Signal_Line, rsi, OBV, retorno, volatilidade, fechamento_dia, volume_dia, maximo_dia, minimo_dia)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            id_tempo, id_tempo, row['SMA_10'], row['EMA_10'], row['MACD'],
            row['Signal_Line'], row['rsi'], row['OBV'], row['retorno'], row['volatilidade'],
            row.get('fechamento_dia', None), row.get('volume_dia', None),
            row.get('maximo_dia', None), row.get('minimo_dia', None)
        ))

        # 3. Inserção na tabela dim_lags (valores defasados de fechamento, retorno e volume)
        cursor.execute("""
            INSERT INTO dim_lags (
                id_lags, id_tempo,
                fechamento_lag1, retorno_lag1, volume_lag1,
                fechamento_lag2, retorno_lag2, volume_lag2,
                fechamento_lag3, retorno_lag3, volume_lag3
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            id_tempo, id_tempo,
            row['fechamento_lag1'], row['retorno_lag1'], row['volume_lag1'],
            row['fechamento_lag2'], row['retorno_lag2'], row['volume_lag2'],
            row['fechamento_lag3'], row['retorno_lag3'], row['volume_lag3']
        ))

        # 4. Inserção na tabela dim_operacional (informações operacionais como hora e minuto)
        cursor.execute("""
            INSERT INTO dim_operacional (
                id_operacional, id_tempo,
                dia_da_semana_previsao, hora_num, minuto, mercado_aberto,
                fechamento_dia_anterior, volume_dia_anterior, maximo_dia_anterior, minimo_dia_anterior
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            id_tempo, id_tempo,
            row.get('dia_da_semana_previsao', row['dia_da_semana_entrada']),
            row['hora_num'], row['minuto'],
            row.get('mercado_aberto', 1),
            row.get('fechamento_dia_anterior', None), row.get('volume_dia_anterior', None),
            row.get('maximo_dia_anterior', None), row.get('minimo_dia_anterior', None)
        ))

        # 5. Inserção na tabela fato_precos (preços e volume)
        cursor.execute("""
            INSERT INTO fato_precos (id_fato_precos, id_tempo, abertura, minimo, maximo, fechamento, fechamento_dia, volume_dia, maximo_dia, minimo_dia)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            id_tempo, id_tempo, row['abertura'], row['minimo'], row['maximo'], row['fechamento'],
            row.get('fechamento_dia', None), row.get('volume_dia', None),
            row.get('maximo_dia', None), row.get('minimo_dia', None)
        ))

        registros_inseridos += 1  # Conta o número de registros inseridos

    # Commit das alterações no banco e fechamento da conexão
    conn.commit()
    conn.close()
    print(f"Carga incremental concluída. {registros_inseridos} novos registros inseridos.")

if __name__ == "__main__":
    # Caminho do banco de dados e arquivo CSV de dados transformados
    db_path = "/content/Piloto_Day_Trade/modelagem/database/banco_dimensional_vf.db"
    df_path = "/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv"

    # Verifica se o arquivo CSV existe
    if not os.path.exists(df_path):
        raise FileNotFoundError(f"O arquivo {df_path} não existe.")

    # Carrega o DataFrame do arquivo CSV
    df = pd.read_csv(df_path)

    # Verifica se o DataFrame está vazio
    if df.empty:
        raise ValueError("O DataFrame carregado está vazio.")

    # Chama a função para carregar os dados no banco
    carregar_dados(df, db_path)



In [None]:
#@title Executar realizar a carga de dados
!python /content/Piloto_Day_Trade/scripts/pipeline/carga_dados_vf.py


In [None]:
# @title Verificando a carga de dados

import sqlite3

def verificar_carga_dados(db_path: str):
    """
    Função para verificar a carga de dados nas tabelas do banco dimensional.

    Parâmetros:
    - Caminho para o banco de dados.

    A função realiza uma consulta de contagem de registros em cada uma das principais tabelas
    do banco e imprime o número de registros encontrados.
    """

    # Conexão com o banco de dados
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()

    # Definir as tabelas principais a serem verificadas
    tabelas = ['dim_tempo', 'dim_indicadores', 'dim_lags', 'dim_operacional', 'fato_precos']

    # Laço para realizar a consulta de contagem de registros em cada tabela
    for tabela in tabelas:
        # Consulta SQL para contar o número de registros em cada tabela
        cursor.execute(f"SELECT COUNT(*) FROM {tabela}")
        count = cursor.fetchone()[0]  # Recupera o número de registros
        print(f"{tabela}: {count} registros")  # Exibe o resultado

    # Fechamento da conexão com o banco de dados
    conn.close()

# Caminho para o banco de dados
db_path = "/content/Piloto_Day_Trade/modelagem/database/banco_dimensional_vf.db"

# Chamada da função para verificar a carga de dados
verificar_carga_dados(db_path)


In [None]:
# Visualizar os primeiros 5 registros da fato_precos
import pandas as pd

conn = sqlite3.connect(db_path)
df_check = pd.read_sql_query("SELECT * FROM fato_precos LIMIT 5", conn)
print(df_check)
conn.close()


In [None]:
# Consulta com JOIN para verificar o relacionamento entre as tabelas
query = """
SELECT
    ft.id_fato_precos,
    dt.data,
    dt.hora,
    ft.abertura,
    ft.fechamento,
    di.SMA_10,
    dl.fechamento_lag1,
    do.dia_da_semana_previsao,  -- Corrigido para usar dia_da_semana_previsao
    do.mercado_aberto
FROM fato_precos ft
JOIN dim_tempo dt ON ft.id_tempo = dt.id_tempo
JOIN dim_indicadores di ON ft.id_tempo = di.id_tempo
JOIN dim_lags dl ON ft.id_tempo = dl.id_tempo
JOIN dim_operacional do ON ft.id_tempo = do.id_tempo
LIMIT 5;
"""

# Executando e exibindo
conn = sqlite3.connect(db_path)
df_verificacao = pd.read_sql_query(query, conn)
print(df_verificacao)
conn.close()


In [None]:
import sqlite3
import pandas as pd

# Ajustando a consulta para considerar os campos corretos
query = """
SELECT
    MAX(dt.data || ' ' || dt.hora) AS data_hora_mais_recente,
    MAX(ft.id_fato_precos) AS id_fato_mais_recente,
    MAX(do.dia_da_semana_previsao) AS dia_da_semana_previsao_mais_recente -- Corrigido para dia_da_semana_previsao
FROM fato_precos ft
JOIN dim_tempo dt ON ft.id_tempo = dt.id_tempo
JOIN dim_indicadores di ON ft.id_tempo = di.id_tempo
JOIN dim_lags dl ON ft.id_tempo = dl.id_tempo
JOIN dim_operacional do ON ft.id_tempo = do.id_tempo;
"""

# Executando e exibindo
conn = sqlite3.connect(db_path)
df_max_data = pd.read_sql_query(query, conn)
print("Datas mais recentes no banco:")
print(df_max_data)
conn.close()


In [None]:
#@title Escrevendo o Readme do projeto
%%writefile /content/Piloto_Day_Trade/README.md

# Objetivo do Projeto

## 1. Propósito do MVP

Este projeto tem como objetivo principal a criação de um pipeline para extração, transformação, carga, análise e previsão da movimentação intradiária dos preços de um ativo financeiro em intervalos de 5 minutos. O modelo preditivo central será baseado em redes neurais recorrentes (LSTM), mas outras abordagens serão exploradas. O MVP visa garantir previsões para embasar decisões estratégicas de day trade.

## 2. Problema a Ser Resolvido

A alta volatilidade dos mercados financeiros exige ferramentas robustas para antecipação de movimentos de preço. A dificuldade está em capturar padrões de curto prazo e projetá-los com precisão. Traders e investidores necessitam de um modelo que consiga interpretar os padrões históricos e transformá-los em previsões úteis.

## 3. Pipeline do Projeto

O pipeline está estruturado em sete etapas principais:

### 3.1. Extração e armazenamento dos dados brutos
- Coleta de dados históricos do ativo BBDC4 em intervalos de 5 minutos, via API do Yahoo Finance (yfinance).
- Armazenamento dos dados no GitHub sincronizado com Google Colab, com backup em nuvem.
- Salvo como `dados_brutos.csv`

### 3.2. Limpeza e organização dos dados
- Padronização dos tipos de dados
- Padronização dos nomes de colunas
- Remoção de valores nulos ou duplicados
- Remoção de colunas desnecessárias
- Ordenação cronológica
- Salvo como `dados_limpos.csv`

### 3.3. Transformação de dados e engenharia de features
- Cálculo de indicadores técnicos (SMA, EMA, MACD, RSI, OBV)
- Cálculo de retornos e variância (volatilidade)
- Criação de variáveis de lag de preço, volume e retorno
- Adição de variáveis temporais (hora, dia da semana, mercado aberto)
- Salvo como `dados_transformados.csv`

### 3.4. Modelagem e estruturação do banco dimensional
- **Fato**: `fato_precos` com preços e chave para `dim_tempo`
- **Dimensões**:
  - `dim_tempo`: data, hora, dia da semana
  - `dim_indicadores`: indicadores técnicos
  - `dim_lags`: lags de preço, volume, retorno
  - `dim_operacional`: hora, minuto, data da previsão, mercado aberto
- Banco gerado em SQLite via script automatizado (`banco_dimensional.db`)

### 3.5. Carga (ETL)
- **Extração:** via API (automatizada)
- **Limpeza:** padronização, remoção de nulos/duplicatas
- **Transformação:** features técnicas e derivadas
- **Carga:** população das tabelas do banco dimensional
- ETL organizado em scripts Python e automatizado

### 3.6. Treinamento e ajuste do modelo preditivo
- **Preparação dos dados**:
  - Padronização com StandardScaler para retornos e indicadores
  - Normalização com MinMaxScaler para preços e volumes
  - Separar features (X) e targets (y)
  - Divisão treino/teste com base em dias útis
- **Modelo base:** LSTM com duas camadas ocultas, camada densa e MSE como perda
- **Avaliação:** Métricas de MSE, R², comparação com targets reais

### 3.7. Análise dos resultados
- Comparativo entre preços previstos vs. reais
- Validação das previsões para abertura, máxima, mínima e fechamento
- Importância das variáveis
- Interpretação dos erros e possíveis melhorias

## 4. Perguntas a Serem Respondidas
- É possível prever com precisão a movimentação intradiária a cada 5 minutos?
- Os dados do dia anterior são suficientes para prever o comportamento do dia seguinte?
- O modelo LSTM é eficaz para padrões de curtíssimo prazo?
- É viável derivar os targets globais do dia a partir das previsões intradiárias?
- Quais indicadores mais contribuem para a previsão?
- Como lidar corretamente com fins de semana e feriados?
- A padronização/normalização das variáveis afeta o desempenho?

## 5. Critérios de Sucesso
- Pipeline funcional de extração → transformação → carga → previsão
- Modelo com bom desempenho em MSE e R²
- Targets globais coerentes com valores reais
- Correta gestão de datas (incluindo segundas-feiras)
- Previsões utilizáveis para tomada de decisão



In [None]:
#@title Escrevendo Licença do projeto
%%writefile /content/Piloto_Day_Trade/LICENSE

Copyright (c) 2025 Carolina Brescowitt

Todos os direitos reservados.

Este software é fornecido gratuitamente apenas para uso **pessoal, acadêmico e de pesquisa**.

O uso comercial deste software é estritamente proibido sem uma **licença comercial paga**, a ser negociada com o autor.

Empresas, startups, desenvolvedores ou qualquer entidade que deseje utilizar este código em produtos, serviços ou plataformas comerciais devem entrar em contato com o autor para **negociar os termos de licenciamento** (incluindo percentual, royalties ou valores fixos).

É proibida a redistribuição ou sublicenciamento sem autorização por escrito.

Para mais informações, entre em contato: carolbrescowitt@yahoo.com.br



In [None]:
atualizar_repo("Atualizando a licença do projeto")

# Modelagem MAchine Learning

In [None]:
df_transformado = pd.read_csv('/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv')
df_transformado.head()

In [None]:
print(df_transformado.columns.tolist())


In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/modelagem_machine_learning/preparar_dados_modelagem_LSTM_global.py

# @title Preparação dos dados para o modelo LSTM Global

import pandas as pd
import numpy as np
import joblib
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split

def preparar_dados_lstm_global(df, janela=16, test_size=0.2, val_size=0.1):
    """
    Prepara os dados para um modelo LSTM prever os alvos globais do próximo dia (mínimo, máximo, fechamento, volume),
    usando sequências de candles intradiários (ex: 5 em 5 minutos) do(s) dia(s) anterior(es).
    """

    # Ordenar temporalmente
    df = df.sort_values(['data', 'hora']).reset_index(drop=True)

    # Definir colunas
    features = [
        'abertura', 'minimo', 'maximo', 'fechamento', 'volume',
        'SMA_10', 'EMA_10', 'OBV', 'retorno', 'volatilidade',
        'MACD', 'Signal_Line', 'rsi', 'ADX', 'BB_MA20', 'BB_STD20',
        'BB_upper', 'BB_lower', '%K', '%D', 'CCI', 'ATR',
        'fechamento_lag1', 'retorno_lag1', 'volume_lag1',
        'fechamento_lag2', 'retorno_lag2', 'volume_lag2',
        'fechamento_lag3', 'retorno_lag3', 'volume_lag3',
        'hora_num', 'minuto'
    ]
    targets = ['minimo_dia', 'maximo_dia', 'fechamento_dia', 'volume_dia']

    # Normalização
    scaler_features = MinMaxScaler()
    scaler_targets = MinMaxScaler()
    X_scaled = scaler_features.fit_transform(df[features])
    y_scaled = scaler_targets.fit_transform(df[targets])

    # Construir DataFrames auxiliares
    df_seq = pd.DataFrame(X_scaled, columns=features)
    df_seq['data'] = df['data'].values
    df_targets = pd.DataFrame(y_scaled, columns=targets)
    df_targets['data'] = df['data'].values

    # Geração de sequências por dia
    X, y = [], []
    dias_unicos = df_seq['data'].unique()
    for i in range(len(dias_unicos) - 1):
        dia = dias_unicos[i]
        proximo = dias_unicos[i + 1]

        dados_dia = df_seq[df_seq['data'] == dia].drop(columns='data')
        alvo_proximo = df_targets[df_targets['data'] == proximo][targets]

        if len(dados_dia) >= janela and not alvo_proximo.empty:
            X.append(dados_dia.iloc[-janela:].values)
            y.append(alvo_proximo.iloc[0].values)

    X = np.array(X)
    y = np.array(y)

    # Salvar dados completos antes do split
    np.savez_compressed(
        '/content/Piloto_Day_Trade/models/LSTM/lstm_global_dataset_completo.npz',
        X=X, y=y
    )

    # Split sem embaralhar
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size, shuffle=False)
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=val_size, shuffle=False)

    # Salvar splits
    np.savez_compressed(
        '/content/Piloto_Day_Trade/models/LSTM/lstm_global_datasets.npz',
        X_train=X_train, X_val=X_val, X_test=X_test,
        y_train=y_train, y_val=y_val, y_test=y_test
    )

    # Salvar scalers
    joblib.dump(scaler_features, '/content/Piloto_Day_Trade/models/LSTM/scaler_features_lstm_global.save')
    joblib.dump(scaler_targets, '/content/Piloto_Day_Trade/models/LSTM/scaler_targets_lstm_global.save')

    return X_train, X_val, X_test, y_train, y_val, y_test, {
        'scaler_features': scaler_features,
        'scaler_targets': scaler_targets
    }

# Bloco de teste local (não roda se importado)
if __name__ == "__main__":
    df_transformado = pd.read_csv('/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv')
    X_train, X_val, X_test, y_train, y_val, y_test, scalers = preparar_dados_lstm_global(df_transformado)

    print("Pré-visualização de uma sequência de entrada normalizada:")
    print(X_train[0])

    print("\nTarget correspondente (normalizado):")
    print(y_train[0])


In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/modelagem_machine_learning/preparar_dados_modelagem_LSTM_intradiario.py

# @title Preparação dos dados para o modelo LSTM Intradiário

import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from collections import Counter
import joblib  # Para salvar os scalers

def preparar_dados_lstm_intradiario(
    df: pd.DataFrame,                    # DataFrame com os dados históricos intradiários
    colunas_features: list,              # Lista com nomes das colunas usadas como entrada (X)
    colunas_targets: list,               # Lista com nomes das colunas de saída (y)
    dias_entrada: int = 3,               # Quantos dias anteriores usar como entrada
    n_pontos_dia: int = None,            # Número de candles por dia (detectado automaticamente se não informado)
    verbose: bool = True                 # Imprimir ou não os logs de progresso
):
    """
    Prepara os dados normalizados e sequenciais para treinar LSTM em séries temporais intradiárias.

    Retorna:
        X: np.ndarray, formato (amostras, timesteps, features)
        y: np.ndarray, formato (amostras, timesteps, targets)
        datas_validas: list, datas correspondentes a cada amostra
    """
    df = df.copy()

    # Verifica se a coluna 'data' está presente
    if "data" not in df.columns:
        raise ValueError("A coluna 'data' é obrigatória.")

    # Verifica se todas as colunas necessárias estão no DataFrame
    for col in colunas_features + colunas_targets:
        if col not in df.columns:
            raise ValueError(f"Coluna '{col}' ausente no DataFrame.")

    # Converte 'data' para string (evita bugs de agrupamento por datetime)
    df["data"] = df["data"].astype(str)

    # Conta quantos candles existem por dia e determina o valor mais comum
    contagem_por_dia = df.groupby("data").size()
    valor_mais_comum = Counter(contagem_por_dia).most_common(1)[0][0]
    n_pontos_dia = n_pontos_dia or valor_mais_comum

    if verbose:
        print(f"[Info] Detectado {n_pontos_dia} candles por dia.")

    # Inicializa os normalizadores
    scaler_features = MinMaxScaler()
    scaler_targets = MinMaxScaler()

    # Aplica normalização
    df[colunas_features] = scaler_features.fit_transform(df[colunas_features])
    df[colunas_targets] = scaler_targets.fit_transform(df[colunas_targets])

    # Salva os dados normalizados em CSV
    path_norm = '/content/Piloto_Day_Trade/data/prepared/dados_normalizados.csv'
    os.makedirs(os.path.dirname(path_norm), exist_ok=True)
    df.to_csv(path_norm, index=False)
    print(df.head())
    if verbose:
        print(f"[Info] Dados normalizados salvos em: {path_norm}")

    # Salva os scalers para reutilizar depois (ex: prever novos dados ou fazer inverse_transform)
    scaler_path = '/content/Piloto_Day_Trade/models/LSTM'
    os.makedirs(scaler_path, exist_ok=True)
    joblib.dump(scaler_features, os.path.join(scaler_path, 'scaler_features.pkl'))
    joblib.dump(scaler_targets, os.path.join(scaler_path, 'scaler_targets.pkl'))
    if verbose:
        print(f"[Info] Scalers salvos em: {scaler_path}")

    # Gera as sequências de entrada (X) e saída (y) para cada dia válido
    dias_unicos = sorted(df["data"].unique())
    X, y, datas_validas = [], [], []

    for i in range(dias_entrada, len(dias_unicos)):
        dias_passados = dias_unicos[i - dias_entrada:i]  # Dias usados como entrada
        dia_target = dias_unicos[i]                      # Dia de saída (alvo)

        entradas = []
        sequencia_valida = True

        # Junta os dados dos dias anteriores
        for dia in dias_passados:
            dados_dia = df[df["data"] == dia][colunas_features].values
            if len(dados_dia) != n_pontos_dia:
                sequencia_valida = False
                break
            entradas.append(dados_dia)

        # Dados de saída (target)
        saida = df[df["data"] == dia_target][colunas_targets].values
        if len(saida) != n_pontos_dia:
            sequencia_valida = False

        # Se tudo certo, adiciona à lista final
        if sequencia_valida:
            X.append(np.vstack(entradas))
            y.append(saida)
            datas_validas.append(dia_target)
        elif verbose:
            print(f"[Aviso] Ignorado: {dia_target} com sequência incompleta.")

    # Converte listas em arrays numpy
    X = np.array(X)
    y = np.array(y)

    if verbose:
        print(f"[Info] X shape: {X.shape}, y shape: {y.shape}, Amostras válidas: {len(datas_validas)}")
        print("[Info] Preparação finalizada com sucesso.")

    # Salva os arrays como .npy para uso futuro
    np.save(os.path.join(scaler_path, 'X.npy'), X)
    np.save(os.path.join(scaler_path, 'y.npy'), y)

    return X, y, datas_validas

if __name__ == "__main__":
    import joblib

    # Caminho dos arquivos
    caminho_dados = '/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv'
    caminho_scaler_features = '/content/Piloto_Day_Trade/models/LSTM/scaler_features.pkl'
    caminho_scaler_targets = '/content/Piloto_Day_Trade/models/LSTM/scaler_targets.pkl'

    # Lê o dataset transformado
    df_transformado = pd.read_csv(caminho_dados)

    # Define colunas de entrada (X) e saída (y)
    colunas_features = [
        'abertura', 'minimo', 'maximo', 'fechamento', 'volume',
        'SMA_10', 'EMA_10', 'OBV', 'retorno', 'volatilidade',
        'MACD', 'Signal_Line', 'rsi', 'ADX', 'BB_MA20', 'BB_STD20',
        'BB_upper', 'BB_lower', '%K', '%D', 'CCI', 'ATR'
    ]
    colunas_targets = ['minimo_dia', 'maximo_dia', 'fechamento_dia', 'volume_dia']

    # Executa o pipeline de preparação
    X, y, datas = preparar_dados_lstm_intradiario(
        df=df_transformado,
        colunas_features=colunas_features,
        colunas_targets=colunas_targets,
        dias_entrada=3,
        verbose=True
    )

    print("\n[Resumo Final]")
    print(f"X shape: {X.shape}")
    print(f"y shape: {y.shape}")
    print(f"Amostras válidas: {len(datas)}")
    print(f"Primeira data válida: {datas[0] if datas else 'Nenhuma'}")

    # Testa carregamento dos scalers
    scaler_features = joblib.load(caminho_scaler_features)
    scaler_targets = joblib.load(caminho_scaler_targets)
    print("\n[Info] Scalers carregados com sucesso.")



In [None]:
#@title Executar preparar dados para o modelo LSTM intradiario
import sys
sys.path.append('/content/Piloto_Day_Trade/scripts/modelagem_machine_learning')

from preparar_dados_modelagem_LSTM_intradiario import preparar_dados_lstm_intradiario
import pandas as pd

df = pd.read_csv('/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv')

colunas_features = [
    'abertura', 'minimo', 'maximo', 'fechamento', 'volume',
    'SMA_10', 'EMA_10', 'OBV', 'retorno', 'volatilidade',
    'MACD', 'Signal_Line', 'rsi', 'ADX', 'BB_MA20', 'BB_STD20',
    'BB_upper', 'BB_lower', '%K', '%D', 'CCI', 'ATR'
]

colunas_targets = ['minimo_dia', 'maximo_dia', 'fechamento_dia', 'volume_dia']

X, y, datas = preparar_dados_lstm_intradiario(
    df=df,
    colunas_features=colunas_features,
    colunas_targets=colunas_targets
)


In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/modelagem_machine_learning/validar_dados_preparados.py

#@title Validação dos dados preparados

import pandas as pd
import numpy as np
import os

def validar_dados(path_arquivo):
    """Valida integridade e consistência dos dados preparados para modelagem."""

    assert os.path.exists(path_arquivo), f"[ERRO] Arquivo não encontrado: {path_arquivo}"
    df = pd.read_csv(path_arquivo)

    print("[INFO] Arquivo carregado com sucesso.")
    print(f"[INFO] Shape: {df.shape}")
    print("\n[INFO] Colunas disponíveis:")
    print(df.columns.tolist())

    # Verifica valores nulos e infinitos
    total_nulos = df.isnull().sum().sum()
    total_infinitos = np.isinf(df.values).sum()
    assert total_nulos == 0, f"[ERRO] Dados contêm {total_nulos} valores nulos."
    assert total_infinitos == 0, f"[ERRO] Dados contêm {total_infinitos} valores infinitos."
    print("[OK] Sem valores nulos ou infinitos.")

    # Estatísticas descritivas
    print("\n[INFO] Estatísticas descritivas (valores arredondados):")
    print(df.describe().T[['mean', 'std', 'min', 'max']].round(4))

    # Identificação de colunas normalizadas (0 a 1)
    normalizadas = [col for col in df.columns if df[col].min() >= 0.0 and df[col].max() <= 1.0]

    # Identificação de colunas padronizadas (média ≈ 0, std ≈ 1)
    padronizadas = []
    for col in df.columns:
        if col not in normalizadas:
            media = df[col].mean()
            desvio = df[col].std()
            if abs(media) < 0.1 and abs(desvio - 1.0) < 0.1:
                padronizadas.append(col)

    outras = [col for col in df.columns if col not in normalizadas + padronizadas]

    print(f"\n[INFO] {len(normalizadas)} colunas normalizadas (0 a 1): {normalizadas}")
    print(f"[INFO] {len(padronizadas)} colunas padronizadas (média ~0, std ~1): {padronizadas}")
    if outras:
        print(f"[ALERTA] {len(outras)} colunas fora dos padrões esperados: {outras}")

    return df

if __name__ == "__main__":
    caminho = '/content/Piloto_Day_Trade/data/transformed/dados_preparados_para_modelagem.csv'
    validar_dados(caminho)


In [None]:
import sys
# Adiciona o caminho raiz ao sys.path
sys.path.append('/content')

from Piloto_Day_Trade.scripts.modelagem_machine_learning.validar_dados_preparados import validar_dados

# Chamando a função
caminho = '/content/Piloto_Day_Trade/data/transformed/dados_preparados_para_modelagem.csv'
validar_dados(caminho)


In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/modelagem_machine_learning/criar_modelo_LSTM.py

# @title Definindo Script para criar o modelo LSTM Encoder-Decoder

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, LSTM, Dense, RepeatVector, TimeDistributed, Dropout

def LSTM_seq2seq(input_timesteps, input_features, output_timesteps):
    """
    Cria um modelo LSTM do tipo encoder-decoder:
    - Entrada: sequência dos dias anteriores (3 dias de 5 em 5 min)
    - Saída: sequência do próximo dia (também em 5 em 5 min)

    Args:
        input_timesteps: total de timesteps da entrada (ex: 3 dias * 32 = 96)
        input_features: número de features por timestep
        output_timesteps: total de timesteps da saída (ex: 32 para 1 dia)

    Returns:
        model (tf.keras.Model): modelo compilado
    """
    # ENCODER
    encoder_inputs = Input(shape=(input_timesteps, input_features))
    encoder_lstm = LSTM(64, return_state=True, dropout=0.2)
    _, state_h, state_c = encoder_lstm(encoder_inputs)
    encoder_states = [state_h, state_c]

    # DECODER
    decoder_inputs = RepeatVector(output_timesteps)(state_h)  # Repete o estado oculto para cada passo de saída
    decoder_lstm = LSTM(64, return_sequences=True, dropout=0.2)
    decoder_outputs = decoder_lstm(decoder_inputs, initial_state=encoder_states)

    # Saída com 4 valores por timestep (abertura, máximo, mínimo, fechamento)
    decoder_dense = TimeDistributed(Dense(4))
    outputs = decoder_dense(decoder_outputs)

    # Modelo final
    model = Model(encoder_inputs, outputs)
    model.compile(optimizer='adam', loss='mse')

    return model


In [None]:
# @title Treinamento do modelo LSTM v1

import os
import numpy as np
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from scripts.modelagem_machine_learning.criar_modelo_LSTM import LSTM_seq2seq
from scripts.modelagem_machine_learning.preparar_dados_modelagem_LSTM import preparar_dados_lstm

# Parâmetros do modelo
input_timesteps = 96         # 3 dias úteis * 32 janelas de 15 min
output_timesteps = 32        # 1 dia útil = 32 janelas de 15 min
input_features = 25          # Número de features por timestep (ajuste conforme seu dataset)
batch_size = 16
epochs = 20
patience = 5

# Carregar e preparar os dados (substitua pelos parâmetros corretos se necessário)
# Ajuste o caminho e os parâmetros conforme sua estrutura de dados
data_path = '/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv'

# Chama a função de preparação dos dados e retorna os dados preparados para treinamento e teste
X_treino, X_teste, y_treino, y_teste, scaler_X, scaler_y = preparar_dados_lstm(data_path, input_timesteps, output_timesteps, input_features)

# Criação do modelo LSTM v1
model = LSTM_seq2seq(input_timesteps, input_features, output_timesteps)

# Callbacks
checkpoint_path = '/content/Piloto_Day_Trade/modelos/modelo_LSTM_v1.h5'
checkpoint = ModelCheckpoint(filepath=checkpoint_path, monitor='loss', save_best_only=True, verbose=1)
early_stop = EarlyStopping(monitor='loss', patience=patience, restore_best_weights=True)

# Treinamento
model.fit(
    X_treino,
    y_treino,
    epochs=epochs,
    batch_size=batch_size,
    validation_data=(X_teste, y_teste),  # Adicionando validação durante o treinamento
    callbacks=[early_stop, checkpoint],
    verbose=1
)

# Mensagem de sucesso
print(f"Modelo LSTM v1 treinado e salvo em: {checkpoint_path}")


In [None]:
atualizar_repo("Incluindo Modelo LSTM")

In [None]:
#@title Calcular métricas e avaliar modelo LSTM
%%writefile /content/Piloto_Day_Trade/scripts/modelagem_machine_learning/calcular_metricas_avaliar_modelo_LSTM.py

import pandas as pd
import numpy as np
import joblib
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

def avaliar_modelo_lstm(modelo, X_teste, y_teste, caminho_scaler='/content/Piloto_Day_Trade/models/LSTM/scalers/scaler_normalizacao_preco.pkl'):
    """
    Avalia o modelo LSTM, imprimindo as principais métricas e comparação entre previsões e valores reais.
    """
    print("Realizando previsões...")
    y_previsto = modelo.predict(X_teste)

    print("Carregando scaler de preços para inversão...")
    scaler_precos = joblib.load(caminho_scaler)
    colunas_precos = ['abertura', 'maximo', 'minimo', 'fechamento']

    y_previsto_reshape = y_previsto.reshape(-1, 4)
    y_teste_reshape = y_teste.reshape(-1, 4)

    y_previsto_original = scaler_precos.inverse_transform(y_previsto_reshape)
    y_teste_original = scaler_precos.inverse_transform(y_teste_reshape)

    df_previsto = pd.DataFrame(y_previsto_original, columns=colunas_precos)
    df_real = pd.DataFrame(y_teste_original, columns=colunas_precos)

    comparacao = pd.DataFrame({
        'Abertura_Real': df_real['abertura'],
        'Abertura_Prevista': df_previsto['abertura'],
        'Maximo_Real': df_real['maximo'],
        'Maximo_Previsto': df_previsto['maximo'],
        'Minimo_Real': df_real['minimo'],
        'Minimo_Previsto': df_previsto['minimo'],
        'Fechamento_Real': df_real['fechamento'],
        'Fechamento_Previsto': df_previsto['fechamento']
    })

    print("\nComparação de previsões (valores reais):")
    print(comparacao.head(10))

    def calcular_metricas(y_real, y_previsto, nome):
        mae = mean_absolute_error(y_real, y_previsto)
        mse = mean_squared_error(y_real, y_previsto)
        r2 = r2_score(y_real, y_previsto)
        print(f"{nome} - MAE: {mae:.4f}, MSE: {mse:.4f}, R²: {r2:.4f}")

    print("\n Métricas de desempenho por coluna:")
    calcular_metricas(df_real['abertura'], df_previsto['abertura'], "Abertura")
    calcular_metricas(df_real['maximo'], df_previsto['maximo'], "Máximo")
    calcular_metricas(df_real['minimo'], df_previsto['minimo'], "Mínimo")
    calcular_metricas(df_real['fechamento'], df_previsto['fechamento'], "Fechamento")

    return df_real, df_previsto, comparacao


if __name__ == "__main__":
    print("Importando scripts de modelo e dados...")
    from tensorflow.keras.models import load_model
    from scripts.modelagem_machine_learning.preparar_dados_modelagem_LSTM import preparar_dados_lstm

    print("Preparando dados para avaliação...")
    X_treino, X_teste, y_treino, y_teste = preparar_dados_lstm(
        path_dados='/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv',
        tam_seq=96,
        tx_treino=0.8
    )

    print("📡 Carregando modelo salvo...")
    modelo_lstm_v1 = load_model('/content/Piloto_Day_Trade/models/LSTM/modelo_LSTM_v1.keras')

    print("Avaliando modelo...")
    df_real, df_previsto, comparacao = avaliar_modelo_lstm(
        modelo=modelo_lstm_v1,
        X_teste=X_teste,
        y_teste=y_teste
    )


In [None]:
#@title Avaliar o modelo LSTM v1 treinado
from scripts.modelagem_machine_learning.calcular_metricas_avaliar_modelo_LSTM import avaliar_modelo_lstm
from scripts.modelagem_machine_learning.preparar_dados_modelagem_LSTM import preparar_dados_lstm
from tensorflow.keras.models import load_model

# Preparar os dados
X_treino, X_teste, y_treino, y_teste = preparar_dados_lstm(
    path_dados='/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv',
    tam_seq=96,
    tx_treino=0.8
)

# Carregar o modelo salvo
modelo_lstm_v1 = load_model('/content/Piloto_Day_Trade/models/LSTM/modelo_LSTM_v1.keras')

# Avaliar o modelo
df_real, df_previsto, comparacao = avaliar_modelo_lstm(
    modelo=modelo_lstm_v1,
    X_teste=X_teste,
    y_teste=y_teste,
    caminho_scaler='/content/Piloto_Day_Trade/models/LSTM/scalers/scaler_normalizacao_preco.pkl'
)


In [None]:
atualizar_repo("Finalizando Modelo LSTM")

# Pipeline de dados

In [None]:
import os

def listar_arquivos_em_subpastas(pasta_base):
    for raiz, subpastas, arquivos in os.walk(pasta_base):
        nivel = raiz.replace(pasta_base, '').count(os.sep)
        indent = ' ' * 4 * nivel
        print(f"{indent}{os.path.basename(raiz)}/")
        subindent = ' ' * 4 * (nivel + 1)
        for arquivo in arquivos:
            print(f"{subindent}{arquivo}")

# Caminho da pasta onde estão os scripts
pasta_scripts = "/content/Piloto_Day_Trade/scripts"
listar_arquivos_em_subpastas(pasta_scripts)


In [None]:
%%writefile /content/Piloto_Day_Trade/scripts/pipeline/executar_pipeline.py

import os
import pandas as pd

from scripts.pipeline.extracao_dados import extrair_dados
from scripts.pipeline.limpeza_dados import limpeza_dados
from scripts.pipeline.transformacao_dados import transformar_dados
from scripts.pipeline.carga_dados import carregar_dados
from scripts.pipeline.criar_banco_dimensional import criar_banco
from scripts.modelagem_machine_learning.preparar_dados_modelagem_LSTM import preparar_dados_lstm

def executar_pipeline(ticker, intervalo, dias, caminho_bruto, caminho_limpo, caminho_transformado, db_path, tam_seq, tx_treino):
    print("\nIniciando execução completa do pipeline...")

    # Etapa 1: Extração
    print("Executando: Extração de dados")
    extrair_dados(ticker, dias, intervalo, caminho_bruto)

    # Etapa 2: Limpeza
    print("Executando: Limpeza de dados")
    df_bruto = pd.read_csv(caminho_bruto, index_col=0, parse_dates=True, dayfirst=True)
    limpeza_dados(df_bruto, caminho_limpo)

    # Etapa 3: Transformação
    print("Executando: Transformação de dados")
    transformar_dados(caminho_limpo, caminho_transformado)

    # Etapa 4: Criar banco dimensional
    print("Executando: Criação do banco dimensional")
    criar_banco(db_path)  # Passando db_path para a função

    # Etapa 5: Carga de dados
    print("Executando: Carga de dados")
    df_transformado = pd.read_csv(caminho_transformado)
    carregar_dados(df_transformado, db_path)

    # Etapa 7: Preparação dos dados para modelagem
    print("Executando: Preparação de dados para LSTM")
    preparar_dados_lstm(
        path_dados=caminho_transformado,
        tam_seq=tam_seq,
        tx_treino=tx_treino
    )

    print("\nPipeline finalizado com sucesso.")

if __name__ == "__main__":
    # Chamada com parâmetros do projeto
    ticker = "BBDC4.SA"
    intervalo = "5m"
    dias = 45
    caminho_bruto = "/content/Piloto_Day_Trade/data/raw/dados_brutos.csv"
    caminho_limpo = "/content/Piloto_Day_Trade/data/cleaned/dados_limpos.csv"
    caminho_transformado = "/content/Piloto_Day_Trade/data/transformed/dados_transformados.csv"
    db_path = "/content/Piloto_Day_Trade/modelagem/database/banco_dimensional.db"
    tam_seq = 96
    tx_treino = 0.8

    executar_pipeline(ticker, intervalo, dias, caminho_bruto, caminho_limpo, caminho_transformado, db_path, tam_seq, tx_treino)


In [None]:
# @title ## Testar pipeline

# Importando a função do pipeline
from Piloto_Day_Trade.scripts.pipeline.executar_pipeline import executar_pipeline

# Definindo os parâmetros a serem passados
ticker = "BBDC4.SA"
intervalo = "5m"
dias = 45
caminho_bruto = "/content/Piloto_Day_Trade/data/raw/dados_brutos_teste.csv"
caminho_limpo = "/content/Piloto_Day_Trade/data/cleaned/dados_limpos_teste.csv"
caminho_transformado = "/content/Piloto_Day_Trade/data/transformed/dados_transformados_teste.csv"
db_path = "/content/Piloto_Day_Trade/modelagem/database/banco_dimensional_teste.db"
tam_seq = 96
tx_treino = 0.8

# Executando o pipeline com os parâmetros definidos
executar_pipeline(ticker, intervalo, dias, caminho_bruto, caminho_limpo, caminho_transformado, db_path, tam_seq, tx_treino)


In [None]:
import os

# Caminhos dos arquivos
caminhos = [
    "/content/Piloto_Day_Trade/data/raw/dados_brutos_teste.csv",
    "/content/Piloto_Day_Trade/data/cleaned/dados_limpos_teste.csv",
    "/content/Piloto_Day_Trade/data/transformed/dados_transformados_teste.csv",
    "/content/Piloto_Day_Trade/data/raw/dados_brutos1304.csv",
    "/content/Piloto_Day_Trade/data/raw/dados_brutostemp.csv",
    "/content/Piloto_Day_Trade/data/raw/dados_brutostemp2.csv"
]

# Função para deletar arquivos se existirem
def deletar_arquivos(lista_caminhos):
    for caminho in lista_caminhos:
        if os.path.exists(caminho):
            os.remove(caminho)
            print(f"Arquivo deletado: {caminho}")

# Execução direta
if __name__ == "__main__":
    deletar_arquivos(caminhos)


In [None]:
%%writefile /content/Piloto_Day_Trade/.github/workflows/pipeline.yml

name: Executar Pipeline Completo

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  pipeline:
    runs-on: ubuntu-latest

    steps:
      - name: Clonar repositório
        uses: actions/checkout@v3

      - name: Configurar Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Instalar dependências
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

      - name: Executar pipeline
        run: |
          python scripts/pipeline/executar_pipeline_completo.py


In [None]:
%%writefile /content/Piloto_Day_Trade/.github/workflows/workflow_doc.md

#@title Definindo Documentação do workflow completo para Execução do Pipeline

## Objetivo

Este documento descreve o fluxo de trabalho para a execução automatizada do pipeline de dados, utilizando o GitHub Actions. O pipeline abrange desde a extração até a modelagem de dados, e foi estruturado para garantir a automação da execução do processo, facilitando atualizações contínuas e execuções programadas no repositório GitHub.

## Estrutura do Pipeline

O pipeline é composto por várias etapas que são executadas em sequência. Cada uma das etapas envolve um script específico para garantir a correta manipulação dos dados:

1. **Extração de Dados:** Obtém os dados brutos de uma fonte externa, como o Yahoo Finance ou qualquer outra fonte configurada.
2. **Limpeza de Dados:** Realiza a limpeza e formatação dos dados brutos.
3. **Transformação de Dados:** Aplica transformações necessárias para deixar os dados prontos para a modelagem.
4. **Criação de Banco Dimensional:** Estrutura os dados de maneira que sejam facilmente analisáveis.
5. **Carga de Dados:** Carrega os dados transformados para o banco de dados.
6. **Geração de Catálogo de Dados:** Cria um catálogo de metadados para facilitar o uso futuro dos dados.
7. **Preparação de Dados para Modelagem LSTM:** Prepara os dados específicos para alimentar o modelo de LSTM.
8. **Modelagem e Avaliação:** Treina e avalia o modelo LSTM.

### Scripts Responsáveis por Cada Etapa

1. **`extracao_dados.py`:** Extração dos dados brutos.
2. **`limpeza_dados.py`:** Limpeza e pré-processamento dos dados brutos.
3. **`transformacao_dados.py`:** Aplicação das transformações necessárias nos dados.
4. **`criar_banco_dimensional.py`:** Criação do banco dimensional para armazenar os dados.
5. **`carga_dados.py`:** Carga dos dados transformados no banco dimensional.
6. **`gerar_catalogo_dados.py`:** Geração do catálogo de dados.
7. **`preparar_dados_modelagem_LSTM.py`:** Preparação final dos dados para treinamento do modelo LSTM.

## GitHub Actions: Workflow

O objetivo é configurar um workflow automatizado no GitHub Actions para que ele execute todo o pipeline a cada novo push ou evento programado. Para isso, criamos um arquivo YAML no repositório do GitHub.

### Workflow: `pipeline.yml`

Este workflow será responsável por orquestrar todas as etapas de execução. Abaixo está o conteúdo do arquivo YAML:

```yaml
name: Pipeline Completo de Dados

on:
  push:
    branches:
      - main
  schedule:
    - cron: '0 0 * * 1'  # Executa toda segunda-feira às 00:00 (UTC)

jobs:
  run_pipeline:
    runs-on: ubuntu-latest

    steps:
    - name: Checkout do repositório
      uses: actions/checkout@v2

    - name: Configurar Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.8'

    - name: Instalar dependências
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt

    - name: Executar Extração de Dados
      run: python scripts/pipeline/extracao_dados.py

    - name: Executar Limpeza de Dados
      run: python scripts/pipeline/limpeza_dados.py

    - name: Executar Transformação de Dados
      run: python scripts/pipeline/transformacao_dados.py

    - name: Criar Banco Dimensional
      run: python scripts/pipeline/criar_banco_dimensional.py

    - name: Carregar Dados
      run: python scripts/pipeline/carga_dados.py

    - name: Gerar Catálogo de Dados
      run: python scripts/pipeline/gerar_catalogo_dados.py

    - name: Preparar Dados para Modelagem LSTM
      run: python scripts/modelagem_machine_learning/preparar_dados_modelagem_LSTM.py

    - name: Avaliar Modelo LSTM
      run: python scripts/modelagem_machine_learning/calcular_metricas_avaliar_modelo_LSTM.py


  ### Explicação do Workflow:

    - Evento de Acionamento:

    O workflow é acionado por dois eventos principais:

    Push para a branch main: Sempre que um novo commit for enviado para a branch main, o pipeline será executado automaticamente.

    Agendamento Semanal: O pipeline é executado toda segunda-feira às 00:00 UTC, garantindo que os dados sejam atualizados regularmente.

    - Jobs:

    run_pipeline: Este job é o responsável por executar todas as etapas do pipeline.

    Ele é executado em uma máquina virtual Ubuntu, configurada com Python 3.8.

    - Etapas:

    Checkout do Repositório: Faz o checkout do código do repositório.

    Configuração do Python: Configura o ambiente Python necessário.

    Instalação de Dependências: Instala as dependências listadas no requirements.txt.

    Execução das Etapas do Pipeline: Cada etapa do pipeline é executada com o comando python apontando para o script correspondente.

    - Requisitos para o Workflow:
    requirements.txt: Um arquivo contendo todas as dependências necessárias para rodar o pipeline. Ele deve estar no repositório e ser mantido atualizado.

    Acesso ao Repositório: O repositório deve conter todos os scripts e arquivos de dados necessários para a execução do pipeline.


In [None]:
atualizar_repo("Finalizando Pipeline")

In [None]:
#@title Função operacional para esvaziar a pasta de dados para testes

import os

def esvaziar_pasta_data(caminho='/content/Piloto_Day_Trade/data'):
    for root, dirs, files in os.walk(caminho):
        for file in files:
            caminho_arquivo = os.path.join(root, file)
            os.remove(caminho_arquivo)
            print(f'Removido: {caminho_arquivo}')
    print('Pasta data esvaziada com sucesso.')

# Executar
esvaziar_pasta_data()
