Skip to content

Commit

Permalink
EventClientSubscriptionManager sub-service subscription support (#225)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hackerman342 committed Apr 25, 2024
1 parent 9798528 commit 4766425
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 45 deletions.
12 changes: 8 additions & 4 deletions .github/workflows/ci_python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [ubuntu-latest, macos-latest]
python-version: ['3.8', '3.9', '3.10']
# Using macos-12 bc:
# 1. they removed support for python 3.8 & 3.9 on macos-14 (now macos-latest)
# - See: https://github.com/actions/setup-python/issues/696#issuecomment-2071769156
# 2. `test_pose` fails on macos-13 and macos-14 (but not macos-12)
os: [ubuntu-latest, macos-12]
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12']
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
with:
submodules: "recursive"
- uses: actions/setup-python@v4
- uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
- name: Install dependencies
Expand Down
26 changes: 14 additions & 12 deletions py/farm_ng/core/event_client_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
EventServiceConfigList,
SubscribeRequest,
)
from farm_ng.core.uri import get_service_name
from farm_ng.core.uri_pb2 import Uri

__all__ = ["EventClientSubscriptionManager"]
Expand Down Expand Up @@ -86,37 +87,38 @@ def _initialize_clients(
clients.update({config.name: EventClient(config)})
return clients

def _try_register_topic(self, service_name: str, uri_path: str) -> bool:
def _try_register_topic(self, client_name: str, uri: Uri) -> bool:
"""Attempts to register a topic for a service.
Args:
service_name: The name of the service.
uri_path: The path of the topic.
client_name: The name of the client to register the topic for.
uri: The uri to register.
Returns:
True if the topic was registered successfully, False otherwise.
"""
if service_name not in self._clients:
self.logger.warning("Service %s not found", service_name)
if client_name not in self._clients:
self.logger.warning("Service %s not found", client_name)
return False

# make the key for the subscription map
service_path: str = f"{service_name}{uri_path}"
service_name = get_service_name(uri)
topic_name: str = f"{service_name}{uri.path}"

if service_path in self._subscriptions:
# self.logger.warning("Topic %s already registered", service_path)
if topic_name in self._subscriptions:
# self.logger.warning("Topic %s already registered", topic_name)
return False

subscribe_request = SubscribeRequest(
uri=Uri(
path=uri_path,
path=uri.path,
query=f"service_name={service_name}",
),
every_n=1,
)

self.logger.info("Registering topic %s", service_path)
self._subscriptions[service_path] = subscribe_request
self.logger.info("Registering topic %s for client %s", topic_name, client_name)
self._subscriptions[topic_name] = subscribe_request

return True

Expand All @@ -133,7 +135,7 @@ async def update_subscriptions_for_client(self, client: EventClient) -> None:
# register the topics
uri: Uri
for uri in uris:
self._try_register_topic(client.config.name, uri.path)
self._try_register_topic(client.config.name, uri)

await asyncio.sleep(1.0)

Expand Down
5 changes: 2 additions & 3 deletions py/tests/_asyncio/test_event_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
)
from google.protobuf.message import Message

pytestmark = pytest.mark.anyio


class TestEventClient:
def test_smoke(self, event_client: EventClient) -> None:
Expand All @@ -24,7 +26,6 @@ def test_smoke(self, event_client: EventClient) -> None:
assert event_client.logger.name == "test_service/client"
assert event_client.server_address == "localhost:5001"

@pytest.mark.anyio()
async def test_publish_subscribe(
self,
event_service: EventServiceGrpc,
Expand Down Expand Up @@ -71,7 +72,6 @@ async def subscribe_callback(client: EventClient, queue: asyncio.Queue):
pass
assert task.done()

@pytest.mark.anyio()
async def test_request_reply(
self,
event_service: EventServiceGrpc,
Expand Down Expand Up @@ -109,7 +109,6 @@ async def request_reply_handler(
assert "StringValue" in res.event.uri.query
assert res.payload == b"\n\x03foo"

@pytest.mark.anyio()
async def test_request_reply_callback(
self,
event_service: EventServiceGrpc,
Expand Down
9 changes: 3 additions & 6 deletions py/tests/_asyncio/test_event_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
if TYPE_CHECKING:
from farm_ng.core.event_service import EventServiceGrpc

pytestmark = pytest.mark.anyio


class TestEventServiceGrpc:
@pytest.mark.anyio()
async def test_smoke(self, event_service: EventServiceGrpc) -> None:
# reset the counts
event_service.reset()
Expand All @@ -27,7 +28,6 @@ async def test_smoke(self, event_service: EventServiceGrpc) -> None:
assert not event_service.uris
assert event_service.request_reply_handler is None

@pytest.mark.anyio()
async def test_publish(self, event_service: EventServiceGrpc) -> None:
# reset the counts
event_service.reset()
Expand Down Expand Up @@ -57,7 +57,6 @@ async def test_publish(self, event_service: EventServiceGrpc) -> None:
assert event_service.metrics.data["/bar/send_count"] == 1
assert event_service.uris["/bar"].query == message_uri.query

@pytest.mark.anyio()
async def test_publish_error(self, event_service: EventServiceGrpc) -> None:
# reset the counts
event_service.reset()
Expand All @@ -72,7 +71,6 @@ async def test_publish_error(self, event_service: EventServiceGrpc) -> None:
):
await event_service.publish(path="/foo", message=Int32Value(value=0))

@pytest.mark.anyio()
async def test_multiple_publishers(self, event_service: EventServiceGrpc) -> None:
async def _publish_message(
event_service: EventServiceGrpc,
Expand All @@ -99,7 +97,6 @@ async def _publish_message(
assert event_service.metrics.data["/foo/send_count"] == 2
assert event_service.metrics.data["/bar/send_count"] == 3

@pytest.mark.anyio()
async def test_latch(self, event_service: EventServiceGrpc) -> None:
# reset the counts
event_service.reset()
Expand Down Expand Up @@ -136,6 +133,6 @@ async def test_latch(self, event_service: EventServiceGrpc) -> None:
assert message.value == 42
break

@pytest.mark.skip(reason="TODO: implement me")
# @pytest.mark.skip(reason="TODO: implement me")
def test_list_uris(self) -> None:
pass
5 changes: 2 additions & 3 deletions py/tests/_asyncio/test_event_service_recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
if TYPE_CHECKING:
from farm_ng.core.uri_pb2 import Uri

pytestmark = pytest.mark.anyio


async def request_reply_handler(
request: RequestReplyRequest,
Expand All @@ -37,7 +39,6 @@ def test_smoke(self, recorder_service: EventServiceRecorder) -> None:
assert recorder_service.logger.name == "record_default"
assert recorder_service.record_queue.qsize() == 0

@pytest.mark.anyio()
async def test_event_service_recorder(
self,
tmp_path: Path,
Expand Down Expand Up @@ -85,7 +86,6 @@ async def test_event_service_recorder(
event_message = event_log.read_message()
assert event_message == message

@pytest.mark.anyio()
async def test_file_headers(
self,
tmp_path: Path,
Expand Down Expand Up @@ -147,7 +147,6 @@ async def test_file_headers(


class TestRecorderService:
@pytest.mark.anyio()
async def test_recorder_service(
self,
tmp_path: Path,
Expand Down
3 changes: 2 additions & 1 deletion py/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

from .event_common import event_service_config_list as _event_service_config_list

pytestmark = pytest.mark.anyio


@pytest.fixture(scope="session")
def anyio_backend():
Expand All @@ -31,7 +33,6 @@ def event_service_config(
return event_service_config_list.configs[0]


@pytest.mark.anyio()
@pytest.fixture(scope="module")
async def event_service(
event_service_config: EventServiceConfig,
Expand Down
41 changes: 29 additions & 12 deletions py/tests/test_event_client_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest
from farm_ng.core import uri_pb2
from farm_ng.core.event_client_manager import EventClientSubscriptionManager
from farm_ng.core.event_service_pb2 import (
EventServiceConfigList,
Expand Down Expand Up @@ -38,7 +39,13 @@ def test_register_topic(
event_manager: EventClientSubscriptionManager,
) -> None:
# register a topic
assert event_manager._try_register_topic("test_service", "/foo")
uri_foo: uri_pb2.Uri = uri_pb2.Uri(
scheme="protobuf",
authority="localhost",
path="/foo",
query="type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto&service_name=test_service",
)
assert event_manager._try_register_topic("test_service", uri=uri_foo)
assert "test_service/foo" in event_manager._subscriptions
assert event_manager._subscriptions["test_service/foo"].uri.path == "/foo"
assert (
Expand All @@ -47,24 +54,34 @@ def test_register_topic(
)
assert event_manager._subscriptions["test_service/foo"].every_n == 1

# register a second topic
assert event_manager._try_register_topic("test_service", "/bar")
assert "test_service/bar" in event_manager._subscriptions
assert event_manager._subscriptions["test_service/bar"].uri.path == "/bar"
# register a second topic, on a sub-service
uri_bar: uri_pb2.Uri = uri_pb2.Uri(
path="/bar",
query="type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto&service_name=sub_service",
)
assert event_manager._try_register_topic("test_service", uri_bar)
assert "sub_service/bar" in event_manager._subscriptions
assert event_manager._subscriptions["sub_service/bar"].uri.path == "/bar"
assert (
event_manager._subscriptions["test_service/bar"].uri.query
== "service_name=test_service"
event_manager._subscriptions["sub_service/bar"].uri.query
== "service_name=sub_service"
)
assert event_manager._subscriptions["test_service/bar"].every_n == 1
assert event_manager._subscriptions["sub_service/bar"].every_n == 1

# register a topic for a service that doesn't exist
assert not event_manager._try_register_topic("unexisting_service", "/bar")
assert not event_manager._try_register_topic("unexisting_service", uri_bar)

# register a topic that already exists
assert not event_manager._try_register_topic("test_service", "/foo")
assert not event_manager._try_register_topic("test_service", uri_foo)

def test_get_all_uris(self, event_manager: EventClientSubscriptionManager) -> None:
assert event_manager._try_register_topic("test_service", "/foo")
uri: uri_pb2.Uri = uri_pb2.Uri(
scheme="protobuf",
authority="localhost",
path="/foo",
query="type=farm_ng.core.proto.Timestamp&pb=farm_ng/core/timestamp.proto&service_name=sub_service",
)
assert event_manager._try_register_topic("test_service", uri)
config_list = event_manager.get_all_uris_config_list(
config_name="record_default",
)
Expand All @@ -75,6 +92,6 @@ def test_get_all_uris(self, event_manager: EventClientSubscriptionManager) -> No
assert config_list.configs[1].subscriptions[0].uri.path == "/foo"
assert (
config_list.configs[1].subscriptions[0].uri.query
== "service_name=test_service"
== "service_name=sub_service"
)
assert config_list.configs[1].subscriptions[0].every_n == 1
4 changes: 2 additions & 2 deletions py/tests/test_lie.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ def test_pose():
# computes the error between frame_b of two respective poses
# e.g. robot and robot now
err = ng.Pose3F64.error(world_from_robot, world_from_robot_now) * (1 / dt)
# print(err)
# print(tangent_of_now_in_prev)
print("err:", err)
print("tan:", world_from_robot_now.tangent_of_b_in_a)
assert np.allclose(err, world_from_robot_now.tangent_of_b_in_a)
world_from_robot = world_from_robot_now

Expand Down
6 changes: 4 additions & 2 deletions run_python_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ EXIT_STATUS=0
if [[ "$*" == *"--asyncio"* ]]; then
# iterate over all files in the directory
for file in $PYTHON_ASYNCIO_TESTS_DIR/*.py; do
# run the test
pytest -v $file || EXIT_STATUS=$?
if [[ "$(basename $file)" != "__init__.py" ]]; then
# run the test
pytest -v $file || EXIT_STATUS=$?
fi
done

# otherwise, run all tests except asyncio tests
Expand Down

0 comments on commit 4766425

Please sign in to comment.