Create objects that correspond to the `.json` format expected for the output

In [108]:
import json
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from typing import Dict, List, Optional

# Define ContractDuration as a class inherited from dict
class ContractDuration(dict):
    """Dict holding a contract's ``start_date`` and ``end_date`` strings.

    The two values are stored as ordinary dict items and mirrored as
    properties. The constructor stores its arguments as given; only the
    property setters validate.
    """

    def __init__(self, start_date: str, end_date: str):
        super().__init__({"start_date": start_date, "end_date": end_date})

    @property
    def start_date(self):
        """Value stored under the ``"start_date"`` key."""
        return self["start_date"]

    @start_date.setter
    def start_date(self, value: str):
        # Accept only strings that contain something besides whitespace.
        acceptable = isinstance(value, str) and bool(value.strip())
        if not acceptable:
            raise ValueError("Start date must be a non-empty string.")
        self["start_date"] = value

    @property
    def end_date(self):
        """Value stored under the ``"end_date"`` key."""
        return self["end_date"]

    @end_date.setter
    def end_date(self, value: str):
        # Accept only strings that contain something besides whitespace.
        acceptable = isinstance(value, str) and bool(value.strip())
        if not acceptable:
            raise ValueError("End date must be a non-empty string.")
        self["end_date"] = value

# Define ContractedRate as a class inherited from dict
class ContractedRate(dict):
    """Dict holding one contracted ``rate`` together with its ``unit``.

    Both values are stored as ordinary dict items and mirrored as
    properties. The constructor stores its arguments as given; only the
    property setters validate.
    """

    def __init__(self, rate: str, unit: str):
        super().__init__({"rate": rate, "unit": unit})

    @property
    def rate(self):
        """Value stored under the ``"rate"`` key."""
        return self["rate"]

    @rate.setter
    def rate(self, value: str):
        # Accept only strings that contain something besides whitespace.
        acceptable = isinstance(value, str) and bool(value.strip())
        if not acceptable:
            raise ValueError("Rate must be a non-empty string.")
        self["rate"] = value

    @property
    def unit(self):
        """Value stored under the ``"unit"`` key."""
        return self["unit"]

    @unit.setter
    def unit(self, value: str):
        # Accept only strings that contain something besides whitespace.
        acceptable = isinstance(value, str) and bool(value.strip())
        if not acceptable:
            raise ValueError("Unit must be a non-empty string.")
        self["unit"] = value

# Define EnergyContract dataclass
@dataclass
class EnergyContract:
    """One energy contract, convertible to and from the output JSON format.

    Values live in private (name-mangled) attributes and are exposed through
    validating properties. The constructor stores its arguments as given,
    without running the setters' validation, so that ``with_defaults()`` can
    build a blank contract from empty placeholder values; validation applies
    when a property is assigned afterwards.

    Serialisation goes through ``to_dict()`` rather than
    ``dataclasses.asdict``: ``asdict`` would emit the name-mangled field
    names as keys and tries to rebuild dict subclasses by calling their type
    with an iterable of pairs, which the two-argument constructors of
    ``ContractDuration`` and ``ContractedRate`` do not accept.
    """

    __contract_id: str
    __monthly_forecasted_usage: Dict[str, float]
    __contract_duration: ContractDuration
    __contracted_rates: List[ContractedRate]
    __rate_class: str
    __unit_of_measurement: str
    __customer_dba_name: str
    __service_addresses: List[str]
    __account_numbers: List[str]
    __meter_numbers: List[str]
    __notes: List[str]

    def __init__(self,
                 contract_id: str,
                 monthly_forecasted_usage: Dict[str, float],
                 contract_duration: ContractDuration,
                 contracted_rates: List[ContractedRate],
                 rate_class: str,
                 unit_of_measurement: str,
                 customer_dba_name: str,
                 service_addresses: List[str],
                 account_numbers: List[str],
                 meter_numbers: List[str],
                 notes: Optional[List[str]] = None):
        """Store the given values as-is; ``notes`` defaults to a new empty list."""
        self.__contract_id = contract_id
        self.__monthly_forecasted_usage = monthly_forecasted_usage
        self.__contract_duration = contract_duration
        self.__contracted_rates = contracted_rates
        self.__rate_class = rate_class
        self.__unit_of_measurement = unit_of_measurement
        self.__customer_dba_name = customer_dba_name
        self.__service_addresses = service_addresses
        self.__account_numbers = account_numbers
        self.__meter_numbers = meter_numbers
        self.__notes = notes if notes is not None else []

    # -- Validation helpers shared by the setters --
    @staticmethod
    def _require_text(value, label: str) -> str:
        """Return ``value`` if it is a non-blank string, else raise ValueError."""
        if not isinstance(value, str) or not value.strip():
            raise ValueError(f"{label} must be a non-empty string.")
        return value

    @staticmethod
    def _require_str_list(value, label: str) -> List[str]:
        """Return ``value`` if it is a list of strings, else raise ValueError.

        NOTE: only the element type is checked; empty strings are accepted
        even though the error message says "non-empty".
        """
        if not isinstance(value, list) or not all(isinstance(item, str) for item in value):
            raise ValueError(f"{label} must be a list of non-empty strings.")
        return value

    # -- Properties --
    @property
    def contract_id(self) -> str:
        return self.__contract_id

    @contract_id.setter
    def contract_id(self, value: str):
        self.__contract_id = self._require_text(value, "Contract ID")

    @property
    def monthly_forecasted_usage(self) -> Dict[str, float]:
        return self.__monthly_forecasted_usage

    @monthly_forecasted_usage.setter
    def monthly_forecasted_usage(self, value: Dict[str, float]):
        if not isinstance(value, dict):
            raise ValueError("Monthly forecasted usage must be a dictionary.")
        if not all(isinstance(k, str) and isinstance(v, (int, float)) for k, v in value.items()):
            raise ValueError("Each entry in monthly forecasted usage must be a string (key) and a number (value).")
        self.__monthly_forecasted_usage = value

    @property
    def contract_duration(self) -> ContractDuration:
        return self.__contract_duration

    @contract_duration.setter
    def contract_duration(self, value: ContractDuration):
        if not isinstance(value, ContractDuration):
            raise ValueError("Contract duration must be a ContractDuration object.")
        self.__contract_duration = value

    @property
    def contracted_rates(self) -> List[ContractedRate]:
        return self.__contracted_rates

    @contracted_rates.setter
    def contracted_rates(self, value: List[ContractedRate]):
        if not isinstance(value, list) or not all(isinstance(rate, ContractedRate) for rate in value):
            raise ValueError("Contracted rates must be a list of ContractedRate objects.")
        self.__contracted_rates = value

    @property
    def rate_class(self) -> str:
        return self.__rate_class

    @rate_class.setter
    def rate_class(self, value: str):
        self.__rate_class = self._require_text(value, "Rate class")

    @property
    def unit_of_measurement(self) -> str:
        return self.__unit_of_measurement

    @unit_of_measurement.setter
    def unit_of_measurement(self, value: str):
        self.__unit_of_measurement = self._require_text(value, "Unit of measurement")

    @property
    def customer_dba_name(self) -> str:
        return self.__customer_dba_name

    @customer_dba_name.setter
    def customer_dba_name(self, value: str):
        self.__customer_dba_name = self._require_text(value, "Customer DBA Name")

    @property
    def service_addresses(self) -> List[str]:
        return self.__service_addresses

    @service_addresses.setter
    def service_addresses(self, value: List[str]):
        self.__service_addresses = self._require_str_list(value, "Service addresses")

    @property
    def account_numbers(self) -> List[str]:
        return self.__account_numbers

    @account_numbers.setter
    def account_numbers(self, value: List[str]):
        self.__account_numbers = self._require_str_list(value, "Account numbers")

    @property
    def meter_numbers(self) -> List[str]:
        return self.__meter_numbers

    @meter_numbers.setter
    def meter_numbers(self, value: List[str]):
        self.__meter_numbers = self._require_str_list(value, "Meter numbers")

    @property
    def notes(self) -> List[str]:
        return self.__notes

    @notes.setter
    def notes(self, value: List[str]):
        self.__notes = self._require_str_list(value, "Notes")

    def update_notes(self, value):
        """Add notes: extend with a list, or append a single string.

        Raises:
            TypeError: if ``value`` is neither a list nor a string.
        """
        if isinstance(value, list):
            self.__notes.extend(value)
        elif isinstance(value, str):
            # Previously wrote to a nonexistent ``_list_attribute``.
            self.__notes.append(value)
        else:
            raise TypeError("Value must be a list or a valid element.")

    # -- JSON Helpers --
    def to_dict(self) -> Dict[str, object]:
        """Return the contract as plain dicts/lists keyed by public field names.

        The keys match what ``from_json`` reads. Containers are shallow-copied
        into built-in ``dict``/``list`` objects.
        """
        return {
            "contract_id": self.__contract_id,
            "monthly_forecasted_usage": dict(self.__monthly_forecasted_usage),
            "contract_duration": dict(self.__contract_duration),
            "contracted_rates": [dict(rate) for rate in self.__contracted_rates],
            "rate_class": self.__rate_class,
            "unit_of_measurement": self.__unit_of_measurement,
            "customer_dba_name": self.__customer_dba_name,
            "service_addresses": list(self.__service_addresses),
            "account_numbers": list(self.__account_numbers),
            "meter_numbers": list(self.__meter_numbers),
            "notes": list(self.__notes),
        }

    def to_json(self) -> str:
        """Serialise the contract to a JSON string (2-space indentation)."""
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def with_defaults(cls) -> "EnergyContract":
        """Build a blank contract whose fields hold empty placeholder values."""
        return cls(
            contract_id='',
            monthly_forecasted_usage=defaultdict(float),
            contract_duration=ContractDuration('', ''),
            contracted_rates=[],
            rate_class='',
            unit_of_measurement='',
            customer_dba_name='',
            service_addresses=[],
            account_numbers=[],
            meter_numbers=[]
        )

    @staticmethod
    def from_json(data: str) -> "EnergyContract":
        """Rebuild a contract from a JSON string in the ``to_json`` layout.

        ``notes`` is optional in the input; every other key is required and a
        missing one raises ``KeyError``.
        """
        raw = json.loads(data)

        # Rehydrate the nested dicts into their wrapper classes.
        contract_duration = ContractDuration(**raw['contract_duration'])
        contracted_rates = [ContractedRate(**rate) for rate in raw['contracted_rates']]

        return EnergyContract(
            contract_id=raw['contract_id'],
            customer_dba_name=raw['customer_dba_name'],
            service_addresses=raw['service_addresses'],
            account_numbers=raw['account_numbers'],
            meter_numbers=raw['meter_numbers'],
            contract_duration=contract_duration,
            rate_class=raw['rate_class'],
            contracted_rates=contracted_rates,
            unit_of_measurement=raw['unit_of_measurement'],
            monthly_forecasted_usage=raw['monthly_forecasted_usage'],
            notes=raw.get('notes'),
        )

Define a function that finds all `.md` files and lists the folders that contain no `.md` file

In [21]:
import os

def find_md_files_and_missing_dirs(root_dir: str) -> dict:
    """
    Walk ``root_dir`` recursively and sort every visited folder into one of
    two buckets: folders contributing ``.md`` files, and folders without any.

    Args:
        root_dir (str): Path to the root directory.

    Returns:
        dict: {
            "md_files": List[str],  # Full paths of the .md files found
            "missing_folders": List[str]  # Visited folders holding no .md file
        }
    """
    found_files = []
    folders_without_md = []

    for folder, _subdirs, names in os.walk(root_dir):
        # The extension check is case-insensitive ("README.MD" counts).
        markdown_paths = [
            os.path.join(folder, name)
            for name in names
            if name.lower().endswith(".md")
        ]
        if not markdown_paths:
            folders_without_md.append(folder)
            continue
        found_files.extend(markdown_paths)

    return {"md_files": found_files, "missing_folders": folders_without_md}


In [27]:
# Scan the whole outputs tree; `result` is a notebook-level variable.
result = find_md_files_and_missing_dirs("../../ProjectMaterials/outputs")

# List every markdown file that was found, one path per line.
print("Found:")
for md_file in result["md_files"]:
    print(md_file)

print()

# List every visited folder that contains no .md file
# (this includes the root itself and any sub-folders such as `images`).
print('Missing .md files at')
for folder in result["missing_folders"]:
    print(folder)

Found:
../../ProjectMaterials/outputs/Example 2 (Contract)/Example 2 (Contract).md
../../ProjectMaterials/outputs/Example 3 (Sample Bill)/Example 3 (Sample Bill).md
../../ProjectMaterials/outputs/Example 7 (Sample Bill)/Example 7 (Sample Bill).md
../../ProjectMaterials/outputs/Example 4 (Sample Bill)/Example 4 (Sample Bill).md
../../ProjectMaterials/outputs/Example 1 (Sample Bill)/Example 1 (Sample Bill).md
../../ProjectMaterials/outputs/Example 6 (Sample Bill)/Example 6 (Sample Bill).md
../../ProjectMaterials/outputs/Example 2 (Sample Bill)/Example 2 (Sample Bill).md
../../ProjectMaterials/outputs/Example 8 (Sample Bill)/Example 8 (Sample Bill).md
../../ProjectMaterials/outputs/Example 1 (Contract)/Example 1 (Contract).md
../../ProjectMaterials/outputs/Example 4 (Contract)/Example 4 (Contract).md

Missing .md files at
../../ProjectMaterials/outputs
../../ProjectMaterials/outputs/Example 2 (Contract)/images
../../ProjectMaterials/outputs/Example 3 (Sample Bill)/images
../../ProjectMate

Notice that folders ***Example 5 (Sample Bill)*** and ***Example 3 (Contract)*** are missing `.md` files

----

In [37]:
def find_md_files_and_missing_dirs_with_keywords(root_dir: str, 
                                                 include_keywords: List[str]=None) -> Dict[str, List[str]]:
    """
    Keyword-filtered variant of the markdown search: walk ``root_dir`` but
    descend only into subdirectories whose name contains one of the keywords
    (case-insensitive). With no keywords, every directory is visited.

    Args:
        root_dir (str): Path to the root directory.
        include_keywords (List[str], optional): Only include subdirectories containing any of these keywords.

    Returns:
        dict: {
            "md_files": List[str],
            "missing_folders": List[str]
        }
    """
    include_keywords = include_keywords or []

    def has_keyword(text: str) -> bool:
        # Case-insensitive substring match against any keyword.
        return any(kw.lower() in text.lower() for kw in include_keywords)

    md_files = []
    missing_folders = []

    for dirpath, dirnames, filenames in os.walk(root_dir):
        if include_keywords:
            # Prune in place so os.walk never descends into non-matching folders.
            dirnames[:] = [d for d in dirnames if has_keyword(d)]

            # The match is made against the full path of the current folder;
            # a folder whose path matches no keyword is not reported at all.
            if not has_keyword(dirpath):
                continue

        hits = [os.path.join(dirpath, f) for f in filenames if f.lower().endswith(".md")]
        if hits:
            md_files.extend(hits)
        else:
            missing_folders.append(dirpath)

    return {"md_files": md_files, "missing_folders": missing_folders}

In [188]:
# Restrict the search to folders whose name contains "Contract";
# this rebinds the notebook-level `result` variable.
result = find_md_files_and_missing_dirs_with_keywords(
    "../../ProjectMaterials/outputs",
    include_keywords=['Contract']
)

print("Markdown files found:")
print("\n".join(result["md_files"]))

print("\nFolders missing .md files:")
print("\n".join(result["missing_folders"]))

Markdown files found:
../../ProjectMaterials/outputs/Example 2 (Contract)/Example 2 (Contract).md
../../ProjectMaterials/outputs/Example 1 (Contract)/Example 1 (Contract).md
../../ProjectMaterials/outputs/Example 4 (Contract)/Example 4 (Contract).md

Folders missing .md files:
../../ProjectMaterials/outputs/Example 3 (Contract)


In [41]:
# Restrict the search to folders whose name contains "Bill";
# this rebinds the notebook-level `result` variable.
result = find_md_files_and_missing_dirs_with_keywords(
    "../../ProjectMaterials/outputs",
    include_keywords=['Bill']
)

print("Markdown files found:")
print("\n".join(result["md_files"]))

print("\nFolders missing .md files:")
print("\n".join(result["missing_folders"]))

Markdown files found:
../../ProjectMaterials/outputs/Example 3 (Sample Bill)/Example 3 (Sample Bill).md
../../ProjectMaterials/outputs/Example 7 (Sample Bill)/Example 7 (Sample Bill).md
../../ProjectMaterials/outputs/Example 4 (Sample Bill)/Example 4 (Sample Bill).md
../../ProjectMaterials/outputs/Example 1 (Sample Bill)/Example 1 (Sample Bill).md
../../ProjectMaterials/outputs/Example 6 (Sample Bill)/Example 6 (Sample Bill).md
../../ProjectMaterials/outputs/Example 2 (Sample Bill)/Example 2 (Sample Bill).md
../../ProjectMaterials/outputs/Example 8 (Sample Bill)/Example 8 (Sample Bill).md

Folders missing .md files:
../../ProjectMaterials/outputs/Example 5 (Sample Bill)


----

In [94]:
def get_file_and_folder(path: str):
    """Split ``path`` into its immediate parent folder name and its file name.

    Returns:
        tuple: ``(folder_name, file_name)`` - note the folder comes first.
    """
    parent_path, file_name = os.path.split(path)
    return os.path.basename(parent_path), file_name

In [96]:
# Quick check on the first markdown path of the current `result`
# (`result` holds whatever the last executed search cell assigned).
print(get_file_and_folder(result["md_files"][0]))

('Example 3 (Sample Bill)', 'Example 3 (Sample Bill).md')


In [98]:
def define_file_name(path: str):
    """Build an identifier such as ``BILL_003`` from the file's parent folder name.

    The identifier combines the last word of the folder name (parentheses
    removed, upper-cased) with the folder name's second word parsed as an
    integer and zero-padded to three digits. A folder name whose second word
    is missing or not an integer raises ``IndexError`` / ``ValueError``.
    """
    folder_name = get_file_and_folder(path)[0]

    # Last word with the parentheses stripped, e.g. "(Sample Bill)" -> "Bill".
    without_parens = folder_name.replace(")", "").replace("(", "")
    file_type = without_parens.split()[-1]

    # Second whitespace-separated word is the example number.
    words = folder_name.split()
    file_number = int(words[1])

    return f"{file_type.upper()}_{file_number:03}"

In [100]:
# Quick check: derive the identifier for the first markdown path in `result`.
define_file_name(result["md_files"][0])

'BILL_003'

In [102]:
import json
from dataclasses import asdict, is_dataclass
from typing import Any

def write_json_to_file(obj: Any, filename: str) -> None:
    """Serialise a dataclass instance to ``filename`` as indented JSON.

    The object is converted to plain dicts/lists by ``_dataclass_to_plain``
    instead of ``dataclasses.asdict``. ``asdict`` rebuilds every dict it meets
    by calling the dict's own type with an iterable of pairs, which raises for
    dict subclasses whose constructor takes other arguments.

    The JSON text is produced completely before the file is opened, so a
    serialisation failure never leaves a truncated file behind.

    Args:
        obj: A dataclass instance.
        filename: Destination path; overwritten if it exists.

    Raises:
        TypeError: if ``obj`` is not a dataclass, or if a value inside it
            cannot be represented in JSON.
    """
    if not is_dataclass(obj):
        raise TypeError(f"Object of type {type(obj).__name__} is not a dataclass and cannot be serialized with asdict().")

    try:
        payload = json.dumps(_dataclass_to_plain(obj), indent=2)
    except TypeError as e:
        # Chain the cause so the failing value stays visible in the traceback.
        raise TypeError(f"Failed to serialize object to JSON: {e}") from e

    with open(filename, "w", encoding="utf-8") as f:
        f.write(payload)


def _dataclass_to_plain(value: Any) -> Any:
    """Recursively turn dataclass instances, dicts, lists and tuples into plain dicts/lists."""
    from dataclasses import fields  # local import: not among this cell's imports

    if is_dataclass(value) and not isinstance(value, type):
        return {
            _public_field_name(value, f.name): _dataclass_to_plain(getattr(value, f.name))
            for f in fields(value)
        }
    if isinstance(value, dict):
        # Covers dict subclasses too; the result is always a built-in dict.
        return {key: _dataclass_to_plain(item) for key, item in value.items()}
    if isinstance(value, (list, tuple)):
        return [_dataclass_to_plain(item) for item in value]
    return value


def _public_field_name(owner: Any, name: str) -> str:
    """Undo private-name mangling: ``_ClassName__field`` becomes ``field``."""
    for klass in type(owner).__mro__:
        bare_class_name = klass.__name__.lstrip("_")
        if not bare_class_name:
            continue
        prefix = f"_{bare_class_name}__"
        if name.startswith(prefix):
            return name[len(prefix):]
    return name

In [260]:
def deduplicate(items: list) -> list:
    """Return ``items`` without duplicates, keeping first-occurrence order.

    A plain ``set`` would drop the order, which matters to callers that take
    the first element as the preferred one. Items must be hashable.
    """
    return list(dict.fromkeys(items))

In [106]:
from functools import wraps

# Central table mapping exception types to numeric error codes
EXCEPTION_ERROR_CODES = {
    FileNotFoundError: 10,
    PermissionError: 11,
    OSError: 12,  # General I/O errors
    KeyError: 1,
    ValueError: 2,
    IndexError: 3,
    Exception: 99
}


def _error_code_for(exc: BaseException) -> int:
    """Return the code of the most specific table entry matching ``exc``.

    The exception's MRO is walked from the concrete type upwards, so a
    subclass that has no entry of its own inherits its closest listed
    parent's code instead of falling through to the generic 99.
    """
    for klass in type(exc).__mro__:
        if klass in EXCEPTION_ERROR_CODES:
            return EXCEPTION_ERROR_CODES[klass]
    return EXCEPTION_ERROR_CODES[Exception]


# Decorator pattern: wraps risky functions
def exception_to_code(func):
    """Make ``func`` return ``(result, error_code, message)`` instead of raising.

    On success the wrapper returns ``(result, 0, None)``. If ``func`` raises
    an ``Exception``, it returns ``(None, code, str(exception))`` with the
    code looked up in ``EXCEPTION_ERROR_CODES``. The undecorated function
    stays reachable as ``wrapper.__wrapped__``.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except Exception as e:
            return None, _error_code_for(e), str(e)
        return result, 0, None
    return wrapper

def indent_decorator(func):
    """Decorator to indent print statements based on call depth.

    While the decorated function runs, the built-in ``print`` is replaced by
    a version that prefixes a string first argument with two spaces per
    level of ``wrapper.depth``. Nested decorated calls chain through the
    already-patched ``print``, so their indentation accumulates. The previous
    ``print`` is always restored, even if the function raises.

    The patch goes through the ``builtins`` module rather than
    ``__builtins__``: the latter is a module only in ``__main__`` and a plain
    dict in imported modules, where attribute access on it fails.
    """
    import builtins  # local import: not among this cell's imports

    @wraps(func)
    def wrapper(*args, **kwargs):
        wrapper.depth += 1
        indent = " " * (wrapper.depth * 2)

        # Remember whichever print is active now (possibly an outer patch).
        original_print = builtins.print

        def indented_print(*print_args, **print_kwargs):
            # Prepend indentation to the first argument if it's a string
            if print_args and isinstance(print_args[0], str):
                print_args = (indent + print_args[0],) + print_args[1:]
            original_print(*print_args, **print_kwargs)

        builtins.print = indented_print

        try:
            return func(*args, **kwargs)
        finally:
            # Restore the previous print function no matter what
            builtins.print = original_print
            wrapper.depth -= 1

    wrapper.depth = 0
    return wrapper

In [202]:
def initialize_contract() -> EnergyContract:
    """Return a new EnergyContract built by ``EnergyContract.with_defaults()``."""
    blank_contract = EnergyContract.with_defaults()
    return blank_contract
    
def process_contract(full_file_path: str) -> EnergyContract:
    """Build an EnergyContract from one contract markdown file.

    Steps currently implemented: derive the contract ID from the folder
    name, read the file, print a short summary of it, and extract the
    customer DBA name. Problems are recorded in ``contract.notes`` rather
    than raised.

    Args:
        full_file_path: Path to the contract's ``.md`` file.

    Returns:
        The contract. If the file cannot be read, only its ID and a note
        describing the read failure are filled in.
    """
    folder_name, file_name = get_file_and_folder(full_file_path)
    print(f"Processing contract at folder location: {folder_name}")
    print(f"  Processing markdown file - {file_name} - as EnergyContract object")

    contract = initialize_contract()

    # Define contract_id based on the file's folder name
    contract.contract_id = define_file_name(full_file_path)

    # Attempt to read the file first; read_md returns (content, code, message)
    markdown_content, code, msg = read_md(full_file_path)
    if code != 0:
        # Stop further processing, but leave a trace of the failure on the contract
        contract.notes.append(f"File read failed (code {code}): {msg}")
        print(f"❌ Stopping process. Error in file read: {msg}")
        return contract

    # Quick examination of the .md file
    eda_md(markdown_content)

    # Extract customer DBA name; blank candidates are dropped because the
    # customer_dba_name setter rejects empty strings.
    customer_dba_names = [
        name for name in extract_customer_dba_name(markdown_content) if name.strip()
    ]

    if not customer_dba_names:
        contract.notes.append('No customer dba name found!')
    else:
        contract.customer_dba_name = customer_dba_names[0]
        if len(customer_dba_names) > 1:
            contract.notes.append('Multiple customer dba name found. Taking 1st identified name and ignore rest')

    # TODO: remaining steps
    #   - Extract Contract Duration and Usage Forecast
    #   - Extract Unit of Measurement and Contracted Rate(s)
    #   - Extract Rate Class and Service Address(es)
    #   - Extract Account Number(s) and Meter Number(s)
    return contract

@exception_to_code
def read_md(full_file_path: str) -> str:
    """Read a markdown file as UTF-8 text.

    Because of the ``exception_to_code`` decorator, callers receive a tuple
    ``(content, code, message)`` rather than the bare string. Exceptions are
    deliberately not caught or re-wrapped here, so the decorator sees the
    original exception type (e.g. ``FileNotFoundError``) when it picks the
    error code.

    Args:
        full_file_path: Path of the file to read.
    """
    # Resolve the display name up front so it is available on every path.
    _, file_name = get_file_and_folder(full_file_path)

    with open(full_file_path, 'r', encoding='utf-8') as f:
        markdown_content = f.read()

    print(f"  Successfully read {file_name}.")
    return markdown_content
        
def eda_md(markdown_content: str):
    """Print a short summary (pages, lines, characters) of the markdown text."""
    print('The markdown file has following attributes:')

    page_count = extract_num_pages_from_markdown(markdown_content)
    print(f'{page_count} page(s)')

    line_count = extract_num_lines_from_markdown(markdown_content)
    print(f'{line_count} line(s)')

    char_count = extract_num_char_from_markdown(markdown_content)
    print(f'{char_count} character(s)')
        
def extract_contract_duration(markdown_content: str) -> Dict[str, str]:
    """Placeholder: extraction is not implemented yet; always returns an empty dict."""
    return {}

def extract_usage_forecast(markdown_content: str) -> Dict[str, float]:
    """Placeholder: extraction is not implemented yet; always returns an empty dict."""
    return {}

def extract_unit_of_measurement(markdown_content: str) -> str:
    """Placeholder: extraction is not implemented yet; always returns an empty string."""
    return ""

def extract_contracted_rate(markdown_content: str) -> List[ContractedRate]:
    """Placeholder: extraction is not implemented yet; always returns an empty list."""
    return []

def extract_rate_class(markdown_content: str) -> str:
    """Placeholder: extraction is not implemented yet; always returns an empty string."""
    return ""

def extract_service_address(markdown_content: str) -> List[str]:
    """Placeholder: extraction is not implemented yet; always returns an empty list."""
    return []

def extract_account_number(markdown_content: str) -> List[str]:
    """Placeholder: extraction is not implemented yet; always returns an empty list."""
    return []

def extract_meter_number(markdown_content: str) -> List[str]:
    """Placeholder: extraction is not implemented yet; always returns an empty list."""
    return []

In [168]:
import re
def extract_num_pages_from_markdown(markdown_content: str) -> int:
    """Return the number that follows the last ``/page_num: `` marker.

    The last marker's number is taken as the page count. When the text
    contains no marker at all, 0 is returned instead of raising IndexError.
    """
    # Extract all numbers following '/page_num: '
    page_nums = re.findall(r'/page_num: (\d+)', markdown_content)
    return int(page_nums[-1]) if page_nums else 0

def extract_num_lines_from_markdown(markdown_content: str) -> int:
    """Count newline-separated lines (an empty string counts as one line)."""
    newline_count = markdown_content.count('\n')
    return newline_count + 1

def extract_num_char_from_markdown(markdown_content: str) -> int:
    """Return the total number of characters in the markdown text."""
    total_chars = len(markdown_content)
    return total_chars

In [258]:
def extract_customer_dba_name(markdown_content: str) -> list:
    """Collect candidate customer DBA names from ``label: value`` lines.

    Each line is searched (case-insensitively) for one of the known labels,
    optionally wrapped in up to two ``*`` on either side, followed by a colon
    and whitespace. The captured value is the following run of word
    characters and whitespace, stripped; it stops at the first punctuation.

    Returns:
        list: Distinct non-empty names in order of first appearance
        (empty list when nothing matches).
    """
    # Alternatives are tried in this order, so longer labels come before
    # their prefixes ("Customer Name" before "Customer"). Literal labels are
    # escaped; the DBA label is a real regex and must stay unescaped.
    label_patterns = [
        re.escape("BUYER Name"),
        re.escape("Business Name"),
        re.escape("Company Name"),
        r"DBA\s*/\s*Assumed\s+Name",
        re.escape("Customer Name"),
        re.escape("Customer"),
        re.escape("Buyer"),
    ]

    # Define the case-insensitive regex pattern
    pattern = re.compile(
        r'(?i)\*{0,2}\s*(' + '|'.join(label_patterns) + r')\s*:\*{0,2}\s+([\w\s]+)'
    )

    results = []

    for line in markdown_content.splitlines():
        for _label, raw_name in pattern.findall(line):
            name = raw_name.strip()
            if name:  # a whitespace-only capture is not a name
                results.append(name)

    if len(results) >= 1:
        print("Customer DBA Name match found")

    if len(results) > 1:
        print("Multiple match found. Attempt to deduplicate")
        # Order-preserving de-duplication keeps the first-seen name first.
        return list(dict.fromkeys(results))
    else:
        return results

# Smoke test: `read_md.__wrapped__` is the undecorated read_md (set by
# functools.wraps in exception_to_code), so it returns the raw text rather
# than the (content, code, message) tuple.
extract_customer_dba_name(read_md.__wrapped__(result["md_files"][1]))

<class 'str'>
  Successfully read Example 1 (Contract).md.
Customer DBA Name match found
Multiple match found. Attempt to deduplicate


['Barcelona Rino LLC']