In [57]:
# setting up the project root on sys.path
import sys
from pathlib import Path

PROJECT_ROOT = Path.cwd().parent
sys.path.insert(0, str(PROJECT_ROOT))

print("Project root on sys.path:", PROJECT_ROOT)
print("Notebook cwd:", Path.cwd())
print("Root children:", [p.name for p in PROJECT_ROOT.iterdir() if p.is_dir()])


Project root on sys.path: /home/alonbenach/project/invoice-analysis
Notebook cwd: /home/alonbenach/project/invoice-analysis/notebooks
Root children: ['.git', 'outputs', 'data', 'src', 'config', 'outputs_large', 'notebooks', 'balagan']


ETL

In [58]:
# imports
from pathlib import Path
import pandas as pd
import yaml

from src.io_utils import list_csvs, read_csv, write_parquet, ensure_dir
from src.clean_utils import clean_non_products, normalize_columns, parse_timestamp, assign_slots, cast_basic_types, basic_checks
from src.viz_utils import save_bar, save_hist, save_box

from src.fc_map_utils import map_fc_products, normalize_text

%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [59]:
# 1 Paths
# IMPORTANT: use project-root-relative paths
DATA_DIR = PROJECT_ROOT / "data" / "invoices"
DATA_FILE = DATA_DIR / "invoices_09_2025.csv"
OUT_DIR  = PROJECT_ROOT / "outputs_large" / "audit"
PLOTS    = OUT_DIR / "plots"
CFG_SLOTS = PROJECT_ROOT / "config" / "slots.yaml"
CFG_FC_TH = PROJECT_ROOT / "config" / "fc_mapping_threshold.yaml"
PROCESSED = PROJECT_ROOT / "data" / "processed"


ensure_dir(OUT_DIR); ensure_dir(PLOTS)
pd.options.display.max_columns = 200


In [60]:
# 2 load files
if DATA_FILE.exists():
    try:
        df = pd.read_csv(DATA_FILE, sep=",", encoding="utf-8", low_memory=False)
    except UnicodeDecodeError:
        df = pd.read_csv(DATA_FILE, sep=",", encoding="cp1250", low_memory=False)
    raw_sources = [DATA_FILE.name]
else:
    # fallback to old behaviour if needed
    csvs = list_csvs(DATA_DIR)
    dfs = [read_csv(p) for p in csvs]
    df = pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()
    raw_sources = [p.name for p in csvs]

print(f"Loaded rows: {len(df):,} from: {raw_sources}")
pd.Series(raw_sources).to_csv(OUT_DIR/"_file_list.csv", index=False, header=["file"])


Loaded rows: 260,000 from: ['invoices_09_2025.csv']


In [61]:
# 3 structure check (pre-normalization)
drop_cols = ["Numer_paragonu","EAN","Rabat","Kasjer","Metoda_platnosci", "Stawka_VAT", "Siec_sklepow"]
present = [c for c in drop_cols if c in df.columns]
if present:
    df = df.drop(columns=present)
    print("Dropped columns:", present)
else:
    print("No useless columns to drop.")
    
print(f"Rows: {len(df):,}")
print("Raw columns:", list(df.columns))

# Ensure column names are unique
dup_cols = pd.Series(df.columns).value_counts()
if (dup_cols > 1).any():
    print("Duplicate column names:", dup_cols[dup_cols > 1].to_dict())

# Save raw header snapshot
pd.Series(df.columns, name="raw_columns").to_csv(OUT_DIR/"_raw_columns.csv", index=False)

# Peek a few rows
df.head(3)

Dropped columns: ['Numer_paragonu', 'EAN', 'Rabat', 'Kasjer', 'Metoda_platnosci', 'Stawka_VAT', 'Siec_sklepow']
Rows: 260,000
Raw columns: ['ID_Paragonu', 'Data_zakupu', 'Godzina_zakupu', 'Linia_produktowa', 'Nazwa_produktu', 'Ilosc', 'Cena_jednostkowa_brutto', 'Cena_jednostkowa_netto']


Unnamed: 0,ID_Paragonu,Data_zakupu,Godzina_zakupu,Linia_produktowa,Nazwa_produktu,Ilosc,Cena_jednostkowa_brutto,Cena_jednostkowa_netto
0,31006967,2025-09-01,07:07,KAJZERKA xxl 95g-C,,2.0,0.79,0.75
1,31006967,2025-09-01,07:07,SER TOPIONY TOST PLASTRY 130g-C,Sertop Tychy Ser topiony w plastrach tost 130 ...,1.0,6.4,6.1
2,31006967,2025-09-01,07:07,CHLEB TOST PELNOZ-C,,1.0,4.99,4.75


In [62]:
# 4 normalize columns + basic dtypes
df = normalize_columns(df)  # standardize column names
df = cast_basic_types(df)   # qty -> float, prices -> float, ean -> float/Int64, etc.

print("Normalized columns:", list(df.columns))
df.dtypes.to_frame("dtype").to_csv(OUT_DIR/"_normalized_dtypes.csv")

# Required columns for this audit
required = {
    "receipt_id","purchase_date","purchase_time",
    "product_name","product_line","qty","unit_price_gross"
}
missing = [c for c in required if c not in df.columns]
print("Missing required columns:", missing)

df.dtypes.to_frame("dtype").to_csv(OUT_DIR/"_normalized_dtypes.csv")

clean_non_products

# Quick sample
df.head(3)

Normalized columns: ['receipt_id', 'purchase_date', 'purchase_time', 'product_line', 'product_name', 'qty', 'unit_price_gross', 'unit_price_net']
Missing required columns: []


Unnamed: 0,receipt_id,purchase_date,purchase_time,product_line,product_name,qty,unit_price_gross,unit_price_net
0,31006967,2025-09-01,07:07,KAJZERKA xxl 95g-C,,2.0,0.79,0.75
1,31006967,2025-09-01,07:07,SER TOPIONY TOST PLASTRY 130g-C,Sertop Tychy Ser topiony w plastrach tost 130 ...,1.0,6.4,6.1
2,31006967,2025-09-01,07:07,CHLEB TOST PELNOZ-C,,1.0,4.99,4.75


In [63]:
# 5 keys & duplicates
report = {}
report["rows"] = len(df)
report["unique_receipt_ids"] = df["receipt_id"].nunique() if "receipt_id" in df else None

# Exact duplicate lines
report["exact_duplicate_lines"] = int(df.duplicated().sum())

# Duplicates within same receipt_id + product_name + qty + price
subset_cols = [c for c in ["receipt_id","product_name","unit_price_gross","qty"] if c in df.columns]
if subset_cols:
    dup_within = df.duplicated(subset=subset_cols).sum()
    report["dup_within_receipt"] = int(dup_within)

pd.DataFrame([report]).to_csv(OUT_DIR/"audit_keys_duplicates.csv", index=False)
report

{'rows': 260000,
 'unique_receipt_ids': 123246,
 'exact_duplicate_lines': 4340,
 'dup_within_receipt': 5956}

In [64]:
# 6 missingness & numeric sanity
na_pct = df.isna().mean().sort_values(ascending=False)
na_pct.to_csv(OUT_DIR/"na_fraction_by_column.csv", header=["na_fraction"])
print("Top NA columns:")
print(na_pct.head(10))

# Suspicious qty/price
bad = df[(df["qty"] <= 0) | (df["unit_price_gross"] <= 0)]
print("Suspicious qty/price rows:", len(bad))
bad.head(3).to_csv(OUT_DIR/"_suspicious_qty_price_sample.csv", index=False)

Top NA columns:
product_name        0.004088
receipt_id          0.000000
purchase_time       0.000000
purchase_date       0.000000
product_line        0.000000
qty                 0.000000
unit_price_gross    0.000000
unit_price_net      0.000000
dtype: float64
Suspicious qty/price rows: 0


In [65]:
# 7 timestamp parsing + slots
tz = yaml.safe_load((PROJECT_ROOT/"config/slots.yaml").read_text())["timezone"]
df = parse_timestamp(df, tz)
df = assign_slots(df, PROJECT_ROOT/"config/slots.yaml")

pct_ts = df["ts"].notna().mean()
dmin, dmax = df["ts"].min(), df["ts"].max()
print(f"TS parsed: {pct_ts:.3f} | Range: {dmin} → {dmax}")

  return pd.to_datetime(s, errors="coerce").dt.time
  return pd.to_datetime(s, errors="coerce").dt.time


TS parsed: 0.463 | Range: 2025-01-09 00:00:00+01:00 → 2025-12-09 23:02:00+01:00


In [66]:
# 8 categorical distributions
def top_counts(col):
    s = df[col].value_counts(dropna=False).head(25)
    s.to_csv(OUT_DIR/f"top_{col}.csv", header=["count"])
    return s

for c in ["product_line","payment_method","cashier"]:
    if c in df.columns:
        print(f"\n{c}:")
        print(top_counts(c).head(10))


product_line:
product_line
BUTELKA ZYWIEC 0,5(1)    3531
BUTELKA KOMPANIA 0(1)    3005
NAPOJ MONSTER 0,5l-A     2102
TORBA PAPIEROWA ZA-A     1490
BUTELKA OKOCIM 0,5(1)    1467
PIWO HARNAS JP 0,5-A     1386
HOT DOG PAR Z SZYN-B     1346
NAPOJ OSHEE 555ml-C      1234
NAPOJ COCA 0,85l-A       1233
TORBA PAP MALA-A         1218
Name: count, dtype: int64


In [67]:
# 9 product health + value mix
df["line_value_gross"] = df["qty"] * df["unit_price_gross"]
val_by_pl = df.groupby("product_line")["line_value_gross"].sum().sort_values(ascending=False).head(20)
val_by_pl.to_csv(OUT_DIR/"value_by_product_line_top20.csv", header=["value_gross"])
save_bar(val_by_pl, "Gross value by product_line (top 20)", PLOTS/"value_by_product_line_top20.png")
val_by_pl.head(10)

product_line
PIWO ZUBR-A                         24063.36000
PAPIEROSY LM FIRST-A                21667.66000
PAPIEROSY WINSTON BLUE SUPER L-A    21539.99099
PAP LxM FC BRIG SL-A                19271.00000
PIWO HARNAS JP 0,5-A                18523.60000
NAPOJ MONSTER 0,5l-A                17775.48000
PIWO 4-PAK TYSKI-A                  17058.94000
PAPIEROSY LM BLUE KS-A              16184.96000
PAPIEROSY LxM FIRS-A                14391.00000
BUTELKA ZYWIEC 0,5(1)               13969.02000
Name: line_value_gross, dtype: float64

In [68]:
# 10 daily coverage + outliers
daily = (df.dropna(subset=["ts"])
           .groupby(df["ts"].dt.date)
           .agg(lines=("product_name","size"),
                value=("line_value_gross","sum"))
           .reset_index())
daily.to_csv(OUT_DIR/"daily_lines_value.csv", index=False)
print(f"Daily rows: {len(daily)}  Range: {daily['ts'].min()} → {daily['ts'].max()}")

# Outliers
hi_price = df["unit_price_gross"].quantile(0.999)
hi_qty = df["qty"].quantile(0.999)
outliers = df[(df["unit_price_gross"] > hi_price) | (df["qty"] > hi_qty)]
outliers.head(10).to_csv(OUT_DIR/"_outlier_lines_sample.csv", index=False)
print("Outlier lines:", len(outliers))

Daily rows: 12  Range: 2025-01-09 → 2025-12-09
Outlier lines: 382


In [None]:
# 11 food Corner mapping and coverage
# Use product_line as the canonical product identifier
df["product_norm"] = normalize_text(df["product_line"])

# Load canonical Food Corner menu
canonical_path = PROJECT_ROOT / "data" / "refs" / "zabka_food_corner_menu_canonical.csv"
canonical = pd.read_csv(canonical_path)

# Run deterministic mapping
mapping = map_fc_products(df, canonical, threshold=70)

# Save mapping
MAP_OUT = PROJECT_ROOT / "data" / "refs" / "auto_fc_mapping_from_menu.csv"
mapping.to_csv(MAP_OUT, index=False)
print("Saved mapping:", MAP_OUT)

# Join back for coverage stats
mp = mapping[["product_norm","is_food_corner_auto","match_category","best_match_item","score"]].drop_duplicates("product_norm")
joined = df.merge(mp, on="product_norm", how="left")

# Coverage & FC share
line_coverage = joined["is_food_corner_auto"].notna().mean()
fc_rate = (joined["is_food_corner_auto"] == True).mean()  # treat NaN as non-FC
coverage = float(line_coverage
                 
                 
                 
                 
                 
                 
                 
                 
                 
                 
                 
                 
                 
                 
                 
                 
                 )  # for back-compat with Cell 12

# Save numeric summary
pd.DataFrame([{
    "line_coverage": float(line_coverage),
    "fc_rate": float(fc_rate),
    "threshold": 70
}]).to_csv(OUT_DIR / "fc_coverage_summary.csv", index=False)

# Simple bar “gauges”
save_bar(pd.Series({"mapped": line_coverage, "unmapped": 1 - line_coverage}),
         "Line-level mapping coverage", PLOTS/"fc_line_coverage.png")
save_bar(pd.Series({"FC": fc_rate, "Non-FC": 1 - fc_rate}),
         "Lines flagged as Food Corner (auto)", PLOTS/"fc_flag_rate.png")

print(f"Line-level mapping coverage: {line_coverage:.3f}")
print(f"Lines flagged as FC (auto):  {fc_rate:.3f}")

# Unmapped sample for quick review
unmapped = joined.loc[joined["is_food_corner_auto"].isna(),
                      ["product_line","product_name"]].drop_duplicates().head(50)
unmapped.to_csv(OUT_DIR / "fc_unmapped_examples.csv", index=False)
unmapped

Saved mapping: /home/alonbenach/project/invoice-analysis/data/refs/auto_fc_mapping_from_menu.csv
Line-level mapping coverage: 1.000
Lines flagged as FC (auto):  0.056


Unnamed: 0,product_line,product_name


To be fixed:
* False positives
* Better sorting algo

In [70]:
# 12 Persist cleaned dataset for Stage 2 

ensure_dir(PROCESSED)

# Our final table for Stage 2 is the joined one (has 'is_food_corner_auto')
df_final = joined.copy()

# Ensure core fields exist
if "line_value_gross" not in df_final.columns:
    df_final["line_value_gross"] = df_final["qty"] * df_final["unit_price_gross"]

# Use receipt_id as basket id
df_final["basket_id"] = df_final["receipt_id"]

# Minimal, consistent column order (keep only those that exist)
cols_order = [
    "basket_id","receipt_id","ts","purchase_date","purchase_time","slot_id","slot_label",
    "product_line","product_name","qty","unit_price_gross","line_value_gross",
    "cashier","payment_method","is_food_corner_auto","match_category","best_match_item","score"
]
df_final = df_final[[c for c in cols_order if c in df_final.columns]]

# Save (Parquet preferred; CSV fallback)
out_parquet = PROCESSED / "invoices_2025-09_clean.parquet"
out_csv     = PROCESSED / "invoices_2025-09_clean.csv.gz"

saved = []
try:
    df_final.to_parquet(out_parquet, index=False)
    saved.append(str(out_parquet))
except Exception as e:
    print("Parquet save failed, falling back to CSV.gz:", e)
    df_final.to_csv(out_csv, index=False, compression="gzip")
    saved.append(str(out_csv))

# Convenience "latest" copy
latest_parquet = PROCESSED / "latest.parquet"
latest_csv     = PROCESSED / "latest.csv.gz"
if out_parquet.exists():
    df_final.to_parquet(latest_parquet, index=False)
else:
    df_final.to_csv(latest_csv, index=False, compression="gzip")

print("Saved:", saved)
df_final.head(3)


Saved: ['/home/alonbenach/project/invoice-analysis/data/processed/invoices_2025-09_clean.parquet']


Unnamed: 0,basket_id,receipt_id,ts,purchase_date,purchase_time,slot_id,slot_label,product_line,product_name,qty,unit_price_gross,line_value_gross,is_food_corner_auto,match_category,best_match_item,score
0,31006967,31006967,2025-01-09 07:07:00+01:00,2025-09-01,07:07,1.0,Going to work,KAJZERKA xxl 95g-C,,2.0,0.79,1.58,False,Kajzerki,Kajzerka kebab,72
1,31006967,31006967,2025-01-09 07:07:00+01:00,2025-09-01,07:07,1.0,Going to work,SER TOPIONY TOST PLASTRY 130g-C,Sertop Tychy Ser topiony w plastrach tost 130 ...,1.0,6.4,6.4,False,Tosty,Tost z szynką i serem,42
2,31006967,31006967,2025-01-09 07:07:00+01:00,2025-09-01,07:07,1.0,Going to work,CHLEB TOST PELNOZ-C,,1.0,4.99,4.99,True,Tosty,Tost z szynką i serem,45
