In [1]:
import argparse
import pathlib
import json
import dgl
import numpy as np
import torch
from occwl.graph import face_adjacency
from occwl.compound import Compound
from occwl.uvgrid import ugrid, uvgrid
from tqdm import tqdm
from multiprocessing.pool import Pool
from itertools import repeat
import signal
def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def build_graph(solid, curv_num_u_samples, surf_num_u_samples, surf_num_v_samples):
    # Build face adjacency graph with B-rep entities as node and edge features
    graph = face_adjacency(solid)

    # Compute the UV-grids for faces
    graph_face_feat = []
    for face_idx in graph.nodes:
        # Get the B-rep face
        face = graph.nodes[face_idx]["face"]
        # Compute UV-grids
        points = uvgrid(
            face, method="point", num_u=surf_num_u_samples, num_v=surf_num_v_samples
        )
        # print(points)
        normals = uvgrid(
            face, method="normal", num_u=surf_num_u_samples, num_v=surf_num_v_samples
        )
        visibility_status = uvgrid(
            face, method="visibility_status", num_u=surf_num_u_samples, num_v=surf_num_v_samples
        )
        mask = np.logical_or(visibility_status == 0, visibility_status == 2)  # 0: Inside, 1: Outside, 2: On boundary
        # Concatenate channel-wise to form face feature tensor
        face_feat = np.concatenate((points, normals, mask), axis=-1)
        graph_face_feat.append(face_feat)
    graph_face_feat = np.asarray(graph_face_feat)

    # Compute the U-grids for edges
    graph_edge_feat = []
    for edge_idx in graph.edges:
        # Get the B-rep edge
        edge = graph.edges[edge_idx]["edge"]
        # Ignore dgenerate edges, e.g. at apex of cone
        if not edge.has_curve():
            continue
        # Compute U-grids
        points = ugrid(edge, method="point", num_u=curv_num_u_samples)
        tangents = ugrid(edge, method="tangent", num_u=curv_num_u_samples)
        # Concatenate channel-wise to form edge feature tensor
        edge_feat = np.concatenate((points, tangents), axis=-1)
        graph_edge_feat.append(edge_feat)
    graph_edge_feat = np.asarray(graph_edge_feat)

    # Convert face-adj graph to DGL format
    edges = list(graph.edges)
    src = [e[0] for e in edges]
    dst = [e[1] for e in edges]
    dgl_graph = dgl.graph((src, dst), num_nodes=len(graph.nodes))
    dgl_graph.ndata["x"] = torch.from_numpy(graph_face_feat)
    dgl_graph.edata["x"] = torch.from_numpy(graph_edge_feat)
    return dgl_graph


def process_one_file(arguments):
    try:
        fn, args = arguments
        fn_stem = fn.stem
        output_path = pathlib.Path(args.output)
        solid, mapping = Compound.load_step_with_attributes(fn)
        #solid = solid[0]
        if args.convert_labels:
            labels = [int(mapping[face]['name']) for face in solid.faces() if face in mapping]
            with open(str(output_path)+"_labels/"+fn_stem+".json", 'w') as f:
                json.dump(labels, f)
        graph = build_graph(
            solid, args.curv_u_samples, args.surf_u_samples, args.surf_v_samples
        )
        dgl.data.utils.save_graphs(str(output_path.joinpath(fn_stem + ".bin")), [graph]) #_color
    except Exception as e:
        print(e)


def initializer():
    """Ignore CTRL+C in the worker process."""
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def process(args):
    input_path = pathlib.Path(args.input)
    output_path = pathlib.Path(args.output)
    if not output_path.exists():
        output_path.mkdir(parents=True, exist_ok=True)
    step_files = list(input_path.glob("*.st*p"))
    # for fn in tqdm(step_files):
    #     process_one_file(fn, args)
    pool = Pool(processes=50, initializer=initializer)
    try:
        results = list(tqdm(pool.imap(process_one_file, zip(step_files, repeat(args))), total=len(step_files)))
    except KeyboardInterrupt:
        pool.terminate()
        pool.join()
    print(f"Processed {len(results)} files.")

Using backend: pytorch


In [None]:
# path_data_raw = "../../data/machining_features_sprint_1/raw"
# path_data = "../../data/machining_features_sprint_1/graphs_20/"

# path_data_raw = "../../MFCAD/dataset/step"
# path_data = "/home/egor/data/mfcad30/graph/"

path_data_raw = "../../data/janush_dataset/raw"
path_data = "/home/egor/data/janush_dataset/tst/"
nsamples = 10
args = argparse.Namespace(input=path_data_raw, output=path_data, convert_labels=False,
                          curv_u_samples=nsamples, surf_u_samples=nsamples, surf_v_samples=nsamples, num_processes=50)

input_path = pathlib.Path(args.input)
output_path = pathlib.Path(args.output)
if not output_path.exists():
    output_path.mkdir(parents=True, exist_ok=True)
step_files = list(input_path.glob("*.st*p"))
pool = Pool(processes=50, initializer=initializer)
try:
    results = list(tqdm(pool.imap(process_one_file, zip(step_files, repeat(args))), total=len(step_files)))
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

In [3]:

# for split in ["train", "test", "val"]:
split = 'tmp'
path_data_raw = f"../../data/MFCAD++_dataset/tmp_raw"
path_data = f"../../data/MFCAD++_dataset/tmp"
nsamples = 10
args = argparse.Namespace(input=path_data_raw, output=path_data, convert_labels=True,
                          curv_u_samples=nsamples, surf_u_samples=nsamples, surf_v_samples=nsamples, num_processes=50)

input_path = pathlib.Path(args.input)
output_path = pathlib.Path(args.output)
labels_path = pathlib.Path(args.output+"_labels")

if not output_path.exists():
    output_path.mkdir(parents=True, exist_ok=True)
if not labels_path.exists():
    labels_path.mkdir(parents=True, exist_ok=True)

step_files = list(input_path.glob("*.st*p"))
pool = Pool(processes=100, initializer=initializer)
try:
    results = list(tqdm(pool.imap(process_one_file, zip(step_files, repeat(args))), total=len(step_files)))
except KeyboardInterrupt:
    pool.terminate()
    pool.join()

100%|██████████| 1/1 [00:00<00:00,  3.29it/s]


In [6]:
import json
import os

In [19]:
path_write = f"/home/egor/data/MFCAD++_dataset/brepnet/seg/"
for split in ["train", "test", "val"]:
    path = f"/home/egor/data/MFCAD++_dataset/converted_10/{split}_labels/"
    for fnm in os.listdir(path):
        with open(path+fnm, 'r') as f_read:
            data = json.load(f_read)
        with open(path_write +fnm.split('.')[0]+ '.seg', 'w') as f_write:
            for x in data:
                f_write.write(str(x)+ os.linesep)