Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live channel database backup #951

Merged
merged 17 commits into from Apr 19, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -18,6 +18,8 @@ eclair {

watcher-type = "bitcoind" // other *experimental* values include "electrum"

backup-file = "eclair.sqlite.backup"
This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@pm47

pm47 Apr 17, 2019

Member
Suggested change
backup-file = "eclair.sqlite.backup"
backup-file = "eclair.sqlite.bak"

Not sure why we need to make this configurable while other files (e.g. eclair.sqlite) have a fixed name.


bitcoind {
host = "localhost"
rpcport = 18332
@@ -43,7 +43,7 @@ import fr.acinq.eclair.blockchain.fee.{ConstantFeeProvider, _}
import fr.acinq.eclair.blockchain.{EclairWallet, _}
import fr.acinq.eclair.channel.Register
import fr.acinq.eclair.crypto.LocalKeyManager
import fr.acinq.eclair.db.Databases
import fr.acinq.eclair.db.{BackupHandler, Databases}
import fr.acinq.eclair.io.{Authenticator, Server, Switchboard}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.router._
@@ -220,11 +220,11 @@ class Setup(datadir: File,
routerTimeout = after(FiniteDuration(config.getDuration("router.init-timeout").getSeconds, TimeUnit.SECONDS), using = system.scheduler)(Future.failed(new RuntimeException("Router initialization timed out")))
_ <- Future.firstCompletedOf(routerInitialized.future :: routerTimeout :: Nil)

chaindir = new File(datadir, chain)
This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@pm47

pm47 Apr 17, 2019

Member

There must be a better way to do this than to redefine the directory here. There was even a TODO comment that you missed when you moved it.

wallet = bitcoin match {
case Bitcoind(bitcoinClient) => new BitcoinCoreWallet(bitcoinClient)
case Electrum(electrumClient) =>
// TODO: DRY
val chaindir = new File(datadir, chain)
val sqlite = DriverManager.getConnection(s"jdbc:sqlite:${new File(chaindir, "wallet.sqlite")}")
val walletDb = new SqliteWalletDb(sqlite)
val electrumWallet = system.actorOf(ElectrumWallet.props(seed, electrumClient, ElectrumWallet.WalletParameters(nodeParams.chainHash, walletDb)), "electrum-wallet")
@@ -234,7 +234,15 @@ class Setup(datadir: File,
_ = wallet.getFinalAddress.map {
case address => logger.info(s"initial wallet address=$address")
}

backupHandler = system.actorOf(
SimpleSupervisor.props(Props(
new BackupHandler(
nodeParams.db,
new File(chaindir, config.getString("backup-file").concat(".wip")),
new File(chaindir, config.getString("backup-file"))
)
), "backup", SupervisorStrategy.Resume)
)
audit = system.actorOf(SimpleSupervisor.props(Auditor.props(nodeParams), "auditor", SupervisorStrategy.Resume))
paymentHandler = system.actorOf(SimpleSupervisor.props(config.getString("payment-handler") match {
case "local" => LocalPaymentHandler.props(nodeParams)
@@ -0,0 +1,87 @@
package fr.acinq.eclair.db

import java.io.File

import akka.actor.{Actor, ActorLogging, Status}
import akka.pattern.pipe
import fr.acinq.eclair.channel.ChannelPersisted
import grizzled.slf4j.{Logger, Logging}

import scala.concurrent.{ExecutionContext, Future}

class BackupHandler(databases: Databases, wip: File, destination: File)(implicit ec: ExecutionContext) extends Actor with ActorLogging {
import BackupHandler._

// we listen to ChannelPersisted events, which will trigger a backup
override def preStart(): Unit = {
super.preStart()
context.system.eventStream.subscribe(self, classOf[ChannelPersisted])
This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@pm47

pm47 Apr 17, 2019

Member

This may be better to suscribe to events here, but we do it differently in all other actors. Maybe stay consistent for now and do another PR to square all actors up at the same time?

}

override def unhandled(message: Any): Unit = {
super.unhandled(message)
log.warning(s"unhandled message $message")
This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@pm47

pm47 Apr 17, 2019

Member

Logging of unhandled messages should be done using akka conf.

}

def receive = idle

// idle mode: start backup as soon as we receive an event
def idle: Receive = {
case persisted: ChannelPersisted =>
doBackup(databases, wip, destination, persisted) pipeTo self
context become busy(None)
}

// busy mode: there is already a backup in progress
// `again` tells us if we have to do another one when we're done with the current one
def busy(again: Option[ChannelPersisted]): Receive = {
case persisted: ChannelPersisted =>
log.info(s"replacing pending backup with a newer one")
context become busy(Some(persisted))

case Done if again.isDefined =>
This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@pm47

pm47 Apr 17, 2019

Member

I thought we agree to stop or at least limit doing several handlers for the same message? How about:

case Done =>
  again match {
    case Some(persisted) =>  
      doBackup(databases, wip, destination, again.get) pipeTo self
      context become busy(None)
    case None =>
      context become idle
doBackup(databases, wip, destination, again.get) pipeTo self
context become busy(None)

case Done =>
context become idle

// we use the pipe pattern so errors are wrapped in akka.Status.Failure
case Status.Failure(cause) if again.isDefined =>

This comment has been minimized.

Copy link
@pm47

pm47 Apr 17, 2019

Member

Have you considered using a dedicated thread for this actor instead?

log.error("database backup failed: {}", cause)
doBackup(databases, wip, destination, again.get) pipeTo self
context become busy(None)

case Status.Failure(cause) =>
log.error("database backup failed: {}", cause)
context become idle
}
}

object BackupHandler extends Logging {
This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@pm47

pm47 Apr 17, 2019

Member

Using contextless logging should be discouraged, in favor of passing ActorLogging parameter as implicit to companion object methods (like we did in Commitment).


case object Done
This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@pm47

pm47 Apr 17, 2019

Member

Why not use akka.Done?

/**
* Database backup of our database to a destination file.
* Backup is done is 2 steps: first it it written to a wip file, then this file is renamed to the destination file.
* Renaming/moving a file * should * be atomic so the destination file is always consistent and usable but this is
* implementation dependent and it may not be the case with some file systems and/or operating systems
*
* @param databases database to backup
* @param wip work-in-progress file
* @param destination destination file
* @param persisted channel persistence event that triggered the backup
* @param ec execution context
* @return `Done` if backup was successful
*/
def doBackup(databases: Databases, wip: File, destination: File, persisted: ChannelPersisted)(implicit ec: ExecutionContext) : Future[BackupHandler.Done.type] = Future {
logger.info(s" database backup triggered by ${persisted.channelId} to $destination starts")
val start = System.currentTimeMillis()
databases.backup(wip)
val result = wip.renameTo(destination)
require(result, s"cannot rename $wip to $destination")
val end = System.currentTimeMillis()
logger.info(s" database backup triggered by ${persisted.channelId} to $destination done in ${end - start} ms")
Done
}
}
@@ -19,6 +19,7 @@ trait Databases {

val pendingRelay: PendingRelayDb

def backup(file: File) : Unit
}

object Databases {
@@ -45,6 +46,12 @@ object Databases {
override val peers = new SqlitePeersDb(eclairJdbc)
override val payments = new SqlitePaymentsDb(eclairJdbc)
override val pendingRelay = new SqlitePendingRelayDb(eclairJdbc)
override def backup(file: File): Unit = {
This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@pm47

pm47 Apr 17, 2019

Member

This API makes the assumption that we will only use a single file to store the databases, which is a bit strange.

SqliteUtils.using(eclairJdbc.createStatement()) {
statement => {
statement.executeUpdate(s"backup to ${file.getAbsolutePath}")
}
}
}
}

}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.