Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions dimos/protocol/tf/test_tf.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

import math
import threading
import time

import pytest
Expand Down Expand Up @@ -371,6 +372,99 @@ def test_time_tolerance(self) -> None:
result = ttbuffer.get("world", "robot", time_point=base_time + 0.5, time_tolerance=0.1)
assert result is None

def test_forward_tolerance_returns_when_buffer_fills(self) -> None:
ttbuffer = MultiTBuffer()
base_time = time.time()

def publish_after_delay() -> None:
time.sleep(0.05)
ttbuffer.receive_transform(
Transform(
translation=Vector3(1.0, 0.0, 0.0),
frame_id="world",
child_frame_id="robot",
ts=base_time,
)
)

publisher = threading.Thread(target=publish_after_delay)
publisher.start()

t0 = time.monotonic()
result = ttbuffer.get(
"world", "robot", time_point=base_time, time_tolerance=0.1, forward_tolerance=1.0
)
elapsed = time.monotonic() - t0
publisher.join()

assert result is not None
assert result.translation.x == 1.0
assert elapsed < 0.5

def test_forward_tolerance_times_out(self) -> None:
ttbuffer = MultiTBuffer()
t0 = time.monotonic()
result = ttbuffer.get("world", "robot", time_point=time.time(), forward_tolerance=0.1)
elapsed = time.monotonic() - t0
assert result is None
assert 0.08 < elapsed < 1.0

def test_forward_tolerance_fast_path_when_already_available(self) -> None:
ttbuffer = MultiTBuffer()
base_time = time.time()
ttbuffer.receive_transform(
Transform(
translation=Vector3(2.0, 0.0, 0.0),
frame_id="world",
child_frame_id="robot",
ts=base_time,
)
)

t0 = time.monotonic()
result = ttbuffer.get("world", "robot", time_point=base_time, forward_tolerance=10.0)
elapsed = time.monotonic() - t0

assert result is not None
assert result.translation.x == 2.0
assert elapsed < 0.05

def test_forward_tolerance_wakes_on_chain_completion(self) -> None:
ttbuffer = MultiTBuffer()
base_time = time.time()

ttbuffer.receive_transform(
Transform(
translation=Vector3(1.0, 0.0, 0.0),
frame_id="world",
child_frame_id="robot",
ts=base_time,
)
)

def publish_after_delay() -> None:
time.sleep(0.05)
ttbuffer.receive_transform(
Transform(
translation=Vector3(0.0, 2.0, 0.0),
frame_id="robot",
child_frame_id="sensor",
ts=base_time,
)
)

publisher = threading.Thread(target=publish_after_delay)
publisher.start()

result = ttbuffer.get(
"world", "sensor", time_point=base_time, time_tolerance=0.1, forward_tolerance=1.0
)
publisher.join()

assert result is not None
assert result.translation.x == 1.0
assert result.translation.y == 2.0

def test_same_frame_returns_identity(self) -> None:
ttbuffer = MultiTBuffer()

Expand Down
Loading
Loading