Skip to content

Commit

Permalink
Remove unexpected deadlock due to test impl
Browse files Browse the repository at this point in the history
  • Loading branch information
iduartgomez committed Apr 7, 2024
1 parent 639bb2b commit 0db7b03
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 30 deletions.
12 changes: 0 additions & 12 deletions crates/core/src/node/network_bridge/p2p_protoc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use libp2p::{
identify,
identity::Keypair,
multiaddr::Protocol,
ping,
swarm::{
self,
dial_opts::DialOpts,
Expand Down Expand Up @@ -81,8 +80,6 @@ fn config_behaviour(
)
.with_agent_version(CURRENT_AGENT_VER.to_string());

let ping = ping::Behaviour::default();

let peer_id = private_key.public().to_peer_id();
let auto_nat = {
let config = autonat::Config {
Expand All @@ -97,7 +94,6 @@ fn config_behaviour(
};

NetBehaviour {
ping,
identify: identify::Behaviour::new(ident_config),
auto_nat,
freenet: FreenetBehaviour {
Expand Down Expand Up @@ -1192,7 +1188,6 @@ fn decode_msg(buf: BytesMut) -> Result<NetMessage, ConnectionError> {
#[behaviour(to_swarm = "NetEvent")]
pub(in crate::node) struct NetBehaviour {
identify: identify::Behaviour,
ping: ping::Behaviour,
freenet: FreenetBehaviour,
auto_nat: autonat::Behaviour,
}
Expand All @@ -1201,7 +1196,6 @@ pub(in crate::node) struct NetBehaviour {
pub(in crate::node) enum NetEvent {
Freenet(Box<NetMessage>),
Identify(Box<identify::Event>),
Ping(ping::Event),
Autonat(autonat::Event),
}

Expand All @@ -1217,12 +1211,6 @@ impl From<identify::Event> for NetEvent {
}
}

impl From<ping::Event> for NetEvent {
fn from(event: ping::Event) -> NetEvent {
Self::Ping(event)
}
}

impl From<NetMessage> for NetEvent {
fn from(event: NetMessage) -> NetEvent {
Self::Freenet(Box::new(event))
Expand Down
8 changes: 0 additions & 8 deletions crates/core/src/node/p2p_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ impl NodeP2P {
mod test {
use std::{net::Ipv4Addr, time::Duration};

use super::super::network_bridge::p2p_protoc::NetEvent;
use super::*;
use crate::{
client_events::test::MemoryEventsGen,
Expand All @@ -178,7 +177,6 @@ mod test {
};

use futures::StreamExt;
use libp2p::swarm::SwarmEvent;
use tokio::sync::watch::channel;

/// Ping test event loop
Expand All @@ -189,12 +187,6 @@ mod test {
peer.conn_manager.swarm.select_next_some(),
);
match ev.await {
Ok(SwarmEvent::Behaviour(NetEvent::Ping(ping))) => {
if ping.result.is_ok() {
tracing::info!("ping done @ {}", peer.peer_key);
return Ok(());
}
}
Ok(other) => {
tracing::debug!("{:?}", other)
}
Expand Down
9 changes: 4 additions & 5 deletions crates/core/src/transport/connection_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -831,11 +831,11 @@ mod test {

#[allow(clippy::type_complexity)]
static CHANNELS: OnceLock<
Arc<Mutex<HashMap<SocketAddr, mpsc::Sender<(SocketAddr, Vec<u8>)>>>>,
Arc<Mutex<HashMap<SocketAddr, mpsc::UnboundedSender<(SocketAddr, Vec<u8>)>>>>,
> = OnceLock::new();

struct MockSocket {
inbound: Mutex<mpsc::Receiver<(SocketAddr, Vec<u8>)>>,
inbound: Mutex<mpsc::UnboundedReceiver<(SocketAddr, Vec<u8>)>>,
this: SocketAddr,
packet_loss_factor: Option<f64>,
rng: Mutex<rand::rngs::SmallRng>,
Expand All @@ -846,7 +846,7 @@ mod test {
let channels = CHANNELS
.get_or_init(|| Arc::new(Mutex::new(HashMap::new())))
.clone();
let (outbound, inbound) = mpsc::channel(1);
let (outbound, inbound) = mpsc::unbounded_channel();
channels.lock().await.insert(addr, outbound);
static SEED: AtomicU64 = AtomicU64::new(0xfeedbeef);
MockSocket {
Expand Down Expand Up @@ -895,7 +895,6 @@ mod test {
// tracing::trace!(?target, ?self.this, "sending packet to remote");
sender
.send((self.this, buf.to_vec()))
.await
.map_err(|_| std::io::ErrorKind::ConnectionAborted)?;
// tracing::trace!(?target, ?self.this, "packet sent to remote");
Ok(buf.len())
Expand Down Expand Up @@ -1011,7 +1010,7 @@ mod test {
});
}
let results =
tokio::time::timeout(Duration::from_secs(5), conns.try_collect::<Vec<_>>())
tokio::time::timeout(Duration::from_secs(50), conns.try_collect::<Vec<_>>())
.await??;
Ok::<_, DynError>((results, test_generator))
});
Expand Down
6 changes: 2 additions & 4 deletions crates/core/src/transport/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ impl PeerConnection {
.send((self.remote_conn.remote_addr, packet.clone()))
.await
.map_err(|_| TransportError::ConnectionClosed)?;
tracing::trace!(%idx, remote = ?self.remote_conn.remote_addr, "packet resent");
self.remote_conn.sent_tracker.lock().report_sent_packet(idx, packet);
tracing::trace!(%idx, remote = ?self.remote_conn.remote_addr, "packet resent");
}
}
}
Expand Down Expand Up @@ -254,12 +254,11 @@ impl PeerConnection {
payload,
} => {
if let Some(sender) = self.inbound_streams.get(&stream_id) {
tracing::trace!(%stream_id, %fragment_number, "pushing fragment to existing stream");
sender
.send((fragment_number, payload))
.await
.map_err(|_| TransportError::ConnectionClosed)?;
tracing::trace!(%stream_id, %fragment_number, "fragment pushed");
tracing::trace!(%stream_id, %fragment_number, "fragment pushed to existing stream");
} else {
let (sender, receiver) = mpsc::channel(1);
tracing::trace!(%stream_id, %fragment_number, "new stream");
Expand All @@ -270,7 +269,6 @@ impl PeerConnection {
tracing::trace!(%stream_id, %fragment_number, "stream finished");
return Ok(Some(msg));
}
tracing::trace!(%stream_id, "listening for more fragments");
self.inbound_stream_futures
.push(tokio::spawn(inbound_stream::recv_stream(
stream_id, receiver, stream,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/src/transport/rate_limiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ impl<T: TimeSource> PacketRateLimiter<T> {
self.add_packet(packet.len());
}
}
tracing::error!("Rate limiter task ended unexpectedly");
tracing::debug!("Rate limiter task ended unexpectedly");
}

/// Report that a packet was sent
Expand Down

0 comments on commit 0db7b03

Please sign in to comment.