Pipeline EN
Technical reference document. Every line of code that transforms a transaction passes through these stages, in this order.
FILE (CSV / XLSX)
         │
         ▼
┌────────────────────────┐
│ 1. LOADING             │  parse bytes, encoding, delimiter, header
└────────┬───────────────┘
         │
         ▼
┌────────────────────────┐  schema in DB?
│ 2. SCHEMA DECISION     ├──────────────────────────────┐
└────────┬───────────────┘                              │
         │ no (Flow 2)                                  │ yes (Flow 1)
         ▼                                              │
┌────────────────────────┐                              │
│ 2b. DOCUMENT           │  LLM → DocumentSchema        │
│ CLASSIFICATION         │                              │
│ [RF-01]                │                              │
└────────┬───────────────┘                              │
         └──────────────────────┬───────────────────────┘
                                │
                                ▼
                   ┌──────────────────────────┐
                   │ 3. NORMALISATION         │  dates, amounts, SHA-256 ID
                   │ [RF-02]                  │  transaction type
                   └────────────┬─────────────┘
                                │
                                ▼
                   ┌──────────────────────────┐
                   │ 4. DEDUP CHECK           │  skip already-imported txs
                   │                          │  (ID calculated at step 3)
                   └────────────┬─────────────┘
                                │  ← no LLM call on already-known txs
                                ▼
                   ┌──────────────────────────┐
                   │ 5. DESCRIPTION CLEANING  │  LLM extracts counterparty
                   │ [RF-02 pre-cat.]         │  (payer or payee)
                   └────────────┬─────────────┘
                                │
                                ▼
                   ┌──────────────────────────┐
                   │ 6. INTERNAL TRANSFER     │  amount+date matching
                   │ DETECTION [RF-04]        │  or owner name
                   └────────────┬─────────────┘
                                │
                                ▼
                   ┌──────────────────────────┐
                   │ 7. CARD RECONCILIATION   │  credit card ↔ debit
                   │ [RF-03]                  │  on current account
                   └────────────┬─────────────┘
                                │
                                ▼
                   ┌──────────────────────────┐
                   │ 8. CATEGORISATION        │  rules → LLM → fallback
                   │ [RF-05]                  │
                   └────────────┬─────────────┘
                                │
                                ▼
                   ┌──────────────────────────┐
                   │ 9. DB PERSISTENCE        │  idempotent upsert
                   │ [RF-06, RF-07]           │
                   └────────────┬─────────────┘
                                │
                                ▼
                   ┌──────────────────────────┐
                   │ 10. MANUAL REVIEW        │  to_review=True → user
                   │ + RULES [RF-08]          │  → re-apply rules
                   │                          │
                   │ UI Pages:                │
                   │ • Review                 │
                   │ • Bulk edits             │
                   └──────────────────────────┘
Module: core/normalizer.py → load_raw_dataframe()
detect_encoding(raw_bytes)
└─ chardet → normalised alias (ascii → utf-8)
For XLSX / XLS:
  detect_best_sheet(workbook)
  ├─ excludes sheets named summary/totale/riepilogo
  └─ score = n_rows + (n_numeric_columns × 10)
  pd.read_excel(sheet)
For CSV / text:
  detect_delimiter(content)
  └─ character frequency [, ; | TAB] → most frequent wins
  detect_header_row(lines) → (skip_rows: int, certain: bool)
  ├─ first row with ≥ 2 non-numeric and non-empty fields
  ├─ certain=True → header found, used silently
  └─ certain=False → no match, fallback to 0 (ambiguous → UI asks user)
  pd.read_csv(sep=delimiter, skiprows=skip_rows)
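The two CSV heuristics above can be sketched in a few lines of plain Python. This is a minimal illustration, not the code in core/normalizer.py: the `delimiter` argument of `detect_header_row`, the `_is_numeric` helper and the comma default are assumptions made for the example.

```python
from collections import Counter

CANDIDATE_DELIMITERS = [",", ";", "|", "\t"]


def detect_delimiter(content: str) -> str:
    """Return the candidate delimiter that occurs most often in the text."""
    counts = Counter(ch for ch in content if ch in CANDIDATE_DELIMITERS)
    if not counts:
        return ","  # assumption: default to a comma when no candidate appears
    return counts.most_common(1)[0][0]


def _is_numeric(field: str) -> bool:
    """Hypothetical helper: True for fields such as '12', '-3.5' or '1.234,56'."""
    cleaned = field.strip()
    for ch in ".,-+":
        cleaned = cleaned.replace(ch, "")
    return cleaned.isdigit()


def detect_header_row(lines: list, delimiter: str) -> tuple:
    """Return (skip_rows, certain) for the first row that looks like a header."""
    for index, line in enumerate(lines):
        fields = [f.strip() for f in line.split(delimiter)]
        # Header candidates have at least two non-empty, non-numeric fields.
        textual = [f for f in fields if f and not _is_numeric(f)]
        if len(textual) >= 2:
            return index, True
    # No match: fall back to 0 and let the UI ask the user.
    return 0, False
```

With a statement that starts with one institutional line and a blank line, `detect_header_row` would report the third line (index 2) as the header, so two rows are skipped.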
After loading (both CSV and Excel), Phase 0 pre-processing is applied:
detect_and_strip_preheader_rows(df)
├─ counts non-null cells per row → computes median → threshold = median × 0.5
├─ contiguous rows at the top with density < threshold → removed (max 20 rows / 10%)
└─ first non-sparse row becomes the new column header
drop_low_variability_columns(df)
├─ for each column: nunique(col) / n_rows < 1.5% → metadata column
└─ candidate columns removed (never drops below 2 columns)
Output: cleaned DataFrame + PreprocessInfo(skipped_rows, dropped_columns)
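The Phase 0 pre-processing can be approximated without pandas, on a plain list of rows. The sketch below is an illustration under stated assumptions: it reads "max 20 rows / 10%" as "whichever limit is smaller", treats `None` and empty strings as null cells, and uses function signatures that are hypothetical rather than those of the real module.

```python
from statistics import median


def strip_preheader_rows(rows, max_rows=20, max_fraction=0.10):
    """Drop sparse leading rows; return (header, body, skipped_rows)."""
    if not rows:
        return [], [], 0
    # Density = number of non-null cells in each row.
    density = [sum(cell not in (None, "") for cell in row) for row in rows]
    threshold = median(density) * 0.5
    # Assumption: the cap is the smaller of 20 rows and 10% of the file.
    limit = min(max_rows, int(len(rows) * max_fraction))
    skipped = 0
    while skipped < limit and density[skipped] < threshold:
        skipped += 1
    # The first non-sparse row becomes the column header.
    return rows[skipped], rows[skipped + 1:], skipped


def drop_low_variability_columns(header, body, min_ratio=0.015, min_columns=2):
    """Remove near-constant columns; return (header, body, dropped_names)."""
    n_rows = len(body)
    if n_rows == 0:
        return header, body, []
    candidates = [
        i for i in range(len(header))
        if len({row[i] for row in body}) / n_rows < min_ratio
    ]
    # Never drop below min_columns remaining columns.
    max_droppable = max(0, len(header) - min_columns)
    drop = set(candidates[:max_droppable])
    keep = [i for i in range(len(header)) if i not in drop]
    new_header = [header[i] for i in keep]
    new_body = [[row[i] for i in keep] for row in body]
    return new_header, new_body, [header[i] for i in sorted(drop)]
```

A column holding the same IBAN on every one of 100 rows has a ratio of 0.01, below the 1.5% threshold, so it would be treated as metadata and removed.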
Module: core/normalizer.py → compute_header_sha256(), load_raw_head()
To avoid running the LLM classifier on every import of the same file format, Spendify computes the SHA256 of the first min(30, N) raw rows (before any skip or pre-processing) and stores it alongside the confirmed schema.
compute_header_sha256(raw_bytes, filename, n=30)
├─ Excel: first min(30, N) rows of the best sheet → serialised with "|" between cells
├─ CSV: first min(30, N) raw text rows
└─ SHA256(content.encode()) → 64-character hex string
load_raw_head(raw_bytes, filename, n=10)
├─ loads N rows without skiprows, without preprocessing
└─ used by the schema review UI to show the raw structure of the file
Algorithm on re-import:
- Compute `header_sha256` of the first min(30, N) raw rows
- DB query: `SELECT * FROM document_schema WHERE header_sha256 = ?`
- If found → use saved schema (includes `skip_rows`) → skip classifier and review UI
- If not found → Flow 2 (LLM classification + mandatory review UI)
Why the first rows? Bank statement files typically contain static institutional header rows (bank name, account number, date range) that are identical across all monthly exports from the same institution. These rows are a reliable fingerprint of the format.
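For the CSV case, the fingerprint described above could be computed roughly as follows. The function name `compute_header_sha256_csv` and the choice to re-join rows with `"\n"` are assumptions for this sketch; the real `compute_header_sha256()` takes raw bytes and a filename and also handles Excel.

```python
import hashlib


def compute_header_sha256_csv(raw_text: str, n: int = 30) -> str:
    """Hash the first min(n, N) raw text rows of a CSV export."""
    head = raw_text.splitlines()[:n]
    # Assumption: rows are re-joined with "\n" before hashing.
    content = "\n".join(head)
    return hashlib.sha256(content.encode()).hexdigest()
```

Two exports that share their first 30 rows produce the same 64-character hex digest, whatever follows; a single changed character in those rows produces a different one.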
Skip rows detection: full flow
detect_skip_rows(raw_bytes, filename) → (N, certain)
├─ CSV: detect_header_row(lines) → (N, certain)
└─ Excel: detect_header_row_excel(bytes) → (N, certain)
   └─ same CSV heuristic applied to cell values
At upload time (before the Elaborate button):
1. compute_header_sha256 → find_schema_by_header_sha256()
   ├─ HIT → skip_rows known from schema; no user input needed
   └─ MISS → detect_skip_rows()
      ├─ certain=True → N used silently (any value)
      └─ certain=False → UI shows "Rows to skip" number_input
         (default=0, user can correct)
`skip_rows_override`: `process_file` accepts `skip_rows_override: int | None` (from the UI form). It always takes precedence over `known_schema.skip_rows`. `load_raw_dataframe` accepts the same parameter:
- CSV: replaces `detect_header_row()`
- Excel: passes `skiprows=N` to `pd.read_excel` and skips `detect_and_strip_preheader_rows()`
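The precedence between the UI override, the saved schema and the heuristic can be summarised in one small function. `resolve_skip_rows` is a hypothetical helper written for this illustration; it only encodes the ordering described in this section.

```python
from typing import Optional, Tuple


def resolve_skip_rows(
    override: Optional[int],
    schema_skip_rows: Optional[int],
    detected: int,
    certain: bool,
) -> Tuple[int, bool]:
    """Return (skip_rows, needs_user_input)."""
    if override is not None:
        # The UI override always wins, even over a saved schema.
        return override, False
    if schema_skip_rows is not None:
        # Header hash HIT: the value stored with the schema is used.
        return schema_skip_rows, False
    if certain:
        # Header hash MISS, but the heuristic found a header row.
        return detected, False
    # Ambiguous: default to 0 and show the "Rows to skip" input.
    return 0, True
```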
Module: core/orchestrator.py, core/classifier.py
_schema_is_usable(known_schema)
├─ requires: date_col AND (amount_col OR (debit_col AND credit_col))
└─ if valid → skip classification
classify_document(df_raw, llm_backend)
PHASE 0: Python, deterministic
└─ Column synonyms (no LLM):
   date_col → data, date, data operazione, buchungsdatum, …
   amount_col → importo, amount, betrag, montant, …
   debit/credit → dare/avere, addebiti/accrediti, uscite/entrate, …
   description → descrizione, causale, memo, payee, …
PHASE 0.5: Sign inspection
└─ If amount_col semantics "neutral":
   reads actual data → if any value < 0 → invert_sign=False certain
PHASE 1: LLM, ambiguous fields
  input:
  - column names
  - first 20 rows (sensitive data redacted)
  - Phase 0 results (as certain facts)
  JSON output:
  {
    doc_type: bank_account | credit_card | debit_card | prepaid_card | savings | unknown
    date_format: strptime pattern (e.g. %d/%m/%Y)
    sign_convention: signed_single | debit_positive | credit_negative
    invert_sign: true/false (cards: expenses typically positive in CSV)
    internal_transfer_patterns: ["bonifico", "giroconto", …]
  }
POST-LLM: Phase 0 overrides LLM
├─ merge: certain Phase 0 results overwrite the LLM
└─ safety: if doc_type = card → invert_sign=True forced
Output: DocumentSchema with column mapping and sign conventions
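The deterministic synonym lookup and the post-LLM merge can be sketched with plain dictionaries. The synonym sets below contain only the examples listed above, the key `description_col` is a made-up name, and treating `credit_card`, `debit_card` and `prepaid_card` as "card" documents is an assumption of this sketch.

```python
COLUMN_SYNONYMS = {
    "date_col": {"data", "date", "data operazione", "buchungsdatum"},
    "amount_col": {"importo", "amount", "betrag", "montant"},
    "description_col": {"descrizione", "causale", "memo", "payee"},
}
# Assumption: these doc_type values count as "card" for the safety rule.
CARD_DOC_TYPES = {"credit_card", "debit_card", "prepaid_card"}


def match_columns(columns):
    """Phase 0: map canonical fields to source columns by synonym."""
    found = {}
    for column in columns:
        key = column.strip().casefold()
        for field, synonyms in COLUMN_SYNONYMS.items():
            if key in synonyms and field not in found:
                found[field] = column
    return found


def merge_schema(phase0: dict, llm: dict) -> dict:
    """Post-LLM merge: certain Phase 0 results overwrite the LLM output."""
    merged = {**llm, **phase0}
    if merged.get("doc_type") in CARD_DOC_TYPES:
        merged["invert_sign"] = True  # safety rule for card documents
    return merged
```

Putting the Phase 0 dictionary last in the merge is what makes the deterministic results win whenever both sources provide the same key.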
On the first import of an unknown file (header SHA256 not found in DB), the import always stops, regardless of the classifier's confidence, and shows the user a review form with:
- Raw preview: first 10 rows of the file without preprocessing (via `load_raw_head()`)
- Schema fields: doc_type, account_label, amount column, date, sign, debits/credits, invert sign
- Parsed preview: first 8 transactions processed with the current schema; it updates live on every change
- "Confirm and import" button: saves the schema (with `header_sha256`) and starts the import
From the second import of the same format, the header_sha256 is found in DB and the entire process is automatic (no LLM call, no UI).
Module: core/normalizer.py → _normalize_df_with_schema()
For each row of the DataFrame:
  parse_date_safe(value, format)
  ├─ tries schema format → fallback to common IT/ISO/US formats
  └─ None if it fails (row discarded)
  apply_sign_convention(row, convention)
  ├─ signed_single: uses amount_col as-is
  ├─ debit_positive: credit − debit (both positive in CSV)
  └─ credit_negative: credit as-is, −|debit|
  parse_amount(value)
  ├─ "1.234,56" (EU) → 1234.56
  ├─ "1,234.56" (US) → 1234.56
  └─ "1234,56" → 1234.56
  normalize_description(text)
  └─ NFC unicode + casefold + strip
  compute_transaction_id(account_label, raw_date, raw_amount, raw_description)
  ├─ SHA-256[:24] on RAW values
  └─ stable across normalisation versions
  _infer_tx_type(amount, doc_type, description, internal_patterns)
  ├─ matches internal_patterns → internal_out (< 0) / internal_in (≥ 0)
  ├─ credit card / debit card / prepaid card → card_tx
  └─ otherwise: income (≥ 0) / expense (< 0)
Intra-file dedup:
  Rows with the same (account_label + date + amount + description)
  → sum amounts, recompute hash
  (avoids double counting if the same tx appears multiple times in the export)
Card balance row removal:
  remove_card_balance_row(txs, epsilon)
  ├─ detects the row whose |amount| ≈ Σ|other amounts|
  ├─ with owner_label → renames description (internal transfer detection captures it)
  └─ without owner_label → removes the row
Output: list of dict transactions with all canonical fields, immutable raw_description
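Two of the normalisation steps above, `parse_amount` and `compute_transaction_id`, lend themselves to a short sketch. The version below is an illustration only: it returns `Decimal` rather than a float, decides the decimal separator by whichever of `,` and `.` appears last, and joins the raw values with `"|"` before hashing. All three choices are assumptions, since the text does not specify them.

```python
import hashlib
from decimal import Decimal


def parse_amount(value: str) -> Decimal:
    """Parse EU ("1.234,56") and US ("1,234.56") formatted amounts."""
    text = value.strip().replace(" ", "")
    last_comma, last_dot = text.rfind(","), text.rfind(".")
    if last_comma > last_dot:
        # Comma is the decimal separator: dots are thousands separators.
        text = text.replace(".", "").replace(",", ".")
    else:
        # Dot is the decimal separator, or there is no separator at all.
        text = text.replace(",", "")
    return Decimal(text)


def compute_transaction_id(account_label, raw_date, raw_amount, raw_description):
    """First 24 hex characters of a SHA-256 over the RAW values."""
    # Assumption: the raw fields are joined with "|" before hashing.
    payload = "|".join([account_label, raw_date, raw_amount, raw_description])
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()[:24]
```

Because the ID is built from raw strings, changing how dates or amounts are parsed later does not change the ID of an already imported transaction. Note that a value such as "1,234" is ambiguous, and this sketch reads it as 1.234.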
Module: db/repository.py → get_existing_tx_ids()
Transaction IDs are calculated at step 3 from raw values, so dedup happens before any LLM call: no tokens are wasted on already-imported txs.
existing_ids = query DB WHERE id IN (all_ids_in_batch)
→ filters already-present txs
→ if all present → abort early (file already imported, zero LLM calls)
→ continues only with new txs
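The dedup check can be sketched with the standard-library `sqlite3` module standing in for the real repository layer. The table name `transactions`, the helper `split_new_transactions` and the use of raw SQL are assumptions of this example, not a description of db/repository.py.

```python
import sqlite3


def get_existing_tx_ids(conn: sqlite3.Connection, tx_ids) -> set:
    """Return the subset of tx_ids already stored in the database."""
    tx_ids = list(tx_ids)
    if not tx_ids:
        return set()
    # One "?" placeholder per ID keeps the query parameterised.
    placeholders = ",".join("?" for _ in tx_ids)
    query = f"SELECT id FROM transactions WHERE id IN ({placeholders})"
    return {row[0] for row in conn.execute(query, tx_ids)}


def split_new_transactions(conn: sqlite3.Connection, transactions) -> list:
    """Keep only transactions whose ID is not in the database yet."""
    existing = get_existing_tx_ids(conn, [tx["id"] for tx in transactions])
    return [tx for tx in transactions if tx["id"] not in existing]
```

An empty result from `split_new_transactions` corresponds to the early abort: the file was already imported and no LLM call is needed. Very large batches may need to be queried in chunks, because SQLite limits the number of bound parameters per statement.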
Module: core/description_cleaner.py → clean_descriptions_batch()
Extracts the counterparty name from the bank's raw string.
Split by sign:
  expenses (amount < 0) → PASS 1: extract RECIPIENT
  income (amount ≥ 0) → PASS 2: extract SENDER
Privacy (mandatory before every LLM call):
  redact_pii(description, sanitize_config)
  ├─ Owner names → plausible fictitious names (pool by language)
  │    IT: Carlo Brambilla, Marta Pellegrino, …
  │    EN: James Fletcher, Helen Norris, …
  │    DE: Klaus Hartmann, Monika Braun, …
  │    FR: Pierre Dumont, Claire Lebrun, …
  ├─ IBAN → <ACCOUNT_ID>
  ├─ PAN / card (13-19 digits) → <CARD_ID>
  ├─ Masked card (****0178) → <CARD_ID>
  ├─ Transaction codes (CAU, NDS, CRO, RIF, TRN…) → <TX_CODE>
  └─ Tax code → <FISCAL_ID>
  LLM processes redacted description
  restore_owner_placeholders(llm_result)
  └─ maps fictitious names → real owner names back
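A reduced version of the redaction round trip is sketched below. It covers only IBANs, card numbers and owner names; the regular expressions are simplified approximations written for this example, and the `owner_aliases` mapping (real name to fictitious name) is a hypothetical stand-in for `sanitize_config`.

```python
import re

# Simplified patterns, written for this sketch only.
IBAN_RE = re.compile(r"\b[A-Z]{2}\d{2}[A-Z0-9]{11,30}\b")
MASKED_CARD_RE = re.compile(r"\*{2,}\d{4}\b")
PAN_RE = re.compile(r"\b\d{13,19}\b")


def redact_pii(description: str, owner_aliases: dict) -> str:
    """Replace account identifiers and owner names before an LLM call."""
    redacted = IBAN_RE.sub("<ACCOUNT_ID>", description)
    redacted = MASKED_CARD_RE.sub("<CARD_ID>", redacted)
    redacted = PAN_RE.sub("<CARD_ID>", redacted)
    for real, alias in owner_aliases.items():
        pattern = re.compile(re.escape(real), re.IGNORECASE)
        redacted = pattern.sub(lambda _m, a=alias: a, redacted)
    return redacted


def restore_owner_placeholders(text: str, owner_aliases: dict) -> str:
    """Map fictitious names in the LLM output back to the real owners."""
    for real, alias in owner_aliases.items():
        pattern = re.compile(re.escape(alias), re.IGNORECASE)
        text = pattern.sub(lambda _m, r=real: r, text)
    return text
```

Replacing owner names with plausible names, rather than with an opaque token, keeps the redacted description looking like a normal bank string, and the mapping can be reversed once the LLM has answered.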
What the LLM must remove:
- Payment type labels: POS, Bonifico, Virement, Lastschrift, SCT, wire transfer
- Beneficiary markers: Fv., F.V., Beg., Begünstigter, Pour, For the benefit of
- VOSTRA DISPOSIZIONE, Disposizione
- Amounts and currencies: "352,00 EUR", "9.798,76 EUR"
- Dates: "23.12.2025", "2025-12-29", "29/10.41"
- Card numbers, auth codes (CAU/NDS), references (RIF:/CRO:/INV/)
- ORD. tokens, country codes (ITA)(FRA)
- City names after the company name
- Duplicate phrases: "Expense reimbursement expense reimbursement" → "Expense reimbursement"
Bank-originated expenses (no external counterparty):
→ label in the configured language:
  IT: "Interessi bancari", "Commissioni bancarie"
  EN: "Bank fees", "Bank interest"
  FR: "Frais bancaires", "Intérêts bancaires"
  DE: "Bankgebühren", "Bankzinsen"
Fallback: if LLM fails → keep original raw_description
Output: transaction["description"] updated; raw_description never modified
Module: core/normalizer.py → detect_internal_transfers()
PHASE 1: Matching between different accounts
For every pair (i, j) with i.account_label ≠ j.account_label:
  amount_match = |amount_i + amount_j| ≤ epsilon (0.01 €)
  date_match = |date_i − date_j| ≤ delta_days (5 days)
  If both:
    high_symmetry = |amount_i + amount_j| ≤ epsilon_strict (0.005 €)
                    AND |date_i − date_j| ≤ delta_days_strict (1 day)
    Confidence:
      HIGH → keyword "bonifico/giroconto/transfer/…" in description
      MEDIUM → high_symmetry without keyword
    If require_keyword_confirmation=True AND confidence=MEDIUM:
      → marks transfer_pair_id but does NOT update tx_type (to_review)
    Otherwise:
      → updates tx_type: internal_out (outgoing) / internal_in (incoming)
PHASE 2: Match by owner name (txs not yet paired)
For every tx without a pair:
  If the description contains an owner name
  (regex with all permutations of the name tokens):
    → tx_type = internal_out / internal_in
    → transfer_confidence = HIGH
    (the owner is the counterparty: no pairing needed)
Key parameters:
| Parameter | Default | Meaning |
|---|---|---|
| `tolerance` | 0.01 € | amount epsilon |
| `tolerance_strict` | 0.005 € | strict epsilon |
| `settlement_days` | 5 days | date window |
| `settlement_days_strict` | 1 day | strict window |
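The Phase 1 test for a single pair can be written as a small function over two transaction dicts. This is a sketch under assumptions: the keyword tuple contains only the three examples named above, `classify_pair` is a hypothetical name, and a pair that matches on amount and date but has neither keyword nor high symmetry is left unmatched, because the text does not say how it is treated.

```python
from datetime import date
from decimal import Decimal

KEYWORDS = ("bonifico", "giroconto", "transfer")  # examples from the text


def classify_pair(
    a: dict,
    b: dict,
    tolerance=Decimal("0.01"),
    tolerance_strict=Decimal("0.005"),
    settlement_days=5,
    settlement_days_strict=1,
):
    """Return "high", "medium" or None for a candidate transfer pair."""
    if a["account_label"] == b["account_label"]:
        return None  # only pairs across different accounts are considered
    amount_gap = abs(a["amount"] + b["amount"])  # opposite signs cancel out
    day_gap = abs((a["date"] - b["date"]).days)
    if amount_gap > tolerance or day_gap > settlement_days:
        return None
    text = f'{a["description"]} {b["description"]}'.casefold()
    if any(keyword in text for keyword in KEYWORDS):
        return "high"
    if amount_gap <= tolerance_strict and day_gap <= settlement_days_strict:
        return "medium"
    # Assumption: without keyword or high symmetry the pair stays unmatched.
    return None
```

With `require_keyword_confirmation=True`, a caller would record the pair for a "medium" result but leave `tx_type` untouched so that the transactions surface in review.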
Module: core/normalizer.py → find_card_settlement_matches()
Matches card_settlement debits (from the current account) to individual card_tx entries (from the card).
For every debit:
  PHASE 1: Time window
  └─ card_tx in [debit_date − 45 days, debit_date + 7 days]
  PHASE 2: Sliding window (contiguous subsets)
  For every contiguous subset [i..j]:
  ├─ verify gap between consecutive txs ≤ max_gap_days (5 days)
  ├─ sum = Σ |amount[i..j]|
  └─ If |sum − debit_amount| ≤ epsilon → MATCH ✓
  PHASE 3: Boundary subset sum (fallback)
  ├─ takes the k=10 txs before + k=10 txs after the debit date
  ├─ exhaustive search over all subsets (n ≤ 20 → 2^20 ≈ 1M, safe)
  └─ If any subset sums to the amount → MATCH ✓
If MATCH found:
  → ReconciliationLink {settlement_id, matched_ids, delta, method}
  → matched txs: reconciled=True
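The Phase 2 sliding window can be sketched as a double loop over date-sorted card transactions. The function name, the `(date, amount)` tuple layout and the default `epsilon` of 0.01 are assumptions for this example; the time-window filter of Phase 1 and the subset-sum fallback of Phase 3 are not shown.

```python
from datetime import date
from decimal import Decimal


def find_contiguous_match(
    card_txs,
    debit_amount,
    epsilon=Decimal("0.01"),
    max_gap_days=5,
):
    """card_txs: list of (date, amount) sorted by date.

    Returns the inclusive index range (i, j) of the first contiguous run
    whose absolute amounts sum to the debit, or None.
    """
    target = abs(debit_amount)
    for i in range(len(card_txs)):
        total = Decimal("0")
        for j in range(i, len(card_txs)):
            # A run is broken by a gap larger than max_gap_days.
            if j > i and (card_txs[j][0] - card_txs[j - 1][0]).days > max_gap_days:
                break
            total += abs(card_txs[j][1])
            if abs(total - target) <= epsilon:
                return i, j
    return None
```

The search is quadratic in the number of card transactions inside the time window, which stays small for a single statement period.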
Module: core/categorizer.py → categorize_batch()
Processes only expense, income, card_tx, unknown. Skips internal transfers and card_settlement.
For each transaction, a cascade of levels 0 to 4:
LEVEL 0: User rules (CategoryRule, sorted by priority)
──────────────────────────────────────────────────────
For each rule (in descending priority order):
  CategoryRule.matches(description, doc_type):
  ├─ exact: description.casefold() == pattern.casefold()
  ├─ contains: pattern.casefold() IN description.casefold()
  └─ regex: re.search(pattern, description.casefold())
  If doc_type specified in the rule → must match
FIRST matching rule wins →
  category, subcategory, confidence=HIGH, source=rule, to_review=False
LEVEL 1: Static keyword rules (direction-aware)
──────────────────────────────────────────────────────
Hardcoded patterns, separated by expense/income:
EXPENSES:
  conad|coop|esselunga|lidl|carrefour|… → Food / Grocery shopping
  farmacia|pharma|… → Health / Medicines
  eni|shell|q8|tamoil|… → Transport / Fuel
  telepass|autostrad|… → Transport / Parking and ZTL
  trenitalia|italo|frecciarossa|… → Transport / Public transport
  enel|iren|a2a|hera|… → Home / Electricity
  netflix|spotify|amazon prime|… → Leisure / Streaming
INCOME:
  stipendio|salary|busta paga|… → Employment / Salary
  pensione|inps rendita|… → Social benefits / Pension
→ confidence=HIGH, source=rule, to_review=False
LEVEL 2: ML model (stub)
──────────────────────────────────────────────────────
→ returns None (reserved for future development)
LEVEL 3: LLM (two directional batches)
──────────────────────────────────────────────────────
Separate batches for expenses and income.
Privacy:
  redact_pii(description) before sending to LLM
Payload for each tx:
  {"amount": "−352.00", "description": "Notorious Cinemas"}
Expected response:
{
  "results": [
    {
      "category": "Leisure and free time",
      "subcategory": "Cinema and theatre",
      "confidence": "high",
      "rationale": "Cinema"
    },
    …
  ]
}
LLM response validation:
├─ valid category + subcategory in taxonomy?
├─ correct direction (expense for expenses, income for income)?
├─ If subcategory not found → look for parent category
├─ If category not found → first valid sub for that category
└─ If correction needed → confidence=low, to_review=True
Confidence levels:
  HIGH → to_review=False
  MEDIUM → to_review=False (above threshold 0.80)
  LOW → to_review=True
LEVEL 4: Fallback (everything fails)
──────────────────────────────────────────────────────
  expenses: category=Other, sub=Unclassified expenses
  income: category=Other income, sub=Unclassified income
  confidence=LOW, source=llm, to_review=True
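Level 0 of the cascade, the user rules, can be sketched with a dataclass and a priority-ordered loop. The field names follow the parameters of `create_category_rule()` where the text gives them; the dataclass layout, the `apply_user_rules` helper and the shape of the returned dict are assumptions for this illustration.

```python
import re
from dataclasses import dataclass
from typing import Optional


@dataclass
class CategoryRule:
    pattern: str
    match_type: str  # "exact", "contains" or "regex"
    category: str
    subcategory: str
    priority: int = 0
    doc_type: Optional[str] = None  # None means "any document type"

    def matches(self, description: str, doc_type: str) -> bool:
        if self.doc_type is not None and self.doc_type != doc_type:
            return False
        text = description.casefold()
        if self.match_type == "exact":
            return text == self.pattern.casefold()
        if self.match_type == "contains":
            return self.pattern.casefold() in text
        if self.match_type == "regex":
            return re.search(self.pattern, text) is not None
        return False


def apply_user_rules(rules, description: str, doc_type: str):
    """Return the result of the first matching rule, highest priority first."""
    for rule in sorted(rules, key=lambda r: r.priority, reverse=True):
        if rule.matches(description, doc_type):
            return {
                "category": rule.category,
                "subcategory": rule.subcategory,
                "confidence": "high",
                "source": "rule",
                "to_review": False,
            }
    return None  # no rule matched: the next level of the cascade takes over
```

Because the regex variant is applied to the casefolded description, patterns need to be written in lowercase to match.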
Module: db/repository.py → persist_import_result()
Everything runs in a single atomic transaction, and every operation is idempotent:
create_import_batch(sha256, filename, flow_used, n_transactions)
└─ if sha256 already exists → return existing (file already imported)
upsert_document_schema(schema)
└─ if source_identifier exists → update; otherwise create
For each transaction:
  upsert_transaction(tx)
  ├─ if tx.id exists → skip (final dedup)
  └─ otherwise: INSERT with all fields
For each reconciliation:
  create_reconciliation_link(settlement_id, detail_id, delta, method)
  update tx: reconciled=True
For each internal transfer:
  create_transfer_link(out_id, in_id, confidence, keyword_matched)
session.commit()
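The "skip if the ID exists" behaviour inside one atomic transaction can be illustrated with the standard-library `sqlite3` module and `INSERT OR IGNORE`. This is a stand-in for the real repository, which commits through a session object; the table layout and the function name are assumptions of this sketch.

```python
import sqlite3


def persist_transactions(conn: sqlite3.Connection, transactions) -> int:
    """Insert new transactions atomically; return how many rows were added."""

    def count() -> int:
        return conn.execute("SELECT COUNT(*) FROM transactions").fetchone()[0]

    # "with conn" commits on success and rolls back if an exception is raised.
    with conn:
        before = count()
        conn.executemany(
            # Rows whose primary key already exists are silently skipped.
            "INSERT OR IGNORE INTO transactions (id, description, amount) "
            "VALUES (?, ?, ?)",
            [(tx["id"], tx["description"], tx["amount"]) for tx in transactions],
        )
        after = count()
    return after - before
```

Running the same batch twice leaves the table unchanged the second time, which is the property that makes a re-import of the same file harmless.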
Page: ui/review_page.py, ui/rules_page.py
Auto-apply rules (on every Review page load):
  apply_rules_to_review_transactions(session, user_rules)
  └─ for each tx with to_review=True:
     └─ first matching rule →
        category, source=rule, to_review=False
"▶️ Run all rules" button (Rules page):
  apply_all_rules_to_all_transactions(session, user_rules)
  ├─ applies all rules to ALL transactions (not only to_review=True)
  ├─ rules in descending priority order, first match wins
  ├─ returns (n_matched, n_cleared_review)
  └─ requires confirmation via checkbox before execution
"Reprocess with LLM" button (Review page):
  _rerun_llm_on_review(engine)
  ├─ loads all txs with to_review=True
  │    (excluding internal transfers and card_settlement)
  ├─ re-runs clean_descriptions_batch()
  └─ re-runs categorize_batch()
       (skips txs with category_source=manual or rule)
Manual correction:
  update_transaction_category(tx_id, category, sub)
  └─ category_source=manual, to_review=False
Rule creation:
  create_category_rule(pattern, match_type, category, sub, priority)
  └─ immediately propagates to all similar txs
Bulk description edit:
  _apply_description_rule_bulk(engine, pattern, match_type, new_desc)
  ├─ updates description for all txs with matching raw_description
  └─ re-categorises with LLM
| Source (`category_source`) | Meaning | `to_review` |
|---|---|---|
| `rule` | User rule or static keyword | False |
| `llm` confidence HIGH/MEDIUM | LLM above threshold | False |
| `llm` confidence LOW | LLM below threshold | True |
| `manual` | Manual user correction | False |
| `llm` fallback (Other) | Everything failed | True |
| Parameter | Default | Where to set |
|---|---|---|
| `llm_backend` | `local_ollama` | Settings |
| `description_language` | `it` | Settings |
| `confidence_threshold` | 0.80 | ProcessingConfig |
| `tolerance` (transfer amount) | 0.01 € | ProcessingConfig |
| `settlement_days` | 5 days | ProcessingConfig |
| `window_days` (card reconciliation) | 45 days | ProcessingConfig |
| `require_keyword_confirmation` | `True` | ProcessingConfig |
| `owner_names` | (none) | Settings |
| `batch_size` (LLM) | 20 tx/call | categorize_batch() |