Skip to content

Commit

Permalink
Merge pull request #18 from blckngm/shutdown-shared-boxed
Browse files Browse the repository at this point in the history
feat: stream shutdown signal
  • Loading branch information
blckngm committed Nov 28, 2023
2 parents ed531c8 + 735f7ee commit d4b22da
Showing 1 changed file with 19 additions and 2 deletions.
21 changes: 19 additions & 2 deletions src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@

use std::{sync::atomic::AtomicU64, time::Duration};

use futures_core::Stream;
use futures_util::{Sink, SinkExt, StreamExt};
use futures_core::{future::BoxFuture, Future, Stream};
use futures_util::{
future::{self, Shared},
FutureExt, Sink, SinkExt, StreamExt,
};
use jsonrpc_core::{MetaIoHandler, Metadata};
use tokio::{sync::mpsc::channel, time::Instant};

Expand All @@ -19,6 +22,7 @@ pub struct StreamServerConfig {
pub(crate) keep_alive: bool,
pub(crate) keep_alive_duration: Duration,
pub(crate) ping_interval: Duration,
pub(crate) shutdown_signal: Shared<BoxFuture<'static, ()>>,
}

impl Default for StreamServerConfig {
Expand All @@ -29,6 +33,7 @@ impl Default for StreamServerConfig {
keep_alive: false,
keep_alive_duration: Duration::from_secs(60),
ping_interval: Duration::from_secs(19),
shutdown_signal: future::pending().boxed().shared(),
}
}
}
Expand Down Expand Up @@ -86,6 +91,14 @@ impl StreamServerConfig {
self.ping_interval = ping_interval;
self
}

pub fn with_shutdown<S>(mut self, shutdown: S) -> StreamServerConfig
where
S: Future<Output = ()> + Send + 'static,
{
self.shutdown_signal = shutdown.boxed().shared();
self
}
}

pub enum StreamMsg {
Expand Down Expand Up @@ -129,6 +142,7 @@ pub async fn serve_stream_sink<E, T: Metadata + From<Session>>(
Ok::<_, E>(rpc.handle_request(&msg, session.clone().into()).await)
})
.buffer_unordered(config.pipeline_size);
let mut shutdown = config.shutdown_signal;
loop {
tokio::select! {
result = result_stream.next() => {
Expand Down Expand Up @@ -156,6 +170,9 @@ pub async fn serve_stream_sink<E, T: Metadata + From<Session>>(
_ = ping_interval.tick(), if config.keep_alive => {
sink.send(StreamMsg::Ping).await?;
}
_ = &mut shutdown => {
break;
}
}
}
Ok(())
Expand Down

0 comments on commit d4b22da

Please sign in to comment.