`utils.logger`

In [1]:
import logging


def get_logger(
    logger_name: str = "notebook",
    logger_level: int = logging.INFO,
    logger_date_format: str = "%Y-%m-%d %H:%M:%S"
) -> logging.Logger:
    """Return a named logger that emits semicolon-delimited records to a stream.

    Each record is formatted as ``LEVEL;timestamp;filename;lineno;message``.
    The function is safe to call repeatedly with the same name (for example
    when a notebook cell is re-run): the stream handler it owns is attached
    only once, and later calls just refresh the level and the formatter.

    Args:
        logger_name: Name passed to ``logging.getLogger``.
        logger_level: Level set on the logger.
        logger_date_format: ``strftime`` pattern used for ``%(asctime)s``.

    Returns:
        The configured ``logging.Logger``.
    """
    # Name used to recognise the handler this function attached earlier
    managed_handler_name = "get_logger_stream_handler"

    logger = logging.getLogger(logger_name)
    logger.setLevel(logger_level)

    log_format = "%(levelname)s;%(asctime)s;%(filename)s;%(lineno)d;%(message)s"
    formatter = logging.Formatter(log_format, datefmt=logger_date_format)

    # logging.getLogger returns the same object for the same name, so adding a
    # handler unconditionally would duplicate every message on each call.
    for handler in logger.handlers:
        if handler.name == managed_handler_name:
            handler.setFormatter(formatter)
            break
    else:
        stream_handler = logging.StreamHandler()
        stream_handler.set_name(managed_handler_name)
        stream_handler.setFormatter(formatter)
        logger.addHandler(stream_handler)

    return logger


`domain.entities.ticker`

In [2]:
from datetime import date, datetime
from pydantic import BaseModel, Field, model_validator, field_validator

class Ticker(BaseModel):
    """Entity describing a single ticker and where/when it was collected."""

    # Ticker code as provided by the source
    code: str
    # Always derived from `code` by appending ".SA" (see the validator below)
    code_international: str
    company_name: str
    # Stored stripped and lower-cased (see normalize_source_info)
    source_info: str
    # Creation timestamp formatted as "%Y-%m-%d %H:%M:%S" from datetime.now()
    dt_extracted: str = Field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    @model_validator(mode="before")
    @classmethod
    def generate_code_international(cls, values):
        """Inject ``code_international`` (``code`` + ".SA") into the raw input.

        Runs before field validation. When the input is not a dict, or ``code``
        is missing or not a string, the input is returned untouched so that the
        regular field validation reports the problem as a ``ValidationError``
        instead of this hook failing with ``KeyError``/``TypeError``.
        """
        if not isinstance(values, dict):
            return values

        code = values.get("code")
        if not isinstance(code, str):
            return values

        # Build a new mapping so the caller's dict is not mutated
        return {**values, "code_international": code + ".SA"}

    @field_validator("source_info")
    @classmethod
    def normalize_source_info(cls, value: str):
        """Normalize ``source_info`` by trimming whitespace and lower-casing."""
        return value.strip().lower()


`domain.interfaces.repositories_interface`

In [3]:
from abc import ABC, abstractmethod


class ITickersInfoRepository(ABC):
    """Contract for storing B3 ticker information."""

    @abstractmethod
    def persist(self, ticker: Ticker) -> None:
        """Store one ``Ticker``; concrete repositories provide the behavior."""
        ...


`domain.interfaces.adapters_interface`

In [4]:
from abc import ABC, abstractmethod


class IHttpAdapter(ABC):
    """Contract for HTTP/HTTPS requests made through requests."""

    @abstractmethod
    def get(self):
        """Perform the GET request; concrete adapters provide the behavior."""
        ...


class ITickersInfoAdapter(ABC):
    """Contract for collecting B3 ticker information."""

    @abstractmethod
    def get_tickers(self) -> list[Ticker]:
        """Return the collected tickers; concrete adapters provide the behavior."""
        ...


`infra.adapters.requests_adapter`

In [5]:
import requests
from requests.adapters import HTTPAdapter, Retry


# Module-level logger built with get_logger's defaults; used by RequestsAdapter
# for error reporting and by later cells for progress messages.
logger = get_logger()


class RequestsAdapter(IHttpAdapter):
    """HTTP GET adapter for one fixed URL, backed by a ``requests`` session.

    The session is configured on construction with a ``Retry`` policy mounted
    for both ``http://`` and ``https://``.

    Args:
        url: Target URL used by every ``get`` call.
        timeout: Value forwarded as ``timeout`` to ``session.get``.
        headers: Headers forwarded to ``session.get``.
        num_retries: Forwarded as ``total`` to ``Retry``.
        backoff_factor: Forwarded as ``backoff_factor`` to ``Retry``.
        status_forcelist: Forwarded as ``status_forcelist`` to ``Retry``.
        session: Session to use. When omitted, a new ``requests.Session`` is
            created for this instance.
    """

    def __init__(
        self,
        url: str,
        timeout: int,
        headers: dict[str, str],
        num_retries: int,
        backoff_factor: int,
        status_forcelist: list[int],
        session: "requests.sessions.Session | None" = None
    ):
        self.__url = url
        self.__timeout = timeout
        self.__headers = headers
        self.__num_retries = num_retries
        self.__backoff_factor = backoff_factor
        self.__status_forcelist = status_forcelist

        # A ``requests.Session()`` default in the signature is evaluated once at
        # definition time, so every instance would share (and re-mount adapters
        # on) the same session. Create one per instance instead.
        self.__session = session if session is not None else requests.Session()

        # Apply the retry policy to the session
        self.__configure_request_session()

    def __configure_request_session(self) -> None:
        """Mount an ``HTTPAdapter`` carrying the retry policy on the session."""
        retry_config = Retry(
            total=self.__num_retries,
            backoff_factor=self.__backoff_factor,
            status_forcelist=self.__status_forcelist
        )

        http_adapter = HTTPAdapter(max_retries=retry_config)
        self.__session.mount("https://", http_adapter)
        self.__session.mount("http://", http_adapter)

    def get(self) -> requests.models.Response:
        """Send a GET request to the configured URL and return the response.

        Raises:
            requests.Timeout, requests.ConnectionError, requests.HTTPError,
            requests.RequestException: logged and re-raised unchanged.
        """
        try:
            return self.__session.get(
                url=self.__url,
                headers=self.__headers,
                timeout=self.__timeout
            )

        except requests.Timeout:
            logger.error(f"Erro de timeout ao acessar a url {self.__url}")
            raise

        except requests.ConnectionError:
            logger.error(f"Erro de conexão ao acessar a url {self.__url}")
            raise

        except requests.HTTPError as http_error:
            # NOTE(review): this method never calls raise_for_status(), so this
            # branch presumably only fires if the session itself raises
            # HTTPError — confirm whether non-2xx responses should raise here.
            # The exception may carry no response; avoid masking it with an
            # AttributeError while building the log message.
            status_code = getattr(http_error.response, "status_code", None)
            logger.error(f"Erro de HTTP ao acessar a url {self.__url} "
                         f"com status code {status_code}")
            raise

        except requests.RequestException:
            logger.error(f"Erro inesperado ao acessar a url {self.__url}")
            raise

    def get_url(self) -> str:
        """Return the URL this adapter was configured with."""
        return self.__url


`infra.adapters.fundamentus_adapter`

In [6]:
from bs4 import BeautifulSoup


# Module-level HTTP adapter (requests-based) pointing at the Fundamentus results
# page; FundamentusGetTickersAdapter uses it as its default `requests_adapter`.
REQUESTS_ADAPTER = RequestsAdapter(
    url="https://www.fundamentus.com.br/resultado.php",
    timeout=10,
    headers={
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
                      "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
    },
    num_retries=3,
    backoff_factor=3,
    status_forcelist=[500, 502, 503, 504]
)


class FundamentusGetTickersAdapter(ITickersInfoAdapter):
    """Collects tickers by scraping the HTML returned by an HTTP adapter.

    Args:
        requests_adapter: Adapter whose ``get()`` returns the page response.
        html_parser: Parser name handed to ``BeautifulSoup``.
    """

    def __init__(
        self,
        requests_adapter: RequestsAdapter = REQUESTS_ADAPTER,
        html_parser: str = "lxml"
    ) -> None:
        self.__requests_adapter = requests_adapter
        self.__html_parser = html_parser

    def __get_request_content(self) -> str:
        """Return the response body text of the adapter's GET request."""
        return self.__requests_adapter.get().text

    def __parse_html_content(self, html_text: str) -> BeautifulSoup:
        """Parse ``html_text`` with the configured parser.

        Raises:
            TypeError: if ``html_text`` is not a string.
        """
        if not isinstance(html_text, str):
            raise TypeError("O conteúdo HTML (html_text) deve ser uma string válida")

        return BeautifulSoup(html_text, self.__html_parser)

    def __find_tickers_info(self, html_parsed: BeautifulSoup) -> list[dict[str, str]]:
        """Extract unique (code, company name) pairs from the first cell of each row.

        The code is the text of the cell's first ``<a>`` and the company name is
        the ``title`` attribute of its first ``<span>``, both upper-cased and
        stripped. Cells lacking either piece are skipped rather than aborting
        the whole extraction. The result is sorted by code, then name, so the
        order is deterministic.
        """
        unique_pairs = set()
        skipped_cells = 0

        for row in html_parsed.find_all("tr"):
            cell = row.find("td")
            if cell is None:
                # Rows without data cells carry no ticker information
                continue

            anchor = cell.find("a")
            span = cell.find("span")
            title = span.get("title") if span is not None else None
            if anchor is None or title is None:
                skipped_cells += 1
                continue

            unique_pairs.add((anchor.text.upper().strip(), title.upper().strip()))

        if skipped_cells:
            logger.warning(f"{skipped_cells} células ignoradas por não conterem "
                           f"link ou título do ativo")

        return [
            {"codigo_papel": code, "nome_companhia": company_name}
            for code, company_name in sorted(unique_pairs)
        ]

    def get_tickers(self) -> list[Ticker]:
        """Fetch, parse and convert the page content into ``Ticker`` entities."""
        html_text = self.__get_request_content()
        html_parsed = self.__parse_html_content(html_text=html_text)
        tickers_info = self.__find_tickers_info(html_parsed=html_parsed)

        # Adapt the raw dictionaries to the expected entity
        return [
            Ticker(
                code=info["codigo_papel"],
                company_name=info["nome_companhia"],
                source_info="fundamentus"
            )
            for info in tickers_info
        ]


`infra.repositories.dynamodb_repository`

In [7]:
import boto3


class DynamodbTickersInfoRepository(ITickersInfoRepository):
    """Repository that writes ``Ticker`` entities to a DynamoDB table."""

    def __init__(self, table_name: str, region_name: str):
        self.__table_name: str = table_name
        self.__region_name: str = region_name

        # Resource and table handles are created once and reused by persist()
        dynamodb_resource = boto3.resource("dynamodb", region_name=region_name)
        self.__boto3_resource = dynamodb_resource
        self.__dynamodb_table = dynamodb_resource.Table(table_name)

    def persist(self, ticker: Ticker) -> None:
        """Put the ticker's ``model_dump()`` dictionary as one table item."""
        item = ticker.model_dump()
        self.__dynamodb_table.put_item(Item=item)


`usecases.save_tickers_info_usecase`

In [8]:
class SaveTickersInfoUseCase:
    """Use case that fetches tickers from an adapter and persists each one.

    Args:
        adapter: Object exposing ``get_tickers()`` (an ``ITickersInfoAdapter``).
        repository: Object exposing ``persist(ticker=...)``
            (an ``ITickersInfoRepository``).
    """

    # The interfaces are type annotations, not default values: defaulting to the
    # abstract classes themselves let ``SaveTickersInfoUseCase()`` build an
    # object that could only fail later inside ``execute``.
    def __init__(self, adapter: "ITickersInfoAdapter", repository: "ITickersInfoRepository"):
        self.__adapter = adapter
        self.__repository = repository

    def execute(self):
        """Fetch every ticker from the adapter and persist them one by one."""
        tickers = self.__adapter.get_tickers()
        for ticker in tickers:
            self.__repository.persist(ticker=ticker)


In [9]:
# Wire the concrete adapter and repository into the use case.
adapter = FundamentusGetTickersAdapter()
repository = DynamodbTickersInfoRepository(table_name="br_stocks_active_tickers", region_name="us-east-1")

use_case = SaveTickersInfoUseCase(adapter=adapter, repository=repository)
# Execution is intentionally left disabled: uncommenting the next line fetches
# the tickers and persists each one through the DynamoDB repository.
#use_case.execute()

In [10]:
# Fetch the tickers through the adapter and preview the first entity.
tickers = adapter.get_tickers()
tickers[0]

Ticker(code='AALR3', code_international='AALR3.SA', company_name='ALLIAR', source_info='fundamentus', dt_extracted='2025-02-10 22:22:05')

In [19]:
# Report progress every 50 tickers, then a final summary.
# NOTE(review): the messages say tickers were inserted, but this loop performs
# no repository call — confirm whether a persist step was meant to live here.
total_tickers = len(tickers)

# enumerate supplies the position directly; looking it up with
# tickers.index(ticker) rescans the list on every iteration and returns the
# first match when two entries compare equal.
for idx, ticker in enumerate(tickers):
    if idx > 0 and idx % 50 == 0:
        num_tickers_left = total_tickers - idx
        pct_tickers_done = round(100 * (1 - (num_tickers_left / total_tickers)), 2)
        logger.info(f"Foram inseridos {idx} tickers no repositório. "
                    f"Restam {num_tickers_left} tickers ({pct_tickers_done}% concluído)")

logger.info(f"Processo de extração e escrita de informações finalizado com sucesso "
            f"para todos os {len(tickers)} tickers da B3")

INFO;2025-02-10 22:33:38;1934086383.py;6;Foram inseridos 50 tickers no repositório. Restam 937 tickers (5.07% concluído)
INFO;2025-02-10 22:33:38;1934086383.py;6;Foram inseridos 100 tickers no repositório. Restam 887 tickers (10.13% concluído)
INFO;2025-02-10 22:33:38;1934086383.py;6;Foram inseridos 150 tickers no repositório. Restam 837 tickers (15.2% concluído)
INFO;2025-02-10 22:33:38;1934086383.py;6;Foram inseridos 200 tickers no repositório. Restam 787 tickers (20.26% concluído)
INFO;2025-02-10 22:33:38;1934086383.py;6;Foram inseridos 250 tickers no repositório. Restam 737 tickers (25.33% concluído)
INFO;2025-02-10 22:33:38;1934086383.py;6;Foram inseridos 300 tickers no repositório. Restam 687 tickers (30.4% concluído)
INFO;2025-02-10 22:33:38;1934086383.py;6;Foram inseridos 350 tickers no repositório. Restam 637 tickers (35.46% concluído)
INFO;2025-02-10 22:33:38;1934086383.py;6;Foram inseridos 400 tickers no repositório. Restam 587 tickers (40.53% concluído)
INFO;2025-02-10 22:3

___

In [32]:
# Settings needed to mock a DynamoDB table for testing purposes
MOCKED_DYNAMODB_TABLE_NAME = "mocked_br_stocks_active_tickers"
# Composite primary key: `code` as partition (HASH) key, `dt_extracted` as sort (RANGE) key
MOCKED_DYNAMODB_TABLE_KEY_SCHEMA = [
    {
        "AttributeName": "code",
        "KeyType": "HASH"
    },
    {
        "AttributeName": "dt_extracted",
        "KeyType": "RANGE"
    }
]
# Both key attributes are declared with the string ("S") attribute type
MOCKED_DYNAMODB_TABLE_ATTRIBUTE_DEFINITIONS = [
    {
        "AttributeName": "code",
        "AttributeType": "S"
    },
    {
        "AttributeName": "dt_extracted",
        "AttributeType": "S"
    }
]
MOCKED_DYNAMODB_TABLE_BILLING_MODE = "PAY_PER_REQUEST"
# Region passed to the repository created inside the mocked scope
MOCKED_BOTO3_CLIENT_REGION = "us-east-1"

In [44]:
from moto import mock_aws
import time


@mock_aws
def dynamodb_repository(ticker=tickers[0]):
    """Round-trip one ticker through the repository against a mocked DynamoDB.

    Creates the mocked table, persists ``ticker`` with
    ``DynamodbTickersInfoRepository`` and reads the item back by its key.

    Args:
        ticker: Entity to persist; defaults to the first element of ``tickers``
            as it was when this function was defined.

    Returns:
        The ``get_item`` response for the ticker's ``code``/``dt_extracted`` key.
    """
    # Use the same region as the repository below: without an explicit region
    # the client depends on the local AWS configuration and may fail to build
    # or create the table in a region the repository does not look at.
    boto3_client = boto3.client("dynamodb", region_name=MOCKED_BOTO3_CLIENT_REGION)
    boto3_client.create_table(
        TableName=MOCKED_DYNAMODB_TABLE_NAME,
        KeySchema=MOCKED_DYNAMODB_TABLE_KEY_SCHEMA,
        AttributeDefinitions=MOCKED_DYNAMODB_TABLE_ATTRIBUTE_DEFINITIONS,
        BillingMode=MOCKED_DYNAMODB_TABLE_BILLING_MODE
    )

    # Wait for the table to exist with a bounded number of polls instead of an
    # open-ended loop (same 2-second interval as before).
    boto3_client.get_waiter("table_exists").wait(
        TableName=MOCKED_DYNAMODB_TABLE_NAME,
        WaiterConfig={"Delay": 2, "MaxAttempts": 30}
    )

    # Build the repository inside the mocked scope
    repository = DynamodbTickersInfoRepository(
        table_name=MOCKED_DYNAMODB_TABLE_NAME,
        region_name=MOCKED_BOTO3_CLIENT_REGION
    )

    # Persist the ticker and read it back by its composite key
    repository.persist(ticker=ticker)
    return boto3_client.get_item(
        TableName=MOCKED_DYNAMODB_TABLE_NAME,
        Key={
            "code": {"S": ticker.code},
            "dt_extracted": {"S": ticker.dt_extracted}
        }
    )

In [45]:
# Run the mocked round-trip with the default ticker and keep the get_item response.
r = dynamodb_repository()

In [50]:
# Read back the `code` attribute (string type "S") of the item returned above.
r["Item"]["code"]["S"]

'AALR3'