Skip to content
This repository has been archived by the owner on Feb 3, 2023. It is now read-only.

Commit

Permalink
Merge pull request #2159 from holochain/key-fixes-for-validation
Browse files Browse the repository at this point in the history
Key fixes for validation
  • Loading branch information
zippy committed Mar 20, 2020
2 parents 6d56e52 + 79b3017 commit a6bb773
Show file tree
Hide file tree
Showing 16 changed files with 90 additions and 47 deletions.
2 changes: 2 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ workflows:
only:
- develop
- final-exam
- key-fixes-for-validation
- docker-build-trycp-server:
requires:
- docker-build-minimal
Expand All @@ -284,6 +285,7 @@ workflows:
only:
- develop
- final-exam
- key-fixes-for-validation
- docker-build-circle-build:
requires:
- docker-build-latest
Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion crates/cli/src/cli/sim2h_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub fn sim2h_client(url_string: String, message_string: String) -> Result<(), St
println!("Got response => {:?}", msg);
break;
}
WireMessage::DebugResponse(debug_response_map) => {
WireMessage::DebugResponse((debug_response_map, extra_debug_data)) => {
println!("Got DebugResponse for {} spaces.", debug_response_map.len());
for (space, json) in debug_response_map {
let filename = format!("{}.json", space);
Expand All @@ -85,6 +85,7 @@ pub fn sim2h_client(url_string: String, message_string: String) -> Result<(), St
.write_all(json.into_bytes().as_slice())
.expect("Could not write to file!");
}
eprintln!("Extra Debug Data:\n{}", extra_debug_data);
break;
}
_ => println!("{:?}", msg),
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ env_logger = "=0.6.1"
url = { version = "=2.1.0", features = ["serde"] }
rand = "=0.7.3"
threadpool = "=1.7.1"
tracing = "=0.1.13"
im = { version = "=14.0.0", features = ["serde"] }
itertools = "0.8.2"
newrelic = { version = "=0.2.2", optional = true }
Expand Down
17 changes: 13 additions & 4 deletions crates/core/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,19 @@ pub enum Action {

/// Makes the network module DM the source of the given entry
/// and prepare for receiveing an answer
GetValidationPackage(ChainHeader),
GetValidationPackage((ValidationKey, ChainHeader)),

/// Makes the get validation request with the given ID timeout by adding an
/// Err(HolochainError::Timeout) to NetworkState::get_validation_package_results.
GetValidationPackageTimeout(Address),
GetValidationPackageTimeout(ValidationKey),

/// Updates the state to hold the response that we got for
/// our previous request for a validation package.
/// Triggered from the network handler when we get the response.
HandleGetValidationPackage((Address, Option<ValidationPackage>)),
HandleGetValidationPackage((ValidationKey, Option<ValidationPackage>)),

/// Clean up the validation package result so the state doesn't grow indefinitely.
ClearValidationPackageResult(Address),
ClearValidationPackageResult(ValidationKey),

/// Updates the state to hold the response that we got for
/// our previous custom direct message.
Expand Down Expand Up @@ -275,6 +275,15 @@ pub struct GetEntryKey {
pub id: String,
}

#[derive(Clone, PartialEq, Eq, Hash, Debug, Serialize, Deserialize)]
pub struct ValidationKey {
/// The address of the entry to get the package for
pub address: Address,

/// A unique ID that is used to pair the eventual result to this request
pub id: String,
}

/// Everything the network module needs to know in order to send a
/// direct message.
#[derive(Clone, PartialEq, Debug, Serialize)]
Expand Down
20 changes: 10 additions & 10 deletions crates/core/src/network/actions/get_validation_package.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use crate::{
action::{Action, ActionWrapper},
action::{Action, ActionWrapper, ValidationKey},
context::Context,
instance::dispatch_action,
};
use futures::{future::Future, task::Poll};

use holochain_persistence_api::cas::content::Address;

use holochain_core_types::{
chain_header::ChainHeader, error::HcResult, validation::ValidationPackage,
};
Expand All @@ -25,11 +23,15 @@ pub async fn get_validation_package(
context: &Arc<Context>,
) -> HcResult<Option<ValidationPackage>> {
let entry_address = header.entry_address().clone();
let action_wrapper = ActionWrapper::new(Action::GetValidationPackage(header));
let key = ValidationKey {
address: entry_address,
id: snowflake::ProcessUniqueId::new().to_string(),
};
let action_wrapper = ActionWrapper::new(Action::GetValidationPackage((key.clone(), header)));
dispatch_action(context.action_channel(), action_wrapper.clone());
GetValidationPackageFuture {
context: context.clone(),
address: entry_address,
key,
}
.await
}
Expand All @@ -39,7 +41,7 @@ pub async fn get_validation_package(
/// is not the source.
pub struct GetValidationPackageFuture {
context: Arc<Context>,
address: Address,
key: ValidationKey,
}

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
Expand All @@ -66,13 +68,11 @@ impl Future for GetValidationPackageFuture {
return Poll::Ready(Err(error));
}

match state.get_validation_package_results.get(&self.address) {
match state.get_validation_package_results.get(&self.key) {
Some(Some(result)) => {
dispatch_action(
self.context.action_channel(),
ActionWrapper::new(Action::ClearValidationPackageResult(
self.address.clone(),
)),
ActionWrapper::new(Action::ClearValidationPackageResult(self.key.clone())),
);
Poll::Ready(result.clone())
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/network/direct_message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use holochain_persistence_api::cas::content::Address;
use crate::action::ValidationKey;

use holochain_json_api::{error::JsonError, json::JsonString};

Expand Down Expand Up @@ -28,7 +28,7 @@ pub enum DirectMessage {

/// This message is used to ask another node (which needs to
/// be the author) for the validation package of a given entry.
RequestValidationPackage(Address),
RequestValidationPackage(ValidationKey),

/// With this message an author is responding to a
/// RequestValidationPackage message.
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/network/handler/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ pub fn handle_send_message(message_data: DirectMessageData, context: Arc<Context
let future = closure();
context.spawn_task(future);
}
DirectMessage::RequestValidationPackage(address) => {
DirectMessage::RequestValidationPackage(key) => {
context.spawn_task({
let context = context.clone();
async move || {
respond_validation_package_request(
message_data.from_agent_id.into(),
message_data.request_id,
address,
key.address,
context,
vec![],
);
Expand Down
5 changes: 3 additions & 2 deletions crates/core/src/network/reducers/clear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ pub fn reduce_clear_validation_package_result(
action_wrapper: &ActionWrapper,
) {
let action = action_wrapper.action();
let address = unwrap_to!(action => Action::ClearValidationPackageResult);
let key = unwrap_to!(action => Action::ClearValidationPackageResult);

network_state.get_validation_package_results.remove(address);
network_state.get_validation_package_results.remove(key);
network_state.get_validation_package_timeouts.remove(key);
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
pub fn reduce_clear_custom_send_response(
Expand Down
30 changes: 16 additions & 14 deletions crates/core/src/network/reducers/get_validation_package.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
action::ActionWrapper,
action::{ActionWrapper, ValidationKey},
network::{direct_message::DirectMessage, reducers::send_message, state::NetworkState},
state::State,
};
Expand All @@ -11,15 +11,19 @@ use std::time::{Duration, SystemTime};
const GET_VALIDATION_PACKAGE_MESSAGE_TIMEOUT_MS: u64 = 10000;

#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
fn inner(network_state: &mut NetworkState, header: &ChainHeader) -> Result<(), HolochainError> {
fn inner(
network_state: &mut NetworkState,
header: &ChainHeader,
key: ValidationKey,
) -> Result<(), HolochainError> {
network_state.initialized()?;

let source_address = &header
.provenances()
.first()
.ok_or_else(|| HolochainError::ErrorGeneric("No source found in ChainHeader".to_string()))?
.source();
let direct_message = DirectMessage::RequestValidationPackage(header.entry_address().clone());
let direct_message = DirectMessage::RequestValidationPackage(key);

send_message(network_state, source_address, direct_message)
}
Expand All @@ -30,25 +34,25 @@ pub fn reduce_get_validation_package(
action_wrapper: &ActionWrapper,
) {
let action = action_wrapper.action();
let header = unwrap_to!(action => crate::action::Action::GetValidationPackage);
let entry_address = header.entry_address().clone();
let (key, header) = unwrap_to!(action => crate::action::Action::GetValidationPackage);

let result = match inner(network_state, header) {
let result = match inner(network_state, header, key.clone()) {
Ok(()) => None,
Err(err) => Some(Err(err)),
};

network_state
.get_validation_package_results
.insert(entry_address.clone(), result);
.insert(key.clone(), result);

let timeout = (
SystemTime::now(),
Duration::from_millis(GET_VALIDATION_PACKAGE_MESSAGE_TIMEOUT_MS),
);
tracing::debug!(new_val_pack = ?key);
network_state
.get_validation_package_timeouts
.insert(entry_address, timeout);
.insert(key.clone(), timeout);
}
#[holochain_tracing_macros::newrelic_autotrace(HOLOCHAIN_CORE)]
pub fn reduce_get_validation_package_timeout(
Expand All @@ -57,19 +61,17 @@ pub fn reduce_get_validation_package_timeout(
action_wrapper: &ActionWrapper,
) {
let action = action_wrapper.action();
let address = unwrap_to!(action => crate::action::Action::GetValidationPackageTimeout);
let key = unwrap_to!(action => crate::action::Action::GetValidationPackageTimeout);

network_state
.get_validation_package_timeouts
.remove(address);
network_state.get_validation_package_timeouts.remove(key);

if let Some(Some(_)) = network_state.get_validation_package_results.get(address) {
if let Some(Some(_)) = network_state.get_validation_package_results.get(key) {
// A result already came back from the network so don't overwrite it
return;
}

network_state.get_validation_package_results.insert(
address.clone(),
key.clone(),
Some(Err(HolochainError::Timeout(format!(
"timeout src: {}:{}",
file!(),
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/network/state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
action::{ActionWrapper, QueryKey},
action::{ActionWrapper, QueryKey, ValidationKey},
network::{actions::Response, direct_message::DirectMessage, query::NetworkQueryResult},
};
use boolinator::*;
Expand Down Expand Up @@ -37,8 +37,8 @@ pub struct NetworkState {

/// Here we store the results of get validation package processes.
/// None means that we are still waiting for a result from the network.
pub get_validation_package_results: HashMap<Address, GetValidationPackageResult>,
pub get_validation_package_timeouts: HashMap<Address, (SystemTime, Duration)>,
pub get_validation_package_results: HashMap<ValidationKey, GetValidationPackageResult>,
pub get_validation_package_timeouts: HashMap<ValidationKey, (SystemTime, Duration)>,

/// This stores every open (= waiting for response) node-to-node messages.
/// Entries get removed when we receive an answer through Action::ResolveDirectConnection.
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/scheduled_jobs/timeouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ pub fn check_network_processes_for_timeouts(context: Arc<Context>) {
}
}

for (address, (time, duration)) in state.network().get_validation_package_timeouts.iter() {
for (key, (time, duration)) in state.network().get_validation_package_timeouts.iter() {
if let Ok(elapsed) = time.elapsed() {
if elapsed > *duration {
dispatch_action(
context.action_channel(),
ActionWrapper::new(Action::GetValidationPackageTimeout(address.clone())),
ActionWrapper::new(Action::GetValidationPackageTimeout(key.clone())),
);
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/state_dump.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl From<Arc<Context>> for StateDump {
.get_validation_package_results
.into_iter()
.filter(|(_, result)| result.is_none())
.map(|(address, _)| address)
.map(|(key, _)| key.address)
.collect();

let direct_message_flows: Vec<(String, DirectMessage)> = network
Expand Down
30 changes: 27 additions & 3 deletions crates/sim2h/src/connection_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ enum ConMgrCommand {
Connect(Lib3hUri, TcpWss),
SendData(Lib3hUri, WsFrame),
Disconnect(Lib3hUri),
ListConnections(tokio::sync::oneshot::Sender<Vec<Lib3hUri>>),
}

type EvtSend = tokio::sync::mpsc::UnboundedSender<ConMgrEvent>;
Expand Down Expand Up @@ -88,6 +89,7 @@ fn process_control_cmds(cmd_info: &mut CmdInfo) -> Loop {
return Loop::Break;
}
ConMgrCommand::Connect(_, _) => unreachable!(),
ConMgrCommand::ListConnections(_) => unreachable!(),
}
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
Expand Down Expand Up @@ -141,6 +143,8 @@ fn process_websocket_frames(cmd_info: &mut CmdInfo) -> Loop {
return Loop::Continue;
}

#[allow(clippy::complexity)]
#[instrument(skip(uri, wss, evt_send, cmd_recv))]
/// internal websocket polling loop
async fn wss_task(uri: Lib3hUri, wss: TcpWss, evt_send: EvtSend, cmd_recv: CmdRecv) {
// TODO - this should be done with tokio tcp streams && selecting
Expand All @@ -157,13 +161,11 @@ async fn wss_task(uri: Lib3hUri, wss: TcpWss, evt_send: EvtSend, cmd_recv: CmdRe
frame: None,
};
'wss_task_loop: loop {
let span = debug_span!("wss_task");
let _g = span.enter();
cmd_info.did_work = false;
cmd_info.cmd_count = 0;
cmd_info.read_count = 0;
cmd_info.frame = None;

trace!("start");
let loop_start = std::time::Instant::now();

// first, process a batch of control commands
Expand All @@ -187,8 +189,10 @@ async fn wss_task(uri: Lib3hUri, wss: TcpWss, evt_send: EvtSend, cmd_recv: CmdRe
// if we did work we might have more work to do,
// if not, let this task get parked for a time
if cmd_info.did_work {
trace!("did work");
tokio::task::yield_now().await;
} else {
trace!("did no work");
tokio::time::delay_for(std::time::Duration::from_millis(5)).await;
}
}
Expand Down Expand Up @@ -321,6 +325,9 @@ impl ConnectionMgr {
ConMgrCommand::Connect(uri, wss) => {
self.handle_connect_data(uri, wss);
}
ConMgrCommand::ListConnections(respond) => {
let _ = respond.send(self.wss_map.keys().cloned().collect());
}
}
}
Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
Expand Down Expand Up @@ -475,4 +482,21 @@ impl ConnectionMgrHandle {
error!("failed to send on channel - shutting down? {:?}", e);
}
}

#[tracing::instrument(skip(self))]
/// disconnect and forget about a managed websocket connection
pub async fn list_connections(&self) -> Vec<Lib3hUri> {
let (s, r) = tokio::sync::oneshot::channel();
if let Err(e) = self.send_cmd.send(ConMgrCommand::ListConnections(s)) {
error!("failed to send on channel - shutting down? {:?}", e);
return vec![];
}
match r.await {
Ok(v) => v,
Err(e) => {
tracing::error!("{:?}", e);
vec![]
}
}
}
}
Loading

0 comments on commit a6bb773

Please sign in to comment.