Skip to content
Permalink
Browse files

Smarter strategy for sending `channel_update`s (#950)

The goal is to prevent sending a lot of updates for flappy channels.

Instead of sending a disabled `channel_update` after each disconnection, 
we now wait for a payment to try to route through the channel and only 
then reply with a disabled `channel_update` and broadcast it on the
network.

The reason is that in case of a disconnection, if noone cares about that
channel then there is no reason to tell everyone about its current
(disconnected) state.

In addition to that, when switching from `SYNCING`->`NORMAL`, instead
of emitting a new `channel_update` with flag=enabled right away, we wait
a little bit and send it later. We also don't send a new `channel_update` if
it is identical to the previous one (except if the previous one is outdated).

This way, if a connection to a peer is unstable and we keep getting
disconnected/reconnected, we won't spam the network.

The extra delay allows us to remove the change made in #888, which was
a workaround in case we generated `channel_update` too quickly.

Also, increased refresh interval from 7 days to 10 days. There was no
need to be so conservative.

Note that on startup we still need to re-send `channel_update` for all 
channels in order to properly initialize the `Router` and the `Relayer`.
Otherwise they won't know about those channels, and e.g. the 
`Relayer` will return `UnknownNextPeer` errors.

But we don't need to create new `channel_update`s in most cases, so 
this should have little or no impact to gossip because our peers will
already know the updates and will filter them out.

On the other hand, if some global parameters (like relaying fees) are
changed, it will cause the creation a new `channel_update` for all
channels.
  • Loading branch information...
pm47 committed Apr 29, 2019
1 parent 4ba4ce8 commit fb84dfb855ddc1ee0a42897351ac5506a28198da

Large diffs are not rendered by default.

@@ -21,6 +21,7 @@ import fr.acinq.bitcoin.Crypto.{Point, PrivateKey, PublicKey, Scalar, ripemd160,
import fr.acinq.bitcoin.Script._
import fr.acinq.bitcoin.{OutPoint, _}
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.channel.Channel.REFRESH_CHANNEL_UPDATE_INTERVAL
import fr.acinq.eclair.crypto.{Generators, KeyManager}
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.payment.{Local, Origin}
@@ -31,8 +32,9 @@ import fr.acinq.eclair.wire._
import fr.acinq.eclair.{Globals, NodeParams, ShortChannelId, addressToPublicKeyScript}
import scodec.bits.ByteVector

import scala.compat.Platform
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

/**
@@ -152,6 +154,22 @@ object Helpers {
if (reserveToFundingRatio > nodeParams.maxReserveToFundingRatio) throw ChannelReserveTooHigh(open.temporaryChannelId, accept.channelReserveSatoshis, reserveToFundingRatio, nodeParams.maxReserveToFundingRatio)
}

/**
* Compute the delay until we need to refresh the channel_update for our channel not to be considered stale by
* other nodes.
*
* If current update more than [[Channel.REFRESH_CHANNEL_UPDATE_INTERVAL]] old then the delay will be zero.
*
* @param currentUpdateTimestamp
* @return the delay until the next update
*/
def nextChannelUpdateRefresh(currentUpdateTimestamp: Long)(implicit log: LoggingAdapter): FiniteDuration = {
val age = Platform.currentTime.milliseconds - currentUpdateTimestamp.seconds
val delay = 0.days.max(REFRESH_CHANNEL_UPDATE_INTERVAL - age)
log.info("current channel_update was created {} days ago, will refresh it in {} days", age.toDays, delay.toDays)
delay
}

/**
*
* @param remoteFeeratePerKw remote fee rate per kiloweight
@@ -17,7 +17,7 @@
package fr.acinq.eclair.payment

import akka.actor.{Actor, ActorLogging, Props, Status}
import fr.acinq.bitcoin.{ByteVector32, Crypto, MilliSatoshi}
import fr.acinq.bitcoin.{Crypto, MilliSatoshi}
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC, Channel}
import fr.acinq.eclair.db.IncomingPayment
import fr.acinq.eclair.payment.PaymentLifecycle.ReceivePayment
@@ -26,6 +26,7 @@ import fr.acinq.eclair.{Globals, NodeParams, randomBytes32}
import concurrent.duration._
import scala.compat.Platform
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

/**
@@ -24,6 +24,7 @@ import scodec.bits.{BitVector, ByteVector}
import shapeless.HNil
import scala.concurrent.duration._
import scala.compat.Platform
import scala.concurrent.duration._

/**
* Created by PM on 03/02/2017.
@@ -454,7 +454,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Prom
stay

case Event(u: ChannelUpdate, d: Data) =>
// it was sent by us, routing messages that are sent by our peers are now wrapped in a PeerRoutingMessage
// it was sent by us (e.g. the payment lifecycle); routing messages that are sent by our peers are now wrapped in a PeerRoutingMessage
log.debug("received channel update from {}", sender)
stay using handle(u, sender, d)

@@ -758,7 +758,7 @@ object Router {
def hasChannels(nodeId: PublicKey, channels: Iterable[ChannelAnnouncement]): Boolean = channels.exists(c => isRelatedTo(c, nodeId))

def isStale(u: ChannelUpdate): Boolean = {
// BOLT 7: "nodes MAY prune channels should the timestamp of the latest channel_update be older than 2 weeks (1209600 seconds)"
// BOLT 7: "nodes MAY prune channels should the timestamp of the latest channel_update be older than 2 weeks"
// but we don't want to prune brand new channels for which we didn't yet receive a channel update
val staleThresholdSeconds = (Platform.currentTime.milliseconds - 14.days).toSeconds
u.timestamp < staleThresholdSeconds
@@ -843,7 +843,6 @@ object Router {
* @param extraEdges a set of extra edges we want to CONSIDER during the search
* @param ignoredEdges a set of extra edges we want to IGNORE during the search
* @param routeParams a set of parameters that can restrict the route search
* @param wr an object containing the ratios used to 'weight' edges when searching for the shortest path
* @return the computed route to the destination @targetNodeId
*/
def findRoute(g: DirectedGraph,
@@ -34,6 +34,9 @@ import scodec.{Attempt, Codec}
import scala.concurrent.duration._
import scala.compat.Platform

import scala.concurrent.duration._


/**
* Created by PM on 02/06/2017.
*/
@@ -19,6 +19,7 @@ package fr.acinq.eclair.channel
import fr.acinq.eclair.channel.Helpers.Funding
import org.scalatest.FunSuite

import scala.compat.Platform
import scala.concurrent.duration._

class HelpersSpec extends FunSuite {
@@ -46,5 +47,15 @@ class HelpersSpec extends FunSuite {
minDelay = 10 minutes) === (10 minutes))
}

test("compute refresh delay") {
import org.scalatest.Matchers._
implicit val log = akka.event.NoLogging
Helpers.nextChannelUpdateRefresh(1544400000).toSeconds should equal (0)
Helpers.nextChannelUpdateRefresh((Platform.currentTime.milliseconds - 9.days).toSeconds).toSeconds should equal (24 * 3600L +- 100)
Helpers.nextChannelUpdateRefresh((Platform.currentTime.milliseconds - 3.days).toSeconds).toSeconds should equal (7 * 24 * 3600L +- 100)
Helpers.nextChannelUpdateRefresh(Platform.currentTime.milliseconds.toSeconds).toSeconds should equal (10 * 24 * 3600L +- 100)

}

}

@@ -27,7 +27,7 @@ import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.UInt64.Conversions._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.fee.FeeratesPerKw
import fr.acinq.eclair.channel.Channel.{RevocationTimeout, TickRefreshChannelUpdate}
import fr.acinq.eclair.channel.Channel.{BroadcastChannelUpdate, PeriodicRefresh, Reconnected, RevocationTimeout}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
import fr.acinq.eclair.io.Peer
@@ -2085,7 +2085,7 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
alice2bob.expectMsg(annSigsA)
}

test("recv TickRefreshChannelUpdate", Tag("channels_public")) { f =>
test("recv BroadcastChannelUpdate", Tag("channels_public")) { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
@@ -2096,42 +2096,103 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {

// actual test starts here
Thread.sleep(1100)
sender.send(alice, TickRefreshChannelUpdate)
sender.send(alice, BroadcastChannelUpdate(PeriodicRefresh))
val update2 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(update1.channelUpdate.timestamp < update2.channelUpdate.timestamp)
}

test("recv INPUT_DISCONNECTED", Tag("channels_public")) { f =>
test("recv BroadcastChannelUpdate (no changes)", Tag("channels_public")) { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
sender.send(bob, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
bob2alice.expectMsgType[AnnouncementSignatures]
bob2alice.forward(alice)
val update1 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(Announcements.isEnabled(update1.channelUpdate.channelFlags) == true)
channelUpdateListener.expectMsgType[LocalChannelUpdate]

// actual test starts here
Thread.sleep(1100)
sender.send(alice, BroadcastChannelUpdate(Reconnected))
channelUpdateListener.expectNoMsg(1 second)
}

test("recv INPUT_DISCONNECTED") { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
val update1a = alice2bob.expectMsgType[ChannelUpdate]
assert(Announcements.isEnabled(update1a.channelFlags) == true)

// actual test starts here
sender.send(alice, INPUT_DISCONNECTED)
val update2 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(update1.channelUpdate.timestamp < update2.channelUpdate.timestamp)
assert(Announcements.isEnabled(update2.channelUpdate.channelFlags) == false)
awaitCond(alice.stateName == OFFLINE)
alice2bob.expectNoMsg(1 second)
channelUpdateListener.expectNoMsg(1 second)
}

test("recv INPUT_DISCONNECTED (with pending unsigned htlcs)") { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
val update1a = alice2bob.expectMsgType[ChannelUpdate]
assert(Announcements.isEnabled(update1a.channelFlags) == true)
val (_, htlc1) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val (_, htlc2) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val aliceData = alice.stateData.asInstanceOf[DATA_NORMAL]
assert(aliceData.commitments.localChanges.proposed.size == 2)

// actual test starts here
Thread.sleep(1100)
sender.send(alice, INPUT_DISCONNECTED)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc1.paymentHash)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc2.paymentHash)
val update2a = alice2bob.expectMsgType[ChannelUpdate]
assert(channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate === update2a)
assert(Announcements.isEnabled(update2a.channelFlags) == false)
awaitCond(alice.stateName == OFFLINE)
}

test("recv INPUT_DISCONNECTED (public channel)", Tag("channels_public")) { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
sender.send(bob, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
bob2alice.expectMsgType[AnnouncementSignatures]
bob2alice.forward(alice)
val update1 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(Announcements.isEnabled(update1.channelUpdate.channelFlags) == true)

// actual test starts here
sender.send(alice, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
channelUpdateListener.expectNoMsg(1 second)
}

test("recv INPUT_DISCONNECTED (public channel, with pending unsigned htlcs)", Tag("channels_public")) { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
sender.send(bob, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
bob2alice.expectMsgType[AnnouncementSignatures]
bob2alice.forward(alice)
alice2bob.expectMsgType[AnnouncementSignatures]
alice2bob.forward(bob)
val update1a = channelUpdateListener.expectMsgType[LocalChannelUpdate]
val update1b = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(Announcements.isEnabled(update1a.channelUpdate.channelFlags) == true)
val (_, htlc1) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val (_, htlc2) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val aliceData = alice.stateData.asInstanceOf[DATA_NORMAL]
assert(aliceData.commitments.localChanges.proposed.size == 2)

// actual test starts here
Thread.sleep(1100)
sender.send(alice, INPUT_DISCONNECTED)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc1.paymentHash)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc2.paymentHash)
val update2a = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(update1a.channelUpdate.timestamp < update2a.channelUpdate.timestamp)
assert(Announcements.isEnabled(update2a.channelUpdate.channelFlags) == false)
awaitCond(alice.stateName == OFFLINE)
}

@@ -16,6 +16,7 @@

package fr.acinq.eclair.channel.states.e

import akka.actor.Status
import java.util.UUID

import akka.testkit.TestProbe
@@ -337,9 +338,8 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// alice and bob announce that their channel is OFFLINE
assert(Announcements.isEnabled(channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate.channelFlags) == false)
assert(Announcements.isEnabled(channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate.channelFlags) == false)
// alice and bob will not announce that their channel is OFFLINE
channelUpdateListener.expectNoMsg(300 millis)

// we make alice update here relay fee
sender.send(alice, CMD_UPDATE_RELAY_FEE(4200, 123456))
@@ -358,8 +358,8 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
// note that we don't forward the channel_reestablish so that only alice reaches NORMAL state, it facilitates the test below
bob2alice.forward(alice)

// then alice reaches NORMAL state, and during the transition she broadcasts the channel_update
val channelUpdate = channelUpdateListener.expectMsgType[LocalChannelUpdate](10 seconds).channelUpdate
// then alice reaches NORMAL state, and after a delay she broadcasts the channel_update
val channelUpdate = channelUpdateListener.expectMsgType[LocalChannelUpdate](20 seconds).channelUpdate
assert(channelUpdate.feeBaseMsat === 4200)
assert(channelUpdate.feeProportionalMillionths === 123456)
assert(Announcements.isEnabled(channelUpdate.channelFlags) == true)
@@ -368,7 +368,7 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
channelUpdateListener.expectNoMsg(300 millis)
}

test("bump timestamp in case of quick reconnection") { f =>
test("broadcast disabled channel_update while offline") { f =>
import f._
val sender = TestProbe()

@@ -378,32 +378,17 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// alice and bob announce that their channel is OFFLINE
val channelUpdate_alice_disabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
val channelUpdate_bob_disabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
assert(Announcements.isEnabled(channelUpdate_alice_disabled.channelFlags) == false)
assert(Announcements.isEnabled(channelUpdate_bob_disabled.channelFlags) == false)

// we immediately reconnect them
sender.send(alice, INPUT_RECONNECTED(alice2bob.ref, aliceInit, bobInit))
sender.send(bob, INPUT_RECONNECTED(bob2alice.ref, bobInit, aliceInit))

// peers exchange channel_reestablish messages
alice2bob.expectMsgType[ChannelReestablish]
bob2alice.expectMsgType[ChannelReestablish]
alice2bob.forward(bob)
bob2alice.forward(alice)

// both nodes reach NORMAL state, and broadcast a new channel_update
val channelUpdate_alice_enabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
val channelUpdate_bob_enabled = channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate
assert(Announcements.isEnabled(channelUpdate_alice_enabled.channelFlags) == true)
assert(Announcements.isEnabled(channelUpdate_bob_enabled.channelFlags) == true)
// alice and bob will not announce that their channel is OFFLINE
channelUpdateListener.expectNoMsg(300 millis)

// let's check that the two successive channel_update have a different timestamp
assert(channelUpdate_alice_enabled.timestamp > channelUpdate_alice_disabled.timestamp)
assert(channelUpdate_bob_enabled.timestamp > channelUpdate_bob_disabled.timestamp)
// we attempt to send a payment
sender.send(alice, CMD_ADD_HTLC(4200, randomBytes32, 123456, upstream = Left(UUID.randomUUID())))
val failure = sender.expectMsgType[Status.Failure]
val AddHtlcFailed(_, _, ChannelUnavailable(_), _, _, _) = failure.cause

// alice will broadcast a new disabled channel_update
val update = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(Announcements.isEnabled(update.channelUpdate.channelFlags) == false)
}

}

0 comments on commit fb84dfb

Please sign in to comment.
You can’t perform that action at this time.