In [1]:
import re
import pandas as pd
import datetime
import os
from rapidfuzz import fuzz
from dateutil.parser import parse

In [2]:
# Abbreviations looked for inside candidate description cells.
# TransactionValidator.is_valid_description lower-cases each entry and tests it
# as a plain substring of the (lower-cased) cell text.
bank_abbreviations = {
    "NEFT", "RTGS", "IMPS", "UPI", "ECS", "NACH", "ACH", "ATM", "POS", "CHQ", "CLG", "TPT", "TRF", "IB", "MB", "VPA", "BIL/BILLPAY", "AUTOPAY", "SAL/SALARY", "DIV", "INT", "TDS", "REF", "CHRG", "REV", "PUR", "CASHDEP", "CASHWDL", "IB/IBFT", "MB/MOB", "BBPS", "CHQ NO.", "INST NO.", "TXN/TRAN", "REF NO.", "B/F", "C/F", "BOD", "EOD", "MIN BAL", "AMB CHG"
}

# Standard output column -> lower-case aliases that a statement may use for it.
# HeaderDetector fuzzy-matches sheet cells and header names against these lists
# (fuzz.partial_ratio), both to score candidate header rows and to map the
# detected headers onto the four standard keys.
expected_headers = {
        "date": [
            "date", "txn date", "value date", "transaction date", "posting date", "tran date",
            "dt", "trans date", "process date", "effective date", "book date"
        ],
        "description": [
            "description", "narration", "details", "particulars", "transaction details", "remark",
            "txn details", "transaction description", "purpose", "memo", "note", "comments"
        ],
        "debit": [
            "debit", "withdrawal", "amount debited", "debit amount", "dr", "withdraw", "debit amt",
            "dr amount", "paid", "out", "payment", "disbursement"
        ],
        "credit": [
            "credit", "deposit", "amount credited", "credit amount", "cr", "deposit amount", "credit amt",
            "cr amount", "received", "in", "receipt", "collection"
        ]
    }

# Regular expressions for text that is not a transaction description.
# TransactionValidator.is_valid_description applies each one with re.search and
# re.IGNORECASE and rejects the cell on the first hit; r'^\s*$' is the entry
# that rejects empty / whitespace-only text.
noise_patterns = [
            r'statement\s+(of|for|period)',
            r'account\s+(holder|number|summary)',
            r'customer\s+(name|id|details)',
            r'address\s*:',
            r'phone\s*(no|number)\s*:',
            r'email\s*(id|address)\s*:',
            r'branch\s+(name|code|address)',
            r'ifsc\s*(code)?\s*:',
            r'opening\s+balance',
            r'closing\s+balance',
            r'total\s+(credit|debit|transactions)',
            r'summary\s+(of|for)',
            r'thank\s+you\s+for\s+banking',
            r'continued\s+(on|from)',
            r'page\s+\d+(\s+of\s+\d+)?',
            r'(generated|printed)\s+on\s*:',
            r'statement\s+period\s*:',
            r'customer\s+id\s*:',
            r'^\s*$',
            r'bank\s+(name|logo)',
            r'terms\s+(and|&)\s+conditions',
            r'disclaimer',
            r'important\s+notes?',
            r'legend\s*:',
            r'abbreviations?\s*:'
        ]


In [3]:
def mask_all_digits(text):
    """Hide long numbers in ``text``.

    Every run of six or more consecutive digits is replaced by the same number
    of ``*`` characters; shorter digit runs and all other text are returned
    unchanged.
    """
    def _as_stars(match):
        # One asterisk per matched digit keeps the overall text length stable.
        return "*" * (match.end() - match.start())

    long_digit_run = re.compile(r"\d{6,}")
    return long_digit_run.sub(_as_stars, text)

def normalize_date(value):
    """Convert a date-like value to an ISO ``YYYY-MM-DD`` string.

    Parameters
    ----------
    value : pd.Timestamp, datetime.datetime, datetime.date, or anything whose
        ``str()`` form dateutil can parse (fuzzy parsing is enabled, so extra
        tokens around the date are tolerated).

    Returns
    -------
    str
        The formatted date, or the original ``value`` unchanged when it cannot
        be formatted or parsed (a message is printed in that case).
    """
    try:
        # datetime.datetime and pd.Timestamp are datetime.date subclasses, so a
        # single check covers all three without a string round-trip.
        if isinstance(value, (pd.Timestamp, datetime.date)):
            # Kept inside the try: pd.NaT passes the isinstance check but its
            # strftime raises, and that must not abort the whole run.
            return value.strftime("%Y-%m-%d")
        return parse(str(value), fuzzy=True).strftime("%Y-%m-%d")
    except Exception:
        print(f"Failed to parse date from value: {value}. Please check the format or content.")
        return value  # return original if parsing fails

In [4]:
class TransactionValidator:
    """Heuristics that decide whether a cell or a whole row looks like transaction data.

    All methods are stateless. ``is_valid_transaction_row`` is the entry point
    used when scanning a loaded sheet; the three cell-level checks can also be
    called on their own.
    """

    # Lower-cased string forms that missing cells take after ``astype(str)``
    # (NaN -> "nan", NaT -> "nat", None -> "none", pd.NA -> "<na>"), plus "".
    _MISSING_TOKENS = frozenset({"", "nan", "nat", "none", "<na>"})

    # A number written with thousands separators. Groups of two digits are
    # allowed as well as three so Indian grouping (1,00,000.00) is accepted.
    _GROUPED_NUMBER = re.compile(r"[+-]?\d{1,3}(?:,\d{2,3})+(?:\.\d+)?")

    @staticmethod
    def is_valid_date(value):
        """Return True if ``value`` is a real timestamp or a string dateutil can parse.

        Bare numbers are rejected (likely amounts, not dates), as are missing
        markers such as ``pd.NaT`` or the strings "nan"/"NaT"/"None".
        """
        if isinstance(value, (pd.Timestamp, datetime.datetime)):
            # pd.NaT passes the isinstance check but is not a usable date.
            return not pd.isna(value)
        if isinstance(value, (int, float)):  # reject pure numbers (likely not dates)
            return False
        if not isinstance(value, str):
            return False

        value = value.strip()
        if value.lower() in TransactionValidator._MISSING_TOKENS:
            return False

        try:
            parse(value, fuzzy=True)  # allow embedded/extra tokens
            return True
        except Exception:
            return False

    @staticmethod
    def is_valid_amount(value):
        """Return True if ``value`` is, or textually represents, a finite number.

        Strings with well-formed thousands separators ("1,234.56",
        "1,00,000.00") are accepted; NaN, infinities and non-numeric values
        are rejected.
        """
        candidate = value
        if isinstance(value, str):
            candidate = value.strip()
            # Only strip commas from properly grouped numbers so that text
            # like "01,09,2024" is not silently turned into a number.
            if TransactionValidator._GROUPED_NUMBER.fullmatch(candidate):
                candidate = candidate.replace(",", "")
        try:
            number = float(candidate)
        except (ValueError, TypeError, OverflowError):
            return False
        # float() happily parses "nan" and "inf"; neither is an amount.
        return not pd.isna(number) and abs(number) != float("inf")

    @staticmethod
    def is_valid_description(value):
        """Return True if ``value`` is free text that could be a transaction narration.

        Rejects non-strings, missing markers, bare header words, anything
        matching ``noise_patterns``, and text without a single letter.
        """
        if not isinstance(value, str):
            return False
        value = value.strip().lower()
        if value in TransactionValidator._MISSING_TOKENS:
            return False
        if value in ("date", "opening", "closing", "balance"):
            return False
        if any(re.search(pattern, value, re.IGNORECASE) for pattern in noise_patterns):
            return False
        if re.match(r"^(transaction|txn|narration|particulars|details|ref|id)[\s:]*$", value):
            return False
        # Every abbreviation contains letters, so this test is implied by the
        # alphabetic check below; it is kept to make the intent explicit.
        contains_abbreviation = any(abbrev.lower() in value for abbrev in bank_abbreviations)
        return contains_abbreviation or any(c.isalpha() for c in value)

    @classmethod
    def is_valid_transaction_row(cls, row = None):
        """Return True if ``row`` holds a date, an amount and a description in distinct cells.

        Parameters
        ----------
        row : pandas.Series or None
            One row of the loaded sheet. ``None`` (the default) is treated as
            "no row" and yields False.
        """
        if row is None:
            return False

        cells = [cell.strip() for cell in row.astype(str).tolist()]
        non_empty_values = [cell for cell in cells if cell.lower() not in cls._MISSING_TOKENS]

        if len(non_empty_values) < 3:
            return False

        date_val = next((val for val in non_empty_values if cls.is_valid_date(val)), None)
        amount_val = next((val for val in non_empty_values if cls.is_valid_amount(val)), None)
        desc_val = next((val for val in non_empty_values if cls.is_valid_description(val)), None)

        if not (date_val and amount_val and desc_val):
            return False

        # The same cell text must not be counted for two different roles.
        return len({date_val, amount_val, desc_val}) == 3


def robust_load(input_file):
    """Read a statement file into a DataFrame without treating any row as a header.

    The reader is chosen from the lower-cased file extension: ``csv`` uses
    ``pd.read_csv``, ``xlsx`` uses openpyxl and ``xls`` uses xlrd.

    Raises
    ------
    ValueError
        If the extension is not one of the three above, or if reading an
        ``.xls`` file fails for any reason.
    """
    print("🔍 Loading file:", input_file)
    suffix = os.path.splitext(input_file)[1]
    ext = suffix.lower().lstrip('.')

    if ext == "csv":
        return pd.read_csv(input_file, header=None)
    if ext == "xlsx":
        return pd.read_excel(input_file, engine="openpyxl", header=None)
    if ext != "xls":
        raise ValueError(f"Unsupported file type: {ext}")

    # Only the legacy .xls path wraps reader errors into ValueError.
    try:
        return pd.read_excel(input_file, engine="xlrd", header=None)
    except Exception as e:
        raise ValueError(f"⚠️ Error reading {input_file}: {e}")

In [5]:
class HeaderDetector:
    """
    A class to detect headers in a bank statement.

    Rows are scored by fuzzy-matching their cells against the aliases in
    ``expected_headers``; the detected header names can then be mapped onto
    the standard keys with ``match_expected_to_actual``.
    """
    @staticmethod
    def _lowercase_row(df, row_idx):
        """Return the cells of positional row ``row_idx`` as lower-cased strings."""
        return df.iloc[row_idx].astype(str).str.lower().tolist()

    @staticmethod
    def match_expected_to_actual(actual_headers, min_score=0):
        """Map actual header names onto the standard keys of ``expected_headers``.

        Parameters
        ----------
        actual_headers : iterable
            Header names found in the sheet. Non-string entries are compared
            by their ``str()`` form but returned unchanged as mapping keys.
        min_score : float, optional
            Minimum fuzzy score (0-100) a pairing needs to be accepted. The
            default of 0 accepts every pairing, i.e. each standard key is
            assigned to its best remaining header no matter how weak.

        Returns
        -------
        dict
            ``{actual_header: standard_key}``; each header and each key is
            used at most once.
        """
        # Materialise once: the headers are walked once per standard key, and
        # lower-casing them up front avoids repeating it for every alias.
        header_texts = [(actual, str(actual).lower()) for actual in actual_headers]

        # Step 1: Create (expected_key, actual_header, score) triples
        matches = []
        for expected_key, alias_list in expected_headers.items():
            for actual, actual_text in header_texts:
                score = max(fuzz.partial_ratio(actual_text, alias.lower()) for alias in alias_list)
                matches.append((expected_key, actual, score))

        # Step 2: Sort by score descending (stable, so ties keep insertion order)
        matches.sort(key=lambda x: x[2], reverse=True)

        # Step 3: Greedily assign best matches
        used_actual_headers = set()
        assigned_keys = set()
        final_mapping = {}

        for expected_key, actual, score in matches:
            if score < min_score:
                break  # sorted descending: nothing further can qualify
            if actual not in used_actual_headers and expected_key not in assigned_keys:
                final_mapping[expected_key] = actual
                used_actual_headers.add(actual)
                assigned_keys.add(expected_key)

        # Swap keys and values to get actual_to_standard mapping
        return {actual: expected_key for expected_key, actual in final_mapping.items()}

    @staticmethod
    def find_header_row(df, start_row):
        """Return the best-scoring row among the (up to) three rows above ``start_row``.

        Returns None when there is no row above ``start_row`` (``start_row``
        is 0 or negative).
        """
        best_idx, best_score = None, -1
        for i in range(max(0, start_row - 3), start_row):
            row_score = HeaderDetector.score_header_row(HeaderDetector._lowercase_row(df, i))
            if row_score > best_score:
                best_score, best_idx = row_score, i
        return best_idx

    @staticmethod
    def score_header_row(row_values):
        """Score how header-like a row is.

        For every standard key the best fuzzy score between any cell and any
        alias is taken; the result is the mean of those per-key maxima
        (0-100). ``row_values`` is expected to hold lower-cased strings; an
        empty row scores 0.
        """
        category_scores = []
        for hdr_terms in expected_headers.values():
            term_score = max(
                (
                    fuzz.partial_ratio(cell, term)
                    for cell in row_values
                    for term in hdr_terms
                ),
                default=0,
            )
            category_scores.append(term_score)
        return sum(category_scores) / len(category_scores)

    @staticmethod
    def find_best_header(df, first_txn_idx, scan_top_n=50):
        """Pick the header row using two strategies and keep the better one.

        Bottom-up looks at the rows directly above ``first_txn_idx``; top-down
        scans the first ``scan_top_n`` rows of the sheet. The top-down result
        wins only with a strictly higher score.

        NOTE: when ``first_txn_idx`` is 0 there is no row above the first
        transaction, so only the top-down candidate exists and the returned
        row may itself be a data row.

        Raises
        ------
        ValueError
            If neither strategy yields a candidate (e.g. an empty frame).
        """
        # Bottom-up (no candidate when the first transaction is the first row)
        bottom_idx = HeaderDetector.find_header_row(df, first_txn_idx)
        bottom_score = -1
        if bottom_idx is not None:
            bottom_score = HeaderDetector.score_header_row(HeaderDetector._lowercase_row(df, bottom_idx))

        # Top-down
        best_top_score, best_top_idx = -1, None
        for i in range(min(scan_top_n, len(df))):
            score = HeaderDetector.score_header_row(HeaderDetector._lowercase_row(df, i))
            if score > best_top_score:
                best_top_score = score
                best_top_idx = i

        if bottom_idx is None and best_top_idx is None:
            raise ValueError("No candidate header row found: nothing to scan above or at the top of the sheet.")

        # Pick better
        if bottom_idx is not None:
            print(f"Bottom-row {df.iloc[bottom_idx].tolist()}")
        if best_top_idx is not None:
            print(f"Top-row {df.iloc[best_top_idx].tolist()}")

        # A missing candidate keeps its sentinel score of -1 and therefore
        # always loses against a real one (real scores are >= 0).
        if best_top_score > bottom_score:
            print(f"🧠 Using top-down header (row {best_top_idx}) with score {best_top_score:.2f}")
            if bottom_idx is not None:
                print(f"Bottom-up header (row {bottom_idx}) had score {bottom_score:.2f}")
            return best_top_idx
        else:
            print(f"🔍 Using bottom-up header (row {bottom_idx}) with score {bottom_score:.2f}")
            if best_top_idx is not None:
                print(f"Top-down header (row {best_top_idx}) had score {best_top_score:.2f}")
            return bottom_idx

In [6]:
class BankStatementParser:
    """Consolidate the transactions of every statement file in a directory into one CSV.

    For each supported file the parser locates the first transaction-like row,
    detects the header row, maps the header names onto the standard columns,
    keeps only the rows that validate as transactions, masks long numbers in
    the description and normalises the date.
    """
    # Placeholders; both are overwritten per instance in __init__.
    output_file = None
    input_dir = None

    # Lower-case file extensions that process() will attempt to load.
    SUPPORTED_EXTENSIONS = ('.xls', '.xlsx', '.csv')
    # Columns (and their order) of the consolidated output.
    OUTPUT_COLUMNS = ["date", "description", "credit", "debit"]

    def __init__(self, input_dir, output_file):
        """Remember the directory to scan and the CSV path to write."""
        self.input_dir = input_dir
        self.output_file = output_file

    @staticmethod
    def _build_column_names(raw_headers):
        """Return one unique, stripped name per header cell.

        Blank or non-string cells become ``unnamed_<position>`` and repeated
        names get a ``_<position>`` suffix, so the result always has exactly
        as many entries as the sheet has columns.
        """
        names = []
        seen = set()
        for position, header in enumerate(raw_headers):
            name = header.strip() if isinstance(header, str) else ""
            candidate = name or f"unnamed_{position}"
            while candidate in seen:
                candidate = f"{candidate}_{position}"
            seen.add(candidate)
            names.append(candidate)
        return names

    def process(self):
        """Parse every supported file in ``input_dir`` and write the combined rows to ``output_file``.

        Files without a detectable transaction row are skipped. Nothing is
        written when no file yields valid rows. Returns None.
        """
        all_valid_rows = []
        print("Processing bank statement...")
        print("Input directory:", self.input_dir)
        # Sorted so the order of rows in the output does not depend on the
        # arbitrary order in which the OS lists the directory.
        for file in sorted(os.listdir(self.input_dir)):
            print("🔍 Checking file:", file)
            # Case-insensitive so "STATEMENT.XLSX" is not silently skipped.
            if not file.lower().endswith(self.SUPPORTED_EXTENSIONS):
                continue
            print("🔍 Processing file:", file)
            file_path = os.path.join(self.input_dir, file)

            df = robust_load(file_path)

            if df.empty:
                print(f"⚠️ Skipping {file}: No data loaded.")
                continue

            first_txn_idx = None
            for i, row in df.iterrows():
                if TransactionValidator.is_valid_transaction_row(row):
                    first_txn_idx = i
                    break

            if first_txn_idx is None:
                continue

            header_idx = HeaderDetector.find_best_header(df, first_txn_idx)
            # One name per column: dropping blank header cells here would make
            # the column assignment below fail with a length mismatch.
            headers = self._build_column_names(df.iloc[header_idx].tolist())
            data_df = df.iloc[header_idx + 1:].reset_index(drop=True)
            data_df.columns = headers
            print(f"✅ Detected header row at index {header_idx}: {headers}")

            actual_to_standard = HeaderDetector.match_expected_to_actual(data_df.columns)

            print(f"🔍 Mapped columns: {actual_to_standard}")

            valid_rows = [row for _, row in data_df.iterrows() if TransactionValidator.is_valid_transaction_row(row)]
            if not valid_rows:
                continue

            partial_df = pd.DataFrame(valid_rows, columns=data_df.columns)[list(actual_to_standard.keys())]
            partial_df = partial_df.rename(columns=actual_to_standard)
            if "description" in partial_df.columns:
                partial_df["description"] = partial_df["description"].astype(str).apply(mask_all_digits)
            if "date" in partial_df.columns:
                partial_df["date"] = partial_df["date"].apply(normalize_date)
            # Preview only after masking so long account/reference numbers are
            # never echoed into the notebook output.
            print(partial_df.head())

            # reindex (rather than plain selection) fills any standard column
            # the sheet did not provide with NaN instead of raising KeyError.
            partial_df = partial_df.reindex(columns=self.OUTPUT_COLUMNS)
            print(f"✅ Processed {file}: {len(partial_df)} valid rows found.")
            all_valid_rows.append(partial_df)

        if not all_valid_rows:
            print("⚠️ No valid transactions found across all files.")
            return

        final_df = pd.concat(all_valid_rows, ignore_index=True)
        output_dir = os.path.dirname(self.output_file)
        if output_dir:
            # to_csv does not create missing parent directories.
            os.makedirs(output_dir, exist_ok=True)
        final_df.to_csv(self.output_file, index=False)
        print(f"✅ Consolidated {len(final_df)} valid rows into {self.output_file}")

In [7]:
# Entry point: consolidate every statement found in the input directory into
# a single CSV.
if __name__ == "__main__":
    # Both paths are relative, so they resolve against the current working
    # directory of the kernel/process that runs this cell.
    parser = BankStatementParser("../bank_statements", "../output/consolidated_output.csv")
    parser.process()

Processing bank statement...
Input directory: ../bank_statements
🔍 Checking file: .DS_Store
🔍 Checking file: bank_statement2.xlsx
🔍 Processing file: bank_statement2.xlsx
🔍 Loading file: ../bank_statements/bank_statement2.xlsx
Bottom-row ['Txn Date', 'Value Date', 'Description', 'Ref No./Cheque No.', '        Debit', 'Credit', 'Balance']
Top-row ['Txn Date', 'Value Date', 'Description', 'Ref No./Cheque No.', '        Debit', 'Credit', 'Balance']
🔍 Using bottom-up header (row 20) with score 100.00
Top-down header (row 20) had score 100.00
✅ Detected header row at index 20: ['Txn Date', 'Value Date', 'Description', 'Ref No./Cheque No.', 'Debit', 'Credit', 'Balance']
🔍 Mapped columns: {'Txn Date': 'date', 'Description': 'description', 'Debit': 'debit', 'Credit': 'credit'}
        date                                        description     debit  \
0 2024-09-01     ATM WDL-ATM CASH 8346  ANJANAPURA BANGALORE...     10000   
1 2024-09-01     ATM WDL-ATM CASH 8347  ANJANAPURA BANGALORE...    