## Librerías

In [5]:
import pandas as pd
import os
import csv
import re
import unicodedata
from pathlib import Path
import numpy as np



## Directorios

In [6]:
# =======================================================
# Project structure and paths (portable)
# =======================================================

# When the notebook runs from a folder named "notebooks" (case-insensitive),
# the project root is that folder's parent; otherwise it is the working directory.
cwd = Path.cwd()
ROOT = cwd.parent if cwd.name.lower() == "notebooks" else cwd

# Folders used by the pipeline (all created below if missing)
RAW_DIR       = ROOT / "data" / "raw"
STAGE_CSV_DIR = ROOT / "data" / "stage_csv"
PROCESSED_DIR = ROOT / "data" / "processed"
RULES_DIR     = ROOT / "rules"

# Files read/written by later cells
combined_csv_path = PROCESSED_DIR / "combined.csv"
dictionary_path   = RULES_DIR / "categories_dictionary.csv"
# Same file name the final export cell writes ("categorized.csv"), so the
# path declared here and the path actually used no longer disagree.
final_output_path = PROCESSED_DIR / "categorized.csv"

# mkdir is idempotent here: existing folders are left untouched
for folder in [RAW_DIR, STAGE_CSV_DIR, PROCESSED_DIR, RULES_DIR]:
    folder.mkdir(parents=True, exist_ok=True)

print("✅ Project structure validated/created")




✅ Project structure validated/created


## Manipulación de archivos y conexión a datos

### Funciones 

#### Función para extraer datos de los extractos originales en Excel


In [7]:
def process_excel(file_path, max_header_rows=5):
    """
    Process a bank Excel file and return a formatted DataFrame.

    The header row is searched for within the first ``max_header_rows`` rows,
    column names are normalized (lowercase, no accents, no dots, single
    spaces) and known aliases are renamed to the standard column names.

    Parameters
    ----------
    file_path : str or Path
        Excel file to read (passed to ``pd.read_excel``).
    max_header_rows : int, default 5
        Number of leading rows inspected when looking for the header row.

    Returns
    -------
    pandas.DataFrame
        Standard columns first (fixed order), then any other column, with a
        fresh 0..n-1 index. Completely empty rows are dropped.

    Raises
    ------
    ValueError
        If none of the standard columns (or their aliases) is present.
    """

    # Helper: normalize column names (lowercase, no accents, no dots, single spaces)
    def normalize_columns(cols):
        def strip_accents(s):
            return "".join(ch for ch in unicodedata.normalize("NFKD", s) if not unicodedata.combining(ch))
        out = []
        for c in cols:
            c = strip_accents(str(c)).lower().replace(".", "")
            # split/join collapses any whitespace run and trims both ends
            out.append(" ".join(c.split()))
        return out

    # 1) Detect the header row within the first few rows
    preview = pd.read_excel(file_path, header=None, nrows=max_header_rows)
    keywords = {"fecha", "concepto", "importe", "divisa", "movimiento"}
    header_row = 0  # fallback when no row contains a known keyword
    for i in range(len(preview)):
        cols = normalize_columns(preview.iloc[i].tolist())
        if any(k in cols for k in keywords):
            header_row = i
            break

    # 2) Read again with the detected header row
    df = pd.read_excel(file_path, header=header_row)
    df = df.dropna(how="all")  # drop completely empty rows

    # 3) Normalize column names
    df.columns = normalize_columns(df.columns)

    # 4) Map possible aliases to standard column names.
    #    Aliases are written in normalized form (what normalize_columns produces).
    column_aliases = {
        "f_valor": ["f valor", "fecha valor"],
        "fecha": ["fecha", "f operacion"],
        "concepto": ["concepto", "detalle", "descripcion"],
        "movimiento": ["movimiento", "tipo", "operacion"],
        "importe": ["importe", "monto", "cantidad"],
        "divisa": ["divisa", "moneda"],
        "disponible": ["disponible", "saldo"],
        "observaciones": ["observaciones", "notas", "comentarios"],
    }

    # DataFrame.rename expects {current_name: new_name}, i.e. {alias: standard}.
    rename_map = {}
    recognized = False
    for std, aliases in column_aliases.items():
        if std in df.columns:
            # Already present under its standard name; renaming an alias onto
            # it would create a duplicate column, so leave things as they are.
            recognized = True
            continue
        for a in aliases:
            if a in df.columns:
                rename_map[a] = std
                recognized = True
                break

    if not recognized:
        raise ValueError(f"No standard columns recognized in {file_path}")

    df = df.rename(columns=rename_map)

    # 5) Reorder: standard columns first (when present), then everything else
    ordered_cols = ["f_valor", "fecha", "concepto", "movimiento", "importe", "divisa", "disponible", "observaciones"]
    final_cols = [c for c in ordered_cols if c in df.columns] + [c for c in df.columns if c not in ordered_cols]

    return df[final_cols].reset_index(drop=True)


#### Función para convertir los DataFrames resultantes a CSV

In [8]:
def save_as_csv(df, csv_path):
    """
    Write ``df`` to ``csv_path`` as CSV, without the index.

    The destination's parent folder is created when missing, and the file is
    encoded as utf-8-sig (UTF-8 with BOM) so Excel shows accented characters.
    """
    destination = Path(csv_path)
    parent_folder = destination.parent
    parent_folder.mkdir(parents=True, exist_ok=True)
    df.to_csv(destination, encoding="utf-8-sig", index=False)


#### Función para extraer los documentos de Excel del directorio original y guardarlos como CSV

In [9]:
def convert_excels(input_dir, output_dir, skip_existing=True):
    """
    Turn every ``*.xlsx`` file found in ``input_dir`` into a CSV in ``output_dir``.

    - Excel lock/temporary files (names starting with ``~$``) are ignored.
    - ``output_dir`` is created when missing.
    - With ``skip_existing=True`` a workbook whose CSV already exists is not
      reprocessed, but its CSV path is still included in the result.
    - A workbook that fails to convert is reported and left out of the result.

    Returns the list of CSV paths (in sorted workbook order).
    """
    source_dir = Path(input_dir)
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    workbooks = [p for p in source_dir.glob("*.xlsx") if not p.name.startswith("~$")]
    workbooks.sort()
    if len(workbooks) == 0:
        print(f"No .xlsx files found in: {source_dir}")
        return []

    csv_paths = []
    for workbook in workbooks:
        destination = target_dir / f"{workbook.stem}.csv"

        if skip_existing and destination.exists():
            print(f"↷ Skip (already exists): {destination.name}")
        else:
            try:
                frame = process_excel(workbook)
                save_as_csv(frame, destination)
                print(f"✔ {workbook.name} → {destination.name} ({len(frame)} rows)")
            except Exception as err:
                # Best effort: report the failure and carry on with the next workbook
                print(f"✖ Error processing {workbook.name}: {err}")
                continue

        csv_paths.append(destination)

    return csv_paths


#### Función para leer todos los CSV del directorio

In [10]:
def read_csv_files(directory):
    """
    Load every ``*.csv`` file in ``directory`` (sorted by path) into DataFrames.

    Files that cannot be read are reported and skipped. A status line is
    printed per file. Returns a list of DataFrames, empty when the folder
    has no CSV files.
    """
    folder = Path(directory)
    paths = sorted(folder.glob("*.csv"))

    if len(paths) == 0:
        print(f"No CSV files found in: {folder}")
        return []

    loaded = []
    for path in paths:
        try:
            frame = pd.read_csv(path)
        except Exception as err:
            print(f"✖ Error reading {path.name}: {err}")
            continue
        loaded.append(frame)
        print(f"✔ Loaded {path.name} ({len(frame)} rows)")

    return loaded


#### Función para combinar los CSV en un solo DataFrame

In [11]:
def combine_dataframes(dataframes, drop_duplicates=True):
    """
    Stack a list of DataFrames into one (rows appended, index renumbered).

    An empty input yields an empty DataFrame. When ``drop_duplicates`` is
    true, fully identical rows are removed and the number removed is reported.
    """
    if not dataframes:
        print("⚠ No DataFrames to combine. Returning empty DataFrame.")
        return pd.DataFrame()

    stacked = pd.concat(dataframes, ignore_index=True)
    if not drop_duplicates:
        return stacked

    unique_rows = stacked.drop_duplicates()
    n_removed = len(stacked) - len(unique_rows)
    if n_removed != 0:
        print(f"⚠ Removed {n_removed} duplicate rows")
    return unique_rows


#### Función maestra para combinar los CSV intermedios en un CSV final de trabajo

In [12]:
def process_combined_files(input_dir=STAGE_CSV_DIR, output_path=combined_csv_path):
    """
    Build the combined CSV from the intermediate CSVs in ``input_dir``.

    Steps: load every CSV, concatenate them dropping identical rows, drop rows
    that repeat on whichever of fecha/concepto/importe/divisa are present,
    and write the result to ``output_path``.

    Returns the combined DataFrame, or None when nothing was loaded.
    """
    # 1) Load the staged CSVs
    frames = read_csv_files(input_dir)
    if not frames:
        print(f"No CSV files found in: {input_dir}")
        return None

    # 2) Concatenate, removing exact duplicate rows
    merged = combine_dataframes(frames, drop_duplicates=True)

    # 3) De-duplicate on the typical bank key columns that exist in the data
    candidate_keys = ("fecha", "concepto", "importe", "divisa")
    present_keys = [key for key in candidate_keys if key in merged.columns]
    if present_keys:
        deduped = merged.drop_duplicates(subset=present_keys)
        n_removed = len(merged) - len(deduped)
        if n_removed > 0:
            print(f"⚠ Removed {n_removed} duplicates using keys: {present_keys}")
        merged = deduped

    # 4) Persist (UTF-8 with BOM so Excel reads accents correctly)
    save_as_csv(merged, output_path)
    print(f"✅ Combined file saved to: {output_path} ({len(merged)} rows)")

    return merged


### Proceso de extracción de archivos

#### Conversión de .xlsx a CSV

In [13]:
# === Run pipeline: Excel -> CSV -> Combined ===

# Stage 1: Excel workbooks -> one intermediate CSV each.
# The returned list also contains CSVs that already existed and were skipped.
converted_files = convert_excels(RAW_DIR, STAGE_CSV_DIR, skip_existing=True)
print(
    f"✅ Converted {len(converted_files)} Excel files to CSV"
    if converted_files
    else "⚠ No Excel files were converted"
)

# Stage 2: intermediate CSVs -> single combined CSV
df_combined = process_combined_files(STAGE_CSV_DIR, combined_csv_path)
if df_combined is None:
    print("⚠ No combined CSV created")
else:
    print(f"✅ Combined CSV created with {len(df_combined)} rows")


✔ Abril-Junio 2024.xlsx → Abril-Junio 2024.csv (334 rows)
✔ Agosto-Diciembre 2021.xlsx → Agosto-Diciembre 2021.csv (406 rows)
✔ Diciembre-Agosto 2025.xlsx → Diciembre-Agosto 2025.csv (506 rows)
✔ Diciembre-Febrero 2025.xlsx → Diciembre-Febrero 2025.csv (220 rows)
✔ Enero - Abril 2023.xlsx → Enero - Abril 2023.csv (334 rows)
✔ Enero-Abril 2022.xlsx → Enero-Abril 2022.csv (275 rows)
✔ Enero-Marzo 2024.xlsx → Enero-Marzo 2024.csv (294 rows)
✔ Enero-Mayo 2020.xlsx → Enero-Mayo 2020.csv (236 rows)
✔ Julio-Agosto 2024.xlsx → Julio-Agosto 2024.csv (353 rows)
✔ Junio-Diciembre 2019.xlsx → Junio-Diciembre 2019.csv (349 rows)
✔ Junio-Octubre 2020.xlsx → Junio-Octubre 2020.csv (327 rows)
✔ Marzo-Julio 2021.xlsx → Marzo-Julio 2021.csv (425 rows)
✔ Mayo-Agosto 2022.xlsx → Mayo-Agosto 2022.csv (332 rows)
✔ Mayo-Agosto 2023.xlsx → Mayo-Agosto 2023.csv (422 rows)
✔ Noviembre-Febrero 2021.xlsx → Noviembre-Febrero 2021.csv (253 rows)
✔ Septiembre-Diciembre 2022.xlsx → Septiembre-Diciembre 2022.csv (344 

#### Ejecutar

In [14]:
# Reload the combined file from disk and take a quick look at its structure
df = pd.read_csv(combined_csv_path)

print("Shape:", df.shape)
print("Columns:", list(df.columns))
print(df.head())  # head() shows the first 5 rows by default


Shape: (5901, 10)
Columns: ['fecha', 'concepto', 'movimiento', 'importe', 'divisa', 'disponible', 'observaciones', 'unnamed: 0', 'fvalor', 'divisa1']
        fecha                     concepto  \
0  28/06/2024      Corp alim guissona t561   
1  28/06/2024    Mercadona rambla del pobl   
2  28/06/2024          Adeudo de carrefour   
3  28/06/2024  Traspaso programa tu cuenta   
4  27/06/2024                      Caprabo   

                                       movimiento  importe divisa  disponible  \
0                                Pago con tarjeta   -15.40    EUR      854.25   
1                                Pago con tarjeta   -22.47    EUR      869.65   
2                      Adeudo nº 2024173000746549    -9.99    EUR      892.12   
3  Trp redondeo tarjeta          4940197125842350    -0.13    EUR      902.11   
4                                Pago con tarjeta    -2.99    EUR      902.24   

                                       observaciones  unnamed: 0      fvalor  \
0  494

## Limpieza Inicial de los datos

### Limpieza inicial y exploración

In [15]:
# Align columns to the structure of the combined CSV

# 1) Drop junk columns safely: 'divisa1' and any column whose name starts with 'unnamed'
junk_cols = [
    c for c in df.columns
    if c.lower() == "divisa1" or c.lower().startswith("unnamed")
]

if junk_cols:
    df = df.drop(columns=junk_cols)
    print(f"🧹 Dropped junk columns: {junk_cols}")

# 2) Keep 'fvalor' as the true date -> rename to 'fecha' and drop the old 'fecha'
has_fecha  = "fecha"  in df.columns
has_fvalor = "fvalor" in df.columns

if has_fvalor:
    if has_fecha:
        # Rows with no 'fvalor' (e.g. from a file that lacks that column) keep
        # their 'fecha' value instead of ending up without any date and being
        # discarded later as invalid.
        df = df.assign(fvalor=df["fvalor"].fillna(df["fecha"]))
        df = df.drop(columns=["fecha"])
    df = df.rename(columns={"fvalor": "fecha"})
    print("ℹ Using 'fvalor' as the correct date and renaming it to 'fecha'.")
elif not has_fecha:
    print("⚠ Neither 'fvalor' nor 'fecha' found. Please check your inputs.")

# 3) Parse 'fecha' to datetime (day-first, ES format); unparseable values become NaT
if "fecha" in df.columns:
    df["fecha"] = pd.to_datetime(df["fecha"], errors="coerce", dayfirst=True)
else:
    print("⚠ No 'fecha' column present after adjustments. Check your inputs.")

# 4) Quick check
print("Shape:", df.shape)
print("Columns:", df.columns.tolist())
print(df.dtypes)
print(df.head(3))


🧹 Dropped junk columns: ['unnamed: 0', 'divisa1']
ℹ Using 'fvalor' as the correct date and renaming it to 'fecha'.
Shape: (5901, 7)
Columns: ['concepto', 'movimiento', 'importe', 'divisa', 'disponible', 'observaciones', 'fecha']
concepto                 object
movimiento               object
importe                 float64
divisa                   object
disponible              float64
observaciones            object
fecha            datetime64[ns]
dtype: object
                    concepto                  movimiento  importe divisa  \
0    Corp alim guissona t561            Pago con tarjeta   -15.40    EUR   
1  Mercadona rambla del pobl            Pago con tarjeta   -22.47    EUR   
2        Adeudo de carrefour  Adeudo nº 2024173000746549    -9.99    EUR   

   disponible                                      observaciones      fecha  
0      854.25  4940197125842350 CORP ALIM GUISSONA T561  BARC... 2024-06-28  
1      869.65  4940197125842350 MERCADONA RAMBLA DEL POBLBARC... 2024-06

In [16]:
def remove_substrings_from_column(df, column, substrings):
    """
    Remove any of the given substrings from a text column.

    - Builds ONE regex for all substrings (faster than looping); substrings
      are matched literally.
    - Missing values become empty strings.
    - Collapses runs of whitespace left behind and trims both ends.

    The column is replaced on ``df`` itself, and ``df`` is returned. If the
    column does not exist or ``substrings`` is empty, ``df`` is returned
    untouched.
    """
    if column not in df.columns or not substrings:
        return df

    pattern = "|".join(re.escape(str(s)) for s in substrings)

    original = df[column]
    # Casting to str turns a missing value into the literal text "nan"/"None",
    # so missing positions are blanked explicitly using the pre-cast null mask.
    text = original.astype(str).mask(original.isna(), "")

    text = (
        text.str.replace(pattern, "", regex=True)
            .str.replace(r"\s{2,}", " ", regex=True)
            .str.strip()
    )
    df[column] = text
    return df


def drop_rows_if_column_contains(df, column, substrings, case_insensitive=True, word_boundaries=False):
    """
    Drop rows where 'column' contains ANY of the substrings.

    - Substrings are matched literally (regex-escaped); empty ones are ignored
      because an empty pattern would match, and drop, every row.
    - ``case_insensitive`` toggles case-insensitive matching.
    - ``word_boundaries`` wraps each substring in ``\\b...\\b``.
    - Rows whose value is missing are never dropped.
    - The pattern has no capturing groups, which avoids the pandas UserWarning.

    Returns a new DataFrame with a fresh 0..n-1 index, or ``df`` itself when
    the column is absent or there is nothing to match.
    """
    if column not in df.columns or not substrings:
        return df

    escaped = [re.escape(str(s)) for s in substrings if str(s) != ""]
    if not escaped:
        return df

    parts = [rf"\b{e}\b" for e in escaped] if word_boundaries else escaped
    pattern = "|".join(parts)              # no () -> no capturing groups
    flags = re.IGNORECASE if case_insensitive else 0

    values = df[column]
    # A missing value cast to str reads "nan" and could match a substring such
    # as "an"; blank those positions so they can never match.
    text = values.astype(str).mask(values.isna(), "")

    matches = text.str.contains(pattern, flags=flags, regex=True, na=False)
    return df.loc[~matches].reset_index(drop=True)


def clean_specific_fields(df,
                          observations_numbers=None,
                          movement_numbers=None,
                          movements_to_remove=None,
                          concepts_to_remove=None):
    """
    Run the dataset-specific cleaning steps, in this order:

    1) strip ``observations_numbers`` out of 'observaciones'
    2) strip ``movement_numbers`` out of 'movimiento'
    3) drop rows whose 'movimiento' contains any of ``movements_to_remove``
    4) drop rows whose 'concepto' contains any of ``concepts_to_remove``

    A step is skipped when its column is not in the frame; ``None`` arguments
    are treated as empty lists. Returns the cleaned DataFrame.
    """
    # (column, tokens) pairs, applied top to bottom
    strip_plan = (
        ("observaciones", observations_numbers or []),
        ("movimiento", movement_numbers or []),
    )
    drop_plan = (
        ("movimiento", movements_to_remove or []),
        ("concepto", concepts_to_remove or []),
    )

    for column, tokens in strip_plan:
        if column in df.columns:
            df = remove_substrings_from_column(df, column, tokens)

    for column, tokens in drop_plan:
        if column in df.columns:
            df = drop_rows_if_column_contains(df, column, tokens, case_insensitive=True)

    return df


In [17]:
# Literal tokens to strip from free-text columns
observations_numbers = [
    '4940197125842350',
    '2024173000746549',
    '4543390531579234',
    '5181760018112648',
]
movement_numbers = ['2024173000746549']

# Rows are dropped when the column contains any of these (case-insensitive)
movements_to_remove = ['Trp redondeo tarjeta']
concepts_to_remove = ['mes cuentas claras', 'Comision mensual cuentas claras']

# Positional order matches the signature of clean_specific_fields
df = clean_specific_fields(
    df,
    observations_numbers,
    movement_numbers,
    movements_to_remove,
    concepts_to_remove,
)

print("✅ Specific cleaning applied. Shape:", df.shape)


✅ Specific cleaning applied. Shape: (4907, 7)


In [18]:
# === Finalize cleaning and save cleaned.csv ===
# Ensure numeric types (values that cannot be parsed become NaN)
for col in ["importe", "disponible"]:
    if col in df.columns:
        df[col] = pd.to_numeric(df[col], errors="coerce")

# Drop rows with invalid date or amount.
# .copy() yields an independent frame, so the column assignments below do not
# write into a filtered view of the previous one.
before = len(df)
df = df[df["fecha"].notna() & df["importe"].notna()].copy()
after = len(df)
if after != before:
    print(f"⚠ Removed {before - after} rows with invalid date/amount")

# Normalize text columns (trim spaces). Missing values become "" rather than
# the literal text "nan" that a plain str cast would produce.
for col in ["concepto", "movimiento", "observaciones", "divisa"]:
    if col in df.columns:
        df[col] = df[col].astype(str).mask(df[col].isna(), "").str.strip()

# Derive date parts (optional, handy for Power BI)
df["year"]  = df["fecha"].dt.year
df["month"] = df["fecha"].dt.month
df["ym"]    = df["fecha"].dt.to_period("M").astype(str)  # e.g., '2024-06'

# Spanish weekday names from a fixed table (0 = Monday): the labels do not
# depend on an "es_ES" locale being installed on the machine running this.
WEEKDAY_NAMES_ES = {
    0: "Lunes", 1: "Martes", 2: "Miércoles", 3: "Jueves",
    4: "Viernes", 5: "Sábado", 6: "Domingo",
}
df["weekday"] = df["fecha"].dt.dayofweek.map(WEEKDAY_NAMES_ES)

# Save cleaned file (same helper as the other exports: utf-8-sig, no index)
cleaned_csv_path = PROCESSED_DIR / "cleaned.csv"
save_as_csv(df, cleaned_csv_path)
print(f"✅ Cleaned file saved to: {cleaned_csv_path} ({len(df)} rows)")


✅ Cleaned file saved to: c:\Users\diego\Desktop\Proyectos GitHub\data\processed\cleaned.csv (4907 rows)


## Categorización

In [19]:
def strip_accents(s: str) -> str:
    """Return ``str(s)`` NFKD-decomposed with every combining mark removed."""
    decomposed = unicodedata.normalize("NFKD", str(s))
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return "".join(base_chars)

def normalize_text_for_match(series):
    """
    Normalize a text Series for keyword matching.

    Values are cast to str, lowercased, stripped of accents, and have
    whitespace runs collapsed to a single space with both ends trimmed.
    Missing values become empty strings.
    """
    # A missing value cast to str reads "nan", which would then behave like a
    # real word during matching; blank those positions using the original mask.
    s = series.astype(str).mask(series.isna(), "")
    s = s.str.lower().apply(strip_accents)
    s = s.str.replace(r"\s+", " ", regex=True).str.strip()
    return s

def build_match_text(df):
    """
    Add the columns used for keyword matching and return ``df``.

    Creates 'concepto_norm' and 'observaciones_norm' (normalized copies of
    'concepto' / 'observaciones', or empty strings when the source column is
    absent) and 'match_text', the two joined by a space and trimmed.
    """
    for source in ("concepto", "observaciones"):
        normalized_name = f"{source}_norm"
        if source in df.columns:
            df[normalized_name] = normalize_text_for_match(df[source])
        else:
            df[normalized_name] = ""

    joined = df["concepto_norm"] + " " + df["observaciones_norm"]
    df["match_text"] = joined.str.strip()
    return df


In [20]:
# Add 'concepto_norm', 'observaciones_norm' and the combined 'match_text' column used for keyword matching
df = build_match_text(df)

In [21]:
def load_category_dict(csv_path):
    """
    Load a two-column CSV mapping: Category, ConceptKeyword.

    Accepts files with a 'Categoria'/'Concepto' header or with no header at
    all (the first two columns are then used). Keywords are normalized with
    ``normalize_text_for_match``; rows with a missing category or an
    empty/missing keyword are ignored.

    Returns a dict: {category: [keywords, ...]}

    Raises
    ------
    ValueError
        If the header-less file has fewer than 2 columns.
    """
    expected = ["Categoria", "Concepto"]

    df_dic = pd.read_csv(csv_path, encoding="utf-8-sig")
    if not set(expected).issubset(df_dic.columns):
        # No recognizable header: re-read treating the first line as data
        df_dic = pd.read_csv(csv_path, header=None, encoding="utf-8-sig")
        if df_dic.shape[1] < 2:
            raise ValueError("Dictionary file must have at least 2 columns")
        df_dic = df_dic.rename(columns={0: "Categoria", 1: "Concepto"})

    # A row without category or keyword cannot produce a rule
    df_dic = df_dic.dropna(subset=expected)

    # normalize Concepto for matching, then discard keywords that end up empty
    df_dic = df_dic.assign(Concepto=normalize_text_for_match(df_dic["Concepto"]))
    df_dic = df_dic[df_dic["Concepto"] != ""]

    cat_dict = df_dic.groupby("Categoria")["Concepto"].apply(list).to_dict()
    return cat_dict


In [22]:
# Load the keyword dictionary from the rules folder; iterating a dict yields its keys
cat_dict = load_category_dict(dictionary_path)
print("✅ Category dictionary loaded. Categories:", list(cat_dict))


✅ Category dictionary loaded. Categories: ['amazon', 'bizum', 'comida', 'comida_fuera', 'compras', 'deporte', 'impuestos', 'mascota', 'movimiento', 'ocio', 'otros', 'prestamo', 'salud', 'seguros', 'servicios', 'suscripcion', 'tecnologia', 'transporte', 'viajes', 'vivienda']


In [23]:
def categorize_with_dict(df, cat_dict, target_col="category", source_col="match_text", priorities=None):
    """
    Apply categories based on whole-keyword regex matches over source_col.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain ``source_col``. ``target_col`` is (re)created on it.
    cat_dict : dict
        {category: [keywords]}; keywords are matched literally and
        case-insensitively. Empty or missing keywords are ignored.
    target_col, source_col : str
        Output and input column names.
    priorities : dict, optional
        {category: int}; lower = higher priority. Categories not listed get
        1000, and ties keep the order of ``cat_dict``.

    Returns
    -------
    pandas.DataFrame
        The same ``df``, with ``target_col`` holding the first matching
        category per row (NaN where nothing matched).

    Raises
    ------
    ValueError
        If ``source_col`` is not a column of ``df``.
    """
    if source_col not in df.columns:
        raise ValueError(f"{source_col} not in DataFrame")

    if priorities is None:
        priorities = {}  # every category then falls back to the default 1000

    # Build one alternation pattern per category.
    # Lookarounds are used instead of \b: \b needs a word character on one
    # side, so a keyword ending in punctuation ("nasae s.l.") could never
    # match. (?<!\w)...(?!\w) only requires that the keyword is not glued to
    # a neighbouring word character, and equals \b...\b for ordinary words.
    patterns = {}
    for cat, words in cat_dict.items():
        parts = [
            rf"(?<!\w){re.escape(str(w))}(?!\w)"
            for w in words
            if not pd.isna(w) and str(w).strip() != ""
        ]
        if parts:
            patterns[cat] = "|".join(parts)

    # Object dtype so category names can be written without a dtype upcast warning
    df[target_col] = pd.Series(np.nan, index=df.index, dtype="object")

    # Assign by priority; rows already categorized are left alone
    for cat in sorted(patterns, key=lambda c: priorities.get(c, 1000)):
        found = df[source_col].str.contains(patterns[cat], flags=re.IGNORECASE, regex=True, na=False)
        mask = df[target_col].isna() & found
        if mask.any():
            df.loc[mask, target_col] = cat

    return df


In [24]:
# 'category' and 'match_text' are the function's default target/source columns
df = categorize_with_dict(df, cat_dict)
print("✅ Initial categorization done.")


  df.loc[mask, target_col] = cat


✅ Initial categorization done.


In [25]:
def apply_correction_rules(df, rules_map, source_col="match_text", target_col="category"):
    """
    Apply explicit mapping patterns: {pattern_string: category}.

    Each pattern is matched literally (as a substring) against ``source_col``.
    Patterns are lowercased, accent-stripped and whitespace-collapsed first,
    the same way 'match_text' is built, so a rule written with accents or
    capitals still matches. Rules are applied in dict order; when several
    match a row, the last one wins. Empty patterns are ignored.

    ``df`` is modified in place and returned.
    """
    def _normalize_pattern(pattern):
        # Lowercase, drop accents, collapse whitespace runs to one space
        text = unicodedata.normalize("NFKD", str(pattern).lower())
        text = "".join(ch for ch in text if not unicodedata.combining(ch))
        return re.sub(r"\s+", " ", text)

    for raw_pattern, cat in rules_map.items():
        needle = _normalize_pattern(raw_pattern)
        if needle.strip() == "":
            continue  # an empty pattern would match every row
        mask = df[source_col].str.contains(re.escape(needle), flags=re.IGNORECASE, regex=True, na=False)
        if mask.any():
            df.loc[mask, target_col] = cat
    return df

def force_income_for_positive_amounts(df, amount_col="importe", target_col="category", income_label="ingresos"):
    """
    Label every row whose ``amount_col`` is strictly positive with ``income_label``.

    Writes into ``target_col`` of ``df`` itself and returns ``df``. When
    ``amount_col`` is not a column, ``df`` is returned untouched.
    """
    if amount_col not in df.columns:
        return df
    is_positive = df[amount_col] > 0
    df.loc[is_positive, target_col] = income_label
    return df


In [26]:
# Explicit corrections: literal pattern -> category.
# Patterns are searched inside 'match_text', which is lowercase and
# accent-free, so they are written in that same normalized form
# ("duenas", not "dueñas"). Rules run top to bottom; when several match the
# same row, the later one overrides the earlier one.
rules = {
    "pasaje turquia arianna": "viajes",
    "ahorro": "ahorro",
    "airbnb milan": "viajes",
    "alquiler pallars": "vivienda",
    "prohorta": "vivienda",
    "duenas": "salud",
    "traspaso desde cuenta": "movimiento",
    "abo. por traspaso desde tarj.de": "Correction",
    "adeudo mensual de tarjeta": "Correction",
    "operacion financiada con tarjeta": "Correction",
    "0182-4383-99-0830116863": "Correction",
    "01828740 999": "Correction",
    "01820209 999": "Correction",
    "prestamo lu/ari": "Correction",
    "piso arianna": "Correction",
    "diego antonio mere caravelli": "Correction"
}

df = apply_correction_rules(df, rules, source_col="match_text", target_col="category")
# Applied last, so any strictly positive amount ends up as 'ingresos' regardless of the rules above
df = force_income_for_positive_amounts(df, amount_col="importe", target_col="category", income_label="ingresos")
print("✅ Corrections applied and positive amounts forced to 'ingresos'.")


✅ Corrections applied and positive amounts forced to 'ingresos'.


In [27]:
# Rows that no dictionary keyword, rule or income override has categorized
uncat = df.loc[df["category"].isna()].copy()
if not uncat.empty:
    # Frequency of each raw concept, most common first
    freq = uncat["concepto"].value_counts()
    print(f"Total transacciones sin categoría: {len(uncat)}")
    print(f"Total conceptos únicos sin categoría: {len(freq)}")
    print("\nTop 20 conceptos sin categoría:")
    print(freq.head(20))
else:
    print("¡Genial! Todos los conceptos están categorizados.")


Total transacciones sin categoría: 25
Total conceptos únicos sin categoría: 10

Top 20 conceptos sin categoría:
concepto
Nasae s.l.                  12
El maracucho s.l.            3
Boheme cafeteria.            2
Yelmo films s.l.             2
Cosi duci s.l.               1
Locanda del vulture s.l.     1
Distinta s.l.                1
Cool partners s.l.           1
Thekedar dhupsari s.l.       1
Sushi malasaãa,s.l.          1
Name: count, dtype: int64


In [28]:
def update_dictionary_interactive(cat_dict, concepts, number_to_category):
    """
    Interactive mapping: prompts the user to assign a category to each concept.

    For every concept the user types a key of ``number_to_category``; the
    concept is appended to that category's keyword list in ``cat_dict``.
    An invalid answer asks again for the SAME concept. Typing 'q', 'quit' or
    'salir' stops immediately, leaving the remaining concepts unassigned.

    ``cat_dict`` is modified in place and returned.
    """
    quit_words = {"q", "quit", "salir"}

    for concept in concepts:
        print("Uncategorized concept:", concept)
        # Keep asking until this concept gets a valid category or the user quits
        while True:
            choice = input("Enter category number (or 'q' to quit): ").strip()
            if choice.lower() in quit_words:
                print("➡️ Exiting manual categorization.")
                return cat_dict
            if choice in number_to_category:
                cat = number_to_category[choice]
                cat_dict.setdefault(cat, []).append(concept)
                break
            print("Invalid category number. Try again.")
    return cat_dict

# Menu for the interactive prompt: option number (as typed, a string) -> category name.
# Numbers follow the list order, starting at 1.
number_to_category = {
    str(position): name
    for position, name in enumerate(
        [
            'comida_fuera', 'suscripcion', 'vivienda', 'ocio', 'transporte',
            'impuestos', 'viajes', 'mascota', 'seguros', 'deporte',
            'prestamo', 'compras', 'servicios', 'salud', 'tecnologia',
            'comida', 'bizum', 'amazon', 'otros',
        ],
        start=1,
    )
}

# Distinct raw concepts that are still uncategorized, alphabetically
unknown_concepts = [] if uncat.empty else sorted(set(uncat["concepto"]))
print(f"✅ Found {len(unknown_concepts)} new concepts to categorize.")
if unknown_concepts:
    print("Updating dictionary with uncategorized concepts...")
    cat_dict = update_dictionary_interactive(cat_dict, unknown_concepts, number_to_category)
    # Newly added concepts are raw text: normalize every keyword
    # (no accents, lowercase, trimmed) so it can match 'match_text'
    cat_dict = {
        category: [strip_accents(str(keyword)).lower().strip() for keyword in keywords]
        for category, keywords in cat_dict.items()
    }
    # Re-run the full categorization chain with the updated dictionary
    df = categorize_with_dict(df, cat_dict, target_col="category", source_col="match_text")
    df = apply_correction_rules(df, rules, source_col="match_text", target_col="category")
    df = force_income_for_positive_amounts(df, amount_col="importe", target_col="category", income_label="ingresos")


✅ Found 10 new concepts to categorize.
Updating dictionary with uncategorized concepts...
Uncategorized concept: Boheme cafeteria.
➡️ Exiting manual categorization.


  df.loc[mask, target_col] = cat


In [29]:
def save_category_dict(cat_dict, csv_path):
    """
    Write ``cat_dict`` ({category: [keywords]}) to ``csv_path`` as a
    two-column CSV ('Categoria', 'Concepto'), one row per keyword,
    encoded as utf-8-sig.

    The header is written even when the dictionary is empty, so the file
    can always be read back as a table.
    """
    rows = [(cat, w) for cat, words in cat_dict.items() for w in words]
    # Explicit columns: an empty row list would otherwise produce a frame
    # with no columns at all, and therefore a file without a header.
    out = pd.DataFrame(rows, columns=["Categoria", "Concepto"])
    out.to_csv(csv_path, index=False, encoding="utf-8-sig")

# Write the in-memory dictionary back to the file it was loaded from (dictionary_path)
save_category_dict(cat_dict, dictionary_path)
print("✅ Category dictionary updated on disk.")


✅ Category dictionary updated on disk.


In [30]:
# Keep only the export columns, in this fixed order; any that are absent are skipped
final_cols = [
    name
    for name in ["fecha", "importe", "category", "concepto", "observaciones"]
    if name in df.columns
]
final_df = df.loc[:, final_cols].copy()

final_output_path = PROCESSED_DIR / "categorized.csv"
save_as_csv(final_df, final_output_path)
print(f"✅ Categorized file saved to: {final_output_path} ({len(final_df)} rows)")


✅ Categorized file saved to: c:\Users\diego\Desktop\Proyectos GitHub\data\processed\categorized.csv (4907 rows)
