From 35fed2ac0530f8b5db923ae9aea81fa105b521da Mon Sep 17 00:00:00 2001 From: Hector Santos Date: Mon, 13 Dec 2021 22:36:14 +0100 Subject: [PATCH] test broadcast --- .../locutus-node/src/node/event_listener.rs | 144 +++++++++++++++++- crates/locutus-node/src/node/in_memory.rs | 13 +- crates/locutus-node/src/node/mod.rs | 2 +- crates/locutus-node/src/node/test_utils.rs | 47 +++++- crates/locutus-node/src/operations/get.rs | 3 + crates/locutus-node/src/operations/put.rs | 120 ++++++++++++--- .../locutus-node/src/operations/subscribe.rs | 2 + crates/locutus-node/src/user_events.rs | 3 +- 8 files changed, 307 insertions(+), 27 deletions(-) diff --git a/crates/locutus-node/src/node/event_listener.rs b/crates/locutus-node/src/node/event_listener.rs index 5bd124a0d..af620fa82 100644 --- a/crates/locutus-node/src/node/event_listener.rs +++ b/crates/locutus-node/src/node/event_listener.rs @@ -65,6 +65,32 @@ impl<'a> EventLog<'a> { }, *msg.id(), ), + Message::Put(PutMsg::Broadcasting { + new_value, + broadcast_to, + key, + .. + }) => EventKind::Put( + PutEvent::BroadcastEmitted { + broadcast_to: broadcast_to.clone(), + key: key.clone(), + value: new_value.clone(), + }, + *msg.id(), + ), + Message::Put(PutMsg::BroadcastTo { + sender, + new_value, + key, + .. + }) => EventKind::Put( + PutEvent::BroadcastReceived { + requester: sender.peer.clone(), + key: key.clone(), + value: new_value.clone(), + }, + *msg.id(), + ), Message::Get(GetMsg::ReturnGet { key, value: StoreResponse { value: Some(_), .. }, @@ -134,6 +160,32 @@ enum PutEvent { /// value that was put value: ContractValue, }, + BroadcastEmitted { + /// subscribed peers + broadcast_to: Vec, + /// key of the contract which value was being updated + key: ContractKey, + /// value that was put + value: ContractValue, + }, + BroadcastReceived { + /// peer who started the broadcast op + requester: PeerKey, + /// key of the contract which value was being updated + key: ContractKey, + /// value that was put + value: ContractValue, + }, + BroadcastComplete { + /// peer who performed the event + performer: PeerKey, + /// peer who started the broadcast op + requester: PeerKey, + /// key of the contract which value was being updated + key: ContractKey, + /// value that was put + value: ContractValue, + }, } #[inline] @@ -150,13 +202,16 @@ fn create_log(logs: &[MessageLog], log: EventLog) -> (MessageLog, ListenerLogId) .iter() .filter_map(|l| { if matches!(l, MessageLog { kind: EventKind::Put(_, id), .. } if incoming_tx == id ) { - Some(&l.kind) + match l.kind { + EventKind::Put(PutEvent::BroadcastEmitted { .. }, _) => None, + _ => Some(&l.kind), + } } else { None } }) .chain([&kind]); - let kind = fuse_events_msg(find_put_ops).unwrap_or(kind); + let kind = fuse_successful_put_op(find_put_ops).unwrap_or(kind); let msg_log = MessageLog { ts: Instant::now(), @@ -166,7 +221,9 @@ fn create_log(logs: &[MessageLog], log: EventLog) -> (MessageLog, ListenerLogId) (msg_log, log_id) } -fn fuse_events_msg<'a>(mut put_ops: impl Iterator) -> Option { +fn fuse_successful_put_op<'a>( + mut put_ops: impl Iterator, +) -> Option { let prev_msgs = [put_ops.next().cloned(), put_ops.next().cloned()]; match prev_msgs { [Some(EventKind::Put(PutEvent::Request { performer, key }, id)), Some(EventKind::Put(PutEvent::PutSuccess { requester, value }, _))] => { @@ -189,6 +246,7 @@ mod test_utils { use std::{collections::HashMap, sync::Arc}; use dashmap::DashMap; + use itertools::Itertools; use parking_lot::RwLock; use crate::{contract::ContractKey, message::TxType, ring::Distance}; @@ -234,6 +292,86 @@ mod test_utils { }) } + pub fn get_broadcast_count( + &self, + expected_key: &ContractKey, + expected_value: &ContractValue, + ) -> usize { + let mut logs = self.logs.read(); + logs.iter().filter(|log| { + matches!(log.kind, EventKind::Put(PutEvent::BroadcastEmitted { ref key, ref value, .. }, ..) if key == expected_key && value == expected_value ) + }).count() + } + + pub fn has_broadcast_contract( + &self, + mut broadcast_pairs: Vec<(PeerKey, PeerKey)>, + expected_key: &ContractKey, + expected_value: &ContractValue, + ) -> bool { + let logs = self.logs.read(); + let mut broadcast_ops = logs.iter().filter_map(|l| { + if matches!( + l, + MessageLog { + kind: EventKind::Put(_, id), + .. + } + ) { + match l.kind { + EventKind::Put(PutEvent::BroadcastEmitted { .. }, _) + | EventKind::Put(PutEvent::BroadcastReceived { .. }, _) => Some(&l.kind), + _ => None, + } + } else { + None + } + }); + + let prev_msgs = [broadcast_ops.next().cloned(), broadcast_ops.next().cloned()]; + let broadcast = match prev_msgs { + [Some(EventKind::Put(PutEvent::BroadcastEmitted { broadcast_to, .. }, id1)), Some(EventKind::Put( + PutEvent::BroadcastReceived { + requester, + key, + value, + }, + id2, + ))] => { + if id1 == id2 { + Some(EventKind::Put( + PutEvent::BroadcastComplete { + performer: broadcast_to.get(0).unwrap().peer, + requester, + key, + value, + }, + id1, + )) + } else { + None + } + } + _ => None, + }; + + match broadcast { + Some(EventKind::Put( + PutEvent::BroadcastComplete { + ref performer, + ref requester, + .. + }, + _, + )) => { + let expected_pair = (performer, requester); + broadcast_pairs.retain(|pair| matches!(pair, expected_pair)); + !broadcast_pairs.is_empty() + } + _ => false, + } + } + pub fn has_got_contract(&self, peer: &PeerKey, expected_key: &ContractKey) -> bool { let logs = self.logs.read(); logs.iter().any(|log| { diff --git a/crates/locutus-node/src/node/in_memory.rs b/crates/locutus-node/src/node/in_memory.rs index 08da741c7..aa754ef7a 100644 --- a/crates/locutus-node/src/node/in_memory.rs +++ b/crates/locutus-node/src/node/in_memory.rs @@ -1,9 +1,10 @@ +use std::collections::HashMap; use std::sync::Arc; use tokio::sync::mpsc::{self, Receiver}; use crate::contract::{ - Contract, ContractError, ContractHandlerEvent, ContractValue, SimStoreError, + Contract, ContractError, ContractHandlerEvent, ContractKey, ContractValue, SimStoreError, }; use crate::{ conn_manager::{in_memory::MemoryConnManager, ConnectionBridge, PeerKey}, @@ -84,6 +85,7 @@ where pub(crate) async fn append_contracts( &self, contracts: Vec<(Contract, ContractValue)>, + contract_subscribers: HashMap>, ) -> Result<(), ContractError> { for (contract, value) in contracts { let key = contract.key(); @@ -98,7 +100,14 @@ where key, self.op_storage.ring.peer_key ); - self.op_storage.ring.cached_contracts.insert(key); + self.op_storage.ring.cached_contracts.insert(key.clone()); + + if let Some(subscribers) = contract_subscribers.get(&key) { + // add contract subscribers + for subscriber in subscribers { + self.op_storage.ring.add_subscriber(key, *subscriber); + } + } } Ok(()) } diff --git a/crates/locutus-node/src/node/mod.rs b/crates/locutus-node/src/node/mod.rs index 74dbb81ee..16b58e04f 100644 --- a/crates/locutus-node/src/node/mod.rs +++ b/crates/locutus-node/src/node/mod.rs @@ -56,7 +56,7 @@ where } #[cfg(not(test))] -impl Node +impl Node where CErr: std::error::Error + Send + Sync + 'static, { diff --git a/crates/locutus-node/src/node/test_utils.rs b/crates/locutus-node/src/node/test_utils.rs index f751b7a8d..3f2cb1aa5 100644 --- a/crates/locutus-node/src/node/test_utils.rs +++ b/crates/locutus-node/src/node/test_utils.rs @@ -10,6 +10,7 @@ use rand::Rng; use tokio::sync::watch::{channel, Receiver, Sender}; use crate::contract::{Contract, ContractKey, ContractValue, SimStoreError}; +use crate::ring::PeerKeyLocation; use crate::user_events::UserEvent; use crate::{ conn_manager::PeerKey, @@ -61,6 +62,7 @@ pub(crate) struct NodeSpecification { pub owned_contracts: Vec<(Contract, ContractValue)>, pub non_owned_contracts: Vec, pub events_to_generate: HashMap, + pub contract_subscribers: HashMap>, } #[derive(Clone)] @@ -231,7 +233,7 @@ impl SimNetwork { self.labels.insert(label, peer.peer_key); tokio::spawn(async move { if let Some(specs) = node_specs { - peer.append_contracts(specs.owned_contracts) + peer.append_contracts(specs.owned_contracts, specs.contract_subscribers) .await .map_err(|_| anyhow::anyhow!("failed inserting test owned contracts"))?; } @@ -239,6 +241,25 @@ impl SimNetwork { }); } + pub fn get_locations_by_node(&self) -> HashMap { + let mut locations_by_node: HashMap = HashMap::new(); + + // Get node and gateways location by label + for (node, label) in &self.nodes { + locations_by_node.insert( + label.to_string(), + node.op_storage.ring.own_location().clone(), + ); + } + for (node, config) in &self.gateways { + locations_by_node.insert( + config.label.to_string(), + node.op_storage.ring.own_location().clone(), + ); + } + locations_by_node + } + pub fn connected(&self, peer: &str) -> bool { if let Some(key) = self.labels.get(peer) { self.event_listener.is_connected(key) @@ -255,6 +276,30 @@ impl SimNetwork { } } + pub fn has_broadcast_contract( + &self, + broadcast_pairs: Vec<(&str, &str)>, + key: &ContractKey, + value: &ContractValue, + ) -> bool { + let peers = broadcast_pairs + .into_iter() + .step_by(2) + .map( + |(peer1, peer2)| match (self.labels.get(peer1), self.labels.get(peer2)) { + (Some(pk1), Some(pk2)) => (*pk1, *pk2), + _ => panic!("peer not found"), + }, + ) + .collect(); + self.event_listener + .has_broadcast_contract(peers, key, value) + } + + pub fn count_broadcasts(&self, key: &ContractKey, value: &ContractValue) -> usize { + self.event_listener.get_broadcast_count(key, value) + } + pub fn has_got_contract(&self, peer: &str, key: &ContractKey) -> bool { if let Some(pk) = self.labels.get(peer) { self.event_listener.has_got_contract(pk, key) diff --git a/crates/locutus-node/src/operations/get.rs b/crates/locutus-node/src/operations/get.rs index fbd4c6c85..baf53ed77 100644 --- a/crates/locutus-node/src/operations/get.rs +++ b/crates/locutus-node/src/operations/get.rs @@ -724,12 +724,14 @@ mod test { owned_contracts: vec![], non_owned_contracts: vec![key], events_to_generate: HashMap::from_iter([(1, get_event)]), + contract_subscribers: HashMap::new(), }; let gw_0 = NodeSpecification { owned_contracts: vec![(contract, contract_val)], non_owned_contracts: vec![], events_to_generate: HashMap::new(), + contract_subscribers: HashMap::new(), }; let get_specs = HashMap::from_iter([ @@ -769,6 +771,7 @@ mod test { owned_contracts: vec![], non_owned_contracts: vec![key], events_to_generate: HashMap::from_iter([(1, get_event)]), + contract_subscribers: HashMap::new(), }; let get_specs = HashMap::from_iter([("node-1".to_string(), node_1)]); diff --git a/crates/locutus-node/src/operations/put.rs b/crates/locutus-node/src/operations/put.rs index a68b7f3ad..55f8d96e9 100644 --- a/crates/locutus-node/src/operations/put.rs +++ b/crates/locutus-node/src/operations/put.rs @@ -3,6 +3,7 @@ //! as well as will broadcast updates to the contract value to all subscribers. // FIXME: should allow to do partial value updates +use std::collections::HashSet; use std::time::Duration; use crate::{ @@ -151,14 +152,21 @@ impl StateMachineImpl for PutOpSm { PutMsg::Broadcasting { id, new_value, + broadcasted_to, broadcast_to, - .. + key, }, ) => { if broadcast_to.is_empty() { Some(PutMsg::SuccessfulUpdate { id, new_value }) } else { - None + Some(PutMsg::Broadcasting { + id, + new_value, + broadcasted_to, + broadcast_to, + key, + }) } } _ => None, @@ -426,6 +434,65 @@ where return Err(OpError::StatePushed); } } + PutMsg::BroadcastTo { + id, + key, + new_value, + sender, + sender_subscribers, + } => { + let target = op_storage.ring.own_location(); + + log::debug!("Attempting contract value update"); + let new_value = put_contract(op_storage, key, new_value).await?; + log::debug!("Contract successfully updated"); + + let broadcast_to = op_storage + .ring + .subscribers_of(&key) + .map(|i| { + // Avoid already broadcast nodes and sender from broadcasting + let mut subscribers: Vec = i.value().to_vec(); + let mut avoid_list: HashSet = + sender_subscribers.into_iter().map(|pl| pl.peer).collect(); + avoid_list.insert(sender.peer); + subscribers.retain(|s| !avoid_list.contains(&s.peer)); + subscribers + }) + .unwrap_or_default(); + log::debug!( + "Successfully updated a value for contract {} @ {:?}", + key, + target.location + ); + + let internal_cb = state + .sm + .consume_to_output(PutMsg::Broadcasting { + id, + broadcast_to, + broadcasted_to: 0, + key, + new_value, + })? + .ok_or(OpError::InvalidStateTransition(id))?; + + if let PutMsg::SuccessfulUpdate { .. } = internal_cb { + log::debug!( + "Empty broadcast list while updating value for contract {}", + key + ); + // means the whole tx finished so can return early + return_msg = Some(internal_cb.into()); + new_state = None; + } else { + log::debug!("Callback to start broadcasting to other nodes"); + op_storage + .notify_change(internal_cb.into(), Operation::Put(state)) + .await?; + return Err(OpError::StatePushed); + } + } PutMsg::Broadcasting { id, mut broadcast_to, @@ -439,6 +506,7 @@ where key, new_value: new_value.clone(), sender, + sender_subscribers: broadcast_to.clone(), }; let mut broadcasting = Vec::with_capacity(broadcast_to.len()); @@ -479,20 +547,9 @@ where broadcasted_to ); - return_msg = state - .sm - .consume_to_state(PutMsg::Broadcasting { - id, - broadcasted_to, - broadcast_to, - key, - new_value, - })? - .map(Message::from); + // Subscriber nodes have been notified of the change, the operation is completed + return_msg = None; new_state = None; - if &PutState::BroadcastComplete != state.sm.state() { - return Err(OpError::InvalidStateTransition(id)); - } } PutMsg::SuccessfulUpdate { id, new_value } => { return_msg = state @@ -707,6 +764,7 @@ mod messages { sender: PeerKeyLocation, key: ContractKey, new_value: ContractValue, + sender_subscribers: Vec, }, } @@ -838,9 +896,9 @@ mod test { Ok(()) } - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn successful_put_op_between_nodes() -> Result<(), anyhow::Error> { - const NUM_NODES: usize = 1usize; + const NUM_NODES: usize = 2usize; const NUM_GW: usize = 1usize; let bytes = crate::test_utils::random_bytes_1024(); @@ -850,37 +908,61 @@ mod test { let contract_val: ContractValue = gen.arbitrary()?; let new_value = ContractValue::new(Vec::from_iter(gen.arbitrary::<[u8; 20]>().unwrap())); + let mut sim_nodes = SimNetwork::new(NUM_GW, NUM_NODES, 3, 2, 4, 2); + let mut locations = sim_nodes.get_locations_by_node(); + let node0_loc = locations.remove("node-0").unwrap(); + let node1_loc = locations.remove("node-1").unwrap(); + let gateway0_loc = locations.remove("gateway-0").unwrap(); + // both own the contract, and one triggers an update let node_0 = NodeSpecification { owned_contracts: vec![(contract.clone(), contract_val.clone())], non_owned_contracts: vec![], events_to_generate: HashMap::new(), + contract_subscribers: HashMap::from_iter([(contract.key(), vec![node1_loc.clone()])]), + }; + + let node_1 = NodeSpecification { + owned_contracts: vec![(contract.clone(), contract_val.clone())], + non_owned_contracts: vec![], + events_to_generate: HashMap::new(), + contract_subscribers: HashMap::from_iter([(contract.key(), vec![node0_loc.clone()])]), }; let put_event = UserEvent::Put { contract: contract.clone(), value: new_value.clone(), }; + let gw_0 = NodeSpecification { owned_contracts: vec![(contract, contract_val)], non_owned_contracts: vec![], events_to_generate: HashMap::from_iter([(1, put_event)]), + contract_subscribers: HashMap::new(), }; // establish network let put_specs = HashMap::from_iter([ ("node-0".to_string(), node_0), + ("node-1".to_string(), node_1), ("gateway-0".to_string(), gw_0), ]); - let mut sim_nodes = SimNetwork::new(NUM_GW, NUM_NODES, 3, 2, 4, 2); + sim_nodes.build_with_specs(put_specs); + tokio::time::sleep(Duration::from_secs(5)).await; check_connectivity(&sim_nodes, NUM_NODES, Duration::from_secs(3)).await?; // trigger the put op @ gw-0, this sim_nodes - .trigger_event("gateway-0", 1, Some(Duration::from_millis(100))) + .trigger_event("gateway-0", 1, Some(Duration::from_secs(1))) .await?; assert!(sim_nodes.has_put_contract("gateway-0", &key, &new_value)); + assert_eq!(1, sim_nodes.count_broadcasts(&key, &new_value)); + assert!(sim_nodes.has_broadcast_contract( + vec![("node-0", "node-1"), ("node-1", "node-0")], + &key, + &new_value + )); Ok(()) } } diff --git a/crates/locutus-node/src/operations/subscribe.rs b/crates/locutus-node/src/operations/subscribe.rs index a373c0478..d7caf4708 100644 --- a/crates/locutus-node/src/operations/subscribe.rs +++ b/crates/locutus-node/src/operations/subscribe.rs @@ -616,12 +616,14 @@ mod test { owned_contracts: Vec::new(), non_owned_contracts: vec![contract_key], events_to_generate: HashMap::from_iter([(1, event)]), + contract_subscribers: HashMap::new(), }; let second_node = NodeSpecification { owned_contracts: vec![(contract, contract_val)], non_owned_contracts: Vec::new(), events_to_generate: HashMap::new(), + contract_subscribers: HashMap::new(), }; let subscribe_specs = HashMap::from_iter([ diff --git a/crates/locutus-node/src/user_events.rs b/crates/locutus-node/src/user_events.rs index 9f8540255..3ede68488 100644 --- a/crates/locutus-node/src/user_events.rs +++ b/crates/locutus-node/src/user_events.rs @@ -37,6 +37,7 @@ pub(crate) mod test_utils { use tokio::sync::watch::Receiver; use super::*; + use crate::ring::PeerKeyLocation; use crate::{conn_manager::PeerKey, node::test_utils::EventId}; pub(crate) struct MemoryEventsGen { @@ -68,7 +69,7 @@ pub(crate) mod test_utils { &mut self, contracts: impl IntoIterator, ) { - self.owned_contracts.extend(contracts.into_iter()) + self.owned_contracts.extend(contracts); } /// Events that the user generate.