Skip to content

Commit

Permalink
feat(metrics): Add tokio runtime metrics (#1878)
Browse files Browse the repository at this point in the history
  • Loading branch information
gurinderu authored Nov 6, 2023
1 parent 83ee3ec commit 5522bed
Show file tree
Hide file tree
Showing 25 changed files with 606 additions and 116 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,9 @@ base64 = "0.21.5"
bs58 = "0.5.0"
fluence-keypair = "0.10.3"
parking_lot = "0.12.1"
tokio = { version = "1.33.0", features = ["full", "tracing"] }
tokio-stream = { version = "0.1.14" }
tokio-util = { version = "0.7.10" }
tokio = "1.33.0"
tokio-stream = "0.1.14"
tokio-util = "0.7.10"
uuid = { version = "1.5.0", features = ["v4"] }
derivative = "2.2.0"
serde_json = { version = "1.0.107", features = ["preserve_order"] }
Expand All @@ -144,6 +144,7 @@ cid = "0.10.1"
libipld = "0.16.0"
axum = "0.6.20"
hyper = "0.14.27"
once_cell = "1.18.0"

# Enable a small amount of optimization in debug mode
[profile.dev]
Expand Down
3 changes: 2 additions & 1 deletion crates/connected-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ test-constants = { workspace = true }
local-vm = { workspace = true }

fluence-keypair = { workspace = true }
libp2p = { workspace = true }
libp2p = { workspace = true, features = ["identify"] }
libp2p-swarm = { workspace = true }
tokio = { workspace = true }
futures = { workspace = true }
Expand All @@ -23,6 +23,7 @@ eyre = { workspace = true }
parking_lot = { workspace = true }
either = "1.9.0"
void = "1.0.2"
tracing = { workspace = true }

[dev-dependencies]
rand = "0.8.5"
Expand Down
4 changes: 2 additions & 2 deletions crates/connected-client/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@
* limitations under the License.
*/

use crate::behaviour::ClientBehaviour;
use crate::behaviour::FluenceClientBehaviour;
use libp2p::PeerId;
use particle_protocol::Particle;

pub trait ParticleApi {
fn send(&mut self, peer_id: PeerId, particle: Particle);
}

impl ParticleApi for ClientBehaviour {
impl ParticleApi for FluenceClientBehaviour {
fn send(&mut self, peer_id: PeerId, particle: Particle) {
self.call(peer_id, particle)
}
Expand Down
115 changes: 58 additions & 57 deletions crates/connected-client/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,62 +14,83 @@
* limitations under the License.
*/

use either::Either;
use std::collections::VecDeque;
use std::task::{Context, Poll, Waker};
use std::time::Duration;

use futures::future::BoxFuture;
use futures::FutureExt;
use libp2p::core::Endpoint;
use libp2p::identity::PublicKey;
use libp2p::swarm::ToSwarm::GenerateEvent;
use libp2p::swarm::{
ConnectionDenied, ConnectionHandler, ConnectionHandlerSelect, ConnectionId, DialError,
FromSwarm, THandler, THandlerInEvent, THandlerOutEvent,
ConnectionDenied, ConnectionId, DialError, FromSwarm, THandler, THandlerInEvent,
THandlerOutEvent,
};
use libp2p::{
core::{connection::ConnectedPoint, Multiaddr},
identify::{Behaviour as Identify, Config as IdentifyConfig},
ping::{Behaviour as Ping, Config as PingConfig},
swarm::{NetworkBehaviour, NotifyHandler, OneShotHandler, PollParameters, ToSwarm},
PeerId,
};
use particle_protocol::{HandlerMessage, Particle, ProtocolConfig};
use particle_protocol::{HandlerMessage, Particle, ProtocolConfig, PROTOCOL_NAME};

use crate::ClientEvent;

pub type SwarmEventType = ToSwarm<ClientEvent, THandlerInEvent<ClientBehaviour>>;

#[derive(NetworkBehaviour)]
pub struct FluenceClientBehaviour {
client: ClientBehaviour,
ping: Ping,
identify: Identify,
}

impl FluenceClientBehaviour {
pub fn new(protocol_config: ProtocolConfig, public_key: PublicKey) -> Self {
let client = ClientBehaviour::new(protocol_config);
let identify = Identify::new(IdentifyConfig::new(PROTOCOL_NAME.into(), public_key));
let ping = Ping::new(
PingConfig::new()
.with_interval(Duration::from_secs(5))
.with_timeout(Duration::from_secs(60)),
);
Self {
client,
ping,
identify,
}
}

pub fn call(&mut self, peer_id: PeerId, call: Particle) {
self.client.events.push_back(ToSwarm::NotifyHandler {
event: HandlerMessage::OutParticle(call, <_>::default()),
handler: NotifyHandler::Any,
peer_id,
});

self.client.wake();
}
}

pub struct ClientBehaviour {
protocol_config: ProtocolConfig,
events: VecDeque<SwarmEventType>,
ping: Ping,
reconnect: Option<BoxFuture<'static, Vec<Multiaddr>>>,
waker: Option<Waker>,
}

impl ClientBehaviour {
pub fn new(protocol_config: ProtocolConfig) -> Self {
#[allow(deprecated)]
let ping = Ping::new(PingConfig::new());
Self {
protocol_config,
events: VecDeque::default(),
ping,
reconnect: None,
waker: None,
}
}

pub fn call(&mut self, peer_id: PeerId, call: Particle) {
self.events.push_back(ToSwarm::NotifyHandler {
event: Either::Left(HandlerMessage::OutParticle(call, <_>::default())),
handler: NotifyHandler::Any,
peer_id,
});

self.wake();
}

fn wake(&self) {
if let Some(waker) = &self.waker {
waker.wake_by_ref()
Expand Down Expand Up @@ -132,13 +153,14 @@ impl ClientBehaviour {

match cp {
ConnectedPoint::Dialer { address, .. } => {
let address = address.clone();
log::warn!(
"Disconnected from {} @ {:?}, reconnecting",
peer_id,
address
);
self.events.push_back(ToSwarm::Dial {
opts: address.clone().into(),
self.events.push_front(SwarmEventType::Dial {
opts: address.into(),
});
}
ConnectedPoint::Listener {
Expand All @@ -157,50 +179,33 @@ impl ClientBehaviour {
}

impl NetworkBehaviour for ClientBehaviour {
type ConnectionHandler = ConnectionHandlerSelect<
OneShotHandler<ProtocolConfig, HandlerMessage, HandlerMessage>,
<Ping as NetworkBehaviour>::ConnectionHandler,
>;
type ConnectionHandler = OneShotHandler<ProtocolConfig, HandlerMessage, HandlerMessage>;

type ToSwarm = ClientEvent;

fn handle_established_inbound_connection(
&mut self,
connection_id: ConnectionId,
peer_id: PeerId,
local_addr: &Multiaddr,
remote_addr: &Multiaddr,
_connection_id: ConnectionId,
_peer_id: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
let ping_handler: THandler<Ping> = self.ping.handle_established_inbound_connection(
connection_id,
peer_id,
local_addr,
remote_addr,
)?;
let oneshot_handler: OneShotHandler<ProtocolConfig, HandlerMessage, HandlerMessage> =
self.protocol_config.clone().into();

let result = ConnectionHandler::select(oneshot_handler, ping_handler);
Ok(result)
Ok(oneshot_handler)
}

fn handle_established_outbound_connection(
&mut self,
connection_id: ConnectionId,
peer: PeerId,
addr: &Multiaddr,
role_override: Endpoint,
_connection_id: ConnectionId,
_peer: PeerId,
_addr: &Multiaddr,
_role_override: Endpoint,
) -> Result<THandler<Self>, ConnectionDenied> {
let ping_handler = self.ping.handle_established_outbound_connection(
connection_id,
peer,
addr,
role_override,
)?;
let oneshot_handler: OneShotHandler<ProtocolConfig, HandlerMessage, HandlerMessage> =
self.protocol_config.clone().into();
let result = ConnectionHandler::select(oneshot_handler, ping_handler);
Ok(result)
Ok(oneshot_handler)
}

fn on_swarm_event(&mut self, event: FromSwarm<'_, Self::ConnectionHandler>) {
Expand Down Expand Up @@ -230,37 +235,33 @@ impl NetworkBehaviour for ClientBehaviour {
fn on_connection_handler_event(
&mut self,
peer_id: PeerId,
cid: ConnectionId,
_cid: ConnectionId,
event: THandlerOutEvent<Self>,
) {
use ClientEvent::Particle;

match event {
Either::Left(HandlerMessage::InParticle(particle)) => {
HandlerMessage::InParticle(particle) => {
self.events.push_back(GenerateEvent(Particle {
particle,
sender: peer_id,
}))
}
Either::Right(ping) => self.ping.on_connection_handler_event(peer_id, cid, ping),
Either::Left(_) => {}
_ => {}
}
}

fn poll(
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
_params: &mut impl PollParameters,
) -> Poll<SwarmEventType> {
self.waker = Some(cx.waker().clone());

// just polling it to the end
while self.ping.poll(cx, params).is_ready() {}

if let Some(Poll::Ready(addresses)) = self.reconnect.as_mut().map(|r| r.poll_unpin(cx)) {
self.reconnect = None;
for addr in addresses {
self.events.push_back(ToSwarm::Dial { opts: addr.into() });
self.events.push_front(ToSwarm::Dial { opts: addr.into() });
}
}

Expand Down
Loading

0 comments on commit 5522bed

Please sign in to comment.