Skip to content
This repository has been archived by the owner on Feb 3, 2023. It is now read-only.

Node-to-node send/receive #746

Merged
merged 35 commits into from
Dec 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
b53bf2b
"receive" added to define_zome! macro
lucksus Dec 11, 2018
15f07e7
Adjust hdk::send signature
lucksus Dec 11, 2018
258a699
Integration test for send/receive in hdk
lucksus Dec 11, 2018
8caf8b8
hdk::send() with SendArgs
lucksus Dec 11, 2018
af4645e
CustomDirectMessage
lucksus Dec 11, 2018
9631a0d
Network::actions::custom_send
lucksus Dec 11, 2018
4fdbdaa
hc_send -> invoke_send() -> network::actions::custom_send
lucksus Dec 11, 2018
9075ce8
ribosome::callback::receive and CallbackResult::Receive(String)
lucksus Dec 11, 2018
e07dd1a
handle_custom_direct_message workflow and usage in network handler
lucksus Dec 11, 2018
3a00a57
Handling of custom send result
lucksus Dec 11, 2018
75eaf8b
Fix invoke_send to return a result
lucksus Dec 11, 2018
a23a846
Fix serialization/wrapping of payload
lucksus Dec 11, 2018
70a464f
println--
lucksus Dec 11, 2018
af515cc
println--
lucksus Dec 11, 2018
f9bc8d2
rustfmt
lucksus Dec 11, 2018
20c46f1
Adjust agent_id fixture
lucksus Dec 12, 2018
8c29cb6
Fix some tests
lucksus Dec 12, 2018
77444f3
Add dummy hc_send in integration tests to make windows build work
lucksus Dec 12, 2018
42c31eb
Merge branch 'develop' into send-receive
Dec 12, 2018
5ab7729
Add dummy hc_send to example rustdocs so they can compile in windows CI
lucksus Dec 12, 2018
abe76af
Merge branch 'send-receive' of https://github.com/holochain/holochain…
lucksus Dec 12, 2018
e679c81
typo in comment
zippy Dec 12, 2018
1758785
test instance name = test name
lucksus Dec 12, 2018
181dc77
Merge branch 'send-receive' of https://github.com/holochain/holochain…
lucksus Dec 12, 2018
6eb2e62
println--
lucksus Dec 12, 2018
677ddd1
custom_send code comments
lucksus Dec 12, 2018
10696eb
invoke_send code comments
lucksus Dec 12, 2018
5b50169
SendDirectMessageTimeout and test
lucksus Dec 12, 2018
334dfcf
typo in comment
Connoropolous Dec 12, 2018
3836dcb
rustfmt
lucksus Dec 12, 2018
950ee14
Merge branch 'develop' into send-receive
Dec 13, 2018
434d223
rustfmt
lucksus Dec 13, 2018
9ea2c12
Merge branch 'develop' into send-receive
Dec 13, 2018
941b9ff
Merge branch 'develop' into send-receive
Dec 13, 2018
5a05493
Merge branch 'develop' into send-receive
Dec 13, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion core/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,10 @@ pub enum Action {
/// to the address given in [DirectMessageData](struct.DirectMessageData.html)
SendDirectMessage(DirectMessageData),

/// Makes the direct message connection with the given ID timeout by adding an
/// Err(HolochainError::Timeout) to NetworkState::custom_direct_message_replys.
SendDirectMessageTimeout(String),

/// Makes the network module forget about the direct message
/// connection with the given ID.
/// Triggered when we got an answer to our initial DM.
Expand All @@ -145,9 +149,14 @@ pub enum Action {

/// Updates the state to hold the response that we got for
/// our previous request for a validation package.
/// Triggered from the network handler when we got the response.
/// Triggered from the network handler when we get the response.
HandleGetValidationPackage((Address, Option<ValidationPackage>)),

/// Updates the state to hold the response that we got for
/// our previous custom direct message.
/// /// Triggered from the network handler when we get the response.
HandleCustomSendResponse((String, Result<String, String>)),

// ----------------
// Nucleus actions:
// ----------------
Expand Down
78 changes: 78 additions & 0 deletions core/src/network/actions/custom_send.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
extern crate futures;
use crate::{
action::{Action, ActionWrapper, DirectMessageData},
context::Context,
instance::dispatch_action,
network::direct_message::{CustomDirectMessage, DirectMessage},
};
use futures::{
future::Future,
task::{LocalWaker, Poll},
};
use holochain_core_types::{cas::content::Address, error::HolochainError};
use snowflake::ProcessUniqueId;
use std::{
pin::{Pin, Unpin},
sync::Arc,
thread::sleep,
time::Duration,
};

/// SendDirectMessage Action Creator for custom (=app) messages
/// This triggers the network module to open a synchronous node-to-node connection
/// by sending the given CustomDirectMessage and preparing to receive a response.
pub async fn custom_send(
to_agent: Address,
custom_direct_message: CustomDirectMessage,
context: &Arc<Context>,
) -> Result<String, HolochainError> {
let id = ProcessUniqueId::new().to_string();
let direct_message = DirectMessage::Custom(custom_direct_message);
let direct_message_data = DirectMessageData {
address: to_agent,
message: direct_message,
msg_id: id.clone(),
is_response: false,
};
let action_wrapper = ActionWrapper::new(Action::SendDirectMessage(direct_message_data));
dispatch_action(&context.action_channel, action_wrapper);

async {
sleep(Duration::from_secs(60));
let action_wrapper = ActionWrapper::new(Action::SendDirectMessageTimeout(id.clone()));
dispatch_action(&context.action_channel, action_wrapper.clone());
};

await!(SendResponseFuture {
context: context.clone(),
id,
})
}

/// SendResponseFuture waits for a result to show up in NetworkState::custom_direct_message_replys
pub struct SendResponseFuture {
context: Arc<Context>,
id: String,
}

impl Unpin for SendResponseFuture {}

impl Future for SendResponseFuture {
type Output = Result<String, HolochainError>;

fn poll(self: Pin<&mut Self>, lw: &LocalWaker) -> Poll<Self::Output> {
let state = self.context.state().unwrap().network();
if let Err(error) = state.initialized() {
return Poll::Ready(Err(HolochainError::ErrorGeneric(error.to_string())));
}
//
// TODO: connect the waker to state updates for performance reasons
// See: https://github.com/holochain/holochain-rust/issues/314
//
lw.wake();
match state.custom_direct_message_replys.get(&self.id) {
Some(result) => Poll::Ready(result.clone()),
_ => Poll::Pending,
}
}
}
1 change: 1 addition & 0 deletions core/src/network/actions/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod custom_send;
pub mod get_entry;
pub mod get_validation_package;
pub mod initialize_network;
Expand Down
18 changes: 16 additions & 2 deletions core/src/network/direct_message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,18 @@
use holochain_core_types::{cas::content::Address, validation::ValidationPackage};
use holochain_core_types::{
cas::content::Address, error::HolochainError, json::JsonString, validation::ValidationPackage,
};

/// This is direct message that got created by the zome code through hdk::send().
#[derive(Serialize, Deserialize, Clone, PartialEq, Debug, DefaultJson)]
pub struct CustomDirectMessage {
/// We have to track which zome sent the message so we can call the
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an interesting point... so an app can have one receive callback per zome?

But it could handle different event types, if they were passed as data, and using a match in receive?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Connoropolous pretty similar to how websockets works i guess, you can send and receive strings, but dispatching logic and simulating request/response behaviour is BYO

/// receive callback of the same zome on the receiving side.
pub zome: String,

/// The payload that the zome sends.
/// This is a result to enable the receive handler to return an error
pub payload: Result<String, String>,
}

/// These are the different kinds of (low-level, i.e. non-app)
/// node-to-node messages that can be send between Holochain nodes.
Expand All @@ -7,7 +21,7 @@ pub enum DirectMessage {
/// A custom direct message is something that gets triggered
/// from zome code, i.e. from the app.
/// Receiving such a messages triggers a WASM callback
Custom(String),
Custom(CustomDirectMessage),

/// This message is used to ask another node (which needs to
/// be the author) for the validation package of a given entry.
Expand Down
37 changes: 33 additions & 4 deletions core/src/network/handler/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use crate::{
context::Context,
instance::dispatch_action,
network::direct_message::DirectMessage,
workflows::respond_validation_package_request::respond_validation_package_request,
workflows::{
handle_custom_direct_message::handle_custom_direct_message,
respond_validation_package_request::respond_validation_package_request,
},
};
use futures::executor::block_on;
use holochain_core_types::cas::content::Address;
Expand All @@ -18,7 +21,18 @@ pub fn handle_send(message_data: MessageData, context: Arc<Context>) {
serde_json::from_str(&serde_json::to_string(&message_data.data).unwrap()).unwrap();

match message {
DirectMessage::Custom(_) => context.log("DirectMessage::Custom not implemented"),
DirectMessage::Custom(custom_direct_message) => {
thread::spawn(move || {
if let Err(error) = block_on(handle_custom_direct_message(
Address::from(message_data.from_agent_id),
message_data.msg_id,
custom_direct_message,
context.clone(),
)) {
context.log(format!("Error handling custom direct message: {:?}", error));
}
});
}
DirectMessage::RequestValidationPackage(address) => {
// Async functions only get executed when they are polled.
// I don't want to wait for this workflow to finish here as it would block the
Expand All @@ -39,7 +53,7 @@ pub fn handle_send(message_data: MessageData, context: Arc<Context>) {
};
}

/// We got a ProtocolWrapper::SendResult, this means somebody has responded to our message
/// We got a ProtocolWrapper::HandleSendResult, this means somebody has responded to our message
/// -> we called and this is the answer
pub fn handle_send_result(message_data: MessageData, context: Arc<Context>) {
let response: DirectMessage =
Expand All @@ -55,7 +69,22 @@ pub fn handle_send_result(message_data: MessageData, context: Arc<Context>) {
.cloned();

match response {
DirectMessage::Custom(_) => context.log("DirectMessage::Custom not implemented"),
DirectMessage::Custom(custom_direct_message) => {
if initial_message.is_none() {
context.log("Received a custom direct message response but could not find message ID in history. Not able to process.");
return;
}

let action_wrapper = ActionWrapper::new(Action::HandleCustomSendResponse((
message_data.msg_id.clone(),
custom_direct_message.payload,
)));
dispatch_action(&context.action_channel, action_wrapper.clone());

let action_wrapper =
ActionWrapper::new(Action::ResolveDirectConnection(message_data.msg_id));
dispatch_action(&context.action_channel, action_wrapper.clone());
}
DirectMessage::RequestValidationPackage(_) => context.log(
"Got DirectMessage::RequestValidationPackage as a response. This should not happen.",
),
Expand Down
19 changes: 19 additions & 0 deletions core/src/network/reducers/handle_custom_send_response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use crate::{action::ActionWrapper, context::Context, network::state::NetworkState};
use holochain_core_types::error::HolochainError;
use std::sync::Arc;

pub fn reduce_handle_custom_send_response(
_context: Arc<Context>,
network_state: &mut NetworkState,
action_wrapper: &ActionWrapper,
) {
let action = action_wrapper.action();
let (msg_id, response) = unwrap_to!(action => crate::action::Action::HandleCustomSendResponse);

network_state.custom_direct_message_replys.insert(
msg_id.clone(),
response
.clone()
.map_err(|error| HolochainError::ErrorGeneric(error)),
);
}
6 changes: 5 additions & 1 deletion core/src/network/reducers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod get_entry;
pub mod get_validation_package;
pub mod handle_custom_send_response;
pub mod handle_get_result;
pub mod handle_get_validation_package;
pub mod init;
Expand All @@ -16,13 +17,14 @@ use crate::{
reducers::{
get_entry::{reduce_get_entry, reduce_get_entry_timeout},
get_validation_package::reduce_get_validation_package,
handle_custom_send_response::reduce_handle_custom_send_response,
handle_get_result::reduce_handle_get_result,
handle_get_validation_package::reduce_handle_get_validation_package,
init::reduce_init,
publish::reduce_publish,
resolve_direct_connection::reduce_resolve_direct_connection,
respond_get::reduce_respond_get,
send_direct_message::reduce_send_direct_message,
send_direct_message::{reduce_send_direct_message, reduce_send_direct_message_timeout},
},
state::NetworkState,
},
Expand All @@ -41,13 +43,15 @@ fn resolve_reducer(action_wrapper: &ActionWrapper) -> Option<NetworkReduceFn> {
Action::GetEntry(_) => Some(reduce_get_entry),
Action::GetEntryTimeout(_) => Some(reduce_get_entry_timeout),
Action::GetValidationPackage(_) => Some(reduce_get_validation_package),
Action::HandleCustomSendResponse(_) => Some(reduce_handle_custom_send_response),
Action::HandleGetResult(_) => Some(reduce_handle_get_result),
Action::HandleGetValidationPackage(_) => Some(reduce_handle_get_validation_package),
Action::InitNetwork(_) => Some(reduce_init),
Action::Publish(_) => Some(reduce_publish),
Action::ResolveDirectConnection(_) => Some(reduce_resolve_direct_connection),
Action::RespondGet(_) => Some(reduce_respond_get),
Action::SendDirectMessage(_) => Some(reduce_send_direct_message),
Action::SendDirectMessageTimeout(_) => Some(reduce_send_direct_message_timeout),
_ => None,
}
}
Expand Down
94 changes: 94 additions & 0 deletions core/src/network/reducers/send_direct_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ fn inner(
let protocol_object = if direct_message_data.is_response {
ProtocolWrapper::HandleSendResult(data)
} else {
network_state
.direct_message_connections
.insert(data.msg_id.clone(), direct_message_data.message.clone());
ProtocolWrapper::SendMessage(data)
};

Expand All @@ -42,3 +45,94 @@ pub fn reduce_send_direct_message(
context.log(format!("Error sending direct message: {:?}", error));
}
}

pub fn reduce_send_direct_message_timeout(
_context: Arc<Context>,
network_state: &mut NetworkState,
action_wrapper: &ActionWrapper,
) {
let action = action_wrapper.action();
let id = unwrap_to!(action => crate::action::Action::SendDirectMessageTimeout);

if network_state.custom_direct_message_replys.get(id).is_some() {
return;
}

network_state
.custom_direct_message_replys
.insert(id.clone(), Err(HolochainError::Timeout));
}

#[cfg(test)]
mod tests {

use crate::{
action::{Action, ActionWrapper, DirectMessageData, NetworkSettings},
context::mock_network_config,
instance::tests::test_context,
network::direct_message::{CustomDirectMessage, DirectMessage},
state::test_store,
};
use holochain_core_types::{cas::content::Address, error::HolochainError};
use std::sync::{Arc, RwLock};

#[test]
pub fn reduce_send_direct_message_timeout_test() {
let mut context = test_context("alice");
let store = test_store(context.clone());
let store = Arc::new(RwLock::new(store));

Arc::get_mut(&mut context).unwrap().set_state(store.clone());

let action_wrapper = ActionWrapper::new(Action::InitNetwork(NetworkSettings {
config: mock_network_config(),
dna_hash: String::from("abcd"),
agent_id: String::from("abcd"),
}));

{
let mut new_store = store.write().unwrap();
*new_store = new_store.reduce(context.clone(), action_wrapper);
}

let custom_direct_message = DirectMessage::Custom(CustomDirectMessage {
zome: String::from("test"),
payload: Ok(String::from("test")),
});
let msg_id = String::from("any");
let direct_message_data = DirectMessageData {
address: Address::from("bogus"),
message: custom_direct_message,
msg_id: msg_id.clone(),
is_response: false,
};
let action_wrapper = ActionWrapper::new(Action::SendDirectMessage(direct_message_data));

{
let mut new_store = store.write().unwrap();
*new_store = new_store.reduce(context.clone(), action_wrapper);
}
let maybe_reply = store
.read()
.unwrap()
.network()
.custom_direct_message_replys
.get(&msg_id)
.cloned();
assert_eq!(maybe_reply, None);

let action_wrapper = ActionWrapper::new(Action::SendDirectMessageTimeout(msg_id.clone()));
{
let mut new_store = store.write().unwrap();
*new_store = new_store.reduce(context.clone(), action_wrapper);
}
let maybe_reply = store
.read()
.unwrap()
.network()
.custom_direct_message_replys
.get(&msg_id.clone())
.cloned();
assert_eq!(maybe_reply, Some(Err(HolochainError::Timeout)));
}
}
3 changes: 3 additions & 0 deletions core/src/network/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct NetworkState {
/// Entries get removed when we receive an answer through Action::ResolveDirectConnection.
pub direct_message_connections: HashMap<String, DirectMessage>,

pub custom_direct_message_replys: HashMap<String, Result<String, HolochainError>>,

id: snowflake::ProcessUniqueId,
}

Expand All @@ -73,6 +75,7 @@ impl NetworkState {
get_entry_with_meta_results: HashMap::new(),
get_validation_package_results: HashMap::new(),
direct_message_connections: HashMap::new(),
custom_direct_message_replys: HashMap::new(),

id: snowflake::ProcessUniqueId::new(),
}
Expand Down
Loading