Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Smarter strategy for sending `channel_update`s #950

Merged
merged 21 commits into from Apr 29, 2019
Merged
Changes from 11 commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Large diffs are not rendered by default.

@@ -21,6 +21,7 @@ import fr.acinq.bitcoin.Crypto.{Point, PrivateKey, PublicKey, Scalar, ripemd160,
import fr.acinq.bitcoin.Script._
import fr.acinq.bitcoin.{OutPoint, _}
import fr.acinq.eclair.blockchain.EclairWallet
import fr.acinq.eclair.channel.Channel.REFRESH_CHANNEL_UPDATE_INTERVAL
import fr.acinq.eclair.crypto.{Generators, KeyManager}
import fr.acinq.eclair.db.ChannelsDb
import fr.acinq.eclair.payment.{Local, Origin}
@@ -31,8 +32,9 @@ import fr.acinq.eclair.wire._
import fr.acinq.eclair.{Globals, NodeParams, ShortChannelId, addressToPublicKeyScript}
import scodec.bits.ByteVector

import scala.compat.Platform
import scala.concurrent.Await
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

/**
@@ -152,6 +154,19 @@ object Helpers {
if (reserveToFundingRatio > nodeParams.maxReserveToFundingRatio) throw ChannelReserveTooHigh(open.temporaryChannelId, accept.channelReserveSatoshis, reserveToFundingRatio, nodeParams.maxReserveToFundingRatio)
}

/**
* Compute the delay until we need to refresh the channel_update for our channel not to be considered stale by
* other nodes.
*
* If current update more than [[Channel.REFRESH_CHANNEL_UPDATE_INTERVAL]] old then the delay will be zero.
*
* @param currentUpdateTimestamp
* @return the delay until the next update
*/
def nextChannelUpdateRefresh(currentUpdateTimestamp: Long): FiniteDuration = {
0.days.max(REFRESH_CHANNEL_UPDATE_INTERVAL - (Platform.currentTime.milliseconds - currentUpdateTimestamp.seconds))
}

/**
*
* @param remoteFeeratePerKw remote fee rate per kiloweight
@@ -17,7 +17,7 @@
package fr.acinq.eclair.payment

import akka.actor.{Actor, ActorLogging, Props, Status}
import fr.acinq.bitcoin.{ByteVector32, Crypto, MilliSatoshi}
import fr.acinq.bitcoin.{Crypto, MilliSatoshi}
import fr.acinq.eclair.channel.{CMD_FAIL_HTLC, CMD_FULFILL_HTLC, Channel}
import fr.acinq.eclair.db.IncomingPayment
import fr.acinq.eclair.payment.PaymentLifecycle.ReceivePayment
@@ -26,6 +26,7 @@ import fr.acinq.eclair.{Globals, NodeParams, randomBytes32}

import scala.compat.Platform
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

/**
@@ -75,7 +76,7 @@ class LocalPaymentHandler(nodeParams: NodeParams) extends Actor with ActorLoggin
case _ =>
log.info(s"received payment for paymentHash=${htlc.paymentHash} amountMsat=${htlc.amountMsat}")
// amount is correct or was not specified in the payment request
nodeParams.db.payments.addIncomingPayment(IncomingPayment(htlc.paymentHash, htlc.amountMsat, Platform.currentTime / 1000))
nodeParams.db.payments.addIncomingPayment(IncomingPayment(htlc.paymentHash, htlc.amountMsat, Platform.currentTime.milliseconds.toSeconds))
sender ! CMD_FULFILL_HTLC(htlc.id, paymentPreimage, commit = true)
context.system.eventStream.publish(PaymentReceived(MilliSatoshi(htlc.amountMsat), htlc.paymentHash, htlc.channelId))
}
@@ -24,6 +24,7 @@ import scodec.bits.{BitVector, ByteVector}
import shapeless.HNil

import scala.compat.Platform
import scala.concurrent.duration._


/**
@@ -73,7 +74,7 @@ object Announcements {
)
}

def makeNodeAnnouncement(nodeSecret: PrivateKey, alias: String, color: Color, nodeAddresses: List[NodeAddress], timestamp: Long = Platform.currentTime / 1000): NodeAnnouncement = {
def makeNodeAnnouncement(nodeSecret: PrivateKey, alias: String, color: Color, nodeAddresses: List[NodeAddress], timestamp: Long = Platform.currentTime.milliseconds.toSeconds): NodeAnnouncement = {
require(alias.size <= 32)
val witness = nodeAnnouncementWitnessEncode(timestamp, nodeSecret.publicKey, color, alias, ByteVector.empty, nodeAddresses)
val sig = Crypto.encodeSignature(Crypto.sign(witness, nodeSecret)) :+ 1.toByte
@@ -119,7 +120,7 @@ object Announcements {

def makeChannelFlags(isNode1: Boolean, enable: Boolean): Byte = BitVector.bits(!enable :: !isNode1 :: Nil).padLeft(8).toByte()

def makeChannelUpdate(chainHash: ByteVector32, nodeSecret: PrivateKey, remoteNodeId: PublicKey, shortChannelId: ShortChannelId, cltvExpiryDelta: Int, htlcMinimumMsat: Long, feeBaseMsat: Long, feeProportionalMillionths: Long, htlcMaximumMsat: Long, enable: Boolean = true, timestamp: Long = Platform.currentTime / 1000): ChannelUpdate = {
def makeChannelUpdate(chainHash: ByteVector32, nodeSecret: PrivateKey, remoteNodeId: PublicKey, shortChannelId: ShortChannelId, cltvExpiryDelta: Int, htlcMinimumMsat: Long, feeBaseMsat: Long, feeProportionalMillionths: Long, htlcMaximumMsat: Long, enable: Boolean = true, timestamp: Long = Platform.currentTime.milliseconds.toSeconds): ChannelUpdate = {
val messageFlags = makeMessageFlags(hasOptionChannelHtlcMax = true) // NB: we always support option_channel_htlc_max
val channelFlags = makeChannelFlags(isNode1 = isNode1(nodeSecret.publicKey, remoteNodeId), enable = enable)
val htlcMaximumMsatOpt = Some(htlcMaximumMsat)
@@ -440,7 +440,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Prom
// the first_timestamp field to the current date/time and timestamp_range to the maximum value
// NB: we can't just set firstTimestamp to 0, because in that case peer would send us all past messages matching
// that (i.e. the whole routing table)
val filter = GossipTimestampFilter(nodeParams.chainHash, firstTimestamp = Platform.currentTime / 1000, timestampRange = Int.MaxValue)
val filter = GossipTimestampFilter(nodeParams.chainHash, firstTimestamp = Platform.currentTime.milliseconds.toSeconds, timestampRange = Int.MaxValue)
remote ! filter

// clean our sync state for this peer: we receive a SendChannelQuery just when we connect/reconnect to a peer and
@@ -454,7 +454,7 @@ class Router(nodeParams: NodeParams, watcher: ActorRef, initialized: Option[Prom
stay

case Event(u: ChannelUpdate, d: Data) =>
// it was sent by us, routing messages that are sent by our peers are now wrapped in a PeerRoutingMessage
// it was sent by us (e.g. the payment lifecycle); routing messages that are sent by our peers are now wrapped in a PeerRoutingMessage
log.debug("received channel update from {}", sender)
stay using handle(u, sender, d)

@@ -738,7 +738,7 @@ object Router {
def toFakeUpdate(extraHop: ExtraHop): ChannelUpdate =
// the `direction` bit in flags will not be accurate but it doesn't matter because it is not used
// what matters is that the `disable` bit is 0 so that this update doesn't get filtered out
ChannelUpdate(signature = ByteVector.empty, chainHash = ByteVector32.Zeroes, extraHop.shortChannelId, Platform.currentTime / 1000, messageFlags = 0, channelFlags = 0, extraHop.cltvExpiryDelta, htlcMinimumMsat = 0L, extraHop.feeBaseMsat, extraHop.feeProportionalMillionths, None)
ChannelUpdate(signature = ByteVector.empty, chainHash = ByteVector32.Zeroes, extraHop.shortChannelId, Platform.currentTime.milliseconds.toSeconds, messageFlags = 0, channelFlags = 0, extraHop.cltvExpiryDelta, htlcMinimumMsat = 0L, extraHop.feeBaseMsat, extraHop.feeProportionalMillionths, None)

def toFakeUpdates(extraRoute: Seq[ExtraHop], targetNodeId: PublicKey): Map[ChannelDesc, ChannelUpdate] = {
// BOLT 11: "For each entry, the pubkey is the node ID of the start of the channel", and the last node is the destination
@@ -758,9 +758,9 @@ object Router {
def hasChannels(nodeId: PublicKey, channels: Iterable[ChannelAnnouncement]): Boolean = channels.exists(c => isRelatedTo(c, nodeId))

def isStale(u: ChannelUpdate): Boolean = {
// BOLT 7: "nodes MAY prune channels should the timestamp of the latest channel_update be older than 2 weeks (1209600 seconds)"
// BOLT 7: "nodes MAY prune channels should the timestamp of the latest channel_update be older than 2 weeks"
// but we don't want to prune brand new channels for which we didn't yet receive a channel update
val staleThresholdSeconds = Platform.currentTime / 1000 - 1209600
val staleThresholdSeconds = (Platform.currentTime.milliseconds - 14.days).toSeconds
u.timestamp < staleThresholdSeconds
}

@@ -843,7 +843,6 @@ object Router {
* @param extraEdges a set of extra edges we want to CONSIDER during the search
* @param ignoredEdges a set of extra edges we want to IGNORE during the search
* @param routeParams a set of parameters that can restrict the route search
* @param wr an object containing the ratios used to 'weight' edges when searching for the shortest path
* @return the computed route to the destination @targetNodeId
*/
def findRoute(g: DirectedGraph,
@@ -32,6 +32,9 @@ import scodec.bits.BitVector
import scodec.codecs._
import scodec.{Attempt, Codec}

import scala.concurrent.duration._


/**
* Created by PM on 02/06/2017.
*/
@@ -237,7 +240,7 @@ object ChannelCodecs extends Logging {
val DATA_WAIT_FOR_FUNDING_CONFIRMED_COMPAT_01_Codec: Codec[DATA_WAIT_FOR_FUNDING_CONFIRMED] = (
("commitments" | commitmentsCodec) ::
("fundingTx" | provide[Option[Transaction]](None)) ::
("waitingSince" | provide(compat.Platform.currentTime / 1000)) ::
("waitingSince" | provide(compat.Platform.currentTime.milliseconds.toSeconds)) ::
("deferred" | optional(bool, fundingLockedCodec)) ::
("lastSent" | either(bool, fundingCreatedCodec, fundingSignedCodec))).as[DATA_WAIT_FOR_FUNDING_CONFIRMED].decodeOnly

@@ -19,6 +19,7 @@ package fr.acinq.eclair.channel
import fr.acinq.eclair.channel.Helpers.Funding
import org.scalatest.FunSuite

import scala.compat.Platform
import scala.concurrent.duration._

class HelpersSpec extends FunSuite {
@@ -46,5 +47,14 @@ class HelpersSpec extends FunSuite {
minDelay = 10 minutes) === (10 minutes))
}

test("compute refresh delay") {
import org.scalatest.Matchers._
Helpers.nextChannelUpdateRefresh(1544400000).toSeconds should equal (0)
Helpers.nextChannelUpdateRefresh((Platform.currentTime.milliseconds - 9.days).toSeconds).toSeconds should equal (24 * 3600L +- 100)
Helpers.nextChannelUpdateRefresh((Platform.currentTime.milliseconds - 3.days).toSeconds).toSeconds should equal (7 * 24 * 3600L +- 100)
Helpers.nextChannelUpdateRefresh(Platform.currentTime.milliseconds.toSeconds).toSeconds should equal (10 * 24 * 3600L +- 100)

}

}

@@ -27,7 +27,7 @@ import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.UInt64.Conversions._
import fr.acinq.eclair.blockchain._
import fr.acinq.eclair.blockchain.fee.FeeratesPerKw
import fr.acinq.eclair.channel.Channel.{RevocationTimeout, TickRefreshChannelUpdate}
import fr.acinq.eclair.channel.Channel.{BroadcastChannelUpdate, PeriodicRefresh, Reconnected, RevocationTimeout}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.states.StateTestsHelperMethods
import fr.acinq.eclair.io.Peer
@@ -2085,7 +2085,7 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {
alice2bob.expectMsg(annSigsA)
}

test("recv TickRefreshChannelUpdate", Tag("channels_public")) { f =>
test("recv BroadcastChannelUpdate", Tag("channels_public")) { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
@@ -2096,42 +2096,107 @@ class NormalStateSpec extends TestkitBaseClass with StateTestsHelperMethods {

// actual test starts here
Thread.sleep(1100)
sender.send(alice, TickRefreshChannelUpdate)
sender.send(alice, BroadcastChannelUpdate(PeriodicRefresh))
val update2 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(update1.channelUpdate.timestamp < update2.channelUpdate.timestamp)
}

test("recv INPUT_DISCONNECTED", Tag("channels_public")) { f =>
test("recv BroadcastChannelUpdate (two in a row)", Tag("channels_public")) { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
sender.send(bob, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
bob2alice.expectMsgType[AnnouncementSignatures]
bob2alice.forward(alice)
val update1 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(Announcements.isEnabled(update1.channelUpdate.channelFlags) == true)

// actual test starts here
Thread.sleep(1100)
sender.send(alice, INPUT_DISCONNECTED)
sender.send(alice, BroadcastChannelUpdate(PeriodicRefresh))
val update2 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(update1.channelUpdate.timestamp < update2.channelUpdate.timestamp)
assert(Announcements.isEnabled(update2.channelUpdate.channelFlags) == false)
Thread.sleep(1100)
sender.send(alice, BroadcastChannelUpdate(Reconnected))
channelUpdateListener.expectNoMsg(1 second)
}

test("recv INPUT_DISCONNECTED") { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
val update1a = alice2bob.expectMsgType[ChannelUpdate]
assert(Announcements.isEnabled(update1a.channelFlags) == true)

// actual test starts here
sender.send(alice, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
alice2bob.expectNoMsg(1 second)
channelUpdateListener.expectNoMsg(1 second)
}

test("recv INPUT_DISCONNECTED (with pending unsigned htlcs)") { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
val update1a = alice2bob.expectMsgType[ChannelUpdate]
assert(Announcements.isEnabled(update1a.channelFlags) == true)
val (_, htlc1) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val (_, htlc2) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val aliceData = alice.stateData.asInstanceOf[DATA_NORMAL]
assert(aliceData.commitments.localChanges.proposed.size == 2)

// actual test starts here
Thread.sleep(1100)
sender.send(alice, INPUT_DISCONNECTED)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc1.paymentHash)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc2.paymentHash)
val update2a = alice2bob.expectMsgType[ChannelUpdate]
assert(channelUpdateListener.expectMsgType[LocalChannelUpdate].channelUpdate === update2a)
assert(Announcements.isEnabled(update2a.channelFlags) == false)
awaitCond(alice.stateName == OFFLINE)
}

test("recv INPUT_DISCONNECTED (public channel)", Tag("channels_public")) { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
sender.send(bob, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
bob2alice.expectMsgType[AnnouncementSignatures]
bob2alice.forward(alice)
val update1 = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(Announcements.isEnabled(update1.channelUpdate.channelFlags) == true)

// actual test starts here
sender.send(alice, INPUT_DISCONNECTED)
awaitCond(alice.stateName == OFFLINE)
channelUpdateListener.expectNoMsg(1 second)
}

test("recv INPUT_DISCONNECTED (public channel, with pending unsigned htlcs)", Tag("channels_public")) { f =>
import f._
val sender = TestProbe()
sender.send(alice, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
sender.send(bob, WatchEventConfirmed(BITCOIN_FUNDING_DEEPLYBURIED, 400000, 42))
bob2alice.expectMsgType[AnnouncementSignatures]
bob2alice.forward(alice)
alice2bob.expectMsgType[AnnouncementSignatures]
alice2bob.forward(bob)
val update1a = channelUpdateListener.expectMsgType[LocalChannelUpdate]
val update1b = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(Announcements.isEnabled(update1a.channelUpdate.channelFlags) == true)
val (_, htlc1) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val (_, htlc2) = addHtlc(10000, alice, bob, alice2bob, bob2alice)
val aliceData = alice.stateData.asInstanceOf[DATA_NORMAL]
assert(aliceData.commitments.localChanges.proposed.size == 2)

// actual test starts here
Thread.sleep(1100)
sender.send(alice, INPUT_DISCONNECTED)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc1.paymentHash)
assert(relayerA.expectMsgType[Status.Failure].cause.asInstanceOf[AddHtlcFailed].paymentHash === htlc2.paymentHash)
val update2a = channelUpdateListener.expectMsgType[LocalChannelUpdate]
assert(update1a.channelUpdate.timestamp < update2a.channelUpdate.timestamp)
assert(Announcements.isEnabled(update2a.channelUpdate.channelFlags) == false)
awaitCond(alice.stateName == OFFLINE)
}

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.