diff --git a/Cargo.lock b/Cargo.lock index 8bc22e3..4c29ee4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -157,6 +157,12 @@ version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + [[package]] name = "home" version = "0.5.9" @@ -248,6 +254,16 @@ dependencies = [ "syn", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.32.2" @@ -594,6 +610,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1adbebffeca75fcfd058afa480fb6c0b81e165a0323f9c9d39c9697e37c46787" dependencies = [ "backtrace", + "num_cpus", "pin-project-lite", "tokio-macros", ] diff --git a/Cargo.toml b/Cargo.toml index 9b4a8f6..508da51 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ communication = ["usubscription", "dep:thiserror", "tokio/sync", "tokio/time"] udiscovery = [] usubscription = [] utwin = [] +util = ["tokio/sync"] [dependencies] async-trait = { version = "0.1" } @@ -65,6 +66,7 @@ test-case = { version = "3.3" } tokio = { version = "1.35", default-features = false, features = [ "macros", "rt", + "rt-multi-thread", "sync", "time", ] } @@ -76,3 +78,15 @@ codegen-units = 1 [package.metadata.docs.rs] all-features = true + +[[example]] +name = "simple_notify" +required-features = ["util"] + +[[example]] +name = "simple_publish" +required-features = ["util"] + +[[example]] +name = "simple_rpc" +required-features = ["util"] diff --git a/examples/simple_notify.rs b/examples/simple_notify.rs new file mode 100644 index 0000000..ca0ce4d --- /dev/null +++ b/examples/simple_notify.rs @@ -0,0 +1,62 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use std::sync::Arc; + +use protobuf::well_known_types::wrappers::StringValue; +use up_rust::{ + communication::{CallOptions, Notifier, SimpleNotifier, UPayload}, + local_transport::LocalTransport, + LocalUriProvider, UListener, UMessage, +}; + +struct ConsolePrinter {} + +#[async_trait::async_trait] +impl UListener for ConsolePrinter { + async fn on_receive(&self, msg: UMessage) { + if let Ok(payload) = msg.extract_protobuf::() { + println!("received notification: {}", payload.value); + } + } +} + +#[tokio::main] +pub async fn main() -> Result<(), Box> { + const ORIGIN_RESOURCE_ID: u16 = 0xd100; + + let transport = Arc::new(LocalTransport::new("my-vehicle", 0xa34b, 0x01)); + let uri_provider: Arc = transport.clone(); + let notifier = SimpleNotifier::new(transport.clone(), uri_provider.clone()); + let topic = uri_provider.get_resource_uri(ORIGIN_RESOURCE_ID); + let listener = Arc::new(ConsolePrinter {}); + + notifier.start_listening(&topic, listener.clone()).await?; + + let value = StringValue { + value: "Hello".to_string(), + ..Default::default() + }; + let payload = UPayload::try_from_protobuf(value)?; + notifier + .notify( + ORIGIN_RESOURCE_ID, + &uri_provider.get_source_uri(), + CallOptions::for_notification(None, None, None), + Some(payload), + ) + .await?; + + notifier.stop_listening(&topic, listener).await?; + Ok(()) +} diff --git a/examples/simple_publish.rs b/examples/simple_publish.rs new file mode 100644 index 0000000..89d39b1 --- /dev/null +++ b/examples/simple_publish.rs @@ -0,0 +1,72 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use std::sync::Arc; + +use protobuf::well_known_types::wrappers::StringValue; +use up_rust::{ + communication::{CallOptions, Publisher, SimplePublisher, UPayload}, + local_transport::LocalTransport, + LocalUriProvider, UListener, UMessage, UTransport, +}; + +struct ConsolePrinter {} + +#[async_trait::async_trait] +impl UListener for ConsolePrinter { + async fn on_receive(&self, msg: UMessage) { + if let Ok(payload) = msg.extract_protobuf::() { + println!("received event: {}", payload.value); + } + } +} + +#[tokio::main] +pub async fn main() -> Result<(), Box> { + const ORIGIN_RESOURCE_ID: u16 = 0xb4c1; + let transport = Arc::new(LocalTransport::new("my-vehicle", 0xa34b, 0x01)); + let uri_provider: Arc = transport.clone(); + let publisher = SimplePublisher::new(transport.clone(), uri_provider.clone()); + let listener = Arc::new(ConsolePrinter {}); + + transport + .register_listener( + &uri_provider.get_resource_uri(ORIGIN_RESOURCE_ID), + None, + listener.clone(), + ) + .await?; + + let value = StringValue { + value: "Hello".to_string(), + ..Default::default() + }; + let payload = UPayload::try_from_protobuf(value)?; + publisher + .publish( + ORIGIN_RESOURCE_ID, + CallOptions::for_publish(None, None, None), + Some(payload), + ) + .await?; + + transport + .unregister_listener( + &uri_provider.get_resource_uri(ORIGIN_RESOURCE_ID), + None, + listener, + ) + .await?; + + Ok(()) +} diff --git a/examples/simple_rpc.rs b/examples/simple_rpc.rs new file mode 100644 index 0000000..ead83f0 --- /dev/null +++ b/examples/simple_rpc.rs @@ -0,0 +1,109 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +/*! +This example illustrates how client code can use the _Communication Level API_ to invoke a +service operation. It also shows how the corresponding service provider can be implemented. + */ +use std::sync::Arc; + +use protobuf::well_known_types::wrappers::StringValue; +use up_rust::{ + communication::{ + CallOptions, InMemoryRpcClient, InMemoryRpcServer, RequestHandler, RpcClient, RpcServer, + ServiceInvocationError, UPayload, + }, + local_transport::LocalTransport, + LocalUriProvider, +}; + +struct EchoOperation {} + +#[async_trait::async_trait] +impl RequestHandler for EchoOperation { + async fn invoke_method( + &self, + _resource_id: u16, + request_payload: Option, + ) -> Result, ServiceInvocationError> { + if let Some(req_payload) = request_payload { + Ok(Some(req_payload)) + } else { + Err(ServiceInvocationError::InvalidArgument( + "request has no payload".to_string(), + )) + } + } +} + +#[tokio::main] +pub async fn main() -> Result<(), Box> { + const METHOD_RESOURCE_ID: u16 = 0x00a0; + let transport = Arc::new(LocalTransport::new("my-vehicle", 0xa34b, 0x01)); + let uri_provider: Arc = transport.clone(); + + // create the RpcServer using the local transport + let rpc_server = InMemoryRpcServer::new(transport.clone(), uri_provider.clone()); + // and register an endpoint for the service operation + let echo_op = Arc::new(EchoOperation {}); + + rpc_server + .register_endpoint(None, METHOD_RESOURCE_ID, echo_op.clone()) + .await?; + + // now create an RpcClient attached to the same local transport + let rpc_client = InMemoryRpcClient::new(transport.clone(), uri_provider.clone()).await?; + // and invoke the service operation without any payload + match rpc_client + .invoke_method( + uri_provider.get_resource_uri(METHOD_RESOURCE_ID), + CallOptions::for_rpc_request(1_000, None, None, None), + None, // no payload + ) + .await + { + Err(ServiceInvocationError::InvalidArgument(msg)) => { + println!("service returned expected error: {}", msg) + } + _ => panic!("expected service to return an Invalid Argument error"), + } + + // now invoke the operaiton with a message in the request payload + let value = StringValue { + value: "Hello".to_string(), + ..Default::default() + }; + let payload = UPayload::try_from_protobuf(value)?; + // and make sure that the response contains a message in the payload + match rpc_client + .invoke_method( + uri_provider.get_resource_uri(METHOD_RESOURCE_ID), + CallOptions::for_rpc_request(1_000, None, None, None), + Some(payload), + ) + .await + { + Ok(Some(payload)) => { + let value = payload.extract_protobuf::()?; + println!("service returned message: {}", value.value); + } + _ => panic!("expected service to return response message"), + } + + // and finally unregister the endpoint + rpc_server + .unregister_endpoint(None, METHOD_RESOURCE_ID, echo_op) + .await?; + + Ok(()) +} diff --git a/src/core.rs b/src/core.rs index 69e2b80..0ea68d9 100644 --- a/src/core.rs +++ b/src/core.rs @@ -1,3 +1,16 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + #[cfg(feature = "usubscription")] pub mod usubscription; diff --git a/src/lib.rs b/src/lib.rs index a796ba2..8b40d4e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -46,6 +46,7 @@ For user convenience, all of these modules export their types on up_rust top-lev implementations. Enabled by default. * `utwin` enables support for types required to interact with [uTwin service](https://raw.githubusercontent.com/eclipse-uprotocol/up-spec/v1.6.0-alpha.2/up-l3/utwin/v3/README.adoc) implementations. +* `util` provides some useful helper structs. In particular, provides a local, in-memory UTransport for exchanging messages within a single process. This transport is also used by the examples illustrating usage of the Communication Layer API. ## References @@ -56,6 +57,8 @@ For user convenience, all of these modules export their types on up_rust top-lev // up_core_api types used and augmented by up_rust - symbols re-exported to toplevel, errors are module-specific #[cfg(feature = "communication")] pub mod communication; +#[cfg(feature = "util")] +pub mod local_transport; mod uattributes; pub use uattributes::{ NotificationValidator, PublishValidator, RequestValidator, ResponseValidator, diff --git a/src/local_transport.rs b/src/local_transport.rs new file mode 100644 index 0000000..ff11c15 --- /dev/null +++ b/src/local_transport.rs @@ -0,0 +1,249 @@ +/******************************************************************************** + * Copyright (c) 2024 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +/*! +Provides a local UTransport which can be used for connecting uEntities running in the same +process. +*/ + +use std::{collections::HashSet, sync::Arc}; + +use tokio::sync::RwLock; + +use crate::{ComparableListener, LocalUriProvider, UListener, UMessage, UStatus, UTransport, UUri}; + +#[derive(Eq, PartialEq, Hash)] +struct RegisteredListener { + source_filter: UUri, + sink_filter: Option, + listener: ComparableListener, +} + +impl RegisteredListener { + fn matches(&self, source: &UUri, sink: Option<&UUri>) -> bool { + if !self.source_filter.matches(source) { + return false; + } + + if let Some(pattern) = &self.sink_filter { + sink.map_or(false, |candidate_sink| pattern.matches(candidate_sink)) + } else { + sink.is_none() + } + } + fn matches_msg(&self, msg: &UMessage) -> bool { + if let Some(source) = msg + .attributes + .as_ref() + .and_then(|attribs| attribs.source.as_ref()) + { + self.matches( + source, + msg.attributes + .as_ref() + .and_then(|attribs| attribs.sink.as_ref()), + ) + } else { + false + } + } + async fn on_receive(&self, msg: UMessage) { + self.listener.on_receive(msg).await + } +} + +/// A [`UTransport`] that can be used to exchange messages within a single process. +/// +/// A message sent via [`UTransport::send`] will be dispatched to all registered listeners that +/// match the message's source and sink filters. +pub struct LocalTransport { + listeners: RwLock>, + authority_name: String, + entity_id: u32, + entity_version: u8, +} + +impl LocalTransport { + pub fn new(authority_name: &str, entity_id: u32, entity_version: u8) -> Self { + LocalTransport { + listeners: RwLock::new(HashSet::new()), + authority_name: authority_name.to_string(), + entity_id, + entity_version, + } + } + async fn dispatch(&self, message: UMessage) { + let listeners = self.listeners.read().await; + for listener in listeners.iter() { + if listener.matches_msg(&message) { + listener.on_receive(message.clone()).await; + } + } + } +} + +impl LocalUriProvider for LocalTransport { + fn get_authority(&self) -> String { + self.authority_name.clone() + } + + fn get_resource_uri(&self, resource_id: u16) -> UUri { + UUri::try_from_parts( + &self.authority_name, + self.entity_id, + self.entity_version, + resource_id, + ) + .unwrap() + } + fn get_source_uri(&self) -> UUri { + self.get_resource_uri(0x0000) + } +} + +#[async_trait::async_trait] +impl UTransport for LocalTransport { + async fn send(&self, message: UMessage) -> Result<(), UStatus> { + self.dispatch(message).await; + Ok(()) + } + + async fn register_listener( + &self, + source_filter: &UUri, + sink_filter: Option<&UUri>, + listener: Arc, + ) -> Result<(), UStatus> { + let registered_listener = RegisteredListener { + source_filter: source_filter.to_owned(), + sink_filter: sink_filter.map(|u| u.to_owned()), + listener: ComparableListener::new(listener), + }; + let mut listeners = self.listeners.write().await; + if listeners.contains(®istered_listener) { + Err(UStatus::fail_with_code( + crate::UCode::ALREADY_EXISTS, + "listener already registered for filters", + )) + } else { + listeners.insert(registered_listener); + Ok(()) + } + } + + async fn unregister_listener( + &self, + source_filter: &UUri, + sink_filter: Option<&UUri>, + listener: Arc, + ) -> Result<(), UStatus> { + let registered_listener = RegisteredListener { + source_filter: source_filter.to_owned(), + sink_filter: sink_filter.map(|u| u.to_owned()), + listener: ComparableListener::new(listener), + }; + let mut listeners = self.listeners.write().await; + if listeners.remove(®istered_listener) { + Ok(()) + } else { + Err(UStatus::fail_with_code( + crate::UCode::NOT_FOUND, + "no such listener registered for filters", + )) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{utransport::MockUListener, UMessageBuilder}; + + #[tokio::test] + async fn test_send_dispatches_to_matching_listener() { + const RESOURCE_ID: u16 = 0xa1b3; + let mut listener = MockUListener::new(); + listener.expect_on_receive().once().return_const(()); + let listener_ref = Arc::new(listener); + let transport = LocalTransport::new("my-vehicle", 0x100d, 0x02); + + transport + .register_listener( + &transport.get_resource_uri(RESOURCE_ID), + None, + listener_ref.clone(), + ) + .await + .unwrap(); + let _ = transport + .send( + UMessageBuilder::publish(transport.get_resource_uri(RESOURCE_ID)) + .build() + .unwrap(), + ) + .await; + + transport + .unregister_listener(&transport.get_resource_uri(RESOURCE_ID), None, listener_ref) + .await + .unwrap(); + let _ = transport + .send( + UMessageBuilder::publish(transport.get_resource_uri(RESOURCE_ID)) + .build() + .unwrap(), + ) + .await; + } + + #[tokio::test] + async fn test_send_does_not_dispatch_to_non_matching_listener() { + const RESOURCE_ID: u16 = 0xa1b3; + let mut listener = MockUListener::new(); + listener.expect_on_receive().never().return_const(()); + let listener_ref = Arc::new(listener); + let transport = LocalTransport::new("my-vehicle", 0x100d, 0x02); + + transport + .register_listener( + &transport.get_resource_uri(RESOURCE_ID + 10), + None, + listener_ref.clone(), + ) + .await + .unwrap(); + let _ = transport + .send( + UMessageBuilder::publish(transport.get_resource_uri(RESOURCE_ID)) + .build() + .unwrap(), + ) + .await; + + transport + .unregister_listener( + &transport.get_resource_uri(RESOURCE_ID + 10), + None, + listener_ref, + ) + .await + .unwrap(); + let _ = transport + .send( + UMessageBuilder::publish(transport.get_resource_uri(RESOURCE_ID)) + .build() + .unwrap(), + ) + .await; + } +}