Skip to content

Commit

Permalink
Gossip metrics (#1372)
Browse files Browse the repository at this point in the history
Adding metrics for channel queries and gossip KPIs.
  • Loading branch information
t-bast committed Apr 15, 2020
1 parent 9cb14ee commit d563067
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 15 deletions.
41 changes: 39 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Monitoring.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,46 @@
package fr.acinq.eclair.router

import fr.acinq.eclair.router.Router.GossipDecision
import fr.acinq.eclair.wire.ChannelUpdate
import fr.acinq.eclair.{LongToBtcAmount, MilliSatoshi, getSimpleClassName}
import kamon.Kamon
import kamon.metric.Counter
import kamon.metric.{Counter, MeasurementUnit}

object Monitoring {

object Metrics {
val FindRouteDuration = Kamon.timer("router.find-route.duration", "Path-finding duration")
val RouteLength = Kamon.histogram("router.find-route.length", "Path-finding result length")

/**
 * Histograms describing how we serve query_channel_range requests.
 */
object QueryChannelRange {
// Fed with the `numberOfBlocks` field of each query_channel_range we handle.
val Blocks = Kamon.histogram("router.gossip.query-channel-range.blocks", "Number of blocks requested in query-channel-range")
// Fed with the number of chunks (one reply_channel_range is built per chunk) produced for a single query.
val Replies = Kamon.histogram("router.gossip.query-channel-range.replies", "Number of reply-channel-range replies sent")
}

/**
 * Histograms describing reply_channel_range messages.
 * `ShortChannelIds` and `Blocks` are recorded with a `Tags.Direction` tag (incoming or outgoing) by the sync code;
 * the two `New*` histograms are recorded without tags when a reply is processed.
 */
object ReplyChannelRange {
// Size of the short channel id list carried by a single reply.
val ShortChannelIds = Kamon.histogram("router.gossip.reply-channel-range.ids", "Number of short channel ids in reply-channel-range")
// `numberOfBlocks` field of a single reply.
val Blocks = Kamon.histogram("router.gossip.reply-channel-range.blocks", "Number of blocks in reply-channel-range")
// Channel announcements we found to be missing after comparing a reply with our own data.
val NewChannelAnnouncements = Kamon.histogram("router.gossip.reply-channel-range.new-channel-announcements", "Number of new channel announcements discovered in reply-channel-range")
// Channel updates we found to be missing after comparing a reply with our own data.
val NewChannelUpdates = Kamon.histogram("router.gossip.reply-channel-range.new-channel-updates", "Number of new channel updates discovered in reply-channel-range")
}

/**
 * Histograms counting what we send back for each query_short_channel_ids we handle.
 * Each histogram receives one sample per query (the per-query total), recorded without tags.
 */
object QueryShortChannelIds {
// NB: the metric key says "node-announcements" even though the val is simply named `Nodes`.
val Nodes = Kamon.histogram("router.gossip.query-short-channel-ids.node-announcements", "Number of node announcements sent in response to a query-short-channel-ids")
val ChannelAnnouncements = Kamon.histogram("router.gossip.query-short-channel-ids.channel-announcements", "Number of channel announcements sent in response to a query-short-channel-ids")
val ChannelUpdates = Kamon.histogram("router.gossip.query-short-channel-ids.channel-updates", "Number of channel updates sent in response to a query-short-channel-ids")
}

// Gauges describing the current state of our routing table.
// Updated by the router with the size of its node map.
val Nodes = Kamon.gauge("router.gossip.nodes", "Number of known nodes in the network")
// Updated by the router with the size of its channel maps.
// NOTE(review): the router updates this gauge both without tags (when loading from the db) and with the
// `Tags.Announced` tag (when computing network stats) -- confirm that mixing both series is intended.
val Channels = Kamon.gauge("router.gossip.channels", "Number of known channels in the network")
// Expressed as a percentage: the router multiplies its sync progress by 100 before updating this gauge.
val SyncProgress = Kamon.gauge("router.gossip.sync-progress", "Routing table sync progress (%)", MeasurementUnit.percentage)

// Distribution of the delay between two consecutive channel_updates for the same channel side.
private val ChannelUpdateRefreshRate = Kamon.histogram("router.gossip.channel-update-refresh-rate", "Rate at which channels update their fee policy (minutes)")

/**
 * Records how much time separates a channel_update from the one it replaces.
 *
 * @param update   the channel_update that was just accepted.
 * @param previous the channel_update it replaces.
 * @param public   whether the channel is announced; used as the value of the `announced` tag.
 */
def channelUpdateRefreshed(update: ChannelUpdate, previous: ChannelUpdate, public: Boolean): Unit = {
  // presumably timestamps are in seconds, hence the division by 60 to get minutes -- TODO confirm
  val timestampDelta = update.timestamp - previous.timestamp
  val elapsedMinutes = timestampDelta / 60
  val taggedHistogram = ChannelUpdateRefreshRate.withTag(Tags.Announced, public)
  taggedHistogram.record(elapsedMinutes)
}

private val GossipResult = Kamon.counter("router.gossip.result")

def gossipResult(decision: GossipDecision): Counter = decision match {
Expand All @@ -36,8 +66,15 @@ object Monitoring {
}

object Tags {
val NumberOfRoutes = "numRoutes"
val Amount = "amount"
val Announced = "announced"
val Direction = "direction"
val NumberOfRoutes = "numRoutes"

/**
 * Values used with the `Direction` tag, to tell apart messages we received from messages we sent.
 */
object Directions {
val Incoming = "incoming"
val Outgoing = "outgoing"
}

/**
* We split amounts in buckets that can be used to tag metrics.
Expand Down
12 changes: 10 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import fr.acinq.eclair.io.Peer.PeerRoutingMessage
import fr.acinq.eclair.payment.PaymentRequest.ExtraHop
import fr.acinq.eclair.router.Graph.GraphStructure.DirectedGraph
import fr.acinq.eclair.router.Graph.WeightRatios
import fr.acinq.eclair.router.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire._
import kamon.context.Context
Expand All @@ -47,9 +48,10 @@ import scala.util.Try
*/
class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Promise[Done]] = None) extends FSMDiagnosticActorLogging[Router.State, Router.Data] {

import ExecutionContext.Implicits.global
import Router._

import ExecutionContext.Implicits.global

// we pass these to helper classes so that they have the logging context
implicit def implicitLog: DiagnosticLoggingAdapter = diagLog

Expand All @@ -66,6 +68,8 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
log.info("loading network announcements from db...")
val channels = db.listChannels()
val nodes = db.listNodes()
Metrics.Nodes.withoutTags().update(nodes.size)
Metrics.Channels.withoutTags().update(channels.size)
log.info("loaded from db: channels={} nodes={}", channels.size, nodes.size)
val initChannels = channels
// this will be used to calculate routes
Expand Down Expand Up @@ -101,6 +105,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
when(NORMAL) {

case Event(SyncProgress(progress), d: Data) =>
Metrics.SyncProgress.withoutTags().update(100 * progress)
if (d.stats.isEmpty && progress == 1.0 && d.channels.nonEmpty) {
log.info("initial routing sync done: computing network statistics")
self ! TickComputeNetworkStats
Expand Down Expand Up @@ -128,6 +133,9 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[

case Event(TickComputeNetworkStats, d) =>
if (d.channels.nonEmpty) {
Metrics.Nodes.withoutTags().update(d.nodes.size)
Metrics.Channels.withTag(Tags.Announced, value = true).update(d.channels.size)
Metrics.Channels.withTag(Tags.Announced, value = false).update(d.privateChannels.size)
log.info("re-computing network statistics")
stay using d.copy(stats = NetworkStats.computeStats(d.channels.values))
} else {
Expand Down Expand Up @@ -219,7 +227,7 @@ class Router(val nodeParams: NodeParams, watcher: ActorRef, initialized: Option[
stay using Sync.handleReplyChannelRange(d, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId), r)

case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryShortChannelIds), d) =>
Sync.handleQueryShortChannelIds(d.nodes, d.channels, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId), q)
Sync.handleQueryShortChannelIds(d.nodes, d.channels, RemoteGossip(peerConnection, remoteNodeId), q)
stay

case Event(PeerRoutingMessage(peerConnection, remoteNodeId, r: ReplyShortChannelIdsEnd), d) =>
Expand Down
29 changes: 19 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import akka.event.LoggingAdapter
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.router.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.wire._
import fr.acinq.eclair.{ShortChannelId, serializationResult}
Expand Down Expand Up @@ -68,6 +69,7 @@ object Sync {
def handleQueryChannelRange(channels: SortedMap[ShortChannelId, PublicChannel], routerConf: RouterConf, origin: RemoteGossip, q: QueryChannelRange)(implicit ctx: ActorContext, log: LoggingAdapter): Unit = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
ctx.sender ! TransportHandler.ReadAck(q)
Metrics.QueryChannelRange.Blocks.withoutTags().record(q.numberOfBlocks)
Kamon.runWithContextEntry(remoteNodeIdKey, origin.nodeId.toString) {
Kamon.runWithSpan(Kamon.spanBuilder("query-channel-range").start(), finishSpan = true) {
log.info("received query_channel_range with firstBlockNum={} numberOfBlocks={} extendedQueryFlags_opt={}", q.firstBlockNum, q.numberOfBlocks, q.tlvStream)
Expand All @@ -77,11 +79,13 @@ object Sync {
val chunks = Kamon.runWithSpan(Kamon.spanBuilder("split-channel-ids").start(), finishSpan = true) {
split(shortChannelIds, q.firstBlockNum, q.numberOfBlocks, routerConf.channelRangeChunkSize)
}

Metrics.QueryChannelRange.Replies.withoutTags().record(chunks.size)
Kamon.runWithSpan(Kamon.spanBuilder("compute-timestamps-checksums").start(), finishSpan = true) {
chunks.foreach { chunk =>
val reply = buildReplyChannelRange(chunk, q.chainHash, routerConf.encodingType, q.queryFlags_opt, channels)
origin.peerConnection ! reply
Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.numberOfBlocks)
Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.shortChannelIds.array.size)
}
}
}
Expand All @@ -92,9 +96,11 @@ object Sync {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
ctx.sender ! TransportHandler.ReadAck(r)

Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.numberOfBlocks)
Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Incoming).record(r.shortChannelIds.array.size)

Kamon.runWithContextEntry(remoteNodeIdKey, origin.nodeId.toString) {
Kamon.runWithSpan(Kamon.spanBuilder("reply-channel-range").start(), finishSpan = true) {

@tailrec
def loop(ids: List[ShortChannelId], timestamps: List[ReplyChannelRangeTlv.Timestamps], checksums: List[ReplyChannelRangeTlv.Checksums], acc: List[ShortChannelIdAndFlag] = List.empty[ShortChannelIdAndFlag]): List[ShortChannelIdAndFlag] = {
ids match {
Expand All @@ -109,7 +115,6 @@ object Sync {

val timestamps_opt = r.timestamps_opt.map(_.timestamps).getOrElse(List.empty[ReplyChannelRangeTlv.Timestamps])
val checksums_opt = r.checksums_opt.map(_.checksums).getOrElse(List.empty[ReplyChannelRangeTlv.Checksums])

val shortChannelIdAndFlags = Kamon.runWithSpan(Kamon.spanBuilder("compute-flags").start(), finishSpan = true) {
loop(r.shortChannelIds.array, timestamps_opt, checksums_opt)
}
Expand All @@ -121,6 +126,8 @@ object Sync {
(c1, u1)
}
log.info(s"received reply_channel_range with {} channels, we're missing {} channel announcements and {} updates, format={}", r.shortChannelIds.array.size, channelCount, updatesCount, r.shortChannelIds.encoding)
Metrics.ReplyChannelRange.NewChannelAnnouncements.withoutTags().record(channelCount)
Metrics.ReplyChannelRange.NewChannelUpdates.withoutTags().record(updatesCount)

def buildQuery(chunk: List[ShortChannelIdAndFlag]): QueryShortChannelIds = {
// always encode empty lists as UNCOMPRESSED
Expand Down Expand Up @@ -151,15 +158,13 @@ object Sync {
}
}

def handleQueryShortChannelIds(nodes: Map[PublicKey, NodeAnnouncement], channels: SortedMap[ShortChannelId, PublicChannel], routerConf: RouterConf, origin: RemoteGossip, q: QueryShortChannelIds)(implicit ctx: ActorContext, log: LoggingAdapter): Unit = {
def handleQueryShortChannelIds(nodes: Map[PublicKey, NodeAnnouncement], channels: SortedMap[ShortChannelId, PublicChannel], origin: RemoteGossip, q: QueryShortChannelIds)(implicit ctx: ActorContext, log: LoggingAdapter): Unit = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
ctx.sender ! TransportHandler.ReadAck(q)

Kamon.runWithContextEntry(remoteNodeIdKey, origin.nodeId.toString) {
Kamon.runWithSpan(Kamon.spanBuilder("query-short-channel-ids").start(), finishSpan = true) {

val flags = q.queryFlags_opt.map(_.array).getOrElse(List.empty[Long])

var channelCount = 0
var updateCount = 0
var nodeCount = 0
Expand All @@ -180,6 +185,9 @@ object Sync {
origin.peerConnection ! na
}
)
Metrics.QueryShortChannelIds.Nodes.withoutTags().record(nodeCount)
Metrics.QueryShortChannelIds.ChannelAnnouncements.withoutTags().record(channelCount)
Metrics.QueryShortChannelIds.ChannelUpdates.withoutTags().record(updateCount)
log.info("received query_short_channel_ids with {} items, sent back {} channels and {} updates and {} nodes", q.shortChannelIds.array.size, channelCount, updateCount, nodeCount)
origin.peerConnection ! ReplyShortChannelIdsEnd(q.chainHash, 1)
}
Expand Down Expand Up @@ -428,11 +436,12 @@ object Sync {
if (it.hasNext) {
val id = it.next()
val currentHeight = currentChunk.head.blockHeight
if (id.blockHeight == currentHeight)
if (id.blockHeight == currentHeight) {
loop(id :: currentChunk, acc) // same height => always add to the current chunk
else if (currentChunk.size < channelRangeChunkSize) // different height but we're under the size target => add to the current chunk
loop(id :: currentChunk, acc) // different height and over the size target => start a new chunk
else {
} else if (currentChunk.size < channelRangeChunkSize) {
loop(id :: currentChunk, acc) // different height but we're under the size target => add to the current chunk
} else {
// different height and over the size target => start a new chunk
// we always prepend because it's more efficient so we have to reverse the current chunk
// for the first chunk, we make sure that we start at the request first block
// for the next chunks we start at the end of the range covered by the last chunk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import fr.acinq.eclair.blockchain.{UtxoStatus, ValidateRequest, ValidateResult,
import fr.acinq.eclair.channel.{BITCOIN_FUNDING_EXTERNAL_CHANNEL_SPENT, LocalChannelDown, LocalChannelUpdate}
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.db.NetworkDb
import fr.acinq.eclair.router.Monitoring.Metrics
import fr.acinq.eclair.router.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire._
Expand Down Expand Up @@ -295,6 +295,7 @@ object Validation {
d
} else if (pc.getChannelUpdateSameSideAs(u).isDefined) {
log.debug("updated channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u)
Metrics.channelUpdateRefreshed(u, pc.getChannelUpdateSameSideAs(u).get, publicChannel)
remoteOrigins.foreach(sendDecision(_, GossipDecision.Accepted(u)))
ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil))
db.updateChannel(u)
Expand Down Expand Up @@ -342,6 +343,7 @@ object Validation {
d
} else if (pc.getChannelUpdateSameSideAs(u).isDefined) {
log.debug("updated channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u)
Metrics.channelUpdateRefreshed(u, pc.getChannelUpdateSameSideAs(u).get, publicChannel)
remoteOrigins.foreach(sendDecision(_, GossipDecision.Accepted(u)))
ctx.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil))
// we also need to update the graph
Expand Down

0 comments on commit d563067

Please sign in to comment.