In [3]:
# Version 1: does not handle multiple ActionPlan (AP) versions; rows from all plans go into one CSV per kind

import xml.etree.ElementTree as ET
import csv

# Input: field-experience XML export to process.
# NOTE(review): this is an absolute, machine-specific path; the cell only runs
# where this exact location exists.
xml_file_path = "C:/Users/dbaeva/Documents/Docs/master-probability-causality/notebooks/case-2/data/field-feedback/SDT_Mhf39t20241030_050854_FieldExperience_NXT.xml"

# Read the document; keep both the tree and its root element available
tree = ET.parse(source=xml_file_path)
root = tree.getroot()

# Function to flatten XML elements
def flatten_xml(element, parent_key="", sep="."):
    """Recursively flatten an XML element and its descendants into one dict.

    Keys are built by joining ``parent_key``, the tags of the children walked
    on the way down, and the attribute name, using ``sep`` between segments.
    Non-blank element text is stored under a ``text`` leaf of the element's key.

    Args:
        element: Element to flatten; it must expose ``attrib``, ``text`` and
            iteration over its children (e.g. ``xml.etree.ElementTree.Element``).
        parent_key: Prefix for every key produced for ``element``; an empty
            string means no prefix.
        sep: Separator placed between key segments.

    Returns:
        dict mapping each flattened key to an attribute value or stripped text.

    Note:
        Sibling elements sharing a tag produce identical keys, so values from
        later siblings overwrite those from earlier ones.
    """
    def _join(prefix, name):
        # Join explicitly instead of concatenating and calling .strip(sep):
        # strip() treats sep as a set of characters and would also remove
        # legitimate leading/trailing characters from the names themselves.
        return f"{prefix}{sep}{name}" if prefix else name

    items = {}
    # Include attributes
    for key, value in element.attrib.items():
        items[_join(parent_key, key)] = value
    # Include tag text separately (whitespace-only text is ignored)
    text = (element.text or "").strip()
    if text:
        # Use sep here too, so the text key follows the same scheme as all others
        items[_join(parent_key, "text")] = text
    # Recurse into children
    for child in element:
        items.update(flatten_xml(child, _join(parent_key, child.tag), sep=sep))
    return items

# Accumulators: one flat dict per action / per solution, across all plans
actions = []
solutions = []

# Walk every DdfActionPlan found anywhere in the document
for plan in root.findall(".//DdfActionPlan"):
    # Plan-level fields are repeated on every row produced for this plan.
    # NOTE(review): flatten_xml(plan) recurses into all children, so the nested
    # action/solution lists are flattened into base_row as well and end up in
    # every row -- confirm this is intended.
    base_row = flatten_xml(plan)

    # One row per nested action: plan fields first, then "DdfAction."-prefixed fields
    actions.extend(
        [
            {**base_row, **flatten_xml(action, "DdfAction")}
            for action in plan.findall(".//DdfActionList/DdfAction")
        ]
    )

    # One row per nested solution: plan fields first, then "DdfSolution."-prefixed fields
    solutions.extend(
        [
            {**base_row, **flatten_xml(solution, "DdfSolution")}
            for solution in plan.findall(".//DdfSolutionList/DdfSolution")
        ]
    )

# Actions CSV: the header is the sorted union of all keys seen in any row
action_csv_file = "C:/Users/dbaeva/Documents/Docs/master-probability-causality/notebooks/case-2/data/act-and-sol/actions-9aba6d6a38c6423da035ea1be76a3cd9-SDT_Mhf39t20241030_050854_FieldExperience_NXT.csv"
action_headers = sorted(set().union(*actions))
with open(action_csv_file, mode="w", newline="", encoding="utf-8") as file:
    writer = csv.DictWriter(file, fieldnames=action_headers)
    writer.writeheader()
    writer.writerows(actions)
print(f"Actions data successfully written to {action_csv_file}.")

# Solutions CSV: same layout rule as the actions file
solution_csv_file = "C:/Users/dbaeva/Documents/Docs/master-probability-causality/notebooks/case-2/data/act-and-sol/solutions-9aba6d6a38c6423da035ea1be76a3cd9-SDT_Mhf39t20241030_050854_FieldExperience_NXT.csv"
solution_headers = sorted(set().union(*solutions))
with open(solution_csv_file, mode="w", newline="", encoding="utf-8") as file:
    writer = csv.DictWriter(file, fieldnames=solution_headers)
    writer.writeheader()
    writer.writerows(solutions)
print(f"Solutions data successfully written to {solution_csv_file}.")

Actions data successfully written to actions-9aba6d6a38c6423da035ea1be76a3cd9-SDT_Mhf39t20241208_030645_FieldExperience_NXT.csv.
Solutions data successfully written to solutions-9aba6d6a38c6423da035ea1be76a3cd9-SDT_Mhf39t20241208_030645_FieldExperience_NXT.csv.


Version 2: handles multiple ActionPlan (AP) versions by writing separate CSV files for each plan version

In [28]:
import xml.etree.ElementTree as ET
import csv
import os

# Input: field-experience XML export to process (path is relative to the
# working directory the notebook runs in)
xml_file_path = "data/field-feedback/SDT_M9905t20231019_083344_FieldExperience_NXT.xml"

# Read the document; keep both the tree and its root element available
tree = ET.parse(source=xml_file_path)
root = tree.getroot()

# Function to flatten XML elements
def flatten_xml(element, parent_key="", sep="."):
    """Recursively flatten an XML element and its descendants into one dict.

    Keys are built by joining ``parent_key``, the tags of the children walked
    on the way down, and the attribute name, using ``sep`` between segments.
    Non-blank element text is stored under a ``text`` leaf of the element's key.

    Args:
        element: Element to flatten; it must expose ``attrib``, ``text`` and
            iteration over its children (e.g. ``xml.etree.ElementTree.Element``).
        parent_key: Prefix for every key produced for ``element``; an empty
            string means no prefix.
        sep: Separator placed between key segments.

    Returns:
        dict mapping each flattened key to an attribute value or stripped text.

    Note:
        Sibling elements sharing a tag produce identical keys, so values from
        later siblings overwrite those from earlier ones.
    """
    def _join(prefix, name):
        # Join explicitly instead of concatenating and calling .strip(sep):
        # strip() treats sep as a set of characters and would also remove
        # legitimate leading/trailing characters from the names themselves.
        return f"{prefix}{sep}{name}" if prefix else name

    items = {}
    # Include attributes
    for key, value in element.attrib.items():
        items[_join(parent_key, key)] = value
    # Include tag text separately (whitespace-only text is ignored)
    text = (element.text or "").strip()
    if text:
        # Use sep here too, so the text key follows the same scheme as all others
        items[_join(parent_key, "text")] = text
    # Recurse into children
    for child in element:
        items.update(flatten_xml(child, _join(parent_key, child.tag), sep=sep))
    return items

# Prepare rows for actions and solutions
# (nothing in this cell fills these two lists; rows are collected per plan
# version inside the loop below)
actions = []
solutions = []


def _write_rows_csv(rows, csv_path):
    """Write flat dict rows to csv_path; the header is the sorted union of all row keys."""
    headers = sorted({key for row in rows for key in row})
    with open(csv_path, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=headers)
        writer.writeheader()
        writer.writerows(rows)


# Output naming depends only on the input file, so resolve it once instead of
# recomputing it for every plan.
base_filename = os.path.splitext(os.path.basename(xml_file_path))[0]
actions_folder = "act-and-sol/actions"
solutions_folder = "act-and-sol/solutions"

# Locate the DdfActionPlanList and collect each DdfActionPlan
plans = root.findall(".//DdfActionPlanList/DdfActionPlan")
if plans:
    # Create the output folders only when there is something to process
    os.makedirs(actions_folder, exist_ok=True)
    os.makedirs(solutions_folder, exist_ok=True)
else:
    # Make the "nothing matched" case visible instead of finishing silently
    print(f"No DdfActionPlan elements found under DdfActionPlanList in {xml_file_path}.")

# Plans are labelled v1, v2, ... in document order
for version_number, plan in enumerate(plans, start=1):
    version_label = f"v{version_number}"  # Label to distinguish versions

    # Plan-level fields are repeated on every row produced for this plan.
    # NOTE(review): flatten_xml(plan) recurses into all children, so the nested
    # action/solution lists are flattened into base_row as well and end up in
    # every row -- confirm this is intended.
    base_row = flatten_xml(plan)

    # Extract nested actions: plan fields first, then "DdfAction."-prefixed fields
    actions_version = [
        {**base_row, **flatten_xml(action, "DdfAction")}
        for action in plan.findall(".//DdfActionList/DdfAction")
    ]

    # Extract nested solutions: plan fields first, then "DdfSolution."-prefixed fields
    solutions_version = [
        {**base_row, **flatten_xml(solution, "DdfSolution")}
        for solution in plan.findall(".//DdfSolutionList/DdfSolution")
    ]

    # Debug: Print extracted counts
    print(f"Processing ActionPlan version {version_label}...")
    print(f"  Extracted {len(actions_version)} actions and {len(solutions_version)} solutions.")

    # Write actions for this version (no file is created when there are none)
    if actions_version:
        action_csv_file = os.path.join(actions_folder, f"actions-{base_filename}-{version_label}.csv")
        _write_rows_csv(actions_version, action_csv_file)
        print(f"  Actions for {version_label} written to {action_csv_file}.")
    else:
        print(f"  No actions to write for version {version_label}.")

    # Write solutions for this version (no file is created when there are none)
    if solutions_version:
        solution_csv_file = os.path.join(solutions_folder, f"solutions-{base_filename}-{version_label}.csv")
        _write_rows_csv(solutions_version, solution_csv_file)
        print(f"  Solutions for {version_label} written to {solution_csv_file}.")
    else:
        print(f"  No solutions to write for version {version_label}.")

Processing ActionPlan version v1...
  Extracted 6 actions and 7 solutions.
  Actions for v1 written to act-and-sol/actions\actions-SDT_M9905t20231019_083344_FieldExperience_NXT-v1.csv.
  Solutions for v1 written to act-and-sol/solutions\solutions-SDT_M9905t20231019_083344_FieldExperience_NXT-v1.csv.
