Skip to content

Commit

Permalink
Stuff metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
thomash-acinq committed Sep 9, 2021
1 parent a933bad commit c4ecd4a
Show file tree
Hide file tree
Showing 8 changed files with 71 additions and 36 deletions.
13 changes: 7 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Expand Up @@ -85,8 +85,8 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}

def migration89(statement: Statement): Unit = {
statement.executeUpdate("CREATE TABLE audit.path_finding_metrics (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, success BOOLEAN NOT NULL, duration_ms BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL, is_mpp BOOLEAN NOT NULL, experiment_name TEXT NOT NULL)")
statement.executeUpdate("CREATE INDEX metrics_success_idx ON audit.path_finding_metrics(success)")
statement.executeUpdate("CREATE TABLE audit.path_finding_metrics (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, status TEXT NOT NULL, duration_ms BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL, is_mpp BOOLEAN NOT NULL, experiment_name TEXT NOT NULL, recipient_node_id TEXT NOT NULL)")
statement.executeUpdate("CREATE INDEX metrics_status_idx ON audit.path_finding_metrics(status)")
statement.executeUpdate("CREATE INDEX metrics_timestamp_idx ON audit.path_finding_metrics(timestamp)")
statement.executeUpdate("CREATE INDEX metrics_mpp_idx ON audit.path_finding_metrics(is_mpp)")
statement.executeUpdate("CREATE INDEX metrics_name_idx ON audit.path_finding_metrics(experiment_name)")
Expand All @@ -103,7 +103,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE TABLE audit.network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.channel_updates (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL, cltv_expiry_delta BIGINT NOT NULL, htlc_minimum_msat BIGINT NOT NULL, htlc_maximum_msat BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.path_finding_metrics (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, success BOOLEAN NOT NULL, duration_ms BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL, is_mpp BOOLEAN NOT NULL, experiment_name TEXT NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.path_finding_metrics (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, status TEXT NOT NULL, duration_ms BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL, is_mpp BOOLEAN NOT NULL, experiment_name TEXT NOT NULL, recipient_node_id TEXT NOT NULL)")

statement.executeUpdate("CREATE TABLE audit.channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE INDEX sent_timestamp_idx ON audit.sent(timestamp)")
Expand All @@ -118,7 +118,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX channel_updates_cid_idx ON audit.channel_updates(channel_id)")
statement.executeUpdate("CREATE INDEX channel_updates_nid_idx ON audit.channel_updates(node_id)")
statement.executeUpdate("CREATE INDEX channel_updates_timestamp_idx ON audit.channel_updates(timestamp)")
statement.executeUpdate("CREATE INDEX metrics_success_idx ON audit.path_finding_metrics(success)")
statement.executeUpdate("CREATE INDEX metrics_status_idx ON audit.path_finding_metrics(status)")
statement.executeUpdate("CREATE INDEX metrics_timestamp_idx ON audit.path_finding_metrics(timestamp)")
statement.executeUpdate("CREATE INDEX metrics_mpp_idx ON audit.path_finding_metrics(is_mpp)")
statement.executeUpdate("CREATE INDEX metrics_name_idx ON audit.path_finding_metrics(experiment_name)")
Expand Down Expand Up @@ -278,14 +278,15 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

override def addPathFindingExperimentMetrics(m: PathFindingExperimentMetrics): Unit = withMetrics("audit/add-experiment-metrics", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO audit.path_finding_metrics VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
using(pg.prepareStatement("INSERT INTO audit.path_finding_metrics VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setLong(1, m.amount.toLong)
statement.setLong(2, m.fees.toLong)
statement.setBoolean(3, m.success)
statement.setString(3, m.status)
statement.setLong(4, m.duration)
statement.setTimestamp(5, new Timestamp(m.timestamp))
statement.setBoolean(6, m.isMultiPart)
statement.setString(7, m.experimentName)
statement.setString(8, m.recipientNodeId.value.toHex)
statement.executeUpdate()
}
}
Expand Down
Expand Up @@ -92,8 +92,8 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
}

def migration67(statement: Statement): Unit = {
statement.executeUpdate("CREATE TABLE path_finding_metrics (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, success INTEGER NOT NULL, duration_ms INTEGER NOT NULL, timestamp INTEGER NOT NULL, is_mpp INTEGER NOT NULL, experiment_name TEXT NOT NULL)")
statement.executeUpdate("CREATE INDEX metrics_success_idx ON path_finding_metrics(success)")
statement.executeUpdate("CREATE TABLE path_finding_metrics (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, status TEXT NOT NULL, duration_ms INTEGER NOT NULL, timestamp INTEGER NOT NULL, is_mpp INTEGER NOT NULL, experiment_name TEXT NOT NULL, recipient_node_id BLOB NOT NULL)")
statement.executeUpdate("CREATE INDEX metrics_status_idx ON path_finding_metrics(status)")
statement.executeUpdate("CREATE INDEX metrics_timestamp_idx ON path_finding_metrics(timestamp)")
statement.executeUpdate("CREATE INDEX metrics_mpp_idx ON path_finding_metrics(is_mpp)")
statement.executeUpdate("CREATE INDEX metrics_name_idx ON path_finding_metrics(experiment_name)")
Expand All @@ -109,7 +109,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate("CREATE TABLE channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE channel_errors (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE channel_updates (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL, cltv_expiry_delta INTEGER NOT NULL, htlc_minimum_msat INTEGER NOT NULL, htlc_maximum_msat INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE path_finding_metrics (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, success INTEGER NOT NULL, duration_ms INTEGER NOT NULL, timestamp INTEGER NOT NULL, is_mpp INTEGER NOT NULL, experiment_name TEXT NOT NULL)")
statement.executeUpdate("CREATE TABLE path_finding_metrics (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, status TEXT NOT NULL, duration_ms INTEGER NOT NULL, timestamp INTEGER NOT NULL, is_mpp INTEGER NOT NULL, experiment_name TEXT NOT NULL, recipient_node_id BLOB NOT NULL)")

statement.executeUpdate("CREATE INDEX sent_timestamp_idx ON sent(timestamp)")
statement.executeUpdate("CREATE INDEX received_timestamp_idx ON received(timestamp)")
Expand All @@ -123,7 +123,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX channel_updates_cid_idx ON channel_updates(channel_id)")
statement.executeUpdate("CREATE INDEX channel_updates_nid_idx ON channel_updates(node_id)")
statement.executeUpdate("CREATE INDEX channel_updates_timestamp_idx ON channel_updates(timestamp)")
statement.executeUpdate("CREATE INDEX metrics_success_idx ON path_finding_metrics(success)")
statement.executeUpdate("CREATE INDEX metrics_status_idx ON path_finding_metrics(status)")
statement.executeUpdate("CREATE INDEX metrics_timestamp_idx ON path_finding_metrics(timestamp)")
statement.executeUpdate("CREATE INDEX metrics_mpp_idx ON path_finding_metrics(is_mpp)")
statement.executeUpdate("CREATE INDEX metrics_name_idx ON path_finding_metrics(experiment_name)")
Expand Down Expand Up @@ -271,14 +271,15 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
}

override def addPathFindingExperimentMetrics(m: PathFindingExperimentMetrics): Unit = {
using(sqlite.prepareStatement("INSERT INTO path_finding_metrics VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
using(sqlite.prepareStatement("INSERT INTO path_finding_metrics VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setLong(1, m.amount.toLong)
statement.setLong(2, m.fees.toLong)
statement.setBoolean(3, m.success)
statement.setString(3, m.status)
statement.setLong(4, m.duration)
statement.setLong(5, m.timestamp)
statement.setBoolean(6, m.isMultiPart)
statement.setString(7, m.experimentName)
statement.setBytes(8, m.recipientNodeId.value.toArray)
statement.executeUpdate()
}
}
Expand Down
Expand Up @@ -232,4 +232,11 @@ object PaymentFailure {

}

case class PathFindingExperimentMetrics(amount: MilliSatoshi, fees: MilliSatoshi, success: Boolean, duration: Long, timestamp: Long, isMultiPart: Boolean, experimentName: String)
case class PathFindingExperimentMetrics(amount: MilliSatoshi,
fees: MilliSatoshi,
status: String,
duration: Long,
timestamp: Long,
isMultiPart: Boolean,
experimentName: String,
recipientNodeId: PublicKey)
Expand Up @@ -235,19 +235,27 @@ class MultiPartPaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig,
log.info("multi-part payment succeeded")
reply(request.replyTo, paymentSent)
}
val success = event.isRight
val status = event match {
case Right(s: PaymentSent) => "SUCCESS"
case Left(f: PaymentFailed) =>
if (f.failures.exists({ case r: RemoteFailure => r.e.originNode == cfg.recipientNodeId case _ => false })) {
"RECIPIENT_FAILURE"
} else {
"FAILURE"
}
}
val now = System.currentTimeMillis
val duration = now - start
if (cfg.recordMetrics) {
val fees = event match {
case Left(paymentFailed) => 0 msat
case Left(paymentFailed) => request.routeParams.getMaxFee(cfg.recipientAmount)
case Right(paymentSent) => paymentSent.feesPaid
}
context.system.eventStream.publish(PathFindingExperimentMetrics(cfg.recipientAmount, fees, success, duration, now, isMultiPart = true, request.routeParams.experimentName))
context.system.eventStream.publish(PathFindingExperimentMetrics(cfg.recipientAmount, fees, status, duration, now, isMultiPart = true, request.routeParams.experimentName, cfg.recipientNodeId))
}
Metrics.SentPaymentDuration
.withTag(Tags.MultiPart, Tags.MultiPartType.Parent)
.withTag(Tags.Success, value = success)
.withTag(Tags.Success, value = (status == "SUCCESS"))
.record(duration, TimeUnit.MILLISECONDS)
if (retriedFailedChannels) {
Metrics.RetryFailedChannelsResult.withTag(Tags.Success, event.isRight).increment()
Expand Down
Expand Up @@ -289,23 +289,35 @@ class PaymentLifecycle(nodeParams: NodeParams, cfg: SendPaymentConfig, router: A
}
request.replyTo ! result
if (cfg.publishEvent) context.system.eventStream.publish(result)
val success = result.isInstanceOf[PaymentSent]
val status = result match {
case s: PaymentSent => "SUCCESS"
case f: PaymentFailed =>
if (f.failures.exists({ case r: RemoteFailure => r.e.originNode == cfg.recipientNodeId case _ => false })) {
"RECIPIENT_FAILURE"
} else {
"FAILURE"
}
case _ => "INTERNAL_ERROR"
}
val now = System.currentTimeMillis
val duration = now - start
if (cfg.recordMetrics) {
val fees = result match {
case paymentSent: PaymentSent => paymentSent.feesPaid
case _ => 0 msat
case _ => request match {
case s: SendPaymentToNode => s.routeParams.getMaxFee(cfg.recipientAmount)
case _: SendPaymentToRoute => 0 msat
}
}
request match {
case SendPaymentToNode(_, _, _, _, _, routeParams) =>
context.system.eventStream.publish(PathFindingExperimentMetrics(request.finalPayload.amount, fees, success, duration, now, isMultiPart = false, routeParams.experimentName))
context.system.eventStream.publish(PathFindingExperimentMetrics(request.finalPayload.amount, fees, status, duration, now, isMultiPart = false, routeParams.experimentName, cfg.recipientNodeId))
case SendPaymentToRoute(_, _, _, _) => ()
}
}
Metrics.SentPaymentDuration
.withTag(Tags.MultiPart, if (cfg.id != cfg.parentId) Tags.MultiPartType.Child else Tags.MultiPartType.Disabled)
.withTag(Tags.Success, value = success)
.withTag(Tags.Success, value = (status == "SUCCESS"))
.record(duration, TimeUnit.MILLISECONDS)
stop(FSM.Normal)
}
Expand Down
Expand Up @@ -616,7 +616,7 @@ class AuditDbSpec extends AnyFunSuite {

test("add experiment metrics") {
forAllDbs { dbs =>
dbs.audit.addPathFindingExperimentMetrics(PathFindingExperimentMetrics(100000000 msat, 3000 msat, success = true, 37, System.currentTimeMillis, isMultiPart = false, "my-test-experiment"))
dbs.audit.addPathFindingExperimentMetrics(PathFindingExperimentMetrics(100000000 msat, 3000 msat, status = "SUCCESS", 37, System.currentTimeMillis, isMultiPart = false, "my-test-experiment", randomKey().publicKey))
}
}

Expand Down
Expand Up @@ -100,7 +100,7 @@ class MultiPartPaymentLifecycleSpec extends TestKitBaseClass with FixtureAnyFunS
assert(result.nonTrampolineFees === 100.msat)

val metrics = metricsListener.expectMsgType[PathFindingExperimentMetrics]
assert(metrics.success)
assert(metrics.status == "SUCCESS")
assert(metrics.experimentName == "my-test-experiment")
assert(metrics.amount == finalAmount)
assert(metrics.fees == 100.msat)
Expand Down Expand Up @@ -136,7 +136,7 @@ class MultiPartPaymentLifecycleSpec extends TestKitBaseClass with FixtureAnyFunS
assert(result.nonTrampolineFees === 200.msat)

val metrics = metricsListener.expectMsgType[PathFindingExperimentMetrics]
assert(metrics.success)
assert(metrics.status == "SUCCESS")
assert(metrics.experimentName == "my-test-experiment")
assert(metrics.amount == finalAmount)
assert(metrics.fees == 200200.msat)
Expand All @@ -163,7 +163,7 @@ class MultiPartPaymentLifecycleSpec extends TestKitBaseClass with FixtureAnyFunS
assert(result.trampolineFees === 1000.msat)

val metrics = metricsListener.expectMsgType[PathFindingExperimentMetrics]
assert(metrics.success)
assert(metrics.status == "SUCCESS")
assert(metrics.experimentName == "my-test-experiment")
assert(metrics.amount == finalAmount)
assert(metrics.fees == 1200.msat)
Expand Down Expand Up @@ -196,7 +196,7 @@ class MultiPartPaymentLifecycleSpec extends TestKitBaseClass with FixtureAnyFunS
assert(result.nonTrampolineFees === 200.msat)

val metrics = metricsListener.expectMsgType[PathFindingExperimentMetrics]
assert(metrics.success)
assert(metrics.status == "SUCCESS")
assert(metrics.experimentName == "my-test-experiment")
assert(metrics.amount == finalAmount)
assert(metrics.fees == 200.msat)
Expand Down Expand Up @@ -238,7 +238,7 @@ class MultiPartPaymentLifecycleSpec extends TestKitBaseClass with FixtureAnyFunS
assert(result.nonTrampolineFees === 200.msat)

val metrics = metricsListener.expectMsgType[PathFindingExperimentMetrics]
assert(metrics.success)
assert(metrics.status == "SUCCESS")
assert(metrics.experimentName == "my-test-experiment")
assert(metrics.amount == finalAmount)
assert(metrics.fees == 200.msat)
Expand Down Expand Up @@ -305,7 +305,7 @@ class MultiPartPaymentLifecycleSpec extends TestKitBaseClass with FixtureAnyFunS
assert(result.amountWithFees === 1000200.msat)

val metrics = metricsListener.expectMsgType[PathFindingExperimentMetrics]
assert(metrics.success)
assert(metrics.status == "SUCCESS")
assert(metrics.experimentName == "my-test-experiment")
assert(metrics.amount == finalAmount)
assert(metrics.fees == 200.msat)
Expand Down Expand Up @@ -417,9 +417,10 @@ class MultiPartPaymentLifecycleSpec extends TestKitBaseClass with FixtureAnyFunS
assert(result.failures.contains(LocalFailure(Nil, RetryExhausted)))

val metrics = metricsListener.expectMsgType[PathFindingExperimentMetrics]
assert(!metrics.success)
assert(metrics.status == "FAILURE")
assert(metrics.experimentName == "my-test-experiment")
assert(metrics.amount == finalAmount)
assert(metrics.fees == 15000.msat)
metricsListener.expectNoMessage()
}

Expand Down Expand Up @@ -447,9 +448,10 @@ class MultiPartPaymentLifecycleSpec extends TestKitBaseClass with FixtureAnyFunS
childPayFsm.expectNoMessage(100 millis)

val metrics = metricsListener.expectMsgType[PathFindingExperimentMetrics]
assert(!metrics.success)
assert(metrics.status == "FAILURE")
assert(metrics.experimentName == "my-test-experiment")
assert(metrics.amount == finalAmount)
assert(metrics.fees == 15000.msat)
metricsListener.expectNoMessage()
}

Expand All @@ -467,9 +469,10 @@ class MultiPartPaymentLifecycleSpec extends TestKitBaseClass with FixtureAnyFunS
assert(result.failures.length === 1)

val metrics = metricsListener.expectMsgType[PathFindingExperimentMetrics]
assert(!metrics.success)
assert(metrics.status == "FAILURE")
assert(metrics.experimentName == "my-test-experiment")
assert(metrics.amount == finalAmount)
assert(metrics.fees == 15000.msat)
metricsListener.expectNoMessage()
}

Expand Down

0 comments on commit c4ecd4a

Please sign in to comment.