Skip to content

Commit

Permalink
More robust channels timestamps (#2674)
Browse files Browse the repository at this point in the history
Previous implementation had the advantage of being all in one place, but it left holes:
- `last_connected_timestamp` was only set after the first disconnection
- in some corner cases the `closed_timestamp` was never set (nothing at stake, funding tx timeout, post-restart)
  • Loading branch information
pm47 committed May 25, 2023
1 parent 84f1d03 commit 71968d0
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,17 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
val encoded = channelDataCodec.encode(data).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.channels (channel_id, remote_node_id, data, json, is_closed)
| VALUES (?, ?, ?, ?::JSONB, FALSE)
| INSERT INTO local.channels (channel_id, remote_node_id, data, json, created_timestamp, last_connected_timestamp, is_closed)
| VALUES (?, ?, ?, ?::JSONB, ?, ?, FALSE)
| ON CONFLICT (channel_id)
| DO UPDATE SET data = EXCLUDED.data, json = EXCLUDED.json ;
| """.stripMargin)) { statement =>
statement.setString(1, data.channelId.toHex)
statement.setString(2, data.remoteNodeId.toHex)
statement.setBytes(3, encoded)
statement.setString(4, serialization.write(data))
statement.setTimestamp(5, Timestamp.from(Instant.now()))
statement.setTimestamp(6, Timestamp.from(Instant.now()))
statement.executeUpdate()
}
}
Expand All @@ -194,9 +196,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
}
}

/**
* Helper method to factor updating timestamp columns
*/
/** Helper method to factor updating timestamp columns */
private def updateChannelMetaTimestampColumn(channelId: ByteVector32, columnName: String): Unit = {
inTransaction(IsolationLevel.TRANSACTION_READ_UNCOMMITTED) { pg =>
using(pg.prepareStatement(s"UPDATE local.channels SET $columnName=? WHERE channel_id=?")) { statement =>
Expand All @@ -209,11 +209,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit

override def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit = {
val timestampColumn_opt = event match {
case ChannelEvent.EventType.Created => Some("created_timestamp")
case ChannelEvent.EventType.Connected => Some("last_connected_timestamp")
case ChannelEvent.EventType.PaymentReceived => Some("last_payment_received_timestamp")
case ChannelEvent.EventType.PaymentSent => Some("last_payment_sent_timestamp")
case _: ChannelEvent.EventType.Closed => Some("closed_timestamp")
case _ => None
}
timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
Expand All @@ -231,8 +229,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
statement.executeUpdate()
}

using(pg.prepareStatement("UPDATE local.channels SET is_closed=TRUE WHERE channel_id=?")) { statement =>
statement.setString(1, channelId.toHex)
using(pg.prepareStatement("UPDATE local.channels SET is_closed=TRUE, closed_timestamp=? WHERE channel_id=?")) { statement =>
statement.setTimestamp(1, Timestamp.from(Instant.now()))
statement.setString(2, channelId.toHex)
statement.executeUpdate()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {
update.setBytes(1, encoded)
update.setBytes(2, data.channelId.toArray)
if (update.executeUpdate() == 0) {
using(sqlite.prepareStatement("INSERT INTO local_channels (channel_id, data, is_closed) VALUES (?, ?, 0)")) { statement =>
using(sqlite.prepareStatement("INSERT INTO local_channels (channel_id, data, created_timestamp, last_connected_timestamp, is_closed) VALUES (?, ?, ?, ?, 0)")) { statement =>
statement.setBytes(1, data.channelId.toArray)
statement.setBytes(2, encoded)
statement.setLong(3, TimestampMilli.now().toLong)
statement.setLong(4, TimestampMilli.now().toLong)
statement.executeUpdate()
}
}
Expand All @@ -135,11 +137,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {

override def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit = {
val timestampColumn_opt = event match {
case ChannelEvent.EventType.Created => Some("created_timestamp")
case ChannelEvent.EventType.Connected => Some("last_connected_timestamp")
case ChannelEvent.EventType.PaymentReceived => Some("last_payment_received_timestamp")
case ChannelEvent.EventType.PaymentSent => Some("last_payment_sent_timestamp")
case _: ChannelEvent.EventType.Closed => Some("closed_timestamp")
case _ => None
}
timestampColumn_opt.foreach(updateChannelMetaTimestampColumn(channelId, _))
Expand All @@ -156,8 +156,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging {
statement.executeUpdate()
}

using(sqlite.prepareStatement("UPDATE local_channels SET is_closed=1 WHERE channel_id=?")) { statement =>
statement.setBytes(1, channelId.toArray)
using(sqlite.prepareStatement("UPDATE local_channels SET is_closed=1, closed_timestamp=? WHERE channel_id=?")) { statement =>
statement.setLong(1, TimestampMilli.now().toLong)
statement.setBytes(2, channelId.toArray)
statement.executeUpdate()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,11 +127,10 @@ class ChannelsDbSpec extends AnyFunSuite {
db.addOrUpdateChannel(channel1)
db.addOrUpdateChannel(channel2)

// make sure initially all metadata are empty
assert(getTimestamp(dbs, channel1.channelId, "created_timestamp").isEmpty)
assert(getTimestamp(dbs, channel1.channelId, "created_timestamp").nonEmpty)
assert(getTimestamp(dbs, channel1.channelId, "last_payment_sent_timestamp").isEmpty)
assert(getTimestamp(dbs, channel1.channelId, "last_payment_received_timestamp").isEmpty)
assert(getTimestamp(dbs, channel1.channelId, "last_connected_timestamp").isEmpty)
assert(getTimestamp(dbs, channel1.channelId, "last_connected_timestamp").nonEmpty)
assert(getTimestamp(dbs, channel1.channelId, "closed_timestamp").isEmpty)

db.updateChannelMeta(channel1.channelId, ChannelEvent.EventType.Created)
Expand All @@ -146,14 +145,13 @@ class ChannelsDbSpec extends AnyFunSuite {
db.updateChannelMeta(channel1.channelId, ChannelEvent.EventType.Connected)
assert(getTimestamp(dbs, channel1.channelId, "last_connected_timestamp").nonEmpty)

db.updateChannelMeta(channel1.channelId, ChannelEvent.EventType.Closed(null))
db.removeChannel(channel1.channelId)
assert(getTimestamp(dbs, channel1.channelId, "closed_timestamp").nonEmpty)

// make sure all metadata are still empty for channel 2
assert(getTimestamp(dbs, channel2.channelId, "created_timestamp").isEmpty)
assert(getTimestamp(dbs, channel2.channelId, "created_timestamp").nonEmpty)
assert(getTimestamp(dbs, channel2.channelId, "last_payment_sent_timestamp").isEmpty)
assert(getTimestamp(dbs, channel2.channelId, "last_payment_received_timestamp").isEmpty)
assert(getTimestamp(dbs, channel2.channelId, "last_connected_timestamp").isEmpty)
assert(getTimestamp(dbs, channel2.channelId, "last_connected_timestamp").nonEmpty)
assert(getTimestamp(dbs, channel2.channelId, "closed_timestamp").isEmpty)
}
}
Expand Down

0 comments on commit 71968d0

Please sign in to comment.