# E2E Example

This notebook demonstrates the data ingestion, feature engineering, and scoring pipeline for a fixed-income fund recommendation system.

It walks through each pipeline step and explains the recommendation method.


**Highlights:**

- Interpretable and flexible knowledge-based recommendation system.
- Config-driven pipeline defined in YAML.


## Method

Our proposed method is simple: a knowledge-based recommendation system that relies on CVM data to recommend fixed-income funds on a periodic basis. It is designed to run as a monthly batch job, but it also supports exploration and backfill.

The high-level recommendation method is as follows:

1. **Fetch recent data from CVM**: It currently supports CDA, but it can easily be extended to other data sources, such as ANBIMA, and to other CVM datasets, such as daily quota values.

2. **Feature Engineering**: A built-in feature engine lets us compute flexible feature definitions over parametrized entity and time columns. We currently apply the feature definitions with `(CNPJ_FUNDO_CLASSE, DENOM_SOCIAL)` as the entity, but any other entity could be used instead.

The feature definitions are config-driven and not tied to a specific backend. We currently use pandas to perform the transformations, but we could switch to PySpark, SQL, or another backend.

3. **Compute Scores**: On top of the features, we compute scores. Each score represents a criterion of interest, for example risk or diversification. Right now we use single-feature models based on the z-score, but this step could host richer heuristics or even ML models. One idea I would like to test, but did not have time for, is an estimated Sharpe score: take the quota information and train an XGBoost model to predict a fund's future Sharpe ratio.

4. **Ranking based on Customer Profile Weighting over the Scores**: This is the heart of our recommendation system. I compute a weighted sum of the scores to obtain a final score, which I use to rank the funds. It acts like a utility function that combines all criteria of interest (the scores) into a single utility score that best suits a customer profile.

Concretely, we apply a **weighted sum** over all criteria, where each weight reflects the customer’s profile, as in the formula below:

```math
U_i = \sum_{k=1}^{K} w_k \, s_{i,k}
```

Where:

- **\(U_i\)** — final utility score for fund *i*  
- **\(s_{i,k}\)** — score *k* for fund *i*  
- **\(w_k\)** — weight of score *k* derived from customer profile  
- **\(K\)** — number of criteria  
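
As a minimal sketch of the weighted sum above, using made-up weights and scores (the names `utility`, `weights`, and `fund_scores` are illustrative and not part of `fif_recsys`):

```python
# Illustrative only: names and values are assumptions, not the fif_recsys API.
def utility(weights: dict[str, float], scores: dict[str, float]) -> float:
    """Return U_i = sum_k w_k * s_{i,k} for a single fund."""
    return sum(w * scores[name] for name, w in weights.items())


weights = {"size_score": 0.5, "credit_risk_score": 0.5}
fund_scores = {"size_score": 2.0, "credit_risk_score": -1.0}

print(utility(weights, fund_scores))  # 0.5 * 2.0 + 0.5 * (-1.0) = 0.5
```

Only the criteria named in `weights` contribute, so a profile can ignore a score simply by omitting it.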

---

```math
U_i =
\begin{bmatrix}
w_1 & w_2 & \cdots & w_K
\end{bmatrix}
\begin{bmatrix}
s_{i,1} \\
s_{i,2} \\
\vdots \\
s_{i,K}
\end{bmatrix}
```

---

We then normalize the weights so that they sum to 1 and rank the funds by the resulting utility:

```math
U_i = \sum_{k=1}^{K} 
\left( \frac{w_k}{\sum_{j=1}^{K} w_j} \right) s_{i,k}
```

```math
\text{RankedFunds} = \operatorname{argsort}\left( -U_i \right)
```

Where:

- \(U_i\) is the final utility score for fund *i*
- `argsort` returns the indices of funds sorted by the given value
- The negative sign ensures **descending order** (highest score first)
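
The normalization and ranking steps can be sketched in plain Python as follows. The function names and the toy data are assumptions made for illustration; sorting indices by the negated utility plays the role of `argsort(-U)`.

```python
# Illustrative only: function names and data are assumptions.
def normalize(weights: list[float]) -> list[float]:
    """Rescale weights so they sum to 1."""
    total = sum(weights)
    if total <= 0:
        raise ValueError("weights must have a positive sum")
    return [w / total for w in weights]


def rank_funds(scores: list[list[float]], weights: list[float]) -> list[int]:
    """Return fund indices ordered by utility, best first (argsort of -U)."""
    norm_w = normalize(weights)
    utilities = [sum(w * s for w, s in zip(norm_w, row)) for row in scores]
    return sorted(range(len(utilities)), key=lambda i: -utilities[i])


# Three funds, two criteria; weights 3:1 normalize to 0.75 and 0.25.
scores = [[1.0, 0.0], [0.0, 1.0], [2.0, 2.0]]
print(rank_funds(scores, [3.0, 1.0]))  # [2, 0, 1]
```

Because the weights are normalized, only their ratios matter: `[3.0, 1.0]` and `[0.75, 0.25]` produce the same ranking.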

---

### Customer Profiles

The customer profile weights are currently hardcoded, but we propose several ways to define them.

To address cold start, we could use a questionnaire with Likert-scale questions (1 to 5) to understand how important each criterion is to the customer, or even a fuzzy-logic heuristic that combines the questionnaire answers into the weights for each score.
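
One simple way the questionnaire idea could work is to make each weight proportional to the Likert answer for that criterion. This is only a sketch of one possible mapping; `weights_from_likert` is a hypothetical helper, and nothing like it exists in the current pipeline.

```python
# Hypothetical cold-start mapping: weights proportional to Likert answers.
def weights_from_likert(answers: dict[str, int]) -> dict[str, float]:
    """Map 1-5 importance answers to weights that sum to 1."""
    for name, value in answers.items():
        if not 1 <= value <= 5:
            raise ValueError(f"{name}: answer must be between 1 and 5, got {value}")
    total = sum(answers.values())
    return {name: value / total for name, value in answers.items()}


answers = {"size_score": 5, "credit_risk_score": 3, "governance_risk_score": 2}
print(weights_from_likert(answers))
# {'size_score': 0.5, 'credit_risk_score': 0.3, 'governance_risk_score': 0.2}
```

The resulting dictionary has the same shape as a profile entry in the configuration, so it could be used in place of a hardcoded profile.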

Once we have customer feedback on the recommendations, we could feed it into a re-ranker that adjusts the rankings based on the customer's previous choices.

## Troubleshooting


**Prerequisites:** Python packages: `pandas`, `requests`, `pyyaml`. Optional: `pyarrow` or `fastparquet` for Parquet I/O.

> Note: For quick demos this notebook may use local CSV fallbacks; substitute `fetch_manifest(...)` to run the full end-to-end pipeline against remote sources.

In [66]:
%load_ext autoreload
%autoreload 2



## Quick check

Run a quick sanity check to ensure the package is importable and that basic helpers (like `hello()`) work as expected. This is useful to confirm the development environment is set up correctly before running heavier pipeline steps.

In [67]:
from fif_recsys import hello

# This will print to the notebook output
hello()

## Configuration manifest (YAML)

The configuration dictionary (`config_d`) defines how data is fetched and how features and scores are computed.

- `fetch`: datasets to download. Each dataset includes `base_url`, `periods`, and `filename_template`.
- `feature`: registry of features to compute, including aggregation method and optional adjustments.
- `score`: scoring definitions (type, feature source, and adjustments like `invert`).
- `profile`: named profile weightings used to aggregate scores into a single ranking for each investor profile.

Edit these values to match your data sources and scoring preferences.
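
Since profiles refer to scores by name, a small consistency check can catch typos before the pipeline runs. The sketch below is a hypothetical helper, not part of `fif_recsys`, and it assumes a configuration shaped like `config_d`. The sum-to-one check reflects the convention followed by the profiles in this notebook rather than a documented requirement.

```python
import math

# Hypothetical helper, not part of fif_recsys: checks a config shaped like `config_d`.
def check_profiles(config: dict) -> list[str]:
    """Return a list of problems found in the `profile` section (empty if none)."""
    known_scores = set(config.get("score", {}))
    problems = []
    for profile, spec in config.get("profile", {}).items():
        # Everything except `description` is treated as a score weight.
        weights = {k: v for k, v in spec.items() if k != "description"}
        unknown = sorted(set(weights) - known_scores)
        if unknown:
            problems.append(f"{profile}: unknown scores {unknown}")
        if not math.isclose(sum(weights.values()), 1.0):
            problems.append(f"{profile}: weights sum to {sum(weights.values()):.2f}")
    return problems


example = {
    "score": {"size_score": {}, "credit_risk_score": {}},
    "profile": {
        "ok": {"description": "fine", "size_score": 0.6, "credit_risk_score": 0.4},
        "bad": {"size_score": 0.7, "liquidity_score": 0.1},
    },
}
print(check_profiles(example))
```

Here the `bad` profile is reported twice: once for the undefined `liquidity_score` and once because its weights do not sum to 1.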

In [68]:
import yaml

config_d = yaml.safe_load("""
fetch:
    cda:
        base_url: "https://dados.cvm.gov.br/dados/FI/DOC/CDA/DADOS/"
        periods:
            - "202501"
            - "202502"
            - "202503"
            - "202504"
            - "202505"
            - "202506"
            - "202507"
            - "202508"
            - "202509"
            - "202510"
            - "202511"
            - "202512"
        filename_template: "cda_fi_{period}.zip"

    cotas:
        base_url: "https://dados.cvm.gov.br/dados/FI/DOC/INF_DIARIO/DADOS/"
        periods:
            - "202301"
            - "202302"
            - "202303"
            - "202304"
            - "202305"
            - "202306"
            - "202307"
            - "202308"
            - "202309"
            - "202310"
            - "202311"
            - "202312"
                          
            - "202401"
            - "202402"
            - "202403"
            - "202404"
            - "202405"
            - "202406"
            - "202407"
            - "202408"
            - "202409"
            - "202410"
            - "202411"
            - "202412"
                          
            - "202501"
            - "202502"
            - "202503"
            - "202504"
            - "202505"
            - "202506"
            - "202507"
            - "202508"
            - "202509"
            - "202510"
            - "202511"
            - "202512"
        filename_template: "inf_diario_fi_{period}.zip"
feature:
    group_keys:
        - CNPJ_FUNDO_CLASSE
        - DENOM_SOCIAL
        - reference_date
    feature_registry:
        cda:
            patrimonio_liq:
                description: "Maximum reported net asset value per fund-month."
                method: max
                args:
                    - VL_PATRIM_LIQ
                            
            log_aum:
                description: "Natural log of the maximum reported net asset value per fund-month."
                method: max
                args:
                    - VL_PATRIM_LIQ
                adjustment:
                    - log

            total_posicao:
                description: "Sum of final market value of all positions in the period."
                method: sum
                args:
                    - VL_MERC_POS_FINAL

            n_ativos:
                description: "Number of unique assets in the fund portfolio."
                method: nunique
                args:
                    - CD_ATIVO

            n_emissores:
                description: "Number of unique issuers in the fund portfolio."
                method: nunique
                args:
                    - CPF_CNPJ_EMISSOR

            credito_share:
                description: "Weighted share of credit-linked assets in the portfolio."
                method: credito_share_feature_fn
                args:
                    - ["Debêntures", "Cédula de Crédito", "CRI", "CRA", "Notas Promissórias"]
                adjustment:
                    - clip

            related_party_share:
                description: "Weighted share of related-party issuers."
                method: related_party_share_feature_fn
                adjustment:
                    - clip

            issuer_hhi:
                description: "Herfindahl-Hirschman index based on issuer weights."
                method: hhi_feature_fn
                adjustment:
                    - clip
                    - coalesce
        cotas:
            
score:
    size_score:
        type: zscore
        description: >
            Measures the relative size of the fund based on its assets under
            management. Larger funds typically exhibit greater operational
            stability, better liquidity access, and lower idiosyncratic risk.
            Computed using the z-score of the log-transformed AUM (log_aum).
        args:
            feature: log_aum

    diversification_score:
        type: zscore
        description: >
            Evaluates how diversified the fund's portfolio is in terms of
            the number of unique assets held. Higher values indicate broader
            asset diversification, reducing exposure to security-specific risks.
        args:
            feature: n_ativos

    issuer_diversification_score:
        type: zscore
        description: >
            Measures diversification across issuers by counting how many distinct
            counterparties the fund is exposed to. Funds with exposures distributed
            across more issuers typically have lower concentration and reduced
            issuer-specific credit risk.
        args:
            feature: n_emissores

    credit_risk_score:
        type: zscore
        description: >
            Quantifies the fund's exposure to credit-linked instruments such as
            debentures, CRIs/CRAs, and promissory notes. A higher credit share
            typically increases sensitivity to credit events. The score is inverted
            so that higher credit exposure corresponds to a lower (worse) score.
        args:
            feature: credito_share
        adjustment:
            - invert

    governance_risk_score:
        type: zscore
        description: >
            Captures exposure to related-party transactions, which may increase
            governance risk due to potential conflicts of interest and reduced
            market discipline. The score is inverted, so funds with higher
            related-party share receive a lower (worse) score.
        args:
            feature: related_party_share
        adjustment:
            - invert

    concentration_risk_score:
        type: zscore
        description: >
            Measures portfolio concentration using the Herfindahl-Hirschman Index
            (HHI) computed over issuer exposure weights. Higher HHI values indicate
            more concentrated portfolios and elevated idiosyncratic and liquidity
            risks. Score is inverted so higher concentration yields a lower score.
        args:
            feature: issuer_hhi
        adjustment:
            - invert
profile:
  conservative:
    description: >
      Designed for risk-averse investors prioritizing capital preservation and stability.
      Emphasizes fund size, diversification, and issuer spread to minimize volatility,
      while keeping exposure to credit and governance risks tightly controlled.
    size_score: 0.25
    diversification_score: 0.20
    issuer_diversification_score: 0.20
    credit_risk_score: 0.15
    governance_risk_score: 0.10
    concentration_risk_score: 0.10

  balanced:
    description: >
      Suitable for investors seeking a middle ground between safety and return.
      Balances diversification and issuer exposure with moderate tolerance for credit
      and concentration risks, aiming for a stable but growth-oriented allocation.
    size_score: 0.20
    diversification_score: 0.15
    issuer_diversification_score: 0.15
    credit_risk_score: 0.20
    governance_risk_score: 0.15
    concentration_risk_score: 0.15

  institutional:
    description: >
      Targeted at large professional allocators who value scale and diversification
      but can tolerate more concentrated or complex positions. Prioritizes fund size
      and issuer spread while placing relatively lower weight on credit and governance constraints.
    size_score: 0.30
    diversification_score: 0.20
    issuer_diversification_score: 0.20
    credit_risk_score: 0.10
    governance_risk_score: 0.10
    concentration_risk_score: 0.10

""")

## Fetch datasets

Use `fetch_manifest` to download and assemble datasets defined in the manifest. The function returns a `dict` mapping dataset names to `pandas.DataFrame` objects and writes partitioned files to `output_dir/<dataset>/period=<period>/data.parquet` when a Parquet engine is available (a CSV fallback is used otherwise).
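
The partition layout described above can be sketched as follows. The internals of `fetch_manifest` are not shown in this notebook, so `plan_downloads` is a hypothetical helper, and joining `base_url` with the formatted `filename_template` is an assumption about how the download URL is built.

```python
from pathlib import Path

# Sketch only: `fetch_manifest` internals are not shown here, so this
# reconstruction of URLs and output paths is an assumption.
def plan_downloads(fetch_cfg: dict, output_dir: Path) -> list[tuple[str, Path]]:
    """Pair each period's download URL with its partitioned output path."""
    plan = []
    for dataset, spec in fetch_cfg.items():
        for period in spec["periods"]:
            url = spec["base_url"] + spec["filename_template"].format(period=period)
            target = output_dir / dataset / f"period={period}" / "data.parquet"
            plan.append((url, target))
    return plan


fetch_cfg = {
    "cda": {
        "base_url": "https://dados.cvm.gov.br/dados/FI/DOC/CDA/DADOS/",
        "periods": ["202501"],
        "filename_template": "cda_fi_{period}.zip",
    }
}
for url, target in plan_downloads(fetch_cfg, Path("/tmp")):
    print(url, "->", target)
```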

Example usage (below) demonstrates both the programmatic fetch and a temporary offline fallback for quick demos.

In [69]:
from pathlib import Path

from fif_recsys.commands.data import fetch_manifest


data_sources_d = fetch_manifest(config_d['fetch'], output_dir=Path("/tmp"))





## Compute features

Call `compute_all_features` with the `FEATURE_ENGINE` to aggregate fund-month features according to your `feature_registry`. The result is a DataFrame with one row per fund-month, with the computed features ready for scoring.
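
To illustrate how a registry entry can drive an aggregation, the sketch below maps simple entries (a `method` plus one column in `args`) onto a pandas named aggregation. It uses toy data and is an assumption about the general approach, not the actual `FEATURE_ENGINE` implementation, which also handles custom functions and adjustments.

```python
import pandas as pd

# Assumption: simple registry entries (`method` + one column in `args`)
# map directly onto pandas groupby aggregations. Toy data, not CVM data.
registry = {
    "total_posicao": {"method": "sum", "args": ["VL_MERC_POS_FINAL"]},
    "n_ativos": {"method": "nunique", "args": ["CD_ATIVO"]},
}
group_keys = ["CNPJ_FUNDO_CLASSE"]

holdings = pd.DataFrame(
    {
        "CNPJ_FUNDO_CLASSE": ["A", "A", "B"],
        "CD_ATIVO": ["X", "Y", "X"],
        "VL_MERC_POS_FINAL": [10.0, 5.0, 7.0],
    }
)

# Named aggregation: output_column=(input_column, aggregation_method).
named_aggs = {name: (spec["args"][0], spec["method"]) for name, spec in registry.items()}
features = holdings.groupby(group_keys).agg(**named_aggs).reset_index()
print(features)
```

Adding a feature of this simple kind then only requires a new registry entry, with no change to the aggregation code.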

In [70]:
from fif_recsys.commands.feature import compute_all_features, FEATURE_ENGINE


feature_df = compute_all_features(data_sources_d, config_d, FEATURE_ENGINE)

feature_df.head()

Skipping dataset 'cotas': no features defined in registry.


  


|   | CNPJ_FUNDO_CLASSE | DENOM_SOCIAL | reference_date | patrimonio_liq | log_aum | total_posicao | n_ativos | n_emissores | credito_share | related_party_share | issuer_hhi |
|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 06.323.688/0001-27 | IT NOW PIBB IBRX-50 FUNDO DE ÍNDICE RESPONSABILIDADE LIMITADA | 2026-01-23 | 996521100.0 | 20.719781 | 6826644000.0 | 58 | 1 | 0.0 | 0.125166 | 1.0 |
| 1 | 09.260.031/0001-56 | FUNDO DE INVESTIMENTO EM QUOTAS DE FUNDO DE INVESTIMENTO EM DIREITOS CREDITÓRIOS NÃO PADRONIZADO SRM | 2026-01-23 | 82364500.0 | 18.226665 | 503980600.0 | 0 | 8 | 0.0 | 0.479135 | 0.298536 |
| 2 | 10.292.322/0001-05 | KONDOR KOBOLD FUNDO DE INVESTIMENTO EM COTAS DE FIDC - RESP LIMITADA | 2026-01-23 | 538989300.0 | 20.105206 | 4547051000.0 | 0 | 4 | 0.0 | 0.999696 | 0.610606 |
| 3 | 10.406.511/0001-61 | ISHARES IBOVESPA CLASSE DE ÍNDICE - RESPONSABILIDADE LIMITADA | 2026-01-23 | 14990920000.0 | 23.43071 | 102854400000.0 | 103 | 9 | 0.0 | 0.013466 | 0.364377 |
| 4 | 10.406.600/0001-08 | ISHARES BM&FBOVESPA SMALL CAP CLASSE DE ÍNDICE - RESPONSABILIDADE LIMITADA | 2026-01-23 | 2112755000.0 | 21.471258 | 18136060000.0 | 131 | 11 | 0.0 | 0.035685 | 0.856891 |
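
For intuition about the `issuer_hhi` column, the Herfindahl-Hirschman index can be sketched as the sum of squared issuer weights. This is the textbook definition written as a hypothetical helper; the package's `hhi_feature_fn` may differ in detail, for example in how it handles missing issuers.

```python
from collections import defaultdict

# Assumption: HHI is the sum of squared issuer weights, where each weight is
# the issuer's share of the total position value. This may differ in detail
# from the package's `hhi_feature_fn`.
def issuer_hhi(positions: list[tuple[str, float]]) -> float:
    """Compute HHI from (issuer, market_value) pairs; returns 0.0 if empty."""
    by_issuer: dict[str, float] = defaultdict(float)
    for issuer, value in positions:
        by_issuer[issuer] += value
    total = sum(by_issuer.values())
    if total == 0:
        return 0.0
    return sum((value / total) ** 2 for value in by_issuer.values())


print(issuer_hhi([("A", 50.0), ("B", 50.0)]))  # 0.5: two equal issuers
print(issuer_hhi([("A", 100.0)]))              # 1.0: a single issuer
```

Under this definition a fund with a single issuer has an HHI of 1.0, which matches the first row above (`n_emissores` of 1 and `issuer_hhi` of 1.0).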


## Compute scores

Convert features into normalized scores using `compute_scores_from_yaml`. The `score` section in the configuration defines score types (e.g., `zscore`) and optional adjustments (e.g., `invert`). The resulting DataFrame will contain the base features and the derived score columns.

In [75]:
from fif_recsys.commands.model import compute_scores_from_yaml

score_df = compute_scores_from_yaml(feature_df, config_d)

score_df[['CNPJ_FUNDO_CLASSE', 'DENOM_SOCIAL', *[c for c in score_df.columns if 'score' in c]]].head()


|   | CNPJ_FUNDO_CLASSE | DENOM_SOCIAL | size_score | diversification_score | issuer_diversification_score | credit_risk_score | governance_risk_score | concentration_risk_score |
|---|---|---|---|---|---|---|---|---|
| 0 | 06.323.688/0001-27 | IT NOW PIBB IBRX-50 FUNDO DE ÍNDICE RESPONSABILIDADE LIMITADA | 1.280804 | 2.536111 | -0.541774 | 0.070015 | 0.512272 | -1.087377 |
| 1 | 09.260.031/0001-56 | FUNDO DE INVESTIMENTO EM QUOTAS DE FUNDO DE INVESTIMENTO EM DIREITOS CREDITÓRIOS NÃO PADRONIZADO SRM | 0.213781 | -0.22343 | -0.03113 | 0.070015 | -0.363396 | 0.826859 |
| 2 | 10.292.322/0001-05 | KONDOR KOBOLD FUNDO DE INVESTIMENTO EM COTAS DE FIDC - RESP LIMITADA | 1.017773 | -0.22343 | -0.322927 | 0.070015 | -1.651189 | -0.024753 |
| 3 | 10.406.511/0001-61 | ISHARES IBOVESPA CLASSE DE ÍNDICE - RESPONSABILIDADE LIMITADA | 2.441048 | 4.677134 | 0.041819 | 0.070015 | 0.788599 | 0.647186 |
| 4 | 10.406.600/0001-08 | ISHARES BM&FBOVESPA SMALL CAP CLASSE DE ÍNDICE - RESPONSABILIDADE LIMITADA | 1.602427 | 6.009326 | 0.187718 | 0.070015 | 0.733635 | -0.696846 |
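
The `zscore` type with the `invert` adjustment could be sketched as below. This assumes that `zscore` standardizes a feature across funds and that `invert` negates the result; the notebook does not state whether the package uses the population or the sample standard deviation, so the population form here is an assumption.

```python
from statistics import mean, pstdev

# Assumption: `zscore` standardizes a feature across funds and `invert`
# flips the sign. The population standard deviation is used here.
def zscore(values: list[float], invert: bool = False) -> list[float]:
    """Standardize values to mean 0 and unit variance, optionally negated."""
    mu = mean(values)
    sigma = pstdev(values)
    if sigma == 0:
        # A constant feature carries no ranking information.
        return [0.0 for _ in values]
    if invert:
        return [(mu - v) / sigma for v in values]
    return [(v - mu) / sigma for v in values]


credito_share = [0.0, 0.5, 1.0]
print(zscore(credito_share, invert=True))
```

With `invert=True`, the fund with the lowest credit share receives the highest score, which is the behavior the `credit_risk_score` description asks for.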


## Compute profile rankings

Use `compute_profile_scores_from_yaml` (from `fif_recsys.commands.policy`) to aggregate weighted scores into a single profile score and ranking for each fund. Profiles are defined in the `profile` section of the configuration (e.g., `conservative`, `balanced`, `institutional`).

In [79]:
import pandas as pd
pd.set_option('display.max_colwidth', None) # or set to a large integer value (e.g., 500)


from fif_recsys.commands.policy import compute_profile_scores_from_yaml

ranking_df = compute_profile_scores_from_yaml(score_df.fillna(0), config_d)

ranking_df[['CNPJ_FUNDO_CLASSE', 'DENOM_SOCIAL', 'reference_date', *[c for c in ranking_df.columns if 'rank' in c]]].head()

|   | CNPJ_FUNDO_CLASSE | DENOM_SOCIAL | reference_date | rank_conservative | rank_balanced | rank_institutional |
|---|---|---|---|---|---|---|
| 0 | 06.323.688/0001-27 | IT NOW PIBB IBRX-50 FUNDO DE ÍNDICE RESPONSABILIDADE LIMITADA | 2026-01-23 | 82 | 122 | 74 |
| 1 | 09.260.031/0001-56 | FUNDO DE INVESTIMENTO EM QUOTAS DE FUNDO DE INVESTIMENTO EM DIREITOS CREDITÓRIOS NÃO PADRONIZADO SRM | 2026-01-23 | 381 | 383 | 381 |
| 2 | 10.292.322/0001-05 | KONDOR KOBOLD FUNDO DE INVESTIMENTO EM COTAS DE FIDC - RESP LIMITADA | 2026-01-23 | 456 | 608 | 420 |
| 3 | 10.406.511/0001-61 | ISHARES IBOVESPA CLASSE DE ÍNDICE - RESPONSABILIDADE LIMITADA | 2026-01-23 | 7 | 7 | 7 |
| 4 | 10.406.600/0001-08 | ISHARES BM&FBOVESPA SMALL CAP CLASSE DE ÍNDICE - RESPONSABILIDADE LIMITADA | 2026-01-23 | 8 | 8 | 8 |


## Next steps & CLI

- Run the full pipeline from the command line using the Typer-based CLI:
  - `fif-recsys data fetch` to download and prepare datasets
  - `fif-recsys feature build` to compute and write feature tables
  - `fif-recsys model score` to compute scores

- Tips:
  - Install `pyarrow` for faster Parquet I/O when running on large datasets.
  - For reproducible fetches, consider passing a deterministic `reference_date` to `fetch_manifest`.

Feel free to update this notebook with real data paths and run the pipeline end-to-end.

## Inspecting pipeline outputs

If you ran the Docker pipeline and mounted an output directory (e.g., `/tmp/fif_data` on the host → `/data` in the container), the pipeline writes the final profile-scored table to `features_profile_scored.parquet` or `features_profile_scored.csv` in that directory. The first cell below previews the top five funds for the conservative profile from the in-memory `ranking_df`; the second cell (commented out) loads and previews the file output. Update `output_dir` if you used a different directory.

In [77]:
ranking_df[['CNPJ_FUNDO_CLASSE', 'DENOM_SOCIAL', 'reference_date', *[c for c in ranking_df.columns if 'rank' in c]]].sort_values(by='rank_conservative', ascending=True)[:5]

|   | CNPJ_FUNDO_CLASSE | DENOM_SOCIAL | reference_date | rank_conservative | rank_balanced | rank_institutional |
|---|---|---|---|---|---|---|
| 219 | 40.155.573/0001-09 | TREND ETF IBOVESPA CLASSE DE ÍNDICE - RESPONSABILIDADE LIMITADA | 2026-01-23 | 1 | 1 | 1 |
| 133 | 32.203.211/0001-18 | FUNDO DE INVESTIMENTO DE ÍNDICE - CLASSE DE INVESTIMENTO ETF BRADESCO IBOVESPA - RESP LIMITADA | 2026-01-23 | 2 | 2 | 2 |
| 143 | 34.606.480/0001-50 | BB ETF IBOVESPA FUNDO DE ÍNDICE RESPONSABILIDADE LIMITADA | 2026-01-23 | 3 | 3 | 3 |
| 424 | 48.643.130/0001-79 | FUNDO DE INVESTIMENTO DE ÍNDICE - CI B-INDEX MORNINGSTAR BRASIL PESOS IGUAIS - RESP LIMITADA | 2026-01-23 | 4 | 4 | 4 |
| 730 | 57.848.980/0001-02 | BB ETF ÍNDICE BOVESPA B3 BR+ FUNDO DE ÍNDICE RESPONSABILIDADE LIMITADA | 2026-01-23 | 5 | 5 | 5 |


In [78]:
# # Load and preview the profile-scored table
# from pathlib import Path
# import pandas as pd

# pd.set_option('display.max_colwidth', None) # or set to a large integer value (e.g., 500)


# # Update this path to the directory you mounted into the container (host path: /tmp/fif_data)
# output_dir = Path("/tmp/fif_data")

# pj = output_dir / "features_profile_scored.parquet"
# pcsv = output_dir / "features_profile_scored.csv"

# if pj.exists():
#     df = pd.read_parquet(pj)
# elif pcsv.exists():
#     df = pd.read_csv(pcsv)
# else:
#     raise FileNotFoundError(f"No profile-scored output found at {pj} or {pcsv}. Make sure you mounted the output dir and ran the pipeline.")

# # Quick preview
# print("Path:", pj if pj.exists() else pcsv)
# print("Rows:", len(df))
# print("Columns:", list(df.columns))
# df
