In [14]:

#!pip install ezdxf shapely numpy


In [15]:
import os
import math
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict, Iterable, Set

import ezdxf
from ezdxf.entities import Line, LWPolyline
from shapely.geometry import LineString, Point
from shapely.strtree import STRtree
from shapely.ops import split as shapely_split

import numpy as np

In [None]:
"""
Improved breakline trim by plan-view intersections.

Key improvements:
 - Robust deduplication: considers interval overlap + dedup tolerance.
 - Handles overlapping/collinear intersections by creating cut intervals from the overlap extents.
 - STRtree queries use a small buffer around the geometry (gap + dedup_tol) to reduce candidates.
 - Preserves per-entity metadata and layers.
 - Better logging of skipped intersections (endpoints, degenerate).
"""
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict, Iterable, Set
import math
import numpy as np
import ezdxf
from shapely.geometry import LineString, Point, MultiPoint, MultiLineString
from shapely.strtree import STRtree

# ---------------- Configuration ----------------
GAP = 0.20  # half-gap on each side of intersection (meters). Total removed = 2 x GAP m.
PLAN_TOL = 1e-9  # tolerance for plan-view operations (in drawing units)
DEDUP_TOL = 1e-3  # min separation between successive cuts on the same line (plan distance)
LAYER_FILTER = None  # e.g., {"BREAKLINES", "DRAINAGE"} or None to process all

# ---------------- Utilities ----------------


def dist2d(a: Tuple[float, float], b: Tuple[float, float]) -> float:
    return math.hypot(a[0] - b[0], a[1] - b[1])


def lerp(a: float, b: float, t: float) -> float:
    return a + t * (b - a)


def lerp3(p0: np.ndarray, p1: np.ndarray, t: float) -> np.ndarray:
    return p0 + t * (p1 - p0)


@dataclass
class Poly3D:
    """Holds a 3D polyline and its plan-view projection with per-segment cumulative measures."""
    coords3d: np.ndarray  # shape (N,3)
    coords2d: np.ndarray  # shape (N,2)
    cum2d: np.ndarray     # cumulative 2D length per vertex, shape (N,)

    @property
    def length2d(self) -> float:
        return float(self.cum2d[-1])

    def point_at_plan_distance(self, d: float) -> Tuple[np.ndarray, np.ndarray]:
        """
        Given a plan-view distance d along the projected polyline,
        return (pt2d, pt3d) interpolated at that measure.
        """
        d = max(0.0, min(d, self.length2d))
        # find segment containing d
        idx = np.searchsorted(self.cum2d, d, side='right') - 1
        idx = int(max(0, min(idx, len(self.cum2d) - 2)))
        seg_start = self.cum2d[idx]
        seg_len = self.cum2d[idx + 1] - seg_start
        t = 0.0 if seg_len <= PLAN_TOL else (d - seg_start) / seg_len

        p2d = lerp3(self.coords2d[idx], self.coords2d[idx + 1], t)
        p3d = lerp3(self.coords3d[idx], self.coords3d[idx + 1], t)
        return p2d, p3d

    def splits_at_plan_distances(self, cuts: List[Tuple[float, float]]) -> List[np.ndarray]:
        """
        Split the 3D polyline into segments that remain after removing
        each interval [d1, d2] in plan-view distances. Returns a list of 3D segments,
        each segment is an array of 3D points (M,3).
        """
        if not cuts:
            return [self.coords3d.copy()]

        # Normalize and merge overlapping cut intervals (clamp to poly length)
        intervals = []
        for d1, d2 in cuts:
            a, b = sorted((max(0.0, d1), min(self.length2d, d2)))
            if b - a > PLAN_TOL:
                intervals.append((a, b))
        if not intervals:
            return [self.coords3d.copy()]

        intervals.sort(key=lambda x: x[0])
        merged = []
        for a, b in intervals:
            if not merged or a > merged[-1][1] + PLAN_TOL:
                merged.append([a, b])
            else:
                merged[-1][1] = max(merged[-1][1], b)
        intervals = [(float(a), float(b)) for a, b in merged]

        # Build keep intervals between cuts
        keeps = []
        cursor = 0.0
        for a, b in intervals:
            if a > cursor + PLAN_TOL:
                keeps.append((cursor, a))
            cursor = max(cursor, b)
        if cursor < self.length2d - PLAN_TOL:
            keeps.append((cursor, self.length2d))

        # Convert each keep interval to a 3D segment polyline
        kept_segments = []
        for k1, k2 in keeps:
            if k2 - k1 <= PLAN_TOL:
                continue
            # Start endpoint
            _, p3_start = self.point_at_plan_distance(k1)
            # End endpoint
            _, p3_end = self.point_at_plan_distance(k2)

            # Gather interior vertices whose cum2d lies strictly within (k1, k2)
            mask = (self.cum2d > k1 + PLAN_TOL) & (self.cum2d < k2 - PLAN_TOL)
            interior_idx = np.where(mask)[0]
            coords3 = [p3_start]
            if interior_idx.size:
                coords3.extend(list(self.coords3d[interior_idx]))
            coords3.append(p3_end)
            kept_segments.append(np.vstack(coords3))
        return kept_segments


def build_poly3d_from_entity(e) -> Optional[Poly3D]:
    """
    Create a Poly3D from LINE or LWPOLYLINE entity.
    Returns None if invalid or degenerate.
    """
    if e.dxftype() == 'LINE' or isinstance(e, ezdxf.entities.line.Line):
        # Generic handling using DXF attributes
        try:
            p0 = np.array([e.dxf.start.x, e.dxf.start.y, getattr(e.dxf.start, "z", 0.0) or 0.0], dtype=float)
            p1 = np.array([e.dxf.end.x, e.dxf.end.y, getattr(e.dxf.end, "z", 0.0) or 0.0], dtype=float)
            coords3d = np.vstack([p0, p1])
        except Exception:
            # Fallback: try attribute names
            try:
                p0 = np.array([e.dxf.x1, e.dxf.y1, getattr(e.dxf, "z1", 0.0) or 0.0], dtype=float)
                p1 = np.array([e.dxf.x2, e.dxf.y2, getattr(e.dxf, "z2", 0.0) or 0.0], dtype=float)
                coords3d = np.vstack([p0, p1])
            except Exception:
                return None

    elif e.dxftype() == 'LWPOLYLINE' or isinstance(e, ezdxf.entities.lwpolyline.LWPolyline):
        pts = []
        for v in e.get_points():
            # get_points returns tuples: (x, y, [start_width], [end_width], [bulge], [vx], [vy], [vz])
            x, y = v[0], v[1]
            z = 0.0
            if hasattr(e.dxf, "elevation") and e.dxf.elevation is not None:
                z = float(e.dxf.elevation)
            # If vertex has extended z at position 7
            if len(v) >= 8 and v[7] is not None:
                try:
                    z = float(v[7])
                except Exception:
                    pass
            pts.append([x, y, z])
        coords3d = np.array(pts, dtype=float)
    else:
        return None

    # Remove duplicate consecutive points (plan-view)
    if coords3d.shape[0] < 2:
        return None
    diff = np.linalg.norm(np.diff(coords3d[:, :2], axis=0), axis=1)
    keep_mask = np.hstack([[True], diff > PLAN_TOL])
    coords3d = coords3d[keep_mask]
    if coords3d.shape[0] < 2:
        return None

    coords2d = coords3d[:, :2].copy()
    seglens = np.linalg.norm(np.diff(coords2d, axis=0), axis=1)
    cum2d = np.hstack([[0.0], np.cumsum(seglens)])
    if cum2d[-1] <= PLAN_TOL:
        return None

    return Poly3D(coords3d=coords3d, coords2d=coords2d, cum2d=cum2d)


# ---------------- Main processing ----------------

def collect_entities(msp) -> List:
    q = "LINE LWPOLYLINE"  # space-separated, not comma
    ents = []
    for e in msp.query(q):
        if LAYER_FILTER and e.dxf.layer not in LAYER_FILTER:
            continue
        ents.append(e)
    return ents


def build_shapely_lines(poly: Poly3D) -> LineString:
    return LineString(poly.coords2d.tolist())


def trim_breaklines_planview(in_dxf: str, out_dxf: str,
                             gap: float = GAP,
                             dedup_tol: float = DEDUP_TOL) -> None:
    # Read input DXF
    doc = ezdxf.readfile(in_dxf)
    msp = doc.modelspace()

    # Collect entities and build Poly3D objects
    raw_ents = collect_entities(msp)
    polys: List[Poly3D] = []
    meta: List[Dict] = []

    for e in raw_ents:
        p = build_poly3d_from_entity(e)
        if p is None:
            print(f"Skipping invalid/degenerate entity handle={getattr(e.dxf, 'handle', None)} layer={getattr(e.dxf, 'layer', None)}")
            continue
        polys.append(p)
        meta.append({
            "layer": e.dxf.layer,
            "color": getattr(e.dxf, "color", None),
            "linetype": getattr(e.dxf, "linetype", None),
            "handle": e.dxf.handle,
            "type": e.dxftype(),
        })

    if not polys:
        print("No valid LINE/LWPOLYLINE entities found.")
        return

    # Build spatial index of plan-view lines
    plan_lines = [build_shapely_lines(p) for p in polys]
    tree = STRtree(plan_lines)

    # For each polyline, collect cut intervals [d-gap, d+gap] in plan measure
    cuts_per_idx: Dict[int, List[Tuple[float, float]]] = {i: [] for i in range(len(polys))}
    processed_pairs: Set[Tuple[int, int]] = set()

    # Map geometry id -> index
    geom_to_idx = {id(geom): i for i, geom in enumerate(plan_lines)}

    # Helper: robust dedup function (checks for overlap with any existing interval)
    def too_close(existing: List[Tuple[float, float]], d_center: float, half_gap: float) -> bool:
        # Consider intervals [center-half_gap, center+half_gap] and existing intervals [a,b]
        a_new = d_center - half_gap
        b_new = d_center + half_gap
        for a, b in existing:
            if (a_new <= b + dedup_tol) and (b_new >= a - dedup_tol):
                return True
        return False

    # Iterate and detect intersections (including overlaps)
    for i, ls in enumerate(plan_lines):
        # query with buffer to limit candidates (accounts for gap + dedup_tol)
        buffer_amount = max(gap + dedup_tol, 0.0)
        candidates: Iterable[LineString] = tree.query(ls.buffer(buffer_amount))
        for cand in candidates:
            j = geom_to_idx.get(id(cand), None)
            if j is None or j <= i:
                continue
            if (i, j) in processed_pairs:
                continue

            inter = ls.intersection(cand)
            if inter.is_empty:
                processed_pairs.add((i, j))
                continue

            # Handle point intersections (single or multiple)
            if inter.geom_type == "Point":
                points = [inter]
            elif inter.geom_type == "MultiPoint":
                points = list(inter.geoms)
            elif inter.geom_type in ("LineString", "MultiLineString"):
                # Overlapping / collinear region(s) â€” generate representative point(s) or intervals
                overlap_segments = []
                if inter.geom_type == "LineString":
                    overlap_segments = [inter]
                else:
                    overlap_segments = list(inter.geoms)

                # For each overlap segment, compute its projected start/end along both lines and create cut intervals
                for seg in overlap_segments:
                    if seg.length <= PLAN_TOL:
                        continue
                    # compute multiple sample points: use segment endpoints as cut region extents
                    start_pt = Point(seg.coords[0])
                    end_pt = Point(seg.coords[-1])
                    di_start = ls.project(start_pt)
                    di_end = ls.project(end_pt)
                    dj_start = cand.project(start_pt)
                    dj_end = cand.project(end_pt)

                    # create cut intervals centered on the overlap extents (extend by gap)
                    a_i, b_i = sorted((di_start, di_end))
                    a_j, b_j = sorted((dj_start, dj_end))

                    # enforce endpoints margin: do not cut if overlap reaches endpoints within gap
                    def valid_interval(length, a, b):
                        if a < gap + PLAN_TOL or (length - b) < gap + PLAN_TOL:
                            return False
                        if b - a <= PLAN_TOL:
                            return False
                        return True

                    if valid_interval(polys[i].length2d, a_i, b_i):
                        # expand by gap at each distinct intersection boundary
                        cuts_per_idx[i].append((a_i - gap, b_i + gap))
                    else:
                        print(f"Skipping overlap cut on poly {i} due to proximity to endpoints or degenerate length.")
                    if valid_interval(polys[j].length2d, a_j, b_j):
                        cuts_per_idx[j].append((a_j - gap, b_j + gap))
                    else:
                        print(f"Skipping overlap cut on poly {j} due to proximity to endpoints or degenerate length.")

                processed_pairs.add((i, j))
                continue
            else:
                # Unsupported geometry type (e.g., GeometryCollection)
                print(f"Unsupported intersection type '{inter.geom_type}' between {i} and {j}; skipping.")
                processed_pairs.add((i, j))
                continue

            # For point intersections, create +/- gap cuts unless too close to endpoints
            for pt in points:
                if not isinstance(pt, Point):
                    continue
                di = ls.project(pt)
                dj = cand.project(pt)

                # Skip if too close to endpoints
                if di < gap + PLAN_TOL or (polys[i].length2d - di) < gap + PLAN_TOL:
                    # too close to endpoints on poly i
                    # keep scanning other points but skip adding cut on i
                    print(f"Intersection on poly {i} too close to endpoint (d={di:.6f}); skipping that cut.")
                    skip_i = True
                else:
                    skip_i = False
                if dj < gap + PLAN_TOL or (polys[j].length2d - dj) < gap + PLAN_TOL:
                    print(f"Intersection on poly {j} too close to endpoint (d={dj:.6f}); skipping that cut.")
                    skip_j = True
                else:
                    skip_j = False

                if skip_i and skip_j:
                    # both too close to endpoints: nothing to do
                    continue

                # dedup check: use interval overlap test
                if not skip_i:
                    if not too_close(cuts_per_idx[i], di, gap):
                        cuts_per_idx[i].append((di - gap, di + gap))
                if not skip_j:
                    if not too_close(cuts_per_idx[j], dj, gap):
                        cuts_per_idx[j].append((dj - gap, dj + gap))

            processed_pairs.add((i, j))

    # Optional: merge and clamp intervals per polyline (cleaning stage)
    def merge_and_clamp(intervals: List[Tuple[float, float]], length: float) -> List[Tuple[float, float]]:
        if not intervals:
            return []
        iv = [(max(0.0, a), min(length, b)) for a, b in intervals if b > 0.0 and a < length]
        iv.sort(key=lambda x: x[0])
        merged = []
        for a, b in iv:
            if not merged or a > merged[-1][1] + PLAN_TOL:
                merged.append([a, b])
            else:
                merged[-1][1] = max(merged[-1][1], b)
        return [(float(x), float(y)) for x, y in merged]

    for k in cuts_per_idx:
        cuts_per_idx[k] = merge_and_clamp(cuts_per_idx[k], polys[k].length2d)

    # Build output DXF
    out = ezdxf.new(doc.version)
    out_msp = out.modelspace()

    # Write trimmed or original segments
    for k, poly in enumerate(polys):
        cuts = cuts_per_idx[k]
        layer_name = meta[k]["layer"]
        color = meta[k]["color"]
        ltype = meta[k]["linetype"]

        if not cuts:
            coords = poly.coords3d
            if coords.shape[0] == 2:
                out_msp.add_line(tuple(coords[0]), tuple(coords[1]),
                                 dxfattribs={"layer": layer_name, "color": color, "linetype": ltype})
            else:
                out_msp.add_lwpolyline([(float(x), float(y), float(z)) for x, y, z in coords],
                                       dxfattribs={"layer": layer_name, "color": color, "linetype": ltype})
            continue

        # Apply splits and write each kept segment as LWPOLYLINE (3D)
        kept_segments = poly.splits_at_plan_distances(cuts)
        if not kept_segments:
            # Entire polyline removed by cuts
            print(f"Poly {k} on layer {layer_name} entirely removed by cuts.")
            continue
        for seg in kept_segments:
            if seg.shape[0] >= 2:
                out_msp.add_lwpolyline([(float(x), float(y), float(z)) for x, y, z in seg],
                                       dxfattribs={"layer": layer_name, "color": color, "linetype": ltype})

    out.saveas(out_dxf)
    print(f"Saved trimmed DXF: {out_dxf}")


# Example usage:
# trim_breaklines_planview("input.dxf", "trimmed_output.dxf", gap=0.2, dedup_tol=0.001)


In [17]:
# Set folder and LandXML file name.
INPUT_DIR = r'P:\2025\ARROYO DE LOS PINOS\RS\02_PRODUCTION\06_CYCLONE_3DR\Breakline'
input_dxf =  f'{INPUT_DIR}/sample.dxf'

# Replace .xml with .csv
output_dxf = os.path.splitext(input_dxf)[0] + '_cleanup.dxf'

trim_breaklines_planview(input_dxf, output_dxf, gap=GAP, dedup_tol=DEDUP_TOL)


No valid LINE/LWPOLYLINE entities found.
