Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 4 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 19 additions & 12 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,29 @@
# Config file for cargo deny
# For all options see https://github.com/EmbarkStudios/cargo-deny/blob/main/deny.template.toml

[advisories]
ignore = ["RUSTSEC-2023-0071"]

[bans]
multiple-versions = "allow"

# If you add a license in the following section also consider changing about.toml
[licenses]
allow = ["Apache-2.0", "EPL-1.0", "EPL-2.0", "MIT"]
#private = { ignore = true }
exceptions = [{ name = "unicode-ident", allow = ["Unicode-3.0"] }]

# THIS IS ONLY TEMPORARY, UNTIL up-transport-socket-rust MERGES INITIAL PR AND PROPERLY DECLARES ITS LICENSE
[licenses.private]
ignore = true
ignore-sources = [
"https://github.com/eclipse-uprotocol/up-transport-socket.git",
allow = [
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",
"EPL-2.0",
"ISC",
"MIT",
"MPL-2.0",
"Unicode-3.0",
"Zlib",
]
exceptions = [{ name = "ring", allow = ["OpenSSL"] }]
#private = { ignore = true }

# TEMPORARY until this can be pulled from crates.io releases
[sources]
allow-git = ["https://github.com/eclipse-uprotocol/up-transport-mqtt5-rust.git"]
[[licenses.clarify]]
name = "ring"
expression = "MIT AND ISC AND OpenSSL"
license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }]
9 changes: 5 additions & 4 deletions up-subscription-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ license = false
eula = false

[features]
default = ["mqtt5"]
default = ["zenoh"]
socket = ["dep:up-transport-socket-rust"]
mqtt5 = ["dep:up-transport-mqtt5"]
zenoh = ["dep:up-transport-zenoh"]
zenoh = ["dep:up-transport-zenoh", "dep:serde_json"]

[dependencies]
async-trait = { workspace = true }
Expand All @@ -41,13 +41,14 @@ env_logger = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
protobuf = { workspace = true }
serde_json = { version = "1.0.138", optional = true }
tokio = { workspace = true }
up-rust = { workspace = true }
up-subscription = { workspace = true }
up-transport-mqtt5 = { git = "https://github.com/eclipse-uprotocol/up-transport-mqtt5-rust.git", rev = "8c15c38ecfa54e74eb1ee5570b1cc68ca34d0b8c", optional = true }
# up-transport-mqtt5 = { version = "0.4.0", optional = true }
#up-transport-mqtt5 = { version = "0.2.0", optional = true }
up-transport-socket-rust = { git = "https://github.com/eclipse-uprotocol/up-transport-socket.git", rev = "78ac7ff6acba7090a79fc36b2ddca49bd93e7188", optional = true }
up-transport-zenoh = { version = "0.4.0", optional = true }
up-transport-zenoh = { version = "0.5.0", optional = true }

[target.'cfg(unix)'.dependencies]
daemonize = { version = "0.5" }
Expand Down
13 changes: 10 additions & 3 deletions up-subscription-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ use daemonize::Daemonize;
#[cfg(feature = "mqtt5")]
use up_transport_mqtt5::MqttClientOptions;

#[cfg(feature = "zenoh")]
use crate::transport::zenoh::ZenohArgs;

mod transport;
#[cfg(feature = "mqtt5")]
use transport::get_mqtt5_transport;
Expand Down Expand Up @@ -105,7 +108,11 @@ pub(crate) struct Args {

#[cfg(feature = "mqtt5")]
#[command(flatten)]
mqtt_client_options: MqttClientOptions,
mqtt_args: MqttClientOptions,

#[cfg(feature = "zenoh")]
#[command(flatten)]
zenoh_args: ZenohArgs,
}

#[tokio::main]
Expand Down Expand Up @@ -134,7 +141,7 @@ async fn main() {
let transport: Option<Arc<dyn UTransport>> = match args.transport {
#[cfg(feature = "mqtt5")]
Transport::Mqtt5 => Some(
get_mqtt5_transport(_config.clone(), args.mqtt_client_options)
get_mqtt5_transport(_config.clone(), args.mqtt_args)
.await
.inspect_err(|e| panic!("Error setting up MQTT5 transport: {}", e.get_message()))
.unwrap(),
Expand All @@ -148,7 +155,7 @@ async fn main() {
),
#[cfg(feature = "zenoh")]
Transport::Zenoh => Some(
get_zenoh_transport(_config.clone())
get_zenoh_transport(_config.clone(), args.zenoh_args)
.await
.inspect_err(|e| panic!("Error setting up Zenoh transport: {}", e.get_message()))
.unwrap(),
Expand Down
4 changes: 2 additions & 2 deletions up-subscription-cli/src/transport/mqtt5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ use up_transport_mqtt5::{Mqtt5Transport, MqttClientOptions, TransportMode};

pub(crate) async fn get_mqtt5_transport(
uri_provider: Arc<dyn LocalUriProvider>,
client_options: MqttClientOptions,
mqtt5_args: MqttClientOptions,
) -> Result<Arc<dyn UTransport>, UStatus> {
Ok(Mqtt5Transport::new(
TransportMode::InVehicle,
client_options,
mqtt5_args,
uri_provider.get_authority(),
)
.await
Expand Down
98 changes: 80 additions & 18 deletions up-subscription-cli/src/transport/zenoh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,32 +11,94 @@
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/

use serde_json::json;
use std::sync::Arc;
use up_transport_zenoh::{zenoh_config, UPTransportZenoh};

use up_rust::{LocalUriProvider, UStatus, UTransport};

pub(crate) async fn get_zenoh_transport(
_uri_provider: Arc<dyn LocalUriProvider>,
) -> Result<Arc<dyn UTransport>, UStatus> {
// UPTransportZenoh::try_init_log_from_env();
#[derive(clap::ValueEnum, Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub(crate) enum WhatAmIType {
Peer,
Client,
Router,
}

// // Load the config from file path
// // Config Examples: https://github.com/eclipse-zenoh/zenoh/blob/0.10.1-rc/DEFAULT_CONFIG.json5
// // zenoh_config::Config::from_file(path).unwrap()
impl WhatAmIType {
const fn to_str(self) -> &'static str {
match self {
Self::Peer => "peer",
Self::Client => "client",
Self::Router => "router",
}
}
}

#[derive(clap::Parser, Clone, PartialEq, Eq, Hash, Debug)]
pub(crate) struct ZenohArgs {
#[arg(short, long)]
/// A configuration file.
config: Option<String>,
#[arg(short, long)]
/// The Zenoh session mode [default: peer].
mode: Option<WhatAmIType>,
#[arg(short = 'e', long)]
/// Endpoints to connect to.
connect: Vec<String>,
#[arg(short, long)]
/// Endpoints to listen on.
listen: Vec<String>,
#[arg(long)]
/// Disable the multicast-based scouting mechanism.
no_multicast_scouting: bool,
}

// // Loat the default config struct
// let mut zenoh_cfg = zenoh_config::Config::default();
pub fn get_zenoh_config(args: ZenohArgs) -> zenoh_config::Config {
// Load the config from file path
let mut zenoh_cfg = match &args.config {
Some(path) => zenoh_config::Config::from_file(path).unwrap(),
None => zenoh_config::Config::default(),
};

// // You can choose from Router, Peer, Client
// zenoh_cfg.insert_json5("mode", "Peer").unwrap();
// You can choose from Router, Peer, Client
if let Some(mode) = args.mode {
zenoh_cfg
.insert_json5("mode", &json!(mode.to_str()).to_string())
.unwrap();
}

// let transport = Arc::new(
// UPTransportZenoh::new(zenoh_cfg, uri_provider.get_source_uri().to_string())
// .await
// .unwrap(),
// );
// Set connection address
if !args.connect.is_empty() {
zenoh_cfg
.insert_json5("connect/endpoints", &json!(args.connect).to_string())
.unwrap();
}

// Some(transport)
// Set listener address
if !args.listen.is_empty() {
zenoh_cfg
.insert_json5("listen/endpoints", &json!(args.listen).to_string())
.unwrap();
}

todo!()
// Set multicast configuration
if args.no_multicast_scouting {
zenoh_cfg
.insert_json5("scouting/multicast/enabled", &json!(false).to_string())
.unwrap();
}

zenoh_cfg
}

pub(crate) async fn get_zenoh_transport(
uri_provider: Arc<dyn LocalUriProvider>,
zenoh_args: ZenohArgs,
) -> Result<Arc<dyn UTransport>, UStatus> {
UPTransportZenoh::try_init_log_from_env();
Ok(
UPTransportZenoh::new(get_zenoh_config(zenoh_args), uri_provider.get_source_uri())
.await
.map(Arc::new)?,
)
}
Loading