Skip to content

Commit

Permalink
Refactor isClosingAlreadyKnown
Browse files Browse the repository at this point in the history
  • Loading branch information
t-bast committed Mar 31, 2020
1 parent 687a2a5 commit 6576390
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 91 deletions.
53 changes: 25 additions & 28 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Expand Up @@ -216,17 +216,17 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// already reached mindepth
// - there is no need to attempt to publish transactions for other type of closes
closingType_opt match {
case Some(Closing.LocalClose) =>
closing.localCommitPublished.foreach(doPublish)
case Some(Closing.CurrentRemoteClose) =>
closing.remoteCommitPublished.foreach(doPublish)
case Some(Closing.NextRemoteClose) =>
closing.nextRemoteCommitPublished.foreach(doPublish)
case Some(Closing.RecoveryClose) =>
closing.futureRemoteCommitPublished.foreach(doPublish)
case Some(Closing.RevokedClose) =>
closing.revokedCommitPublished.foreach(doPublish)
case _ =>
case Some(c: Closing.MutualClose) =>
doPublish(c.tx)
case Some(c: Closing.LocalClose) =>
doPublish(c.localCommitPublished)
case Some(c: Closing.RemoteClose) =>
doPublish(c.remoteCommitPublished)
case Some(c: Closing.RecoveryClose) =>
doPublish(c.remoteCommitPublished)
case Some(c: Closing.RevokedClose) =>
doPublish(c.revokedCommitPublished)
case None =>
// in all other cases we need to be ready for any type of closing
// TODO: should we wait for an acknowledgment from the watcher?
blockchain ! WatchSpent(self, data.commitments.commitInput.outPoint.txid, data.commitments.commitInput.outPoint.index.toInt, data.commitments.commitInput.txOut.publicKeyScript, BITCOIN_FUNDING_SPENT)
Expand Down Expand Up @@ -1302,25 +1302,24 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId

case Event(WatchEventConfirmed(BITCOIN_TX_CONFIRMED(tx), blockHeight, _, _), d: DATA_CLOSING) =>
log.info(s"txid=${tx.txid} has reached mindepth, updating closing state")
// first we check if this tx belongs to one of the current local/remote commits and update it
val localCommitPublished1 = d.localCommitPublished.map(Closing.updateLocalCommitPublished(_, tx))
val remoteCommitPublished1 = d.remoteCommitPublished.map(Closing.updateRemoteCommitPublished(_, tx))
val nextRemoteCommitPublished1 = d.nextRemoteCommitPublished.map(Closing.updateRemoteCommitPublished(_, tx))
val futureRemoteCommitPublished1 = d.futureRemoteCommitPublished.map(Closing.updateRemoteCommitPublished(_, tx))
val revokedCommitPublished1 = d.revokedCommitPublished.map(Closing.updateRevokedCommitPublished(_, tx))
// first we check if this tx belongs to one of the current local/remote commits, update it and update the channel data
val d1 = d.copy(
localCommitPublished = d.localCommitPublished.map(Closing.updateLocalCommitPublished(_, tx)),
remoteCommitPublished = d.remoteCommitPublished.map(Closing.updateRemoteCommitPublished(_, tx)),
nextRemoteCommitPublished = d.nextRemoteCommitPublished.map(Closing.updateRemoteCommitPublished(_, tx)),
futureRemoteCommitPublished = d.futureRemoteCommitPublished.map(Closing.updateRemoteCommitPublished(_, tx)),
revokedCommitPublished = d.revokedCommitPublished.map(Closing.updateRevokedCommitPublished(_, tx))
)
// if the local commitment tx just got confirmed, let's send an event telling when we will get the main output refund
if (localCommitPublished1.map(_.commitTx.txid).contains(tx.txid)) {
if (d1.localCommitPublished.map(_.commitTx.txid).contains(tx.txid)) {
context.system.eventStream.publish(LocalCommitConfirmed(self, remoteNodeId, d.channelId, blockHeight + d.commitments.remoteParams.toSelfDelay.toInt))
}
// we may need to fail some htlcs in case a commitment tx was published and they have reached the timeout threshold
val timedoutHtlcs = if (localCommitPublished1 != d.localCommitPublished) {
Closing.timedoutHtlcs(d.commitments.localCommit, d.commitments.localParams.dustLimit, tx, localCommitPublished1)
} else if (remoteCommitPublished1 != d.remoteCommitPublished) {
Closing.timedoutHtlcs(d.commitments.remoteCommit, d.commitments.remoteParams.dustLimit, tx, remoteCommitPublished1)
} else if (nextRemoteCommitPublished1 != d.nextRemoteCommitPublished) {
Closing.timedoutHtlcs(d.commitments.remoteNextCommitInfo.left.get.nextRemoteCommit, d.commitments.remoteParams.dustLimit, tx, nextRemoteCommitPublished1)
} else {
Set.empty[UpdateAddHtlc] // we lose htlc outputs in dataloss protection scenarii (future remote commit)
val timedoutHtlcs = Closing.isClosingTypeAlreadyKnown(d1) match {
case Some(c: Closing.LocalClose) => Closing.timedoutHtlcs(c.localCommit, d.commitments.localParams.dustLimit, tx, c.localCommitPublished)
case Some(c: Closing.CurrentRemoteClose) => Closing.timedoutHtlcs(c.remoteCommit, d.commitments.remoteParams.dustLimit, tx, c.remoteCommitPublished)
case Some(c: Closing.NextRemoteClose) => Closing.timedoutHtlcs(c.remoteCommit, d.commitments.remoteParams.dustLimit, tx, c.remoteCommitPublished)
case _ => Set.empty[UpdateAddHtlc] // we lose htlc outputs in dataloss protection scenarii (future remote commit)
}
timedoutHtlcs.foreach { add =>
d.commitments.originChannels.get(add.id) match {
Expand Down Expand Up @@ -1348,8 +1347,6 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
.onchainOutgoingHtlcs(d.commitments.localCommit, d.commitments.remoteCommit, d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit), tx)
.map(add => (add, d.commitments.originChannels.get(add.id).collect { case Origin.Local(id, _) => id })) // we resolve the payment id if this was a local payment
.collect { case (add, Some(id)) => context.system.eventStream.publish(PaymentSettlingOnChain(id, amount = add.amountMsat, add.paymentHash)) }
// we update the channel data
val d1 = d.copy(localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1, futureRemoteCommitPublished = futureRemoteCommitPublished1, revokedCommitPublished = revokedCommitPublished1)
// and we also send events related to fee
Closing.networkFeePaid(tx, d1) foreach { case (fee, desc) => feePaid(fee, tx, desc, d.channelId) }
// then let's see if any of the possible close scenarii can be considered done
Expand Down
44 changes: 22 additions & 22 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Helpers.scala
Expand Up @@ -367,13 +367,13 @@ object Helpers {

// @formatter:off
sealed trait ClosingType
case object MutualClose extends ClosingType
case object LocalClose extends ClosingType
sealed trait RemoteClose extends ClosingType
case object CurrentRemoteClose extends RemoteClose
case object NextRemoteClose extends RemoteClose
case object RecoveryClose extends ClosingType
case object RevokedClose extends ClosingType
case class MutualClose(tx: Transaction) extends ClosingType
case class LocalClose(localCommit: LocalCommit, localCommitPublished: LocalCommitPublished) extends ClosingType
sealed trait RemoteClose extends ClosingType { def remoteCommitPublished: RemoteCommitPublished }
case class CurrentRemoteClose(remoteCommit: RemoteCommit, remoteCommitPublished: RemoteCommitPublished) extends RemoteClose
case class NextRemoteClose(remoteCommit: RemoteCommit, remoteCommitPublished: RemoteCommitPublished) extends RemoteClose
case class RecoveryClose(remoteCommitPublished: RemoteCommitPublished) extends ClosingType
case class RevokedClose(revokedCommitPublished: RevokedCommitPublished) extends ClosingType
// @formatter:on

/**
Expand Down Expand Up @@ -401,15 +401,15 @@ object Helpers {
*/
def isClosingTypeAlreadyKnown(closing: DATA_CLOSING): Option[ClosingType] = closing match {
case _ if closing.localCommitPublished.exists(lcp => lcp.irrevocablySpent.values.toSet.contains(lcp.commitTx.txid)) =>
Some(LocalClose)
Some(LocalClose(closing.commitments.localCommit, closing.localCommitPublished.get))
case _ if closing.remoteCommitPublished.exists(rcp => rcp.irrevocablySpent.values.toSet.contains(rcp.commitTx.txid)) =>
Some(CurrentRemoteClose)
Some(CurrentRemoteClose(closing.commitments.remoteCommit, closing.remoteCommitPublished.get))
case _ if closing.nextRemoteCommitPublished.exists(rcp => rcp.irrevocablySpent.values.toSet.contains(rcp.commitTx.txid)) =>
Some(NextRemoteClose)
Some(NextRemoteClose(closing.commitments.remoteNextCommitInfo.left.get.nextRemoteCommit, closing.nextRemoteCommitPublished.get))
case _ if closing.futureRemoteCommitPublished.exists(rcp => rcp.irrevocablySpent.values.toSet.contains(rcp.commitTx.txid)) =>
Some(RecoveryClose)
Some(RecoveryClose(closing.futureRemoteCommitPublished.get))
case _ if closing.revokedCommitPublished.exists(rcp => rcp.irrevocablySpent.values.toSet.contains(rcp.commitTx.txid)) =>
Some(RevokedClose)
Some(RevokedClose(closing.revokedCommitPublished.find(rcp => rcp.irrevocablySpent.values.toSet.contains(rcp.commitTx.txid)).get))
case _ => None // we don't know yet what the closing type will be
}

Expand All @@ -423,17 +423,17 @@ object Helpers {
*/
def isClosed(data: HasCommitments, additionalConfirmedTx_opt: Option[Transaction]): Option[ClosingType] = data match {
case closing: DATA_CLOSING if additionalConfirmedTx_opt.exists(closing.mutualClosePublished.contains) =>
Some(MutualClose)
Some(MutualClose(additionalConfirmedTx_opt.get))
case closing: DATA_CLOSING if closing.localCommitPublished.exists(Closing.isLocalCommitDone) =>
Some(LocalClose)
Some(LocalClose(closing.commitments.localCommit, closing.localCommitPublished.get))
case closing: DATA_CLOSING if closing.remoteCommitPublished.exists(Closing.isRemoteCommitDone) =>
Some(CurrentRemoteClose)
Some(CurrentRemoteClose(closing.commitments.remoteCommit, closing.remoteCommitPublished.get))
case closing: DATA_CLOSING if closing.nextRemoteCommitPublished.exists(Closing.isRemoteCommitDone) =>
Some(NextRemoteClose)
Some(NextRemoteClose(closing.commitments.remoteNextCommitInfo.left.get.nextRemoteCommit, closing.nextRemoteCommitPublished.get))
case closing: DATA_CLOSING if closing.futureRemoteCommitPublished.exists(Closing.isRemoteCommitDone) =>
Some(RecoveryClose)
Some(RecoveryClose(closing.futureRemoteCommitPublished.get))
case closing: DATA_CLOSING if closing.revokedCommitPublished.exists(Closing.isRevokedCommitDone) =>
Some(RevokedClose)
Some(RevokedClose(closing.revokedCommitPublished.find(Closing.isRevokedCommitDone).get))
case _ => None
}

Expand Down Expand Up @@ -890,7 +890,7 @@ object Helpers {
* @param tx a tx that has reached mindepth
* @return a set of htlcs that need to be failed upstream
*/
def timedoutHtlcs(localCommit: LocalCommit, localDustLimit: Satoshi, tx: Transaction, localCommitPublished: Option[LocalCommitPublished])(implicit log: LoggingAdapter): Set[UpdateAddHtlc] = {
def timedoutHtlcs(localCommit: LocalCommit, localDustLimit: Satoshi, tx: Transaction, localCommitPublished: LocalCommitPublished)(implicit log: LoggingAdapter): Set[UpdateAddHtlc] = {
val untrimmedHtlcs = Transactions.trimOfferedHtlcs(localDustLimit, localCommit.spec).map(_.add)
if (tx.txid == localCommit.publishableTxs.commitTx.tx.txid) {
// the tx is a commitment tx, we can immediately fail all dust htlcs (they don't have an output in the tx)
Expand All @@ -905,7 +905,7 @@ object Helpers {
findTimedOutHtlc(tx,
paymentHash160,
untrimmedHtlcs,
localCommitPublished.toSeq.flatMap(_.htlcTimeoutTxs),
localCommitPublished.htlcTimeoutTxs,
Scripts.extractPaymentHashFromHtlcTimeout)
}
.toSet
Expand All @@ -919,7 +919,7 @@ object Helpers {
* @param tx a tx that has reached mindepth
* @return a set of htlcs that need to be failed upstream
*/
def timedoutHtlcs(remoteCommit: RemoteCommit, remoteDustLimit: Satoshi, tx: Transaction, remoteCommitPublished: Option[RemoteCommitPublished])(implicit log: LoggingAdapter): Set[UpdateAddHtlc] = {
def timedoutHtlcs(remoteCommit: RemoteCommit, remoteDustLimit: Satoshi, tx: Transaction, remoteCommitPublished: RemoteCommitPublished)(implicit log: LoggingAdapter): Set[UpdateAddHtlc] = {
val untrimmedHtlcs = Transactions.trimReceivedHtlcs(remoteDustLimit, remoteCommit.spec).map(_.add)
if (tx.txid == remoteCommit.txid) {
// the tx is a commitment tx, we can immediately fail all dust htlcs (they don't have an output in the tx)
Expand All @@ -934,7 +934,7 @@ object Helpers {
findTimedOutHtlc(tx,
paymentHash160,
untrimmedHtlcs,
remoteCommitPublished.toSeq.flatMap(_.claimHtlcTimeoutTxs),
remoteCommitPublished.claimHtlcTimeoutTxs,
Scripts.extractPaymentHashFromClaimHtlcTimeout)
}
.toSet
Expand Down
Expand Up @@ -92,11 +92,11 @@ class Auditor(nodeParams: NodeParams) extends Actor with ActorLogging {
case e: ChannelClosed =>
ChannelMetrics.ChannelLifecycleEvents.withTag(ChannelTags.Event, ChannelTags.Events.Closed).increment()
val event = e.closingType match {
case MutualClose => "mutual"
case LocalClose => "local"
case _: MutualClose => "mutual"
case _: LocalClose => "local"
case _: RemoteClose => "remote" // can be current or next
case RecoveryClose => "recovery"
case RevokedClose => "revoked"
case _: RecoveryClose => "recovery"
case _: RevokedClose => "revoked"
}
db.add(ChannelLifecycleEvent(e.channelId, e.commitments.remoteParams.nodeId, e.commitments.commitInput.txOut.amount, e.commitments.localParams.isFunder, !e.commitments.announceChannel, event))

Expand Down
Expand Up @@ -308,43 +308,49 @@ object PostRestartHtlcCleaner {
case Right(p@IncomingPacket.FinalPacket(add, _)) => IncomingHtlc(add, shouldFulfill(p, paymentsDb))
}

// We are only interested in HTLCs that are pending upstream (not fulfilled nor failed yet).
// It may be the case that we have unresolved HTLCs downstream that have been resolved upstream when the downstream
// channel is closing (e.g. due to an HTLC timeout) because cooperatively failing the HTLC downstream will be
// instant whereas the uncooperative close of the downstream channel will take time.
def isPendingUpstream(channelId: ByteVector32, htlcId: Long): Boolean =
htlcsIn.exists(htlc => htlc.add.channelId == channelId && htlc.add.id == htlcId)

// We group relayed outgoing HTLCs by their origin.
val relayedOut = channels
.flatMap(c => {
.flatMap { c =>
// Filter out HTLCs that will never reach the blockchain or have already been timed-out on-chain.
val htlcsToIgnore: Set[Long] = c match {
case c: DATA_CLOSING =>
val irrevocablySpentTxes = Closing.irrevocablySpentTxes(c)
val localCommitConfirmed = c.localCommitPublished.exists(x => irrevocablySpentTxes.exists(tx => tx.txid == x.commitTx.txid))
val remoteCommitConfirmed = c.remoteCommitPublished.exists(x => irrevocablySpentTxes.exists(tx => tx.txid == x.commitTx.txid))
val nextRemoteCommitConfirmed = c.nextRemoteCommitPublished.exists(x => irrevocablySpentTxes.exists(tx => tx.txid == x.commitTx.txid))
val timedOutHtlcs = irrevocablySpentTxes.flatMap(tx => {
if (localCommitConfirmed) {
Closing.timedoutHtlcs(c.commitments.localCommit, c.commitments.localParams.dustLimit, tx, c.localCommitPublished)
} else if (remoteCommitConfirmed) {
Closing.timedoutHtlcs(c.commitments.remoteCommit, c.commitments.remoteParams.dustLimit, tx, c.remoteCommitPublished)
} else if (nextRemoteCommitConfirmed) {
c.commitments.remoteNextCommitInfo.left.toSeq.flatMap(r => Closing.timedoutHtlcs(r.nextRemoteCommit, c.commitments.remoteParams.dustLimit, tx, c.nextRemoteCommitPublished))
} else {
Set.empty[UpdateAddHtlc]
}
case d: DATA_CLOSING =>
val closingType_opt = Closing.isClosingTypeAlreadyKnown(d)
val overriddenHtlcs: Set[Long] = (closingType_opt match {
case Some(c: Closing.LocalClose) => Closing.overriddenOutgoingHtlcs(d, c.localCommitPublished.commitTx)
case Some(c: Closing.RemoteClose) => Closing.overriddenOutgoingHtlcs(d, c.remoteCommitPublished.commitTx)
case _ => Set.empty[UpdateAddHtlc]
}).map(_.id)
val overriddenHtlcs = irrevocablySpentTxes.flatMap(tx => Closing.overriddenOutgoingHtlcs(c, tx)).map(_.id)
timedOutHtlcs ++ overriddenHtlcs
val irrevocablySpent = closingType_opt match {
case Some(c: Closing.LocalClose) => c.localCommitPublished.irrevocablySpent.values.toSet
case Some(c: Closing.RemoteClose) => c.remoteCommitPublished.irrevocablySpent.values.toSet
case _ => Set.empty[ByteVector32]
}
val timedoutHtlcs: Set[Long] = (closingType_opt match {
case Some(c: Closing.LocalClose) =>
val confirmedTxs = c.localCommitPublished.commitTx :: c.localCommitPublished.htlcTimeoutTxs.filter(tx => irrevocablySpent.contains(tx.txid))
confirmedTxs.flatMap(tx => Closing.timedoutHtlcs(c.localCommit, d.commitments.localParams.dustLimit, tx, c.localCommitPublished))
case Some(c: Closing.CurrentRemoteClose) =>
val confirmedTxs = c.remoteCommitPublished.commitTx :: c.remoteCommitPublished.claimHtlcTimeoutTxs.filter(tx => irrevocablySpent.contains(tx.txid))
confirmedTxs.flatMap(tx => Closing.timedoutHtlcs(c.remoteCommit, d.commitments.remoteParams.dustLimit, tx, c.remoteCommitPublished))
case Some(c: Closing.NextRemoteClose) =>
val confirmedTxs = c.remoteCommitPublished.commitTx :: c.remoteCommitPublished.claimHtlcTimeoutTxs.filter(tx => irrevocablySpent.contains(tx.txid))
confirmedTxs.flatMap(tx => Closing.timedoutHtlcs(c.remoteCommit, d.commitments.remoteParams.dustLimit, tx, c.remoteCommitPublished))
case _ => Seq.empty[UpdateAddHtlc]
}).map(_.id).toSet
overriddenHtlcs ++ timedoutHtlcs
case _ => Set.empty
}
c.commitments.originChannels.filterKeys(htlcId => !htlcsToIgnore.contains(htlcId)).map { case (outgoingHtlcId, origin) => (origin, c.channelId, outgoingHtlcId) }
})
}
.groupBy { case (origin, _, _) => origin }
.mapValues(_.map { case (_, channelId, htlcId) => (channelId, htlcId) }.toSet)
// Filter out HTLCs that are already fulfilled/failed upstream.
// We are only interested in HTLCs that are pending upstream (not fulfilled nor failed yet).
// It may be the case that we have unresolved HTLCs downstream that have been resolved upstream when the downstream
// channel is closing (e.g. due to an HTLC timeout) because cooperatively failing the HTLC downstream will be
// instant whereas the uncooperative close of the downstream channel will take time.
.filterKeys {
case _: Origin.Local => true
case Origin.Relayed(channelId, htlcId, _, _) => isPendingUpstream(channelId, htlcId)
Expand Down

0 comments on commit 6576390

Please sign in to comment.