Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Smarter strategy for sending `channel_update`s #950

Merged
merged 21 commits into from Apr 29, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

do not filter disabled updates in the router

We now do it directly at the channel level.
  • Loading branch information...
pm47 committed Apr 24, 2019
commit 965365bf78e11d8941d50263df28c26680ecfc57
@@ -19,7 +19,6 @@ package fr.acinq.eclair.router
import akka.Done
import akka.actor.{ActorRef, Props, Status}
import akka.event.Logging.MDC
import akka.event.LoggingAdapter
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.Script.{pay2wsh, write}
@@ -113,9 +112,6 @@ class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Prom

import ExecutionContext.Implicits.global

// we pass these to helpers classes so that they have the logging context
implicit def implicitLog = log

context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])

@@ -648,16 +644,14 @@ class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Prom
case true => d.graph.removeEdge(desc).addEdge(desc, u)
case false => d.graph.removeEdge(desc) // if the channel is now disabled, we remove it from the graph
}
val origin_opt = if (origin == self) None else Some(origin)
d.copy(updates = d.updates + (desc -> u), rebroadcast = maybeRebroadcast(d.rebroadcast, u, origin_opt), graph = graph1)
d.copy(updates = d.updates + (desc -> u), rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> Set(origin))), graph = graph1)
} else {
log.debug("added channel_update for shortChannelId={} public={} flags={} {}", u.shortChannelId, publicChannel, u.channelFlags, u)
context.system.eventStream.publish(ChannelUpdatesReceived(u :: Nil))
db.addChannelUpdate(u)
// we also need to update the graph
val graph1 = d.graph.addEdge(desc, u)
val origin_opt = if (origin == self) None else Some(origin)
d.copy(updates = d.updates + (desc -> u), privateUpdates = d.privateUpdates - desc, rebroadcast = maybeRebroadcast(d.rebroadcast, u, origin_opt), graph = graph1)
d.copy(updates = d.updates + (desc -> u), privateUpdates = d.privateUpdates - desc, rebroadcast = d.rebroadcast.copy(updates = d.rebroadcast.updates + (u -> Set(origin))), graph = graph1)
}
} else if (d.awaiting.keys.exists(c => c.shortChannelId == u.shortChannelId)) {
// channel is currently being validated
@@ -763,28 +757,6 @@ object Router {

def hasChannels(nodeId: PublicKey, channels: Iterable[ChannelAnnouncement]): Boolean = channels.exists(c => isRelatedTo(c, nodeId))

/**
* Simple heuristic which will prevent the router from broadcasting disabled updates for local channels. The goal is to prevent
* sending a lot of updates for flappy channels.
*/
def maybeRebroadcast(r: Rebroadcast, u: ChannelUpdate, origin_opt: Option[ActorRef])(implicit log: LoggingAdapter): Rebroadcast = {
require(!r.updates.contains(u))
origin_opt match {
case Some(origin) =>
// we always rebroadcast external updates
log.info("rebroadcasting external update for shortChannelId={} u={}", u.shortChannelId, u)
r.copy(updates = r.updates + (u -> Set(origin)))
case None if !Announcements.isEnabled(u.channelFlags) =>
// we don't rebroadcast disabled updates for local channels
log.info("*not* rebroadcasting disabled update for shortChannelId={} u={}", u.shortChannelId, u)
r
case _ =>
// otherwise we rebroadcast the update
log.info("broadcasting local update for shortChannelId={} u={}", u.shortChannelId, u)
r.copy(updates = r.updates + (u -> Set.empty))
}
}

def isStale(u: ChannelUpdate): Boolean = {
// BOLT 7: "nodes MAY prune channels should the timestamp of the latest channel_update be older than 2 weeks (1209600 seconds)"
// but we don't want to prune brand new channels for which we didn't yet receive a channel update
@@ -16,6 +16,7 @@

package fr.acinq.eclair.channel.states.e

import akka.actor.Status
import akka.testkit.TestProbe
import fr.acinq.bitcoin.Crypto.Scalar
import fr.acinq.bitcoin.{ByteVector32, ScriptFlags, Transaction}
@@ -365,4 +366,27 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
channelUpdateListener.expectNoMsg(300 millis)
}

test("broadcast disabled channel_update while offline") { f =>
import f._
val sender = TestProbe()

// we simulate a disconnection
sender.send(alice, INPUT_DISCONNECTED)
sender.send(bob, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// alice and bob will not announce that their channel is OFFLINE
channelUpdateListener.expectNoMsg(300 millis)

// we attempt to send a payment
sender.send(alice, CMD_ADD_HTLC(4200, randomBytes32, 123456))
val failure = sender.expectMsgType[Status.Failure]
val AddHtlcFailed(_, _, ChannelUnavailable(_), _, _, _) = failure.cause

// alice doesn't broadcast the new channel_update yet
This conversation was marked as resolved by pm47

This comment has been minimized.

Copy link
@sstone

sstone Apr 25, 2019

Member

I don't understand this comment ?

val update = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(Announcements.isEnabled(update.channelUpdate.channelFlags) == false)
}

}
@@ -247,20 +247,10 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
sender.send(bitcoincli, BitcoinReq("generate", 4))
sender.expectMsgType[JValue]
// A requires private channels, as a consequence:
// - only A and B know about channel A-B
// - A is not announced
// The case for updates is a bit complicated because not all of them are broadcast
// - for A and B:
// - one update per public channel (the side that is above reserve and is enabled)
// - two updates per local private channel (both sides because peers directly send it to each other)
// - for other nodes:
// - one update per public channel (the side that is above reserve and is enabled)
// - one update per incoming public channel (the local side that is below reserve, and won't be broadcast)
awaitAnnouncements(nodes.filterKeys(key => List("A", "B").contains(key)), 10, 12, 14)
awaitAnnouncements(nodes.filterKeys(key => List("C").contains(key)), 10, 12, 15)
awaitAnnouncements(nodes.filterKeys(key => List("D").contains(key)), 10, 12, 14)
awaitAnnouncements(nodes.filterKeys(key => List("E", "G").contains(key)), 10, 12, 13)
awaitAnnouncements(nodes.filterKeys(key => key.startsWith("F")), 10, 12, 13)
// - only A and B know about channel A-B (and there is no channel_announcement)
// - A is not announced (no node_announcement)
awaitAnnouncements(nodes.filterKeys(key => List("A", "B").contains(key)), 10, 12, 26)
awaitAnnouncements(nodes.filterKeys(key => !List("A", "B").contains(key)), 10, 12, 24)
}

test("send an HTLC A->D") {
@@ -333,8 +323,6 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
sender.send(nodes("A").paymentInitiator, sendReq)
// A will first receive an error from C, then retry and route around C: A->B->E->C->D
sender.expectMsgType[PaymentSucceeded](5 seconds)
// some channels are now above reserve and will broadcast an update
awaitAnnouncements(nodes.filterKeys(_ == "A"), 10, 12, 16)
}

test("send an HTLC A->D with an unknown payment hash") {
@@ -410,8 +398,6 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
sender.send(nodes("A").paymentInitiator, sendReq)
sender.expectMsgType[PaymentSucceeded]
}
// some channels are now above reserve and will broadcast an update
awaitAnnouncements(nodes.filterKeys(_ == "A"), 10, 12, 19)
}

test("send an HTLC A->B->G->C using heuristics to select the route") {
@@ -526,7 +512,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
sender.expectMsgType[JValue](10 seconds)
// and we wait for C'channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
awaitAnnouncements(nodes.filterKeys(_ == "A"), 9, 11, 18)
awaitAnnouncements(nodes.filterKeys(_ == "A"), 9, 11, 24)
}

test("propagate a fulfill upstream when a downstream htlc is redeemed on-chain (remote commit)") {
@@ -601,7 +587,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
sender.expectMsgType[JValue](10 seconds)
// and we wait for C'channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
awaitAnnouncements(nodes.filterKeys(_ == "A"), 8, 10, 17)
awaitAnnouncements(nodes.filterKeys(_ == "A"), 8, 10, 22)
}

test("propagate a failure upstream when a downstream htlc times out (local commit)") {
@@ -661,7 +647,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
sender.expectMsgType[JValue](10 seconds)
// and we wait for C'channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
awaitAnnouncements(nodes.filterKeys(_ == "A"), 7, 9, 16)
awaitAnnouncements(nodes.filterKeys(_ == "A"), 7, 9, 20)
}

test("propagate a failure upstream when a downstream htlc times out (remote commit)") {
@@ -724,7 +710,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
sender.expectMsgType[JValue](10 seconds)
// and we wait for C'channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
awaitAnnouncements(nodes.filterKeys(_ == "A"), 6, 8, 15)
awaitAnnouncements(nodes.filterKeys(_ == "A"), 6, 8, 18)
}

test("punish a node that has published a revoked commit tx") {
@@ -849,7 +835,7 @@ class IntegrationSpec extends TestKit(ActorSystem("test")) with BitcoindService
// and we wait for C'channel to close
awaitCond(stateListener.expectMsgType[ChannelStateChanged].currentState == CLOSED, max = 30 seconds)
// this will remove the channel
awaitAnnouncements(nodes.filterKeys(_ == "A"), 5, 7, 14)
awaitAnnouncements(nodes.filterKeys(_ == "A"), 5, 7, 16)
}

test("generate and validate lots of channels") {
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.