Skip to content

Commit

Permalink
Make plugins to use the same Runtime than zenohd
Browse files Browse the repository at this point in the history
  • Loading branch information
JEnoch committed Jun 15, 2020
1 parent 9b60061 commit 7cff85c
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 32 deletions.
2 changes: 1 addition & 1 deletion plugins/example-plugin/Cargo.toml
Expand Up @@ -27,11 +27,11 @@ crate-type = ["cdylib"]

[dependencies]
zenoh = { version = "0.5.0", path = "../../zenoh" }
zenoh-router = { version = "0.5.0", path = "../../zenoh-router" }
futures = "0.3.5"
clap = "2"
log = "0.4"
env_logger = "0.7.1"
# rand = "0.7.3"

[dependencies.async-std]
version = "~1.6.0"
Expand Down
13 changes: 7 additions & 6 deletions plugins/example-plugin/src/lib.rs
Expand Up @@ -20,6 +20,7 @@ use std::pin::Pin;
use futures::prelude::*;
use futures::select;
use clap::{Arg, ArgMatches};
use zenoh_router::runtime::Runtime;
use zenoh::net::*;
use zenoh::net::queryable::STORAGE;

Expand All @@ -35,20 +36,19 @@ pub fn get_expected_args<'a, 'b>() -> Vec<Arg<'a, 'b>>
}

#[no_mangle]
pub fn start<'a>(args: &'a ArgMatches<'a>, locator: &'a str) -> Pin<Box<dyn Future<Output=()> + 'a>>
pub fn start<'a>(runtime: Runtime, args: &'a ArgMatches<'a>) -> Pin<Box<dyn Future<Output=()> + 'a>>
{
// NOTES: the Future cannot be returned as such to the caller of this plugin.
// Otherwise Rust complains it cannot move it as its size is not known.
// We need to wrap it in a pinned Box.
// See https://stackoverflow.com/questions/61167939/return-an-async-function-from-a-function-in-rust
Box::pin(run(args, locator))
Box::pin(run(runtime, args))
}

async fn run(args: &ArgMatches<'_>, locator: &str) {
async fn run(runtime: Runtime, args: &ArgMatches<'_>) {
env_logger::init();
debug!("Run example-plugin, openning session to {}", locator);

let session = open(&locator, None).await.unwrap();
let session = Session::init(runtime).await;

let mut stored: HashMap<String, (RBuf, Option<RBuf>)> = HashMap::new();

Expand All @@ -59,7 +59,8 @@ async fn run(args: &ArgMatches<'_>, locator: &str) {
};

let selector: ResKey = args.value_of("storage-selector").unwrap().into();

debug!("Run example-plugin with storage-selector={}", selector);

debug!("Declaring Subscriber on {}", selector);
let mut sub = session.declare_subscriber(&selector, &sub_info).await.unwrap();

Expand Down
1 change: 1 addition & 0 deletions zenoh-router/Cargo.toml
Expand Up @@ -21,6 +21,7 @@ edition = "2018"
[dependencies]
async-trait = "0.1.31"
rand = "0.7.3"
libloading = "0.6.2"
log = "0.4"
env_logger = "0.7.1"
clap = "2"
Expand Down
26 changes: 15 additions & 11 deletions zenoh-router/src/bin/zenohd.rs
Expand Up @@ -17,6 +17,7 @@ use async_std::task;
use zenoh_protocol::link::Locator;
use zenoh_protocol::proto::whatami;
use zenoh_router::runtime::Runtime;
use zenoh_router::plugins::PluginsMgr;

fn main() {
task::block_on(async {
Expand All @@ -31,7 +32,7 @@ fn main() {
Repeat this option to connect to several peers.'"));

log::debug!("Load plugins...");
let mut plugins_mgr = zenoh_util::plugins::PluginsMgr::new();
let mut plugins_mgr = PluginsMgr::new();
plugins_mgr.search_and_load_plugins("zenoh-", ".plugin").await;
let args = app.args(&plugins_mgr.get_plugins_args()).get_matches();

Expand All @@ -41,22 +42,25 @@ fn main() {
let self_locator: Locator = args.value_of("locator").unwrap().parse().unwrap();
log::trace!("self_locator: {}", self_locator);

let orchestrator = &mut runtime.write().await.orchestrator;
{
let orchestrator = &mut runtime.write().await.orchestrator;

if let Err(_err) = orchestrator.add_acceptor(&self_locator).await {
log::error!("Unable to open listening {}!", self_locator);
std::process::exit(-1);
}
if let Err(_err) = orchestrator.add_acceptor(&self_locator).await {
log::error!("Unable to open listening {}!", self_locator);
std::process::exit(-1);
}

if args.occurrences_of("peer") > 0 {
log::debug!("Peers: {:?}", args.values_of("peer").unwrap().collect::<Vec<&str>>());
for locator in args.values_of("peer").unwrap() {
orchestrator.add_peer(&locator.parse().unwrap()).await;
if args.occurrences_of("peer") > 0 {
log::debug!("Peers: {:?}", args.values_of("peer").unwrap().collect::<Vec<&str>>());
for locator in args.values_of("peer").unwrap() {
orchestrator.add_peer(&locator.parse().unwrap()).await;
}
}
// release runtime.write() lock for plugins to use Runtime
}

log::debug!("Start plugins...");
plugins_mgr.start_plugins(&args, &format!("{}", self_locator)).await;
plugins_mgr.start_plugins(&runtime, &args).await;

future::pending::<()>().await;
});
Expand Down
1 change: 1 addition & 0 deletions zenoh-router/src/lib.rs
Expand Up @@ -15,3 +15,4 @@

pub mod routing;
pub mod runtime;
pub mod plugins;
Expand Up @@ -17,6 +17,7 @@ use libloading::{Library, Symbol};
use std::future::Future;
use std::pin::Pin;
use clap::{Arg, ArgMatches};
use super::runtime::Runtime;


pub struct PluginsMgr {
Expand Down Expand Up @@ -118,9 +119,9 @@ impl PluginsMgr {
result
}

pub async fn start_plugins(&self, args: &ArgMatches<'_>, self_locator: &str) {
pub async fn start_plugins(&self, runtime: &Runtime, args: &ArgMatches<'_>) {
for plugin in &self.plugins {
plugin.start(args, self_locator).await
plugin.start(runtime.clone(), args).await
}
}

Expand All @@ -141,7 +142,7 @@ struct Plugin {
const START_FN_NAME: &[u8; 6] = b"start\0";
const GET_ARGS_FN_NAME: &[u8; 18] = b"get_expected_args\0";

type StartFn<'lib> = Symbol<'lib, unsafe extern fn(&ArgMatches, &str) -> Pin<Box<dyn Future<Output=()>>>>;
type StartFn<'lib> = Symbol<'lib, unsafe extern fn(Runtime, &ArgMatches) -> Pin<Box<dyn Future<Output=()>>>>;
type GetArgsFn<'lib, 'a, 'b> = Symbol<'lib, unsafe extern fn() -> Vec<Arg<'a, 'b>>>;


Expand Down Expand Up @@ -175,11 +176,11 @@ impl Plugin {
}
}

pub async fn start(&self, args: &ArgMatches<'_>, self_locator: &str) {
pub async fn start(&self, runtime: Runtime, args: &ArgMatches<'_>) {
unsafe {
debug!("Call start() of plugin {}", self.name);
let start: StartFn = self.lib.get(START_FN_NAME).unwrap();
start(args, self_locator).as_mut().await;
start(runtime, args).as_mut().await;
}
}
}
1 change: 0 additions & 1 deletion zenoh-util/Cargo.toml
Expand Up @@ -23,7 +23,6 @@ edition = "2018"
[dependencies]
async-trait = "0.1.33"
lazy_static = "1.4.0"
libloading = "0.6.2"
log = "0.4.8"
rand = "0.7.3"
clap = "2"
Expand Down
1 change: 0 additions & 1 deletion zenoh-util/src/lib.rs
Expand Up @@ -18,7 +18,6 @@ extern crate lazy_static;

pub mod collections;
pub mod core;
pub mod plugins;
pub mod sync;

pub use crate::core::macros::*;
Expand Down
20 changes: 13 additions & 7 deletions zenoh/src/net/session.rs
Expand Up @@ -68,20 +68,26 @@ impl Session {
}
}

let broker = runtime.read().await.broker.clone();

let inner = Arc::new(RwLock::new(InnerSession::new()));

let session = Session{runtime, inner: inner.clone()};

inner.write().primitives = Some(broker.new_primitives(Arc::new(session.clone())).await);
let session = Self::init(runtime).await;

// Workaround for the declare_and_shoot problem
task::sleep(std::time::Duration::from_millis(200)).await;

session
}

// Initialize a Session with an existing Runtime.
// This operation is used by the plugins to share the same Runtime than the router.
#[doc(hidden)]
pub async fn init(runtime: Runtime) -> Session {
let broker = runtime.read().await.broker.clone();
let inner = Arc::new(RwLock::new(InnerSession::new()));
let session = Session{runtime, inner: inner.clone()};
inner.write().primitives = Some(broker.new_primitives(Arc::new(session.clone())).await);
session
}


pub async fn close(&self) -> ZResult<()> {
// @TODO: implement
trace!("close()");
Expand Down

0 comments on commit 7cff85c

Please sign in to comment.