In [1]:
import os
import json
# Selection of the statement this notebook works on.
document_type = "bank_statements"
template_name = "halifax"
identifier = "april"

# All paths are resolved relative to the directory the kernel was started in.
ROOT_DIR = os.getcwd()

# <ROOT_DIR>/data/<document_type>/<template_name>/pdf/<template_name>_<identifier>.pdf
pdf_path: str = os.path.join(
    ROOT_DIR, "data", document_type, template_name, "pdf",
    f"{template_name}_{identifier}.pdf",
)

In [2]:
# Location of the parser's JSON output for the selected statement.
output_path: str = os.path.join(
    ROOT_DIR,
    "outputs",
    f"{template_name}_{identifier}_output.json",
)

# Read through a context manager so the file handle is closed as soon as the
# JSON has been parsed, instead of being left open for the garbage collector.
with open(output_path) as output_file:
    output = json.load(output_file)

In [3]:
# Display the parsed document that was loaded from the JSON output file.
# NOTE(review): the rendered output includes the customer name, sort code and
# account number — consider trimming or clearing it before sharing the notebook.
output

{'metadata': {'document_id': '7895ae19-ac9a-424d-81be-6c68c06ebc83',
  'parsed_at': '2024-12-12T15:46:37',
  'number_of_pages': 2},
 'pages': [{'forms': [{'customer_name': 'Mr Jake Holmes'},
    {'sort_code': '11-00-56'},
    {'account_number': '11980162'},
    {'money_in': '£761.80'},
    {'start_balance': '£521.23'},
    {'money_out': '£777.93'},
    {'end_balance': '£505.10'}],
   'tables': [{'data': [{'date': '03 Apr 23',
       'type': 'DD',
       'description': 'UNIVERSITY OF NOTT',
       'money_out': '22.90',
       'money_in': '',
       'balance': '498.33'},
      {'date': '06 Apr 23',
       'type': 'FPI',
       'description': 'HIYACAR LTD',
       'money_out': '',
       'money_in': '59.50',
       'balance': '557.83'},
      {'date': '06 Apr 23',
       'type': 'DEB',
       'description': 'PAYPAL *FIVERR COM',
       'money_out': '28.46',
       'money_in': '',
       'balance': '529.37'},
      {'date': '11 Apr 23',
       'type': 'DEB',
       'description': 'PAYPAL *

In [4]:
# processors.py
from typing import Any, Dict
from datetime import datetime
import re

class ProcessorRegistry:
    """Registry of named value-cleaning processors.

    Every processor has the signature ``(value, options) -> str`` and returns
    falsy input (``None``, ``""``) unchanged. Processors are looked up by name
    through :meth:`get_processor`.
    """

    # Honorifics removed by ``clean_text`` when ``remove_titles`` is enabled.
    # They are matched as whole words only (so "Mrs" is not reduced to "s" and
    # "Drake" is left intact), regardless of case (so removal still works after
    # the ``uppercase`` option has run), with an optional trailing full stop.
    _TITLE_PATTERN = re.compile(r"\b(?:Mrs|Mr|Ms|Dr|Prof)\b\.?\s*", re.IGNORECASE)

    def __init__(self):
        # Maps the processor name used in cleaning rules to its bound method.
        self.processors = {
            "clean_text": self.clean_text,
            "clean_numbers": self.clean_numbers,
            "clean_currency": self.clean_currency,
            "clean_date": self.clean_date
        }

    def clean_text(self, value: str, options: Dict[str, Any]) -> str:
        """Trim and optionally normalise a free-text value.

        Steps are applied in this order: strip surrounding whitespace,
        upper-case (``uppercase``), remove honorific titles
        (``remove_titles``), truncate to ``max_length`` characters.

        Args:
            value: Text to clean.
            options: Supported keys are ``uppercase`` (bool),
                ``remove_titles`` (bool) and ``max_length`` (int).

        Returns:
            The cleaned text, or ``value`` unchanged when it is falsy.
        """
        if not value:
            return value

        result = value.strip()

        if options.get("uppercase", False):
            result = result.upper()

        if options.get("remove_titles", False):
            # Whole-word match: a plain substring replace would also eat the
            # "Mr" inside "Mrs" or the "Dr" at the start of "Drake".
            result = self._TITLE_PATTERN.sub("", result).strip()

        if "max_length" in options:
            result = result[:options["max_length"]]

        return result

    def clean_numbers(self, value: str, options: Dict[str, Any]) -> str:
        """Keep only digits (plus explicitly allowed characters) and optionally format them.

        Args:
            value: Text containing the number.
            options: ``allow_chars`` is a string of extra characters to keep.
                ``format`` is a template in which every ``#`` is replaced by
                the next kept character and every other character is copied
                verbatim (e.g. ``##-##-##`` for a sort code).

        Returns:
            The cleaned (and, if requested, formatted) string, or ``value``
            unchanged when it is falsy. When the template has more ``#`` than
            there are kept characters, the surplus placeholders produce
            nothing; kept characters beyond the last ``#`` are dropped.
        """
        if not value:
            return value

        allowed = options.get("allow_chars", "")
        pattern = f"[^0-9{re.escape(allowed)}]"
        result = re.sub(pattern, "", value)

        if "format" in options:
            # Feed kept characters into the '#' slots in order; literals in the
            # template are copied through as they are.
            remaining = iter(result)
            result = "".join(
                next(remaining, "") if char == "#" else char
                for char in options["format"]
            )

        return result

    def clean_currency(self, value: str, options: Dict[str, Any]) -> str:
        """Convert a currency string to a plain fixed-point number string.

        Args:
            value: Amount as text, e.g. ``"£761.80"`` or ``"-£22.90"``.
            options: ``remove_symbols`` is a list of substrings to delete
                first; ``decimal_places`` is the number of decimals in the
                result (default 2).

        Returns:
            The amount formatted with ``decimal_places`` decimals. A minus
            sign that appears before the first digit is preserved. If the
            remaining text cannot be parsed as a number, the value with the
            configured symbols removed is returned. Falsy input is returned
            unchanged.
        """
        if not value:
            return value

        # Remove specified currency symbols
        stripped = value
        for symbol in options.get("remove_symbols", []):
            stripped = stripped.replace(symbol, "")

        # A '-' ahead of the first digit marks a negative amount; remember it
        # because the digit filter below would otherwise discard the sign.
        is_negative = re.match(r"[^0-9.]*-", stripped) is not None

        # Remove any remaining non-numeric chars except decimal point
        digits = re.sub(r"[^0-9.]", "", stripped)

        # Format to specified decimal places
        try:
            decimal_places = options.get("decimal_places", 2)
            formatted = f"{float(digits):.{decimal_places}f}"
        except ValueError:
            return stripped

        # Avoid emitting "-0.00" for amounts that round to zero.
        if is_negative and float(formatted) != 0:
            formatted = "-" + formatted

        return formatted

    def clean_date(self, value: str, options: Dict[str, Any]) -> str:
        """Re-format a date string.

        Args:
            value: Date text to convert.
            options: Must contain ``input_format`` and ``output_format``
                (``strptime``/``strftime`` format strings).

        Returns:
            The re-formatted date, or ``value`` unchanged when it is falsy or
            does not match ``input_format``.

        Raises:
            KeyError: If ``input_format`` or ``output_format`` is missing
                from ``options``.
        """
        if not value:
            return value

        try:
            date_obj = datetime.strptime(value, options["input_format"])
            return date_obj.strftime(options["output_format"])
        except ValueError:
            return value

    def get_processor(self, name: str):
        """Return the processor registered under ``name``, or ``None`` if there is none."""
        return self.processors.get(name)

# cleaner.py
import json

class DocumentCleaner:
    """Applies configured cleaning rules to values using a ProcessorRegistry."""

    def __init__(self, config_path: str):
        """Load the JSON cleaning configuration and create the processor registry.

        Args:
            config_path: Path of the JSON configuration file.
        """
        with open(config_path) as config_file:
            self.config = json.load(config_file)
        self.processor_registry = ProcessorRegistry()

    def clean_value(self, value: Any, cleaning_rule: Dict) -> Any:
        """Run ``value`` through the processor named by ``cleaning_rule``.

        Args:
            value: The raw value to clean.
            cleaning_rule: Mapping with a ``processor`` name and optional
                ``options`` for it; may be ``None``.

        Returns:
            The processor's result, or ``value`` unchanged when the rule is
            ``None``, names no processor, or names an unregistered one.
        """
        rule_name = cleaning_rule.get("processor") if cleaning_rule is not None else None
        handler = self.processor_registry.get_processor(rule_name) if rule_name else None

        # Nothing applicable to run: hand the value back untouched.
        if not handler:
            return value

        return handler(value, cleaning_rule.get("options", {}))



In [6]:
# Example usage: load the Halifax cleaning rules and apply them to the parsed
# statement held in `output`.
if __name__ == "__main__":
    # The config file is looked up under ROOT_DIR/cleaner_config.
    path = os.path.join(ROOT_DIR, "cleaner_config", "halifax_cleaner_config.json")
    # DocumentCleaner parses this JSON file in its constructor.
    # NOTE(review): the recorded run of this cell ended in a JSONDecodeError at
    # line 1 column 1, presumably raised while parsing this file — verify that
    # it exists at this path and is non-empty, valid JSON.
    cleaner = DocumentCleaner(path)

    # NOTE(review): no `clean_document` method is visible on DocumentCleaner in
    # this notebook — confirm where it is defined before relying on this call.
    cleaned_document = cleaner.clean_document(output)


JSONDecodeError: Expecting value: line 1 column 1 (char 0)