From 0b11f496d46950869f1c4ce0be64254656ddc899 Mon Sep 17 00:00:00 2001 From: James Harton Date: Sat, 17 Jan 2026 19:37:09 +1300 Subject: [PATCH] feat: add `BB.Sensor.Mimic` for mechanically-linked joints Adds a sensor that derives joint state from another joint by subscribing to sensor messages and re-publishing transformed values. Useful for modelling parallel jaw grippers and other mechanically-coupled joints. Options: - `source` - name of the source joint to follow (required) - `multiplier` - scale factor for position values (default: 1.0) - `offset` - constant offset added after scaling (default: 0.0) - `message_types` - message types to forward (default: [JointState]) Implements the equivalent of URDF mimic joints: `mimic_position = source_position * multiplier + offset` --- lib/bb/sensor/mimic.ex | 124 +++++++++++++++++++++ test/bb/sensor/mimic_test.exs | 204 ++++++++++++++++++++++++++++++++++ 2 files changed, 328 insertions(+) create mode 100644 lib/bb/sensor/mimic.ex create mode 100644 test/bb/sensor/mimic_test.exs diff --git a/lib/bb/sensor/mimic.ex b/lib/bb/sensor/mimic.ex new file mode 100644 index 0000000..4f2f1ef --- /dev/null +++ b/lib/bb/sensor/mimic.ex @@ -0,0 +1,124 @@ +# SPDX-FileCopyrightText: 2026 James Harton +# +# SPDX-License-Identifier: Apache-2.0 + +defmodule BB.Sensor.Mimic do + @moduledoc """ + A sensor that derives joint state from another joint. + + Subscribes to sensor messages from a source joint and re-publishes + transformed messages for the mimic joint. This is useful for modelling + parallel jaw grippers and other mechanically-linked joint pairs. + + ## Options + + * `:source` - (required) The name of the source joint to follow + * `:multiplier` - (optional, default 1.0) Scale factor applied to position values + * `:offset` - (optional, default 0.0) Constant offset added after scaling + * `:message_types` - (optional, default [JointState]) List of message types to forward + + For JointState messages: `mimic_position = source_position * multiplier + offset` + + ## Example + + joint :right_finger do + type(:prismatic) + sensor(:mimic, {BB.Sensor.Mimic, + source: :left_finger, + multiplier: 1.0, + message_types: [JointState] + }) + end + + ## URDF Mimic Joints + + This sensor implements the equivalent of URDF mimic joints: + + + + + + Forward kinematics and visualisation automatically work since they + consume JointState messages published by this sensor. + """ + + use BB.Sensor, + options_schema: [ + source: [type: :atom, required: true, doc: "Name of the source joint to follow"], + multiplier: [type: :float, default: 1.0, doc: "Scale factor for position values"], + offset: [type: :float, default: 0.0, doc: "Constant offset added after scaling"], + message_types: [ + type: {:list, :atom}, + default: [BB.Message.Sensor.JointState], + doc: "Message types to forward" + ] + ] + + alias BB.Message + alias BB.Message.Sensor.JointState + + @impl BB.Sensor + def init(opts) do + {:ok, state} = build_state(opts) + + BB.subscribe(state.bb.robot, [:sensor | state.source_path], + message_types: state.message_types + ) + + {:ok, state} + end + + defp build_state(opts) do + opts = Map.new(opts) + [sensor_name, joint_name | rest] = Enum.reverse(opts.bb.path) + + source_path = build_source_path(rest, opts.source) + + state = %{ + bb: opts.bb, + sensor_name: sensor_name, + joint_name: joint_name, + source: opts.source, + source_path: source_path, + multiplier: Map.get(opts, :multiplier, 1.0), + offset: Map.get(opts, :offset, 0.0), + message_types: Map.get(opts, :message_types, [JointState]) + } + + {:ok, state} + end + + defp build_source_path(parent_path, source_joint) do + Enum.reverse(parent_path) ++ [source_joint] + end + + @impl BB.Sensor + def handle_info( + {:bb, _source_path, %Message{payload: %JointState{} = payload} = message}, + state + ) do + transformed_payload = transform_joint_state(payload, state) + transformed_message = %{message | payload: transformed_payload} + BB.publish(state.bb.robot, [:sensor | state.bb.path], transformed_message) + {:noreply, state} + end + + def handle_info({:bb, _source_path, %Message{} = message}, state) do + BB.publish(state.bb.robot, [:sensor | state.bb.path], message) + {:noreply, state} + end + + defp transform_joint_state(%JointState{} = payload, state) do + positions = + Enum.map(payload.positions, fn pos -> + pos * state.multiplier + state.offset + end) + + names = + Enum.map(payload.names, fn _name -> + state.joint_name + end) + + %{payload | positions: positions, names: names} + end +end diff --git a/test/bb/sensor/mimic_test.exs b/test/bb/sensor/mimic_test.exs new file mode 100644 index 0000000..bec16f7 --- /dev/null +++ b/test/bb/sensor/mimic_test.exs @@ -0,0 +1,204 @@ +# SPDX-FileCopyrightText: 2026 James Harton +# +# SPDX-License-Identifier: Apache-2.0 + +defmodule BB.Sensor.MimicTest do + use ExUnit.Case, async: true + use Mimic + + alias BB.Message + alias BB.Message.Sensor.JointState + alias BB.Sensor.Mimic + + @source_joint :left_finger + @mimic_joint :right_finger + @sensor_name :mimic + + defp joint_state_message(names, positions) do + Message.new!(JointState, :gripper, + names: names, + positions: positions + ) + end + + defp default_bb_context do + %{robot: TestRobot, path: [:gripper_link, @mimic_joint, @sensor_name]} + end + + defp init_sensor(opts \\ []) do + stub(BB.PubSub, :subscribe, fn _robot, _path, _opts -> :ok end) + + default_opts = [bb: default_bb_context(), source: @source_joint] + {:ok, state} = Mimic.init(Keyword.merge(default_opts, opts)) + + state + end + + describe "init/1" do + test "subscribes to source joint's sensor topic" do + test_pid = self() + + expect(BB.PubSub, :subscribe, fn robot, path, opts -> + send(test_pid, {:subscribed, robot, path, opts}) + :ok + end) + + opts = [bb: default_bb_context(), source: @source_joint] + {:ok, _state} = Mimic.init(opts) + + assert_receive {:subscribed, TestRobot, [:sensor, :gripper_link, @source_joint], opts} + assert opts[:message_types] == [JointState] + end + + test "defaults to multiplier of 1.0" do + state = init_sensor() + + assert state.multiplier == 1.0 + end + + test "defaults to offset of 0.0" do + state = init_sensor() + + assert state.offset == 0.0 + end + + test "accepts custom multiplier" do + state = init_sensor(multiplier: -1.0) + + assert state.multiplier == -1.0 + end + + test "accepts custom offset" do + state = init_sensor(offset: 0.01) + + assert state.offset == 0.01 + end + + test "stores source joint name" do + state = init_sensor() + + assert state.source == @source_joint + end + + test "stores mimic joint name" do + state = init_sensor() + + assert state.joint_name == @mimic_joint + end + end + + describe "handle_info/2 with JointState" do + test "transforms position with default multiplier and offset" do + state = init_sensor() + test_pid = self() + + expect(BB.PubSub, :publish, fn robot, path, message -> + send(test_pid, {:published, robot, path, message}) + :ok + end) + + source_message = joint_state_message([@source_joint], [0.025]) + + {:noreply, _state} = + Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], source_message}, state) + + assert_receive {:published, TestRobot, [:sensor, :gripper_link, @mimic_joint, @sensor_name], + message} + + assert message.payload.names == [@mimic_joint] + assert message.payload.positions == [0.025] + end + + test "applies multiplier to position" do + state = init_sensor(multiplier: -1.0) + test_pid = self() + + expect(BB.PubSub, :publish, fn _robot, _path, message -> + send(test_pid, {:position, message.payload.positions}) + :ok + end) + + source_message = joint_state_message([@source_joint], [0.025]) + + {:noreply, _state} = + Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], source_message}, state) + + assert_receive {:position, [-0.025]} + end + + test "applies offset to position" do + state = init_sensor(offset: 0.01) + test_pid = self() + + expect(BB.PubSub, :publish, fn _robot, _path, message -> + send(test_pid, {:position, message.payload.positions}) + :ok + end) + + source_message = joint_state_message([@source_joint], [0.025]) + + {:noreply, _state} = + Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], source_message}, state) + + assert_receive {:position, [0.035]} + end + + test "applies both multiplier and offset correctly" do + state = init_sensor(multiplier: 2.0, offset: 0.01) + test_pid = self() + + expect(BB.PubSub, :publish, fn _robot, _path, message -> + send(test_pid, {:position, message.payload.positions}) + :ok + end) + + source_message = joint_state_message([@source_joint], [0.025]) + + {:noreply, _state} = + Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], source_message}, state) + + assert_receive {:position, [position]} + assert_in_delta position, 0.06, 0.0001 + end + + test "replaces joint name with mimic joint name" do + state = init_sensor() + test_pid = self() + + expect(BB.PubSub, :publish, fn _robot, _path, message -> + send(test_pid, {:names, message.payload.names}) + :ok + end) + + source_message = joint_state_message([@source_joint], [0.025]) + + {:noreply, _state} = + Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], source_message}, state) + + assert_receive {:names, [@mimic_joint]} + end + end + + describe "handle_info/2 with non-JointState messages" do + test "forwards message without transformation" do + state = init_sensor(message_types: [JointState, SomeOtherType]) + test_pid = self() + + other_message = %Message{ + timestamp: System.monotonic_time(:nanosecond), + frame_id: :gripper, + payload: %{__struct__: SomeOtherType, value: 42} + } + + expect(BB.PubSub, :publish, fn _robot, _path, message -> + send(test_pid, {:forwarded, message}) + :ok + end) + + {:noreply, _state} = + Mimic.handle_info({:bb, [:sensor, :gripper_link, @source_joint], other_message}, state) + + assert_receive {:forwarded, ^other_message} + end + end +end