Skip to content

Commit

Permalink
chore: Updated libp2p to 0.50
Browse files Browse the repository at this point in the history
Note: Additional changes looks to be required around DHT due to PR being unexpectedly pulled moments before release of 0.50
  • Loading branch information
dariusc93 committed Nov 25, 2022
1 parent a0b213b commit 8f3b5ed
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 25 deletions.
11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,19 @@ libp2p = { default-features = false, features = [
"identify",
"kad",
"websocket",
"tcp-tokio",
"tcp",
"macros",
"quic",
"tokio",
"mplex",
"noise",
"ping",
"yamux",
"dns-tokio",
"mdns-tokio",
"dns",
"mdns",
"rsa",
"serde",
], version = "0.49" }
], version = "0.50" }
parking_lot = "0.12"
serde = { default-features = false, features = ["derive"], version = "1.0" }
serde_json = { default-features = false, features = ["std"], version = "1.0" }
Expand Down
4 changes: 2 additions & 2 deletions bitswap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ libipld = "0.14"
fnv = { default-features = false, version = "1.0" }
futures = { default-features = false, version = "0.3" }
hash_hasher = "2.0.3"
libp2p-core = { default-features = false, version = "0.37.0" }
libp2p-swarm = { default-features = false, version = "0.40.0" }
libp2p-core = { default-features = false, version = "0.38.0" }
libp2p-swarm = { default-features = false, version = "0.41.0" }
prost = { default-features = false, version = "0.11" }
thiserror = { default-features = false, version = "1.0" }
tokio = { default-features = false, version = "1", features = ["rt"] }
Expand Down
23 changes: 11 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1787,7 +1787,7 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
InboundRequest { request } => {
trace!("kad: inbound {:?} request handled", request);
}
OutboundQueryCompleted { result, id, .. } => {
OutboundQueryProgressed { result, id, .. } => {
// make sure the query is exhausted
if self.swarm.behaviour().kademlia.query(&id).is_none() {
match result {
Expand Down Expand Up @@ -1853,10 +1853,9 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
);
}
}
GetProviders(Ok(GetProvidersOk {
GetProviders(Ok(GetProvidersOk::FoundProviders{
key: _,
providers,
closest_peers: _,
})) => {
if self.swarm.behaviour().kademlia.query(&id).is_none() {
let providers =
Expand All @@ -1868,6 +1867,7 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
);
}
}
GetProviders(Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { .. })) => {},
GetProviders(Err(GetProvidersError::Timeout {
key, ..
})) => {
Expand Down Expand Up @@ -1911,16 +1911,17 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
key
);
}
GetRecord(Ok(GetRecordOk { records, .. })) => {
GetRecord(Ok(GetRecordOk::FoundRecord(record))) => {
if self.swarm.behaviour().kademlia.query(&id).is_none() {
let records =
records.into_iter().map(|rec| rec.record).collect();
// let records =
// records.into_iter().map(|rec| rec.record).collect();
self.kad_subscriptions.finish_subscription(
id.into(),
Ok(KadResult::Records(records)),
Ok(KadResult::Record(record.record)),
);
}
}
GetRecord(Ok(GetRecordOk::FinishedWithNoAdditionalRecord { .. })) => {},
GetRecord(Err(GetRecordError::NotFound {
key,
closest_peers: _,
Expand Down Expand Up @@ -1956,9 +1957,7 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
}
}
GetRecord(Err(GetRecordError::Timeout {
key,
records: _,
quorum: _,
key
})) => {
let key = multibase::encode(Base::Base32Lower, key);
warn!("kad: timed out while trying to get key {}", key);
Expand Down Expand Up @@ -2442,8 +2441,8 @@ impl<TRepoTypes: RepoTypes> Future for IpfsFuture<TRepoTypes> {
};
let _ = ret.send(future);
}
IpfsEvent::DhtGet(key, quorum, ret) => {
let id = self.swarm.behaviour_mut().kademlia.get_record(key, quorum);
IpfsEvent::DhtGet(key,_, ret) => {
let id = self.swarm.behaviour_mut().kademlia.get_record(key);

let future = self.kad_subscriptions.create_subscription(id.into(), None);
let _ = ret.send(future);
Expand Down
6 changes: 4 additions & 2 deletions src/p2p/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use libp2p::kad::record::{
Record,
};
use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent};
use libp2p::mdns::{MdnsConfig, MdnsEvent, TokioMdns as Mdns};
use libp2p::mdns::{Config as MdnsConfig, Event as MdnsEvent, tokio::Behaviour as Mdns};
use libp2p::ping::{Behaviour as Ping, Event as PingEvent};
use libp2p::relay::v2::client::transport::ClientTransport;
use libp2p::relay::v2::client::{Client as RelayClient, Event as RelayClientEvent};
Expand All @@ -32,7 +32,7 @@ use std::num::NonZeroU32;
use std::time::Duration;

/// Behaviour type.
#[derive(libp2p::NetworkBehaviour)]
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "BehaviourEvent", event_process = false)]
pub struct Behaviour {
pub mdns: Toggle<Mdns>,
Expand Down Expand Up @@ -139,6 +139,8 @@ pub enum KadResult {
Peers(Vec<PeerId>),
/// The query successfully returns a `GetRecord` result.
Records(Vec<Record>),
///
Record(Record),
}

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand Down
6 changes: 2 additions & 4 deletions src/p2p/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,12 +203,10 @@ pub async fn create_swarm(
let transport = transport::build_transport(keypair, relay_transport, transport_config)?;

// Create a Swarm
let swarm = libp2p::swarm::SwarmBuilder::new(transport, behaviour, peer_id)
.connection_limits(swarm_config.connection)
let swarm = libp2p::swarm::SwarmBuilder::with_executor(transport, behaviour, peer_id, SpannedExecutor(span)).connection_limits(swarm_config.connection)
.notify_handler_buffer_size(swarm_config.notify_handler_buffer_size)
.connection_event_buffer_size(swarm_config.connection_event_buffer_size)
.dial_concurrency_factor(swarm_config.dial_concurrency_factor)
.executor(Box::new(SpannedExecutor(span)))
.max_negotiating_inbound_streams(swarm_config.max_inbound_stream)
.build();

Expand All @@ -217,7 +215,7 @@ pub async fn create_swarm(

struct SpannedExecutor(Span);

impl libp2p::core::Executor for SpannedExecutor {
impl libp2p::swarm::Executor for SpannedExecutor {
fn exec(
&self,
future: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + 'static + Send>>,
Expand Down
2 changes: 1 addition & 1 deletion src/p2p/swarm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -634,7 +634,7 @@ mod tests {
// use tokio, but from a futures-executor threadpool, which is outside of tokio context.
struct ThreadLocalTokio;

impl libp2p::core::Executor for ThreadLocalTokio {
impl libp2p::swarm::Executor for ThreadLocalTokio {
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) {
tokio::task::spawn(future);
}
Expand Down

0 comments on commit 8f3b5ed

Please sign in to comment.