Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ communication = ["usubscription", "dep:thiserror", "tokio/sync", "tokio/time"]
udiscovery = []
usubscription = []
utwin = []
util = ["tokio/sync"]

[dependencies]
async-trait = { version = "0.1" }
Expand Down Expand Up @@ -65,6 +66,7 @@ test-case = { version = "3.3" }
tokio = { version = "1.35", default-features = false, features = [
"macros",
"rt",
"rt-multi-thread",
"sync",
"time",
] }
Expand All @@ -76,3 +78,15 @@ codegen-units = 1

[package.metadata.docs.rs]
all-features = true

[[example]]
name = "simple_notify"
required-features = ["util"]

[[example]]
name = "simple_publish"
required-features = ["util"]

[[example]]
name = "simple_rpc"
required-features = ["util"]
62 changes: 62 additions & 0 deletions examples/simple_notify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::sync::Arc;

use protobuf::well_known_types::wrappers::StringValue;
use up_rust::{
communication::{CallOptions, Notifier, SimpleNotifier, UPayload},
local_transport::LocalTransport,
LocalUriProvider, UListener, UMessage,
};

struct ConsolePrinter {}

#[async_trait::async_trait]
impl UListener for ConsolePrinter {
async fn on_receive(&self, msg: UMessage) {
if let Ok(payload) = msg.extract_protobuf::<StringValue>() {
println!("received notification: {}", payload.value);
}
}
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
const ORIGIN_RESOURCE_ID: u16 = 0xd100;

let transport = Arc::new(LocalTransport::new("my-vehicle", 0xa34b, 0x01));
let uri_provider: Arc<dyn LocalUriProvider> = transport.clone();
let notifier = SimpleNotifier::new(transport.clone(), uri_provider.clone());
let topic = uri_provider.get_resource_uri(ORIGIN_RESOURCE_ID);
let listener = Arc::new(ConsolePrinter {});

notifier.start_listening(&topic, listener.clone()).await?;

let value = StringValue {
value: "Hello".to_string(),
..Default::default()
};
let payload = UPayload::try_from_protobuf(value)?;
notifier
.notify(
ORIGIN_RESOURCE_ID,
&uri_provider.get_source_uri(),
CallOptions::for_notification(None, None, None),
Some(payload),
)
.await?;

notifier.stop_listening(&topic, listener).await?;
Ok(())
}
72 changes: 72 additions & 0 deletions examples/simple_publish.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use std::sync::Arc;

use protobuf::well_known_types::wrappers::StringValue;
use up_rust::{
communication::{CallOptions, Publisher, SimplePublisher, UPayload},
local_transport::LocalTransport,
LocalUriProvider, UListener, UMessage, UTransport,
};

struct ConsolePrinter {}

#[async_trait::async_trait]
impl UListener for ConsolePrinter {
async fn on_receive(&self, msg: UMessage) {
if let Ok(payload) = msg.extract_protobuf::<StringValue>() {
println!("received event: {}", payload.value);
}
}
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
const ORIGIN_RESOURCE_ID: u16 = 0xb4c1;
let transport = Arc::new(LocalTransport::new("my-vehicle", 0xa34b, 0x01));
let uri_provider: Arc<dyn LocalUriProvider> = transport.clone();
let publisher = SimplePublisher::new(transport.clone(), uri_provider.clone());
let listener = Arc::new(ConsolePrinter {});

transport
.register_listener(
&uri_provider.get_resource_uri(ORIGIN_RESOURCE_ID),
None,
listener.clone(),
)
.await?;

let value = StringValue {
value: "Hello".to_string(),
..Default::default()
};
let payload = UPayload::try_from_protobuf(value)?;
publisher
.publish(
ORIGIN_RESOURCE_ID,
CallOptions::for_publish(None, None, None),
Some(payload),
)
.await?;

transport
.unregister_listener(
&uri_provider.get_resource_uri(ORIGIN_RESOURCE_ID),
None,
listener,
)
.await?;

Ok(())
}
109 changes: 109 additions & 0 deletions examples/simple_rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

/*!
This example illustrates how client code can use the _Communication Level API_ to invoke a
service operation. It also shows how the corresponding service provider can be implemented.
*/
use std::sync::Arc;

use protobuf::well_known_types::wrappers::StringValue;
use up_rust::{
communication::{
CallOptions, InMemoryRpcClient, InMemoryRpcServer, RequestHandler, RpcClient, RpcServer,
ServiceInvocationError, UPayload,
},
local_transport::LocalTransport,
LocalUriProvider,
};

struct EchoOperation {}

#[async_trait::async_trait]
impl RequestHandler for EchoOperation {
async fn invoke_method(
&self,
_resource_id: u16,
request_payload: Option<UPayload>,
) -> Result<Option<UPayload>, ServiceInvocationError> {
if let Some(req_payload) = request_payload {
Ok(Some(req_payload))
} else {
Err(ServiceInvocationError::InvalidArgument(
"request has no payload".to_string(),
))
}
}
}

#[tokio::main]
pub async fn main() -> Result<(), Box<dyn std::error::Error>> {
const METHOD_RESOURCE_ID: u16 = 0x00a0;
let transport = Arc::new(LocalTransport::new("my-vehicle", 0xa34b, 0x01));
let uri_provider: Arc<dyn LocalUriProvider> = transport.clone();

// create the RpcServer using the local transport
let rpc_server = InMemoryRpcServer::new(transport.clone(), uri_provider.clone());
// and register an endpoint for the service operation
let echo_op = Arc::new(EchoOperation {});

rpc_server
.register_endpoint(None, METHOD_RESOURCE_ID, echo_op.clone())
.await?;

// now create an RpcClient attached to the same local transport
let rpc_client = InMemoryRpcClient::new(transport.clone(), uri_provider.clone()).await?;
// and invoke the service operation without any payload
match rpc_client
.invoke_method(
uri_provider.get_resource_uri(METHOD_RESOURCE_ID),
CallOptions::for_rpc_request(1_000, None, None, None),
None, // no payload
)
.await
{
Err(ServiceInvocationError::InvalidArgument(msg)) => {
println!("service returned expected error: {}", msg)
}
_ => panic!("expected service to return an Invalid Argument error"),
}

// now invoke the operaiton with a message in the request payload
let value = StringValue {
value: "Hello".to_string(),
..Default::default()
};
let payload = UPayload::try_from_protobuf(value)?;
// and make sure that the response contains a message in the payload
match rpc_client
.invoke_method(
uri_provider.get_resource_uri(METHOD_RESOURCE_ID),
CallOptions::for_rpc_request(1_000, None, None, None),
Some(payload),
)
.await
{
Ok(Some(payload)) => {
let value = payload.extract_protobuf::<StringValue>()?;
println!("service returned message: {}", value.value);
}
_ => panic!("expected service to return response message"),
}

// and finally unregister the endpoint
rpc_server
.unregister_endpoint(None, METHOD_RESOURCE_ID, echo_op)
.await?;

Ok(())
}
13 changes: 13 additions & 0 deletions src/core.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/********************************************************************************
* Copyright (c) 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

#[cfg(feature = "usubscription")]
pub mod usubscription;

Expand Down
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ For user convenience, all of these modules export their types on up_rust top-lev
implementations. Enabled by default.
* `utwin` enables support for types required to interact with [uTwin service](https://raw.githubusercontent.com/eclipse-uprotocol/up-spec/v1.6.0-alpha.2/up-l3/utwin/v3/README.adoc)
implementations.
* `util` provides some useful helper structs. In particular, provides a local, in-memory UTransport for exchanging messages within a single process. This transport is also used by the examples illustrating usage of the Communication Layer API.

## References

Expand All @@ -56,6 +57,8 @@ For user convenience, all of these modules export their types on up_rust top-lev
// up_core_api types used and augmented by up_rust - symbols re-exported to toplevel, errors are module-specific
#[cfg(feature = "communication")]
pub mod communication;
#[cfg(feature = "util")]
pub mod local_transport;
mod uattributes;
pub use uattributes::{
NotificationValidator, PublishValidator, RequestValidator, ResponseValidator,
Expand Down
Loading