Skip to content

Commit

Permalink
Add a globalbalance api call (#1737)
Browse files Browse the repository at this point in the history
It returns an overall balance, separating onchain, offchain, and
removing duplicates (e.g. mutual closes that haven't reached min depth
still have an associated channel, but they already appear in the
on-chain balance). We also take into account known preimages, even if
the htlc hasn't been formally resolved.

Metrics have also been added.

Co-authored-by: Bastien Teinturier <31281497+t-bast@users.noreply.github.com>
  • Loading branch information
pm47 and t-bast committed Jul 8, 2021
1 parent 3a573e2 commit bd57d41
Show file tree
Hide file tree
Showing 24 changed files with 941 additions and 104 deletions.
11 changes: 5 additions & 6 deletions eclair-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,11 @@
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<dependency>
<groupId>com.softwaremill.quicklens</groupId>
<artifactId>quicklens_${scala.version.short}</artifactId>
<version>1.5.0</version>
</dependency>
<!-- MONITORING -->
<dependency>
<groupId>io.kamon</groupId>
Expand All @@ -264,12 +269,6 @@
<version>${kamon.version}</version>
</dependency>
<!-- TESTS -->
<dependency>
<groupId>com.softwaremill.quicklens</groupId>
<artifactId>quicklens_${scala.version.short}</artifactId>
<version>1.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.version.short}</artifactId>
Expand Down
2 changes: 2 additions & 0 deletions eclair-core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ eclair {
reserve-to-funding-ratio = 0.01 // recommended by BOLT #2
max-reserve-to-funding-ratio = 0.05 // channel reserve can't be more than 5% of the funding amount (recommended: 1%)

balance-check-interval = 1 hour

to-remote-delay-blocks = 720 // number of blocks that the other node's to-self outputs must be delayed (720 ~ 5 days)
max-to-local-delay-blocks = 2016 // maximum number of blocks that we are ready to accept for our own delayed outputs (2016 ~ 2 weeks)
mindepth-blocks = 3
Expand Down
23 changes: 19 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@
package fr.acinq.eclair

import akka.actor.ActorRef
import akka.actor.typed.scaladsl.AskPattern.Askable
import akka.actor.typed.scaladsl.adapter.ClassicSchedulerOps
import akka.pattern._
import akka.util.Timeout
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Crypto, Satoshi}
import fr.acinq.eclair.TimestampQueryFilters._
import fr.acinq.eclair.balance.CheckBalance.GlobalBalance
import fr.acinq.eclair.balance.{BalanceActor, ChannelsListener}
import fr.acinq.eclair.blockchain.OnChainBalance
import fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet
import fr.acinq.eclair.blockchain.bitcoind.BitcoinCoreWallet.WalletTransaction
Expand All @@ -39,12 +43,13 @@ import fr.acinq.eclair.payment.send.PaymentInitiator.{SendPayment, SendPaymentTo
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router.{NetworkStats, RouteCalculation, Router}
import fr.acinq.eclair.wire.protocol._
import grizzled.slf4j.Logging
import scodec.bits.ByteVector

import java.nio.charset.StandardCharsets
import java.util.UUID
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.reflect.ClassTag

case class GetInfoResponse(version: String, nodeId: PublicKey, alias: String, color: String, features: Features, chainHash: ByteVector32, network: String, blockHeight: Int, publicAddresses: Seq[NodeAddress], instanceId: String)
Expand All @@ -59,9 +64,9 @@ case class VerifiedMessage(valid: Boolean, publicKey: PublicKey)

object TimestampQueryFilters {
/** We use this in the context of timestamp filtering, when we don't need an upper bound. */
val MaxEpochMilliseconds = Duration.fromNanos(Long.MaxValue).toMillis
val MaxEpochMilliseconds: Long = Duration.fromNanos(Long.MaxValue).toMillis

def getDefaultTimestampFilters(from_opt: Option[Long], to_opt: Option[Long]) = {
def getDefaultTimestampFilters(from_opt: Option[Long], to_opt: Option[Long]): TimestampQueryFilters = {
// NB: we expect callers to use seconds, but internally we use milli-seconds everywhere.
val from = from_opt.getOrElse(0L).seconds.toMillis
val to = to_opt.map(_.seconds.toMillis).getOrElse(MaxEpochMilliseconds)
Expand Down Expand Up @@ -149,12 +154,14 @@ trait Eclair {

def onChainTransactions(count: Int, skip: Int): Future[Iterable[WalletTransaction]]

def globalBalance()(implicit timeout: Timeout): Future[GlobalBalance]

def signMessage(message: ByteVector): SignedMessage

def verifyMessage(message: ByteVector, recoverableSignature: ByteVector): VerifiedMessage
}

class EclairImpl(appKit: Kit) extends Eclair {
class EclairImpl(appKit: Kit) extends Eclair with Logging {

implicit val ec: ExecutionContext = appKit.system.dispatcher

Expand Down Expand Up @@ -428,6 +435,14 @@ class EclairImpl(appKit: Kit) extends Eclair {
override def usableBalances()(implicit timeout: Timeout): Future[Iterable[UsableBalance]] =
(appKit.relayer ? GetOutgoingChannels()).mapTo[OutgoingChannels].map(_.channels.map(_.toUsableBalance))

override def globalBalance()(implicit timeout: Timeout): Future[GlobalBalance] = {
for {
ChannelsListener.GetChannelsResponse(channels) <- appKit.channelsListener.ask(ref => ChannelsListener.GetChannels(ref))(timeout, appKit.system.scheduler.toTyped)
globalBalance_try <- appKit.balanceActor.ask(res => BalanceActor.GetGlobalBalance(res, channels))(timeout, appKit.system.scheduler.toTyped)
globalBalance <- Promise[GlobalBalance]().complete(globalBalance_try).future
} yield globalBalance
}

override def signMessage(message: ByteVector): SignedMessage = {
val bytesToSign = SignedMessage.signedBytes(message)
val (signature, recoveryId) = appKit.nodeParams.nodeKeyManager.signDigest(bytesToSign)
Expand Down
6 changes: 4 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager,
routerConf: RouterConf,
socksProxy_opt: Option[Socks5ProxyParams],
maxPaymentAttempts: Int,
enableTrampolinePayment: Boolean) {
enableTrampolinePayment: Boolean,
balanceCheckInterval: FiniteDuration) {
val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey

val nodeId: PublicKey = nodeKeyManager.nodeId
Expand Down Expand Up @@ -394,7 +395,8 @@ object NodeParams extends Logging {
),
socksProxy_opt = socksProxy_opt,
maxPaymentAttempts = config.getInt("max-payment-attempts"),
enableTrampolinePayment = config.getBoolean("trampoline-payments-enable")
enableTrampolinePayment = config.getBoolean("trampoline-payments-enable"),
balanceCheckInterval = FiniteDuration(config.getDuration("balance-check-interval").getSeconds, TimeUnit.SECONDS)
)
}
}
15 changes: 13 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import akka.util.Timeout
import com.softwaremill.sttp.okhttp.OkHttpFutureBackend
import fr.acinq.bitcoin.{Block, ByteVector32, Satoshi}
import fr.acinq.eclair.Setup.Seeds
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.balance.{BalanceActor, ChannelsListener}
import fr.acinq.eclair.blockchain.bitcoind.rpc.{BasicBitcoinJsonRPCClient, BatchingBitcoinJsonRPCClient, ExtendedBitcoinClient}
import fr.acinq.eclair.blockchain.bitcoind.zmq.ZMQActor
import fr.acinq.eclair.blockchain.bitcoind.{BitcoinCoreWallet, ZmqWatcher}
import fr.acinq.eclair.blockchain.fee._
import fr.acinq.eclair.blockchain.fee.{ConstantFeeProvider, _}
import fr.acinq.eclair.blockchain.{EclairWallet, _}
import fr.acinq.eclair.channel.{Channel, Register}
import fr.acinq.eclair.crypto.WeakEntropyPool
import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager}
Expand Down Expand Up @@ -194,6 +195,7 @@ class Setup(val datadir: File,
tcpBound = Promise[Done]()
routerInitialized = Promise[Done]()
postRestartCleanUpInitialized = Promise[Done]()
channelsListenerReady = Promise[Done]()

defaultFeerates = {
val confDefaultFeerates = FeeratesPerKB(
Expand Down Expand Up @@ -249,6 +251,9 @@ class Setup(val datadir: File,
wallet = new BitcoinCoreWallet(bitcoin)
_ = wallet.getReceiveAddress().map(address => logger.info(s"initial wallet address=$address"))

channelsListener = system.spawn(ChannelsListener(channelsListenerReady), name = "channels-listener")
_ <- channelsListenerReady.future

_ = if (config.getBoolean("file-backup.enabled")) {
nodeParams.db match {
case fileBackup: FileBackup if config.getBoolean("file-backup.enabled") =>
Expand Down Expand Up @@ -283,6 +288,8 @@ class Setup(val datadir: File,
paymentInitiator = system.actorOf(SimpleSupervisor.props(PaymentInitiator.props(nodeParams, PaymentInitiator.SimplePaymentFactory(nodeParams, router, register)), "payment-initiator", SupervisorStrategy.Restart))
_ = for (i <- 0 until config.getInt("autoprobe-count")) yield system.actorOf(SimpleSupervisor.props(Autoprobe.props(nodeParams, router, paymentInitiator), s"payment-autoprobe-$i", SupervisorStrategy.Restart))

balanceActor = system.spawn(BalanceActor(nodeParams.db, extendedBitcoinClient, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor")

kit = Kit(
nodeParams = nodeParams,
system = system,
Expand All @@ -294,6 +301,8 @@ class Setup(val datadir: File,
switchboard = switchboard,
paymentInitiator = paymentInitiator,
server = server,
channelsListener = channelsListener,
balanceActor = balanceActor,
wallet = wallet)

zmqBlockTimeout = after(5 seconds, using = system.scheduler)(Future.failed(BitcoinZMQConnectionTimeoutException))
Expand Down Expand Up @@ -360,6 +369,8 @@ case class Kit(nodeParams: NodeParams,
switchboard: ActorRef,
paymentInitiator: ActorRef,
server: ActorRef,
channelsListener: typed.ActorRef[ChannelsListener.Command],
balanceActor: typed.ActorRef[BalanceActor.Command],
wallet: EclairWallet)

object Kit {
Expand Down
139 changes: 139 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/balance/BalanceActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package fr.acinq.eclair.balance

import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.eclair.balance.BalanceActor._
import fr.acinq.eclair.balance.CheckBalance.GlobalBalance
import fr.acinq.eclair.balance.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient.Utxo
import fr.acinq.eclair.channel.HasCommitments
import fr.acinq.eclair.db.Databases
import grizzled.slf4j.Logger
import org.json4s.JsonAST.JInt

import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

object BalanceActor {

// @formatter:off
sealed trait Command
private final case object TickBalance extends Command
final case class GetGlobalBalance(replyTo: ActorRef[Try[GlobalBalance]], channels: Map[ByteVector32, HasCommitments]) extends Command
private final case class WrappedChannels(wrapped: ChannelsListener.GetChannelsResponse) extends Command
private final case class WrappedGlobalBalance(wrapped: Try[GlobalBalance]) extends Command
private final case class WrappedUtxoInfo(wrapped: Try[UtxoInfo]) extends Command
// @formatter:on

def apply(db: Databases, extendedBitcoinClient: ExtendedBitcoinClient, channelsListener: ActorRef[ChannelsListener.GetChannels], interval: FiniteDuration)(implicit ec: ExecutionContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
timers.startTimerWithFixedDelay(TickBalance, interval)
new BalanceActor(context, db, extendedBitcoinClient, channelsListener).apply(refBalance_opt = None)
}
}
}

final case class UtxoInfo(utxos: Seq[Utxo], ancestorCount: Map[ByteVector32, Long])

def checkUtxos(extendedBitcoinClient: ExtendedBitcoinClient)(implicit ec: ExecutionContext): Future[UtxoInfo] = {

def getUnconfirmedAncestorCount(utxo: Utxo): Future[(ByteVector32, Long)] = extendedBitcoinClient.rpcClient.invoke("getmempoolentry", utxo.txid).map(json => {
val JInt(ancestorCount) = json \ "ancestorcount"
(utxo.txid, ancestorCount.toLong)
}).recover {
case ex: Throwable =>
// a bit hackish but we don't need the actor context for this simple log
val log = Logger(classOf[BalanceActor])
log.warn(s"could not retrieve unconfirmed ancestor count for txId=${utxo.txid} amount=${utxo.amount}:", ex)
(utxo.txid, 0)
}

def getUnconfirmedAncestorCountMap(utxos: Seq[Utxo]): Future[Map[ByteVector32, Long]] = Future.sequence(utxos.filter(_.confirmations == 0).map(getUnconfirmedAncestorCount)).map(_.toMap)

for {
utxos <- extendedBitcoinClient.listUnspent()
ancestorCount <- getUnconfirmedAncestorCountMap(utxos)
} yield UtxoInfo(utxos, ancestorCount)
}

}

private class BalanceActor(context: ActorContext[Command],
db: Databases,
extendedBitcoinClient: ExtendedBitcoinClient,
channelsListener: ActorRef[ChannelsListener.GetChannels])(implicit ec: ExecutionContext) {

private val log = context.log

def apply(refBalance_opt: Option[GlobalBalance]): Behavior[Command] = Behaviors.receiveMessage {
case TickBalance =>
log.debug("checking balance...")
channelsListener ! ChannelsListener.GetChannels(context.messageAdapter[ChannelsListener.GetChannelsResponse](WrappedChannels))
context.pipeToSelf(checkUtxos(extendedBitcoinClient))(WrappedUtxoInfo)
Behaviors.same
case WrappedChannels(res) =>
context.pipeToSelf(CheckBalance.computeGlobalBalance(res.channels, db, extendedBitcoinClient))(WrappedGlobalBalance)
Behaviors.same
case WrappedGlobalBalance(res) =>
res match {
case Success(result) =>
log.info("current balance: total={} onchain.confirmed={} onchain.unconfirmed={} offchain={}", result.total.toDouble, result.onChain.confirmed.toDouble, result.onChain.unconfirmed.toDouble, result.offChain.total.toDouble)
log.debug("current balance details : {}", result)
Metrics.GlobalBalance.withoutTags().update(result.total.toMilliBtc.toDouble)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.OnchainConfirmed).update(result.onChain.confirmed.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.OnchainUnconfirmed).update(result.onChain.unconfirmed.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.waitForFundingConfirmed).update(result.offChain.waitForFundingConfirmed.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.waitForFundingLocked).update(result.offChain.waitForFundingLocked.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.normal).update(result.offChain.normal.total.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.shutdown).update(result.offChain.shutdown.total.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.closingLocal).update(result.offChain.closing.localCloseBalance.total.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.closingRemote).update(result.offChain.closing.remoteCloseBalance.total.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.closingUnknown).update(result.offChain.closing.unknownCloseBalance.total.toMilliBtc.toLong)
Metrics.GlobalBalanceDetailed.withTag(Tags.BalanceType, Tags.BalanceTypes.Offchain).withTag(Tags.OffchainState, Tags.OffchainStates.waitForPublishFutureCommitment).update(result.offChain.waitForPublishFutureCommitment.toMilliBtc.toLong)
refBalance_opt match {
case Some(refBalance) =>
val normalizedValue = 100 + (if (refBalance.total.toSatoshi.toLong > 0) (result.total.toSatoshi.toLong - refBalance.total.toSatoshi.toLong) * 1000D / refBalance.total.toSatoshi.toLong else 0)
val diffValue = result.total.toSatoshi.toLong - refBalance.total.toSatoshi.toLong
log.info("relative balance: current={} reference={} normalized={} diff={}", result.total.toDouble, refBalance.total.toDouble, normalizedValue, diffValue)
Metrics.GlobalBalanceNormalized.withoutTags().update(normalizedValue)
Metrics.GlobalBalanceDiff.withTag(Tags.DiffSign, Tags.DiffSigns.plus).update(diffValue.max(0))
Metrics.GlobalBalanceDiff.withTag(Tags.DiffSign, Tags.DiffSigns.minus).update((-diffValue).max(0))
Behaviors.same
case None =>
log.info("using balance={} as reference", result.total.toDouble)
apply(Some(result))
}
case Failure(t) =>
log.warn("could not compute balance: ", t)
Behaviors.same
}
case GetGlobalBalance(replyTo, channels) =>
CheckBalance.computeGlobalBalance(channels, db, extendedBitcoinClient) onComplete (replyTo ! _)
Behaviors.same
case WrappedUtxoInfo(res) =>
res match {
case Success(UtxoInfo(utxos: Seq[Utxo], ancestorCount: Map[ByteVector32, Long])) =>
val filteredByStatus: Map[String, Seq[Utxo]] = Map(
Monitoring.Tags.UtxoStatuses.Confirmed -> utxos.filter(utxo => utxo.confirmations > 0),
// We cannot create chains of unconfirmed transactions with more than 25 elements, so we ignore such utxos.
Monitoring.Tags.UtxoStatuses.Unconfirmed -> utxos.filter(utxo => utxo.confirmations == 0 && ancestorCount.getOrElse(utxo.txid, 1L) < 25),
Monitoring.Tags.UtxoStatuses.Safe -> utxos.filter(utxo => utxo.safe),
Monitoring.Tags.UtxoStatuses.Unsafe -> utxos.filter(utxo => !utxo.safe),
)
filteredByStatus.foreach {
case (status, filteredUtxos) =>
val amount = filteredUtxos.map(_.amount.toDouble).sum
log.info(s"we have ${filteredUtxos.length} $status utxos ($amount mBTC)")
Monitoring.Metrics.UtxoCount.withTag(Monitoring.Tags.UtxoStatus, status).update(filteredUtxos.length)
Monitoring.Metrics.BitcoinBalance.withTag(Monitoring.Tags.UtxoStatus, status).update(amount)
}
case Failure(t) =>
log.warn("could not check utxos: ", t)
}
Behaviors.same
}
}

0 comments on commit bd57d41

Please sign in to comment.