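This notebook contains the preprocessing pipeline for the manifold XML files.
The first cell deletes every file that has fewer than 5 clean patterns (patterns without mounting or through holes), 50 or more mounting holes, or no connections.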

Flattening the files means converting each nested XML file into an equivalent plain-text (.txt) representation.

In [2]:
from pathlib import Path
import xml.etree.ElementTree as ET
from collections import defaultdict

# Folder holding the manifold XML files. NOTE: files that fail the checks
# below are permanently deleted from this folder.
folder_path = Path('./manifolds2')

files_with_few_clean_patterns = []
files_with_lot_of_mounting = []
files_with_no_connections = []
files_deleted = []


def flag_is_set(element):
    """Return True when `element` exists and its text, ignoring whitespace, is '1'."""
    return element is not None and (element.text or '').strip() == '1'


def count_clean_patterns(root):
    """Count the <Pattern> elements under <Patterns> that contain no flagged hole.

    A hole is flagged when its <Mounting> child or its <Through> child is '1'.
    Each flag is checked on its own, so a hole that carries only one of the two
    elements is still detected. A pattern without a <Holes> element counts as
    clean. Returns 0 when the document has no <Patterns> element.
    """
    patterns = root.find('Patterns')
    if patterns is None:
        return 0

    clean = 0
    for pattern in patterns.findall('Pattern'):
        holes = pattern.find('Holes')
        hole_list = holes.findall('Hole') if holes is not None else []
        flagged = any(
            flag_is_set(hole.find('Mounting')) or flag_is_set(hole.find('Through'))
            for hole in hole_list
        )
        if not flagged:
            clean += 1
    return clean


def build_pattern_connections(root):
    """Map every connected pattern id to the other pattern ids of its groups.

    Every <Connection> that has an <IdPattern> child creates an entry, even
    when it is the only connection of its group (its set is then empty).
    `int()` raises when an <IdPattern> does not hold an integer.
    """
    pattern_connections = defaultdict(set)
    connection_groups = root.find('ConnectionGroups')
    if connection_groups is None:
        return pattern_connections

    for group in connection_groups.findall('ConnectionGroup'):
        id_nodes = (conn.find('IdPattern') for conn in group.findall('.//Connection'))
        connection_ids = [int(node.text) for node in id_nodes if node is not None]
        for pid in connection_ids:
            pattern_connections[pid].update(p for p in connection_ids if p != pid)
    return pattern_connections


for xml_file in folder_path.glob('*.xml'):
    print(xml_file)

    should_delete = False

    try:
        tree = ET.parse(xml_file)
        root = tree.getroot()

        # Patterns whose holes carry neither a Mounting nor a Through flag
        clean_pattern_count = count_clean_patterns(root)

        # Check mountings (Mounting == 1 anywhere in file)
        count_mounting = sum(1 for mounting in root.findall('.//Mounting') if flag_is_set(mounting))

        # Check connections
        pattern_connections = build_pattern_connections(root)
        has_connections = len(pattern_connections) > 0

        # Now the DELETE conditions; a file may be recorded in several lists
        if clean_pattern_count < 5:
            should_delete = True
            files_with_few_clean_patterns.append(xml_file.name)

        if count_mounting >= 50:
            should_delete = True
            files_with_lot_of_mounting.append(xml_file.name)

        if not has_connections:
            should_delete = True
            files_with_no_connections.append(xml_file.name)

    except Exception as e:
        # A file that cannot be parsed or analysed is reported and kept on disk
        print(f"Error parsing {xml_file.name}: {e}")
        continue

    if should_delete:
        try:
            xml_file.unlink()
            files_deleted.append(xml_file.name)
            print(f"Deleted {xml_file.name}")
        except PermissionError as e:
            print(f"Could not delete {xml_file.name}: {e}")

print(f'Files with <5 clean patterns (no Mounting/Through): {len(files_with_few_clean_patterns)}')
print(f'Files with >=50 mountings: {len(files_with_lot_of_mounting)}')
print(f'Files with no connection patterns: {len(files_with_no_connections)}')
print(f'Total files deleted: {len(set(files_deleted))}')


manifolds2\-F7015.xml
manifolds2\F0010008.xml
manifolds2\F006669-2.xml
manifolds2\F006765-1.xml
manifolds2\F006919-1.xml
Deleted F006919-1.xml
manifolds2\F007390-3.xml
manifolds2\F007407-3.xml
manifolds2\F007908-1.xml
manifolds2\F008068-1.xml
manifolds2\F008142-2.xml
manifolds2\F008473-1.xml
manifolds2\F008606-1.xml
manifolds2\F008686-1.xml
manifolds2\F008726.xml
Deleted F008726.xml
manifolds2\F008728.xml
manifolds2\F008729.xml
manifolds2\F008730.xml
Deleted F008730.xml
manifolds2\F008731.xml
manifolds2\F008732.xml
manifolds2\F008733.xml
manifolds2\F008734.xml
manifolds2\F008735-1.xml
manifolds2\F008735-2.xml
manifolds2\F008735.xml
manifolds2\F008738.xml
manifolds2\F008739-1.xml
manifolds2\F008739-2.xml
manifolds2\F008739-3.xml
manifolds2\F008739.xml
manifolds2\F008741.xml
manifolds2\F008742.xml
manifolds2\F008745.xml
manifolds2\F008746.xml
manifolds2\F008748-1.xml
manifolds2\F008749-1.xml
manifolds2\F008749.xml
manifolds2\F008750.xml
manifolds2\F008750_mountingholes.xml
manifolds2\F00

In [3]:
from pathlib import Path

# Folder whose XML files are counted
folder_path = Path('./manifolds2')

# Count the .xml files directly inside the folder (non-recursive)
file_count = sum(1 for _ in folder_path.glob('*.xml'))

# Report the folder that was actually scanned instead of a hard-coded name
print(f'Total XML files in "{folder_path.name}" folder: {file_count}')


Total XML files in "manifolds" folder: 4895


In [12]:
# Cleans the XML files by removing unplaced patterns
import os
import xml.etree.ElementTree as ET

def clean_xml_in_place(input_path):
    """Remove unplaced patterns from an XML file and overwrite the file.

    Every <Pattern> directly under the root's <Patterns> element whose
    <IsPlaced> text is '0' (surrounding whitespace ignored) is removed.
    Patterns with a missing or empty <IsPlaced> element are kept.

    Args:
        input_path: Path of the XML file; it is parsed and then rewritten.

    Raises:
        xml.etree.ElementTree.ParseError: If the file is not well-formed XML.
    """
    tree = ET.parse(input_path)
    root = tree.getroot()

    # Step 1: Remove <Pattern> elements where <IsPlaced> == 0
    patterns_element = root.find('Patterns')
    if patterns_element is not None:
        # Iterate over a snapshot because children are removed inside the loop
        for pattern in list(patterns_element.findall('Pattern')):
            # findtext gives None for a missing element and '' for an empty
            # one, so an empty <IsPlaced/> no longer fails on None.strip()
            is_placed = pattern.findtext('IsPlaced')
            if is_placed is not None and is_placed.strip() == '0':
                patterns_element.remove(pattern)

    # Overwrite the original XML file
    tree.write(input_path, encoding='utf-8', xml_declaration=True)

def process_all_manifolds(folder='manifolds'):
    """Clean every XML file found directly inside `folder`, in place.

    Entries are selected when their name ends in '.xml', compared
    case-insensitively. Each selected name is printed and then passed to
    clean_xml_in_place; an error raised for one file stops the run.
    """
    xml_names = (name for name in os.listdir(folder) if name.lower().endswith('.xml'))
    for filename in xml_names:
        print(f"Cleaning: {filename}")
        clean_xml_in_place(os.path.join(folder, filename))

# Clean every XML file of the 'manifolds' folder in place
process_all_manifolds(folder='manifolds')


Cleaning: -F7015.xml
Cleaning: F0010008.xml
Cleaning: F006669-2.xml
Cleaning: F006765-1.xml
Cleaning: F007390-3.xml
Cleaning: F007407-3.xml
Cleaning: F007908-1.xml
Cleaning: F008068-1.xml
Cleaning: F008142-2.xml
Cleaning: F008473-1.xml
Cleaning: F008606-1.xml
Cleaning: F008686-1.xml
Cleaning: F008728.xml
Cleaning: F008729.xml
Cleaning: F008731.xml
Cleaning: F008732.xml
Cleaning: F008733.xml
Cleaning: F008734.xml
Cleaning: F008735-1.xml
Cleaning: F008735-2.xml
Cleaning: F008735.xml
Cleaning: F008738.xml
Cleaning: F008739-1.xml
Cleaning: F008739-2.xml
Cleaning: F008739-3.xml
Cleaning: F008739.xml
Cleaning: F008741.xml
Cleaning: F008742.xml
Cleaning: F008745.xml
Cleaning: F008746.xml
Cleaning: F008748-1.xml
Cleaning: F008749-1.xml
Cleaning: F008749.xml
Cleaning: F008750.xml
Cleaning: F008750_mountingholes.xml
Cleaning: F008755.xml
Cleaning: F008756-1.xml
Cleaning: F008756.xml
Cleaning: F008757.xml
Cleaning: F008758.xml
Cleaning: F008759.xml
Cleaning: F008760.xml
Cleaning: F008761.xml
Clea

In [14]:
# Flattens the XML files to text format
import xml.etree.ElementTree as ET
from pathlib import Path

def parse_minimal_corrected_with_faces_to_text(xml_file):
    """Flatten one manifold XML file into a list of text lines.

    The output has four sections, in this order: block dimensions, face
    definitions (only when <FaceDefinitions> exists), holes with their
    direction and steps, and connection groups.

    Args:
        xml_file: Path of the XML file to read.

    Returns:
        list[str]: The lines of the flattened text, without line endings.

    Raises:
        AttributeError: When a section that is accessed unconditionally
            (for example <Dimensions>, <Patterns>, <Holes>, <Steps> or
            <ConnectionGroups>) is missing from the document.
    """
    # Opened in binary mode so the parser receives the raw bytes
    with open(xml_file, 'rb') as handle:
        root = ET.parse(handle).getroot()

    def xyz(node):
        """Return the text of the X, Y and Z children of `node`."""
        return node.findtext('X'), node.findtext('Y'), node.findtext('Z')

    out = []

    # 1. Block dimensions
    dim_x, dim_y, dim_z = xyz(root.find('Dimensions'))
    out.append("Block Dimensions:")
    out.append(f"  X: {dim_x}")
    out.append(f"  Y: {dim_y}")
    out.append(f"  Z: {dim_z}")
    out.append("")  # blank line

    # 2. Faces (optional section)
    faces_root = root.find('FaceDefinitions')
    if faces_root is not None:
        out.append("Faces:")
        for face_def in faces_root.findall('FaceDefinition'):
            out.append(f"  Face {face_def.findtext('Face')}:")
            origin = [face_def.find(path).text for path in ('Origin/X', 'Origin/Y', 'Origin/Z')]
            inwards = [face_def.find(path).text for path in ('InWards/X', 'InWards/Y', 'InWards/Z')]
            out.append(f"    Origin: ({origin[0]}, {origin[1]}, {origin[2]})")
            out.append(f"    Inwards: ({inwards[0]}, {inwards[1]}, {inwards[2]})")
        out.append("")

    # 3. Holes; every hole is reported at the position of its pattern
    out.append("Holes:")
    for pattern in root.find('Patterns').findall('Pattern'):
        pattern_id = pattern.findtext('Id')
        face = pattern.findtext('Face')
        pos_x, pos_y, pos_z = xyz(pattern.find('CurrentPosition'))

        for hole in pattern.find('Holes').findall('Hole'):
            # Collect the labels of the flags that are set to "1"
            labels = []
            if hole.findtext('Mounting') == "1":
                labels.append("Mounting Hole")
            if hole.findtext('Through') == "1":
                labels.append("Through Hole")
            if labels:
                type_desc = " (" + ", ".join(labels) + ")"
            else:
                type_desc = ""

            out.append(f"  Hole {hole.findtext('ID')} (Pattern {pattern_id}, Face {face}) at Position ({pos_x}, {pos_y}, {pos_z}){type_desc}:")

            dir_x, dir_y, dir_z = xyz(hole.find('CurrentDirection'))
            out.append(f"    Direction: ({dir_x}, {dir_y}, {dir_z})")

            for step in hole.find('Steps').findall('Step'):
                pieces = [f"    Step {step.findtext('StepNumber')}: Diameter {step.findtext('Diameter')}, Length {step.findtext('Length')}"]
                # Port limits are appended only when the element is present
                if step.find('PortMin') is not None:
                    pieces.append(f", PortMin {step.findtext('PortMin')}")
                if step.find('PortMax') is not None:
                    pieces.append(f", PortMax {step.findtext('PortMax')}")
                out.append("".join(pieces))
            out.append("")  # extra space after each hole

    # 4. Connection Groups
    out.append("Connection Groups:")
    for group in root.find('ConnectionGroups').findall('ConnectionGroup'):
        out.append(f"  Group {group.findtext('ID')}:")
        out.extend(
            f"    Connect Pattern {conn.findtext('IdPattern')} Step {conn.findtext('StepNumber')}"
            for conn in group.find('Connections').findall('Connection')
        )
        out.append("")

    return out

# Batch processing: flatten every XML file of the input folder into a .txt file
input_folder = Path('./manifolds')
output_folder = Path('./manifolds_txt')
output_folder.mkdir(exist_ok=True)

# Match the extension case-insensitively. glob('*.XML') is case-sensitive on
# POSIX file systems and would silently skip files named '*.xml' there.
xml_paths = (path for path in input_folder.glob('*') if path.suffix.lower() == '.xml')

for xml_path in xml_paths:
    try:
        text_lines = parse_minimal_corrected_with_faces_to_text(xml_path)
        output_file = output_folder / f'{xml_path.stem}.txt'

        # Write UTF-8 explicitly: later cells read these files as UTF-8, and
        # the platform default encoding is not guaranteed to match.
        with open(output_file, 'w', encoding='utf-8') as f:
            f.writelines(line + '\n' for line in text_lines)

        print(f"Saved {output_file}")

    except Exception as e:
        # A file that cannot be flattened is reported and skipped
        print(f"Failed to process {xml_path.name}: {e}")


Saved manifolds_txt\-F7015.txt
Saved manifolds_txt\F0010008.txt
Saved manifolds_txt\F006669-2.txt
Saved manifolds_txt\F006765-1.txt
Saved manifolds_txt\F007390-3.txt
Saved manifolds_txt\F007407-3.txt
Saved manifolds_txt\F007908-1.txt
Saved manifolds_txt\F008068-1.txt
Saved manifolds_txt\F008142-2.txt
Saved manifolds_txt\F008473-1.txt
Saved manifolds_txt\F008606-1.txt
Saved manifolds_txt\F008686-1.txt
Saved manifolds_txt\F008728.txt
Saved manifolds_txt\F008729.txt
Saved manifolds_txt\F008731.txt
Saved manifolds_txt\F008732.txt
Saved manifolds_txt\F008733.txt
Saved manifolds_txt\F008734.txt
Saved manifolds_txt\F008735-1.txt
Saved manifolds_txt\F008735-2.txt
Saved manifolds_txt\F008735.txt
Saved manifolds_txt\F008738.txt
Saved manifolds_txt\F008739-1.txt
Saved manifolds_txt\F008739-2.txt
Saved manifolds_txt\F008739-3.txt
Saved manifolds_txt\F008739.txt
Saved manifolds_txt\F008741.txt
Saved manifolds_txt\F008742.txt
Saved manifolds_txt\F008745.txt
Saved manifolds_txt\F008746.txt
Saved mani

In [15]:
import os
import random
import shutil

def split_files(folder_path, train_ratio=0.9, seed=None):
    """Move the .txt files of a folder into 'train' and 'test' subfolders.

    The regular files in `folder_path` whose name ends in '.txt' are sorted,
    shuffled, and split: the first int(len(files) * train_ratio) go to
    `folder_path`/train and the rest to `folder_path`/test. Both subfolders
    are created when missing.

    Args:
        folder_path: Folder containing the .txt files.
        train_ratio: Fraction of the files moved to the train folder.
        seed: Optional seed. When given, the shuffle uses its own
            random.Random(seed), so the same folder content always produces
            the same split. When None, the module-level random.shuffle is
            used, as before.

    Raises:
        FileNotFoundError: If `folder_path` does not exist.
    """
    # Ensure the folder exists
    if not os.path.exists(folder_path):
        raise FileNotFoundError(f"The folder {folder_path} does not exist.")

    # Paths for train and test folders
    train_folder = os.path.join(folder_path, 'train')
    test_folder = os.path.join(folder_path, 'test')

    # Create train and test folders if they don't exist
    os.makedirs(train_folder, exist_ok=True)
    os.makedirs(test_folder, exist_ok=True)

    # List all .txt files in the folder. os.listdir returns entries in
    # arbitrary order, so sort first to make a seeded shuffle reproducible.
    files = sorted(
        f for f in os.listdir(folder_path)
        if f.endswith('.txt') and os.path.isfile(os.path.join(folder_path, f))
    )

    # Shuffle the files
    if seed is None:
        random.shuffle(files)
    else:
        random.Random(seed).shuffle(files)

    # Calculate the split index
    split_index = int(len(files) * train_ratio)

    # Split the files
    train_files = files[:split_index]
    test_files = files[split_index:]

    # Move files
    for destination, names in ((train_folder, train_files), (test_folder, test_files)):
        for file in names:
            shutil.move(os.path.join(folder_path, file), os.path.join(destination, file))

    print(f"Moved {len(train_files)} files to train folder and {len(test_files)} files to test folder.")

# Split the flattened text files into train and test subfolders
split_files(folder_path='manifolds_txt')


Moved 4405 files to train folder and 490 files to test folder.


In [16]:
import random
import json
from pathlib import Path

def save_shortest_text_jsonl(source_dir, output_file, num_files):
    """Write the shortest .txt files of a folder to a JSONL file.

    Files are ranked by the number of characters of their UTF-8 decoded
    text. Files of equal length are ordered by file name, so the selection
    does not depend on the order in which the file system lists them.

    Args:
        source_dir: Folder searched (non-recursively) for '*.txt' files.
        output_file: Path of the JSONL file to write.
        num_files: Maximum number of files to export.

    Each output line is a JSON object with the file text under "input" and
    the file name under "metadata" -> "source_file".
    """
    source_dir = Path(source_dir)

    # First pass: measure every file. Only (length, name, path) is kept so
    # the texts of files that are not selected do not stay in memory.
    measured = []
    for path in source_dir.glob("*.txt"):
        with open(path, "r", encoding="utf-8") as f:
            measured.append((len(f.read()), path.name, path))

    # Sort by length (ascending), then by name to break ties deterministically
    measured.sort(key=lambda item: (item[0], item[1]))

    # Select the shortest num_files
    selected_files = [path for _, _, path in measured[:num_files]]

    # Second pass: load the text of the selected files only
    examples = []
    for file in selected_files:
        with open(file, "r", encoding="utf-8") as f:
            text = f.read()

        examples.append({
            "input": text,
            "metadata": {
                "source_file": file.name
            }
        })

    with open(output_file, "w", encoding="utf-8") as f:
        for ex in examples:
            f.write(json.dumps(ex) + "\n")

    print(f"Saved {len(examples)} shortest entries to {output_file}")


# Export the 100 shortest test files and the 900 shortest train files
save_shortest_text_jsonl("./manifolds_txt/test", "test_input_100.jsonl", 100)

save_shortest_text_jsonl("./manifolds_txt/train", "train_input_900.jsonl", 900)

Saved 100 shortest entries to test_input_100.jsonl
Saved 900 shortest entries to train_input_900.jsonl


Create a separate dataset for fine-tuning; this one already has the masking applied. At inference time, masking is instead applied in memory for each instance.

In [1]:
import json
import random
import re
from tqdm import tqdm

# Helper to parse full holes and steps from input lines
def parse_full_hole_entries(lines):
    """Collect the maskable holes and their steps from flattened text lines.

    A hole starts at a header line of the form
    "Hole <id> (...) at Position (<coords>)". Headers whose remainder
    mentions "Mounting" or "Through" are excluded: they produce no entry and
    their step lines are ignored. A hole ends at the next header or at a
    blank line.

    Args:
        lines: The text lines of one flattened file.

    Returns:
        list[dict]: One dict per hole with the keys "hole_id" (int), "coord"
        (the position text including parentheses), "coord_line" (index of
        the header line), "step_lines" (indices of the step lines) and
        "step_data" (dicts with "step" as int and "length" as str).
    """
    # Group 3 is the text after the position. It is tested below instead of
    # using the lookahead (?!.*Mounting|Through), whose alternation only
    # rejected "Through" directly after the position and therefore let
    # "(Through Hole)" headers through.
    header_re = re.compile(r"\s*Hole\s+(\d+)\s+\(.*?\)\s+at Position\s+(\([^)]+\))(.*)")
    step_re = re.compile(r"Step\s+(\d+):.*?Length\s+([\d.]+)")

    holes = []
    current = None
    for idx, line in enumerate(lines):
        header = header_re.match(line)
        if header:
            # Any header closes the hole collected so far
            if current:
                holes.append(current)
            trailing = header.group(3)
            if "Mounting" in trailing or "Through" in trailing:
                # Excluded hole: its steps must not be added to another hole
                current = None
            else:
                current = {
                    "hole_id": int(header.group(1)),
                    "coord": header.group(2),
                    "coord_line": idx,
                    "step_lines": [],
                    "step_data": []
                }
        elif current and "Length" in line:
            step_match = step_re.search(line)
            if step_match:
                current["step_lines"].append(idx)
                current["step_data"].append({
                    "step": int(step_match.group(1)),
                    "length": step_match.group(2)
                })
        elif line.strip() == "":
            if current:
                holes.append(current)
                current = None
    if current:
        holes.append(current)
    return holes

# Mask position and length values
def apply_full_hole_masking(lines, holes):
    """Return a copy of `lines` with the given holes masked.

    For every hole, the position on its header line ("coord_line") and the
    length on each of its step lines ("step_lines") are replaced with
    '<masked>'. The input list is not modified.
    """
    position_re = re.compile(r"at Position\s+\([^)]+\)")
    length_re = re.compile(r"Length\s+[\d.]+")

    result = lines.copy()
    for hole in holes:
        header_idx = hole["coord_line"]
        result[header_idx] = position_re.sub("at Position <masked>", result[header_idx])
        for step_idx in hole["step_lines"]:
            result[step_idx] = length_re.sub("Length <masked>", result[step_idx])
    return result

# Build instruction with hole explicitly mentioned
def build_instruction(hole_id):
    """Return the prompt text that asks for the masked values of `hole_id`.

    The text describes the task, the required output format for the hole
    position and step lengths, and ends with a line naming the masked hole.
    """
    sentences = [
        "Predict the missing coordinate and step lengths for the following hole in a hydraulic manifold design.\n",
        "The masked hole has its 'Position' and all 'Length' values replaced with '<masked>'.\n",
        "You must output predictions in the format:\n",
        f"'Hole {hole_id} at Position (x.xxx, y.yyy, z.zzz)' and 'Hole {hole_id} Step <Step> Length <value>' for each step.\n",
        "Do not include any explanations.\n",
        f"\nHoles masked: {hole_id}\n",
    ]
    return "".join(sentences)

# Load original file
input_path = "train_input_900.jsonl"
output_path = "train_input_900_masked.jsonl"

# Fixed seed with a dedicated generator, so the hole masked in each example
# is the same on every run and the global random state is left untouched.
MASK_SEED = 42
mask_rng = random.Random(MASK_SEED)

with open(input_path, "r", encoding="utf-8") as f:
    # Blank lines (for example a trailing newline) are not valid JSON; skip them
    examples = [json.loads(line) for line in f if line.strip()]

masked_dataset = []
skipped_without_holes = 0

for ex in tqdm(examples, desc="Masking 1 hole per input"):
    lines = ex["input"].splitlines()
    holes = parse_full_hole_entries(lines)
    if not holes:
        # Nothing to mask in this example, so it is left out of the dataset
        skipped_without_holes += 1
        continue
    selected = mask_rng.choice(holes)
    masked_lines = apply_full_hole_masking(lines, [selected])
    instruction = build_instruction(selected["hole_id"])
    # Expected answer: the position line followed by one line per step
    output_lines = [f"Hole {selected['hole_id']} at Position {selected['coord']}"]
    for s in selected["step_data"]:
        output_lines.append(f"Hole {selected['hole_id']} Step {s['step']} Length {s['length']}")
    result = {
        "input": "\n".join(masked_lines),
        "instruction": instruction,
        "output": "\n".join(output_lines)
    }
    masked_dataset.append(result)

# Save new jsonl file
with open(output_path, "w", encoding="utf-8") as f:
    for item in masked_dataset:
        f.write(json.dumps(item, ensure_ascii=False) + "\n")

print(f"✅ Done. Saved {len(masked_dataset)} examples to {output_path}")
if skipped_without_holes:
    print(f"Skipped {skipped_without_holes} examples without a maskable hole")


Masking 1 hole per input: 100%|██████████| 900/900 [00:00<00:00, 7636.62it/s]

✅ Done. Saved 900 examples to train_input_900_masked.jsonl



