Skip to content

Commit

Permalink
Add metrics to database
Browse files Browse the repository at this point in the history
  • Loading branch information
thomash-acinq committed Sep 1, 2021
1 parent 02c641c commit 5928c55
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 14 deletions.
4 changes: 3 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/db/AuditDb.scala
Expand Up @@ -21,7 +21,7 @@ import fr.acinq.bitcoin.{ByteVector32, Satoshi}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.AuditDb.{NetworkFee, Stats}
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent
import fr.acinq.eclair.payment.{PaymentReceived, PaymentRelayed, PaymentSent}
import fr.acinq.eclair.payment.{ExperimentMetrics, PaymentReceived, PaymentRelayed, PaymentSent}

import java.io.Closeable

Expand All @@ -41,6 +41,8 @@ trait AuditDb extends Closeable {

def addChannelUpdate(localChannelUpdate: LocalChannelUpdate): Unit

def addExperimentMetrics(metrics: ExperimentMetrics): Unit

def listSent(from: Long, to: Long): Seq[PaymentSent]

def listReceived(from: Long, to: Long): Seq[PaymentReceived]
Expand Down
Expand Up @@ -41,6 +41,7 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
context.system.eventStream.subscribe(self, classOf[ChannelStateChanged])
context.system.eventStream.subscribe(self, classOf[ChannelClosed])
context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
context.system.eventStream.subscribe(self, classOf[ExperimentMetrics])

override def receive: Receive = {

Expand Down Expand Up @@ -128,6 +129,9 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with ActorLogging {
case _ => auditDb.addChannelUpdate(u)
}

case m: ExperimentMetrics =>
auditDb.addExperimentMetrics(m)

}

override def unhandled(message: Any): Unit = log.warning(s"unhandled msg=$message")
Expand Down
Expand Up @@ -166,6 +166,11 @@ case class DualAuditDb(sqlite: SqliteAuditDb, postgres: PgAuditDb) extends Audit
sqlite.addChannelUpdate(localChannelUpdate)
}

override def addExperimentMetrics(metrics: ExperimentMetrics): Unit = {
runAsync(postgres.addExperimentMetrics(metrics))
sqlite.addExperimentMetrics(metrics)
}

override def listSent(from: Long, to: Long): Seq[PaymentSent] = {
runAsync(postgres.listSent(from, to))
sqlite.listSent(from, to)
Expand Down
31 changes: 29 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Expand Up @@ -36,7 +36,7 @@ import javax.sql.DataSource

object PgAuditDb {
val DB_NAME = "audit"
val CURRENT_VERSION = 8
val CURRENT_VERSION = 9
}

class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
Expand Down Expand Up @@ -83,6 +83,13 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX channel_updates_timestamp_idx ON audit.channel_updates(timestamp)")
}

def migration89(statement: Statement): Unit = {
statement.executeUpdate("CREATE TABLE audit.metrics (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, success BOOLEAN NOT NULL, duration INTERVAL NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL, experiment_name TEXT NOT NULL)")
statement.executeUpdate("CREATE INDEX metrics_success_idx ON audit.metrics(success)")
statement.executeUpdate("CREATE INDEX metrics_timestamp_idx ON audit.metrics(timestamp)")
statement.executeUpdate("CREATE INDEX metrics_name_idx ON audit.metrics(experiment_name)")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE SCHEMA audit")
Expand All @@ -94,6 +101,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE TABLE audit.network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.channel_updates (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, fee_base_msat BIGINT NOT NULL, fee_proportional_millionths BIGINT NOT NULL, cltv_expiry_delta BIGINT NOT NULL, htlc_minimum_msat BIGINT NOT NULL, htlc_maximum_msat BIGINT NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE TABLE audit.metrics (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, success BOOLEAN NOT NULL, duration INTERVAL NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL, experiment_name TEXT NOT NULL)")

statement.executeUpdate("CREATE TABLE audit.channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp TIMESTAMP WITH TIME ZONE NOT NULL)")
statement.executeUpdate("CREATE INDEX sent_timestamp_idx ON audit.sent(timestamp)")
Expand All @@ -108,7 +116,10 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX channel_updates_cid_idx ON audit.channel_updates(channel_id)")
statement.executeUpdate("CREATE INDEX channel_updates_nid_idx ON audit.channel_updates(node_id)")
statement.executeUpdate("CREATE INDEX channel_updates_timestamp_idx ON audit.channel_updates(timestamp)")
case Some(v@(4 | 5 | 6 | 7)) =>
statement.executeUpdate("CREATE INDEX metrics_success_idx ON audit.metrics(success)")
statement.executeUpdate("CREATE INDEX metrics_timestamp_idx ON audit.metrics(timestamp)")
statement.executeUpdate("CREATE INDEX metrics_name_idx ON audit.metrics(experiment_name)")
case Some(v@(4 | 5 | 6 | 7 | 8)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
if (v < 5) {
migration45(statement)
Expand All @@ -122,6 +133,9 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
if (v < 8) {
migration78(statement)
}
if (v < 9) {
migration89(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down Expand Up @@ -259,6 +273,19 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
}
}

override def addExperimentMetrics(m: ExperimentMetrics): Unit = withMetrics("audit/add-experiment-metrics", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("INSERT INTO audit.metrics VALUES (?, ?, ?, INTERVAL '? millisecond', ?, ?)")) { statement =>
statement.setLong(1, m.amount.toLong)
statement.setLong(2, m.fees.toLong)
statement.setBoolean(3, m.success)
statement.setLong(4, m.duration)
statement.setTimestamp(5, new Timestamp(m.timestamp))
statement.setString(6, m.experimentName)
statement.executeUpdate()
}}
}

override def listSent(from: Long, to: Long): Seq[PaymentSent] =
inTransaction { pg =>
using(pg.prepareStatement("SELECT * FROM audit.sent WHERE timestamp BETWEEN ? AND ?")) { statement =>
Expand Down
Expand Up @@ -34,7 +34,7 @@ import java.util.UUID

object SqliteAuditDb {
val DB_NAME = "audit"
val CURRENT_VERSION = 6
val CURRENT_VERSION = 7
}

class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
Expand Down Expand Up @@ -91,6 +91,13 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX channel_updates_timestamp_idx ON channel_updates(timestamp)")
}

def migration67(statement: Statement): Unit = {
statement.executeUpdate("CREATE TABLE metrics (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, success INTEGER NOT NULL, duration INTEGER NOT NULL, timestamp INTEGER NOT NULL, experiment_name TEXT NOT NULL)")
statement.executeUpdate("CREATE INDEX metrics_success_idx ON metrics(success)")
statement.executeUpdate("CREATE INDEX metrics_timestamp_idx ON metrics(timestamp)")
statement.executeUpdate("CREATE INDEX metrics_name_idx ON metrics(experiment_name)")
}

getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE sent (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, recipient_amount_msat INTEGER NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash BLOB NOT NULL, payment_preimage BLOB NOT NULL, recipient_node_id BLOB NOT NULL, to_channel_id BLOB NOT NULL, timestamp INTEGER NOT NULL)")
Expand All @@ -101,6 +108,7 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate("CREATE TABLE channel_events (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, capacity_sat INTEGER NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE channel_errors (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE channel_updates (channel_id BLOB NOT NULL, node_id BLOB NOT NULL, fee_base_msat INTEGER NOT NULL, fee_proportional_millionths INTEGER NOT NULL, cltv_expiry_delta INTEGER NOT NULL, htlc_minimum_msat INTEGER NOT NULL, htlc_maximum_msat INTEGER NOT NULL, timestamp INTEGER NOT NULL)")
statement.executeUpdate("CREATE TABLE metrics (amount_msat INTEGER NOT NULL, fees_msat INTEGER NOT NULL, success INTEGER NOT NULL, duration INTEGER NOT NULL, timestamp INTEGER NOT NULL, experiment_name TEXT NOT NULL)")

statement.executeUpdate("CREATE INDEX sent_timestamp_idx ON sent(timestamp)")
statement.executeUpdate("CREATE INDEX received_timestamp_idx ON received(timestamp)")
Expand All @@ -114,7 +122,10 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate("CREATE INDEX channel_updates_cid_idx ON channel_updates(channel_id)")
statement.executeUpdate("CREATE INDEX channel_updates_nid_idx ON channel_updates(node_id)")
statement.executeUpdate("CREATE INDEX channel_updates_timestamp_idx ON channel_updates(timestamp)")
case Some(v@(1 | 2 | 3 | 4 | 5)) =>
statement.executeUpdate("CREATE INDEX metrics_success_idx ON metrics(success)")
statement.executeUpdate("CREATE INDEX metrics_timestamp_idx ON metrics(timestamp)")
statement.executeUpdate("CREATE INDEX metrics_name_idx ON metrics(experiment_name)")
case Some(v@(1 | 2 | 3 | 4 | 5 | 6)) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
if (v < 2) {
migration12(statement)
Expand All @@ -131,6 +142,9 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
if (v < 6) {
migration56(statement)
}
if (v < 7) {
migration67(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
Expand Down Expand Up @@ -254,6 +268,18 @@ class SqliteAuditDb(sqlite: Connection) extends AuditDb with Logging {
}
}

override def addExperimentMetrics(m: ExperimentMetrics): Unit = {
using(sqlite.prepareStatement("INSERT INTO metrics VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
statement.setLong(1, m.amount.toLong)
statement.setLong(2, m.fees.toLong)
statement.setBoolean(3, m.success)
statement.setLong(4, m.duration)
statement.setLong(5, m.timestamp)
statement.setString(6, m.experimentName)
statement.executeUpdate()
}
}

override def listSent(from: Long, to: Long): Seq[PaymentSent] =
using(sqlite.prepareStatement("SELECT * FROM sent WHERE timestamp >= ? AND timestamp < ?")) { statement =>
statement.setLong(1, from)
Expand Down
Expand Up @@ -230,4 +230,6 @@ object PaymentFailure {
}))
}

}
}

case class ExperimentMetrics(amount: MilliSatoshi, fees: MilliSatoshi, success: Boolean, duration: Long, timestamp: Long, experimentName: String)
16 changes: 8 additions & 8 deletions eclair-core/src/test/scala/fr/acinq/eclair/db/AuditDbSpec.scala
Expand Up @@ -237,7 +237,7 @@ class AuditDbSpec extends AnyFunSuite {
val postMigrationDb = new SqliteAuditDb(connection)

using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit").contains(6))
assert(getVersion(statement, "audit").contains(SqliteAuditDb.CURRENT_VERSION))
}

postMigrationDb.add(ps1)
Expand Down Expand Up @@ -284,13 +284,13 @@ class AuditDbSpec extends AnyFunSuite {
postCheck = connection => {
val migratedDb = dbs.audit
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit").contains(6))
assert(getVersion(statement, "audit").contains(SqliteAuditDb.CURRENT_VERSION))
}
migratedDb.add(e1)

val postMigrationDb = new SqliteAuditDb(connection)
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit").contains(6))
assert(getVersion(statement, "audit").contains(SqliteAuditDb.CURRENT_VERSION))
}
postMigrationDb.add(e2)
}
Expand Down Expand Up @@ -362,7 +362,7 @@ class AuditDbSpec extends AnyFunSuite {
postCheck = connection => {
val migratedDb = dbs.audit
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit").contains(6))
assert(getVersion(statement, "audit").contains(SqliteAuditDb.CURRENT_VERSION))
}
assert(migratedDb.listSent(50, 150).toSet === Set(
ps1.copy(id = pp1.id, recipientAmount = pp1.amount, parts = pp1 :: Nil),
Expand All @@ -372,7 +372,7 @@ class AuditDbSpec extends AnyFunSuite {

val postMigrationDb = new SqliteAuditDb(connection)
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit").contains(6))
assert(getVersion(statement, "audit").contains(SqliteAuditDb.CURRENT_VERSION))
}
val ps2 = PaymentSent(UUID.randomUUID(), randomBytes32(), randomBytes32(), 1100 msat, randomKey().publicKey, Seq(
PaymentSent.PartialPayment(UUID.randomUUID(), 500 msat, 10 msat, randomBytes32(), None, 160),
Expand Down Expand Up @@ -467,7 +467,7 @@ class AuditDbSpec extends AnyFunSuite {

val postMigrationDb = new PgAuditDb()(dbs.datasource)
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit").contains(8))
assert(getVersion(statement, "audit").contains(PgAuditDb.CURRENT_VERSION))
}
val relayed3 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.Part(450 msat, randomBytes32()), PaymentRelayed.Part(500 msat, randomBytes32())), Seq(PaymentRelayed.Part(800 msat, randomBytes32())), randomKey().publicKey, 700 msat, 150)
postMigrationDb.add(relayed3)
Expand Down Expand Up @@ -544,13 +544,13 @@ class AuditDbSpec extends AnyFunSuite {
postCheck = connection => {
val migratedDb = dbs.audit
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit").contains(6))
assert(getVersion(statement, "audit").contains(SqliteAuditDb.CURRENT_VERSION))
}
assert(migratedDb.listRelayed(100, 120) === Seq(relayed1, relayed2))

val postMigrationDb = new SqliteAuditDb(connection)
using(connection.createStatement()) { statement =>
assert(getVersion(statement, "audit").contains(6))
assert(getVersion(statement, "audit").contains(SqliteAuditDb.CURRENT_VERSION))
}
val relayed3 = TrampolinePaymentRelayed(randomBytes32(), Seq(PaymentRelayed.Part(450 msat, randomBytes32()), PaymentRelayed.Part(500 msat, randomBytes32())), Seq(PaymentRelayed.Part(800 msat, randomBytes32())), randomKey().publicKey, 700 msat, 150)
postMigrationDb.add(relayed3)
Expand Down

0 comments on commit 5928c55

Please sign in to comment.