<a href="https://colab.research.google.com/github/astro-petern/docs/blob/main/engineering_harness_to_connectivity.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# RapidHarness to XML File

Converts RapidHarness files into an XML file compatible with the cable checker.


In [22]:
from google.colab import files, drive
import numpy as np
import pandas as pd
from ipywidgets import Button, HBox, Output, VBox, Layout, widgets
from IPython.display import display, clear_output

files_to_process = []


In [23]:
mapping_file = "/content/AFHN-000226 Manufacturing Data(Wiring Table).csv"
sheet_name = "Wiring Table"
HEADER_IDENTIFIER_VALUE = "From"

### File Processing and Confirmation

This code processes a data file exported from RapidHarness and makes sure it is in the correct format before we use it.

Specifically, it:

*   Sets up a place to show you messages.
*   Has a function that reads a file you give it (either an Excel or CSV file).
*   Tries to clean up the data a bit by removing empty rows and columns.
*   Shows you a preview of the first few rows of the data it read.
*   Asks you to confirm if this is the right file using "Yes" or "No" buttons.
*   If you click "Yes", it saves the data so we can use it later. If you click "No", it skips the file.
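
The read-and-clean steps in the list above can be sketched without the widgets. This is a minimal sketch rather than the notebook's actual cell: the helper name `load_table` and the in-memory sample CSV are hypothetical, and it assumes the same cleanup convention (fully empty rows and columns dropped, remaining gaps filled with `_`).

```python
import io
import pandas as pd

def load_table(source, filename):
    """Read a CSV or Excel source chosen by the extension of `filename` (hypothetical helper)."""
    name = filename.lower()
    if name.endswith((".xlsx", ".xls")):
        df = pd.read_excel(source)  # reads the first sheet by default
    elif name.endswith(".csv"):
        df = pd.read_csv(source)
    else:
        raise ValueError(f"Unsupported file type: {filename}")
    # Drop rows and columns that are entirely empty, then mark remaining gaps.
    df = df.dropna(axis=0, how="all").dropna(axis=1, how="all")
    return df.fillna("_")

# Made-up sample: one fully empty row and one fully empty column.
sample_csv = "From,To,Empty\nJ1.1,P15.1,\n,,\nJ1.2,,\n"
table = load_table(io.StringIO(sample_csv), "sample.csv")
print(table)
```

In this sample the `Empty` column and the all-blank row are removed, and the missing `To` value in the last row is replaced by `_`.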

In [24]:
import pandas as pd
from google.colab import files
from ipywidgets import Button, HBox, VBox, Output, Layout
from IPython.display import display, clear_output

# Create an output widget to display messages and data
output = Output()
display(output)

# This is where we will store the DataFrame after confirmation
df_confirmed = None
files_to_process = []

def process_and_confirm_file(filepath):
    """Processes a single file and prompts for confirmation."""
    global df_confirmed
    with output:
        clear_output()
        if not filepath:
            print("No file path provided.")
            return

        df = None

        # Check file extension and read accordingly
        if filepath.endswith(('.xlsx', '.xls')):
            print(f"\nAttempting to read Excel file: '{filepath}'")
            try:
                # Read the first sheet as the user indicated a simpler format
                df = pd.read_excel(filepath)

            except Exception as e:
                print(f"Error reading Excel file: {e}")
                return # Skip to the next file on error

        elif filepath.endswith('.csv'):
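            print(f"\nAttempting to read CSV file: '{filepath}'")
            try:
                df = pd.read_csv(filepath)
            except Exception as e:
                print(f"Error reading CSV file: {e}")
                return # Skip to the next file on error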
        else:
            print("\nError: Unsupported file type. Please upload an Excel (.xlsx, .xls) or CSV (.csv) file.")
            return # Skip to the next file if file type is unsupported

        if df is not None and not df.empty:
            # Drop rows and columns that are entirely empty, then fill remaining gaps with '_'
            df = df.dropna(axis=0, how='all').dropna(axis=1, how='all')
            df = df.fillna('_')

            print(f"\n--- '{filepath}' Preview ---")
            display(df.head(100))
            print("\nIs this the correct file? Please confirm below.")

            # Create confirmation buttons
            yes_button = Button(description="Yes, correct", button_style='success')
            no_button = Button(description="No, skip", button_style='danger')

            def on_yes_clicked(btn):
                global df_confirmed
                with output:
                    # clear_output()
                    df_confirmed = df
                    print(f"File '{filepath}' confirmed. The DataFrame is now available as 'df_confirmed'.")

            def on_no_clicked(btn):
                with output:
                    clear_output()
                    print(f"File '{filepath}' skipped. Please re-run the file upload cell if you need to process it.")

            yes_button.on_click(on_yes_clicked)
            no_button.on_click(on_no_clicked)

            display(HBox([yes_button, no_button]))
        else:
            print(f"Could not read the file or the file '{filepath}' is empty.")

# Clear the existing files_to_process list and add the new file
files_to_process.clear()
files_to_process.append(mapping_file)


# Iterate through the files_to_process list and process each file
for file_path in files_to_process:
    process_and_confirm_file(file_path)

Output()

### Data Cleaning and Header Identification

This code block focuses on cleaning the confirmed DataFrame (`df_confirmed`) to prepare it for further processing. It specifically addresses the issue of identifying and setting the correct header row, which is embedded within the data rather than being the first row.
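
As an illustration of the header search, here is a minimal sketch on a made-up table. The sample rows and the helper name `promote_header` are hypothetical; only the marker value `From` comes from this notebook's configuration, and the sketch assumes the DataFrame has a default integer index.

```python
import pandas as pd

def promote_header(df, marker="From"):
    """Use the first row whose first cell equals `marker` as the header (hypothetical helper)."""
    first_col = df.iloc[:, 0]
    is_header = first_col.apply(lambda v: isinstance(v, str) and v.strip() == marker)
    if not is_header.any():
        raise ValueError(f"No row starts with {marker!r}")
    header_idx = is_header[is_header].index[0]  # index label of the first match
    body = df.loc[header_idx + 1:].copy()       # keep only the rows below the header
    body.columns = [str(v).strip() for v in df.loc[header_idx]]
    return body.reset_index(drop=True)

raw = pd.DataFrame([
    ["Harness", "AFHN-000226"],  # metadata row above the real header
    ["From", "To"],              # embedded header row
    ["J1.1", "P15.1"],
    ["J1.2", "P15.3"],
])
clean = promote_header(raw)
print(clean)
```

The metadata row is discarded, the `From`/`To` row becomes the column header, and the data rows are renumbered from zero.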


In [30]:
if df_confirmed is None or df_confirmed.empty:
    print("❌ No confirmed DataFrame available. Run the file confirmation cell and click 'Yes, correct' first.")
else:
    df_tmp = df_confirmed.copy()

    # --- HEADER IDENTIFICATION ---
    # The first non-empty row is metadata, not the header.
    # The real header row starts with the known value in HEADER_IDENTIFIER_VALUE ('From').

    # 1️⃣ Find the index of the row where the first column (index 0) is the header identifier.
    # We must ensure the comparison is robust by checking for string type and stripping whitespace.
    header_row_check = df_tmp.iloc[:, 0].apply(
        lambda x: isinstance(x, str) and x.strip() == HEADER_IDENTIFIER_VALUE
    )

    if header_row_check.any():
        true_header_idx = header_row_check[header_row_check].index[0]
        print(f"✅ True header row containing '{HEADER_IDENTIFIER_VALUE}' detected at original index: {true_header_idx}")

        # 2️⃣ Extract potential header row
        raw_header = df_tmp.loc[true_header_idx].tolist()

        # 3️⃣ Clean header names (This function is kept for general robustness)
        def clean_name(name, idx):
            """Return a safe, non-placeholder header name."""
            if pd.isna(name):
                return f"col_{idx}"
            if not isinstance(name, str):
                return str(name).strip() if name is not None else f"col_{idx}"

            cleaned = name.strip()
            # Treat as placeholder if empty or just underscores/dashes
            if cleaned == "" or cleaned.replace("_", "").replace("-", "") == "":
                return f"col_{idx}"
            return cleaned

        cleaned_header = [clean_name(val, i) for i, val in enumerate(raw_header)]

        print("\nDetected raw header row:", raw_header)
        print("Cleaned header row:", cleaned_header)

        # 4️⃣ Apply new header and drop rows ABOVE and INCLUDING the header row index
        df_cleaned = df_tmp.loc[true_header_idx + 1 :].copy()
        df_cleaned.columns = cleaned_header
        df_cleaned.reset_index(drop=True, inplace=True)

        # 5️⃣ Preview
        print("\n--- Preview of Final Cleaned DataFrame (Wiring Table) ---")
        # Display the first 25 rows for a cleaner view
        display(df_cleaned.head(25))

        print("\n💾 Cleaned DataFrame stored as 'df_confirmed_clean'.")
        # Store the result in the expected output variable
        df_confirmed_clean = df_cleaned

    else:
        print(f"❌ Error: Could not find the expected header row starting with '{HEADER_IDENTIFIER_VALUE}'.")


✅ True header row containing 'From' detected at original index: 8

Detected raw header row: ['From', 'To', 'Conductor', 'Twisted With', 'Size', 'Length', 'From Contact IPN', 'To Contact IPN']
Cleaned header row: ['From', 'To', 'Conductor', 'Twisted With', 'Size', 'Length', 'From Contact IPN', 'To Contact IPN']

--- Preview of Final Cleaned DataFrame (Wiring Table) ---


| | From | To | Conductor | Twisted With | Size | Length | From Contact IPN | To Contact IPN |
|---|---|---|---|---|---|---|---|---|
| 0 | J1.1 | P15.1 | CBL1.White | Blue | 26 AWG | 0 in | FC_RS422_AB_P | FC_RS422_AB_P |
| 1 | J1.2 | P15.3 | CBL2.White | Blue | 26 AWG | 0 in | FC_RS422_YZ_P | FC_RS422_YZ_P |
| 2 | J1.3 | P15.5 | W1.White | _ | 26 AWG | 0 in | FORCE_RECOVERY_PIN | FORCE_RECOVERY_PIN |
| 3 | J1.4 | P15.7 | CBL3.White | Blue | 26 AWG | 0 in | FC_GROUND | FC_GROUND |
| 4 | J1.5 | P15.9 | W2.White | _ | 26 AWG | 0 in | FC_ISO_GSE_GROUND | FC_ISO_GSE_GROUND |
| 5 | J1.6 | P13.1 | W3.White | _ | 26 AWG | 0 in | TDI | TDI |
| 6 | J1.7 | P13.3 | W4.White | _ | 26 AWG | 0 in | TCK_SWCLK | TCK_SWCLK |
| 7 | J1.8 | P13.5 | W5.White | _ | 26 AWG | 0 in | NRST | NRST |
| 8 | J1.9 | P13.8 | CBL4.White | Blue | 26 AWG | 0 in | ENET_PORT_B_RX_P | ENET_PORT_B_RX_P |
| 9 | J1.10 | P13.11 | CBL5.White | Blue | 26 AWG | 0 in | ENET_PORT_B_TX_P | ENET_PORT_B_TX_P |



💾 Cleaned DataFrame stored as 'df_confirmed_clean'.


In [32]:
"""
### Connector Configuration for RapidEye
This section allows you to configure each connector's properties for RapidEye XML generation.
"""

import pandas as pd
from ipywidgets import Button, VBox, HBox, Output, Layout, widgets, Label
from IPython.display import display, clear_output

# Global storage for connector configurations
connector_configs = {}

def setup_connector_configuration(df):
    """
    Creates an interactive form to configure connector properties for RapidEye.
    """
    global connector_configs

    # Create a fresh output widget for this run
    config_output = Output()
    display(config_output)

    with config_output:
        # Extract unique connectors from From and To columns
        df_working = df[['From', 'To']].copy()
        df_working[['From_Con', 'From_Pin']] = df_working['From'].str.split('.', n=1, expand=True)
        df_working[['To_Con', 'To_Pin']] = df_working['To'].str.split('.', n=1, expand=True)

        unique_connectors = pd.concat([df_working['From_Con'], df_working['To_Con']]).dropna().unique()

        # Determine which connectors are primarily "From" (Input) or "To" (Output)
        from_connectors = set(df_working['From_Con'].dropna().unique())
        to_connectors = set(df_working['To_Con'].dropna().unique())

        print("="*80)
        print(f"CONNECTOR CONFIGURATION - Found {len(unique_connectors)} connectors")
        print("="*80)
        print(f"Connectors: {sorted(unique_connectors)}\n")

        # Create widgets for each connector
        connector_widgets = {}
        connector_boxes = []  # Store all boxes to display at once

        for idx, conn_name in enumerate(sorted(unique_connectors)):
            # Determine smart defaults
            is_input = conn_name in from_connectors and conn_name not in to_connectors
            is_output = conn_name in to_connectors and conn_name not in from_connectors

            # Default type based on usage
            if is_input:
                default_type = "Input"
                default_bank = "B1"
            elif is_output:
                default_type = "Output"
                # Distribute outputs across B2-B5
                default_bank = f"B{2 + (idx % 4)}"
            else:
                default_type = "Both"
                default_bank = "B1"

            # Default connector type (no name-based guessing is implemented)
            default_contype = "HDR-64"

            # Create widgets for this connector
            bank_widget = widgets.Dropdown(
                options=['B1', 'B2', 'B3', 'B4', 'B5'],
                value=default_bank,
                description='Bank:',
                style={'description_width': '100px'},
                layout=Layout(width='200px')
            )

            contype_widget = widgets.Dropdown(
                options=['HDR-64', 'HDR-24', 'HDR-32', 'HDR-16', 'HDR-8', 'Custom'],
                value=default_contype,
                description='ConName:',
                style={'description_width': '100px'},
                layout=Layout(width='200px')
            )

            custom_contype_widget = widgets.Text(
                value='',
                description='Custom:',
                placeholder='e.g., HDR-128',
                style={'description_width': '100px'},
                layout=Layout(width='200px'),
                disabled=True
            )

            # Enable custom field when Custom is selected
            def make_contype_handler(custom_widget):
                def on_contype_change(change):
                    custom_widget.disabled = (change['new'] != 'Custom')
                return on_contype_change

            contype_widget.observe(make_contype_handler(custom_contype_widget), names='value')

            pins_widget = widgets.IntText(
                value=64,
                description='Pins:',
                style={'description_width': '100px'},
                layout=Layout(width='200px')
            )

            offset_widget = widgets.IntText(
                value=0,
                description='Pin Offset:',
                style={'description_width': '100px'},
                layout=Layout(width='200px')
            )

            type_widget = widgets.Dropdown(
                options=['Input', 'Output', 'Both'],
                value=default_type,
                description='Type:',
                style={'description_width': '100px'},
                layout=Layout(width='200px')
            )

            # Store widgets
            connector_widgets[conn_name] = {
                'bank': bank_widget,
                'contype': contype_widget,
                'custom_contype': custom_contype_widget,
                'pins': pins_widget,
                'offset': offset_widget,
                'type': type_widget
            }

            # Create a labeled section for this connector
            header = Label(
                value=f"━━━ {conn_name} (RapidHarness ID) ━━━",
                style={'font_weight': 'bold', 'font_size': '14px'}
            )

            connector_box = VBox([
                header,
                HBox([bank_widget, contype_widget]),
                HBox([custom_contype_widget, pins_widget]),
                HBox([offset_widget, type_widget]),
            ], layout=Layout(margin='10px 0px', padding='10px', border='1px solid #ddd'))

            connector_boxes.append(connector_box)

    # Display all connector boxes OUTSIDE the with block
    for box in connector_boxes:
        display(box)

    # Create and display submit button OUTSIDE the with block
    submit_button = Button(
        description="✓ Confirm Configuration",
        button_style='success',
        layout=Layout(width='300px', height='40px', margin='20px 0px')
    )

    # Create output for confirmation message
    confirm_output = Output()

    def on_submit(btn):
        """Save all configurations"""
        global connector_configs
        connector_configs = {}

        for conn_name, widgets_dict in connector_widgets.items():
            # Get connector type (use custom if specified)
            contype = widgets_dict['contype'].value
            if contype == 'Custom' and widgets_dict['custom_contype'].value:
                contype = widgets_dict['custom_contype'].value
            elif contype == 'Custom':
                contype = 'HDR-64'  # Fallback if custom not specified

            connector_configs[conn_name] = {
                'rhid': conn_name,  # RapidHarness ID
                'bank': widgets_dict['bank'].value,  # RapidEye Bank
                'conname': contype,  # Connector type (HDR-64, etc.)
                'conid': contype,    # Same as conname
                'pins': widgets_dict['pins'].value,
                'offset': widgets_dict['offset'].value,
                'type': widgets_dict['type'].value
            }

        with confirm_output:
            clear_output()
            print("\n" + "="*80)
            print("✅ CONNECTOR CONFIGURATION SAVED")
            print("="*80)
            print("\nRapidHarness → RapidEye Mapping:")
            print("-" * 80)
            print(f"{'RH ID':>8} | {'Bank':>6} | {'ConName':>10} | {'Pins':>5} | {'Offset':>7} | {'Type':>8}")
            print("-" * 80)
            for rh_name, config in sorted(connector_configs.items(), key=lambda x: x[1]['bank']):
                print(f"{config['rhid']:>8} | {config['bank']:>6} | {config['conname']:>10} | "
                      f"{config['pins']:>5} | {config['offset']:>7} | {config['type']:>8}")
            print("-" * 80)
            print("\n✓ Configuration stored in 'connector_configs' dictionary")
            print("✓ You can now proceed with XML generation\n")

    submit_button.on_click(on_submit)

    # Display button and confirmation output
    display(submit_button)
    display(confirm_output)

# Run the configuration setup if df_confirmed_clean exists
if 'df_confirmed_clean' in globals():
    setup_connector_configuration(df_confirmed_clean)
else:
    print("⚠️ Please run the data cleaning section first to create 'df_confirmed_clean'")

Output()

VBox(children=(Label(value='━━━ J1 (RapidHarness ID) ━━━'), HBox(children=(Dropdown(description='Bank:', layou…

VBox(children=(Label(value='━━━ P13 (RapidHarness ID) ━━━'), HBox(children=(Dropdown(description='Bank:', inde…

VBox(children=(Label(value='━━━ P15 (RapidHarness ID) ━━━'), HBox(children=(Dropdown(description='Bank:', inde…

Button(button_style='success', description='✓ Confirm Configuration', layout=Layout(height='40px', margin='20p…

Output()
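
The connector list and the Input/Output defaults in the form above come from splitting each `From` and `To` value on its first dot. Below is a minimal sketch of that step on three rows taken from the preview earlier in this notebook; the function name `classify_connectors` is hypothetical.

```python
import pandas as pd

def classify_connectors(df):
    """Map each connector name to 'Input', 'Output', or 'Both' (hypothetical helper)."""
    # "J1.10" -> connector "J1"; n=1 splits on the first dot only.
    from_con = set(df["From"].str.split(".", n=1).str[0].dropna())
    to_con = set(df["To"].str.split(".", n=1).str[0].dropna())
    roles = {}
    for name in sorted(from_con | to_con):
        if name in from_con and name not in to_con:
            roles[name] = "Input"
        elif name in to_con and name not in from_con:
            roles[name] = "Output"
        else:
            roles[name] = "Both"
    return roles

rows = pd.DataFrame({
    "From": ["J1.1", "J1.6", "J1.10"],
    "To": ["P15.1", "P13.1", "P13.11"],
})
roles = classify_connectors(rows)
print(roles)
```

A connector that appears only in `From` defaults to Input, one that appears only in `To` defaults to Output, and one that appears on both sides defaults to Both.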

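### XML Generation

Takes the cleaned wiring table (a pandas DataFrame) and converts it into a RapidEye-format XML string: a `<Connectors>` block with one entry per configured connector, followed by a `<FromTo>` block with one `<Cx>` element per wire.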
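
To make the target format concrete, the following sketch builds the XML for a single connection with `xml.etree.ElementTree`. The element and attribute names (`CableList`, `Cable`, `Connectors`, `Connector`, `Pins`, `FromTo`, `Cx`) mirror those used by the generator in this notebook; the cable name `DEMO`, the bank assignments, the pin counts, and the offset of 32 are made-up values for illustration.

```python
import xml.etree.ElementTree as ET

# Hypothetical configuration: RapidHarness ID -> bank, pin count, pin offset.
configs = {
    "J1": {"bank": "B1", "pins": 64, "offset": 0},
    "P15": {"bank": "B4", "pins": 64, "offset": 32},
}

root = ET.Element("CableList")
cable = ET.SubElement(root, "Cable", Name="DEMO")
connectors = ET.SubElement(cable, "Connectors")
fromto = ET.SubElement(cable, "FromTo")

# One <Connector> per configured connector, with its pin count as a child.
for rhid, cfg in configs.items():
    conn = ET.SubElement(connectors, "Connector", Name=cfg["bank"],
                         ConName="HDR-64", ConID="HDR-64", RHID=rhid)
    ET.SubElement(conn, "Pins").text = str(cfg["pins"])

def endpoint(ref):
    """Translate 'J1.3' into 'B1:3', adding the connector's pin offset."""
    rhid, pin = ref.split(".", 1)
    cfg = configs[rhid]
    return f"{cfg['bank']}:{int(pin) + cfg['offset']}"

# One <Cx> per wire, addressed as BANK:PIN on both ends.
ET.SubElement(fromto, "Cx", From=endpoint("J1.3"), To=endpoint("P15.5"), Type="Wire")

xml_text = ET.tostring(root, encoding="unicode")
print(xml_text)
```

With these sample values, pin 5 on `P15` is written as `B4:37` because the offset of 32 is added to the pin number before the bank prefix is attached.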

In [33]:
"""
### Enhanced XML Generation for RapidEye
This function generates XML using RapidEye format with Bank naming convention.
"""

import xml.etree.ElementTree as ET
from xml.dom import minidom

def dataframe_to_xml_rapideye(df, connector_configs, cable_name="AFHN-000226_HARNESS"):
    """
    Converts the cleaned wiring table DataFrame into RapidEye XML format.

    Args:
        df: Cleaned DataFrame with 'From' and 'To' columns
        connector_configs: Dictionary with connector configurations
        cable_name: Name of the cable/harness

    Returns:
        Formatted XML string
    """
    if df.empty:
        return "<!-- Empty DataFrame, no XML generated. -->"

    if not connector_configs:
        print("⚠️ Warning: No connector configurations found. Please run configuration first.")
        return "<!-- No connector configurations found -->"

    # 1. Prepare XML structure
    root = ET.Element("CableList")
    cable = ET.SubElement(root, "Cable", Name=cable_name)
    connectors_element = ET.SubElement(cable, "Connectors")
    fromto_element = ET.SubElement(cable, "FromTo")

    # 2. Pin and Connector Extraction
    df_working = df[['From', 'To']].copy()
    df_working[['From_Con', 'From_Pin']] = df_working['From'].str.split('.', n=1, expand=True)
    df_working[['To_Con', 'To_Pin']] = df_working['To'].str.split('.', n=1, expand=True)

    # 3. Generate <Connectors> block using RapidEye format
    print("\n3. Generating RapidEye Connectors...")
    print("-" * 80)
    print(f"{'Bank':>6} | {'ConName':>10} | {'ConID':>10} | {'RHID':>8} | {'Pins':>5}")
    print("-" * 80)

    # Sort by bank for cleaner output
    for rh_name, config in sorted(connector_configs.items(), key=lambda x: x[1]['bank']):
        conn = ET.SubElement(
            connectors_element,
            "Connector",
            Name=config['bank'],        # RapidEye Bank (B1, B2, etc.)
            ConName=config['conname'],  # Connector type (HDR-64, etc.)
            ConID=config['conid'],      # Same as ConName
            RHID=config['rhid']         # RapidHarness ID (J1, P15, etc.)
        )

        # Use configured pin count
        ET.SubElement(conn, "Pins").text = str(config['pins'])

        print(f"{config['bank']:>6} | {config['conname']:>10} | {config['conid']:>10} | "
              f"{config['rhid']:>8} | {config['pins']:>5}")

    print("-" * 80)

    # 4. Generate <FromTo> block (Cx tags) using Bank:Pin format
    print("\n4. Generating Pin Mappings (Bank:Pin format)...")

    # Look up each connector's full configuration (bank, offset, type) by RapidHarness ID
    rh_to_bank = {config['rhid']: config for config in connector_configs.values()}

    connection_count = 0
    for _, row in df_working.iterrows():
        from_rh = row['From_Con']
        to_rh = row['To_Con']
        from_pin = row['From_Pin']
        to_pin = row['To_Pin']

        # Look up the bank and offset for each connector
        from_config = rh_to_bank.get(from_rh)
        to_config = rh_to_bank.get(to_rh)

        if not from_config or not to_config:
            print(f"⚠️ Warning: Connector {from_rh} or {to_rh} not found in configuration. Skipping.")
            continue

        try:
            # Calculate actual pin numbers with offset
            from_pin_num = int(from_pin) + from_config['offset']
            to_pin_num = int(to_pin) + to_config['offset']

            # Format: BANK:PIN (e.g., B1:1, B4:20)
            from_attr = f"{from_config['bank']}:{from_pin_num}"
            to_attr = f"{to_config['bank']}:{to_pin_num}"
        except (ValueError, TypeError) as e:
            # If pin numbers aren't numeric, use as-is without offset
            print(f"⚠️ Warning: Non-numeric pin {from_pin} or {to_pin}. Using without offset.")
            from_attr = f"{from_config['bank']}:{from_pin}"
            to_attr = f"{to_config['bank']}:{to_pin}"

        # Determine connection type: matching connector types are "Internal", anything else is "Wire"
        if from_config['type'] == to_config['type']:
            conn_type = "Internal"
        else:
            conn_type = "Wire"

        ET.SubElement(
            fromto_element,
            "Cx",
            From=from_attr,
            To=to_attr,
            Type=conn_type
        )
        connection_count += 1

    print(f"✓ Generated {connection_count} pin mappings")

    # 5. Format and return XML string
    rough_string = ET.tostring(root, 'utf-8')
    reparsed = minidom.parseString(rough_string)

    return reparsed.toprettyxml(indent="  ")


# Generate XML using the RapidEye format
if 'df_confirmed_clean' in globals() and 'connector_configs' in globals() and connector_configs:
    print("\n" + "="*80)
    print("GENERATING RAPIDEYE XML")
    print("="*80)

    xml_output = dataframe_to_xml_rapideye(
        df_confirmed_clean,
        connector_configs,
        cable_name="AFHN-000226_HARNESS_WIRING"
    )

    print("\n" + "="*80)
    print("XML OUTPUT PREVIEW")
    print("="*80)
    xml_lines = xml_output.split('\n')

    # Show first 50 lines
    preview_lines = min(50, len(xml_lines))
    print('\n'.join(xml_lines[:preview_lines]))

    if len(xml_lines) > preview_lines:
        print(f"\n... ({len(xml_lines) - preview_lines} more lines) ...")
        print("\nShowing last 10 lines:")
        print('\n'.join(xml_lines[-10:]))

    print("\n" + "="*80)
    print("✅ XML generation complete. Stored in 'xml_output' variable.")
    print("="*80)
else:
    print("⚠️ Please complete the connector configuration step first.")


GENERATING RAPIDEYE XML

3. Generating RapidEye Connectors...
--------------------------------------------------------------------------------
  Bank |    ConName |      ConID |     RHID |  Pins
--------------------------------------------------------------------------------
    B1 |     HDR-64 |     HDR-64 |       J1 |    64
    B3 |     HDR-64 |     HDR-64 |      P13 |    32
    B4 |     HDR-64 |     HDR-64 |      P15 |     9
--------------------------------------------------------------------------------

4. Generating Pin Mappings (Bank:Pin format)...
✓ Generated 30 pin mappings

XML OUTPUT PREVIEW
<?xml version="1.0" ?>
<CableList>
  <Cable Name="AFHN-000226_HARNESS_WIRING">
    <Connectors>
      <Connector Name="B1" ConName="HDR-64" ConID="HDR-64" RHID="J1">
        <Pins>64</Pins>
      </Connector>
      <Connector Name="B3" ConName="HDR-64" ConID="HDR-64" RHID="P13">
        <Pins>32</Pins>
      </Connector>
      <Connector Name="B4" ConName="HDR-64" ConID="HDR-64" RHID="P

### Saving XML File

This code block saves the generated XML output to a file:

1.  Defines the output path in `XML_OUTPUT_PATH`.
2.  Prints a status message.
3.  Writes `xml_output` to that file.
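
Below is a minimal sketch of the save step that also reads the file back to confirm it parses as well-formed XML. The temporary path and the short sample string are stand-ins for `XML_OUTPUT_PATH` and `xml_output`; writing with an explicit UTF-8 encoding is an assumption about what the downstream tool expects.

```python
import os
import tempfile
import xml.etree.ElementTree as ET

# Stand-in for xml_output.
sample_xml = '<?xml version="1.0" ?>\n<CableList>\n  <Cable Name="DEMO"/>\n</CableList>\n'

# Stand-in for XML_OUTPUT_PATH.
out_path = os.path.join(tempfile.mkdtemp(), "demo.xml")

with open(out_path, "w", encoding="utf-8") as f:
    f.write(sample_xml)

# Parsing the saved file raises ET.ParseError if it is not well-formed.
saved_root = ET.parse(out_path).getroot()
print(saved_root.tag, saved_root.find("Cable").get("Name"))
```

Reading the file back catches a truncated or corrupted write before the file reaches the cable checker.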

In [34]:
XML_OUTPUT_PATH = "/content/AFHN-000226 Manufacturing Data(Wiring Table).xml"
print(f"\n5. Saving XML to file: {XML_OUTPUT_PATH}...")
try:
    # Save the XML to the specified path in Colab
    with open(XML_OUTPUT_PATH, "w", encoding="utf-8") as f:
        f.write(xml_output)
    print(f"✅ XML successfully saved to {XML_OUTPUT_PATH}")
except Exception as e:
    print(f"❌ Error saving XML file: {e}")


5. Saving XML to file: /content/AFHN-000226 Manufacturing Data(Wiring Table).xml...
✅ XML successfully saved to /content/AFHN-000226 Manufacturing Data(Wiring Table).xml
