Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Smarter strategy for sending `channel_update`s #950

Merged
merged 21 commits into from Apr 29, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

delay re-enabling of channel at reconnection

When switching from `SYNCING`->`NORMAL`, instead of emitting a new
`channel_update` with flag=enabled right away, we wait a little bit and
send it later.

This way, if a connection to a peer is unstable and we keep getting
disconnected/reconnected, we won't spam the network.

This extra delay allows us to remove the change made in #888, which was
a workaround in case we generated `channel_update` too quickly.

Also, increased refresh interval from 7 days to 10 days. There was no
need to be so conservative.
  • Loading branch information...
pm47 committed Apr 17, 2019
commit 3450bb462272c080542f61c42db9dab4b0f16c9e
@@ -36,7 +36,7 @@ import fr.acinq.eclair.wire.{ChannelReestablish, _}
import scala.compat.Platform
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.util.{Failure, Random, Success, Try}


/**
@@ -69,17 +69,26 @@ object Channel {
// as a fundee, we will wait that much time for the funding tx to confirm (funder will rely on the funding tx being double-spent)
val FUNDING_TIMEOUT_FUNDEE = 5 days

val REFRESH_CHANNEL_UPDATE_INTERVAL = 7 days
// pruning occurs if no new update has been received in two weeks (BOLT 7)
val REFRESH_CHANNEL_UPDATE_INTERVAL = 10 days

case object TickRefreshChannelUpdate
case class BroadcastChannelUpdate(reason: BroadcastReason)

case class RevocationTimeout(remoteCommitNumber: Long, peer: ActorRef) // we will receive this message when we waited too long for a revocation for that commit number (NB: we explicitely specify the peer to allow for testing)
// @formatter:off
sealed trait BroadcastReason
case object PeriodicRefresh extends BroadcastReason
case object Reconnected extends BroadcastReason
case object AboveReserve extends BroadcastReason
// @formatter:on

sealed trait ChannelError
// we will receive this message when we waited too long for a revocation for that commit number (NB: we explicitely specify the peer to allow for testing)
case class RevocationTimeout(remoteCommitNumber: Long, peer: ActorRef)

// @formatter:off
sealed trait ChannelError
case class LocalError(t: Throwable) extends ChannelError

case class RemoteError(e: Error) extends ChannelError
// @formatter:on

}

@@ -98,6 +107,9 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// this will be used to make sure the current commitment fee is up-to-date
context.system.eventStream.subscribe(self, classOf[CurrentFeerates])

// we need to periodically re-send channel updates (with some initial randomization to smooth herd effect), otherwise channel will be considered stale and get pruned by network
context.system.scheduler.schedule(initialDelay = Random.nextInt(48).hours, interval = REFRESH_CHANNEL_UPDATE_INTERVAL, receiver = self, message = BroadcastChannelUpdate(PeriodicRefresh))

/*
8888888 888b 888 8888888 88888888888
888 8888b 888 888 888
@@ -136,7 +148,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
startWith(WAIT_FOR_INIT_INTERNAL, Nothing)

when(WAIT_FOR_INIT_INTERNAL)(handleExceptions {
case Event(initFunder@INPUT_INIT_FUNDER(temporaryChannelId, fundingSatoshis, pushMsat, initialFeeratePerKw, fundingTxFeeratePerKw, localParams, remote, remoteInit, channelFlags), Nothing) =>
case Event(initFunder@INPUT_INIT_FUNDER(temporaryChannelId, fundingSatoshis, pushMsat, initialFeeratePerKw, _, localParams, remote, _, channelFlags), Nothing) =>
context.system.eventStream.publish(ChannelCreated(self, context.parent, remoteNodeId, true, temporaryChannelId))
forwarder ! remote
val open = OpenChannel(nodeParams.chainHash,
@@ -656,7 +668,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
if (!Helpers.aboveReserve(d.commitments) && Helpers.aboveReserve(commitments1)) {
// we just went above reserve (can't go below), let's refresh our channel_update to enable/disable it accordingly
log.info(s"updating channel_update aboveReserve=${Helpers.aboveReserve(commitments1)}")
self ! TickRefreshChannelUpdate
self ! BroadcastChannelUpdate(AboveReserve)
}
context.system.eventStream.publish(ChannelSignatureSent(self, commitments1))
if (nextRemoteCommit.spec.toRemoteMsat != d.commitments.remoteCommit.spec.toRemoteMsat) {
@@ -855,19 +867,18 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
stay
}

case Event(TickRefreshChannelUpdate, d: DATA_NORMAL) =>
// periodic refresh is used as a keep alive
log.info(s"sending channel_update announcement (refresh)")
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = Helpers.aboveReserve(d.commitments))
// we use GOTO instead of stay because we want to fire transitions
goto(NORMAL) using store(d.copy(channelUpdate = channelUpdate))

case Event(CMD_UPDATE_RELAY_FEE(feeBaseMsat, feeProportionalMillionths), d: DATA_NORMAL) =>
log.info(s"updating relay fees: prevFeeBaseMsat={} nextFeeBaseMsat={} prevFeeProportionalMillionths={} nextFeeProportionalMillionths={}", d.channelUpdate.feeBaseMsat, feeBaseMsat, d.channelUpdate.feeProportionalMillionths, feeProportionalMillionths)
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, feeBaseMsat, feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = Helpers.aboveReserve(d.commitments))
// we use GOTO instead of stay because we want to fire transitions
goto(NORMAL) using store(d.copy(channelUpdate = channelUpdate)) replying "ok"

case Event(BroadcastChannelUpdate(reason), d: DATA_NORMAL) =>
log.info(s"sending channel_update announcement (reason=$reason)")
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = Helpers.aboveReserve(d.commitments))
// we use GOTO instead of stay because we want to fire transitions
goto(NORMAL) using store(d.copy(channelUpdate = channelUpdate))

case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) if tx.txid == d.commitments.remoteCommit.txid => handleRemoteSpentCurrent(tx, d)

case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) if Some(tx.txid) == d.commitments.remoteNextCommitInfo.left.toOption.map(_.nextRemoteCommit.txid) => handleRemoteSpentNext(tx, d)
@@ -878,11 +889,11 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// we disable the channel
log.debug(s"sending channel_update announcement (disable)")
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = false)
// we cancel the timer that would have made us send the enabled update after reconnection (flappy channel protection)
cancelTimer(Reconnected.toString)
d.commitments.localChanges.proposed.collect {
case add: UpdateAddHtlc => relayer ! Status.Failure(AddHtlcFailed(d.channelId, add.paymentHash, ChannelUnavailable(d.channelId), d.commitments.originChannels(add.id), Some(channelUpdate), None))
}
// disable the channel_update refresh timer
cancelTimer(TickRefreshChannelUpdate.toString)
goto(OFFLINE) using d.copy(channelUpdate = channelUpdate)

case Event(e: Error, d: DATA_NORMAL) => handleRemoteError(e, d)
@@ -1260,13 +1271,13 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
.foreach(add => context.system.eventStream.publish(PaymentSettlingOnChain(amount = MilliSatoshi(add.amountMsat), add.paymentHash)))
// then let's see if any of the possible close scenarii can be considered done
val mutualCloseDone = d.mutualClosePublished.exists(_.txid == tx.txid) // this case is trivial, in a mutual close scenario we only need to make sure that one of the closing txes is confirmed
val localCommitDone = localCommitPublished1.map(Closing.isLocalCommitDone(_)).getOrElse(false)
val localCommitDone = localCommitPublished1.map(Closing.isLocalCommitDone(_)).getOrElse(false)
val remoteCommitDone = remoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false)
val nextRemoteCommitDone = nextRemoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false)
val futureRemoteCommitDone = futureRemoteCommitPublished1.map(Closing.isRemoteCommitDone(_)).getOrElse(false)
val revokedCommitDone = revokedCommitPublished1.map(Closing.isRevokedCommitDone(_)).exists(_ == true) // we only need one revoked commit done
// finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state)
val d1 = d.copy(localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1, futureRemoteCommitPublished = futureRemoteCommitPublished1, revokedCommitPublished = revokedCommitPublished1)
// finally, if one of the unilateral closes is done, we move to CLOSED state, otherwise we stay (note that we don't store the state)
val d1 = d.copy(localCommitPublished = localCommitPublished1, remoteCommitPublished = remoteCommitPublished1, nextRemoteCommitPublished = nextRemoteCommitPublished1, futureRemoteCommitPublished = futureRemoteCommitPublished1, revokedCommitPublished = revokedCommitPublished1)
// we also send events related to fee
Closing.networkFeePaid(tx, d1) map { case (fee, desc) => feePaid(fee, tx, desc, d.channelId) }
val closeType_opt = if (mutualCloseDone) {
@@ -1461,16 +1472,10 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
()
}
}
// re-enable the channel
val timestamp = Platform.currentTime / 1000 match {
case ts if ts == d.channelUpdate.timestamp => ts + 1 // corner case: in case of quick reconnection, we bump the timestamp of the new channel_update, otherwise it will get ignored by the network
case ts => ts
}
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, nodeParams.expiryDeltaBlocks, d.commitments.remoteParams.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = Helpers.aboveReserve(d.commitments), timestamp = timestamp)
// we will refresh need to periodically re-send channel updates, otherwise channel will be considered stale and get pruned by network
setTimer(TickRefreshChannelUpdate.toString, TickRefreshChannelUpdate, timeout = REFRESH_CHANNEL_UPDATE_INTERVAL, repeat = true)
// we will re-enable the channel after some delay to prevent flappy updates in case the connection is unstable
setTimer(Reconnected.toString, BroadcastChannelUpdate(Reconnected), 10 seconds, repeat = false)

goto(NORMAL) using d.copy(commitments = commitments1, channelUpdate = channelUpdate)
goto(NORMAL) using d.copy(commitments = commitments1)
}

case Event(channelReestablish: ChannelReestablish, d: DATA_SHUTDOWN) =>
@@ -1574,8 +1579,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// we only care about this event in NORMAL and SHUTDOWN state, and we never unregister to the event stream
case Event(CurrentFeerates(_), _) => stay

// we only care about this event in NORMAL state, and the scheduler is not cancelled in some cases (e.g. NORMAL->CLOSING)
case Event(TickRefreshChannelUpdate, _) => stay
// we only care about this event in NORMAL state
case Event(_: BroadcastChannelUpdate, _) => stay

// we receive this when we send command to ourselves
case Event("ok", _) => stay
@@ -25,7 +25,7 @@ import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.UInt64.Conversions._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.fee.FeeratesPerKw
import fr.acinq.eclair.channel.Channel.{RevocationTimeout, TickRefreshChannelUpdate}
import fr.acinq.eclair.channel.Channel.{BroadcastChannelUpdate, PeriodicRefresh, RevocationTimeout}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
import fr.acinq.eclair.io.Peer
@@ -2082,7 +2082,7 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
alice2bob.expectMsg(annSigsA)
}

test("recv TickRefreshChannelUpdate", Tag("channels_public")) { f =>
test("recv BroadcastChannelUpdate", Tag("channels_public")) { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
@@ -2093,7 +2093,7 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {

// actual test starts here
Thread.sleep(1100)
sender.send(alice, TickRefreshChannelUpdate)
sender.send(alice, BroadcastChannelUpdate(PeriodicRefresh))
val update2 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(update1.channelUpdate.timestamp < update2.channelUpdate.timestamp)
}
@@ -366,42 +366,4 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
channelUpdateListener.expectNoMsg(300 millis)
}

test("bump timestamp in case of quick reconnection") { f =>
import f._
val sender = TestProbe()

// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// alice and bob announce that their channel is OFFLINE
val channelUpdate_alice_disabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
val channelUpdate_bob_disabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
assert(Announcements.isEnabled(channelUpdate_alice_disabled.channelFlags) == false)
assert(Announcements.isEnabled(channelUpdate_bob_disabled.channelFlags) == false)

// we immediately reconnect them
sender.send(alice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit))
sender.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit))

// peers exchange channel_reestablish messages
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(alice)

// both nodes reach NORMAL state, and broadcast a new channel_update
val channelUpdate_alice_enabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
val channelUpdate_bob_enabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
assert(Announcements.isEnabled(channelUpdate_alice_enabled.channelFlags) == true)
assert(Announcements.isEnabled(channelUpdate_bob_enabled.channelFlags) == true)

// let's check that the two successive channel_update have a different timestamp
assert(channelUpdate_alice_enabled.timestamp > channelUpdate_alice_disabled.timestamp)
assert(channelUpdate_bob_enabled.timestamp > channelUpdate_bob_disabled.timestamp)

}

}
@@ -28,7 +28,7 @@ import fr.acinq.bitcoin.{Base58, Base58Check, Bech32, Block, ByteVector32, Crypt
import fr.acinq.eclair.blockchain.bitcoind.BitcoindService
import fr.acinq.eclair.blockchain.bitcoind.rpc.ExtendedBitcoinClient
import fr.acinq.eclair.blockchain.{Watch, WatchConfirmed}
import fr.acinq.eclair.channel.Channel.TickRefreshChannelUpdate
import fr.acinq.eclair.channel.Channel.{BroadcastChannelUpdate, PeriodicRefresh}
import fr.acinq.eclair.channel.Register.{Forward, ForwardShortId}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.Sphinx.ErrorPacket
@@ -308,7 +308,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService

// first let's wait 3 seconds to make sure the timestamp of the new channel_update will be strictly greater than the former
sender.expectNoMsg(3 seconds)
sender.send(nodes("B").register, ForwardShortId(shortIdBC, TickRefreshChannelUpdate))
sender.send(nodes("B").register, ForwardShortId(shortIdBC, BroadcastChannelUpdate(PeriodicRefresh)))
sender.send(nodes("B").register, ForwardShortId(shortIdBC, CMD_GETINFO))
val channelUpdateBC_new = sender.expectMsgType[RES_GETINFO].data.asInstanceOf[DATA_NORMAL].channelUpdate
logger.info(s"channelUpdateBC=$channelUpdateBC")
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.