In [1]:
import xml.etree.ElementTree as ET
import json
import re
import os

def extract_logical_chunks(xml_content, output_json):
    """
    Extract logical chunks (sections, comment/response pairs, figures) from a
    regulation XML document and save them to ``output_json``.

    Headers (``HD`` elements) are placed in a five-level hierarchy using both
    their ``SOURCE`` attribute and their numbering pattern ("I.", "A.", "1.",
    "(a)", ...). The first numbering pattern seen at a level is remembered, so
    later headers with the same pattern land on the same level.

    Roman numerals and outline letters overlap ("C.", "D.", "I.", "V.", ...).
    They are disambiguated as follows:

    * a token is Roman only if it is a well-formed Roman numeral;
    * a single "C", "D", "L" or "M" (either case) is treated as an outline
      letter, never as 100/500/50/1000;
    * a single "I", "V" or "X" is treated as an outline letter when the
      header currently active at the matching alphabetic level starts with
      the preceding letter ("H.", "U.", "W."); otherwise it is Roman.

    Args:
        xml_content: XML document as a string (parsed with ``ET.fromstring``).
        output_json: Path of the JSON file to write.

    Returns:
        None. A JSON object with the keys ``metadata``, ``sections`` and
        ``figures`` is written to ``output_json`` and a short summary is
        printed. Sections with no text, no comment/response pair and no final
        decision are left out of the file.
    """
    root = ET.fromstring(xml_content)

    def element_text(element):
        """Return the stripped, concatenated text of an element and its descendants."""
        return "".join(element.itertext()).strip()

    # Initialize document metadata
    document_metadata = {
        "title": None,
        "agency": None,
        "subagency": None,
        "cfr": None,
        "subject": None,
        "effective_date": None,
        "contact_info": [],
        "summary": None
    }

    # Extract preamble information (if present)
    preamble = root.find(".//PREAMB")
    if preamble is not None:
        # (metadata key, path relative to PREAMB). SUBJECT feeds both
        # "title" and "subject".
        single_value_fields = (
            ("agency", "./AGENCY"),
            ("subagency", "./SUBAGY"),
            ("cfr", "./CFR"),
            ("title", "./SUBJECT"),
            ("subject", "./SUBJECT"),
            ("effective_date", "./EFFDATE/P"),
            ("summary", "./SUM/P"),
        )
        for key, path in single_value_fields:
            found = preamble.find(path)
            if found is not None:
                document_metadata[key] = element_text(found)

        for contact in preamble.findall("./FURINF/P"):
            contact_text = element_text(contact)
            if contact_text:
                document_metadata["contact_info"].append(contact_text)

    # Process additional sections and figures
    extracted_data = []
    figures = []
    current_page = None
    current_comment = None

    # Define regex patterns for different header formats.
    # Order matters: the first accepted pattern wins.
    number_patterns = {
        "roman_upper": re.compile(r"^([IVXLCDM]+)\.\s"),          # I., II., etc.
        "roman_lower": re.compile(r"^([ivxlcdm]+)\.\s"),          # i., ii., etc.
        "numeric": re.compile(r"^(\d+)\.\s"),                     # 1., 2., etc.
        "alpha_lower": re.compile(r"^([a-z])\.\s"),               # a., b., etc.
        "alpha_upper": re.compile(r"^([A-Z])\.\s"),               # A., B., etc.
        "numeric_paren": re.compile(r"^\((\d+)\)\s"),             # (1), (2), etc.
        "alpha_lower_paren": re.compile(r"^\(([a-z])\)\s"),       # (a), (b), etc.
        "alpha_upper_paren": re.compile(r"^\(([A-Z])\)\s")        # (A), (B), etc.
    }

    # Strict Roman-numeral grammar; rejects letter runs such as "CIVIL" or "IIII".
    roman_numeral_re = re.compile(
        r"^M{0,3}(CM|CD|D?C{0,3})(XC|XL|L?X{0,3})(IX|IV|V?I{0,3})$",
        re.IGNORECASE,
    )

    # Track the hierarchical structure
    current_headers = {f"level_{i}": None for i in range(1, 6)}

    # Keep track of the numbering format used at each level
    level_formats = {f"level_{i}": None for i in range(1, 6)}

    def is_plausible_roman(token, pattern_name):
        """Decide whether a token matched by a roman_* pattern is really a Roman numeral."""
        if not roman_numeral_re.match(token):
            return False
        if len(token) > 1:
            return True
        # Single letters: C/D/L/M are outline letters in practice.
        if token.upper() in "CDLM":
            return False
        # I/V/X: an outline letter if it continues an alphabetic run (H->I, U->V, W->X).
        alpha_format = "alpha_upper" if pattern_name == "roman_upper" else "alpha_lower"
        predecessor = chr(ord(token) - 1)
        for level_key, format_type in level_formats.items():
            if format_type != alpha_format:
                continue
            active_header = current_headers[level_key]
            if active_header and active_header.startswith(predecessor + "."):
                return False
        return True

    def get_header_pattern(header_text):
        """Identify the pattern type of a header based on its formatting"""
        for pattern_name, pattern in number_patterns.items():
            match = pattern.match(header_text)
            if not match:
                continue
            token = match.group(1)
            if pattern_name in ("roman_upper", "roman_lower") and not is_plausible_roman(token, pattern_name):
                # Fall through so the alpha patterns get a chance.
                continue
            return pattern_name, token
        return None, None

    def determine_header_level(header_text, source_type):
        """
        Determine the appropriate level for a header based on both its SOURCE attribute
        and its numbering pattern
        """
        pattern_type, pattern_value = get_header_pattern(header_text)

        # Base level from SOURCE attribute; anything other than HD1/HD2 maps to 3.
        base_level = {"HD1": 1, "HD2": 2, "HD3": 3}.get(source_type, 3)

        # If there's no pattern, we use the SOURCE level
        if not pattern_type:
            return base_level, None, None

        # Reuse the level this pattern is already bound to
        for level_key, format_type in level_formats.items():
            if format_type == pattern_type:
                return int(level_key.split('_')[1]), pattern_type, pattern_value

        # New pattern: assign it to the first level at or below base_level
        # that has no format yet
        for i in range(base_level, 6):
            if level_formats[f"level_{i}"] is None:
                return i, pattern_type, pattern_value

        # Fallback: every candidate level already has another format
        return base_level, pattern_type, pattern_value

    def paragraph_kind(para_text, para_elem):
        """Classify a paragraph as 'comment', 'response', 'final_decision' or 'text'."""
        # A label counts if the paragraph starts with it or any nested <E>
        # element's text is exactly the label.
        emphasized = {e.text for e in para_elem.findall(".//E")}
        labels = (
            ("Comment:", "comment"),
            ("Response:", "response"),
            ("Final Decision:", "final_decision"),
        )
        for label, kind in labels:
            if para_text.startswith(label) or label in emphasized:
                return kind
        return "text"

    for elem in root.iter():
        tag = elem.tag

        if tag == "PRTPAGE":
            current_page = elem.attrib.get("P", None)

        elif tag == "GPH":
            gid_elem = elem.find("GID")
            # Missing or empty GID both fall back to "Unknown".
            figure_id = gid_elem.text if gid_elem is not None and gid_elem.text else "Unknown"
            figures.append({
                "figure_id": figure_id,
                "page_number": current_page,
                "span": elem.attrib.get("SPAN", "1"),
                "deep": elem.attrib.get("DEEP", "0")
            })

        elif tag == "HD":
            header_text = element_text(elem)
            header_type = elem.attrib.get("SOURCE", "")

            # Determine the appropriate level for this header
            level, pattern_type, _ = determine_header_level(header_text, header_type)
            level_key = f"level_{level}"

            # If we found a pattern, bind it to this level (first pattern wins)
            if pattern_type and level_formats[level_key] is None:
                level_formats[level_key] = pattern_type

            # Update the current header at this level
            current_headers[level_key] = header_text

            # Clear any lower-level headers
            for i in range(level + 1, 6):
                current_headers[f"level_{i}"] = None

            # Build the section path from the active headers, top level first
            section_path = [
                current_headers[f"level_{i}"]
                for i in range(1, 6)
                if current_headers[f"level_{i}"]
            ]

            extracted_data.append({
                "section_name": " > ".join(section_path),
                "section_level_1": current_headers["level_1"],
                "section_level_2": current_headers["level_2"],
                "section_level_3": current_headers["level_3"],
                "section_level_4": current_headers["level_4"],
                "section_level_5": current_headers["level_5"],
                "text_content": "",
                "page_number": current_page,
                "comments_responses": [],
                "final_decision": None
            })

        elif tag == "P":
            para_text = element_text(elem)
            if not para_text:
                continue

            kind = paragraph_kind(para_text, elem)

            if kind == "comment":
                # Remember the comment until a response arrives
                current_comment = para_text
            elif kind == "response":
                # Pair the response with the pending comment, if any
                if current_comment and extracted_data:
                    extracted_data[-1]["comments_responses"].append({
                        "comment": current_comment,
                        "response": para_text
                    })
            elif kind == "final_decision":
                if extracted_data:
                    extracted_data[-1]["final_decision"] = para_text
                current_comment = None
            elif extracted_data:
                # Regular paragraph text goes to the most recent section
                last_section = extracted_data[-1]
                if last_section["text_content"]:
                    last_section["text_content"] += "\n\n" + para_text
                else:
                    last_section["text_content"] = para_text

    # Filter out sections with no text, no comment/response pairs and no final decision
    filtered_data = [
        section for section in extracted_data
        if section["text_content"].strip() or section["comments_responses"] or section["final_decision"]
    ]

    # Save the filtered data
    with open(output_json, "w", encoding="utf-8") as f:
        json.dump({
            "metadata": document_metadata,
            "sections": filtered_data,
            "figures": figures
        }, f, indent=4, ensure_ascii=False)

    print(f"Logical chunking completed. Data saved to {output_json}")
    print(f"Filtered out {len(extracted_data) - len(filtered_data)} empty sections.")

def process_file(xml_file_path, output_json):
    """Read one XML file as UTF-8 text and run ``extract_logical_chunks`` on it.

    Args:
        xml_file_path: Path of the XML file to read.
        output_json: Path handed to ``extract_logical_chunks`` as its output file.
    """
    with open(xml_file_path, mode="r", encoding="utf-8") as xml_source:
        raw_xml = xml_source.read()

    extract_logical_chunks(raw_xml, output_json)

def process_all_files(xml_folder, json_folder):
    """Run ``process_file`` on every ``*.xml`` entry in ``xml_folder``.

    Each ``name.xml`` is written to ``json_folder/name.json``. Files are
    handled in sorted name order so the run (and its log output) is
    reproducible; ``os.listdir`` alone returns entries in arbitrary order.

    Args:
        xml_folder: Directory to scan (not recursive) for names ending in ".xml".
        json_folder: Output directory; created, including parents, if missing.
    """
    # exist_ok avoids the check-then-create race of a separate exists() test.
    os.makedirs(json_folder, exist_ok=True)

    for filename in sorted(os.listdir(xml_folder)):
        if not filename.endswith(".xml"):
            continue

        xml_path = os.path.join(xml_folder, filename)
        json_filename = os.path.splitext(filename)[0] + ".json"
        json_path = os.path.join(json_folder, json_filename)

        print(f"Processing {filename}...")
        process_file(xml_path, json_path)

In [5]:
# Convert hospice_2025_final.xml to JSON; both paths are relative to the working directory.
process_file(
    xml_file_path="hospice_2025_final.xml",
    output_json="hospice_2025_final_output.json",
)

Logical chunking completed. Data saved to hospice_2025_final_output.json
Filtered out 17 empty sections.


In [6]:
# Convert snf_2025_final.xml to JSON; both paths are relative to the working directory.
process_file(
    xml_file_path="snf_2025_final.xml",
    output_json="snf_2025_final_output.json",
)

Logical chunking completed. Data saved to snf_2025_final_output.json
Filtered out 33 empty sections.


In [3]:
# Batch-convert every .xml file under regulation_files/xml_files into regulation_files/json_files.
process_all_files(
    xml_folder="regulation_files/xml_files",
    json_folder="regulation_files/json_files",
)

Processing hospice_2025_final.xml...
Logical chunking completed. Data saved to regulation_files/json_files/hospice_2025_final.json
Filtered out 17 empty sections.
Processing snf_2025_final.xml...
Logical chunking completed. Data saved to regulation_files/json_files/snf_2025_final.json
Filtered out 33 empty sections.
