Skip to content
This repository has been archived by the owner on Feb 3, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1752 from holochain/fix-shutdown-deadlock
Browse files Browse the repository at this point in the history
Fix shutdown deadlock
  • Loading branch information
lucksus committed Oct 16, 2019
2 parents ea2c88c + 84914ac commit d832466
Show file tree
Hide file tree
Showing 27 changed files with 303 additions and 273 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG-UNRELEASED.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

### Fixed

- Fixed the frequent deadlocks that would occur on conductor shutdown [#1752](https://github.com/holochain/holochain-rust/pull/1752)

### Security

116 changes: 44 additions & 72 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions app_spec/test/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,5 @@ module.exports = {
}
})
return f(s_)
})

}),
}
2 changes: 1 addition & 1 deletion app_spec/test/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"devDependencies": {},
"dependencies": {
"@holochain/try-o-rama": "^0.1.1",
"@holochain/try-o-rama": "^0.1.2-beta.4",
"faucet": "0.0.1",
"json3": "^3.3.3",
"sleep": "^5.2.3",
Expand Down
4 changes: 2 additions & 2 deletions app_spec/test/run-ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ rm -fr $STORAGE
mkdir $STORAGE
if [ -z $1];
# We are directly pointing to the faucet executable because we can't use symlinks in vagrant on windows
then DIORAMA_STORAGE=$STORAGE node index.js | tee test.out~ | node_modules/faucet/bin/cmd.js || ( cat test.out~; false );
else DIORAMA_STORAGE=$STORAGE node $1;
then TRYORAMA_STORAGE=$STORAGE TRYORAMA_STRICT_CONDUCTOR_TIMEOUT=1 node index.js | tee test.out~ | node_modules/faucet/bin/cmd.js || ( cat test.out~; false );
else TRYORAMA_STORAGE=$STORAGE TRYORAMA_STRICT_CONDUCTOR_TIMEOUT=1 node $1;
fi;
1 change: 0 additions & 1 deletion conductor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ tiny_http = "=0.6.2"
ws = "=0.8.0"
[target.'cfg(unix)'.dependencies]
signal-hook = "=0.1.10"

6 changes: 4 additions & 2 deletions conductor/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,10 @@ fn main() {

// So we're here because we received a shutdown signal.
// Let's shut down.
let mut conductor_guard = CONDUCTOR.lock().unwrap();
let conductor = std::mem::replace(&mut *conductor_guard, None);
let conductor = {
let mut conductor_guard = CONDUCTOR.lock().unwrap();
std::mem::replace(&mut *conductor_guard, None)
};
let refs = Arc::strong_count(&CONDUCTOR);
if refs == 1 {
println!("Gracefully shutting down conductor...");
Expand Down
16 changes: 5 additions & 11 deletions conductor_api/src/conductor/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,7 @@ impl Drop for Conductor {
// like during unit testing because they all use the same registered logger
// self.logger.shutdown();

if let Some(network) = self.n3h_keepalive_network.take() {
if let Err(err) = network.stop() {
println!("ERROR stopping network thread: {:?}", err);
} else {
println!("Network thread successfully stopped");
}
self.n3h_keepalive_network = None;
};
if let Some(mut network) = self.n3h_keepalive_network.take() { network.stop() }
}
}

Expand Down Expand Up @@ -582,10 +575,11 @@ impl Conductor {
.unwrap()
.context()
.unwrap()
.network()
.lock()
.as_ref()
.network_state()
.unwrap()
.network
.as_ref()
.expect("Network not initialized")
.p2p_endpoint()
}).collect();
match p2p_config.to_owned().backend_config {
Expand Down
5 changes: 2 additions & 3 deletions conductor_api/src/static_server_impls/nickel_static_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ use config::{InterfaceConfiguration, UiBundleConfiguration, UiInterfaceConfigura
use error::HolochainResult;
use holochain_core_types::error::HolochainError;
use static_file_server::{dna_connections_response, ConductorStaticFileServer, DNA_CONFIG_ROUTE};

use crossbeam_channel::{self, Sender};
use std::{
net::SocketAddr,
sync::mpsc::{self, Sender},
thread,
};

Expand Down Expand Up @@ -39,7 +38,7 @@ impl ConductorStaticFileServer for NickelStaticServer {
}

fn start(&mut self) -> HolochainResult<()> {
let (tx, rx) = mpsc::channel();
let (tx, rx) = crossbeam_channel::unbounded();

self.shutdown_signal = Some(tx);
self.running = true;
Expand Down
7 changes: 0 additions & 7 deletions core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,13 +201,6 @@ impl Context {
self.state.as_ref().map(|s| s.try_read()).unwrap_or(None)
}

pub fn network(&self) -> P2pNetworkWrapper {
P2pNetworkWrapper(match self.network_state() {
Some(s) => s.network.clone(),
None => Arc::new(Mutex::new(None)),
})
}

pub fn network_state(&self) -> Option<Arc<NetworkState>> {
self.state().map(move |state| state.network())
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ impl Instance {
while kill_receiver.try_recv().is_err() {
if let Ok(action_wrapper) = rx_action.recv_timeout(Duration::from_secs(1)) {
// Ping can happen often, and should be as lightweight as possible
if *action_wrapper.action() != Action::Ping {
let should_process = *action_wrapper.action() != Action::Ping;
if should_process {
state_observers = sync_self.process_action(
&action_wrapper,
state_observers,
Expand Down Expand Up @@ -336,6 +337,8 @@ impl Instance {

impl Drop for Instance {
fn drop(&mut self) {
// TODO: this is already performed in Holochain::stop explicitly,
// can we get rid of one or the other?
let _ = self.shutdown_network();
self.stop_action_loop();
self.state.write().unwrap().drop_inner_state();
Expand Down
9 changes: 5 additions & 4 deletions core/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
//! gets emitted globaly from the conductor.
use chrono::Local;
use holochain_core_types::sync::{HcMutex as Mutex};
use std::sync::{mpsc, Arc};
use std::sync::{Arc};
use crossbeam_channel;

/// trait that defines the logging functionality that holochain_core requires
pub trait Logger: Send {
Expand Down Expand Up @@ -35,8 +36,8 @@ impl Logger for TestLogger {
}
}

pub type Receiver = mpsc::Receiver<(String, String)>;
pub type Sender = mpsc::Sender<(String, String)>;
pub type Receiver = crossbeam_channel::Receiver<(String, String)>;
pub type Sender = crossbeam_channel::Sender<(String, String)>;

#[derive(Clone)]
pub struct ChannelLogger {
Expand All @@ -55,7 +56,7 @@ impl ChannelLogger {
ChannelLogger { id, sender }
}
pub fn setup() -> (Sender, Receiver) {
mpsc::channel()
crossbeam_channel::unbounded()
}
}
pub fn default_handler(msg: String) {
Expand Down
1 change: 1 addition & 0 deletions core/src/network/actions/get_validation_package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl Future for GetValidationPackageFuture {
{
return Poll::Ready(Err(err));
}

if let Some(state) = self.context.try_state() {
let state = state.network();
if let Err(error) = state.initialized() {
Expand Down
2 changes: 1 addition & 1 deletion core/src/network/actions/initialize_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl Future for InitNetworkFuture {
//
cx.waker().clone().wake();
if let Some(state) = self.context.try_state() {
if state.network().network.lock().unwrap().is_some()
if state.network().network.is_some()
&& state.network().dna_address.is_some()
&& state.network().agent_id.is_some()
{
Expand Down
36 changes: 20 additions & 16 deletions core/src/network/actions/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ use crate::{
use crossbeam_channel::Sender;
use futures::{future::Future, task::Poll};

use holochain_core_types::{error::{HcResult, HolochainError}, sync::{HcRwLock as RwLock},};
use holochain_core_types::{
error::{HcResult, HolochainError},
sync::HcRwLock as RwLock,
};

use crate::state::StateWrapper;
use std::{
pin::Pin,
sync::{Arc},
};
use std::{pin::Pin, sync::Arc};

/// Shutdown the network
/// This tells the network to untrack this instance and then stops the network thread
Expand Down Expand Up @@ -39,16 +39,20 @@ impl Future for ShutdownFuture {
type Output = HcResult<()>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context) -> Poll<Self::Output> {
let state = self.state.read().unwrap().network();
if state.network.lock().unwrap().is_some() {
//
// TODO: connect the waker to state updates for performance reasons
// See: https://github.com/holochain/holochain-rust/issues/314
//
cx.waker().clone().wake();
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
self.state
.try_read()
.map(|state| {
if state.network().network.is_some() {
//
// TODO: connect the waker to state updates for performance reasons
// See: https://github.com/holochain/holochain-rust/issues/314
//
cx.waker().clone().wake();
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
})
.unwrap_or(Poll::Pending)
}
}
17 changes: 10 additions & 7 deletions core/src/network/reducers/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ use crate::{
};
use holochain_net::{connection::net_connection::NetSend, p2p_network::P2pNetwork};
use lib3h_protocol::{data_types::SpaceData, protocol_client::Lib3hClientProtocol, Address};
use log::error;

pub fn reduce_init(state: &mut NetworkState, root_state: &State, action_wrapper: &ActionWrapper) {
let action = action_wrapper.action();
let network_settings = unwrap_to!(action => Action::InitNetwork);
let network = P2pNetwork::new(
let mut network = P2pNetwork::new(
network_settings.handler.clone(),
network_settings.p2p_config.clone(),
Some(Address::from(network_settings.agent_id.clone())),
Expand All @@ -35,15 +36,17 @@ pub fn reduce_init(state: &mut NetworkState, root_state: &State, action_wrapper:
agent_id: network_settings.agent_id.clone().into(),
});

let mut network_lock = state.network.lock().unwrap();
*network_lock = Some(network);

state.dna_address = Some(network_settings.dna_address.clone());
state.agent_id = Some(network_settings.agent_id.clone());

if let Err(err) = network_lock.as_mut().unwrap().send(json) {
println!("Could not send JsonProtocol::TrackDna. Error: {:?}", err);
println!("Failed to initialize network!");
let _ = network_lock.take().unwrap().stop();
if let Err(err) = network.send(json) {
error!("Could not send JsonProtocol::TrackDna. Error: {:?}", err);
error!("Failed to initialize network!");
network.stop();
state.network = None;
} else {
state.network = Some(network);
}
}

Expand Down
5 changes: 1 addition & 4 deletions core/src/network/reducers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,9 @@ pub fn send(
) -> Result<(), HolochainError> {
network_state
.network
.lock()
.unwrap()
.as_mut()
.map(|network| {
network
.send(msg)
network.send(msg)
.map_err(|error| HolochainError::IoError(error.to_string()))
})
.ok_or_else(|| HolochainError::ErrorGeneric("Network not initialized".to_string()))?
Expand Down
17 changes: 7 additions & 10 deletions core/src/network/reducers/shutdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
use holochain_net::connection::net_connection::NetSend;

use lib3h_protocol::{data_types::SpaceData, protocol_client::Lib3hClientProtocol};

use log::error;
use std::{thread::sleep, time::Duration};

pub fn reduce_shutdown(
Expand All @@ -34,19 +34,16 @@ pub fn reduce_shutdown(
.into(),
});

let mut network_lock = state.network.lock().unwrap();

{
let network = network_lock
.as_mut()
.expect("Tried to shutdown uninitialized network");
if let Some(mut network) = state.network.take() {
let _ = network.send(json);

sleep(Duration::from_secs(2));
}

if let Err(err) = network_lock.take().unwrap().stop() {
println!("ERROR stopping network thread: {:?}", err);
network.stop();
} else {
println!("Network thread successfully stopped");
error!("Tried to shutdown uninitialized network");
}


}
9 changes: 4 additions & 5 deletions core/src/network/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ use crate::{
network::{actions::ActionResponse, direct_message::DirectMessage,query::NetworkQueryResult},
};
use boolinator::*;
use holochain_core_types::{error::HolochainError, validation::ValidationPackage, sync::{HcMutex as Mutex}};
use holochain_core_types::{error::HolochainError, validation::ValidationPackage};
use holochain_net::p2p_network::P2pNetwork;
use holochain_persistence_api::cas::content::Address;
use snowflake;
use std::{
collections::HashMap,
sync::{Arc},
};

type Actions = HashMap<ActionWrapper, ActionResponse>;
Expand All @@ -30,7 +29,7 @@ pub struct NetworkState {
// @TODO this will blow up memory, implement as some kind of dropping/FIFO with a limit?
// @see https://github.com/holochain/holochain-rust/issues/166
pub actions: Actions,
pub network: Arc<Mutex<Option<P2pNetwork>>>,
pub network: Option<P2pNetwork>,
pub dna_address: Option<Address>,
pub agent_id: Option<String>,

Expand Down Expand Up @@ -60,7 +59,7 @@ impl NetworkState {
pub fn new() -> Self {
NetworkState {
actions: HashMap::new(),
network: Arc::new(Mutex::new(None)),
network: None,
dna_address: None,
agent_id: None,
get_query_results: HashMap::new(),
Expand All @@ -77,7 +76,7 @@ impl NetworkState {
}

pub fn initialized(&self) -> Result<(), HolochainError> {
(self.network.lock().unwrap().is_some()
(self.network.is_some()
&& self.dna_address.is_some()
&& self.agent_id.is_some())
.ok_or(HolochainError::ErrorGeneric(
Expand Down
3 changes: 2 additions & 1 deletion core/src/nucleus/ribosome/api/call.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ pub mod tests {
},
workflows::author_entry::author_entry,
};
use crossbeam_channel::RecvTimeoutError;
use holochain_core_types::{
dna::{
capabilities::CapabilityRequest,
Expand All @@ -198,7 +199,7 @@ pub mod tests {
use serde_json;
use std::{
collections::BTreeMap,
sync::{mpsc::RecvTimeoutError, Arc},
sync::{Arc},
};
use test_utils::create_test_dna_with_defs;

Expand Down

0 comments on commit d832466

Please sign in to comment.