diff --git a/.gitignore b/.gitignore index 7f4f4fbad..35c3e4ed1 100644 --- a/.gitignore +++ b/.gitignore @@ -18,5 +18,3 @@ target/ # IDEs .idea/* .vscode/* - -Cargo.lock \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index ac243ff0b..fb9fbb407 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1708,6 +1708,7 @@ dependencies = [ name = "locutus-node" version = "0.0.1" dependencies = [ + "async-trait", "bincode", "config", "crossbeam", diff --git a/crates/freenet2-node/src/conn_manager.rs b/crates/freenet2-node/src/conn_manager.rs deleted file mode 100644 index fc627427e..000000000 --- a/crates/freenet2-node/src/conn_manager.rs +++ /dev/null @@ -1,170 +0,0 @@ -//! Types and definitions to handle all socket communication for the peer nodes. - -use std::{fmt::Display, net::SocketAddr, sync::atomic::AtomicU64, time::Duration}; - -use libp2p::{core::PublicKey, PeerId}; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; - -use crate::{ - message::{Message, TransactionTypeId, Transaction}, - ring_proto::Location, - StdResult, -}; - -pub mod in_memory; - -const _PING_EVERY: Duration = Duration::from_secs(30); -const _DROP_CONN_AFTER: Duration = Duration::from_secs(30 * 10); -static HANDLE_ID: AtomicU64 = AtomicU64::new(0); - -// pub(crate) type RemoveConnHandler<'t> = Box; -pub(crate) type Result = StdResult; - -/// 3 words size for 64-bit platforms. -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] -pub(crate) struct ListenerHandle(u64); - -impl ListenerHandle { - pub fn new() -> Self { - ListenerHandle(HANDLE_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)) - } -} - -impl Default for ListenerHandle { - fn default() -> Self { - Self::new() - } -} - -/// Types which impl this trait are responsible for the following responsabilities: -/// - establishing reliable connections to other peers, -/// including any handshake procedures -/// - keep connections alive or reconnecting to other peers -/// - securely transmitting messages between peers -/// - serializing and deserializing messages -/// -/// The implementing types manage the lower level connection details of the the network, -/// usually working at the transport layer over UDP or TCP and performing NAT traversal -/// to establish connections between peers. -pub(crate) trait ConnectionBridge: Send + Sync { - /// The transport being used to manage networking. - type Transport: Transport; - - /// Register a handler callback for when a connection is removed. - // fn on_remove_conn(&self, func: RemoveConnHandler); - - /// Start listening for incoming connections and process any incoming messages with - /// the provided function. - fn listen(&self, tx_type: TransactionTypeId, listen_fn: F) -> ListenerHandle - where - F: Fn(PeerKeyLocation, Message) -> Result<()> + Send + Sync + 'static; - - /// Listens to inbound replies for a previously broadcasted transaction to the network, - /// if a reply is detected performs a callback. - // FIXME: the fn could take arguments by ref if necessary but due to - // https://github.com/rust-lang/rust/issues/70263 it won't compile - // can workaround by wrapping up the fn to express lifetime constraints, - // consider this, meanwhile passing by value is fine - fn listen_to_replies(&self, tx_id: Transaction, callback: F) - where - F: Fn(PeerKeyLocation, Message) -> Result<()> + Send + Sync + 'static; - - fn transport(&self) -> &Self::Transport; - - /// Initiate a connection with a given peer. At this stage NAT traversal - /// has been succesful and the [`Transport`] has established a connection. - fn add_connection(&self, peer_key: PeerKeyLocation, unsolicited: bool); - - /// Sends a message to a given peer which has already been identified and - /// which has established a connection with this peer, registers a callback action - /// with the manager for when a response is received. - fn send_with_callback( - &self, - to: PeerKeyLocation, - tx_id: Transaction, - msg: Message, - callback: F, - ) -> Result<()> - where - F: Fn(PeerKeyLocation, Message) -> Result<()> + Send + Sync + 'static; - - /// Send a message to a given peer which has already been identified and - /// which has established a connection with this peer. - fn send(&self, to: PeerKeyLocation, tx_id: Transaction, msg: Message) -> Result<()>; - - /// Remove a listener for a given transaction. - fn remove_listener(&self, tx_id: Transaction); -} - -/// A protocol used to send and receive data over the network. -pub(crate) trait Transport { - fn is_open(&self) -> bool; -} - -#[derive(Debug, PartialEq, Eq, Hash)] -struct Peer { - addr: SocketAddr, - port: u16, - label: Option, -} - -#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] -pub struct PeerKey(PeerId); - -impl Display for PeerKey { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl From for PeerKey { - fn from(val: PublicKey) -> Self { - PeerKey(PeerId::from(val)) - } -} - -#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash)] -/// The Location of a PeerKey in the ring. This location allows routing towards the peer. -pub(crate) struct PeerKeyLocation { - pub peer: PeerKey, - pub location: Option, -} - -#[derive(Debug, thiserror::Error)] -pub(crate) enum ConnError { - #[error("received unexpected response type for a sent request: {0}")] - UnexpectedResponseMessage(Message), - #[error("location unknown for this node")] - LocationUnknown, - #[error("expected transaction id was {0} but received {1}")] - UnexpectedTx(Transaction, Transaction), - #[error("error while de/serializing message")] - Serialization(#[from] Box), - #[error("connection negotiation between two peers failed")] - NegotationFailed, -} - -mod serialization { - use super::*; - - impl Serialize for PeerKey { - fn serialize(&self, serializer: S) -> StdResult - where - S: Serializer, - { - serializer.serialize_bytes(&self.0.to_bytes()) - } - } - - impl<'de> Deserialize<'de> for PeerKey { - fn deserialize(deserializer: D) -> StdResult - where - D: Deserializer<'de>, - { - let bytes: Vec = Deserialize::deserialize(deserializer)?; - Ok(PeerKey( - PeerId::from_bytes(&bytes).expect("failed deserialization of PeerKey"), - )) - } - } -} diff --git a/crates/freenet2-node/src/conn_manager/in_memory.rs b/crates/freenet2-node/src/conn_manager/in_memory.rs deleted file mode 100644 index 97cba427f..000000000 --- a/crates/freenet2-node/src/conn_manager/in_memory.rs +++ /dev/null @@ -1,268 +0,0 @@ -//! A in-memory connection manager and transport implementation. Used for testing pourpouses. -use std::{array::IntoIter, collections::HashMap, io::Cursor, sync::Arc, time::Duration}; - -use crossbeam::channel::{self, Receiver, Sender}; -use once_cell::sync::OnceCell; -use parking_lot::{Mutex, RwLock}; - -use crate::{ - config::tracing::Logger, - conn_manager::{self, ConnectionBridge, ListenerHandle, PeerKey, PeerKeyLocation}, - message::{Message, Transaction, TransactionTypeId}, - ring_proto::Location, -}; - -use super::Transport; - -type InboundListenerFn = - Box conn_manager::Result<()> + Send + Sync>; -type InboundListenerRegistry = RwLock>; - -type ResponseListenerFn = - Box conn_manager::Result<()> + Send + Sync>; -type OutboundListenerRegistry = Arc>>; - -#[derive(Clone)] -pub(crate) struct MemoryConnManager { - /// listeners for inbound initial messages - inbound_listeners: Arc>, - /// listeners for outbound messages replies - outbound_listeners: OutboundListenerRegistry, - transport: InMemoryTransport, - // LIFO stack for pending listeners - pend_listeners: Sender<(Transaction, ResponseListenerFn)>, -} - -impl MemoryConnManager { - pub fn new(is_open: bool, peer: PeerKey, location: Option) -> Self { - Logger::init_logger(); - let (pend_listeners, rcv_pend_listeners) = channel::unbounded(); - let transport = InMemoryTransport::new(is_open, peer, location); - let inbound_listeners: Arc> = Arc::new( - IntoIter::new(TransactionTypeId::enumeration()) - .map(|id| (id, RwLock::new(HashMap::new()))) - .collect(), - ); - let outbound_listeners: Arc>> = - Arc::new(RwLock::new(HashMap::new())); - - let tr_cp = transport.clone(); - let inbound_cp = Arc::clone(&inbound_listeners); - let outbound_cp = outbound_listeners.clone(); - tokio::spawn(async move { - // evaluate the messages as they arrive - loop { - let msg = { tr_cp.msg_stack_queue.lock().pop() }; - if let Some(msg) = msg { - let msg_data: Message = - bincode::deserialize_from(Cursor::new(msg.data)).unwrap(); - if let Some(tx_fn) = outbound_cp.read().get(msg_data.id()) { - log::debug!("Received response for transaction: {}", msg_data.id()); - if let Some(location) = msg.origin_loc { - if let Err(err) = tx_fn( - PeerKeyLocation { - peer: msg.origin, - location: Some(location), - }, - msg_data, - ) { - log::error!("Error processing response: {}", err); - } - } else { - log::error!("No location for responding peer {}", msg.target); - } - } else { - let listeners = &inbound_cp[&msg_data.msg_type()]; - log::debug!("Received inbound transaction: {}", msg_data.id()); - let reg = &*listeners.read(); - for func in reg.values() { - if let Err(err) = func( - PeerKeyLocation { - peer: msg.origin, - location: None, - }, - msg_data.clone(), - ) { - log::error!("Error while calling inbound msg handler: {}", err); - } - } - } - // insert any pending functions generated from within the callback - let mut lock = outbound_cp.write(); - for (tx, func) in rcv_pend_listeners.try_iter() { - lock.insert(tx, func); - } - } - tokio::time::sleep(Duration::from_millis(1)).await; - } - }); - - Self { - inbound_listeners, - outbound_listeners, - transport, - pend_listeners, - } - } -} - -impl ConnectionBridge for MemoryConnManager { - type Transport = InMemoryTransport; - - fn listen(&self, tx_type: TransactionTypeId, listen_fn: F) -> ListenerHandle - where - F: Fn(PeerKeyLocation, Message) -> conn_manager::Result<()> + Send + Sync + 'static, - { - let tx_ty_listener = &self.inbound_listeners[&tx_type]; - let handle_id = ListenerHandle::new(); - tx_ty_listener - .write() - .insert(handle_id, Box::new(listen_fn)); - handle_id - } - - fn listen_to_replies(&self, tx: Transaction, callback: F) - where - F: Fn(PeerKeyLocation, Message) -> conn_manager::Result<()> + Send + Sync + 'static, - { - // optimistically try to acquire a lock - if let Some(mut lock) = self.outbound_listeners.try_write() { - lock.insert(tx, Box::new(callback)); - } else { - // it failed, this is being inserted from an other existing closure holding the lock - // send it to the temporal stack queue for posterior insertion - self.pend_listeners - .send((tx, Box::new(callback))) - .expect("full or disconnected"); - } - } - - fn transport(&self) -> &Self::Transport { - &self.transport - } - - fn add_connection(&self, _peer_key: PeerKeyLocation, _unsolicited: bool) {} - - fn send_with_callback( - &self, - to: PeerKeyLocation, - tx: Transaction, - msg: Message, - callback: F, - ) -> conn_manager::Result<()> - where - F: Fn(PeerKeyLocation, Message) -> conn_manager::Result<()> + Send + Sync + 'static, - { - // store listening func - self.outbound_listeners - .write() - .insert(tx, Box::new(callback)); - - // send the msg - let serialized = bincode::serialize(&msg)?; - self.transport - .send(to.peer, to.location.unwrap(), serialized); - Ok(()) - } - - fn send( - &self, - to: PeerKeyLocation, - _tx: Transaction, - msg: Message, - ) -> conn_manager::Result<()> { - let serialized = bincode::serialize(&msg)?; - self.transport - .send(to.peer, to.location.unwrap(), serialized); - Ok(()) - } - - fn remove_listener(&self, tx: Transaction) { - self.outbound_listeners.write().remove(&tx); - } -} - -static NETWORK_WIRES: OnceCell<(Sender, Receiver)> = - OnceCell::new(); - -#[derive(Clone, Debug)] -struct MessageOnTransit { - origin: PeerKey, - origin_loc: Option, - target: PeerKey, - data: Vec, -} - -#[derive(Clone, Debug)] -pub struct InMemoryTransport { - interface_peer: PeerKey, - location: Option, - is_open: bool, - /// received messages per each peer awaiting processing - msg_stack_queue: Arc>>, - /// all messages 'traversing' the network at a given time - network: Sender, -} - -impl InMemoryTransport { - fn new(is_open: bool, interface_peer: PeerKey, location: Option) -> Self { - let msg_stack_queue = Arc::new(Mutex::new(Vec::new())); - let (tx, rx) = NETWORK_WIRES.get_or_init(crossbeam::channel::unbounded); - - // store messages incoming from the network in the msg stack - let rcv_msg_c = msg_stack_queue.clone(); - let network = tx.clone(); - let rx = rx.clone(); - tokio::spawn(async move { - loop { - match rx.try_recv() { - Ok(msg) if msg.target == interface_peer => { - log::debug!( - "Inbound message received for peer {} from {}", - interface_peer, - msg.origin - ); - rcv_msg_c.lock().push(msg); - } - Err(channel::TryRecvError::Disconnected) => break, - Err(channel::TryRecvError::Empty) | Ok(_) => { - tokio::time::sleep(Duration::from_millis(1)).await - } - } - } - log::error!("Stopped receiving messages in {}", interface_peer); - }); - - Self { - interface_peer, - location, - is_open, - msg_stack_queue, - network, - } - } - - fn send(&self, peer: PeerKey, location: Location, message: Vec) { - let send_res = self.network.try_send(MessageOnTransit { - origin: self.interface_peer, - origin_loc: Some(location), - target: peer, - data: message, - }); - match send_res { - Err(channel::TrySendError::Disconnected(_)) => { - log::debug!("Network shutdown") - } - Err(channel::TrySendError::Full(_)) => { - unreachable!("not unbounded capacity!") - } - Ok(_) => {} - } - } -} - -impl Transport for InMemoryTransport { - fn is_open(&self) -> bool { - self.is_open - } -} diff --git a/crates/freenet2-node/src/node.rs b/crates/freenet2-node/src/node.rs deleted file mode 100644 index 2f234e146..000000000 --- a/crates/freenet2-node/src/node.rs +++ /dev/null @@ -1,183 +0,0 @@ -use std::net::IpAddr; - -use libp2p::{identity, multiaddr::Protocol, Multiaddr, PeerId}; - -use crate::config::CONF; - -use self::{in_memory::InMemory, libp2p_impl::NodeLibP2P}; - -mod in_memory; -mod libp2p_impl; -mod op_state; - -pub struct Node(NodeImpl); - -impl Node { - pub fn listen_on(&mut self) -> Result<(), ()> { - match self.0 { - NodeImpl::LibP2P(ref mut node) => node.listen_on(), - NodeImpl::InMemory(ref mut node) => node.listen_on(), - } - } -} - -enum NodeImpl { - LibP2P(Box), - InMemory(Box), -} - -pub struct NodeConfig { - /// local peer private key in - local_key: identity::Keypair, - - // optional local info, in case this is an initial bootstrap node - /// IP to bind to the listener - local_ip: Option, - /// socket port to bind to the listener - local_port: Option, - - /// At least an other running listener node is required for joining the network. - /// Not necessary if this is an initial node. - remote_nodes: Vec, -} - -impl NodeConfig { - /// When instancing a node you can either join an existing network or bootstrap a new network with a listener - /// which will act as the initial provider. This initial peer will be listening at the provided port and assigned IP. - /// If those are not free the instancing process will return an error. - /// - /// In order to bootstrap a new network the following arguments are required to be provided to the builder: - /// - ip: IP associated to the initial node. - /// - port: listening port of the initial node. - /// - /// If both are provided but also additional peers are added via the [add_provider] method, this node will - /// be listening but also try to connect to an existing peer. - pub fn new() -> NodeConfig { - let local_key = if let Some(key) = &CONF.local_peer_keypair { - key.clone() - } else { - identity::Keypair::generate_ed25519() - }; - NodeConfig { - local_key, - remote_nodes: Vec::with_capacity(1), - local_ip: None, - local_port: None, - } - } - - pub fn with_port(mut self, port: u16) -> Self { - self.local_port = Some(port); - self - } - - pub fn with_ip>(mut self, ip: T) -> Self { - self.local_ip = Some(ip.into()); - self - } - - /// Optional identity key of this node. - /// If not provided it will be either obtained from the configuration or freshly generated. - pub fn with_key(mut self, key: identity::Keypair) -> Self { - self.local_key = key; - self - } - - /// Connection info for an already existing peer. Required in case this is not a bootstrapping node. - pub fn add_provider(mut self, peer: InitPeerNode) -> Self { - self.remote_nodes.push(peer); - self - } - - /// Builds a node using libp2p as backend connection manager. - pub fn build_libp2p(self) -> std::io::Result { - Ok(Node(NodeImpl::LibP2P(Box::new(NodeLibP2P::build(self)?)))) - } - - /// Builds a node using in-memory transport. Used for testing pourpouses. - pub fn build_in_memory(self) -> Result { - let inmem = InMemory::build(self)?; - Ok(Node(NodeImpl::InMemory(Box::new(inmem)))) - } -} - -impl Default for NodeConfig { - fn default() -> Self { - Self::new() - } -} - -/// Initial listening peer node to bootstrap the network. -#[derive(Clone)] -pub struct InitPeerNode { - addr: Option, - identifier: Option, -} - -impl InitPeerNode { - pub fn new() -> Self { - Self { - addr: None, - identifier: None, - } - } - - /// Given a byte array decode into a PeerId data type. - /// - /// # Panic - /// Will panic if is not a valid representation. - pub fn decode_peer_id>(mut bytes: T) -> PeerId { - PeerId::from_public_key( - identity::Keypair::Ed25519(identity::ed25519::Keypair::decode(bytes.as_mut()).unwrap()) - .public(), - ) - } - - /// IP which will be assigned to this node. - pub fn listening_ip>(mut self, ip: T) -> Self { - if let Some(addr) = &mut self.addr { - addr.push(Protocol::from(ip.into())); - } else { - self.addr = Some(Multiaddr::from(ip.into())); - } - self - } - - /// TCP listening port (only required in case of using TCP as transport). - /// If not specified port 7800 will be used as default. - pub fn listening_port(mut self, port: u16) -> Self { - if let Some(addr) = &mut self.addr { - addr.push(Protocol::Tcp(port)); - } else { - self.addr = Some(Multiaddr::from(Protocol::Tcp(port))); - } - self - } - - pub fn with_identifier(mut self, id: PeerId) -> Self { - self.identifier = Some(id); - self - } -} - -impl std::default::Default for InitPeerNode { - fn default() -> Self { - let conf = &CONF; - let identifier = conf.bootstrap_id - .expect("At least one public identifier is required to bootstrap the connection to the network."); - let multi_addr = multiaddr_from_connection((conf.bootstrap_ip, conf.bootstrap_port)); - Self { - addr: Some(multi_addr), - identifier: Some(identifier), - } - } -} - -/// Small helper function to convert a tuple composed of an IP address and a port -/// to a libp2p Multiaddr type. -fn multiaddr_from_connection(conn: (IpAddr, u16)) -> Multiaddr { - let mut addr = Multiaddr::with_capacity(2); - addr.push(Protocol::from(conn.0)); - addr.push(Protocol::Tcp(conn.1)); - addr -} diff --git a/crates/freenet2-node/src/node/in_memory.rs b/crates/freenet2-node/src/node/in_memory.rs deleted file mode 100644 index bacc5bcca..000000000 --- a/crates/freenet2-node/src/node/in_memory.rs +++ /dev/null @@ -1,37 +0,0 @@ -use crate::{conn_manager::in_memory::MemoryConnManager, NodeConfig, PeerKey}; - -use super::op_state::OpStateStorage; - -pub(super) struct InMemory { - peer: PeerKey, - listening: bool, - conn_manager: MemoryConnManager, - tx_storage: OpStateStorage, -} - -impl InMemory { - pub fn build(config: NodeConfig) -> Result { - if (config.local_ip.is_none() || config.local_port.is_none()) - && config.remote_nodes.is_empty() - { - return Err("At least one remote gateway is required to join an existing network for non-gateway nodes."); - } - let peer = PeerKey::from(config.local_key.public()); - let conn_manager = MemoryConnManager::new(true, peer, None); - Ok(InMemory { - peer, - listening: true, - conn_manager, - tx_storage: OpStateStorage::new(), - }) - } - - pub fn listen_on(&mut self) -> Result<(), ()> { - if !self.listening { - return Err(()); - } - - loop {} - Ok(()) - } -} diff --git a/crates/freenet2-node/src/node/op_state.rs b/crates/freenet2-node/src/node/op_state.rs deleted file mode 100644 index 4a9246913..000000000 --- a/crates/freenet2-node/src/node/op_state.rs +++ /dev/null @@ -1,35 +0,0 @@ -use crate::{ - message::Transaction, - operations::{AssociatedTxType, JoinRingOp, OpsMap}, -}; - -pub(super) struct OpStateStorage { - ops: OpsMap, -} - -impl OpStateStorage { - pub fn new() -> Self { - Self { ops: OpsMap::new() } - } - - pub fn push_join_ring_op(&mut self, id: Transaction, tx: JoinRingOp) -> Result<(), ()> { - let op_type = ::tx_type_id(); - if !matches!(id.tx_type(), OP_TYPE) { - return Err(()); - } - self.ops.join_ring.insert(id, tx); - Ok(()) - } - - pub fn pop_join_ring_op(&mut self, id: &Transaction) -> Option { - self.ops.join_ring.remove(id) - } -} - -#[cfg(test)] -mod tests { - #[test] - fn example() { - todo!() - } -} diff --git a/crates/freenet2-node/src/operations.rs b/crates/freenet2-node/src/operations.rs deleted file mode 100644 index b3878eac5..000000000 --- a/crates/freenet2-node/src/operations.rs +++ /dev/null @@ -1,110 +0,0 @@ -//! Keeps track of the join operation state in this machine. -use rust_fsm::*; -use serde::{Deserialize, Serialize}; - -use crate::message::TransactionTypeId; - -pub(crate) use sealed_op_types::{OperationType, OpsMap}; - -state_machine! { - derive(Debug) - pub(crate) JoinRingOp(Connecting) - - Connecting => { - Connecting => OCReceived [OCReceived], - OCReceived => Connected [Connected], - Connected => Connected [Connected], - }, - OCReceived(Connected) => Connected [Connected], -} - -#[derive(Debug, Default)] -pub struct ProbeOp; - -/// Get the transaction type associated to a given operation type. -pub(crate) trait AssociatedTxType: sealed_op_types::SealedAssocTxType { - fn tx_type_id() -> TransactionTypeId; -} - -impl AssociatedTxType for T -where - T: sealed_op_types::SealedAssocTxType, -{ - fn tx_type_id() -> TransactionTypeId { - ::tx_type_id() - } -} - -mod sealed_op_types { - use super::*; - use crate::message::Transaction; - - pub(crate) trait SealedAssocTxType { - fn tx_type_id() -> TransactionTypeId; - } - - macro_rules! op_type_enumeration { - (decl struct { $($field:ident: $var:tt -> $tx_ty:tt),+ } ) => { - #[repr(u8)] - #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] - pub(crate) enum OperationType { - $($var,)+ - } - - $( - impl SealedAssocTxType for $var { - fn tx_type_id() -> TransactionTypeId { - TransactionTypeId::$tx_ty - } - } - )+ - - #[derive(Debug)] - pub(crate) struct OpsMap { - $( pub $field: std::collections::HashMap),+, - } - - impl OpsMap { - pub fn new() -> Self { - Self { - $( $field: std::collections::HashMap::new()),+, - } - } - } - }; - } - - op_type_enumeration!(decl struct { - join_ring: JoinRingOp -> OpenConnection, - probe_peers: ProbeOp -> Probe - }); -} - -#[cfg(test)] -mod tests { - use super::*; - use rust_fsm::StateMachine; - - #[test] - fn join_ring_transitions() { - let mut join_op_host_1 = StateMachine::::new(); - let res = join_op_host_1 - .consume(&JoinRingOpInput::Connecting) - .unwrap() - .unwrap(); - assert!(matches!(res, JoinRingOpOutput::OCReceived)); - - let mut join_op_host_2 = StateMachine::::new(); - let res = join_op_host_2 - .consume(&JoinRingOpInput::OCReceived) - .unwrap() - .unwrap(); - assert!(matches!(res, JoinRingOpOutput::Connected)); - - let res = join_op_host_1 - .consume(&JoinRingOpInput::Connected) - .unwrap() - .unwrap(); - assert!(matches!(res, JoinRingOpOutput::Connected)); - } -} diff --git a/crates/freenet2-node/src/ring_proto.rs b/crates/freenet2-node/src/ring_proto.rs deleted file mode 100644 index 3236c4398..000000000 --- a/crates/freenet2-node/src/ring_proto.rs +++ /dev/null @@ -1,781 +0,0 @@ -#![allow(unused)] // FIXME: remove this attr - -use std::{ - collections::{BTreeMap, HashSet}, - convert::TryFrom, - fmt::Display, - hash::Hasher, - sync::Arc, - time::{Duration, Instant}, -}; - -use parking_lot::{Mutex, RwLock}; -use serde::{Deserialize, Serialize}; - -use crate::{ - conn_manager::{self, ConnectionBridge, ListenerHandle, PeerKey, PeerKeyLocation, Transport}, - message::{Message, TransactionType, Transaction}, - ring_proto::messages::{JoinRequest, JoinResponse}, - StdResult, -}; - -type Result = StdResult; - -pub(crate) struct RingProtocol { - pub conn_manager: Arc, - peer_key: PeerKey, - /// A location gets assigned once a node joins the network via a gateway, - /// until then it has no location unless the node is a gateway. - pub location: RwLock>, - gateways: RwLock>, - max_hops_to_live: usize, - rnd_if_htl_above: usize, - pub ring: Ring, -} - -impl RingProtocol -where - T: Transport + 'static, - CM: ConnectionBridge + 'static, -{ - fn new( - conn_manager: CM, - peer_key: PeerKey, - max_hops_to_live: usize, - rnd_if_htl_above: usize, - ) -> Arc { - Arc::new(RingProtocol { - conn_manager: Arc::new(conn_manager), - peer_key, - location: RwLock::new(None), - gateways: RwLock::new(HashSet::new()), - max_hops_to_live, - rnd_if_htl_above, - ring: Ring::new(), - }) - } - - pub fn with_location(self: Arc, loc: Location) -> Arc { - *self.location.write() = Some(loc); - self - } - - fn listen_for_close_conn(&self) { - todo!() - } - - fn listen_for_join_req(self: &Arc) -> ListenerHandle { - let self_cp = self.clone(); - let process_join_req = move |sender: PeerKeyLocation, - msg: Message| - -> conn_manager::Result<()> { - let (tx, join_req) = if let Message::JoinRequest(id, join_req) = msg { - (id, join_req) - } else { - return Err(conn_manager::ConnError::UnexpectedResponseMessage(msg)); - }; - - enum ReqType { - Initial, - Proxy, - } - - let peer_key_loc; - let req_type; - let jr_hpt = match join_req { - messages::JoinRequest::Initial { key, hops_to_live } => { - peer_key_loc = PeerKeyLocation { - peer: key, - location: Some(Location::random()), - }; - req_type = ReqType::Initial; - hops_to_live - } - messages::JoinRequest::Proxy { - joiner, - hops_to_live, - } => { - peer_key_loc = joiner; - req_type = ReqType::Proxy; - hops_to_live - } - }; - log::debug!( - "JoinRequest received by {} with HTL {}", - sender - .location - .ok_or(conn_manager::ConnError::LocationUnknown)?, - jr_hpt - ); - - let your_location = self_cp - .location - .read() - .ok_or(conn_manager::ConnError::LocationUnknown)?; - let accepted_by = if self_cp.ring.should_accept( - &your_location, - &peer_key_loc - .location - .ok_or(conn_manager::ConnError::LocationUnknown)?, - ) { - log::debug!( - "Accepting connections to {:?}, establising connection @ {}", - peer_key_loc, - self_cp.peer_key - ); - self_cp.establish_conn(peer_key_loc, tx); - vec![PeerKeyLocation { - peer: self_cp.peer_key, - location: Some(your_location), - }] - } else { - log::debug!("Not accepting new connection sender {:?}", peer_key_loc); - Vec::new() - }; - - log::debug!( - "Sending JoinResponse to {} accepting {} connections", - sender.peer, - accepted_by.len() - ); - let join_response = match req_type { - ReqType::Initial => Message::from(( - tx, - JoinResponse::Initial { - accepted_by: accepted_by.clone(), - your_location: peer_key_loc - .location - .ok_or(conn_manager::ConnError::LocationUnknown)?, - your_peer_id: peer_key_loc.peer, - }, - )), - ReqType::Proxy => Message::from(( - tx, - JoinResponse::Proxy { - accepted_by: accepted_by.clone(), - }, - )), - }; - self_cp.conn_manager.send(peer_key_loc, tx, join_response)?; - // NOTE: this is in practica a jump to: join_ring.join_response_cb - - if jr_hpt > 0 && !self_cp.ring.connections_by_location.read().is_empty() { - let forward_to = if jr_hpt >= self_cp.rnd_if_htl_above { - log::debug!( - "Randomly selecting peer to forward JoinRequest sender {}", - sender.peer - ); - self_cp.ring.random_peer(|p| p.peer != sender.peer) - } else { - log::debug!( - "Selecting close peer to forward request sender {}", - sender.peer - ); - self_cp - .ring - .connections_by_location - .read() - .get( - &peer_key_loc - .location - .ok_or(conn_manager::ConnError::LocationUnknown)?, - ) - .filter(|it| it.peer != sender.peer) - .copied() - }; - - if let Some(forward_to) = forward_to { - let forwarded = Message::from(( - tx, - JoinRequest::Proxy { - joiner: peer_key_loc, - hops_to_live: jr_hpt.min(self_cp.max_hops_to_live) - 1, - }, - )); - - let forwarded_acceptors = - Arc::new(Mutex::new(accepted_by.into_iter().collect::>())); - - log::debug!( - "Forwarding JoinRequest sender {} to {}", - sender.peer, - forward_to.peer - ); - let self_cp2 = self_cp.clone(); - let register_acceptors = - move |jr_sender: PeerKeyLocation, join_resp| -> conn_manager::Result<()> { - if let Message::JoinResponse(tx, resp) = join_resp { - let new_acceptors = match resp { - JoinResponse::Initial { accepted_by, .. } => accepted_by, - JoinResponse::Proxy { accepted_by, .. } => accepted_by, - }; - let fa = &mut *forwarded_acceptors.lock(); - new_acceptors.iter().for_each(|p| { - if !fa.contains(p) { - fa.insert(*p); - } - }); - let msg = Message::from(( - tx, - JoinResponse::Proxy { - accepted_by: new_acceptors, - }, - )); - self_cp2.conn_manager.send(jr_sender, tx, msg)?; - }; - Ok(()) - }; - self_cp.conn_manager.send_with_callback( - forward_to, - tx, - forwarded, - register_acceptors, - )?; - } - } - Ok(()) - }; - self.conn_manager - .listen(::msg_type_id(), process_join_req) - } - - fn join_ring(self: &Arc) -> Result<()> { - if self.conn_manager.transport().is_open() && self.gateways.read().is_empty() { - match *self.location.read() { - Some(loc) => { - log::info!( - "No gateways to join through, listening for connections at loc: {}", - loc - ); - return Ok(()); - } - None => return Err(RingProtoError::Join), - } - } - - // FIXME: this iteration should be shuffled, must write an extension iterator shuffle items "in place" - // the idea here is to limit the amount of gateways being contacted that's why shuffling is required - for gateway in self.gateways.read().iter() { - log::info!( - "Joining ring via {} at {}", - gateway.peer, - gateway - .location - .ok_or(conn_manager::ConnError::LocationUnknown)? - ); - self.conn_manager.add_connection(*gateway, true); - let tx = Transaction::new(::msg_type_id()); - let join_req = messages::JoinRequest::Initial { - key: self.peer_key, - hops_to_live: self.max_hops_to_live, - }; - log::debug!("Sending {:?} to {}", join_req, gateway.peer); - - let ring_proto = self.clone(); - let join_response_cb = - move |sender: PeerKeyLocation, join_res: Message| -> conn_manager::Result<()> { - let (accepted_by, tx) = if let Message::JoinResponse( - incoming_tx, - messages::JoinResponse::Initial { - accepted_by, - your_location, - .. - }, - ) = join_res - { - log::debug!("JoinResponse received from {}", sender.peer,); - if incoming_tx != tx { - return Err(conn_manager::ConnError::UnexpectedTx(tx, incoming_tx)); - } - let loc = &mut *ring_proto.location.write(); - *loc = Some(your_location); - (accepted_by, incoming_tx) - } else { - return Err(conn_manager::ConnError::UnexpectedResponseMessage(join_res)); - }; - - let self_location = &*ring_proto.location.read(); - let self_location = - &self_location.ok_or(conn_manager::ConnError::LocationUnknown)?; - for new_peer_key in accepted_by { - if ring_proto.ring.should_accept( - self_location, - &new_peer_key - .location - .ok_or(conn_manager::ConnError::LocationUnknown)?, - ) { - log::info!("Establishing connection to {}", new_peer_key.peer); - ring_proto.establish_conn(new_peer_key, tx); - } else { - log::debug!("Not accepting connection to {}", new_peer_key.peer); - } - } - - Ok(()) - }; - log::debug!("Initiating JoinRequest transaction: {}", tx); - let msg: Message = (tx, join_req).into(); - self.conn_manager - .send_with_callback(*gateway, tx, msg, join_response_cb)?; - } - - Ok(()) - } - - fn establish_conn(self: &Arc, new_peer: PeerKeyLocation, tx: Transaction) { - self.conn_manager.add_connection(new_peer, false); - let self_cp = self.clone(); - let state = Arc::new(RwLock::new(messages::OpenConnection::Connecting)); - - let state_cp = state.clone(); - let ack_peer = move |peer: PeerKeyLocation, msg: Message| -> conn_manager::Result<()> { - let (tx, oc) = match msg { - Message::OpenConnection(tx, oc) => (tx, oc), - msg => return Err(conn_manager::ConnError::UnexpectedResponseMessage(msg)), - }; - let mut current_state = state_cp.write(); - current_state.transition(oc); - if !current_state.is_connected() { - let open_conn: Message = (tx, *current_state).into(); - log::debug!("Acknowledging OC"); - self_cp - .conn_manager - .send(peer, *open_conn.id(), open_conn)?; - } else { - log::info!( - "{} connected to {}, adding to ring", - self_cp.peer_key, - new_peer.peer - ); - self_cp.conn_manager.send( - peer, - tx, - Message::from((tx, messages::OpenConnection::Connected)), - )?; - self_cp.ring.connections_by_location.write().insert( - new_peer - .location - .ok_or(conn_manager::ConnError::LocationUnknown)?, - new_peer, - ); - } - Ok(()) - }; - self.conn_manager.listen_to_replies(tx, ack_peer); - - let conn_manager = self.conn_manager.clone(); - tokio::spawn(async move { - let curr_time = Instant::now(); - let mut attempts = 0; - while !state.read().is_connected() && curr_time.elapsed() <= Duration::from_secs(30) { - log::debug!( - "Sending {} to {}, number of messages sent: {}", - *state.read(), - new_peer.peer, - attempts - ); - conn_manager.send(new_peer, tx, Message::OpenConnection(tx, *state.read()))?; - attempts += 1; - tokio::time::sleep(Duration::from_millis(200)).await - } - if curr_time.elapsed() > Duration::from_secs(30) { - log::error!("Timed out trying to connect to {}", new_peer.peer); - Err(conn_manager::ConnError::NegotationFailed) - } else { - conn_manager.remove_listener(tx); - log::info!("Success negotiating connection to {}", new_peer.peer); - Ok(()) - } - }); - } -} - -#[derive(Debug)] -pub(crate) struct Ring { - pub connections_by_location: RwLock>, -} - -impl Ring { - const MIN_CONNECTIONS: usize = 10; - const MAX_CONNECTIONS: usize = 20; - - fn new() -> Self { - Ring { - connections_by_location: RwLock::new(BTreeMap::new()), - } - } - - fn should_accept(&self, my_location: &Location, location: &Location) -> bool { - let cbl = &*self.connections_by_location.read(); - if location == my_location || cbl.contains_key(location) { - false - } else if cbl.len() < Self::MIN_CONNECTIONS { - true - } else if cbl.len() >= Self::MAX_CONNECTIONS { - false - } else { - my_location.distance(location) < self.median_distance_to(my_location) - } - } - - fn median_distance_to(&self, location: &Location) -> Distance { - let mut conn_by_dist = self.connections_by_distance(location); - conn_by_dist.sort_by_key(|(k, _)| *k); - let idx = self.connections_by_location.read().len() / 2; - conn_by_dist[idx].0 - } - - pub fn connections_by_distance(&self, to: &Location) -> Vec<(Distance, PeerKeyLocation)> { - self.connections_by_location - .read() - .iter() - .map(|(key, peer)| (key.distance(to), *peer)) - .collect() - } - - fn random_peer(&self, filter_fn: F) -> Option - where - F: FnMut(&&PeerKeyLocation) -> bool, - { - // FIXME: should be optimized - self.connections_by_location - .read() - .values() - .find(filter_fn) - .copied() - } -} - -/// An abstract location on the 1D ring, represented by a real number on the interal [0, 1] -#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy)] -pub struct Location(f64); - -type Distance = Location; - -impl Location { - /// Returns a new random location. - pub fn random() -> Self { - use rand::prelude::*; - let mut rng = rand::thread_rng(); - Location(rng.gen_range(0.0..=1.0)) - } - - /// Compute the distance between two locations. - pub fn distance(&self, other: &Location) -> Distance { - let d = (self.0 - other.0).abs(); - if d < 0.5 { - Location(d) - } else { - Location(1.0 - d) - } - } -} - -impl Display for Location { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_str(self.0.to_string().as_str())?; - Ok(()) - } -} - -impl PartialEq for Location { - fn eq(&self, other: &Self) -> bool { - self.0 == other.0 - } -} - -/// Since we don't allow NaN values in the construction of Location -/// we can safely assume that an equivalence relation holds. -impl Eq for Location {} - -impl Ord for Location { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.partial_cmp(other) - .expect("always should return a cmp value") - } -} - -impl PartialOrd for Location { - fn partial_cmp(&self, other: &Self) -> Option { - self.0.partial_cmp(&other.0) - } -} - -impl std::hash::Hash for Location { - fn hash(&self, state: &mut H) { - let bits = self.0.to_bits(); - state.write_u64(bits); - state.finish(); - } -} - -impl TryFrom for Location { - type Error = (); - - fn try_from(value: f64) -> StdResult { - if !(0.0..=1.0).contains(&value) { - Err(()) - } else { - Ok(Location(value)) - } - } -} - -#[derive(thiserror::Error, Debug)] -pub(crate) enum RingProtoError { - #[error("failed while attempting to join a ring")] - Join, - #[error(transparent)] - ConnError(#[from] conn_manager::ConnError), -} - -pub(crate) mod messages { - use super::*; - - #[derive(Debug, Serialize, Deserialize, Clone)] - pub(crate) enum JoinRequest { - Initial { - key: PeerKey, - hops_to_live: usize, - }, - Proxy { - joiner: PeerKeyLocation, - hops_to_live: usize, - }, - } - - #[derive(Debug, Serialize, Deserialize, Clone)] - pub(crate) enum JoinResponse { - Initial { - accepted_by: Vec, - your_location: Location, - your_peer_id: PeerKey, - }, - Proxy { - accepted_by: Vec, - }, - } - - /// A stateful connection attempt. - #[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)] - pub(crate) enum OpenConnection { - OCReceived, - Connecting, - Connected, - } - - impl OpenConnection { - pub fn is_initiated(&self) -> bool { - matches!(self, OpenConnection::Connecting) - } - - pub fn is_connected(&self) -> bool { - matches!(self, OpenConnection::Connected) - } - - pub(super) fn transition(&mut self, other_host_state: Self) { - match (*self, other_host_state) { - (Self::Connected, _) => {} - (_, Self::Connecting) => *self = Self::OCReceived, - (_, Self::OCReceived | Self::Connected) => *self = Self::Connected, - } - } - } - - impl Display for OpenConnection { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "OpenConnection::{:?}", self) - } - } -} - -#[cfg(test)] -mod tests { - use std::collections::HashMap; - - use libp2p::identity; - use rand::Rng; - - use super::{messages::OpenConnection, *}; - use crate::{ - config::tracing::Logger, - conn_manager::in_memory::MemoryConnManager, - message::ProbeRequest, - probe_proto::{self, ProbeProtocol}, - }; - - #[test] - fn open_connection_state_transition() { - let mut oc0 = OpenConnection::Connecting; - let oc1 = OpenConnection::Connecting; - oc0.transition(oc1); - assert_eq!(oc0, OpenConnection::OCReceived); - - let mut oc0 = OpenConnection::Connecting; - let oc1 = OpenConnection::OCReceived; - oc0.transition(oc1); - assert!(oc0.is_connected()); - - let mut oc0 = OpenConnection::Connecting; - let oc1 = OpenConnection::Connected; - oc0.transition(oc1); - assert!(oc0.is_connected()); - - let mut oc0 = OpenConnection::Connecting; - let oc1 = OpenConnection::OCReceived; - oc0.transition(oc1); - assert!(oc0.is_connected()); - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn node0_to_gateway_conn() -> StdResult<(), Box> { - //! Given a network of one node and one gateway test that both are connected. - Logger::init_logger(); - - let ring_protocols = sim_network_builder(1, 1, 0); - tokio::time::sleep(Duration::from_secs(3)).await; - - assert_eq!( - ring_protocols["node-0"] - .ring_protocol - .ring - .connections_by_location - .read() - .len(), - 1 - ); - - assert_eq!( - ring_protocols["gateway"] - .ring_protocol - .ring - .connections_by_location - .read() - .len(), - 1 - ); - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn all_nodes_should_connect() -> StdResult<(), Box> { - //! Given a network of 1000 peers all nodes should have connections. - Logger::init_logger(); - - let mut sim_nodes = sim_network_builder(10, 10, 7); - tokio::time::sleep(Duration::from_secs(300)).await; - // let _hist: Vec<_> = _ring_distribution(sim_nodes.values()).collect(); - - const NUM_PROBES: usize = 10; - let mut probe_responses = Vec::with_capacity(NUM_PROBES); - for probe_idx in 0..NUM_PROBES { - let target = Location::random(); - let idx: usize = rand::thread_rng().gen_range(0..sim_nodes.len()); - let rnd_node = sim_nodes - .get_mut(&format!("node-{}", idx)) - .ok_or("node not found")?; - let probe_response = ProbeProtocol::probe( - rnd_node.ring_protocol.clone(), - Transaction::new(::msg_type_id()), - ProbeRequest { - hops_to_live: 7, - target, - }, - ) - .await - .expect("failed to get probe response"); - probe_responses.push(probe_response); - } - // probe_proto::utils::plot_probe_responses(probe_responses); - - let any_empties = sim_nodes - .values() - .map(|node| { - node.ring_protocol - .ring - .connections_by_location - .read() - .is_empty() - }) - .any(|is_empty| is_empty); - assert!(!any_empties); - - Ok(()) - } - - struct SimulatedNode { - ring_protocol: Arc>, - probe_protocol: Option, - } - - fn sim_network_builder( - network_size: usize, - ring_max_htl: usize, - rnd_if_htl_above: usize, - // _per_node_delay: usize, - ) -> HashMap { - let mut nodes = HashMap::new(); - - // build gateway node - let keypair = identity::Keypair::generate_ed25519(); - let gw_key = keypair.public().into(); - let loc = Location::random(); - let conn_manager = MemoryConnManager::new(true, gw_key, Some(loc)); - let ring_protocol = RingProtocol::new(conn_manager, gw_key, ring_max_htl, rnd_if_htl_above) - .with_location(loc); - let probe_protocol = Some(ProbeProtocol::new(ring_protocol.clone(), loc)); - ring_protocol.listen_for_join_req(); - nodes.insert( - "gateway".to_owned(), - SimulatedNode { - ring_protocol, - probe_protocol, - }, - ); - - // add other nodes to the simulation - for node_no in 0..network_size { - let label = format!("node-{}", node_no); - let keypair = identity::Keypair::generate_ed25519(); - let peer_key = keypair.public().into(); - let conn_manager = MemoryConnManager::new(false, peer_key, None); - let ring_protocol = - RingProtocol::new(conn_manager, peer_key, ring_max_htl, rnd_if_htl_above); - ring_protocol.gateways.write().insert(PeerKeyLocation { - peer: gw_key, - location: Some(loc), - }); - ring_protocol.join_ring().unwrap(); - - nodes.insert( - label, - SimulatedNode { - ring_protocol, - probe_protocol: None, - }, - ); - } - nodes - } - - /// Builds an histogram of the distribution in the ring of each node relative to each other. - fn _ring_distribution<'a>( - nodes: impl Iterator + 'a, - ) -> impl Iterator + 'a { - // TODO: groupby certain intervals - // e.g. grouping func: (it * 200.0).roundToInt().toDouble() / 200.0 - nodes - .map(|node| { - let node_ring = &node.ring_protocol.ring; - let self_loc = node.ring_protocol.location.read().unwrap(); - node_ring - .connections_by_location - .read() - .keys() - .map(|d| self_loc.distance(d)) - .collect::>() - }) - .flatten() - } -} diff --git a/crates/freenet2-node/Cargo.toml b/crates/locutus-node/Cargo.toml similarity index 97% rename from crates/freenet2-node/Cargo.toml rename to crates/locutus-node/Cargo.toml index fe900f83f..31e1c7fd8 100644 --- a/crates/freenet2-node/Cargo.toml +++ b/crates/locutus-node/Cargo.toml @@ -13,6 +13,7 @@ doc = false [dependencies] # anyhow = "1.0.42" +async-trait = "0.1.51" bincode = "1.3" config = { version = "0.11", features = [ "toml" ] } crossbeam = "0.8.1" diff --git a/crates/freenet2-node/src/bin/gateway-deploy.rs b/crates/locutus-node/src/bin/gateway-deploy.rs similarity index 79% rename from crates/freenet2-node/src/bin/gateway-deploy.rs rename to crates/locutus-node/src/bin/gateway-deploy.rs index 0382cdf2e..a01150a78 100644 --- a/crates/freenet2-node/src/bin/gateway-deploy.rs +++ b/crates/locutus-node/src/bin/gateway-deploy.rs @@ -5,5 +5,5 @@ use locutus_node::*; async fn main() -> Result<(), Box> { let key = Keypair::generate_ed25519(); let mut node = NodeConfig::default().with_key(key).build_libp2p()?; - node.listen_on().map_err(|_| "failed to start".into()) + node.listen_on().await.map_err(|_| "failed to start".into()) } diff --git a/crates/freenet2-node/src/config.rs b/crates/locutus-node/src/config.rs similarity index 100% rename from crates/freenet2-node/src/config.rs rename to crates/locutus-node/src/config.rs diff --git a/crates/locutus-node/src/conn_manager.rs b/crates/locutus-node/src/conn_manager.rs new file mode 100644 index 000000000..415839320 --- /dev/null +++ b/crates/locutus-node/src/conn_manager.rs @@ -0,0 +1,98 @@ +//! Types and definitions to handle all socket communication for the peer nodes. + +use std::{fmt::Display, net::SocketAddr, sync::atomic::AtomicU64, time::Duration}; + +use libp2p::{core::PublicKey, PeerId}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use crate::{ + message::{Message, Transaction}, + ring::Location, + StdResult, +}; + +pub mod in_memory; + +const PING_EVERY: Duration = Duration::from_secs(30); +const DROP_CONN_AFTER: Duration = Duration::from_secs(30 * 10); + +// pub(crate) type RemoveConnHandler<'t> = Box; +pub(crate) type Result = StdResult; + +#[async_trait::async_trait] +pub(crate) trait ConnectionBridge { + fn add_connection(&mut self, peer: PeerKeyLocation, unsolicited: bool); + + /// # Cancellation Safety + /// This async fn must be cancellation safe! + async fn recv(&self) -> Result; + + async fn send(&self, target: &PeerKeyLocation, msg: Message) -> Result<()>; +} + +/// A protocol used to send and receive data over the network. +pub(crate) trait Transport { + fn is_open(&self) -> bool; + fn location(&self) -> Option; +} + +#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)] +pub struct PeerKey(pub(crate) PeerId); + +impl Display for PeerKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.0) + } +} + +impl From for PeerKey { + fn from(val: PublicKey) -> Self { + PeerKey(PeerId::from(val)) + } +} + +#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq, Hash)] +/// The Location of a PeerKey in the ring. This location allows routing towards the peer. +pub(crate) struct PeerKeyLocation { + pub peer: PeerKey, + pub location: Option, +} + +#[derive(Debug, thiserror::Error)] +pub(crate) enum ConnError { + #[error("received unexpected response type for a sent request: {0}")] + UnexpectedResponseMessage(Message), + #[error("location unknown for this node")] + LocationUnknown, + #[error("expected transaction id was {0} but received {1}")] + UnexpectedTx(Transaction, Transaction), + #[error("error while de/serializing message")] + Serialization(#[from] Box), + #[error("connection negotiation between two peers failed")] + NegotationFailed, +} + +mod serialization { + use super::*; + + impl Serialize for PeerKey { + fn serialize(&self, serializer: S) -> StdResult + where + S: Serializer, + { + serializer.serialize_bytes(&self.0.to_bytes()) + } + } + + impl<'de> Deserialize<'de> for PeerKey { + fn deserialize(deserializer: D) -> StdResult + where + D: Deserializer<'de>, + { + let bytes: Vec = Deserialize::deserialize(deserializer)?; + Ok(PeerKey( + PeerId::from_bytes(&bytes).expect("failed deserialization of PeerKey"), + )) + } + } +} diff --git a/crates/locutus-node/src/conn_manager/in_memory.rs b/crates/locutus-node/src/conn_manager/in_memory.rs new file mode 100644 index 000000000..f8f2e5b26 --- /dev/null +++ b/crates/locutus-node/src/conn_manager/in_memory.rs @@ -0,0 +1,166 @@ +//! A in-memory connection manager and transport implementation. Used for testing pourpouses. +use std::{io::Cursor, sync::Arc, time::Duration}; + +use crossbeam::channel::{self, Receiver, Sender}; +use once_cell::sync::OnceCell; +use parking_lot::Mutex; + +use super::{ConnError, Transport}; +use crate::{ + config::tracing::Logger, + conn_manager::{ConnectionBridge, PeerKey, PeerKeyLocation}, + message::Message, + ring::Location, +}; +static NETWORK_WIRES: OnceCell<(Sender, Receiver)> = + OnceCell::new(); + +#[derive(Clone)] +pub(crate) struct MemoryConnManager { + pub transport: InMemoryTransport, + msg_queue: Arc>>, +} + +impl MemoryConnManager { + pub fn new(is_open: bool, peer: PeerKey, location: Option) -> Self { + Logger::init_logger(); + let transport = InMemoryTransport::new(is_open, peer, location); + let msg_queue = Arc::new(Mutex::new(Vec::new())); + + let msg_queue_cp = msg_queue.clone(); + let tr_cp = transport.clone(); + tokio::spawn(async move { + // evaluate the messages as they arrive + loop { + let msg = { tr_cp.msg_stack_queue.lock().pop() }; + if let Some(msg) = msg { + let msg_data: Message = + bincode::deserialize_from(Cursor::new(msg.data)).unwrap(); + if let Some(mut queue) = msg_queue_cp.try_lock() { + queue.push(msg_data); + std::mem::drop(queue); + } + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }); + + Self { + transport, + msg_queue, + } + } +} + +#[async_trait::async_trait] +impl ConnectionBridge for MemoryConnManager { + async fn recv(&self) -> Result { + loop { + if let Some(mut queue) = self.msg_queue.try_lock() { + if let Some(msg) = queue.pop() { + return Ok(msg); + } + std::mem::drop(queue); + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + + async fn send(&self, target: &PeerKeyLocation, msg: Message) -> Result<(), ConnError> { + let msg = bincode::serialize(&msg)?; + self.transport.send( + target.peer, + target.location.ok_or(ConnError::LocationUnknown)?, + msg, + ); + Ok(()) + } + + fn add_connection(&mut self, _peer: PeerKeyLocation, _unsolicited: bool) {} +} + +#[derive(Clone, Debug)] +struct MessageOnTransit { + origin: PeerKey, + origin_loc: Option, + target: PeerKey, + data: Vec, +} + +#[derive(Clone, Debug)] +pub struct InMemoryTransport { + interface_peer: PeerKey, + location: Option, + is_open: bool, + /// received messages per each peer awaiting processing + msg_stack_queue: Arc>>, + /// all messages 'traversing' the network at a given time + network: Sender, +} + +impl InMemoryTransport { + fn new(is_open: bool, interface_peer: PeerKey, location: Option) -> Self { + let msg_stack_queue = Arc::new(Mutex::new(Vec::new())); + let (tx, rx) = NETWORK_WIRES.get_or_init(crossbeam::channel::unbounded); + + // store messages incoming from the network in the msg stack + let rcv_msg_c = msg_stack_queue.clone(); + let network = tx.clone(); + let rx = rx.clone(); + tokio::spawn(async move { + loop { + match rx.try_recv() { + Ok(msg) if msg.target == interface_peer => { + log::debug!( + "Inbound message received for peer {} from {}", + interface_peer, + msg.origin + ); + rcv_msg_c.lock().push(msg); + } + Err(channel::TryRecvError::Disconnected) => break, + Err(channel::TryRecvError::Empty) | Ok(_) => { + tokio::time::sleep(Duration::from_millis(10)).await + } + } + } + log::error!("Stopped receiving messages in {}", interface_peer); + }); + + Self { + interface_peer, + location, + is_open, + msg_stack_queue, + network, + } + } + + fn send(&self, peer: PeerKey, location: Location, message: Vec) { + let send_res = self.network.try_send(MessageOnTransit { + origin: self.interface_peer, + origin_loc: Some(location), + target: peer, + data: message, + }); + match send_res { + Err(channel::TrySendError::Disconnected(_)) => { + log::debug!("Network shutdown") + } + Err(channel::TrySendError::Full(_)) => { + unreachable!("not unbounded capacity!") + } + Ok(_) => {} + } + } +} + +impl Transport for InMemoryTransport { + fn is_open(&self) -> bool { + self.is_open + } + + fn location(&self) -> Option { + self.location + } +} diff --git a/crates/freenet2-node/src/lib.rs b/crates/locutus-node/src/lib.rs similarity index 70% rename from crates/freenet2-node/src/lib.rs rename to crates/locutus-node/src/lib.rs index 50d9d5828..24f95be68 100644 --- a/crates/freenet2-node/src/lib.rs +++ b/crates/locutus-node/src/lib.rs @@ -3,10 +3,10 @@ pub mod conn_manager; mod message; mod node; mod operations; -mod probe_proto; -mod ring_proto; +// mod probe_proto; +mod ring; +mod user_events; -pub use conn_manager::PeerKey; pub use node::NodeConfig; type StdResult = std::result::Result; diff --git a/crates/freenet2-node/src/message.rs b/crates/locutus-node/src/message.rs similarity index 56% rename from crates/freenet2-node/src/message.rs rename to crates/locutus-node/src/message.rs index 64f72e90a..33d1df46b 100644 --- a/crates/freenet2-node/src/message.rs +++ b/crates/locutus-node/src/message.rs @@ -1,9 +1,15 @@ +//! Main message type which encapsulated all the messaging between nodes. + use std::{fmt::Display, time::Duration}; use serde::{Deserialize, Serialize}; use uuid::Uuid; -use crate::ring_proto::{messages::*, Location}; +use crate::{ + conn_manager::PeerKeyLocation, + operations::{get::GetMsg, join_ring::JoinRingMsg, put::PutMsg}, + ring::Location, +}; pub(crate) use sealed_msg_type::TransactionTypeId; /// An transaction is a unique, universal and efficient identifier for any @@ -48,14 +54,14 @@ impl Display for Transaction { /// Get the transaction type associated to a given message type. pub(crate) trait TransactionType: sealed_msg_type::SealedTxType { - fn msg_type_id() -> TransactionTypeId; + fn tx_type_id() -> TransactionTypeId; } impl TransactionType for T where T: sealed_msg_type::SealedTxType, { - fn msg_type_id() -> TransactionTypeId { + fn tx_type_id() -> TransactionTypeId { ::tx_type_id() } } @@ -70,88 +76,68 @@ mod sealed_msg_type { #[repr(u8)] #[derive(Debug, PartialEq, Eq, Hash, Clone, Copy, Serialize, Deserialize)] pub(crate) enum TransactionTypeId { - OpenConnection, - Probe, - } - - impl TransactionTypeId { - pub const fn enumeration() -> [Self; 2] { - [Self::OpenConnection, Self::Probe] - } + JoinRing, + Put, + Get, + Canceled, } macro_rules! transaction_type_enumeration { - (decl struct { $($type:tt -> $var:tt),+ }) => { - $( transaction_type_enumeration!(@conversion $type -> $var); )+ - }; - - (@conversion $ty:tt -> $var:tt) => { - impl From<(Transaction, $ty)> for Message { - fn from(oc: (Transaction, $ty)) -> Self { - let (tx_id, oc) = oc; - Self::$ty(tx_id, oc) + (decl struct { $( $var:tt -> $ty:tt),+ }) => { + $( + impl From<$ty> for Message { + fn from(msg: $ty) -> Self { + Self::$var(msg) + } } - } - impl SealedTxType for $ty { - fn tx_type_id() -> TransactionTypeId { - TransactionTypeId::$var + impl SealedTxType for $ty { + fn tx_type_id() -> TransactionTypeId { + TransactionTypeId::$var + } } - } + )+ }; } transaction_type_enumeration!(decl struct { - JoinRequest -> OpenConnection, - JoinResponse -> OpenConnection, - OpenConnection -> OpenConnection, - ProbeRequest -> Probe, - ProbeResponse -> Probe + JoinRing -> JoinRingMsg, + Put -> PutMsg, + Get -> GetMsg }); } #[derive(Debug, Serialize, Deserialize, Clone)] pub(crate) enum Message { - // Ring - JoinRequest(Transaction, JoinRequest), - JoinResponse(Transaction, JoinResponse), - OpenConnection(Transaction, OpenConnection), - - // Probe - ProbeRequest(Transaction, ProbeRequest), - ProbeResponse(Transaction, ProbeResponse), + JoinRing(JoinRingMsg), + Put(PutMsg), + Get(GetMsg), + /// Failed a transaction, informing of cancellation. + Canceled(Transaction), } impl Message { fn msg_type_repr(&self) -> &'static str { - use Message::*; - match self { - JoinRequest(_, _) => "JoinRequest", - JoinResponse(_, _) => "JoinResponse", - OpenConnection(_, _) => "OpenConnection", - ProbeRequest(_, _) => "ProbeRequest", - ProbeResponse(_, _) => "ProbeResponse", - } + todo!() } pub fn id(&self) -> &Transaction { use Message::*; match self { - JoinRequest(id, _) => id, - JoinResponse(id, _) => id, - OpenConnection(id, _) => id, - ProbeRequest(id, _) => id, - ProbeResponse(id, _) => id, + JoinRing(op) => op.id(), + Put(op) => op.id(), + Get(op) => op.id(), + Canceled(tx) => tx, } } - pub fn msg_type(&self) -> TransactionTypeId { + pub fn sender(&self) -> Option<&PeerKeyLocation> { + use Message::*; match self { - Self::JoinRequest(_, _) => ::msg_type_id(), - Self::JoinResponse(_, _) => ::msg_type_id(), - Self::OpenConnection(_, _) => ::msg_type_id(), - Self::ProbeRequest(_, _) => ::msg_type_id(), - Self::ProbeResponse(_, _) => ::msg_type_id(), + JoinRing(op) => op.sender(), + Put(op) => op.sender(), + Get(op) => op.sender(), + Canceled(_) => None, } } } diff --git a/crates/locutus-node/src/node.rs b/crates/locutus-node/src/node.rs new file mode 100644 index 000000000..07f5af5d0 --- /dev/null +++ b/crates/locutus-node/src/node.rs @@ -0,0 +1,371 @@ +//! The main node data type which encapsulates all the behaviour for maintaining a connection +//! and performing operations within the network. +//! +//! # Implementations +//! Node comes with different underlying implementations that can be used upon construction. +//! Those implementations are: +//! - libp2p: all the connection is handled by libp2p. +//! - In memory: a simplifying node used for emulation pourpouses mainly. + +use std::net::IpAddr; + +use libp2p::{identity, multiaddr::Protocol, Multiaddr, PeerId}; + +use crate::{config::CONF, ring::Location}; + +use self::libp2p_impl::NodeLibP2P; +pub(crate) use in_memory::NodeInMemory; +pub(crate) use op_state::{OpExecutionError, OpStateStorage}; + +mod in_memory; +mod libp2p_impl; +mod op_state; + +pub struct Node(NodeImpl); + +impl Node { + pub async fn listen_on(&mut self) -> Result<(), ()> { + match self.0 { + NodeImpl::LibP2P(ref mut node) => node.listen_on().await, + NodeImpl::InMemory(ref mut node) => node.listen_on().await, + } + } +} + +enum NodeImpl { + LibP2P(Box), + InMemory(Box), +} + +/// When instancing a node you can either join an existing network or bootstrap a new network with a listener +/// which will act as the initial provider. This initial peer will be listening at the provided port and assigned IP. +/// If those are not free the instancing process will return an error. +/// +/// In order to bootstrap a new network the following arguments are required to be provided to the builder: +/// - ip: IP associated to the initial node. +/// - port: listening port of the initial node. +/// +/// If both are provided but also additional peers are added via the [`Self::add_provider()`] method, this node will +/// be listening but also try to connect to an existing peer. +pub struct NodeConfig { + /// local peer private key in + local_key: identity::Keypair, + + // optional local info, in case this is an initial bootstrap node + /// IP to bind to the listener + local_ip: Option, + /// socket port to bind to the listener + local_port: Option, + + /// At least an other running listener node is required for joining the network. + /// Not necessary if this is an initial node. + remote_nodes: Vec, + + /// the location of this node, used for gateways. + location: Option, +} + +impl NodeConfig { + pub fn new() -> NodeConfig { + let local_key = if let Some(key) = &CONF.local_peer_keypair { + key.clone() + } else { + identity::Keypair::generate_ed25519() + }; + NodeConfig { + local_key, + remote_nodes: Vec::with_capacity(1), + local_ip: None, + local_port: None, + location: None, + } + } + + pub fn with_port(mut self, port: u16) -> Self { + self.local_port = Some(port); + self + } + + pub fn with_ip>(mut self, ip: T) -> Self { + self.local_ip = Some(ip.into()); + self + } + + /// Optional identity key of this node. + /// If not provided it will be either obtained from the configuration or freshly generated. + pub fn with_key(mut self, key: identity::Keypair) -> Self { + self.local_key = key; + self + } + + pub fn with_location(mut self, loc: Location) -> Self { + self.location = Some(loc); + self + } + + /// Connection info for an already existing peer. Required in case this is not a bootstrapping node. + pub fn add_provider(mut self, peer: InitPeerNode) -> Self { + self.remote_nodes.push(peer); + self + } + + /// Builds a node using libp2p as backend connection manager. + pub fn build_libp2p(self) -> std::io::Result { + Ok(Node(NodeImpl::LibP2P(Box::new(NodeLibP2P::build(self)?)))) + } + + /// Builds a node using in-memory transport. Used for testing pourpouses. + pub fn build_in_memory(self) -> Result { + let inmem = NodeInMemory::build(self)?; + Ok(Node(NodeImpl::InMemory(Box::new(inmem)))) + } +} + +impl Default for NodeConfig { + fn default() -> Self { + Self::new() + } +} + +/// Initial listening peer node to bootstrap the network. +#[derive(Clone)] +pub struct InitPeerNode { + addr: Option, + identifier: Option, + location: Option, +} + +impl InitPeerNode { + pub fn new() -> Self { + Self { + addr: None, + identifier: None, + location: None, + } + } + + /// Given a byte array decode into a PeerId data type. + /// + /// # Panic + /// Will panic if is not a valid representation. + pub fn decode_peer_id>(mut bytes: T) -> PeerId { + PeerId::from_public_key( + identity::Keypair::Ed25519(identity::ed25519::Keypair::decode(bytes.as_mut()).unwrap()) + .public(), + ) + } + + /// IP which will be assigned to this node. + pub fn listening_ip>(mut self, ip: T) -> Self { + if let Some(addr) = &mut self.addr { + addr.push(Protocol::from(ip.into())); + } else { + self.addr = Some(Multiaddr::from(ip.into())); + } + self + } + + /// TCP listening port (only required in case of using TCP as transport). + /// If not specified port 7800 will be used as default. + pub fn listening_port(mut self, port: u16) -> Self { + if let Some(addr) = &mut self.addr { + addr.push(Protocol::Tcp(port)); + } else { + self.addr = Some(Multiaddr::from(Protocol::Tcp(port))); + } + self + } + + pub fn with_identifier(mut self, id: PeerId) -> Self { + self.identifier = Some(id); + self + } + + pub fn with_location(mut self, loc: Location) -> Self { + self.location = Some(loc); + self + } +} + +impl std::default::Default for InitPeerNode { + fn default() -> Self { + let conf = &CONF; + let identifier = conf.bootstrap_id + .expect("At least one public identifier is required to bootstrap the connection to the network."); + let multi_addr = multiaddr_from_connection((conf.bootstrap_ip, conf.bootstrap_port)); + Self { + addr: Some(multi_addr), + identifier: Some(identifier), + location: None, + } + } +} + +/// Small helper function to convert a tuple composed of an IP address and a port +/// to a libp2p Multiaddr type. +fn multiaddr_from_connection(conn: (IpAddr, u16)) -> Multiaddr { + let mut addr = Multiaddr::with_capacity(2); + addr.push(Protocol::from(conn.0)); + addr.push(Protocol::Tcp(conn.1)); + addr +} + +#[cfg(test)] +pub mod test_utils { + use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener}; + + use libp2p::identity; + use rand::Rng; + use tokio::sync::mpsc; + + use crate::{ + conn_manager::{ConnectionBridge, PeerKey, Transport}, + message::Message, + node::{InitPeerNode, NodeInMemory}, + operations::{join_ring::join_ring_op, OpError}, + ring::{Distance, Location}, + NodeConfig, + }; + + pub fn get_free_port() -> Result { + let mut port; + for _ in 0..100 { + port = get_dynamic_port(); + let bind_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, port)); + if let Ok(conn) = TcpListener::bind(bind_addr) { + std::mem::drop(conn); + return Ok(port); + } + } + Err(()) + } + + pub fn get_dynamic_port() -> u16 { + const FIRST_DYNAMIC_PORT: u16 = 49152; + const LAST_DYNAMIC_PORT: u16 = 65535; + rand::thread_rng().gen_range(FIRST_DYNAMIC_PORT..LAST_DYNAMIC_PORT) + } + + pub(crate) struct SimNetwork { + // gateways: HashMap, + // peers: HashMap, + meta_info_tx: mpsc::Sender>, + meta_info_rx: mpsc::Receiver>, + } + + pub(crate) struct NetEvent { + pub(crate) _sender: String, + pub(crate) event: EventType, + } + + pub(crate) enum EventType { + /// A peer joined the network through some gateway. + JoinSuccess { gateway: PeerKey, new_node: PeerKey }, + } + + impl SimNetwork { + pub fn build( + network_size: usize, + ring_max_htl: usize, + rnd_if_htl_above: usize, + ) -> SimNetwork { + let sim = SimNetwork::new(); + + // build gateway node + // let probe_protocol = Some(ProbeProtocol::new(ring_protocol.clone(), loc)); + let gateway_pair = identity::Keypair::generate_ed25519(); + let gateway_peer_id = gateway_pair.public().into_peer_id(); + let gateway_port = get_free_port().unwrap(); + let gateway_loc = Location::random(); + let config = NodeConfig::new() + .with_ip(Ipv6Addr::LOCALHOST) + .with_port(gateway_port) + .with_key(gateway_pair) + .with_location(gateway_loc); + let gateway = NodeInMemory::build(config).unwrap(); + sim.initialize_gateway(gateway, "gateway".to_owned()); + + // add other nodes to the simulation + for node_no in 0..network_size { + let label = format!("node-{}", node_no); + let config = NodeConfig::new().add_provider( + InitPeerNode::new() + .listening_ip(Ipv6Addr::LOCALHOST) + .listening_port(gateway_port) + .with_identifier(gateway_peer_id) + .with_location(gateway_loc), + ); + sim.initialize_peer(NodeInMemory::build(config).unwrap(), label); + } + sim + } + + pub async fn recv_net_events(&mut self) -> Option> { + self.meta_info_rx.recv().await + } + + fn new() -> Self { + let (meta_info_tx, meta_info_rx) = mpsc::channel(100); + Self { + meta_info_rx, + meta_info_tx, + } + } + + fn initialize_gateway(&self, gateway: NodeInMemory, sender_label: String) { + let info_ch = self.meta_info_tx.clone(); + tokio::spawn(Self::listen(gateway, info_ch, sender_label)); + } + + fn initialize_peer(&self, mut peer: NodeInMemory, sender_label: String) { + let info_ch = self.meta_info_tx.clone(); + tokio::spawn(async move { + if peer.start().await.is_err() { + let _ = info_ch.send(Err(OpError::IllegalStateTransition)).await; + return Err(()); + } + Self::listen(peer, info_ch, sender_label).await + }); + } + + async fn listen( + mut gateway: NodeInMemory, + info_ch: mpsc::Sender>, + _sender: String, + ) -> Result<(), ()> { + while let Ok(msg) = gateway.conn_manager.recv().await { + if let Message::JoinRing(msg) = msg { + if let Err(err) = + join_ring_op(&mut gateway.op_storage, &mut gateway.conn_manager, msg).await + { + let _ = info_ch.send(Err(err)).await; + return Err(()); + } + } else { + break; + } + } + Ok(()) + } + } + + /// Builds an histogram of the distribution in the ring of each node relative to each other. + fn _ring_distribution<'a>( + nodes: impl Iterator + 'a, + ) -> impl Iterator + 'a { + // TODO: groupby certain intervals + // e.g. grouping func: (it * 200.0).roundToInt().toDouble() / 200.0 + nodes + .map(|node| { + let node_ring = &node.op_storage.ring; + let self_loc = node.conn_manager.transport.location().unwrap(); + node_ring + .connections_by_location + .read() + .keys() + .map(|d| self_loc.distance(d)) + .collect::>() + }) + .flatten() + } +} diff --git a/crates/locutus-node/src/node/in_memory.rs b/crates/locutus-node/src/node/in_memory.rs new file mode 100644 index 000000000..995bb48aa --- /dev/null +++ b/crates/locutus-node/src/node/in_memory.rs @@ -0,0 +1,118 @@ +use crate::{ + conn_manager::{in_memory::MemoryConnManager, ConnectionBridge, PeerKey, PeerKeyLocation}, + message::Message, + operations::{get, join_ring, put}, + user_events::{test_utils::MemoryEventsGen, UserEvent, UserEventsProxy}, + NodeConfig, +}; + +use super::{op_state::OpStateStorage, InitPeerNode}; + +pub(crate) struct NodeInMemory +where + Ev: UserEventsProxy, +{ + peer: PeerKey, + gateways: Vec, + pub conn_manager: MemoryConnManager, + pub op_storage: OpStateStorage, + user_events: Ev, +} + +impl NodeInMemory { + pub fn build(config: NodeConfig) -> Result { + if (config.local_ip.is_none() || config.local_port.is_none()) + && config.remote_nodes.is_empty() + { + return Err("At least one remote gateway is required to join an existing network for non-gateway nodes."); + } + let peer = PeerKey::from(config.local_key.public()); + let conn_manager = MemoryConnManager::new(true, peer, None); + let gateways = config + .remote_nodes + .into_iter() + .filter_map(|node| { + let InitPeerNode { + identifier, + location, + .. + } = node; + location.zip(identifier).map(|(loc, id)| PeerKeyLocation { + peer: PeerKey(id), + location: Some(loc), + }) + }) + .collect(); + Ok(NodeInMemory { + peer, + conn_manager, + op_storage: OpStateStorage::new(), + gateways, + user_events: MemoryEventsGen {}, + }) + } + + pub async fn start(&mut self) -> Result<(), ()> { + // FIXME: this iteration should be shuffled, must write an extension iterator shuffle items "in place" + // the idea here is to limit the amount of gateways being contacted that's why shuffling is required + for gateway in &self.gateways { + // initiate join action action per each gateway + let op = join_ring::JoinRingOp::initial_request( + self.peer, + *gateway, + self.op_storage.ring.max_hops_to_live, + ); + join_ring::initial_join_request(&mut self.op_storage, &mut self.conn_manager, op) + .await + .unwrap(); + } + Ok(()) + } + + /// Starts listening to incoming messages, only allowed to be called directly when this node + /// already joined the network. + pub async fn listen_on(&mut self) -> Result<(), ()> { + self.start().await?; + loop { + tokio::select! { + msg = self.conn_manager.recv() => { + match msg { + Ok(msg) => match msg { + Message::JoinRing(op) => { + join_ring::join_ring_op(&mut self.op_storage, &mut self.conn_manager, op) + .await + .unwrap(); + } + Message::Put(op) => { + put::put_op(&mut self.op_storage, &mut self.conn_manager, op) + .await + .unwrap(); + } + Message::Get(op) => { + get::get_op(&mut self.op_storage, &mut self.conn_manager, op) + .await + .unwrap(); + } + Message::Canceled(_) => todo!(), + }, + Err(_) => break Err(()), + } + } + usr_event = self.user_events.recv() => { + match usr_event { + UserEvent::Put { key, value } => { + // Initialize a put op. + let op = put::PutOp::new(key, value); + put::request_put(&mut self.op_storage, &mut self.conn_manager, op).await.unwrap(); + } + UserEvent::Get { key } => { + // Initialize a get op. + let op = get::GetOp::new(key); + get::request_get(&mut self.op_storage, &mut self.conn_manager, op).await.unwrap(); + } + } + } + } + } + } +} diff --git a/crates/freenet2-node/src/node/libp2p_impl.rs b/crates/locutus-node/src/node/libp2p_impl.rs similarity index 89% rename from crates/freenet2-node/src/node/libp2p_impl.rs rename to crates/locutus-node/src/node/libp2p_impl.rs index 3073cf4d3..8fd767882 100644 --- a/crates/freenet2-node/src/node/libp2p_impl.rs +++ b/crates/locutus-node/src/node/libp2p_impl.rs @@ -25,7 +25,7 @@ pub struct NodeLibP2P { } impl NodeLibP2P { - pub(super) fn listen_on(&mut self) -> Result<(), ()> { + pub(super) async fn listen_on(&mut self) -> Result<(), ()> { if let Some(conn) = self.listen_on { let listening_addr = super::multiaddr_from_connection(conn); self.swarm.listen_on(listening_addr).unwrap(); @@ -155,38 +155,15 @@ impl From for NetEvent { #[cfg(test)] mod tests { - use std::{ - net::{Ipv4Addr, SocketAddr, TcpListener}, - time::Duration, - }; + use std::{net::Ipv4Addr, time::Duration}; + use super::*; use crate::{ config::tracing::Logger, - node::{InitPeerNode, Node}, + node::{test_utils::get_free_port, InitPeerNode}, }; - use super::*; use libp2p::{futures::StreamExt, swarm::SwarmEvent}; - use rand::Rng; - - fn get_free_port() -> Result { - let mut port; - for _ in 0..100 { - port = get_dynamic_port(); - let bind_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, port)); - if let Ok(conn) = TcpListener::bind(bind_addr) { - std::mem::drop(conn); - return Ok(port); - } - } - Err(()) - } - - fn get_dynamic_port() -> u16 { - const FIRST_DYNAMIC_PORT: u16 = 49152; - const LAST_DYNAMIC_PORT: u16 = 65535; - rand::thread_rng().gen_range(FIRST_DYNAMIC_PORT..LAST_DYNAMIC_PORT) - } /// Ping test event loop async fn ping_ev_loop(peer: &mut NodeLibP2P) -> Result<(), ()> { @@ -228,7 +205,7 @@ mod tests { .with_port(peer1_port) .with_key(peer1_key); let mut peer1 = NodeLibP2P::build(config).unwrap(); - peer1.listen_on().unwrap(); + peer1.listen_on().await.unwrap(); ping_ev_loop(&mut peer1).await }); diff --git a/crates/locutus-node/src/node/op_state.rs b/crates/locutus-node/src/node/op_state.rs new file mode 100644 index 000000000..1da40c5af --- /dev/null +++ b/crates/locutus-node/src/node/op_state.rs @@ -0,0 +1,71 @@ +use std::{collections::HashMap, sync::Arc}; + +use crate::{ + message::{Transaction, TransactionTypeId}, + operations::{get::GetOp, join_ring::JoinRingOp, put::PutOp, Operation}, + ring::Ring, +}; + +pub(crate) struct OpStateStorage { + join_ring: HashMap, + put: HashMap, + get: HashMap, + pub ring: Arc, +} + +macro_rules! check_id_op { + ($get_ty:expr, $var:path) => { + if !matches!($get_ty, $var) { + return Err(OpExecutionError::IncorrectTxType( + TransactionTypeId::JoinRing, + $get_ty, + )); + } + }; +} + +impl OpStateStorage { + pub fn new() -> Self { + Self { + join_ring: HashMap::default(), + put: HashMap::default(), + get: HashMap::default(), + ring: Arc::new(Ring::new()), + } + } + + pub fn push(&mut self, id: Transaction, op: Operation) -> Result<(), OpExecutionError> { + match op { + Operation::JoinRing(tx) => { + check_id_op!(id.tx_type(), TransactionTypeId::JoinRing); + self.join_ring.insert(id, tx); + } + Operation::Put(tx) => { + check_id_op!(id.tx_type(), TransactionTypeId::Put); + self.put.insert(id, tx); + } + Operation::Get(tx) => { + check_id_op!(id.tx_type(), TransactionTypeId::Put); + self.get.insert(id, tx); + } + } + Ok(()) + } + + pub fn pop(&mut self, id: &Transaction) -> Option { + match id.tx_type() { + TransactionTypeId::JoinRing => self.join_ring.remove(id).map(Operation::JoinRing), + TransactionTypeId::Put => self.put.remove(id).map(Operation::Put), + TransactionTypeId::Get => self.get.remove(id).map(Operation::Get), + TransactionTypeId::Canceled => todo!(), + } + } +} + +#[derive(Debug, thiserror::Error, Clone)] +pub(crate) enum OpExecutionError { + #[error("unspected transaction type, trying to get a {0:?} from a {1:?}")] + IncorrectTxType(TransactionTypeId, TransactionTypeId), + #[error("failed while processing transaction {0}")] + TxUpdateFailure(Transaction), +} diff --git a/crates/locutus-node/src/operations.rs b/crates/locutus-node/src/operations.rs new file mode 100644 index 000000000..3234a4324 --- /dev/null +++ b/crates/locutus-node/src/operations.rs @@ -0,0 +1,32 @@ +use crate::{conn_manager, message::Message, node::OpExecutionError}; + +pub(crate) mod get; +pub(crate) mod join_ring; +pub(crate) mod put; +pub(crate) mod subscribe; + +pub(crate) struct OperationResult { + /// Inhabited if there is a message to return to the other peer. + pub return_msg: Option, + /// None if the operation has been completed. + pub state: Option, +} + +pub(crate) enum Operation { + JoinRing(join_ring::JoinRingOp), + Put(put::PutOp), + Get(get::GetOp), +} + +#[derive(Debug, Default)] +pub struct ProbeOp; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum OpError { + #[error(transparent)] + ConnError(#[from] conn_manager::ConnError), + #[error(transparent)] + OpStateManagerError(#[from] OpExecutionError), + #[error("illegal awaiting state")] + IllegalStateTransition, +} diff --git a/crates/locutus-node/src/operations/get.rs b/crates/locutus-node/src/operations/get.rs new file mode 100644 index 000000000..d20da92c2 --- /dev/null +++ b/crates/locutus-node/src/operations/get.rs @@ -0,0 +1,102 @@ +use rust_fsm::{StateMachine, StateMachineImpl}; + +use crate::{conn_manager::ConnectionBridge, message::Transaction, node::OpStateStorage}; + +pub(crate) use self::messages::GetMsg; + +use super::OpError; + +/// This is just a placeholder for now! +pub(crate) struct GetOp(StateMachine); + +impl GetOp { + pub fn new(key: Vec) -> Self { + let mut state = StateMachine::new(); + state.consume(&GetMsg::FetchRouting { key }).unwrap(); + GetOp(state) + } +} + +struct GetOpSM; + +impl StateMachineImpl for GetOpSM { + type Input = GetMsg; + + type State = GetState; + + type Output = GetMsg; + + const INITIAL_STATE: Self::State = GetState::Initializing; + + fn transition(state: &Self::State, inget: &Self::Input) -> Option { + match (state, inget) { + (GetState::Initializing, GetMsg::FetchRouting { key }) => { + Some(GetState::Requesting { key: key.clone() }) + } + _ => None, + } + } + + fn output(state: &Self::State, inget: &Self::Input) -> Option { + match (state, inget) { + (GetState::Initializing, GetMsg::FetchRouting { key }) => { + Some(GetMsg::FetchRouting { key: key.clone() }) + } + _ => None, + } + } +} + +enum GetState { + Initializing, + Requesting { key: Vec }, +} + +pub(crate) async fn get_op( + op_storage: &mut OpStateStorage, + conn_manager: &mut CB, + get_op: GetMsg, +) -> Result<(), OpError> +where + CB: ConnectionBridge, +{ + Ok(()) +} + +/// Request to get the current value from a contract. +pub(crate) async fn request_get( + op_storage: &mut OpStateStorage, + conn_manager: &mut CB, + get_op: GetOp, +) -> Result<(), OpError> +where + CB: ConnectionBridge, +{ + // the initial request must provide: + // - a location in the network where the contract resides + // - and the value to get + todo!() +} + +mod messages { + use crate::conn_manager::PeerKeyLocation; + + use super::*; + + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] + pub(crate) enum GetMsg { + FetchRouting { key: Vec }, + } + + impl GetMsg { + pub fn id(&self) -> &Transaction { + todo!() + } + + pub fn sender(&self) -> Option<&PeerKeyLocation> { + todo!() + } + } +} diff --git a/crates/locutus-node/src/operations/join_ring.rs b/crates/locutus-node/src/operations/join_ring.rs new file mode 100644 index 000000000..d0e8d889d --- /dev/null +++ b/crates/locutus-node/src/operations/join_ring.rs @@ -0,0 +1,732 @@ +use std::collections::HashSet; + +use rust_fsm::*; + +use super::{OpError, OperationResult}; +use crate::{ + conn_manager::{self, ConnectionBridge, PeerKey, PeerKeyLocation}, + message::{Message, Transaction, TransactionType}, + node::{OpExecutionError, OpStateStorage}, + operations::Operation, + ring::{Location, Ring}, +}; + +pub(crate) use self::messages::{JoinRequest, JoinResponse, JoinRingMsg}; + +pub(crate) struct JoinRingOp(StateMachine); + +impl JoinRingOp { + pub fn initial_request( + req_peer: PeerKey, + target_loc: PeerKeyLocation, + max_hops_to_live: usize, + ) -> Self { + let mut sm = StateMachine::new(); + sm.consume(&JoinRingMsg::Req { + id: Transaction::new(::tx_type_id()), + msg: JoinRequest::Initial { + req_peer, + target_loc, + max_hops_to_live, + // initially is the max hops, will be decreased over each hop + hops_to_live: max_hops_to_live, + }, + }) + .unwrap(); + JoinRingOp(sm) + } +} + +#[derive(Debug)] +struct JROpSM; + +impl StateMachineImpl for JROpSM { + type Input = JoinRingMsg; + + type State = JRState; + + type Output = JoinRingMsg; + + const INITIAL_STATE: Self::State = JRState::Initializing; + + fn transition(state: &Self::State, input: &Self::Input) -> Option { + match (state, input) { + ( + JRState::Initializing, + JoinRingMsg::Req { + msg: + JoinRequest::Initial { + req_peer, + target_loc, + max_hops_to_live, + .. + }, + .. + }, + ) => Some(JRState::Connecting(ConnectionInfo { + gateway: *target_loc, + this_peer: *req_peer, + max_hops_to_live: *max_hops_to_live, + })), + ( + JRState::Connecting { .. } | JRState::Initializing, + JoinRingMsg::Resp { + msg: JoinResponse::ReceivedOC { .. }, + .. + }, + ) => Some(JRState::OCReceived), + ( + JRState::Connecting { .. } | JRState::OCReceived, + JoinRingMsg::Req { .. } | JoinRingMsg::Connected { .. }, + ) => Some(JRState::Connected), + (JRState::Connected, _) => None, + _ => None, + } + } + + fn output(state: &Self::State, input: &Self::Input) -> Option { + match (state, input) { + ( + JRState::Initializing, + JoinRingMsg::Req { + id, + msg: + JoinRequest::Initial { + target_loc, + req_peer, + .. + }, + }, + ) => Some(JoinRingMsg::Resp { + id: *id, + msg: JoinResponse::ReceivedOC { + by_peer: *target_loc, + }, + sender: PeerKeyLocation { + peer: *req_peer, + location: None, + }, + }), + ( + JRState::Initializing | JRState::Connecting(_), + JoinRingMsg::Resp { + msg: JoinResponse::ReceivedOC { .. }, + .. + } + | JoinRingMsg::Connected, + ) => Some(JoinRingMsg::Connected), + (JRState::OCReceived, JoinRingMsg::Connected) => Some(JoinRingMsg::Connected), + _ => None, + } + } +} + +#[derive(Debug, Clone)] +enum JRState { + Initializing, + Connecting(ConnectionInfo), + OCReceived, + Connected, +} + +#[derive(Debug, Clone)] +struct ConnectionInfo { + gateway: PeerKeyLocation, + this_peer: PeerKey, + max_hops_to_live: usize, +} + +impl JRState { + fn try_unwrap_connecting(self) -> Result { + if let Self::Connecting(conn_info) = self { + Ok(conn_info) + } else { + Err(OpError::IllegalStateTransition) + } + } +} + +/// Join ring routine, called upon processing a request to join or while performing +/// a join operation for this node after initial request (see [`initial_join_request`]). +/// +/// # Cancellation Safety +/// This future is not cancellation safe. +pub(crate) async fn join_ring_op( + op_storage: &mut OpStateStorage, + conn_manager: &mut CB, + join_op: JoinRingMsg, +) -> Result<(), OpError> +where + CB: ConnectionBridge, +{ + let sender; + let tx = *join_op.id(); + let result = match op_storage.pop(join_op.id()) { + Some(Operation::JoinRing(state)) => { + sender = join_op.sender().cloned(); + // was an existing operation, the other peer messaged back + update_state(conn_manager, state, join_op, &op_storage.ring).await + } + Some(_) => return Err(OpExecutionError::TxUpdateFailure(tx).into()), + None => { + sender = join_op.sender().cloned(); + // new request to join from this node, initialize the machine + let machine = JoinRingOp(StateMachine::new()); + update_state(conn_manager, machine, join_op, &op_storage.ring).await + } + }; + + match result { + Err(err) => { + log::error!("error while processing join request: {}", err); + if let Some(sender) = sender { + conn_manager.send(&sender, Message::Canceled(tx)).await?; + } + return Err(err); + } + Ok(OperationResult { + return_msg: Some(msg), + state: Some(updated_state), + }) => { + // updated op + let id = *msg.id(); + if let Some(target) = msg.sender().cloned() { + conn_manager.send(&target, msg).await?; + } + op_storage.push(id, Operation::JoinRing(updated_state))?; + } + Ok(OperationResult { + return_msg: Some(msg), + state: None, + }) => { + // finished the operation at this node, informing back + if let Some(target) = msg.sender().cloned() { + conn_manager.send(&target, msg).await?; + } + } + Ok(OperationResult { + return_msg: None, + state: None, + }) => { + // operation finished_completely + } + _ => unreachable!(), + } + Ok(()) +} + +#[inline(always)] +async fn update_state( + conn_manager: &mut CB, + state: JoinRingOp, + other_host_msg: JoinRingMsg, + ring: &Ring, +) -> Result, OpError> +where + CB: ConnectionBridge, +{ + let return_msg; + let new_state; + match other_host_msg { + JoinRingMsg::Req { + id, + msg: + JoinRequest::Initial { + target_loc: your_location, + req_peer, + hops_to_live, + max_hops_to_live, + }, + } => { + log::debug!( + "Initial join request received by {} with HTL {}", + req_peer, + hops_to_live + ); + + let new_location = Location::random(); + let accepted_by = if ring.should_accept( + &your_location + .location + .ok_or(OpExecutionError::TxUpdateFailure(id))?, + &new_location, + ) { + log::debug!( + "Accepting connections from {}, establising connection @ {}", + req_peer, + &your_location.peer + ); + // FIXME: self_cp.establish_conn(peer_key_loc, tx); + vec![your_location] + } else { + log::debug!("Not accepting new connection for sender {}", req_peer); + Vec::new() + }; + + log::debug!( + "Sending JoinResponse to {} accepting {} connections", + req_peer, + accepted_by.len() + ); + + let join_response = Message::from(JoinRingMsg::Resp { + id, + sender: your_location, + msg: JoinResponse::Initial { + accepted_by: accepted_by.clone(), + your_location: new_location, + your_peer_id: req_peer, + }, + }); + let new_peer_loc = PeerKeyLocation { + location: Some(new_location), + peer: req_peer, + }; + + if hops_to_live > 0 && !ring.connections_by_location.read().is_empty() { + let forward_to = if hops_to_live >= ring.rnd_if_htl_above { + log::debug!( + "Randomly selecting peer to forward JoinRequest, sender: {}", + req_peer + ); + ring.random_peer(|p| p.peer != req_peer) + } else { + log::debug!( + "Selecting close peer to forward request, sender: {}", + req_peer + ); + ring.connections_by_location + .read() + .get(&new_location) + .filter(|it| it.peer != req_peer) + .copied() + }; + + if let Some(forward_to) = forward_to { + let forwarded = Message::from(JoinRingMsg::Req { + id, + msg: JoinRequest::Proxy { + joiner: new_peer_loc, + hops_to_live: hops_to_live.min(ring.max_hops_to_live) - 1, + }, + }); + log::debug!( + "Forwarding JoinRequest from sender {} to {}", + req_peer, + forward_to.peer + ); + conn_manager.send(&forward_to, forwarded).await?; + let _forwarded_acceptors = accepted_by.into_iter().collect::>(); + // this will would jump to JoinRingMsg::Resp::JoinResponse::Proxy after peer return + // TODO: add a new state that transits from Connecting -> WaitingProxyResponse + todo!() + } else { + new_state = Some(state); + return_msg = Some(join_response); + } + } else { + new_state = Some(state); + return_msg = Some(join_response); + } + } + JoinRingMsg::Req { + id, + msg: + JoinRequest::Proxy { + joiner, + hops_to_live, + }, + } => { + todo!() + } + JoinRingMsg::Resp { + id, + sender, + msg: + JoinResponse::Initial { + accepted_by, + your_location, + your_peer_id, + }, + } => { + log::debug!("JoinResponse received from {}", sender.peer,); + // state.0.consume(input); + + // let loc = &mut *ring.location.write(); + // *loc = Some(your_location); + // let self_location = &*ring_proto.location.read(); + // let self_location = &self_location.ok_or(conn_manager::ConnError::LocationUnknown)?; + // for new_peer_key in accepted_by { + // if ring_proto.ring.should_accept( + // self_location, + // &new_peer_key + // .location + // .ok_or(conn_manager::ConnError::LocationUnknown)?, + // ) { + // log::info!("Establishing connection to {}", new_peer_key.peer); + // ring_proto.establish_conn(new_peer_key, tx); + // } else { + // log::debug!("Not accepting connection to {}", new_peer_key.peer); + // } + // } + todo!() + } + JoinRingMsg::Resp { + id, + sender, + msg: JoinResponse::Proxy { accepted_by }, + } => { + // let register_acceptors = + // move |jr_sender: PeerKeyLocation, join_resp| -> conn_manager::Result<()> { + // if let Message::JoinResponse(tx, resp) = join_resp { + // let new_acceptors = match resp { + // JoinResponse::Initial { accepted_by, .. } => accepted_by, + // JoinResponse::Proxy { accepted_by, .. } => accepted_by, + // }; + // let fa = &mut *forwarded_acceptors.lock(); + // new_acceptors.iter().for_each(|p| { + // if !fa.contains(p) { + // fa.insert(*p); + // } + // }); + // let msg = Message::from(( + // tx, + // JoinResponse::Proxy { + // accepted_by: new_acceptors, + // }, + // )); + // self_cp2.conn_manager.send(jr_sender, tx, msg)?; + // }; + // Ok(()) + // }; + todo!() + } + JoinRingMsg::Resp { + id, + sender, + msg: JoinResponse::ReceivedOC { .. }, + } => { + // + todo!() + } + JoinRingMsg::Connected => todo!(), + } + + Ok(OperationResult { + return_msg, + state: new_state, + }) +} + +/// Join ring routine, called upon performing a join operation for this node. +pub(crate) async fn initial_join_request( + op_storage: &mut OpStateStorage, + conn_manager: &mut CB, + join_op: JoinRingOp, +) -> Result<(), OpError> +where + CB: ConnectionBridge, +{ + let ConnectionInfo { + gateway, + this_peer, + max_hops_to_live, + } = (&join_op.0).state().clone().try_unwrap_connecting()?; + + log::info!( + "Joining ring via {} (@{})", + gateway.peer, + gateway + .location + .ok_or(conn_manager::ConnError::LocationUnknown)? + ); + + conn_manager.add_connection(gateway, true); + let tx = Transaction::new(::tx_type_id()); + let join_req = Message::from(messages::JoinRingMsg::Req { + id: tx, + msg: messages::JoinRequest::Initial { + target_loc: gateway, + req_peer: this_peer, + hops_to_live: max_hops_to_live, + max_hops_to_live, + }, + }); + log::debug!( + "Sending initial join tx: {:?} to {}", + join_req, + gateway.peer + ); + conn_manager.send(&gateway, join_req).await?; + op_storage.push(tx, Operation::JoinRing(join_op))?; + Ok(()) +} + +// fn establish_conn(conn_manager: &mut CB, new_peer: PeerKeyLocation, tx: Transaction) +// where +// CB: ConnectionBridge, +// { +// conn_manager.add_connection(new_peer, false); +// let state = Arc::new(RwLock::new(messages::OpenConnection::Connecting)); + +// let ack_peer = move |peer: PeerKeyLocation, msg: Message| -> conn_manager::Result<()> { +// let (tx, oc) = match msg { +// Message::OpenConnection(tx, oc) => (tx, oc), +// msg => return Err(conn_manager::ConnError::UnexpectedResponseMessage(msg)), +// }; +// current_state.transition(oc); +// if !current_state.is_connected() { +// let open_conn: Message = (tx, *current_state).into(); +// log::debug!("Acknowledging OC"); +// conn_manager.send(peer, *open_conn.id(), open_conn)?; +// } else { +// log::info!( +// "{} connected to {}, adding to ring", +// peer_key, +// new_peer.peer +// ); +// conn_manager.send( +// peer, +// tx, +// Message::from((tx, messages::OpenConnection::Connected)), +// )?; +// ring.connections_by_location.write().insert( +// new_peer +// .location +// .ok_or(conn_manager::ConnError::LocationUnknown)?, +// new_peer, +// ); +// } +// Ok(()) +// }; +// self.conn_manager.listen_to_replies(tx, ack_peer); +// let conn_manager = self.conn_manager.clone(); +// tokio::spawn(async move { +// let curr_time = Instant::now(); +// let mut attempts = 0; +// while !state.read().is_connected() && curr_time.elapsed() <= Duration::from_secs(30) { +// log::debug!( +// "Sending {} to {}, number of messages sent: {}", +// *state.read(), +// new_peer.peer, +// attempts +// ); +// conn_manager.send(new_peer, tx, Message::OpenConnection(tx, *state.read()))?; +// attempts += 1; +// tokio::time::sleep(Duration::from_millis(200)).await +// } +// if curr_time.elapsed() > Duration::from_secs(30) { +// log::error!("Timed out trying to connect to {}", new_peer.peer); +// Err(conn_manager::ConnError::NegotationFailed) +// } else { +// conn_manager.remove_listener(tx); +// log::info!("Success negotiating connection to {}", new_peer.peer); +// Ok(()) +// } +// }); +// } + +mod messages { + use super::*; + use crate::{conn_manager::PeerKeyLocation, ring::Location}; + + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] + pub(crate) enum JoinRingMsg { + Req { + id: Transaction, + msg: JoinRequest, + }, + Resp { + id: Transaction, + sender: PeerKeyLocation, + msg: JoinResponse, + }, + Connected, + } + + impl JoinRingMsg { + pub fn id(&self) -> &Transaction { + use JoinRingMsg::*; + match self { + Req { id, .. } => id, + Resp { id, .. } => id, + Connected => todo!(), + } + } + + pub fn sender(&self) -> Option<&PeerKeyLocation> { + use JoinRingMsg::*; + match self { + Req { .. } => None, + Resp { sender, .. } => Some(sender), + Connected => todo!(), + } + } + } + + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] + pub(crate) enum JoinRequest { + Initial { + target_loc: PeerKeyLocation, + req_peer: PeerKey, + hops_to_live: usize, + max_hops_to_live: usize, + }, + Proxy { + joiner: PeerKeyLocation, + hops_to_live: usize, + }, + } + + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] + pub(crate) enum JoinResponse { + Initial { + accepted_by: Vec, + your_location: Location, + your_peer_id: PeerKey, + }, + ReceivedOC { + by_peer: PeerKeyLocation, + }, + Proxy { + accepted_by: Vec, + }, + } +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use libp2p::identity::Keypair; + + use super::*; + use crate::{ + config::tracing::Logger, + message::TransactionTypeId, + node::test_utils::{EventType, SimNetwork}, + }; + + #[test] + fn join_ring_transitions() { + let id = Transaction::new(TransactionTypeId::JoinRing); + let h1 = PeerKeyLocation { + peer: PeerKey::from(Keypair::generate_ed25519().public()), + location: None, + }; + let h2 = PeerKeyLocation { + peer: PeerKey::from(Keypair::generate_ed25519().public()), + location: None, + }; + + let mut join_op_host_1 = StateMachine::::new(); + let res = join_op_host_1 + .consume(&JoinRingMsg::Req { + id, + msg: JoinRequest::Initial { + target_loc: h1, + req_peer: h2.peer, + hops_to_live: 0, + max_hops_to_live: 0, + }, + }) + .unwrap() + .unwrap(); + let expected = JoinRingMsg::Resp { + id, + sender: h2, + msg: JoinResponse::ReceivedOC { by_peer: h1 }, + }; + assert_eq!(res, expected); + assert!(matches!(join_op_host_1.state(), JRState::Connecting(_))); + + let mut join_op_host_2 = StateMachine::::new(); + let res = join_op_host_2.consume(&res).unwrap().unwrap(); + let expected = JoinRingMsg::Connected; + assert_eq!(res, expected); + assert!(matches!(join_op_host_2.state(), JRState::OCReceived)); + + let res = join_op_host_1.consume(&res).unwrap().unwrap(); + let expected = JoinRingMsg::Connected; + assert_eq!(res, expected); + assert!(matches!(join_op_host_1.state(), JRState::Connected)); + + let res = join_op_host_2.consume(&res).unwrap().unwrap(); + let expected = JoinRingMsg::Connected; + assert_eq!(res, expected); + assert!(matches!(join_op_host_2.state(), JRState::Connected)); + + // transaction finished, should not return anymore + assert!(join_op_host_1.consume(&res).is_err()); + assert!(join_op_host_2.consume(&res).is_err()); + assert!(matches!(join_op_host_1.state(), JRState::Connected)); + assert!(matches!(join_op_host_2.state(), JRState::Connected)); + } + + // #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn node0_to_gateway_conn() -> Result<(), Box> { + //! Given a network of one node and one gateway test that both are connected. + Logger::init_logger(); + let mut sim_net = SimNetwork::build(1, 1, 0); + tokio::time::sleep(Duration::from_secs(300)).await; + match tokio::time::timeout(Duration::from_secs(300), sim_net.recv_net_events()).await { + Ok(Some(Ok(event))) => match event.event { + EventType::JoinSuccess { gateway, new_node } => { + log::info!("Successful join op between {} and {}", gateway, new_node); + Ok(()) + } + }, + _ => Err("no event received".into()), + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn all_nodes_should_connect() -> Result<(), Box> { + //! Given a network of 1000 peers all nodes should have connections. + Logger::init_logger(); + + let _sim_nodes = SimNetwork::build(10, 10, 7); + // tokio::time::sleep(Duration::from_secs(300)).await; + // let _hist: Vec<_> = _ring_distribution(sim_nodes.values()).collect(); + + // FIXME: enable probing + // const NUM_PROBES: usize = 10; + // let mut probe_responses = Vec::with_capacity(NUM_PROBES); + // for probe_idx in 0..NUM_PROBES { + // let target = Location::random(); + // let idx: usize = rand::thread_rng().gen_range(0..sim_nodes.len()); + // let rnd_node = sim_nodes + // .get_mut(&format!("node-{}", idx)) + // .ok_or("node not found")?; + // let probe_response = ProbeProtocol::probe( + // rnd_node.ring_protocol.clone(), + // Transaction::new(::msg_type_id()), + // ProbeRequest { + // hops_to_live: 7, + // target, + // }, + // ) + // .await + // .expect("failed to get probe response"); + // probe_responses.push(probe_response); + // } + // probe_proto::utils::plot_probe_responses(probe_responses); + + // let any_empties = sim_nodes + // .peers + // .values() + // .map(|node| { + // node.op_storage + // .ring + // .connections_by_location + // .read() + // .is_empty() + // }) + // .any(|is_empty| is_empty); + // assert!(!any_empties); + + Ok(()) + } +} diff --git a/crates/locutus-node/src/operations/put.rs b/crates/locutus-node/src/operations/put.rs new file mode 100644 index 000000000..da8a469df --- /dev/null +++ b/crates/locutus-node/src/operations/put.rs @@ -0,0 +1,105 @@ +use rust_fsm::{StateMachine, StateMachineImpl}; + +use crate::{conn_manager::ConnectionBridge, message::Transaction, node::OpStateStorage}; + +pub(crate) use self::messages::PutMsg; + +use super::OpError; + +/// This is just a placeholder for now! +pub(crate) struct PutOp(StateMachine); + +impl PutOp { + pub fn new(key: Vec, value: Vec) -> Self { + let mut state = StateMachine::new(); + state.consume(&PutMsg::RouteValue { key, value }).unwrap(); + PutOp(state) + } +} + +struct PutOpSM; + +impl StateMachineImpl for PutOpSM { + type Input = PutMsg; + + type State = PutState; + + type Output = PutMsg; + + const INITIAL_STATE: Self::State = PutState::Initializing; + + fn transition(state: &Self::State, input: &Self::Input) -> Option { + match (state, input) { + (PutState::Initializing, PutMsg::RouteValue { key, .. }) => { + Some(PutState::Requesting { key: key.clone() }) + } + _ => None, + } + } + + fn output(state: &Self::State, input: &Self::Input) -> Option { + match (state, input) { + (PutState::Initializing, PutMsg::RouteValue { key, value }) => { + Some(PutMsg::RouteValue { + key: key.clone(), + value: value.clone(), + }) + } + _ => None, + } + } +} + +enum PutState { + Initializing, + Requesting { key: Vec }, +} + +pub(crate) async fn put_op( + op_storage: &mut OpStateStorage, + conn_manager: &mut CB, + put_op: PutMsg, +) -> Result<(), OpError> +where + CB: ConnectionBridge, +{ + Ok(()) +} + +/// Request to insert/update a value into a contract. +pub(crate) async fn request_put( + op_storage: &mut OpStateStorage, + conn_manager: &mut CB, + put_op: PutOp, +) -> Result<(), OpError> +where + CB: ConnectionBridge, +{ + // the initial request must provide: + // - a location in the network where the contract resides + // - and the value to put + todo!() +} + +mod messages { + use crate::conn_manager::PeerKeyLocation; + + use super::*; + + use serde::{Deserialize, Serialize}; + + #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] + pub(crate) enum PutMsg { + RouteValue { key: Vec, value: Vec }, + } + + impl PutMsg { + pub fn id(&self) -> &Transaction { + todo!() + } + + pub fn sender(&self) -> Option<&PeerKeyLocation> { + todo!() + } + } +} diff --git a/crates/locutus-node/src/operations/subscribe.rs b/crates/locutus-node/src/operations/subscribe.rs new file mode 100644 index 000000000..e69de29bb diff --git a/crates/freenet2-node/src/probe_proto.rs b/crates/locutus-node/src/probe_proto.rs similarity index 100% rename from crates/freenet2-node/src/probe_proto.rs rename to crates/locutus-node/src/probe_proto.rs diff --git a/crates/locutus-node/src/ring.rs b/crates/locutus-node/src/ring.rs new file mode 100644 index 000000000..b681edab4 --- /dev/null +++ b/crates/locutus-node/src/ring.rs @@ -0,0 +1,222 @@ +//! Ring protocol logic and supporting types. +//! +//! # Routing +//! The routing mechanism consist in a greedy routing algorithm which just targets +//! the closest location to the target destination iteratively in each hop, until it reaches +//! the destination. +//! +//! Path is limited to local knowledge, at any given point only 3 data points are known: +//! - previous node +//! - next node +//! - final location + +use std::{collections::BTreeMap, convert::TryFrom, fmt::Display, hash::Hasher}; + +use parking_lot::RwLock; + +use crate::conn_manager::{self, PeerKeyLocation}; + +#[derive(Debug)] +pub(crate) struct Ring { + pub connections_by_location: RwLock>, + pub rnd_if_htl_above: usize, + pub max_hops_to_live: usize, +} + +impl Ring { + const MIN_CONNECTIONS: usize = 10; + const MAX_CONNECTIONS: usize = 20; + + /// Above this number of remaining hops, + /// randomize which of node a message which be forwarded to. + pub const RAND_WALK_ABOVE_HTL: usize = 7; + + /// + pub const MAX_HOPS_TO_LIVE: usize = 10; + + pub fn new() -> Self { + Ring { + connections_by_location: RwLock::new(BTreeMap::new()), + rnd_if_htl_above: Self::RAND_WALK_ABOVE_HTL, + max_hops_to_live: Self::MAX_HOPS_TO_LIVE, + } + } + + pub fn with_rnd_walk_above(&mut self, rnd_if_htl_above: usize) -> &mut Self { + self.rnd_if_htl_above = rnd_if_htl_above; + self + } + + pub fn with_max_hops(&mut self, max_hops_to_live: usize) -> &mut Self { + self.max_hops_to_live = max_hops_to_live; + self + } + + pub fn should_accept(&self, my_location: &Location, location: &Location) -> bool { + let cbl = &*self.connections_by_location.read(); + if location == my_location || cbl.contains_key(location) { + false + } else if cbl.len() < Self::MIN_CONNECTIONS { + true + } else if cbl.len() >= Self::MAX_CONNECTIONS { + false + } else { + my_location.distance(location) < self.median_distance_to(my_location) + } + } + + pub fn median_distance_to(&self, location: &Location) -> Distance { + let connections = self.connections_by_location.read(); + let mut conn_by_dist: Vec<_> = connections + .keys() + .map(|key| key.distance(location)) + .collect(); + conn_by_dist.sort_unstable(); + let idx = self.connections_by_location.read().len() / 2; + conn_by_dist[idx] + } + + pub fn routing(&self, target: &Location) -> Option<(Location, PeerKeyLocation)> { + let connections = self.connections_by_location.read(); + let mut conn_by_dist: Vec<_> = connections + .iter() + .map(|(loc, peer)| (loc.distance(target), (loc, peer))) + .collect(); + conn_by_dist.sort_by_key(|&(dist, _)| dist); + conn_by_dist.first().map(|(_, v)| (*v.0, *v.1)) + } + + pub fn random_peer(&self, filter_fn: F) -> Option + where + F: FnMut(&&PeerKeyLocation) -> bool, + { + // FIXME: should be optimized and avoid copying + self.connections_by_location + .read() + .values() + .find(filter_fn) + .copied() + } +} + +/// An abstract location on the 1D ring, represented by a real number on the interal [0, 1] +#[derive(Debug, serde::Serialize, serde::Deserialize, Clone, Copy)] +pub struct Location(f64); + +pub(crate) type Distance = Location; + +impl Location { + /// Returns a new random location. + pub fn random() -> Self { + use rand::prelude::*; + let mut rng = rand::thread_rng(); + Location(rng.gen_range(0.0..=1.0)) + } + + /// Compute the distance between two locations. + pub fn distance(&self, other: &Location) -> Distance { + let d = (self.0 - other.0).abs(); + if d < 0.5 { + Location(d) + } else { + Location(1.0 - d) + } + } +} + +impl Display for Location { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(self.0.to_string().as_str())?; + Ok(()) + } +} + +impl PartialEq for Location { + fn eq(&self, other: &Self) -> bool { + self.0 == other.0 + } +} + +/// Since we don't allow NaN values in the construction of Location +/// we can safely assume that an equivalence relation holds. +impl Eq for Location {} + +impl Ord for Location { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.partial_cmp(other) + .expect("always should return a cmp value") + } +} + +impl PartialOrd for Location { + fn partial_cmp(&self, other: &Self) -> Option { + self.0.partial_cmp(&other.0) + } +} + +impl std::hash::Hash for Location { + fn hash(&self, state: &mut H) { + let bits = self.0.to_bits(); + state.write_u64(bits); + state.finish(); + } +} + +impl TryFrom for Location { + type Error = (); + + fn try_from(value: f64) -> Result { + if !(0.0..=1.0).contains(&value) { + Err(()) + } else { + Ok(Location(value)) + } + } +} + +#[derive(thiserror::Error, Debug)] +pub(crate) enum RingProtoError { + #[error("failed while attempting to join a ring")] + Join, + #[error(transparent)] + ConnError(#[from] Box), +} + +#[cfg(test)] +mod tests { + use libp2p::PeerId; + + use super::*; + use crate::conn_manager::PeerKey; + + #[test] + fn location_dist() { + let l0 = Location(0.); + let l1 = Location(0.25); + assert!(l0.distance(&l1) == Location(0.25)); + + let l0 = Location(0.75); + let l1 = Location(0.50); + assert!(l0.distance(&l1) == Location(0.25)); + } + + #[test] + fn find_closest() { + let ring = Ring::new(); + { + let conns = &mut *ring.connections_by_location.write(); + let def_peer = PeerKeyLocation { + peer: PeerKey(PeerId::random()), + location: None, + }; + conns.insert(Location(0.3), def_peer); + conns.insert(Location(0.5), def_peer); + conns.insert(Location(0.0), def_peer); + } + + assert_eq!(Location(0.0), ring.routing(&Location(0.9)).unwrap().0); + assert_eq!(Location(0.0), ring.routing(&Location(0.1)).unwrap().0); + assert_eq!(Location(0.5), ring.routing(&Location(0.41)).unwrap().0); + assert_eq!(Location(0.3), ring.routing(&Location(0.39)).unwrap().0); + } +} diff --git a/crates/locutus-node/src/user_events.rs b/crates/locutus-node/src/user_events.rs new file mode 100644 index 000000000..b17d43fca --- /dev/null +++ b/crates/locutus-node/src/user_events.rs @@ -0,0 +1,34 @@ +#[async_trait::async_trait] +pub(crate) trait UserEventsProxy { + async fn recv(&self) -> UserEvent; +} + +pub(crate) enum UserEvent { + /// Update or insert a new value in a contract corresponding with the provided key. + Put { + /// Hash key of the contract. + key: Vec, + /// Value to upsert in the contract. + value: Vec, + }, + /// Fetch the current value from a contrart corresponding to the provided key. + Get { + /// Hash key of the contract. + key: Vec, + }, +} + +pub(crate) mod test_utils { + use super::*; + + pub(crate) struct MemoryEventsGen {} + + #[async_trait::async_trait] + impl UserEventsProxy for MemoryEventsGen { + /// # Cancellation Safety + /// This future must be safe to cancel. + async fn recv(&self) -> UserEvent { + todo!() + } + } +} diff --git a/crates/freenet2-node/tests/node_startup.rs b/crates/locutus-node/tests/node_startup.rs similarity index 81% rename from crates/freenet2-node/tests/node_startup.rs rename to crates/locutus-node/tests/node_startup.rs index be0a90b7a..fdf1ad15a 100644 --- a/crates/freenet2-node/tests/node_startup.rs +++ b/crates/locutus-node/tests/node_startup.rs @@ -5,5 +5,5 @@ use locutus_node::*; async fn start_node() -> Result<(), Box> { let key = Keypair::generate_ed25519(); let mut node = NodeConfig::default().with_key(key).build_in_memory()?; - node.listen_on().map_err(|_| "failed to start".into()) + node.listen_on().await.map_err(|_| "failed to start".into()) }