Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Smarter strategy for sending `channel_update`s #950

Merged
merged 21 commits into from Apr 29, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

don't emit a `channel_update` at disconnection

Instead, wait for a payment to try to route through the channel and only
then reply with a disabled `channel_update` and broadcast it on the
network.

The reason in that in case of a disconnection, if noone cares about that
channel then there is no reason to tell everyone about its current
(disconnected) state.
  • Loading branch information...
pm47 committed Apr 20, 2019
commit 1bf43b2df1e72e3b8191d4ffe60b56ab675cf336
@@ -874,7 +874,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
goto(NORMAL) using store(d.copy(channelUpdate = channelUpdate)) replying "ok"

case Event(BroadcastChannelUpdate(reason), d: DATA_NORMAL) =>
log.info(s"sending channel_update announcement (reason=$reason)")
log.info(s"updating channel_update announcement (reason=$reason)")
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = Helpers.aboveReserve(d.commitments))
// we use GOTO instead of stay because we want to fire transitions
goto(NORMAL) using store(d.copy(channelUpdate = channelUpdate))
@@ -886,15 +886,20 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Event(WatchEventSpent(BITCOIN_FUNDING_SPENT, tx), d: DATA_NORMAL) => handleRemoteSpentOther(tx, d)

case Event(INPUT_DISCONNECTED, d: DATA_NORMAL) =>
// we disable the channel
log.debug(s"sending channel_update announcement (disable)")
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = false)
// we cancel the timer that would have made us send the enabled update after reconnection (flappy channel protection)
cancelTimer(Reconnected.toString)
d.commitments.localChanges.proposed.collect {
case add: UpdateAddHtlc => relayer ! Status.Failure(AddHtlcFailed(d.channelId, add.paymentHash, ChannelUnavailable(d.channelId), d.commitments.originChannels(add.id), Some(channelUpdate), None))
// if we have pending unsigned htlcs, then we cancel them and advertise the fact that the channel is now disabled
val d1 = if (d.commitments.localChanges.proposed.collectFirst { case add: UpdateAddHtlc => add }.isDefined) {
log.info(s"updating channel_update announcement (reason=disabled)")
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = false)
d.commitments.localChanges.proposed.collect {
case add: UpdateAddHtlc => relayer ! Status.Failure(AddHtlcFailed(d.channelId, add.paymentHash, ChannelUnavailable(d.channelId), d.commitments.originChannels(add.id), Some(channelUpdate), None))
}
d.copy(channelUpdate = channelUpdate)
} else {
d
}
goto(OFFLINE) using d.copy(channelUpdate = channelUpdate)
goto(OFFLINE) using d1

case Event(e: Error, d: DATA_NORMAL) => handleRemoteError(e, d)

@@ -1372,6 +1377,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
// -> in CLOSING we either have mutual closed (so no more htlcs), or already have unilaterally closed (so no action required), and we can't be in OFFLINE state anyway
handleLocalError(HtlcTimedout(d.channelId, d.commitments.timedoutOutgoingHtlcs(count)), d, Some(c))

case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d)

case Event(CMD_UPDATE_RELAY_FEE(feeBaseMsat, feeProportionalMillionths), d: DATA_NORMAL) =>
log.info(s"updating relay fees: prevFeeBaseMsat={} nextFeeBaseMsat={} prevFeeProportionalMillionths={} nextFeeProportionalMillionths={}", d.channelUpdate.feeBaseMsat, feeBaseMsat, d.channelUpdate.feeProportionalMillionths, feeProportionalMillionths)
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, feeBaseMsat, feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = false)
@@ -1478,6 +1485,8 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
goto(NORMAL) using d.copy(commitments = commitments1)
}

case Event(c: CMD_ADD_HTLC, d: DATA_NORMAL) => handleAddDisconnected(c, d)

case Event(channelReestablish: ChannelReestablish, d: DATA_SHUTDOWN) =>
val commitments1 = handleSync(channelReestablish, d)
// BOLT 2: A node if it has sent a previous shutdown MUST retransmit shutdown.
@@ -1555,10 +1564,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case Event(c: CMD_ADD_HTLC, d: HasCommitments) =>
log.info(s"rejecting htlc request in state=$stateName")
val error = ChannelUnavailable(d.channelId)
d match {
case normal: DATA_NORMAL => handleCommandError(AddHtlcFailed(d.channelId, c.paymentHash, error, origin(c), Some(normal.channelUpdate), Some(c)), c) // can happen if we are in OFFLINE or SYNCING state (channelUpdate will have enable=false)
case _ => handleCommandError(AddHtlcFailed(d.channelId, c.paymentHash, error, origin(c), None, Some(c)), c) // we don't provide a channel_update: this will be a permanent channel failure
}
handleCommandError(AddHtlcFailed(d.channelId, c.paymentHash, error, origin(c), None, Some(c)), c) // we don't provide a channel_update: this will be a permanent channel failure

case Event(c: CMD_CLOSE, d) => handleCommandError(CommandUnavailableInThisState(Helpers.getChannelId(d), "close", stateName), c)

@@ -1629,7 +1635,7 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
case (_, _, d1: DATA_NORMAL, d2: DATA_NORMAL) if d1.channelUpdate == d2.channelUpdate && d1.channelAnnouncement == d2.channelAnnouncement =>
// don't do anything if neither the channel_update nor the channel_announcement didn't change
()
case (WAIT_FOR_FUNDING_LOCKED | NORMAL | SYNCING, NORMAL | OFFLINE, _, normal: DATA_NORMAL) =>
case (WAIT_FOR_FUNDING_LOCKED | NORMAL | OFFLINE | SYNCING, NORMAL | OFFLINE, _, normal: DATA_NORMAL) =>
// when we do WAIT_FOR_FUNDING_LOCKED->NORMAL or NORMAL->NORMAL or SYNCING->NORMAL or NORMAL->OFFLINE, we send out the new channel_update (most of the time it will just be to enable/disable the channel)
context.system.eventStream.publish(LocalChannelUpdate(self, normal.commitments.channelId, normal.shortChannelId, normal.commitments.remoteParams.nodeId, normal.channelAnnouncement, normal.channelUpdate, normal.commitments))
case (_, _, _: DATA_NORMAL, _: DATA_NORMAL) =>
@@ -1670,13 +1676,16 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
stay using newData replying "ok"
}

def handleCommandError(cause: Throwable, cmd: Command) = {
def handleCommandError(cause: Throwable, cmd: Command, newData_opt: Option[Data] = None) = {
log.error(s"${cause.getMessage} while processing cmd=${cmd.getClass.getSimpleName} in state=$stateName")
cause match {
case _: ChannelException => ()
case _ => log.error(cause, s"msg=$cmd stateData=$stateData ")
}
stay replying Status.Failure(cause)
newData_opt match {
case Some(newData) => stay using newData replying Status.Failure(cause)
case None => stay replying Status.Failure(cause)
}
}

def handleFundingPublishFailed(d: DATA_WAIT_FOR_FUNDING_CONFIRMED) = {
@@ -1707,6 +1716,24 @@ class Channel(val nodeParams: NodeParams, val wallet: EclairWallet, remoteNodeId
stay
}

def handleAddDisconnected(c: CMD_ADD_HTLC, d: DATA_NORMAL) = {
log.info(s"rejecting htlc request in state=$stateName")
// in order to reduce gossip spam, we don't disable the channel right away when disconnected
// we will only emit a new channel_update with the disable flag set if someone tries to use that channel
if (Announcements.isEnabled(d.channelUpdate.channelFlags)) {
// if the channel isn't disabled we generate a new channel_update
log.info(s"updating channel_update announcement (reason=disabled)")
val channelUpdate = Announcements.makeChannelUpdate(nodeParams.chainHash, nodeParams.privateKey, remoteNodeId, d.shortChannelId, d.channelUpdate.cltvExpiryDelta, d.channelUpdate.htlcMinimumMsat, d.channelUpdate.feeBaseMsat, d.channelUpdate.feeProportionalMillionths, d.commitments.localCommit.spec.totalFunds, enable = false)
// then we update the state and replay the request
self forward c
// we use goto to fire transitions
goto(stateName) using d.copy(channelUpdate = channelUpdate)
} else {
// channel is already disabled, we reply to the request
handleCommandError(AddHtlcFailed(d.channelId, c.paymentHash, ChannelUnavailable(d.channelId), origin(c), Some(d.channelUpdate), Some(c)), c) // can happen if we are in OFFLINE or SYNCING state (channelUpdate will have enable=false)
}
}

def handleLocalError(cause: Throwable, d: Data, msg: Option[Any]) = {
cause match {
case _: ForcedLocalCommit => log.warning(s"force-closing channel at user request")
@@ -2098,7 +2098,43 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
assert(update1.channelUpdate.timestamp < update2.channelUpdate.timestamp)
}

test("recv INPUT_DISCONNECTED", Tag("channels_public")) { f =>
test("recv INPUT_DISCONNECTED") { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
val update1a = alice2bob.expectMsgType[ChannelUpdate]
assert(Announcements.isEnabled(update1a.channelFlags) == true)

// actual test starts here
sender.send(alice, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
alice2bob.expectNoMsg(1 second)
channelUpdateListener.expectNoMsg(1 second)
}

test("recv INPUT_DISCONNECTED (with pending unsigned htlcs)") { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
val update1a = alice2bob.expectMsgType[ChannelUpdate]
assert(Announcements.isEnabled(update1a.channelFlags) == true)
val (_, htlc1) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val (_, htlc2) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val aliceData = alice.stateData.asInstanceOf[DATA_NORMAL]
assert(aliceData.commitments.localChanges.proposed.size == 2)

// actual test starts here
Thread.sleep(1100)
sender.send(alice, INPUT_DISCONNECTED)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc1.paymentHash)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc2.paymentHash)
val update2a = alice2bob.expectMsgType[ChannelUpdate]
assert(channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate === update2a)
assert(Announcements.isEnabled(update2a.channelFlags) == false)
awaitCond(alice.stateName == OFFLINE)
}

test("recv INPUT_DISCONNECTED (public channel)", Tag("channels_public")) { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
@@ -2109,26 +2145,36 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
assert(Announcements.isEnabled(update1.channelUpdate.channelFlags) == true)

// actual test starts here
Thread.sleep(1100)
sender.send(alice, INPUT_DISCONNECTED)
val update2 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(update1.channelUpdate.timestamp < update2.channelUpdate.timestamp)
assert(Announcements.isEnabled(update2.channelUpdate.channelFlags) == false)
awaitCond(alice.stateName == OFFLINE)
channelUpdateListener.expectNoMsg(1 second)
}

test("recv INPUT_DISCONNECTED (with pending unsigned htlcs)") { f =>
test("recv INPUT_DISCONNECTED (public channel, with pending unsigned htlcs)", Tag("channels_public")) { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
sender.send(bob, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
bob2alice.expectMsgType[AnnouncementSignatures]
bob2alice.forward(alice)
alice2bob.expectMsgType[AnnouncementSignatures]
alice2bob.forward(bob)
val update1a = channelUpdateListener.expectMsgType[LocalChannelUpdate]
val update1b = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(Announcements.isEnabled(update1a.channelUpdate.channelFlags) == true)
val (_, htlc1) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val (_, htlc2) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val aliceData = alice.stateData.asInstanceOf[DATA_NORMAL]
assert(aliceData.commitments.localChanges.proposed.size == 2)

// actual test starts here
Thread.sleep(1100)
sender.send(alice, INPUT_DISCONNECTED)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc1.paymentHash)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc2.paymentHash)
val update2a = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(update1a.channelUpdate.timestamp < update2a.channelUpdate.timestamp)
assert(Announcements.isEnabled(update2a.channelUpdate.channelFlags) == false)
awaitCond(alice.stateName == OFFLINE)
}

@@ -335,9 +335,8 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
awaitCond(alice.stateName == OFFLINE)
awaitCond(bob.stateName == OFFLINE)

// alice and bob announce that their channel is OFFLINE
assert(Announcements.isEnabled(channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate.channelFlags) == false)
assert(Announcements.isEnabled(channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate.channelFlags) == false)
// alice and bob will not announce that their channel is OFFLINE
channelUpdateListener.expectNoMsg(300 millis)

// we make alice update here relay fee
sender.send(alice, CMD_UPDATE_RELAY_FEE(4200, 123456))
@@ -356,8 +355,8 @@ class OfflineStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
// note that we don't forward the channel_reestablish so that only alice reaches NORMAL state, it facilitates the test below
bob2alice.forward(alice)

// then alice reaches NORMAL state, and during the transition she broadcasts the channel_update
val channelUpdate = channelUpdateListener.expectMsgType[LocalChannelUpdate](10 seconds).channelUpdate
// then alice reaches NORMAL state, and after a delay she broadcasts the channel_update
val channelUpdate = channelUpdateListener.expectMsgType[LocalChannelUpdate](20 seconds).channelUpdate
assert(channelUpdate.feeBaseMsat === 4200)
assert(channelUpdate.feeProportionalMillionths === 123456)
assert(Announcements.isEnabled(channelUpdate.channelFlags) == true)
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.