Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Live channel database backup #951

Merged
merged 17 commits into from Apr 19, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Backup handler: use a custom mailbox

This is an alternate design for handling multiple backup events: we use
a custom mailbox with a max capacity of 1 message (ignore messages go directly to dead-letters)
and use synchronous backups in the actual event handler.
  • Loading branch information...
sstone committed Apr 17, 2019
commit 604bf87d26793de85c8c6b697252d4f438b63841
@@ -18,7 +18,13 @@ eclair {

watcher-type = "bitcoind" // other *experimental* values include "electrum"

backup-file = "eclair.sqlite.backup"
// specific mail box for our backup handler, bounded to 1 message
// DO NOT overwrite these settings
backup-mailbox {
mailbox-type = "akka.dispatch.BoundedMailbox"
mailbox-capacity = 1
mailbox-push-timeout-time = 0
}

bitcoind {
host = "localhost"
@@ -235,13 +235,12 @@ class Setup(datadir: File,
case address => logger.info(s"initial wallet address=$address")
}
backupHandler = system.actorOf(
SimpleSupervisor.props(Props(
new BackupHandler(
SimpleSupervisor.props(
BackupHandler.props(
nodeParams.db,
new File(chaindir, config.getString("backup-file").concat(".wip")),
new File(chaindir, config.getString("backup-file"))
)
), "backup", SupervisorStrategy.Resume)
new File(chaindir, "eclair.bak.wip"),
This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member
Suggested change
new File(chaindir, "eclair.bak.wip"),
new File(chaindir, "eclair.bak.tmp"),
new File(chaindir, "eclair.bak")
), "backup", SupervisorStrategy.Resume)
)
audit = system.actorOf(SimpleSupervisor.props(Auditor.props(nodeParams), "auditor", SupervisorStrategy.Resume))
paymentHandler = system.actorOf(SimpleSupervisor.props(config.getString("payment-handler") match {
@@ -353,8 +352,11 @@ class Setup(datadir: File,

// @formatter:off
sealed trait Bitcoin

This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member

Formatting issue

case class Bitcoind(bitcoinClient: BasicBitcoinJsonRPCClient) extends Bitcoin

case class Electrum(electrumClient: ActorRef) extends Bitcoin

// @formatter:on

case class Kit(nodeParams: NodeParams,
@@ -2,86 +2,45 @@ package fr.acinq.eclair.db

import java.io.File

import akka.actor.{Actor, ActorLogging, Status}
import akka.pattern.pipe
import akka.actor.{Actor, ActorLogging, Props}
import fr.acinq.eclair.channel.ChannelPersisted
import grizzled.slf4j.{Logger, Logging}

import scala.concurrent.{ExecutionContext, Future}

class BackupHandler(databases: Databases, wip: File, destination: File)(implicit ec: ExecutionContext) extends Actor with ActorLogging {
import BackupHandler._
/**
* README !
* This actor will synchronously make a backup of the database it was initialized with whenever it receives
* a ChannelPersisted event.
* To avoid piling up messages and entering an endless backup loop, it is supposed to be used with a bounded mailbox
* with a single item:
*
* backup-mailbox {
This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member

Isn't there a way to enforce this configuration? That seems a bit error prone.

Overall the pattern is pretty nice.

* mailbox-type = "akka.dispatch.BoundedMailbox"
* mailbox-capacity = 1
* mailbox-push-timeout-time = 0
* }
*
* Messages that cannot be processed will be sent to dead letters
*
* @param databases database to backup
* @param wip work-in-progress file
This conversation was marked as resolved by sstone

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member
Suggested change
* @param wip work-in-progress file
* @param tmpFile temporary file
* @param destination destination file
This conversation was marked as resolved by sstone

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member
Suggested change
* @param destination destination file
* @param backupFile final backup file
*/
class BackupHandler(databases: Databases, wip: File, destination: File) extends Actor with ActorLogging {
This conversation was marked as resolved by sstone

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member
Suggested change
class BackupHandler(databases: Databases, wip: File, destination: File) extends Actor with ActorLogging {
class BackupHandler(databases: Databases, tmpFile: File, backupFile: File) extends Actor with ActorLogging {

// we listen to ChannelPersisted events, which will trigger a backup
override def preStart(): Unit = {
super.preStart()
context.system.eventStream.subscribe(self, classOf[ChannelPersisted])
}

override def unhandled(message: Any): Unit = {
super.unhandled(message)
log.warning(s"unhandled message $message")
}

def receive = idle
context.system.eventStream.subscribe(self, classOf[ChannelPersisted])

// idle mode: start backup as soon as we receive an event
def idle: Receive = {
def receive = {
case persisted: ChannelPersisted =>
doBackup(databases, wip, destination, persisted) pipeTo self
context become busy(None)
}

// busy mode: there is already a backup in progress
// `again` tells us if we have to do another one when we're done with the current one
def busy(again: Option[ChannelPersisted]): Receive = {
case persisted: ChannelPersisted =>
log.info(s"replacing pending backup with a newer one")
context become busy(Some(persisted))

case Done if again.isDefined =>
doBackup(databases, wip, destination, again.get) pipeTo self
context become busy(None)

case Done =>
context become idle

// we use the pipe pattern so errors are wrapped in akka.Status.Failure
case Status.Failure(cause) if again.isDefined =>
log.error("database backup failed: {}", cause)
doBackup(databases, wip, destination, again.get) pipeTo self
context become busy(None)

case Status.Failure(cause) =>
log.error("database backup failed: {}", cause)
context become idle
val start = System.currentTimeMillis()
databases.backup(wip)
This conversation was marked as resolved by sstone

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member
Suggested change
databases.backup(wip)
databases.backup(tmpFile)
val result = wip.renameTo(destination)
This conversation was marked as resolved by sstone

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member
Suggested change
val result = wip.renameTo(destination)
val result = tmpFile.renameTo(backupFile)

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member

From javadoc:

Many aspects of the behavior of this method are inherently platform-dependent: The rename operation might not be able to move a file from one filesystem to another, it might not be atomic, and it might not succeed if a file with the destination abstract pathname already exists. The return value should always be checked to make sure that the rename operation was successful.

require(result, s"cannot rename $wip to $destination")
This conversation was marked as resolved by sstone

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member
Suggested change
require(result, s"cannot rename $wip to $destination")
require(result, s"cannot rename $tmpFile to $backupFile")
val end = System.currentTimeMillis()
log.info(s" database backup triggered by ${persisted.channelId} to $destination done in ${end - start} ms")
This conversation was marked as resolved by sstone

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member
Suggested change
log.info(s" database backup triggered by ${persisted.channelId} to $destination done in ${end - start} ms")
log.info(s"database backup triggered by channelId=${persisted.channelId} took ${end - start}ms")
}
}

object BackupHandler extends Logging {

case object Done
/**
* Database backup of our database to a destination file.
* Backup is done is 2 steps: first it it written to a wip file, then this file is renamed to the destination file.
* Renaming/moving a file * should * be atomic so the destination file is always consistent and usable but this is
* implementation dependent and it may not be the case with some file systems and/or operating systems
*
* @param databases database to backup
* @param wip work-in-progress file
* @param destination destination file
* @param persisted channel persistence event that triggered the backup
* @param ec execution context
* @return `Done` if backup was successful
*/
def doBackup(databases: Databases, wip: File, destination: File, persisted: ChannelPersisted)(implicit ec: ExecutionContext) : Future[BackupHandler.Done.type] = Future {
logger.info(s" database backup triggered by ${persisted.channelId} to $destination starts")
val start = System.currentTimeMillis()
databases.backup(wip)
val result = wip.renameTo(destination)
require(result, s"cannot rename $wip to $destination")
val end = System.currentTimeMillis()
logger.info(s" database backup triggered by ${persisted.channelId} to $destination done in ${end - start} ms")
Done
}
object BackupHandler {
def props(databases: Databases, wip: File, destination: File) = Props(new BackupHandler(databases: Databases, wip: File, destination: File)).withMailbox("eclair.backup-mailbox")
This conversation was marked as resolved by sstone

This comment has been minimized.

Copy link
@pm47

pm47 Apr 18, 2019

Member
Suggested change
def props(databases: Databases, wip: File, destination: File) = Props(new BackupHandler(databases: Databases, wip: File, destination: File)).withMailbox("eclair.backup-mailbox")
def props(databases: Databases, tmpFile: File, backupFile: File) = Props(new BackupHandler(databases: Databases, wip: File, destination: File)).withMailbox("eclair.backup-mailbox")
}
@@ -82,7 +82,7 @@ object SqliteUtils {
* Obtain an exclusive lock on a sqlite database. This is useful when we want to make sure that only one process
* accesses the database file (see https://www.sqlite.org/pragma.html).
*
* The lock will be kept until the database is closed, or if the locking mode is explicitely reset.
* The lock will be kept until the database is closed, or if the locking mode is explicitly reset.
*
* @param sqlite
*/
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.