Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework the db version management #1775

Merged
merged 4 commits into from Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 40 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/jdbc/JdbcUtils.scala
Expand Up @@ -16,15 +16,15 @@

package fr.acinq.eclair.db.jdbc

import java.sql.{Connection, ResultSet, Statement}
import java.util.UUID

import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.MilliSatoshi
import javax.sql.DataSource
import org.sqlite.SQLiteConnection
import scodec.Codec
import scodec.bits.{BitVector, ByteVector}

import java.sql.{Connection, ResultSet, Statement}
import java.util.UUID
import javax.sql.DataSource
import scala.collection.immutable.Queue

trait JdbcUtils {
Expand Down Expand Up @@ -60,6 +60,42 @@ trait JdbcUtils {
}
}

private def createVersionTable(statement: Statement): Unit = {
statement.executeUpdate("CREATE TABLE IF NOT EXISTS versions (db_name TEXT NOT NULL PRIMARY KEY, version INTEGER NOT NULL)")
}

/**
* Several logical databases (channels, network, peers) may be stored in the same physical database.
* We keep track of their respective version using a dedicated table. The version entry will be created if
* there is none but will never be updated here (use setVersion to do that).
*/
def getVersion(statement: Statement, db_name: String): Option[Int] = {
createVersionTable(statement)
// if there was a previous version installed, this will return a different value from current version
val rs = statement.executeQuery(s"SELECT version FROM versions WHERE db_name='$db_name'")
if (rs.next()) Some(rs.getInt("version")) else None
}

/**
* Updates the version for a particular logical database, it will overwrite the previous version.
*
* NB: we could define this method in [[fr.acinq.eclair.db.sqlite.SqliteUtils]] and [[fr.acinq.eclair.db.pg.PgUtils]]
* but it would make testing more complicated because we need to use one or the other depending on the backend.
Comment on lines +82 to +83
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This design choice is debatable but I think it's worthwhile, less duplication and simpler tests.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a shared mechanism for all DB types, I think it's perfectly fine to have it here, especially if it makes testing easier.

*/
def setVersion(statement: Statement, db_name: String, newVersion: Int): Unit = {
createVersionTable(statement)
statement.getConnection match {
case _: SQLiteConnection =>
// if there was no version for the current db, then insert the current version
statement.executeUpdate(s"INSERT OR IGNORE INTO versions VALUES ('$db_name', $newVersion)")
// if there was an existing version, then previous step was a no-op, now we overwrite the existing version
statement.executeUpdate(s"UPDATE versions SET version=$newVersion WHERE db_name='$db_name'")
case _ => // if it isn't an sqlite connection, we assume it's postgres
// insert or update the version
statement.executeUpdate(s"INSERT INTO versions VALUES ('$db_name', $newVersion) ON CONFLICT (db_name) DO UPDATE SET version = EXCLUDED.version ;")
pm47 marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* This helper assumes that there is a "data" column available, decodable with the provided codec
*
Expand Down
56 changes: 28 additions & 28 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Expand Up @@ -46,38 +46,38 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {

inTransaction { pg =>
using(pg.createStatement()) { statement =>
def migration45(statement: Statement): Int = {
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)")
def migration45(statement: Statement): Unit = {
pm47 marked this conversation as resolved.
Show resolved Hide resolved
statement.executeUpdate("CREATE TABLE relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)")
}

getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case 4 =>
logger.warn(s"migrating db $DB_NAME, found version=4 current=$CURRENT_VERSION")
migration45(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp BIGINT NOT NULL)")
getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE sent (amount_msat BIGINT NOT NULL, fees_msat BIGINT NOT NULL, recipient_amount_msat BIGINT NOT NULL, payment_id TEXT NOT NULL, parent_payment_id TEXT NOT NULL, payment_hash TEXT NOT NULL, payment_preimage TEXT NOT NULL, recipient_node_id TEXT NOT NULL, to_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE received (amount_msat BIGINT NOT NULL, payment_hash TEXT NOT NULL, from_channel_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE relayed (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, channel_id TEXT NOT NULL, direction TEXT NOT NULL, relay_type TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE relayed_trampoline (payment_hash TEXT NOT NULL, amount_msat BIGINT NOT NULL, next_node_id TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE network_fees (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, tx_id TEXT NOT NULL, fee_sat BIGINT NOT NULL, tx_type TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE channel_events (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, capacity_sat BIGINT NOT NULL, is_funder BOOLEAN NOT NULL, is_private BOOLEAN NOT NULL, event TEXT NOT NULL, timestamp BIGINT NOT NULL)")
statement.executeUpdate("CREATE TABLE channel_errors (channel_id TEXT NOT NULL, node_id TEXT NOT NULL, error_name TEXT NOT NULL, error_message TEXT NOT NULL, is_fatal BOOLEAN NOT NULL, timestamp BIGINT NOT NULL)")

statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_timestamp_idx ON sent(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_timestamp_idx ON received(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_timestamp_idx ON relayed(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_payment_hash_idx ON relayed(payment_hash)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS network_fees_timestamp_idx ON network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_events_timestamp_idx ON channel_events(timestamp)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS channel_errors_timestamp_idx ON channel_errors(timestamp)")
case unknownVersion =>
throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
statement.executeUpdate("CREATE INDEX sent_timestamp_idx ON sent(timestamp)")
statement.executeUpdate("CREATE INDEX received_timestamp_idx ON received(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_timestamp_idx ON relayed(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_payment_hash_idx ON relayed(payment_hash)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_timestamp_idx ON relayed_trampoline(timestamp)")
statement.executeUpdate("CREATE INDEX relayed_trampoline_payment_hash_idx ON relayed_trampoline(payment_hash)")
statement.executeUpdate("CREATE INDEX network_fees_timestamp_idx ON network_fees(timestamp)")
statement.executeUpdate("CREATE INDEX channel_events_timestamp_idx ON channel_events(timestamp)")
statement.executeUpdate("CREATE INDEX channel_errors_timestamp_idx ON channel_errors(timestamp)")
case Some(v@4) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration45(statement)
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
}

Expand Down
36 changes: 19 additions & 17 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala
Expand Up @@ -40,27 +40,29 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit
val DB_NAME = "channels"
val CURRENT_VERSION = 3

def migration23(statement: Statement): Unit = {
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN created_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_sent_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_received_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_connected_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN closed_timestamp BIGINT")
}

inTransaction { pg =>
using(pg.createStatement()) { statement =>
getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case 2 =>
logger.warn(s"migrating db $DB_NAME, found version=2 current=$CURRENT_VERSION")

def migration23(statement: Statement): Unit = {
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN created_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_sent_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_payment_received_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN last_connected_timestamp BIGINT")
statement.executeUpdate("ALTER TABLE local_channels ADD COLUMN closed_timestamp BIGINT")
}

getVersion(statement, DB_NAME) match {
pm47 marked this conversation as resolved.
Show resolved Hide resolved
case None =>
statement.executeUpdate("CREATE TABLE local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp BIGINT, last_payment_sent_timestamp BIGINT, last_payment_received_timestamp BIGINT, last_connected_timestamp BIGINT, closed_timestamp BIGINT)")
statement.executeUpdate("CREATE TABLE htlc_infos (channel_id TEXT NOT NULL, commitment_number TEXT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
case Some(v@2) =>
logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION")
migration23(statement)
setVersion(statement, DB_NAME, CURRENT_VERSION)
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS local_channels (channel_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp BIGINT, last_payment_sent_timestamp BIGINT, last_payment_received_timestamp BIGINT, last_connected_timestamp BIGINT, closed_timestamp BIGINT)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS htlc_infos (channel_id TEXT NOT NULL, commitment_number TEXT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS htlc_infos_idx ON htlc_infos(channel_id, commitment_number)")
case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
}

Expand Down
Expand Up @@ -39,13 +39,15 @@ class PgNetworkDb(implicit ds: DataSource) extends NetworkDb with Logging {

inTransaction { pg =>
using(pg.createStatement()) { statement =>
getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case CURRENT_VERSION => () // nothing to do
case unknown => throw new IllegalArgumentException(s"unknown version $unknown for network db")
getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE nodes (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
statement.executeUpdate("CREATE TABLE channels (short_channel_id BIGINT NOT NULL PRIMARY KEY, txid TEXT NOT NULL, channel_announcement BYTEA NOT NULL, capacity_sat BIGINT NOT NULL, channel_update_1 BYTEA NULL, channel_update_2 BYTEA NULL)")
statement.executeUpdate("CREATE TABLE pruned (short_channel_id BIGINT NOT NULL PRIMARY KEY)")
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
statement.executeUpdate("CREATE TABLE IF NOT EXISTS nodes (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS channels (short_channel_id BIGINT NOT NULL PRIMARY KEY, txid TEXT NOT NULL, channel_announcement BYTEA NOT NULL, capacity_sat BIGINT NOT NULL, channel_update_1 BYTEA NULL, channel_update_2 BYTEA NULL)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS pruned (short_channel_id BIGINT NOT NULL PRIMARY KEY)")
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
}

Expand Down
23 changes: 12 additions & 11 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPaymentsDb.scala
Expand Up @@ -54,18 +54,19 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit

inTransaction { pg =>
using(pg.createStatement()) { statement =>

getVersion(statement, DB_NAME, CURRENT_VERSION) match {
case CURRENT_VERSION =>
statement.executeUpdate("CREATE TABLE IF NOT EXISTS received_payments (payment_hash TEXT NOT NULL PRIMARY KEY, payment_type TEXT NOT NULL, payment_preimage TEXT NOT NULL, payment_request TEXT NOT NULL, received_msat BIGINT, created_at BIGINT NOT NULL, expire_at BIGINT NOT NULL, received_at BIGINT)")
statement.executeUpdate("CREATE TABLE IF NOT EXISTS sent_payments (id TEXT NOT NULL PRIMARY KEY, parent_id TEXT NOT NULL, external_id TEXT, payment_hash TEXT NOT NULL, payment_preimage TEXT, payment_type TEXT NOT NULL, amount_msat BIGINT NOT NULL, fees_msat BIGINT, recipient_amount_msat BIGINT NOT NULL, recipient_node_id TEXT NOT NULL, payment_request TEXT, payment_route BYTEA, failures BYTEA, created_at BIGINT NOT NULL, completed_at BIGINT)")

statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_parent_id_idx ON sent_payments(parent_id)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_payment_hash_idx ON sent_payments(payment_hash)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS sent_created_idx ON sent_payments(created_at)")
statement.executeUpdate("CREATE INDEX IF NOT EXISTS received_created_idx ON received_payments(created_at)")
case unknownVersion => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE received_payments (payment_hash TEXT NOT NULL PRIMARY KEY, payment_type TEXT NOT NULL, payment_preimage TEXT NOT NULL, payment_request TEXT NOT NULL, received_msat BIGINT, created_at BIGINT NOT NULL, expire_at BIGINT NOT NULL, received_at BIGINT)")
statement.executeUpdate("CREATE TABLE sent_payments (id TEXT NOT NULL PRIMARY KEY, parent_id TEXT NOT NULL, external_id TEXT, payment_hash TEXT NOT NULL, payment_preimage TEXT, payment_type TEXT NOT NULL, amount_msat BIGINT NOT NULL, fees_msat BIGINT, recipient_amount_msat BIGINT NOT NULL, recipient_node_id TEXT NOT NULL, payment_request TEXT, payment_route BYTEA, failures BYTEA, created_at BIGINT NOT NULL, completed_at BIGINT)")

statement.executeUpdate("CREATE INDEX sent_parent_id_idx ON sent_payments(parent_id)")
statement.executeUpdate("CREATE INDEX sent_payment_hash_idx ON sent_payments(payment_hash)")
statement.executeUpdate("CREATE INDEX sent_created_idx ON sent_payments(created_at)")
statement.executeUpdate("CREATE INDEX received_created_idx ON received_payments(created_at)")
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
}

Expand Down
Expand Up @@ -38,8 +38,13 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb {

inTransaction { pg =>
using(pg.createStatement()) { statement =>
require(getVersion(statement, DB_NAME, CURRENT_VERSION) == CURRENT_VERSION, s"incompatible version of $DB_NAME DB found") // there is only one version currently deployed
statement.executeUpdate("CREATE TABLE IF NOT EXISTS peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE peers (node_id TEXT NOT NULL PRIMARY KEY, data BYTEA NOT NULL)")
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
}

Expand Down