# **Trusted** 

The trusted layer is the first structured, validated representation of the data after raw ingestion. It turns heterogeneous files (JSON, CSV, XML, ZIP, etc.) into standardized, typed, auditable tabular datasets while staying faithful to the original source.

***If Raw is evidence, Trusted is organized evidence.***

It does not yet apply complex business rules or integrate multiple sources; that belongs to the following layers (Refined, Feature, etc.). Trusted only guarantees that the data:

- have correct types
- have a stable schema
- are consistent and reprocessable
- keep full traceability

----------------------------

## *BCB SGS*

### *1. Type and Structure*
The data extracted from SGS so far are specifically the SELIC (432) and IPCA (433) series. By definition these are extracted as daily data (for IPCA the BCB applies an interpolation to return the day's percentage change), and the fields of interest are *data* (date) and *valor* (value). To preserve an auditable structure, additional fields will be generated to indicate:

- Processing date (ingestion)
- Raw file that produced the record
- Content hash for governance, because it:
    - Is auditable, making it possible to reconstruct history, prove integrity, and reconcile data
    - Prevents reinsertion when the same period is reprocessed (idempotency)
    - Detects silent changes (if old data are altered, the column will flag it)
    - Gives real uniqueness: small variations in field and formatting (such as type) do not affect the dataset

thus turning the *raw* data into a *clean, auditable table*. With that in mind, we can lay out the schema of these data as:

**Raw:**

| data | valor |
|:------------:|:-------:|
| str | str |

**Trusted:**

| series_id | ref_date | value | raw_file | raw_hash | record_hash | ingestion_ts_utc |
|:---------:|:--------:|:-----:|:--------:|:--------:|:-----------:|:----------------:|
| int | date | float | str | str | str | str |

series_id and ref_date naturally provide a unique key for the dataset.
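To make the role of the key and of `record_hash` concrete, here is a minimal sketch of an idempotent merge. It is not the pipeline's actual code: the function name `merge_points`, the in-memory `dict` store, and the short placeholder hashes are all assumptions made for illustration.

```python
from typing import Dict, Iterable, List, Tuple

Key = Tuple[int, str]  # (series_id, ref_date in ISO format)


def merge_points(
    stored: Dict[Key, str],
    incoming: Iterable[Tuple[int, str, str]],
) -> Tuple[List[Key], List[Key]]:
    """Merge (series_id, ref_date, record_hash) rows into `stored`.

    Returns (inserted_keys, changed_keys). Rows whose key and hash already
    match are skipped, so reprocessing the same period changes nothing.
    """
    inserted: List[Key] = []
    changed: List[Key] = []
    for series_id, ref_date, record_hash in incoming:
        key = (series_id, ref_date)
        previous = stored.get(key)
        if previous is None:
            inserted.append(key)
        elif previous != record_hash:
            changed.append(key)  # same key, different content: silent revision
        else:
            continue  # identical record, nothing to do
        stored[key] = record_hash
    return inserted, changed


store: Dict[Key, str] = {}
batch = [(432, "2000-01-03", "aaa"), (433, "2000-01-01", "bbb")]  # placeholder hashes
first = merge_points(store, batch)
second = merge_points(store, batch)  # reprocessing the same period
third = merge_points(store, [(433, "2000-01-01", "ccc")])  # revised value
```

In this sketch a changed record overwrites the stored hash; a real pipeline might prefer to keep both versions for audit purposes.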

#### *Model Class*

Since this is a fixed ingestion process whose data will be written to *.parquet* files, it is worth building a fixed data-formatting class that does not depend on pandas. This architecture is attractive precisely because restricting the object's flexibility yields flexibility in the code: if pandas is no longer an option in the future, the SgsPoint domain stays the same, with type control and testability.

```python models.py
from dataclasses import dataclass
from datetime import date
from typing import Optional


@dataclass(frozen=True)
class SgsPoint:
    series_id: int
    ref_date: date
    value: Optional[float]  # may be None when the source value is empty
    raw_file: str
    raw_hash: str
    record_hash: str
    ingestion_ts_utc: str
```
Likewise, to keep the database scalable, we can already lay out the metadata of the extracted series. This improves data governance and makes the data easier to understand for future consumers, who get the source, series name, publication frequency, and unit without having to consult documentation.

```python models.py
@dataclass(frozen=True)
class SgsSeriesMeta:
    series_id: int
    name: str
    frequency: str
    unit: str
    source: str = "BCB_SGS"  # default for now
```

From the way the JSON files are named, we can easily extract which series the data belong to.

In [10]:
from pathlib import Path
import json

path = Path("C:\\Users\\Dell\\OneDrive\\Documentos\\GitHub\\ML-ETTJ26\\data\\01_raw\\bcb\\sgs\\433_01-01-2000_31-12-2008.json")

stem = path.stem
series_str = stem.split("_", 1)[0]
series_str

'433'


A quick look at the data shows that the information extracted from the API is a list of dictionaries in which both keys and values are strings.

```JSON
[{"data": "01/01/2000", "valor": "1.00"}, ...]
```


In [11]:
with path.open("r", encoding="utf-8") as f:
    json_f = json.load(f)
json_f[0]

{'data': '01/01/2000', 'valor': '0.62'}

Before these values go to trusted they need to be normalized:
- The date must be a date, not a string
- The value must be a decimal with a fixed number of places, not a string

(note: float can produce undesired behavior because of its binary base)
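The note about binary floats can be checked with a small, self-contained example. It uses generic values rather than series data; the point is that the text form of a number feeds the hash payload, so it has to be stable.

```python
from decimal import Decimal

float_sum = 0.1 + 0.2
decimal_sum = Decimal("0.1") + Decimal("0.2")

# Binary floats cannot represent 0.1 or 0.2 exactly, so the sum drifts.
print(float_sum == 0.3)               # False
print(repr(float_sum))                # 0.30000000000000004

# Decimal built from strings keeps the base-10 digits as written.
print(decimal_sum == Decimal("0.3"))  # True
print(str(decimal_sum))               # 0.3
```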

In [12]:
from datetime import datetime

s = json_f[0]["data"]
datetime.strptime(s, "%d/%m/%Y").date()

datetime.date(2000, 1, 1)

In [13]:
from decimal import Decimal, ROUND_HALF_UP

s = json_f[0].get("valor")
s = str(s).strip()
s = Decimal(s)
vq = s.quantize(Decimal("0.0000000001"), rounding=ROUND_HALF_UP)
print(vq, s)

0.6200000000 0.62


With the values normalized, we can now generate the hashes with more predictable behavior.

In [14]:
import hashlib
series_id = int(series_str)
ref_date = datetime.strptime(json_f[0]["data"], "%d/%m/%Y").date()
value_dec = vq

payload = f"{series_id}|{ref_date.isoformat()}|{value_dec}"

record_hash = hashlib.sha256(payload.encode("utf-8")).hexdigest()
record_hash

'072692bcfd6ededdac1377bf9cab985900fca426ef5d8543c6d9a7cdc779fb24'

In [15]:
h = hashlib.sha256()
with path.open("rb") as f:
    
    for chunk in iter(lambda: f.read(1024 * 1024), b""):
        h.update(chunk)
h.hexdigest()

'2064cccd061cb5ed9f8747e42c3cbbea8637b9542f78a6146547c9c8674f67ef'

In [16]:
from datetime import timezone

ingestion_ts_utc = datetime.now(timezone.utc).replace(microsecond=0).isoformat()
ingestion_ts_utc

'2026-02-21T17:11:44+00:00'
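The exploratory cells above can be gathered into a single function. This is a sketch under stated assumptions, not the code of the `trusted_bcb_sgs` pipeline: the name `normalize_sgs_record`, the dict return type, and the choice to hash a missing value as an empty string are all hypothetical.

```python
import hashlib
from datetime import datetime, timezone
from decimal import Decimal, ROUND_HALF_UP
from typing import Any, Dict, Optional

QUANT = Decimal("0.0000000001")  # 10 decimal places, as in the cell above


def normalize_sgs_record(
    raw: Dict[str, Any], series_id: int, raw_file: str, raw_hash: str
) -> Dict[str, Any]:
    """Turn one raw {'data': ..., 'valor': ...} item into a trusted row."""
    ref_date = datetime.strptime(raw["data"], "%d/%m/%Y").date()

    raw_value = raw.get("valor")
    text = "" if raw_value is None else str(raw_value).strip()
    value: Optional[Decimal] = (
        Decimal(text).quantize(QUANT, rounding=ROUND_HALF_UP) if text else None
    )

    # Assumption: a missing value enters the hash payload as an empty string.
    value_part = "" if value is None else str(value)
    payload = f"{series_id}|{ref_date.isoformat()}|{value_part}"

    return {
        "series_id": series_id,
        "ref_date": ref_date,
        "value": value,
        "raw_file": raw_file,
        "raw_hash": raw_hash,
        "record_hash": hashlib.sha256(payload.encode("utf-8")).hexdigest(),
        "ingestion_ts_utc": datetime.now(timezone.utc)
        .replace(microsecond=0)
        .isoformat(),
    }


row = normalize_sgs_record(
    {"data": "01/01/2000", "valor": "0.62"},
    series_id=433,
    raw_file="433_01-01-2000_31-12-2008.json",
    raw_hash="<sha256 of the raw file>",  # placeholder, computed as in In [15]
)
print(row["ref_date"], row["value"])
```

Keeping the timestamp out of the hash payload is what lets two runs over the same raw file produce the same `record_hash`.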

In [17]:
# After running the trusted pipeline ( kedro run --pipeline trusted_bcb_sgs ) we can read the parquet it should generate
import pandas as pd

sgs_p = pd.read_parquet(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\02_trusted\bcb\sgs\points.parquet")
print(sgs_p.info(), sgs_p["raw_hash"].value_counts(), sep="\n\n")

<class 'pandas.DataFrame'>
RangeIndex: 9852 entries, 0 to 9851
Data columns (total 7 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   series_id         9852 non-null   int64  
 1   ref_date          9852 non-null   object 
 2   value             9852 non-null   float64
 3   record_hash       9852 non-null   str    
 4   raw_file          9852 non-null   str    
 5   raw_hash          9852 non-null   str    
 6   ingestion_ts_utc  9852 non-null   str    
dtypes: float64(1), int64(1), object(1), str(4)
memory usage: 2.2+ MB
None

raw_hash
480c424330ea60ca1d9e880bb515e8be38bb0b61f0c3d6f6b07d640e8884bad9    3329
023c3e8e15126a84ae1b8a9e00a325719c03eb7c8f8bbb7bf1988a81415c7926    3288
74ec4cb5fdcd6e61a207f15f04df8b708d760c50f50e8cb4a1bd6538b7db72c6    2922
2064cccd061cb5ed9f8747e42c3cbbea8637b9542f78a6146547c9c8674f67ef     108
93a1f52671c819997afbd5e454898679dd21e844a227298538b613a37dae725e     108
7a8c38a1e810ad0d8478c2962c395b58

In [18]:
sgs_m = pd.read_parquet(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\02_trusted\bcb\sgs\series_meta.parquet")
print(sgs_m.info(), sgs_m.head(), sep="\n\n")

<class 'pandas.DataFrame'>
RangeIndex: 2 entries, 0 to 1
Data columns (total 5 columns):
 #   Column       Non-Null Count  Dtype
---  ------       --------------  -----
 0   series_id    2 non-null      int64
 1   series_name  2 non-null      str  
 2   frequency    2 non-null      str  
 3   unit         2 non-null      str  
 4   source       2 non-null      str  
dtypes: int64(1), str(4)
memory usage: 244.0 bytes
None

   series_id series_name frequency    unit   source
0        432       SELIC         D  % a.a.  BCB_SGS
1        433        IPCA         M       %  BCB_SGS


---------------------

## *BCB DEMAB*

Unlike the BCB's Time Series Management System (SGS), the Open Market Operations Department (DEMAB) provides monthly files of outright trades in Federal Government Securities registered in the Special System for Settlement and Custody (Selic), with daily detail by security and maturity. That is exactly the input needed to build this project's PRE ETTJ.

Note: I will consider only Extragroup trades, since Intragroup information (NegT) could introduce unnecessary price and liquidity noise.

In [19]:
from pathlib import Path
from collections import Counter
import zipfile
from datetime import datetime

zip_path = [Path(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\01_raw\bcb\demab\negociacoes_titulos_federais_secundario\NegE200701.ZIP"), 
            Path(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\01_raw\bcb\demab\negociacoes_titulos_federais_secundario\NegE202309.ZIP"),  
            Path(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\01_raw\bcb\demab\negociacoes_titulos_federais_secundario\NegE202512.ZIP"),
            ]

for zp in zip_path:
    with zipfile.ZipFile(zp, "r") as z:
        infos = z.infolist()
        print(f"ZIP: {zp.name}")
        exts = [Path(i.filename).suffix.lower() for i in z.infolist() if not i.is_dir()]
        print(Counter(exts))
        print(f"Arquivos: {len(infos)}")
        for i in infos[:50]:
            dt = datetime(*i.date_time)
            print(f"- {i.filename} | {i.file_size/1024:.1f} KB | {dt}")

ZIP: NegE200701.ZIP
Counter({'.csv': 1})
Arquivos: 1
- NegE200701.CSV | 94.2 KB | 2007-02-03 00:04:04
ZIP: NegE202309.ZIP
Counter({'.csv': 1})
Arquivos: 1
- NegE202309.CSV | 142.7 KB | 2023-10-03 23:47:42
ZIP: NegE202512.ZIP
Counter({'.csv': 1})
Arquivos: 1
- NegE202512.CSV | 166.5 KB | 2026-01-05 23:04:18


We can quickly check files from different points in time to verify that the extraction format is consistent. As shown above, besides the *NegEyyyymm.ZIP* naming pattern, the archive's content appears to stay the same over time: always a single .csv file.
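If the pipeline relies on "one CSV per ZIP", the assumption can be enforced instead of only observed. A minimal sketch, in which `single_csv_name` is a hypothetical helper and the in-memory ZIP stands in for a real DEMAB file:

```python
import io
import zipfile


def single_csv_name(zip_source) -> str:
    """Return the name of the only CSV in the archive, or fail loudly."""
    with zipfile.ZipFile(zip_source, "r") as z:
        names = [
            info.filename
            for info in z.infolist()
            if not info.is_dir() and info.filename.lower().endswith(".csv")
        ]
    if len(names) != 1:
        raise ValueError(f"expected exactly one CSV, found {len(names)}: {names}")
    return names[0]


# Build a tiny archive in memory to exercise the check.
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, "w") as z:
    z.writestr("NegE200701.CSV", "DATA MOV;SIGLA\n")
buffer.seek(0)

found = single_csv_name(buffer)
print(found)
```

Failing early here means a change in the publisher's packaging shows up as an explicit error rather than as a silently skipped month.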

In [20]:
for zp in zip_path:
    with zipfile.ZipFile(zp, "r") as z:
        # pick a file that looks like data
        name = [i.filename for i in z.infolist() if i.filename.lower().endswith((".csv", ".txt"))][0]
        print("Arquivo:", name)
        with z.open(name) as f:
            sample = f.read(4096)  # 4KB
        # try to decode
        for enc in ("utf-8", "latin-1", "cp1252"):
            try:
                text = sample.decode(enc)
                print("Encoding provável:", enc)
                print(text.splitlines()[:10])
                break
            except UnicodeDecodeError:
                pass


Arquivo: NegE200701.CSV
Encoding provável: utf-8
['DATA MOV;SIGLA;CODIGO;CODIGO ISIN;EMISSAO;VENCIMENTO;NUM DE OPER;QUANT NEGOCIADA;VALOR NEGOCIADO;PU MIN;PU MED;PU MAX;PU LASTRO;VALOR PAR;TAXA MIN;TAXA MED;TAXA MAX', '02/01/2007;LFT;210100;BRSTNCLF1741;04/01/2002;17/01/2007;28;4223;;2963,02918000;2963,02918000;2963,02918000;2962,89340793;2963,00978687;-0,0150;-0,0150;-0,0150', '02/01/2007;LFT;210100;BRSTNCLF17W8;19/09/2002;21/02/2007;2;1259;;2963,06973000;2963,07607744;2963,07694900;2962,65014685;2963,00978687;-0,0168;-0,0166;-0,0150', '02/01/2007;LFT;210100;BRSTNCLF17X6;19/09/2002;21/03/2007;43;14756;;2963,00978600;2963,09021875;2963,09125200;2962,37521180;2963,00978687;-0,0128;-0,0127;0,0000', '02/01/2007;LFT;210100;BRSTNCLF1808;19/09/2002;20/06/2007;11;12764;;2963,00978600;2963,15668471;2963,17210800;2961,64682719;2963,00978687;-0,0119;-0,0108;0,0000', '02/01/2007;LFT;210100;BRSTNCLF1832;19/09/2002;19/09/2007;4;1113;;2963,23409641;2963,23620561;2963,23646600;2960,89513416;2963,009

From here we can start building the trusted layer: some columns are fixed, while more recent dates carry extra columns that are irrelevant to the model but worth keeping in mind when modeling the flow into trusted. As with the SGS data, we can already structure the data:

**Raw:** Many columns would only take up space and store information that will never be used to build curves. Even if they are interesting for liquidity analysis, for example, for now they are unnecessary and will not be loaded into trusted.

| DATA MOV | SIGLA | CODIGO | CODIGO ISIN | EMISSAO | VENCIMENTO | NUM DE OPER | QUANT NEGOCIADA | VALOR NEGOCIADO |
|:--------:|:-----:|:------:|:-----------:|:-------:|:----------:|:-----------:|:---------------:|:---------------:|

| PU MIN | PU MED | PU MAX | PU LASTRO | VALOR PAR | TAXA MIN | TAXA MED | TAXA MAX | OPER COM CORRETAGEM | QUANT NEG COM CORRETAGEM |
|:------:|:------:|:------:|:---------:|:---------:|:--------:|:--------:|:--------:|:-------------------:|:------------------------:|

**Trusted:**

| DATA MOV | SIGLA | CODIGO ISIN | EMISSAO | VENCIMENTO | PU MIN | PU MED | PU MAX | PU LASTRO | VALOR PAR | TAXA MIN | TAXA MED | TAXA MAX |
|:--------:|:-----:|:-----------:|:-------:|:----------:|:------:|:------:|:------:|:---------:|:---------:|:--------:|:--------:|:--------:|
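The column selection above can be sketched with the standard `csv` module. The header and data row are copied from the 2007 sample printed earlier. The helper names (`parse_br_decimal`, `parse_rows`) are hypothetical, and the parser assumes numbers use a decimal comma with no thousands separator, which is what the sample shows.

```python
import csv
import io
from datetime import date, datetime
from decimal import Decimal
from typing import Dict, List, Optional

DATE_COLS = ("DATA MOV", "EMISSAO", "VENCIMENTO")
TEXT_COLS = ("SIGLA", "CODIGO ISIN")
NUM_COLS = ("PU MIN", "PU MED", "PU MAX", "PU LASTRO", "VALOR PAR",
            "TAXA MIN", "TAXA MED", "TAXA MAX")


def parse_br_decimal(text: Optional[str]) -> Optional[Decimal]:
    """'2963,02918000' -> Decimal('2963.02918000'); empty -> None."""
    text = (text or "").strip()
    return Decimal(text.replace(",", ".")) if text else None


def parse_rows(csv_text: str) -> List[Dict[str, object]]:
    reader = csv.DictReader(io.StringIO(csv_text), delimiter=";")
    rows = []
    for raw in reader:
        row: Dict[str, object] = {
            c: datetime.strptime(raw[c], "%d/%m/%Y").date() for c in DATE_COLS
        }
        row.update({c: raw[c].strip() for c in TEXT_COLS})
        # .get() tolerates columns that are absent in a given month's file
        row.update({c: parse_br_decimal(raw.get(c)) for c in NUM_COLS})
        rows.append(row)
    return rows


SAMPLE = (
    "DATA MOV;SIGLA;CODIGO;CODIGO ISIN;EMISSAO;VENCIMENTO;NUM DE OPER;"
    "QUANT NEGOCIADA;VALOR NEGOCIADO;PU MIN;PU MED;PU MAX;PU LASTRO;"
    "VALOR PAR;TAXA MIN;TAXA MED;TAXA MAX\n"
    "02/01/2007;LFT;210100;BRSTNCLF1741;04/01/2002;17/01/2007;28;4223;;"
    "2963,02918000;2963,02918000;2963,02918000;2962,89340793;2963,00978687;"
    "-0,0150;-0,0150;-0,0150\n"
)

rows = parse_rows(SAMPLE)
print(rows[0]["DATA MOV"], rows[0]["CODIGO ISIN"], rows[0]["PU MED"])
```

Because only the named columns are read, the extra columns that appear in more recent files are ignored without any special handling.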

At this point we can already separate fact and dimension and build the domain models.

```python models.py
@dataclass(frozen=True)
class DemabQuoteDaily:
    trade_date: date
    codigo_isin: str

    pu_min: Optional[float]
    pu_med: Optional[float]
    pu_max: Optional[float]
    pu_lastro: Optional[float]
    valor_par: Optional[float]

    taxa_min: Optional[float]
    taxa_med: Optional[float]
    taxa_max: Optional[float]

    ref_month: str
    raw_zip_file: str
    raw_zip_hash: str
    inner_file: str
    record_hash: str
    ingestion_ts_utc: str 


@dataclass(frozen=True)
class DemabInstrument:
    codigo_isin: str
    sigla: str
    emissao_date: date
    vencimento_date: date
    source: str = "BCB_DEMAB"  # default for now
```
where record_hash will be built from: isin | trade_date | pu_med | taxa_med


In [61]:
import pandas as pd
df = pd.read_parquet(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\02_trusted\bcb\demab\quotes_daily\2008-08.parquet")
df.info()

<class 'pandas.DataFrame'>
RangeIndex: 746 entries, 0 to 745
Data columns (total 15 columns):
 #   Column            Non-Null Count  Dtype  
---  ------            --------------  -----  
 0   trade_date        746 non-null    object 
 1   isin              746 non-null    str    
 2   pu_min            658 non-null    float64
 3   pu_med            658 non-null    float64
 4   pu_max            658 non-null    float64
 5   pu_lastro         746 non-null    float64
 6   valor_par         746 non-null    float64
 7   taxa_min          425 non-null    float64
 8   taxa_med          425 non-null    float64
 9   taxa_max          425 non-null    float64
 10  raw_zip_file      746 non-null    str    
 11  raw_zip_hash      746 non-null    str    
 12  inner_file        746 non-null    str    
 13  record_hash       746 non-null    str    
 14  ingestion_ts_utc  746 non-null    str    
dtypes: float64(8), object(1), str(6)
memory usage: 228.2+ KB


In [22]:
df[df["pu_med"].isna() & df["taxa_med"].isna()]["trade_date"].unique()

array([datetime.date(2008, 8, 1), datetime.date(2008, 8, 4),
       datetime.date(2008, 8, 5), datetime.date(2008, 8, 6),
       datetime.date(2008, 8, 7), datetime.date(2008, 8, 8),
       datetime.date(2008, 8, 11), datetime.date(2008, 8, 12),
       datetime.date(2008, 8, 13), datetime.date(2008, 8, 14),
       datetime.date(2008, 8, 15), datetime.date(2008, 8, 18),
       datetime.date(2008, 8, 19), datetime.date(2008, 8, 20),
       datetime.date(2008, 8, 21), datetime.date(2008, 8, 22),
       datetime.date(2008, 8, 25), datetime.date(2008, 8, 26),
       datetime.date(2008, 8, 27), datetime.date(2008, 8, 28),
       datetime.date(2008, 8, 29)], dtype=object)

---------------------------

## *B3 Price Report*
As with DEMAB, the B3 files are zipped, but here each file represents one day's extraction. The Price Report specifically contains the complete, detailed report for each trading day. The decision to use this Price Report to capture DI futures trades, rather than the simplified one, comes from the amount of historical data available, since only recently did B3 start splitting the equities and derivatives markets into two separate simplified reports.



In [23]:
from pathlib import Path
from collections import Counter
import zipfile
from datetime import datetime

zip_path = [Path(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\01_raw\b3\PriceReport\PR200102_20200102.zip"), 
            Path(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\01_raw\b3\PriceReport\PR220517_20220517.zip"),  
            Path(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\01_raw\b3\PriceReport\PR260204_20260204.zip"),
            ]

for zp in zip_path:
    with zipfile.ZipFile(zp, "r") as z:
        infos = z.infolist()
        print(f"ZIP: {zp.name}")
        exts = [Path(i.filename).suffix.lower() for i in z.infolist() if not i.is_dir()]
        print(Counter(exts))
        print(f"Arquivos: {len(infos)}")
        for i in infos[:50]:
            dt = datetime(*i.date_time)
            print(f"- {i.filename} | {i.file_size/1024:.1f} KB | {dt}")

ZIP: PR200102_20200102.zip
Counter({'.zip': 1})
Arquivos: 1
- PR200102.zip | 2417.4 KB | 2026-02-15 23:40:14
ZIP: PR220517_20220517.zip
Counter({'.zip': 1})
Arquivos: 1
- PR220517.zip | 4915.4 KB | 2026-02-16 00:00:10
ZIP: PR260204_20260204.zip
Counter({'.zip': 1})
Arquivos: 1
- PR260204.zip | 8040.9 KB | 2026-02-16 01:10:10


Interestingly, inside each archive there is another compressed file ( .zip ), which means a second zip opening is needed to investigate. Looking deeper:

In [24]:
import zipfile
from pathlib import Path
from collections import Counter
from datetime import datetime
from io import BytesIO

for zp in zip_path:
    with zipfile.ZipFile(zp, "r") as z:
        infos = z.infolist()
        print(f"\nZIP EXTERNO: {zp.name}")
        
        exts = [Path(i.filename).suffix.lower() 
                for i in infos if not i.is_dir()]
        print("Extensões:", Counter(exts))
        print(f"Arquivos: {len(infos)}")

        for i in infos:
            dt = datetime(*i.date_time)
            print(f"- {i.filename} | {i.file_size/1024:.1f} KB | {dt}")

            # If it is an inner ZIP, open it in memory
            if i.filename.lower().endswith(".zip"):
                print(f"\n  >>> Abrindo ZIP interno: {i.filename}")

                with z.open(i) as inner_file:
                    inner_bytes = BytesIO(inner_file.read())

                    with zipfile.ZipFile(inner_bytes, "r") as inner_zip:
                        inner_infos = inner_zip.infolist()
                        inner_exts = [
                            Path(j.filename).suffix.lower()
                            for j in inner_infos if not j.is_dir()
                        ]

                        print("  Extensões internas:", Counter(inner_exts))
                        print(f"  Arquivos internos: {len(inner_infos)}")

                        for j in inner_infos[:20]:
                            dt2 = datetime(*j.date_time)
                            print(f"   - {j.filename} | {j.file_size/1024:.1f} KB | {dt2}")



ZIP EXTERNO: PR200102_20200102.zip
Extensões: Counter({'.zip': 1})
Arquivos: 1
- PR200102.zip | 2417.4 KB | 2026-02-15 23:40:14

  >>> Abrindo ZIP interno: PR200102.zip
  Extensões internas: Counter({'.xml': 3})
  Arquivos internos: 3
   - BVBG.086.01_BV000328202001020328000001830098585.xml | 35566.1 KB | 2020-01-02 18:30:50
   - BVBG.086.01_BV000328202001020328000001900552975.xml | 35650.7 KB | 2020-01-02 19:01:34
   - BVBG.086.01_BV000328202001020328000001952035761.xml | 35658.0 KB | 2020-01-02 19:52:44

ZIP EXTERNO: PR220517_20220517.zip
Extensões: Counter({'.zip': 1})
Arquivos: 1
- PR220517.zip | 4915.4 KB | 2026-02-16 00:00:10

  >>> Abrindo ZIP interno: PR220517.zip
  Extensões internas: Counter({'.xml': 3})
  Arquivos internos: 3
   - BVBG.086.01_BV000328202205170328000001809111380.xml | 66746.0 KB | 2022-05-17 18:10:08
   - BVBG.086.01_BV000328202205170328000001858502601.xml | 66760.4 KB | 2022-05-17 18:59:38
   - BVBG.086.01_BV000328202205170328000001922141813.xml | 66760

Notice that each archive consistently holds a single .zip file, which in turn holds 3 .xml files. Given the number of files (I have been running this since 01/01/2020), it is wise to design a strategy that loads none of these files into memory (or as little as possible) and saves only what is necessary. Before that, we need to guarantee 2 things in this process: the intraday files share the same structure, and the information schema stays the same over time.

Since this infrastructure only supports the main project, I will assume that the most recent of the day's 3 files holds the official, already consolidated market information for the products. I will only create a traffic light system to compare them and raise an alert when the information is very discrepant, so that no version or bias issues need to be handled in data engineering.
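One way such a traffic light could look is sketched below. Everything in it is an assumption for illustration: the function names, the percentage thresholds, the generic instrument keys, and the idea that file names sort chronologically (which holds for the three samples listed above).

```python
from typing import Dict, List


def pick_official(xml_names: List[str]) -> str:
    """Assumption: names sort chronologically, so the last one is the latest."""
    return sorted(xml_names)[-1]


def traffic_light(
    official: Dict[str, float],
    earlier: Dict[str, float],
    yellow_pct: float = 0.5,  # hypothetical threshold
    red_pct: float = 2.0,     # hypothetical threshold
) -> Dict[str, str]:
    """Classify each instrument by its relative gap between two intraday files."""
    lights: Dict[str, str] = {}
    for key, ref in official.items():
        other = earlier.get(key)
        if other is None:
            lights[key] = "yellow"  # present only in the official file
            continue
        if ref == 0:
            gap_pct = 0.0 if other == 0 else float("inf")
        else:
            gap_pct = abs(other - ref) / abs(ref) * 100
        if gap_pct >= red_pct:
            lights[key] = "red"
        elif gap_pct >= yellow_pct:
            lights[key] = "yellow"
        else:
            lights[key] = "green"
    return lights


names = [
    "BVBG.086.01_BV000328202001020328000001900552975.xml",
    "BVBG.086.01_BV000328202001020328000001830098585.xml",
    "BVBG.086.01_BV000328202001020328000001952035761.xml",
]
latest = pick_official(names)

official = {"A": 100.0, "B": 50.0, "C": 10.0, "D": 7.0}  # made-up prices
earlier = {"A": 100.1, "B": 49.5, "C": 9.0}
lights = traffic_light(official, earlier)
print(latest, lights)
```

The official value always comes from the latest file; the lights only decide whether someone should look at the day before trusting it.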

Analyzing the first 5000 bytes of the file, we see that:

- **1°) The Root and the Schema ( XSD )**
    ```html
    <Document ... xsi:schemaLocation="urn:bvmf.052.01.xsd bvmf.052.01.xsd" xmlns="urn:bvmf.052.01.xsd">
    ```
    The document uses the namespace "urn:bvmf.052.01.xsd", and a schemaLocation attribute states which XSD defines this envelope.

- **2°) File Header ( Metadata )**
    ```html
    <BizFileHdr> ... <BizGrpIdr> ... <TtlNbOfMsg>15990</TtlNbOfMsg> ...
    ```
    Group/batch ID ( BizGrpId ); total number of messages ( TtlNbOfMsg ), roughly 15k; group type (BizGrpTp = BVBG.086.01); and creation date.

- **3°) Envelope + Content Structure ( ISO20022 )**
    ```html
    <AppHdr xmlns="urn:iso:std:iso:20022:tech:xsd:head.001.001.01">
    <Document xmlns="urn:bvmf.217.01.xsd">
      <PricRpt> ...
    ```

    With this we can already speed up a practical search that skips the envelope ( bvmf.052.01 ) and goes straight to the PricRpt information block of the payload ( bvmf.217.01 ).
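The "skip the envelope, go straight to PricRpt" idea can be sketched with `xml.etree.ElementTree.iterparse`, matching on the fully qualified payload tag. The sample document is a made-up miniature: `ExampleField` is a hypothetical child element, and the envelope is heavily simplified compared with the real file.

```python
import xml.etree.ElementTree as ET
from io import BytesIO
from typing import Dict, Iterator

PAYLOAD_NS = "urn:bvmf.217.01.xsd"
PRICRPT_TAG = f"{{{PAYLOAD_NS}}}PricRpt"  # '{urn:bvmf.217.01.xsd}PricRpt'


def iter_pricrpt(xml_file) -> Iterator[Dict[str, str]]:
    """Stream PricRpt blocks, yielding {child_tag: text} for direct children."""
    for _event, elem in ET.iterparse(xml_file, events=("end",)):
        if elem.tag == PRICRPT_TAG:
            yield {
                child.tag.split("}", 1)[-1]: (child.text or "").strip()
                for child in elem
            }
            elem.clear()  # drop the children already processed


SAMPLE = b"""<?xml version="1.0" encoding="utf-8"?>
<Document xmlns="urn:bvmf.052.01.xsd">
  <BizFileHdr><Xchg>
    <BizGrp>
      <Document xmlns="urn:bvmf.217.01.xsd">
        <PricRpt><ExampleField>A</ExampleField><TradDtls/></PricRpt>
      </Document>
    </BizGrp>
    <BizGrp>
      <Document xmlns="urn:bvmf.217.01.xsd">
        <PricRpt><ExampleField>B</ExampleField><TradDtls/></PricRpt>
      </Document>
    </BizGrp>
  </Xchg></BizFileHdr>
</Document>
"""

reports = list(iter_pricrpt(BytesIO(SAMPLE)))
print(reports)
```

Since `iterparse` accepts any file-like object, the same generator should work on the handle returned by `ZipFile.open`, which avoids decompressing a whole XML into memory at once.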

In [25]:
zip_externo = zip_path[0]
with zipfile.ZipFile(zip_externo, "r") as z:
    inner_name = [n for n in z.namelist() if n.lower().endswith(".zip")][0]
    inner_bytes = BytesIO(z.read(inner_name))

with zipfile.ZipFile(inner_bytes, "r") as zi:
    xml_name = [n for n in zi.namelist() if n.lower().endswith(".xml")][0]
    with zi.open(xml_name) as f:
        print(f.read(5000).decode("utf-8", errors="replace"))


<?xml version="1.0" encoding="utf-8"?>
<Document xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="urn:bvmf.052.01.xsd bvmf.052.01.xsd" xmlns="urn:bvmf.052.01.xsd">
  <BizFileHdr>
    <Xchg>
      <BizGrpDesc>
        <Fr>
          <OrgId>
            <Id>
              <OrgId>
                <Othr>
                  <Id>BVMF</Id>
                  <Issr>40</Issr>
                  <SchmeNm>
                    <Prtry>39</Prtry>
                  </SchmeNm>
                </Othr>
              </OrgId>
            </Id>
          </OrgId>
        </Fr>
        <To>
          <OrgId>
            <Id>
              <OrgId>
                <Othr>
                  <Id>PUBLIC</Id>
                  <Issr>40</Issr>
                  <SchmeNm>
                    <Prtry>39</Prtry>
                  </SchmeNm>
                </Othr>
              </OrgId>
            </Id>
          </OrgId>
        </To>
        <BizGrp

Continuing with the plan, here we investigate structural changes in the files both intraday (the 3 XMLs in the ZIP) and across days (between the 3 chosen dates). I wrote it as functions so that I am free to choose more dates to analyze, the depth of analysis, and the number of PricRpt blocks scanned in each file. Since the smallest file has ~15k observations, without enough depth I could end up claiming that key fields ceased to exist when in fact my search simply did not reach the product, because this report covers everything from equities to derivatives and they do not share the same properties.

In [26]:
import zipfile
from io import BytesIO
import xml.etree.ElementTree as ET
from collections import Counter
from pathlib import Path

# -----------------------------
# XML / fingerprint helpers
# -----------------------------

def strip_ns(tag: str) -> str:
    return tag.split("}", 1)[-1] if "}" in tag else tag

def fingerprint_price_report(xml_file, max_pricrpt=300, max_depth=5):
    """
    Structural fingerprint of the XML (no values), focused on PricRpt:
      - namespaces seen
      - number of PricRpt blocks scanned (up to max_pricrpt)
      - set of relative paths inside PricRpt (up to max_depth)
      - presence/absence of TradDtls (missing/present/empty)
    """
    ns_seen = set()
    pricrpt_count = 0
    paths = set()
    block_presence = Counter()

    ctx = ET.iterparse(xml_file, events=("start", "end"))

    stack = []
    in_pricrpt = False
    pricrpt_depth0 = None
    current_pricrpt_has_traddtls = False

    for event, elem in ctx:
        tag_full = elem.tag
        tag = strip_ns(tag_full)

        # record namespaces (tags come as {ns}Tag)
        if "}" in tag_full:
            ns_seen.add(tag_full.split("}", 1)[0][1:])

        if event == "start":
            stack.append(tag)

            if tag == "PricRpt":
                in_pricrpt = True
                pricrpt_depth0 = len(stack)
                current_pricrpt_has_traddtls = False

        else:  # end
            if in_pricrpt:
                rel_stack = stack[pricrpt_depth0:]  # starts inside PricRpt
                if 1 <= len(rel_stack) <= max_depth:
                    paths.add("/".join(rel_stack))

                if tag == "TradDtls":
                    # TradDtls may exist but be empty
                    if len(list(elem)) == 0 and not ((elem.text or "").strip()):
                        block_presence["TradDtls_empty"] += 1
                    else:
                        block_presence["TradDtls_present"] += 1
                    current_pricrpt_has_traddtls = True

                if tag == "PricRpt":
                    pricrpt_count += 1
                    if not current_pricrpt_has_traddtls:
                        block_presence["TradDtls_missing"] += 1

                    # free memory
                    elem.clear()
                    in_pricrpt = False
                    pricrpt_depth0 = None

                    if pricrpt_count >= max_pricrpt:
                        break

            stack.pop()

    return {
        "namespaces": ns_seen,
        "paths": paths,
        "block_presence": dict(block_presence),
        "pricrpt_count_scanned": pricrpt_count,
    }

def diff_sets(a: set, b: set):
    return {
        "only_in_a": sorted(a - b),
        "only_in_b": sorted(b - a),
        "common": len(a & b),
        "size_a": len(a),
        "size_b": len(b),
    }

def compare_fps(fp_items):
    """
    fp_items = [(label, fp_dict), ...]
    Compares every pair and returns a dict of diffs.
    """
    out = {}
    for i in range(len(fp_items)):
        for j in range(i + 1, len(fp_items)):
            li, fpi = fp_items[i]
            lj, fpj = fp_items[j]
            out[f"{li} vs {lj}"] = {
                "namespaces": diff_sets(set(fpi["namespaces"]), set(fpj["namespaces"])),
                "paths": diff_sets(set(fpi["paths"]), set(fpj["paths"])),
                "block_presence_left": fpi["block_presence"],
                "block_presence_right": fpj["block_presence"],
                "pricrpt_scanned_left": fpi["pricrpt_count_scanned"],
                "pricrpt_scanned_right": fpj["pricrpt_count_scanned"],
            }
    return out

def merge_day_fps(fp_items_for_day):
    """
    Merges the fingerprints of a day's 3 XMLs:
      namespaces = union
      paths = union
      block_presence = sum
    """
    merged_ns = set()
    merged_paths = set()
    merged_blocks = Counter()
    scanned_total = 0

    for label, fp in fp_items_for_day:
        merged_ns |= set(fp["namespaces"])
        merged_paths |= set(fp["paths"])
        merged_blocks.update(fp["block_presence"])
        scanned_total += fp["pricrpt_count_scanned"]

    return {
        "namespaces": merged_ns,
        "paths": merged_paths,
        "block_presence": dict(merged_blocks),
        "pricrpt_count_scanned_sum": scanned_total,
        "files": [label for label, _ in fp_items_for_day],
    }

# ------------------------------------------
# Helpers for the ZIP opening pattern
# ------------------------------------------

def get_inner_zip_bytes(zip_externo_path: Path) -> BytesIO:
    """
    Mirrors the pattern used earlier: opens the outer ZIP and takes the first inner ZIP.
    """
    with zipfile.ZipFile(zip_externo_path, "r") as z:
        inner_name = [n for n in z.namelist() if n.lower().endswith(".zip")][0]
        return BytesIO(z.read(inner_name))

def list_xml_names_from_inner(inner_bytes: BytesIO):
    inner_bytes.seek(0)
    with zipfile.ZipFile(inner_bytes, "r") as zi:
        return sorted([n for n in zi.namelist() if n.lower().endswith(".xml")])

def fingerprint_all_xmls_in_day(zip_externo_path: Path, max_pricrpt=300, max_depth=5):
    """
    STEP 1 (intraday): fingerprints the 3 XMLs inside the inner zip.
    Returns:
      - list [(xml_name, fp), ...]
      - pairwise comparison between them
      - aggregated fingerprint for the day
    """
    inner_bytes = get_inner_zip_bytes(zip_externo_path)

    xml_names = list_xml_names_from_inner(inner_bytes)
    if len(xml_names) == 0:
        raise RuntimeError(f"Nenhum XML encontrado em {zip_externo_path.name}")

    fp_items = []
    inner_bytes.seek(0)
    with zipfile.ZipFile(inner_bytes, "r") as zi:
        for xml_name in xml_names:
            with zi.open(xml_name) as f:
                fp = fingerprint_price_report(f, max_pricrpt=max_pricrpt, max_depth=max_depth)
            fp_items.append((xml_name, fp))

    intra_report = compare_fps(fp_items)
    day_fp = merge_day_fps(fp_items)
    return fp_items, intra_report, day_fp

# -----------------------------
# Runner: 2 comparison steps
# -----------------------------

def run_two_step_comparison(zip_paths_for_dates, max_pricrpt=300, max_depth=5):
    """
    zip_paths_for_dates: list of Path, e.g.:
      [zip_20200102, zip_20220517, zip_20260204]
    Does:
      1) intraday: compares the 3 XMLs within each date
      2) across dates: compares each day's aggregated fingerprint
    """
    day_fps = {}  # date_label -> day_fp

    # STEP 1: intraday
    for zp in zip_paths_for_dates:
        print("\n" + "="*80)
        print(f"DATA (ZIP EXTERNO): {zp.name}")

        fp_items, intra_report, day_fp = fingerprint_all_xmls_in_day(
            zp, max_pricrpt=max_pricrpt, max_depth=max_depth
        )
        day_fps[zp.name] = day_fp

        print(f"XMLs encontrados: {len(fp_items)} -> {[name for name, _ in fp_items]}")
        print(f"Namespaces (união): {len(day_fp['namespaces'])}")
        print(f"Paths (união): {len(day_fp['paths'])}")
        print(f"TradDtls stats: {day_fp['block_presence']}")

        print("\n--- Diferenças INTRA-DIA (pares) ---")
        for pair, rep in intra_report.items():
            only_a = len(rep["paths"]["only_in_a"])
            only_b = len(rep["paths"]["only_in_b"])
            print(f"{pair}: paths only_in_left={only_a}, only_in_right={only_b} | common={rep['paths']['common']}")

    # STEP 2: across dates
    print("\n" + "="*80)
    print("COMPARAÇÃO ENTRE DATAS (fingerprint agregado por dia)")
    names = list(day_fps.keys())
    for i in range(len(names)):
        for j in range(i+1, len(names)):
            di = names[i]
            dj = names[j]
            fpi = day_fps[di]
            fpj = day_fps[dj]
            d_paths = diff_sets(set(fpi["paths"]), set(fpj["paths"]))
            d_ns = diff_sets(set(fpi["namespaces"]), set(fpj["namespaces"]))

            print("\n" + "-"*80)
            print(f"{di}  VS  {dj}")
            print(f"Namespaces: only_in_{di}={len(d_ns['only_in_a'])}, only_in_{dj}={len(d_ns['only_in_b'])}, common={d_ns['common']}")
            print(f"Paths: only_in_{di}={len(d_paths['only_in_a'])}, only_in_{dj}={len(d_paths['only_in_b'])}, common={d_paths['common']}")

            # Show exactly what changed (careful: the output can be large)
            print("Exemplos only_in_a:", d_paths["only_in_a"][:30])
            print("Exemplos only_in_b:", d_paths["only_in_b"][:30])

    return day_fps

day_fps = run_two_step_comparison(zip_path, max_pricrpt=5000, max_depth=1000)



DATA (ZIP EXTERNO): PR200102_20200102.zip
XMLs encontrados: 3 -> ['BVBG.086.01_BV000328202001020328000001830098585.xml', 'BVBG.086.01_BV000328202001020328000001900552975.xml', 'BVBG.086.01_BV000328202001020328000001952035761.xml']
Namespaces (união): 3
Paths (união): 43
TradDtls stats: {'TradDtls_empty': 11473, 'TradDtls_present': 3527}

--- Diferenças INTRA-DIA (pares) ---
BVBG.086.01_BV000328202001020328000001830098585.xml vs BVBG.086.01_BV000328202001020328000001900552975.xml: paths only_in_left=0, only_in_right=0 | common=43
BVBG.086.01_BV000328202001020328000001830098585.xml vs BVBG.086.01_BV000328202001020328000001952035761.xml: paths only_in_left=0, only_in_right=0 | common=43
BVBG.086.01_BV000328202001020328000001900552975.xml vs BVBG.086.01_BV000328202001020328000001952035761.xml: paths only_in_left=0, only_in_right=0 | common=43

DATA (ZIP EXTERNO): PR220517_20220517.zip
XMLs encontrados: 3 -> ['BVBG.086.01_BV000328202205170328000001809111380.xml', 'BVBG.086.01_BV00032820

As we can see, analyzing 5k observations from each file (recall that if the 2020 XML has ~15k observations, this covers about 30% of them) is already enough to draw some important conclusions:

- The intraday files share the same fields in every observation: all `only_in_left` and `only_in_right` counts are 0, which shows that, pair by pair, no file has more descriptive fields than another on the same day;
- At some point in time 4 fields were added, since the number of common fields is 43 on the first date and 47 on the other two;
- The fields that started being reported later are:

    ```python
     ['FinInstrmAttrbts/IntlNonRglrVol', 'FinInstrmAttrbts/NonRglrTraddCtrcts', 'FinInstrmAttrbts/NonRglrTxsQty', 'FinInstrmAttrbts/NtlNonRglrVol']
    ```
    which are statistics comparing trading in the regulated and non-regulated markets, and therefore do not affect the project.
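
The jump from 43 to 47 fields can be pinned down with a plain set difference between the path sets of two dates. The sketch below is a minimal illustration under assumptions: `added_paths` is a hypothetical helper (not one of the notebook's functions), and the baseline set is a toy stand-in rather than the real 43 paths.

```python
def added_paths(baseline: set[str], later: set[str]) -> list[str]:
    """Return the paths present in `later` but absent from `baseline`, sorted."""
    return sorted(later - baseline)


# Toy stand-in for the 2020 fingerprint (the real one has 43 paths).
paths_2020 = {"TradDt/Dt", "SctyId/TckrSymb", "FinInstrmAttrbts/OpnIntrst"}

# The later dates carry the same paths plus the four fields listed above.
paths_2022 = paths_2020 | {
    "FinInstrmAttrbts/IntlNonRglrVol",
    "FinInstrmAttrbts/NonRglrTraddCtrcts",
    "FinInstrmAttrbts/NonRglrTxsQty",
    "FinInstrmAttrbts/NtlNonRglrVol",
}

new_fields = added_paths(paths_2020, paths_2022)
print(len(new_fields), new_fields)
```

Swapping the arguments gives the fields that were dropped instead of added, which is the other half of a schema-drift check.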

With a quick search we can also check whether each of the XML files $(3 \times 3 = 9)$ yields the same DI1 tickers.

In [27]:
from collections import Counter

def strip_ns(tag: str) -> str:
    return tag.split("}", 1)[-1] if "}" in tag else tag

def iter_di1_tickers(xml_file):
    """
    Returns a generator of tickers that start with DI1.
    Does not keep the whole XML in memory; elements are cleared as it goes.
    """
    ctx = ET.iterparse(xml_file, events=("end",))
    for event, elem in ctx:
        if strip_ns(elem.tag) != "PricRpt":
            continue

        # find the TckrSymb inside this PricRpt
        tck = None
        for node in elem.iter():
            if strip_ns(node.tag) == "TckrSymb":
                tck = (node.text or "").strip()
                break

        if tck and tck.startswith("DI1"):
            yield tck

        elem.clear()
        
for zp in zip_path:
    inner_bytes = get_inner_zip_bytes(zp)
    xml_names = list_xml_names_from_inner(inner_bytes)
    for xml_name in xml_names:
        with zipfile.ZipFile(inner_bytes, "r") as zi:
            with zi.open(xml_name) as f:
                ticks = list(iter_di1_tickers(f))

        print("qtde DI1:", len(ticks))
        print("exemplos:", ticks[:10])
        print("distintos:", len(set(ticks)))


qtde DI1: 37
exemplos: ['DI1G20', 'DI1U20', 'DI1N23', 'DI1J20', 'DI1F25', 'DI1X20', 'DI1V24', 'DI1J24', 'DI1F21', 'DI1F31']
distintos: 37
qtde DI1: 37
exemplos: ['DI1G20', 'DI1U20', 'DI1N23', 'DI1J20', 'DI1F25', 'DI1X20', 'DI1V24', 'DI1J24', 'DI1F21', 'DI1F31']
distintos: 37
qtde DI1: 37
exemplos: ['DI1G20', 'DI1U20', 'DI1N23', 'DI1J20', 'DI1F25', 'DI1X20', 'DI1V24', 'DI1J24', 'DI1F21', 'DI1F31']
distintos: 37
qtde DI1: 39
exemplos: ['DI1Q22', 'DI1N23', 'DI1M22', 'DI1F25', 'DI1F35', 'DI1V24', 'DI1J24', 'DI1F31', 'DI1N26', 'DI1F29']
distintos: 39
qtde DI1: 39
exemplos: ['DI1Q22', 'DI1N23', 'DI1M22', 'DI1F25', 'DI1F35', 'DI1V24', 'DI1J24', 'DI1F31', 'DI1N26', 'DI1F29']
distintos: 39
qtde DI1: 39
exemplos: ['DI1Q22', 'DI1N23', 'DI1M22', 'DI1F25', 'DI1F35', 'DI1V24', 'DI1J24', 'DI1F31', 'DI1N26', 'DI1F29']
distintos: 39
qtde DI1: 42
exemplos: ['DI1N26', 'DI1N29', 'DI1F34', 'DI1V27', 'DI1N31', 'DI1N28', 'DI1F27', 'DI1F37', 'DI1V26', 'DI1N30']
distintos: 42
qtde DI1: 42
exemplos: ['DI1N26', 

Within each date, the three files therefore share the same number of unique observations and even list them in the same order (the first 10 tickers printed are identical). This supports the decision to always use the last file posted for a date instead of building a whole process to control and merge the 3 files. From here we can start identifying which fields of interest will be saved in the final trusted file.
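
Comparing only the ten printed examples is a spot check. A stricter version of the same test compares the full ticker lists of the snapshots for one date. The sketch below assumes the lists have already been collected (for example with `iter_di1_tickers`); `same_ticker_sequence` is a hypothetical helper and the sample tickers are illustrative.

```python
def same_ticker_sequence(ticker_lists: list[list[str]]) -> bool:
    """True when every snapshot lists exactly the same tickers in the same order."""
    if not ticker_lists:
        return True
    first = ticker_lists[0]
    return all(other == first for other in ticker_lists[1:])


# Illustrative data: three snapshots of one date.
snapshots = [
    ["DI1G20", "DI1U20", "DI1N23"],
    ["DI1G20", "DI1U20", "DI1N23"],
    ["DI1G20", "DI1U20", "DI1N23"],
]
print(same_ticker_sequence(snapshots))
```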

In [60]:
import xml.etree.ElementTree as ET

def strip_ns(tag: str) -> str:
    return tag.split("}", 1)[-1] if "}" in tag else tag

def _find_first_text(elem, tag_name: str) -> str | None:
    """Finds the first node with tag=tag_name inside elem and returns its text."""
    for n in elem.iter():
        if strip_ns(n.tag) == tag_name:
            txt = (n.text or "").strip()
            return txt or None
    return None

def _flatten_paths(elem) -> dict[str, str]:
    """
    Builds a dict { "A/B/C": "value" } for every leaf that has text.
    (No prior knowledge of the field names is needed.)
    """
    out = {}

    def walk(node, prefix=""):
        tag = strip_ns(node.tag)
        path = f"{prefix}/{tag}" if prefix else tag

        # store the text when it is non-empty
        txt = (node.text or "").strip()
        if txt:
            out[path] = txt

        # keep descending into the children
        for ch in list(node):
            walk(ch, path)

    walk(elem, "")
    return out

def get_one_di1_full_pricrpt(xml_file) -> dict[str, str] | None:
    """
    Finds the first <PricRpt> whose <TckrSymb> starts with 'DI1'
    and returns ALL fields inside that PricRpt as {path: value}.
    """
    ctx = ET.iterparse(xml_file, events=("end",))
    for _event, elem in ctx:
        if strip_ns(elem.tag) != "PricRpt":
            continue

        tck = _find_first_text(elem, "TckrSymb")
        if tck and tck.startswith("DI1"):
            data = _flatten_paths(elem)
            elem.clear()
            return data

        elem.clear()

    return None


zip_externo = zip_path[2]
with zipfile.ZipFile(zip_externo, "r") as z:
    inner_name = [n for n in z.namelist() if n.lower().endswith(".zip")][0]
    inner_bytes = BytesIO(z.read(inner_name))

with zipfile.ZipFile(inner_bytes, "r") as zi:
    xml_name = [n for n in zi.namelist() if n.lower().endswith(".xml")][0]

    with zi.open(xml_name) as f:
        rec = get_one_di1_full_pricrpt(f)

print(len(rec), "campos")
df = pd.DataFrame.from_dict(rec, orient='index').reset_index()
df

33 campos


| | index | 0 |
|:-:|:------|:--|
| 0 | PricRpt/TradDt/Dt | 2026-02-04 |
| 1 | PricRpt/SctyId/TckrSymb | DI1N26 |
| 2 | PricRpt/FinInstrmId/OthrId/Id | 100000103726 |
| 3 | PricRpt/FinInstrmId/OthrId/Tp/Prtry | 8 |
| 4 | PricRpt/FinInstrmId/PlcOfListg/MktIdrCd | BVMF |
| 5 | PricRpt/TradDtls/TradQty | 3484 |
| 6 | PricRpt/FinInstrmAttrbts/MktDataStrmId | E |
| 7 | PricRpt/FinInstrmAttrbts/NtlFinVol | 29446304851.57 |
| 8 | PricRpt/FinInstrmAttrbts/IntlFinVol | 5623924225.36 |
| 9 | PricRpt/FinInstrmAttrbts/OpnIntrst | 4171817 |

Now that we know each node of the tree has a fixed field structure for DI futures contracts, we can start organizing a strategy for which information will be stored.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional

@dataclass(frozen=True)
class DI1QuotesDaily:
    # Primary key
    TradDt : datetime
    TckrSymb : str
    snapshot_ts_utc: datetime

    AdjstdQtTax : float  # rate
    AdjstdQt : float     # PU (unit price)

    # Kept for future projects
    BestBidPric: Optional[float]
    BestAskPric: Optional[float]
    LastPric: Optional[float]
    TradAvrgPric: Optional[float]
    MinPric: Optional[float]
    MaxPric: Optional[float]

    TradQty: Optional[int]
    FinInstrmQty: Optional[int]
    OpnIntrst: Optional[int]

    # Audit & governance
    lineage_id : str # FK
    ingestion_ts_utc: datetime

@dataclass(frozen=True)
class InstrumentMaster:
    # PK
    TckrSymb : str

    asset: str
    contract_month_code: str
    contract_year: int
    maturity_date: datetime

@dataclass(frozen=True)
class DataLineage:
    # PK
    lineage_id : str # outer_zip|inner_zip|xml_name|snapshot_ts_utc|hash_file

    outer_zip: str
    inner_zip: str
    xml_name: str
    snapshot_ts_utc : str
    hash_file: str
    ingestion_ts_utc : datetime
```
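
Most of the `InstrumentMaster` attributes can be derived from the ticker itself. The sketch below is one possible way to do it, under assumptions not stated in the source: the ticker is taken to be the asset code `DI1`, one futures month letter, and a 2-digit year that is mapped to 20YY. `parse_di1_ticker` is a hypothetical helper, and `maturity_date` is left out because it depends on a business-day calendar.

```python
import re

# Standard futures month codes (F=January ... Z=December).
MONTH_CODES = {
    "F": 1, "G": 2, "H": 3, "J": 4, "K": 5, "M": 6,
    "N": 7, "Q": 8, "U": 9, "V": 10, "X": 11, "Z": 12,
}

_TICKER_RE = re.compile(r"^(DI1)([FGHJKMNQUVXZ])(\d{2})$")


def parse_di1_ticker(ticker: str) -> dict:
    """Split a ticker such as 'DI1N26' into asset, month code, month and year."""
    m = _TICKER_RE.match(ticker)
    if m is None:
        raise ValueError(f"not a DI1 futures ticker: {ticker!r}")
    asset, month_code, yy = m.groups()
    return {
        "TckrSymb": ticker,
        "asset": asset,
        "contract_month_code": month_code,
        "contract_month": MONTH_CODES[month_code],
        # Assumption: two-digit years always mean 20YY.
        "contract_year": 2000 + int(yy),
    }


print(parse_di1_ticker("DI1N26"))
```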

In [94]:
from ml_ettj26.domain.b3_PriceReport.zip_reader import NestedZipReader
from ml_ettj26.domain.b3_PriceReport.header_probe import parse_snapshot_ts_from_head

zip_externo = zip_path[2]
reader = NestedZipReader(zip_externo)
latest = None
with reader.open_inner_zip() as zi:
    xmls = [n for n in zi.namelist() if n.lower().endswith(".xml")]

    for xml_name in xmls:
        with zi.open(xml_name) as f:
            head = f.read(16384)
            ts = parse_snapshot_ts_from_head(head)

            if ts and (latest is None or ts > latest):
                latest = ts
        print("New Latest snapshot:", latest)


New Latest snapshot: 2026-02-04 21:49:06+00:00
New Latest snapshot: 2026-02-04 22:13:52+00:00
New Latest snapshot: 2026-02-04 22:45:23+00:00
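
The loop above relies on `parse_snapshot_ts_from_head`, whose implementation is not shown here. As a rough illustration of the idea only, the sketch below scans the first bytes of a file for a `<CreDt>` element (a tag that shows up in the `AppHdr` blocks of these files) and parses it as a UTC timestamp. The function name `probe_cre_dt` and the choice of `CreDt` as the snapshot timestamp are assumptions; the project's actual function may work differently.

```python
import re
from datetime import datetime, timezone
from typing import Optional

# Matches e.g. <CreDt>2021-01-04T23:30:26Z</CreDt> in raw bytes.
_CRE_DT_RE = re.compile(rb"<CreDt>(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2})Z</CreDt>")


def probe_cre_dt(head: bytes) -> Optional[datetime]:
    """Return the first <CreDt> timestamp found in `head` as an aware UTC datetime."""
    m = _CRE_DT_RE.search(head)
    if m is None:
        return None
    naive = datetime.strptime(m.group(1).decode("ascii"), "%Y-%m-%dT%H:%M:%S")
    return naive.replace(tzinfo=timezone.utc)


sample = b"<AppHdr><CreDt>2021-01-04T23:30:26Z</CreDt></AppHdr>"
print(probe_cre_dt(sample))
```

Reading only a fixed-size head keeps the probe cheap, at the cost of returning `None` when the element sits beyond the bytes that were read.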


In [95]:
from ml_ettj26.domain.b3_PriceReport.parsing import pick_latest_xml
zip_externo = zip_path[2]
reader = NestedZipReader(zip_externo)

with reader.open_inner_zip() as zi:
    xml_names = [n for n in zi.namelist() if n.lower().endswith(".xml")]
    pick = pick_latest_xml(zi, xml_names, head_bytes=64_000)

print(pick.xml_name, pick.snapshot_dt, pick.method)


BVBG.086.01_BV000328202602040328000001945385434.xml 2026-02-04 22:45:23+00:00 header_ts


In [None]:
from ml_ettj26.domain.b3_PriceReport.zip_reader import NestedZipReader
from ml_ettj26.domain.b3_PriceReport.parsing import iter_di1_quotes
from ml_ettj26.domain.b3_PriceReport.parsing import pick_latest_xml
from datetime import datetime, timezone
import xml.etree.ElementTree as ET

outer_zip =zip_path[2]
reader = NestedZipReader(outer_zip)

with reader.open_inner_zip() as zi:
    xml_names = [n for n in zi.namelist() if n.endswith(".xml")]
    pick = pick_latest_xml(zi, xml_names)

    xml_name = pick.xml_name

    snapshot_ts = pick.snapshot_dt or datetime.now(timezone.utc)
    ingestion_ts = datetime.now(timezone.utc)
    lineage_id = "TEST_LINEAGE"

    # Testing iter_di1_quotes (recommended)
    with zi.open(xml_name) as f:
        for q in iter_di1_quotes(
            f,
            snapshot_ts_utc=snapshot_ts,
            lineage_id=lineage_id,
            ingestion_ts_utc=ingestion_ts,
        ):
            print(q)
            break  # first record only

DI1QuotesDaily(TradDt=datetime.datetime(2026, 2, 4, 0, 0, tzinfo=datetime.timezone.utc), TckrSymb='DI1N26', snapshot_ts_utc=datetime.datetime(2026, 2, 4, 22, 45, 23, tzinfo=datetime.timezone.utc), AdjstdQtTax=14.336, AdjstdQt=94872.95, BestBidPric=14.33, BestAskPric=14.335, LastPric=14.335, TradAvrgPric=14.34, MinPric=14.33, MaxPric=14.36, TradQty=3484, FinInstrmQty=310381, OpnIntrst=4171817, lineage_id='TEST_LINEAGE', ingestion_ts_utc=datetime.datetime(2026, 2, 22, 23, 55, 56, 872928, tzinfo=datetime.timezone.utc))
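
The run above uses the placeholder `TEST_LINEAGE`. Following the comment on `DataLineage.lineage_id` (`outer_zip|inner_zip|xml_name|snapshot_ts_utc|hash_file`), a real id could be assembled as sketched below. This is an illustration under assumptions: `build_lineage_id` is a hypothetical helper, SHA-256 is an assumed choice for `hash_file`, and the file names and bytes are made up.

```python
import hashlib


def build_lineage_id(outer_zip: str, inner_zip: str, xml_name: str,
                     snapshot_ts_utc: str, xml_bytes: bytes) -> str:
    """Join the lineage components with '|' and append a content hash."""
    # Assumption: hash_file is the SHA-256 hex digest of the XML content.
    hash_file = hashlib.sha256(xml_bytes).hexdigest()
    return "|".join([outer_zip, inner_zip, xml_name, snapshot_ts_utc, hash_file])


lineage_id = build_lineage_id(
    outer_zip="outer.zip",  # made-up names for illustration
    inner_zip="inner.zip",
    xml_name="report.xml",
    snapshot_ts_utc="2026-02-04T22:45:23+00:00",
    xml_bytes=b"<Document/>",
)
print(lineage_id)
```

Because the hash is computed from the file content, reprocessing the same file yields the same id, which fits the reprocessability goal of the Trusted layer.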


In [112]:
import pandas as pd
from pathlib import Path

# Target folder
path = Path(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\02_trusted\b3")
path.mkdir(parents=True, exist_ok=True)

# Create an empty DataFrame with the correct schema
df = pd.DataFrame({
    "TckrSymb": pd.Series(dtype="string"),
    "asset": pd.Series(dtype="string"),
    "contract_month_code": pd.Series(dtype="string"),
    "contract_year": pd.Series(dtype="int64"),
    "maturity_date": pd.Series(dtype="datetime64[ns]"),
})

# Save the empty parquet file (seed)
df.to_parquet(path / "di1_instrument_master.parquet", index=False)

print("Seed criado com sucesso.")


Seed criado com sucesso.


In [139]:
df = pd.read_parquet(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\02_trusted\b3\di1_instrument_master.parquet")
print(df.info())

<class 'pandas.DataFrame'>
RangeIndex: 44 entries, 0 to 43
Data columns (total 5 columns):
 #   Column               Non-Null Count  Dtype              
---  ------               --------------  -----              
 0   TckrSymb             44 non-null     str                
 1   asset                44 non-null     str                
 2   contract_month_code  44 non-null     str                
 3   contract_year        44 non-null     int64              
 4   maturity_date        44 non-null     datetime64[us, UTC]
dtypes: datetime64[us, UTC](1), int64(1), str(3)
memory usage: 2.3 KB
None


In [138]:
df = pd.read_parquet(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\02_trusted\b3\di1_lineage\2026-01.parquet")
print(df.info())

<class 'pandas.DataFrame'>
RangeIndex: 21 entries, 0 to 20
Data columns (total 7 columns):
 #   Column            Non-Null Count  Dtype              
---  ------            --------------  -----              
 0   lineage_id        21 non-null     str                
 1   outer_zip         21 non-null     str                
 2   inner_zip         21 non-null     str                
 3   xml_name          21 non-null     str                
 4   snapshot_ts_utc   21 non-null     str                
 5   hash_file         21 non-null     str                
 6   ingestion_ts_utc  21 non-null     datetime64[us, UTC]
dtypes: datetime64[us, UTC](1), str(6)
memory usage: 8.8 KB
None


In [137]:
df = pd.read_parquet(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\02_trusted\b3\di1_quotes_daily\2026-01.parquet")
print(df.info())

<class 'pandas.DataFrame'>
RangeIndex: 880 entries, 0 to 879
Data columns (total 16 columns):
 #   Column            Non-Null Count  Dtype              
---  ------            --------------  -----              
 0   TradDt            880 non-null    datetime64[us, UTC]
 1   TckrSymb          880 non-null    str                
 2   snapshot_ts_utc   880 non-null    datetime64[us, UTC]
 3   AdjstdQtTax       880 non-null    float64            
 4   AdjstdQt          880 non-null    float64            
 5   BestBidPric       705 non-null    float64            
 6   BestAskPric       702 non-null    float64            
 7   LastPric          820 non-null    float64            
 8   TradAvrgPric      820 non-null    float64            
 9   MinPric           820 non-null    float64            
 10  MaxPric           820 non-null    float64            
 11  TradQty           820 non-null    float64            
 12  FinInstrmQty      820 non-null    float64            
 13  OpnIntrst       

In [4]:
from pathlib import Path
import subprocess
import sys

PROJECT_ROOT = Path(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26")
PIPELINE = "trusted_b3_di1"

def run_month(ym: str):
    params = f"b3_di1_range.start_month={ym},b3_di1_range.end_month={ym}"
    cmd = [sys.executable, "-m", "kedro", "run", "--pipeline", PIPELINE, "--params", params]

    r = subprocess.run(
        cmd,
        cwd=str(PROJECT_ROOT),
        capture_output=True,
        text=True,
        encoding="utf-8",     # <- force UTF-8
        errors="replace",     # <- replace undecodable bytes with U+FFFD instead of raising
    )

    if r.returncode != 0:
        print("STDOUT:\n", r.stdout)
        print("STDERR:\n", r.stderr)
        raise RuntimeError(f"Kedro falhou para {ym} (code {r.returncode})")

    print(f"[OK] {ym}")

gatilho = True # <- careful: runs the pipeline for every month in the ranges below!
if gatilho:
    for year in range(2021, 2022): # <- year range (start inclusive, end exclusive)
        for month in range(1, 2): # <- month range (start inclusive, end exclusive)
            ym = f"{year}-{month:02d}"
            run_month(ym)


[OK] 2021-01


In [2]:
#@'
import os, re, zipfile, io, xml.etree.ElementTree as ET
from pathlib import Path

root = Path(r"c:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26")
folder = root / "data" / "01_raw" / "b3" / "PriceReport"
pat = re.compile(r"_(\d{8})\.zip$", re.I)

bad = []
for p in sorted(folder.glob("PR*_202101*.zip")):
    outer = p.name
    try:
        with zipfile.ZipFile(p, 'r') as zo:
            inner_names = [n for n in zo.namelist() if n.lower().endswith('.zip')]
            if not inner_names:
                continue
            with zo.open(inner_names[0]) as inner_bytes:
                with zipfile.ZipFile(io.BytesIO(inner_bytes.read()), 'r') as zi:
                    xmls = [n for n in zi.namelist() if n.lower().endswith('.xml')]
                    if not xmls:
                        continue
                    xml_name = sorted(xmls)[-1]
                    with zi.open(xml_name) as xf:
                        try:
                            for _ in ET.iterparse(xf, events=("end",)):
                                pass
                        except ET.ParseError as e:
                            bad.append((outer, inner_names[0], xml_name, str(e)))
    except Exception as e:
        bad.append((outer, '<outer>', '<outer>', f'{type(e).__name__}: {e}'))

if not bad:
    print('NO_PARSE_ERRORS')
else:
    for item in bad:
        print('BAD', *item, sep=' | ')
#'@ | uv run python -

BAD | PR210104_20210104.zip | PR210104.zip | BVBG.086.01_BV000328202101040328000002030310476.xml | mismatched tag: line 465115, column 14


In [1]:
#@'
import zipfile, io
from pathlib import Path

p = Path(r"c:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\01_raw\b3\PriceReport\PR210104_20210104.zip")
with zipfile.ZipFile(p, 'r') as zo:
    iname = [n for n in zo.namelist() if n.lower().endswith('.zip')][0]
    with zo.open(iname) as ib:
        with zipfile.ZipFile(io.BytesIO(ib.read()), 'r') as zi:
            xname = 'BVBG.086.01_BV000328202101040328000002030310476.xml'
            with zi.open(xname) as xf:
                target = 465115
                start = target - 2
                end = target + 2
                for i, raw in enumerate(xf, start=1):
                    if i < start:
                        continue
                    if i > end:
                        break
                    line = raw.decode('utf-8', errors='replace').rstrip('\n')
                    print(f"{i}: {line[:220]}")
#'@ | uv run python -

465113:               <MaxTradLmt Ccy="ZAR">999991</MaxTradLmt>
465114:               <MinTradLmt Ccy="ZAR">1</MinTradLmt>
465115:             </FinInstrmAttrbts>
465116:           </PricRpt>
465117:         </Document>


In [3]:
import zipfile, io
from pathlib import Path

p = Path(r"c:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\01_raw\b3\PriceReport\PR210104_20210104.zip")
with zipfile.ZipFile(p, 'r') as zo:
    iname = [n for n in zo.namelist() if n.lower().endswith('.zip')][0]
    with zo.open(iname) as ib:
        with zipfile.ZipFile(io.BytesIO(ib.read()), 'r') as zi:
            xname = 'BVBG.086.01_BV000328202101040328000002030310476.xml'
            with zi.open(xname) as xf:
                target = 465115
                start = target - 80
                end = target + 20
                for i, raw in enumerate(xf, start=1):
                    if i < start:
                        continue
                    if i > end:
                        break
                    line = raw.decode('utf-8', errors='replace').rstrip('\n')
                    print(f"{i}: {line}")


465035:                   <Prtry>8</Prtry>
465036:                 </Tp>
465037:               </OthrId>
465038:               <PlcOfListg>
465039:                 <MktIdrCd>BVMF</MktIdrCd>
465040:               </PlcOfListg>
465041:             </FinInstrmId>
465042:             <TradDtls />
465043:             <FinInstrmAttrbts>
465044:               <OpnIntrst>56000</OpnIntrst>
465045:             </FinInstrmAttrbts>
465046:           </PricRpt>
465047:         </Document>
465048:       </BizGrp>
465049:       <BizGrp>
465050:         <AppHdr xmlns:xsd="http://www.w3.org/2001/XMLSchema" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="urn:iso:std:iso:20022:tech:xsd:head.001.001.01">
465051:           <BizMsgIdr>BV000328202101040328000002030260941</BizMsgIdr>
465052:           <MsgDefIdr>BVMF.217.01</MsgDefIdr>
465053:           <CreDt>2021-01-04T23:30:26Z</CreDt>
465054:           <Fr>
465055:             <OrgId>
465056:               <Id>
465057:                 <OrgId>

In [8]:
import pandas as pd
df = pd.read_parquet(r"C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\02_trusted\b3\di1_quotes_daily\2021-01.parquet")
df["TradDt"].unique()

<DatetimeArray>
['2021-01-04 00:00:00+00:00', '2021-01-05 00:00:00+00:00',
 '2021-01-06 00:00:00+00:00', '2021-01-07 00:00:00+00:00',
 '2021-01-08 00:00:00+00:00', '2021-01-11 00:00:00+00:00',
 '2021-01-12 00:00:00+00:00', '2021-01-13 00:00:00+00:00',
 '2021-01-14 00:00:00+00:00', '2021-01-15 00:00:00+00:00',
 '2021-01-18 00:00:00+00:00', '2021-01-19 00:00:00+00:00',
 '2021-01-20 00:00:00+00:00', '2021-01-21 00:00:00+00:00',
 '2021-01-22 00:00:00+00:00', '2021-01-26 00:00:00+00:00',
 '2021-01-27 00:00:00+00:00', '2021-01-28 00:00:00+00:00',
 '2021-01-29 00:00:00+00:00']
Length: 19, dtype: datetime64[us, UTC]

--------------------------------------------------------------------------------------


## Calendar: *ANBIMA*


In [120]:
import pandas as pd
holidays = pd.read_parquet(r'C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\calendars\02_trusted\ref\anbima_holidays.parquet')
bu_index = pd.read_parquet(r'C:\Users\Dell\OneDrive\Documentos\GitHub\ML-ETTJ26\data\calendars\02_trusted\ref\calendar_bd_index.parquet')

In [121]:
print(holidays.info(), bu_index.info(), sep="\n\n")

<class 'pandas.DataFrame'>
RangeIndex: 1263 entries, 0 to 1262
Data columns (total 7 columns):
 #   Column            Non-Null Count  Dtype              
---  ------            --------------  -----              
 0   cal_id            1263 non-null   str                
 1   date              1263 non-null   datetime64[ms, UTC]
 2   holiday_name      1263 non-null   str                
 3   weekday           1263 non-null   int32              
 4   ingestion_ts_utc  1263 non-null   str                
 5   source_file_hash  1263 non-null   str                
 6   pipeline_run_id   1263 non-null   str                
dtypes: datetime64[ms, UTC](1), int32(1), str(5)
memory usage: 214.4 KB
<class 'pandas.DataFrame'>
RangeIndex: 36159 entries, 0 to 36158
Data columns (total 9 columns):
 #   Column            Non-Null Count  Dtype              
---  ------            --------------  -----              
 0   cal_id            36159 non-null  str                
 1   date              36159

In [117]:
bu_index.info()

<class 'pandas.DataFrame'>
RangeIndex: 36159 entries, 0 to 36158
Data columns (total 9 columns):
 #   Column            Non-Null Count  Dtype              
---  ------            --------------  -----              
 0   cal_id            36159 non-null  str                
 1   date              36159 non-null  datetime64[ms, UTC]
 2   weekday           36159 non-null  int32              
 3   is_business_day   36159 non-null  bool               
 4   bd_index          36159 non-null  int64              
 5   holiday_name      0 non-null      str                
 6   ingestion_ts_utc  36159 non-null  str                
 7   source_file_hash  36159 non-null  str                
 8   pipeline_run_id   36159 non-null  str                
dtypes: bool(1), datetime64[ms, UTC](1), int32(1), int64(1), str(5)
memory usage: 5.7 MB
