Enhance workspace analyzer computational efficiency#230
Conversation
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… Corput Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…peedup Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…peedup Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…izations Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR focuses on speeding up workspace analysis at large sample counts by batching FK/IK calls and vectorizing several metrics and sampling components.
Changes:
- Updated
WorkspaceAnalyzerto use batched FK (compute_batch_fk) and batched IK (compute_batch_ik), including a helper for IK reference pose selection. - Vectorized/optimized metrics (density via KDTree, reachability voxelization via
np.unique, manipulability via batcheddet/svd) and Halton sampling. - Added a benchmark script to measure performance of individual optimizations.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| scripts/benchmark/workspace_analyzer/benchmark_workspace_analyzer.py | Adds a standalone benchmarking entrypoint for sampler/metric optimizations and placeholders for FK/IK benchmarks. |
| embodichain/lab/sim/utility/workspace_analyzer/workspace_analyzer.py | Switches FK/IK computation to batched robot APIs; adds _get_reference_pose and updates constraint checking usage. |
| embodichain/lab/sim/utility/workspace_analyzer/samplers/halton_sampler.py | Replaces loop-based Halton generation with a vectorized van der Corput implementation. |
| embodichain/lab/sim/utility/workspace_analyzer/metrics/reachability_metric.py | Replaces dict-based voxel counting with np.unique(..., return_counts=True)-based counting. |
| embodichain/lab/sim/utility/workspace_analyzer/metrics/manipulability_metric.py | Uses batched linear algebra for manipulability/condition numbers; handles empty valid-score cases. |
| embodichain/lab/sim/utility/workspace_analyzer/metrics/density_metric.py | Uses scipy.spatial.cKDTree to avoid O(N²) radius-density computation, with brute-force fallback. |
| embodichain/lab/sim/utility/workspace_analyzer/constraints/workspace_constraint.py | Adds check_constraints() convenience method combining bounds + collision checks. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -1003,208 +1019,127 @@ def compute_reachability( | |||
| """ | |||
| num_samples = len(cartesian_points) | |||
There was a problem hiding this comment.
If cartesian_points is empty, batch_size = min(batch_size, num_samples) becomes 0, which will later make range(0, num_samples, batch_size) raise (step cannot be 0). Add an early return for num_samples == 0 before computing/clamping batch_size, or clamp batch_size to at least 1.
| num_samples = len(cartesian_points) | |
| num_samples = len(cartesian_points) | |
| if num_samples == 0: | |
| return ( | |
| cartesian_points, | |
| cartesian_points[:0], | |
| cartesian_points.new_empty((0,)), | |
| torch.zeros((0,), dtype=torch.bool, device=cartesian_points.device), | |
| cartesian_points.new_empty((0, 0)), | |
| ) |
| logger.log_info( | ||
| f"Pre-filtered Cartesian points: {valid_cartesian_mask.sum()}/{num_samples} " | ||
| f"points pass workspace constraints ({(valid_cartesian_mask.sum()/num_samples*100):.1f}%)" |
There was a problem hiding this comment.
This log line divides by num_samples unconditionally: (valid_cartesian_mask.sum()/num_samples*100). If cartesian_points is empty (num_samples == 0), this will raise a ZeroDivisionError before the early-return fix above. Please guard the percentage calculation when num_samples == 0.
| logger.log_info( | |
| f"Pre-filtered Cartesian points: {valid_cartesian_mask.sum()}/{num_samples} " | |
| f"points pass workspace constraints ({(valid_cartesian_mask.sum()/num_samples*100):.1f}%)" | |
| valid_percentage = ( | |
| valid_cartesian_mask.sum() / num_samples * 100 if num_samples > 0 else 0.0 | |
| ) | |
| logger.log_info( | |
| f"Pre-filtered Cartesian points: {valid_cartesian_mask.sum()}/{num_samples} " | |
| f"points pass workspace constraints ({valid_percentage:.1f}%)" |
| # Update best results: prefer first success, then accumulate | ||
| newly_succeeded = success & ~batch_best_success.bool() | ||
| batch_best_success[newly_succeeded] = 1.0 | ||
| batch_best_qpos[newly_succeeded] = qpos[newly_succeeded] | ||
|
|
||
| # Count successes | ||
| if ret is not None and ret[0]: | ||
| success_count += 1 | ||
| # Store first successful configuration | ||
| if best_qpos is None: | ||
| best_qpos = qpos[0] # Extract from batch dimension | ||
| # Accumulate success rate | ||
| batch_best_success += success.float() |
There was a problem hiding this comment.
batch_best_success is used as a success counter, but newly-succeeded entries are set to 1.0 and then immediately incremented again via batch_best_success += success.float(), which double-counts the first success for each point (inflating success_rates_batch). Consider tracking (1) a boolean has_best for storing first successful qpos and (2) a separate success_counts tensor for counting successes, or just remove the = 1.0 assignment and only accumulate via += success.float().
| # Count neighbors within radius for all points at once | ||
| counts = tree.query_ball_point(points, r=radius, return_length=True) |
There was a problem hiding this comment.
cKDTree.query_ball_point(..., return_length=True) is not supported in some SciPy versions (it raises TypeError: unexpected keyword argument 'return_length'). Since SciPy isn’t version-pinned in this repo, consider catching TypeError here (in addition to ImportError) and falling back to the brute-force path or to np.fromiter((len(x) for x in tree.query_ball_point(...)), ...) for broader compatibility.
| # Count neighbors within radius for all points at once | |
| counts = tree.query_ball_point(points, r=radius, return_length=True) | |
| # Count neighbors within radius for all points at once. | |
| # Older SciPy versions do not support `return_length=True`, | |
| # so fall back to counting the returned neighbor lists. | |
| try: | |
| counts = tree.query_ball_point(points, r=radius, return_length=True) | |
| except TypeError: | |
| neighbors = tree.query_ball_point(points, r=radius) | |
| counts = np.fromiter( | |
| (len(neighbor_list) for neighbor_list in neighbors), | |
| dtype=np.int64, | |
| count=len(points), | |
| ) |
| num_samples = len(joint_configs) | ||
| batch_size = batch_size or self.config.sampling.batch_size | ||
| # Cap batch size to total samples | ||
| batch_size = min(batch_size, num_samples) | ||
|
|
||
| logger.log_info( | ||
| f"Computing FK for {num_samples} samples (batch_size={batch_size})..." | ||
| ) | ||
|
|
||
| # Pre-allocate lists for results | ||
| workspace_points_list = [] | ||
| valid_configs_list = [] | ||
|
|
||
| logger.log_info(f"Computing FK for {num_samples} samples...") | ||
|
|
||
| # Track valid points for progress bar | ||
| total_valid = 0 | ||
|
|
||
| # Robot expects one configuration at a time (batch_size from robot environments, not samples) | ||
| # Process each configuration individually | ||
| pbar = self._create_optimized_tqdm( | ||
| range(num_samples), | ||
| desc="Forward Kinematics", | ||
| unit="cfg", | ||
| range(0, num_samples, batch_size), | ||
| desc="Forward Kinematics (batched)", |
There was a problem hiding this comment.
When joint_configs is empty, batch_size = min(batch_size, num_samples) becomes 0 and then range(0, num_samples, batch_size) raises (step cannot be 0). Consider an early return for num_samples == 0 (empty workspace_points / valid_configs) before computing batch_size/creating the tqdm range, or ensure batch_size >= 1.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 7 comments.
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| @@ -99,7 +99,8 @@ def _sample_from_bounds( | |||
| # Calculate samples per dimension if not provided | |||
| if self.samples_per_dim is None: | |||
| # Compute samples_per_dim to approximate the desired num_samples | |||
| samples_per_dim = max(2, int(np.ceil(num_samples ** (1.0 / n_dims)))) | |||
| # Use floor to ensure actual grid size never exceeds num_samples | |||
| samples_per_dim = max(2, int(num_samples ** (1.0 / n_dims))) | |||
| else: | |||
There was a problem hiding this comment.
The docstring now states the grid size "will not exceed" num_samples, but with samples_per_dim = max(2, int(num_samples ** (1.0 / n_dims))) the actual grid can still exceed num_samples for small requests (e.g., num_samples=3, n_dims=2 -> samples_per_dim=2 -> 4 samples). Either adjust the wording to match the behavior, or change the logic so samples_per_dim**n_dims <= num_samples while still handling small num_samples cleanly.
| valid_mask = manipulability_scores >= self.config.jacobian_threshold | ||
| valid_scores = manipulability_scores[valid_mask] | ||
|
|
||
| if len(valid_scores) == 0: | ||
| valid_scores = np.array([0.0]) | ||
|
|
||
| self.results = { |
There was a problem hiding this comment.
When no Jacobians pass the threshold, setting valid_scores = np.array([0.0]) makes num_valid_points report 1 and forces min/max to 0, which is misleading (it implies one valid sample). Prefer keeping num_valid_points = 0 and explicitly setting mean/std/min/max to 0.0 (or NaN) for the empty-valid set.
| config = SimulationManagerCfg( | ||
| headless=False, sim_device="cpu", width=1080, height=1080 | ||
| headless=False, sim_device="cuda", width=1080, height=1080 | ||
| ) |
There was a problem hiding this comment.
This example now hard-codes sim_device="cuda", which will fail on CPU-only machines and in CI environments without a GPU. Consider keeping the default as cpu (or adding a simple runtime check / CLI flag) so the example remains runnable everywhere.
|
|
||
| wa_joint = WorkspaceAnalyzer(robot=robot, sim_manager=sim_manager) | ||
| results_joint = wa_joint.analyze(num_samples=3000, visualize=True) | ||
| results_joint = wa_joint.analyze(num_samples=30000, visualize=True) |
There was a problem hiding this comment.
Raising the default num_samples from 3,000 to 30,000 can make this example take significantly longer (and may be too slow for quick-start usage). Consider keeping a smaller default and documenting how to increase it for performance testing, or making it configurable via an argument/env var.
| num_samples = len(joint_configs) | ||
| batch_size = batch_size or self.config.sampling.batch_size | ||
| # Cap batch size to total samples | ||
| batch_size = min(batch_size, num_samples) | ||
|
|
||
| logger.log_info( | ||
| f"Computing FK for {num_samples} samples (batch_size={batch_size})..." | ||
| ) | ||
|
|
||
| # Pre-allocate lists for results | ||
| workspace_points_list = [] | ||
| valid_configs_list = [] | ||
|
|
||
| logger.log_info(f"Computing FK for {num_samples} samples...") | ||
|
|
||
| # Track valid points for progress bar | ||
| total_valid = 0 | ||
|
|
||
| # Robot expects one configuration at a time (batch_size from robot environments, not samples) | ||
| # Process each configuration individually | ||
| pbar = self._create_optimized_tqdm( | ||
| range(num_samples), | ||
| desc="Forward Kinematics", | ||
| unit="cfg", | ||
| range(0, num_samples, batch_size), | ||
| desc="Forward Kinematics (batched)", | ||
| unit="batch", |
There was a problem hiding this comment.
batch_size = min(batch_size, num_samples) can produce batch_size == 0 when num_samples is 0 (or config batch_size is 0), which makes range(0, num_samples, batch_size) raise ValueError (step must be non-zero). Add an early return for num_samples == 0 (and/or validate batch_size >= 1) before constructing the progress range.
|
|
||
| logger.log_info( | ||
| f"Pre-filtered Cartesian points: {valid_cartesian_mask.sum()}/{num_samples} " | ||
| f"points pass workspace constraints ({(valid_cartesian_mask.sum()/num_samples*100):.1f}%)" |
There was a problem hiding this comment.
This log line computes a percentage as a torch scalar and formats it with :.1f, which raises TypeError: unsupported format string passed to Tensor.__format__. Also, when num_samples == 0 it will divide by zero. Convert the mask sum to a Python number via .item() and guard the percentage computation for the empty-input case.
| logger.log_info( | |
| f"Pre-filtered Cartesian points: {valid_cartesian_mask.sum()}/{num_samples} " | |
| f"points pass workspace constraints ({(valid_cartesian_mask.sum()/num_samples*100):.1f}%)" | |
| num_valid_cartesian_points = valid_cartesian_mask.sum().item() | |
| valid_cartesian_percentage = ( | |
| (num_valid_cartesian_points / num_samples * 100) if num_samples else 0.0 | |
| ) | |
| logger.log_info( | |
| f"Pre-filtered Cartesian points: {num_valid_cartesian_points}/{num_samples} " | |
| f"points pass workspace constraints ({valid_cartesian_percentage:.1f}%)" |
| num_samples = len(cartesian_points) | ||
| ik_samples_per_point = self.config.ik_samples_per_point | ||
| batch_size = batch_size or self.config.sampling.batch_size | ||
| batch_size = min(batch_size, num_samples) | ||
|
|
||
| # Pre-filter Cartesian points by workspace constraints | ||
| # This eliminates points that are outside bounds or in collision zones | ||
| valid_cartesian_mask = self.constraint_checker.check_bounds( | ||
| # Pre-filter by workspace constraints (vectorized) | ||
| valid_cartesian_mask = self.constraint_checker.check_constraints( | ||
| cartesian_points | ||
| ) & self.constraint_checker.check_collision(cartesian_points) | ||
| ) | ||
|
|
||
| logger.log_info( | ||
| f"Pre-filtered Cartesian points: {valid_cartesian_mask.sum()}/{num_samples} " | ||
| f"points pass workspace constraints ({(valid_cartesian_mask.sum()/num_samples*100):.1f}%)" | ||
| ) | ||
|
|
||
| # Store results for all points (including invalid ones for consistent indexing) | ||
| # Get reference end-effector pose for IK target orientation | ||
| current_ee_pose = self._get_reference_pose() | ||
|
|
||
| # Initialize result arrays | ||
| all_success_rates = torch.zeros(num_samples, device=self.device) | ||
| reachable_points_list = [] | ||
| best_configs_list = [] | ||
| total_reachable = 0 | ||
|
|
||
| logger.log_info( | ||
| f"Computing IK for {num_samples} Cartesian samples " | ||
| f"({ik_samples_per_point} seeds per point)..." | ||
| ) | ||
|
|
||
| # Create a random sampler for generating IK seeds (avoid UniformSampler issues) | ||
| # Prepare random seeds for all attempts | ||
| from embodichain.lab.sim.utility.workspace_analyzer.samplers import ( | ||
| RandomSampler, | ||
| ) | ||
|
|
||
| random_sampler = RandomSampler(seed=self.config.sampling.seed) | ||
|
|
||
| # Get reference end-effector pose for IK target orientation | ||
| # Priority: use reference_pose if provided, otherwise compute from current joint configuration | ||
| if ( | ||
| hasattr(self.config, "reference_pose") | ||
| and self.config.reference_pose is not None | ||
| ): | ||
| # Use provided reference pose (should be 4x4 transformation matrix) | ||
| reference_pose = self.config.reference_pose | ||
| if isinstance(reference_pose, np.ndarray): | ||
| reference_pose = torch.from_numpy(reference_pose).to(self.device) | ||
| if reference_pose.dim() == 2: # Shape: (4, 4) -> (1, 4, 4) | ||
| reference_pose = reference_pose.unsqueeze(0) | ||
| current_ee_pose = reference_pose # Shape: (1, 4, 4) | ||
| logger.log_info("Using provided reference pose for IK target orientation") | ||
| else: | ||
| # Fallback: compute current end-effector pose from joint configuration | ||
| try: | ||
| # Using first environment (index 0) for qpos retrieval | ||
| current_qpos = self.robot.get_qpos()[0][ | ||
| self.robot.get_joint_ids(self.control_part_name) | ||
| ] | ||
| current_ee_pose = self.robot.compute_fk( | ||
| name=self.control_part_name, | ||
| qpos=current_qpos.unsqueeze(0), | ||
| to_matrix=True, | ||
| ) # Shape: (1, 4, 4) | ||
| logger.log_info( | ||
| "Computing reference pose from current robot configuration" | ||
| ) | ||
| except Exception as e: | ||
| logger.log_warning(f"Failed to compute current robot pose: {e}") | ||
| # Create identity pose as fallback | ||
| current_ee_pose = torch.eye(4, device=self.device).unsqueeze(0) | ||
| current_ee_pose[0, :3, 3] = torch.tensor( | ||
| [0.5, 0.0, 1.0], device=self.device | ||
| ) # Default position | ||
| logger.log_info("Using default identity pose as fallback") | ||
|
|
||
| # Print current joint configuration and computed pose | ||
| pose_np = current_ee_pose[0].cpu().numpy() | ||
| position = pose_np[:3, 3] | ||
| rotation_matrix = pose_np[:3, :3] | ||
|
|
||
| # Convert rotation matrix to Euler angles | ||
| import scipy.spatial.transform as spt | ||
|
|
||
| euler_angles = spt.Rotation.from_matrix(rotation_matrix).as_euler( | ||
| "xyz", degrees=True | ||
| ) | ||
|
|
||
| # Print detailed reference pose information | ||
| pose_np = current_ee_pose[0].cpu().numpy() | ||
| position = pose_np[:3, 3] | ||
| rotation_matrix = pose_np[:3, :3] | ||
|
|
||
| # Convert rotation matrix to Euler angles (ZYX convention) | ||
| import scipy.spatial.transform as spt | ||
|
|
||
| euler_angles = spt.Rotation.from_matrix(rotation_matrix).as_euler( | ||
| "xyz", degrees=True | ||
| random_sampler = RandomSampler( | ||
| seed=self.config.sampling.seed, device=self.device | ||
| ) | ||
|
|
||
| # Format matrix with proper indentation | ||
| matrix_lines = np.array2string(pose_np, precision=4, suppress_small=True).split( | ||
| "\n" | ||
| ) | ||
| matrix_str = "\n".join(f"\t {line}" for line in matrix_lines) | ||
| logger.log_info( | ||
| f"🎯 Using provided reference pose for IK target orientation:\n" | ||
| f"\t Position: [{position[0]:.4f}, {position[1]:.4f}, {position[2]:.4f}] m\n" | ||
| f"\t Rotation (XYZ Euler): [{euler_angles[0]:.2f}°, {euler_angles[1]:.2f}°, {euler_angles[2]:.2f}°]\n" | ||
| f"\t Matrix:\n{matrix_str}" | ||
| f"Computing IK for {num_samples} Cartesian samples " | ||
| f"(batch_size={batch_size}, {ik_samples_per_point} seeds per point)..." | ||
| ) | ||
|
|
||
| # Track statistics for progress bar | ||
| total_reachable = 0 | ||
|
|
||
| # Process each point individually (robot expects batch_size from environments, not samples) | ||
| pbar = self._create_optimized_tqdm( | ||
| range(num_samples), | ||
| desc="Inverse Kinematics", | ||
| unit="pt", | ||
| range(0, num_samples, batch_size), | ||
| desc="Inverse Kinematics (batched)", | ||
| unit="batch", | ||
| color="magenta", |
There was a problem hiding this comment.
batch_size = min(batch_size, num_samples) can yield batch_size == 0 when num_samples is 0 (or batch_size is misconfigured as 0). That will make range(0, num_samples, batch_size) invalid (step=0). Consider early-returning a fully-empty result tuple for num_samples == 0 and validating batch_size >= 1 before constructing the tqdm range.
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.
Comments suppressed due to low confidence (1)
embodichain/lab/sim/utility/workspace_analyzer/samplers/iniform_sampler.py:108
- The updated docstring/logic claims the generated grid size
(samples_per_dim**n_dims)will not exceednum_samples, butsamples_per_dim = max(2, int(num_samples ** (1/n_dims)))can still overshoot for smallnum_samples/ highn_dims(e.g.,num_samples=2, n_dims=6yieldssamples_per_dim=2=> 64 samples). Either relax the docstring, or adjust the computation to ensuresamples_per_dim**n_dims <= num_samples(e.g., allowsamples_per_dimto be 1, or decrement until the constraint holds).
Note: The actual number of samples (samples_per_dim^n_dims) will not
exceed this value, but may be less to maintain a uniform grid.
Returns:
Tensor of shape (actual_num_samples, n_dims) containing the sampled points.
The actual number of samples will be samples_per_dim^n_dims.
Raises:
ValueError: If bounds are invalid.
Examples:
>>> sampler = UniformSampler(samples_per_dim=3)
>>> bounds = torch.tensor([[-1, 1], [-1, 1]], dtype=torch.float32)
>>> samples = sampler.sample(bounds, num_samples=10)
>>> samples.shape
torch.Size([9, 2]) # 3^2 = 9 samples
"""
bounds = self._validate_bounds(bounds)
n_dims = bounds.shape[0]
# Calculate samples per dimension if not provided
if self.samples_per_dim is None:
# Compute samples_per_dim to approximate the desired num_samples
# Use floor to ensure actual grid size never exceeds num_samples
samples_per_dim = max(2, int(num_samples ** (1.0 / n_dims)))
else:
samples_per_dim = self.samples_per_dim
actual_num_samples = samples_per_dim**n_dims
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| num_samples = len(cartesian_points) | ||
| ik_samples_per_point = self.config.ik_samples_per_point | ||
| batch_size = batch_size or self.config.sampling.batch_size | ||
| batch_size = min(batch_size, num_samples) | ||
|
|
||
| # Pre-filter Cartesian points by workspace constraints | ||
| # This eliminates points that are outside bounds or in collision zones | ||
| valid_cartesian_mask = self.constraint_checker.check_bounds( | ||
| # Pre-filter by workspace constraints (vectorized) | ||
| valid_cartesian_mask = self.constraint_checker.check_constraints( | ||
| cartesian_points | ||
| ) & self.constraint_checker.check_collision(cartesian_points) | ||
| ) | ||
|
|
||
| logger.log_info( | ||
| f"Pre-filtered Cartesian points: {valid_cartesian_mask.sum()}/{num_samples} " | ||
| f"points pass workspace constraints ({(valid_cartesian_mask.sum()/num_samples*100):.1f}%)" | ||
| ) | ||
|
|
||
| # Store results for all points (including invalid ones for consistent indexing) | ||
| # Get reference end-effector pose for IK target orientation | ||
| current_ee_pose = self._get_reference_pose() | ||
|
|
||
| # Initialize result arrays | ||
| all_success_rates = torch.zeros(num_samples, device=self.device) | ||
| reachable_points_list = [] | ||
| best_configs_list = [] | ||
| total_reachable = 0 | ||
|
|
||
| logger.log_info( | ||
| f"Computing IK for {num_samples} Cartesian samples " | ||
| f"({ik_samples_per_point} seeds per point)..." | ||
| ) | ||
|
|
||
| # Create a random sampler for generating IK seeds (avoid UniformSampler issues) | ||
| # Prepare random seeds for all attempts | ||
| from embodichain.lab.sim.utility.workspace_analyzer.samplers import ( | ||
| RandomSampler, | ||
| ) | ||
|
|
||
| random_sampler = RandomSampler(seed=self.config.sampling.seed) | ||
|
|
||
| # Get reference end-effector pose for IK target orientation | ||
| # Priority: use reference_pose if provided, otherwise compute from current joint configuration | ||
| if ( | ||
| hasattr(self.config, "reference_pose") | ||
| and self.config.reference_pose is not None | ||
| ): | ||
| # Use provided reference pose (should be 4x4 transformation matrix) | ||
| reference_pose = self.config.reference_pose | ||
| if isinstance(reference_pose, np.ndarray): | ||
| reference_pose = torch.from_numpy(reference_pose).to(self.device) | ||
| if reference_pose.dim() == 2: # Shape: (4, 4) -> (1, 4, 4) | ||
| reference_pose = reference_pose.unsqueeze(0) | ||
| current_ee_pose = reference_pose # Shape: (1, 4, 4) | ||
| logger.log_info("Using provided reference pose for IK target orientation") | ||
| else: | ||
| # Fallback: compute current end-effector pose from joint configuration | ||
| try: | ||
| # Using first environment (index 0) for qpos retrieval | ||
| current_qpos = self.robot.get_qpos()[0][ | ||
| self.robot.get_joint_ids(self.control_part_name) | ||
| ] | ||
| current_ee_pose = self.robot.compute_fk( | ||
| name=self.control_part_name, | ||
| qpos=current_qpos.unsqueeze(0), | ||
| to_matrix=True, | ||
| ) # Shape: (1, 4, 4) | ||
| logger.log_info( | ||
| "Computing reference pose from current robot configuration" | ||
| ) | ||
| except Exception as e: | ||
| logger.log_warning(f"Failed to compute current robot pose: {e}") | ||
| # Create identity pose as fallback | ||
| current_ee_pose = torch.eye(4, device=self.device).unsqueeze(0) | ||
| current_ee_pose[0, :3, 3] = torch.tensor( | ||
| [0.5, 0.0, 1.0], device=self.device | ||
| ) # Default position | ||
| logger.log_info("Using default identity pose as fallback") | ||
|
|
||
| # Print current joint configuration and computed pose | ||
| pose_np = current_ee_pose[0].cpu().numpy() | ||
| position = pose_np[:3, 3] | ||
| rotation_matrix = pose_np[:3, :3] | ||
|
|
||
| # Convert rotation matrix to Euler angles | ||
| import scipy.spatial.transform as spt | ||
|
|
||
| euler_angles = spt.Rotation.from_matrix(rotation_matrix).as_euler( | ||
| "xyz", degrees=True | ||
| ) | ||
|
|
||
| # Print detailed reference pose information | ||
| pose_np = current_ee_pose[0].cpu().numpy() | ||
| position = pose_np[:3, 3] | ||
| rotation_matrix = pose_np[:3, :3] | ||
|
|
||
| # Convert rotation matrix to Euler angles (ZYX convention) | ||
| import scipy.spatial.transform as spt | ||
|
|
||
| euler_angles = spt.Rotation.from_matrix(rotation_matrix).as_euler( | ||
| "xyz", degrees=True | ||
| random_sampler = RandomSampler( | ||
| seed=self.config.sampling.seed, device=self.device | ||
| ) | ||
|
|
||
| # Format matrix with proper indentation | ||
| matrix_lines = np.array2string(pose_np, precision=4, suppress_small=True).split( | ||
| "\n" | ||
| ) | ||
| matrix_str = "\n".join(f"\t {line}" for line in matrix_lines) | ||
| logger.log_info( | ||
| f"🎯 Using provided reference pose for IK target orientation:\n" | ||
| f"\t Position: [{position[0]:.4f}, {position[1]:.4f}, {position[2]:.4f}] m\n" | ||
| f"\t Rotation (XYZ Euler): [{euler_angles[0]:.2f}°, {euler_angles[1]:.2f}°, {euler_angles[2]:.2f}°]\n" | ||
| f"\t Matrix:\n{matrix_str}" | ||
| f"Computing IK for {num_samples} Cartesian samples " | ||
| f"(batch_size={batch_size}, {ik_samples_per_point} seeds per point)..." | ||
| ) | ||
|
|
||
| # Track statistics for progress bar | ||
| total_reachable = 0 | ||
|
|
||
| # Process each point individually (robot expects batch_size from environments, not samples) | ||
| pbar = self._create_optimized_tqdm( | ||
| range(num_samples), | ||
| desc="Inverse Kinematics", | ||
| unit="pt", | ||
| range(0, num_samples, batch_size), | ||
| desc="Inverse Kinematics (batched)", | ||
| unit="batch", | ||
| color="magenta", | ||
| emoji="🎯", | ||
| ) |
There was a problem hiding this comment.
compute_reachability() has the same empty-input issue as FK: if cartesian_points is empty then batch_size becomes 0 and range(0, num_samples, batch_size) will throw. Also the pre-filter log computes (valid_cartesian_mask.sum()/num_samples*100) which will divide by zero. Add a num_samples == 0 guard before logging / creating the progress bar and return empty outputs.
| if ( | ||
| hasattr(self.config, "reference_pose") | ||
| and self.config.reference_pose is not None | ||
| ): | ||
| reference_pose = self.config.reference_pose | ||
| if isinstance(reference_pose, np.ndarray): | ||
| reference_pose = torch.from_numpy(reference_pose).to(self.device) | ||
| if reference_pose.dim() == 2: | ||
| reference_pose = reference_pose.unsqueeze(0) | ||
| logger.log_info("Using provided reference pose for IK target orientation") | ||
| return reference_pose |
There was a problem hiding this comment.
_get_reference_pose() only moves reference_pose to self.device when it is a NumPy array. If callers provide a torch.Tensor on CPU (or wrong dtype), the returned pose can be on the wrong device and later compute_batch_ik will error due to device mismatch. Normalize reference_pose for both numpy and torch inputs (e.g., reference_pose = torch.as_tensor(..., device=self.device, dtype=torch.float32)), and keep the (1,4,4) shape handling.
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
Description
This PR optimizes the workspace analyzer for large-scale sampling (10k-100k+ samples) by replacing per-sample Python loops with batched GPU operations and vectorized algorithms.
Changes
Batch FK/IK (P0 - highest impact):
workspace_analyzer.py: Replaced per-samplecompute_fk()loop with chunkedcompute_batch_fk()callsworkspace_analyzer.py: Replaced per-samplecompute_ik()loop with chunkedcompute_batch_ik()calls_get_reference_pose()helper for cleaner IK reference pose handlingcheck_constraints()toWorkspaceConstraintCheckercombining bounds + collision in one callVectorized metrics (P1):
density_metric.py: Replaced O(N²) brute-force density withscipy.spatial.cKDTree(O(N log N)), with brute-force fallbackreachability_metric.py: Replaced dict-based voxel counting withnp.unique(return_counts=True)manipulability_metric.py: Replaced per-sample Jacobian loops with batchnp.linalg.det()andnp.linalg.svd()Vectorized sampler (P1):
halton_sampler.py: Replaced nested Python loops with vectorized van der Corput computation using numpy broadcastingBenchmark script:
scripts/benchmark/workspace_analyzer/benchmark_workspace_analyzer.py: Measures each optimization independently across sample sizes [100, 1000, 10000, 100000]Benchmark Results (CPU-only metrics)
Batch FK/IK benchmarks require live robot/simulation and are expected to show 10-100x speedup.
Type of change
Screenshots
N/A
Checklist
black .command to format the code base.🤖 Generated with Claude Code