Skip to content

Commit

Permalink
Implementing more command handlers for Kademlia
Browse files Browse the repository at this point in the history
  • Loading branch information
hoffmabc committed Mar 31, 2023
1 parent 9965e49 commit 1ea043e
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 29 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -26,4 +26,5 @@ bdk_electrum = { git = "https://github.com/hoffmabc/bdk", version="0.2.0"}
bdk_file_store = { git = "https://github.com/hoffmabc/bdk", version="0.1.0"}
bdk_chain = { git = "https://github.com/hoffmabc/bdk", version="0.4"}
bdk_esplora = { git = "https://github.com/hoffmabc/bdk", version="0.2.0"}
sled = "0.34.7"
sled = "0.34.7"
bincode = "1.3.3"
1 change: 1 addition & 0 deletions src/main.rs
Expand Up @@ -87,6 +87,7 @@ fn main() -> anyhow::Result<()> {
// Create or retrieve datastore
let ds = rt.block_on(async move { OpenBazaarDb::new(db_file).await.unwrap() });

// Retrieve or create a new BIP39-based identity from the datastore
let keypair = rt.block_on(async move { ds.get_identity().await.unwrap() });

/************
Expand Down
209 changes: 181 additions & 28 deletions src/network/mod.rs
@@ -1,7 +1,11 @@
use bincode::Serializer;
use futures::StreamExt;
use libp2p::identity::Keypair;
use libp2p::kad::record::Key;
use libp2p::kad::{KademliaEvent, QueryId};
use libp2p::kad::{
AddProviderOk, GetClosestPeersOk, GetProvidersOk, GetRecordOk, KademliaEvent, QueryId,
QueryResult,
};
use libp2p::multiaddr::Protocol;
use libp2p::swarm::SwarmEvent;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder};
Expand Down Expand Up @@ -85,6 +89,35 @@ impl Client {
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}

#[instrument]
pub async fn stop_providing(&self, share_addr: ShareAddress) {
self.sender
.send(Command::StopProviding { share_addr })
.await
.expect("Command receiver not to be dropped.");
}

#[instrument]
pub async fn get_providers(&self, share_addr: ShareAddress) -> HashSet<PeerId> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::GetProviders { share_addr, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}

// getclosestpeers
#[instrument]
pub async fn get_closest_peers(&self, addr: ShareAddress) -> anyhow::Result<PeerId> {
let (sender, receiver) = oneshot::channel();
self.sender
.send(Command::GetClosestPeer { addr, sender })
.await
.expect("Command receiver not to be dropped.");
receiver.await.expect("Sender not to be dropped.")
}
}

#[derive(Debug)]
Expand Down Expand Up @@ -159,36 +192,11 @@ pub struct EventLoop {
pending_dial: HashMap<PeerId, oneshot::Sender<Result<(), anyhow::Error>>>,
pending_start_providing: HashMap<QueryId, oneshot::Sender<()>>,
pending_get_providers: HashMap<QueryId, oneshot::Sender<HashSet<PeerId>>>,
pending_get_closest_peer: HashMap<QueryId, oneshot::Sender<anyhow::Result<PeerId>>>,
providing: HashSet<Key>,
}

impl EventLoop {
fn new(
swarm: libp2p::Swarm<ComposedBehaviour>,
command_receiver: mpsc::Receiver<Command>,
) -> Self {
Self {
swarm,
command_receiver,
pending_dial: Default::default(),
pending_start_providing: Default::default(),
pending_get_providers: Default::default(),
}
}

pub async fn run(&mut self) {
loop {
tokio::select! {
event = self.swarm.next() => {
self.handle_event(event.unwrap()).await
},
command = self.command_receiver.recv() => match command {
Some(c) => self.handle_command(c).await,
None => return,
}
}
}
}

async fn handle_command(&mut self, command: Command) {
match command {
Command::StartListening { addr, sender } => {
Expand Down Expand Up @@ -238,12 +246,128 @@ impl EventLoop {
let query_id = self.swarm.behaviour_mut().kademlia.get_providers(key);
self.pending_get_providers.insert(query_id, sender);
}
Command::GetClosestPeer { addr, sender } => {
let query_id = self.swarm.behaviour_mut().kademlia.get_closest_peers(addr);
self.pending_get_closest_peer.insert(query_id, sender);
}
Command::GetListenAddress { sender } => {
let peer_id = self.swarm.local_peer_id().to_owned().into();
let addr: Vec<Multiaddr> = self
.swarm
.listeners()
.map(|addr| addr.to_owned().with(Protocol::P2p(peer_id)))
.collect();
sender
.send(Ok(addr))
.expect("Failed to send listen address.")
}
_ => todo!(),
}
}

/// The result of [`Kademlia::bootstrap`].
// Bootstrap(BootstrapResult),

// /// The result of a (automatic) republishing of a provider record.
// RepublishProvider(AddProviderResult),

// /// The result of [`Kademlia::get_record`].
// GetRecord(GetRecordResult),

// /// The result of [`Kademlia::put_record`].
// PutRecord(PutRecordResult),

// /// The result of a (automatic) republishing of a (value-)record.
// RepublishRecord(PutRecordResult),

async fn handle_event(&mut self, event: SwarmEvent<ComposedEvent, std::io::Error>) {
match event {
// SwarmEvent::Behaviour(ComposedEvent::Kademlia(
// KademliaEvent::OutboundQueryProgressed {
// id,
// result: QueryResult::GetRecord(Ok(GetRecordOk::FoundRecord(peer_record))),
// ..
// },
// )) => {
// let data = peer_record.record.value.clone();
// let b: anyhow::Result<ServerAddrBundle, bincode::Error> =
// bincode::deserialize(&data);
// let bundle = match b {
// Ok(r) => Ok(r),
// Err(e) => Err(anyhow::Error::from(e)),
// };

// let _ = self
// .pending_get_clear_addr
// .remove(&id)
// .expect("Completed query to previously pending")
// .send(bundle);
// }
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
id,
result:
QueryResult::GetProviders(Ok(GetProvidersOk::FoundProviders { providers, key })),
..
},
)) => {
let mut providers = providers.into_iter().collect::<HashSet<_>>();
if self.providing.contains(&key) {
providers.insert(*self.swarm.local_peer_id());
}
let _ = self
.pending_get_providers
.remove(&id)
.expect("No pending get providers request.")
.send(providers);
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
id,
result: QueryResult::GetClosestPeers(Ok(GetClosestPeersOk { peers, key })),
..
},
)) => {
// Get k-bucket where this key is located
let key = libp2p::kad::kbucket::Key::from(key);

let local_peer_id = *self.swarm.local_peer_id();
let local_peer_key = libp2p::kad::kbucket::Key::from(local_peer_id);

// Find the distance between the key and the local peer
let host_distance = local_peer_key.distance(&key);
println!("Closest Peers => {:?}", peers);

let mut peer_id = peers.get(0).unwrap().to_owned();
let remote_peer_key = libp2p::kad::kbucket::Key::from(peer_id);

// Find the distance between the key and the remote peer
let remote_distance = remote_peer_key.distance(&key);

// Check if local peer is the closest
if remote_distance > host_distance {
peer_id = local_peer_id;
}
println!("Returning peer => {:?}", peer_id);

let _ = self
.pending_get_closest_peer
.remove(&id)
.expect("Completed query to previously pending")
.send(Ok(peer_id));
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(
KademliaEvent::OutboundQueryProgressed {
id: query_id,
result: QueryResult::StartProviding(Ok(AddProviderOk { key })),
..
},
)) => {
self.providing.insert(key);
if let Some(sender) = self.pending_start_providing.remove(&query_id) {
let _ = sender.send(());
}
}
SwarmEvent::Behaviour(ComposedEvent::Kademlia(..)) => {}
SwarmEvent::NewListenAddr { address, .. } => {
let local_peer_id = *self.swarm.local_peer_id();
Expand Down Expand Up @@ -298,4 +422,33 @@ impl EventLoop {
e => panic!("{:?}", e),
}
}

fn new(
swarm: libp2p::Swarm<ComposedBehaviour>,
command_receiver: mpsc::Receiver<Command>,
) -> Self {
Self {
swarm,
command_receiver,
pending_dial: Default::default(),
pending_start_providing: Default::default(),
pending_get_providers: Default::default(),
pending_get_closest_peer: Default::default(),
providing: Default::default(),
}
}

pub async fn run(&mut self) {
loop {
tokio::select! {
event = self.swarm.next() => {
self.handle_event(event.unwrap()).await
},
command = self.command_receiver.recv() => match command {
Some(c) => self.handle_command(c).await,
None => return,
}
}
}
}
}

0 comments on commit 1ea043e

Please sign in to comment.