Skip to content

Commit

Permalink
Add wake-up channel for server allowing shutdown
Browse files Browse the repository at this point in the history
This commit has the server_socket.accept() be in a select!
statement along with a mpsc channel. By signalling the channel
a client loop can stop the server.

Right now this is only useful for simplying the integration test
as it allows ending the server in a different way. Later it will
be useful for error handling for errors that are fatal and require
the whole server to shut down.
  • Loading branch information
chris-belcher committed Jun 17, 2021
1 parent fd53dd9 commit 3eb9ae9
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 21 deletions.
8 changes: 0 additions & 8 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,21 +358,13 @@ mod test {
}

async fn kill_maker(addr: &str) {
// Need to connect twice by a delay to stop maker
// The outer loop in [maker_protocol::run()] iterates
// immediately upon connecting a client,
// The first iteration doesn't register kill signal
// Signal registers in the 2nd iteration when a new client connects
{
let mut stream = tokio::net::TcpStream::connect(addr).await.unwrap();
let (_, mut writer) = stream.split();

writer.write_all(b"kill").await.unwrap();
}
thread::sleep(time::Duration::from_secs(5));
{
tokio::net::TcpStream::connect(addr).await.unwrap();
}
}

// This test requires a bitcoin regtest node running in local machine with a
Expand Down
29 changes: 16 additions & 13 deletions src/maker_protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ use tokio::io::BufReader;
use tokio::net::TcpListener;
use tokio::prelude::*;

use tokio::select;
use tokio::sync::mpsc;

use serde_json::Value;

use bitcoin::hashes::{hash160::Hash as Hash160, Hash};
Expand Down Expand Up @@ -64,20 +67,20 @@ async fn run(rpc: Arc<Client>, wallet: Arc<RwLock<Wallet>>, port: u16) -> Result
let listener = TcpListener::bind((Ipv4Addr::LOCALHOST, port)).await?;
println!("listening on port {}", port);

#[cfg(test)]
let shutdown = Arc::new(RwLock::new(false));

let (server_end_tx, mut server_end_rx) = mpsc::channel::<()>(100);
loop {
#[cfg(test)]
let c_shutdown = Arc::clone(&shutdown);
#[cfg(test)]
if *c_shutdown.read().unwrap() {
return Ok(());
let server_end_tx = server_end_tx.clone();
let new_client = select! {
_ = server_end_rx.recv() => None,
a = listener.accept() => Some(a?)
};
if new_client.is_none() {
println!("got signal to end server");
break;
}

let (mut socket, address) = listener.accept().await?;
println!("accepted connection from {:?}", address);

let (mut socket, addr) = new_client.unwrap();
println!("accepted connection from {:?}", addr);
let client_rpc = Arc::clone(&rpc);
let client_wallet = Arc::clone(&wallet);

Expand Down Expand Up @@ -116,9 +119,8 @@ async fn run(rpc: Arc<Client>, wallet: Arc<RwLock<Wallet>>, port: u16) -> Result
};
#[cfg(test)]
if line == "kill".to_string() {
server_end_tx.send(()).await.unwrap();
println!("Kill signal received, stopping maker....");
let mut w = c_shutdown.write().unwrap();
*w = true;
break;
}

Expand All @@ -145,6 +147,7 @@ async fn run(rpc: Arc<Client>, wallet: Arc<RwLock<Wallet>>, port: u16) -> Result
}
});
}
Ok(())
}

fn handle_message(
Expand Down

0 comments on commit 3eb9ae9

Please sign in to comment.