# AP2.1 – Preprocessing Exploration

Exploration eines CAD-Modells durch die Preprocessing-Pipeline.  
Vorher/Nachher-Vergleich für jeden Step.



In [1]:
import sys
from pathlib import Path

import numpy as np
import open3d as o3d
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# Projekt-Root zum Pfad hinzufügen
project_root = Path().resolve().parent
if str(project_root / 'src') not in sys.path:
    sys.path.insert(0, str(project_root / 'src'))

from schweiss_ki.preprocessing import (
    PreprocessingPipeline,
    StatisticalOutlierFilter,
    RadiusOutlierFilter,
    VoxelGridDownsampler,
    NormalEstimator,
)
from schweiss_ki.core.data_structures import WeldVolumeModel

print('Imports OK')

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.
Imports OK


## 1 – Konfiguration

In [2]:
STEP_FILE = Path('../data/raw/step_files/DVS-Demo.STEP')  
CONFIG    = Path('../configs/pipeline.yaml')

# Maximale Punktanzahl für Visualisierung (Plotly wird bei >100k Punkten langsam)
VIZ_MAX_POINTS = 50_000

print(f'STEP: {STEP_FILE}')
print(f'Existiert: {STEP_FILE.exists()}')

STEP: ../data/raw/step_files/DVS-Demo.STEP
Existiert: True


## 2 – CAD-Konvertierung

In [3]:
from client.core import CADConverterClient
import time

client = CADConverterClient()

tmp_ply = Path('../data/processed/notebook_exploration') / STEP_FILE.stem / 'pointcloud_raw.ply'
tmp_ply.parent.mkdir(parents=True, exist_ok=True)

print('Konvertiere STEP → PLY ...')
t = time.time()
ply_path = client.convert_to_ply(str(STEP_FILE), str(tmp_ply))
print(f'Fertig in {time.time() - t:.2f}s')

pcd_raw = o3d.io.read_point_cloud(str(ply_path))
print(f'Punkte:        {len(pcd_raw.points):,}')
print(f'Hat Normalen:  {pcd_raw.has_normals()}')

bbox = pcd_raw.get_axis_aligned_bounding_box()
ext  = bbox.get_extent()
print(f'Bounding Box:  {ext[0]:.1f} × {ext[1]:.1f} × {ext[2]:.1f} mm')

Konvertiere STEP → PLY ...
Fertig in 0.26s
Punkte:        8,192
Hat Normalen:  True
Bounding Box:  320.0 × 70.0 × 120.0 mm


## 3 – Hilfsfunktionen für Visualisierung

In [4]:
def pcd_to_arrays(pcd: o3d.geometry.PointCloud, max_points: int = VIZ_MAX_POINTS):
    """Konvertiert Open3D PointCloud zu numpy-Arrays, downsampled wenn nötig."""
    pts = np.asarray(pcd.points)
    if len(pts) > max_points:
        idx = np.random.default_rng(42).choice(len(pts), max_points, replace=False)
        pts = pts[idx]
    return pts[:, 0], pts[:, 1], pts[:, 2]


def make_scatter(pcd, name, color='steelblue', max_points=VIZ_MAX_POINTS):
    """Erstellt einen Plotly Scatter3d Trace."""
    x, y, z = pcd_to_arrays(pcd, max_points)
    return go.Scatter3d(
        x=x, y=y, z=z,
        mode='markers',
        name=f'{name} ({len(pcd.points):,} Punkte)',
        marker=dict(size=1.5, color=color, opacity=0.7),
    )


def plot_before_after(pcd_before, pcd_after, title, color_before='steelblue', color_after='tomato'):
    """Plottet Vorher/Nachher nebeneinander."""
    fig = make_subplots(
        rows=1, cols=2,
        specs=[[{'type': 'scatter3d'}, {'type': 'scatter3d'}]],
        subplot_titles=[
            f'Vorher: {len(pcd_before.points):,} Punkte',
            f'Nachher: {len(pcd_after.points):,} Punkte  '
            f'(−{len(pcd_before.points) - len(pcd_after.points):,} = '
            f'{1 - len(pcd_after.points)/len(pcd_before.points):.1%} entfernt)',
        ],
    )
    fig.add_trace(make_scatter(pcd_before, 'Vorher', color_before), row=1, col=1)
    fig.add_trace(make_scatter(pcd_after,  'Nachher', color_after),  row=1, col=2)
    fig.update_layout(
        title_text=title,
        height=550,
        margin=dict(l=0, r=0, t=60, b=0),
    )
    fig.show()


def plot_single(pcd, title, color='steelblue'):
    """Plottet eine einzelne Punktwolke."""
    fig = go.Figure(make_scatter(pcd, title, color))
    fig.update_layout(
        title_text=f'{title} – {len(pcd.points):,} Punkte',
        height=600,
        margin=dict(l=0, r=0, t=40, b=0),
        scene=dict(aspectmode='data'),
    )
    fig.show()


print('Hilfsfunktionen geladen.')

Hilfsfunktionen geladen.


## 4 – Rohe CAD-Punktwolke anschauen

In [5]:
plot_single(pcd_raw, f'Roh: {STEP_FILE.stem}', color='steelblue')

## 5 – Step-für-Step Vorher/Nachher

### 5.1 – Statistical Outlier Filter

In [6]:
import schweiss_ki.preprocessing.filtering as f_module
print(f_module.__file__)
import inspect
print(inspect.getsource(f_module))

/home/coder/workspace/schweissKI/src/schweiss_ki/preprocessing/filtering.py
"""
Filter-Steps für die Preprocessing-Pipeline.

Enthält alle Steps, die Punkte entfernen:
- StatisticalOutlierFilter: Entfernt statistisch abweichende Ausreißer
- RadiusOutlierFilter: Entfernt isolierte Punkte (Spritzer-Kandidaten)
"""
from __future__ import annotations

import open3d as o3d

from .base import PreprocessingStep


class StatisticalOutlierFilter(PreprocessingStep):
    """
    Statistischer Ausreißerfilter.

    Berechnet für jeden Punkt den mittleren Abstand zu seinen nb_neighbors
    nächsten Nachbarn. Punkte, deren mittlerer Abstand mehr als
    std_ratio Standardabweichungen vom globalen Mittel abweicht,
    werden entfernt.

    Empfohlene Startwerte:
        nb_neighbors=20, std_ratio=2.0

    Für stark verrauschte Scans std_ratio reduzieren (z.B. 1.5).
    Für CAD-Punktwolken kann dieser Step deaktiviert werden.
    """

    def __init__(
        self,
        nb_neighbors: int = 20,
   

In [7]:
# Parameter hier anpassen und Zelle nochmal ausführen
NB_NEIGHBORS = 20
STD_RATIO    = 2.0

f = StatisticalOutlierFilter(nb_neighbors=NB_NEIGHBORS, std_ratio=STD_RATIO)
pcd_stat = f.apply(pcd_raw)
stats = f.get_stats()

print(f'Vorher:   {stats["points_before"]:,} Punkte')
print(f'Nachher:  {stats["points_after"]:,} Punkte')
print(f'Entfernt: {stats["points_before"] - stats["points_after"]:,} '
      f'({1 - stats["points_after"]/stats["points_before"]:.2%})')

plot_before_after(pcd_raw, pcd_stat,
    f'Statistical Outlier Filter  (nb_neighbors={NB_NEIGHBORS}, std_ratio={STD_RATIO})')

AttributeError: 'StatisticalOutlierFilter' object has no attribute 'get_stats'

### 5.2 – Voxel Grid Downsampling

In [None]:
# Parameter anpassen
VOXEL_SIZE = 0.5  # mm

d = VoxelGridDownsampler(voxel_size=VOXEL_SIZE)
pcd_voxel = d.apply(pcd_stat)  # auf gefilterter Wolke aufbauen
stats = d.get_stats()

print(f'Vorher:   {stats["points_before"]:,} Punkte')
print(f'Nachher:  {stats["points_after"]:,} Punkte')
print(f'Entfernt: {stats["points_before"] - stats["points_after"]:,} '
      f'({1 - stats["points_after"]/stats["points_before"]:.2%})')

plot_before_after(pcd_stat, pcd_voxel,
    f'Voxel Grid Downsampling  (voxel_size={VOXEL_SIZE}mm)',
    color_before='steelblue', color_after='mediumseagreen')

### 5.3 – Normalenschätzung

In [None]:
NORMAL_RADIUS = 2.0  # mm
NORMAL_MAX_NN = 30

n = NormalEstimator(radius=NORMAL_RADIUS, max_nn=NORMAL_MAX_NN, orientation='consistent')
pcd_normals = n.apply(pcd_voxel)
stats = n.get_stats()

print(f'Punkte:        {stats["points_after"]:,}  (unverändert – keine Punkte entfernt)')
print(f'Hat Normalen:  {stats["has_normals"]}')

# Normalen als Farbkodierung visualisieren
normals = np.asarray(pcd_normals.normals)
pts     = np.asarray(pcd_normals.points)

# Normalen auf [0,1] skalieren für RGB-Farbe
colors_norm = (normals + 1.0) / 2.0
color_hex = [
    f'rgb({int(r*255)},{int(g*255)},{int(b*255)})'
    for r, g, b in colors_norm[:VIZ_MAX_POINTS]
]

# Subsample für Visualisierung
idx = np.random.default_rng(42).choice(len(pts), min(VIZ_MAX_POINTS, len(pts)), replace=False)

fig = go.Figure(go.Scatter3d(
    x=pts[idx, 0], y=pts[idx, 1], z=pts[idx, 2],
    mode='markers',
    marker=dict(size=2, color=[color_hex[i] for i in idx], opacity=0.8),
    name='Normalen (RGB = XYZ-Richtung)',
))
fig.update_layout(
    title_text=f'Normalenschätzung – {len(pcd_normals.points):,} Punkte  (Farbe = Normalenrichtung)',
    height=600,
    scene=dict(aspectmode='data'),
)
fig.show()

## 6 – Komplette Pipeline (aus pipeline.yaml)

In [None]:
pipeline = PreprocessingPipeline.from_config(CONFIG, source_type='ideal')
print(f'Pipeline: {pipeline}')

pcd_final, report = pipeline.process(pcd_raw)

print(f'\nReport:')
print(f'  Punkte rein:  {report.points_in:,}')
print(f'  Punkte raus:  {report.points_out:,}')
print(f'  Retention:    {report.total_retention_rate:.1%}')
print(f'  Gesamt-Zeit:  {report.total_duration_ms:.0f}ms')
print()
for s in report.steps:
    print(f'  {s.step_name:<35} {s.points_before:>8,} → {s.points_after:>8,}  '
          f'(−{s.points_removed:,}, {s.duration_ms:.1f}ms)')

In [None]:
plot_before_after(pcd_raw, pcd_final,
    'Komplette Pipeline: Roh vs. Preprocessed',
    color_before='steelblue', color_after='tomato')

## 7 – Report als Balkendiagramm

In [None]:
import plotly.express as px
import pandas as pd

rows = []
for s in report.steps:
    rows.append({
        'Step': s.step_name.replace('_', ' '),
        'Punkte danach': s.points_after,
        'Entfernt': s.points_removed,
        'Zeit (ms)': round(s.duration_ms, 1),
        'Retention': f'{s.retention_rate:.1%}',
    })

# Roh-Eintrag vorne
rows.insert(0, {
    'Step': 'roh (input)',
    'Punkte danach': report.points_in,
    'Entfernt': 0,
    'Zeit (ms)': 0,
    'Retention': '100%',
})

df = pd.DataFrame(rows)
display(df)

fig = px.bar(
    df, x='Step', y='Punkte danach',
    title='Punktanzahl nach jedem Preprocessing-Step',
    color='Punkte danach', color_continuous_scale='Blues',
    text='Retention',
)
fig.update_traces(textposition='outside')
fig.update_layout(height=400, showlegend=False)
fig.show()

## 8 – Als WeldVolumeModel speichern

In [None]:
model = WeldVolumeModel(
    model_id=STEP_FILE.stem,
    source_type='ideal',
    source_file=STEP_FILE,
    point_cloud=pcd_final,
    preprocessing_report=report,
)

save_path = model.save(Path('../data/processed/notebook_exploration'))
print(f'Gespeichert: {save_path}')
print(f'Modell: {model}')