Skip to content

Commit

Permalink
Merge pull request #35 from golemfactory/mf/reconnect-int
Browse files Browse the repository at this point in the history
Stop reconnecting on SIGINT
  • Loading branch information
mfranciszkiewicz committed Jun 18, 2021
2 parents 3aa73c0 + 642d573 commit 513a657
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
4 changes: 3 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "ya-service-bus"
version = "0.4.8"
version = "0.4.9"
authors = ["Golem Factory <contact@golem.network>"]
edition = "2018"
homepage = "https://github.com/golemfactory/ya-service-bus"
Expand All @@ -27,7 +27,7 @@ rand = "0.7.2"
serde = { version = "1.0.102", features = ["derive"] }
serde_json = { version = "1.0.48", optional = true }
thiserror = "1.0.9"
tokio = { version = "0.2.6", features = ["tcp", "time", "io-util"] }
tokio = { version = "0.2.6", features = ["tcp", "time", "io-util", "signal"] }
tokio-util = "0.3.0"
url = "2.1.1"
semver="0.11.0"
Expand Down
20 changes: 18 additions & 2 deletions src/remote_router.rs
@@ -1,5 +1,5 @@
use actix::{prelude::*, WrapFuture};
use futures::{channel::oneshot, prelude::*, SinkExt};
use futures::{channel::oneshot, prelude::*, FutureExt, SinkExt};
use std::ops::Not;
use std::{collections::HashSet, time::Duration};

Expand All @@ -20,13 +20,28 @@ pub struct RemoteRouter {
local_bindings: HashSet<String>,
pending_calls: Vec<oneshot::Sender<Result<RemoteConnection, ConnectionTimeout>>>,
connection: Option<RemoteConnection>,
shutdown_rx: Option<oneshot::Receiver<()>>,
}

impl Actor for RemoteRouter {
type Context = Context<Self>;

fn started(&mut self, ctx: &mut Self::Context) {
self.try_connect(ctx);
if self.shutdown_rx.is_none() {
let (tx, rx) = oneshot::channel();
tokio::task::spawn_local(async move {
let _ = tokio::signal::ctrl_c().await;
let _ = tx.send(());
});
self.shutdown_rx.replace(rx);
}

if let Some(ref mut rx) = self.shutdown_rx {
match rx.try_recv() {
Ok(None) => self.try_connect(ctx),
_ => log::debug!("(re)connection interrupted"),
}
}
}
}

Expand Down Expand Up @@ -143,6 +158,7 @@ impl Default for RemoteRouter {
local_bindings: Default::default(),
pending_calls: Default::default(),
client_info: ClientInfo::new("sb-client"),
shutdown_rx: Default::default(),
}
}
}
Expand Down

0 comments on commit 513a657

Please sign in to comment.