Skip to content

Commit

Permalink
[ChannelRelay] Prioritize lowest capacity channels (#1539)
Browse files Browse the repository at this point in the history
* [ChannelRelayer] Expose Wrapped messages to tests

Exposing the private wrapped messages to tests allows removing the
dependency on capturing logs which felt very brittle.

* Prioritize low capacity channels during relay

This makes it more difficult for attackers to "squat" high-capacity channels
by sending HTLCs and then hodling them.

It results in less locked liquidity during this kind of attacks.
  • Loading branch information
t-bast committed Oct 2, 2020
1 parent 1274168 commit 382868d
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 158 deletions.
Expand Up @@ -131,7 +131,7 @@ class ChannelRelay private(nodeParams: NodeParams,
Metrics.recordPaymentRelayFailed(Tags.FailureType(cmdFail), Tags.RelayType.Channel)
safeSendAndStop(o.add.channelId, cmdFail)

case WrappedAddResponse(addFailed@RES_ADD_FAILED(CMD_ADD_HTLC(_, _, _, _, _, Origin.ChannelRelayedHot(_, add, _), _), _, _)) =>
case WrappedAddResponse(addFailed@RES_ADD_FAILED(CMD_ADD_HTLC(_, _, _, _, _, _: Origin.ChannelRelayedHot, _), _, _)) =>
context.log.info("attempt failed with reason={}", addFailed.t.getClass.getSimpleName)
context.self ! DoRelay
relay(previousFailures :+ PreviouslyTried(selectedShortChannelId, addFailed))
Expand Down Expand Up @@ -191,32 +191,37 @@ class ChannelRelay private(nodeParams: NodeParams,
private val nextNodeId_opt = channels.headOption.map(_._2.nextNodeId)

/**
* Select a channel to the same node to relay the payment to, that has the lowest balance and is compatible in
* terms of fees, expiry_delta, etc.
* Select a channel to the same node to relay the payment to, that has the lowest capacity and balance and is
* compatible in terms of fees, expiry_delta, etc.
*
* If no suitable channel is found we default to the originally requested channel.
*/
def selectPreferredChannel(alreadyTried: Seq[ShortChannelId]): Option[ShortChannelId] = {
val requestedShortChannelId = r.payload.outgoingChannelId
context.log.debug("selecting next channel")
nextNodeId_opt match {
case Some(nextNodeId) =>
case Some(_) =>
// we then filter out channels that we have already tried
val candidateChannels: Map[ShortChannelId, OutgoingChannel] = channels -- alreadyTried
// and we filter again to keep the ones that are compatible with this payment (mainly fees, expiry delta)
candidateChannels
.map { case (shortChannelId, channelInfo) =>
val relayResult = relayOrFail(Some(channelInfo.channelUpdate))
context.log.debug(s"candidate channel: shortChannelId={} balanceMsat={} channelUpdate={} relayResult={}", shortChannelId, channelInfo.commitments.availableBalanceForSend, channelInfo.channelUpdate, relayResult)
context.log.debug(s"candidate channel: shortChannelId={} balanceMsat={} capacitySat={} channelUpdate={} relayResult={}", shortChannelId, channelInfo.commitments.availableBalanceForSend, channelInfo.commitments.commitInput.txOut.amount, channelInfo.channelUpdate, relayResult)
(shortChannelId, channelInfo, relayResult)
}
.collect { case (shortChannelId, channelInfo, _: RelaySuccess) => (shortChannelId, channelInfo.commitments.availableBalanceForSend) }
.filter(_._2 > r.payload.amountToForward) // we only keep channels that have enough balance to handle this payment
.collect {
// we only keep channels that have enough balance to handle this payment
case (shortChannelId, channelInfo, _: RelaySuccess) if channelInfo.commitments.availableBalanceForSend > r.payload.amountToForward => (shortChannelId, channelInfo.commitments)
}
.toList // needed for ordering
.sortBy(_._2) // we want to use the channel with the lowest available balance that can process the payment
// we want to use the channel with:
// - the lowest available capacity to ensure we keep high-capacity channels for big payments
// - the lowest available balance to increase our incoming liquidity
.sortBy { case (_, commitments) => (commitments.commitInput.txOut.amount, commitments.availableBalanceForSend) }
.headOption match {
case Some((preferredShortChannelId, availableBalanceMsat)) if preferredShortChannelId != requestedShortChannelId =>
context.log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, preferredShortChannelId, availableBalanceMsat)
case Some((preferredShortChannelId, commitments)) if preferredShortChannelId != requestedShortChannelId =>
context.log.info("replacing requestedShortChannelId={} by preferredShortChannelId={} with availableBalanceMsat={}", requestedShortChannelId, preferredShortChannelId, commitments.availableBalanceForSend)
Some(preferredShortChannelId)
case Some(_) =>
// the requested short_channel_id is already our preferred channel
Expand Down
Expand Up @@ -44,15 +44,14 @@ object ChannelRelayer {
sealed trait Command
case class GetOutgoingChannels(replyTo: ActorRef, getOutgoingChannels: Relayer.GetOutgoingChannels) extends Command
case class Relay(channelRelayPacket: IncomingPacket.ChannelRelayPacket) extends Command
private case class WrappedLocalChannelUpdate(localChannelUpdate: LocalChannelUpdate) extends Command
private case class WrappedLocalChannelDown(localChannelDown: LocalChannelDown) extends Command
private case class WrappedAvailableBalanceChanged(availableBalanceChanged: AvailableBalanceChanged) extends Command
private case class WrappedShortChannelIdAssigned(shortChannelIdAssigned: ShortChannelIdAssigned) extends Command
private[payment] case class WrappedLocalChannelUpdate(localChannelUpdate: LocalChannelUpdate) extends Command
private[payment] case class WrappedLocalChannelDown(localChannelDown: LocalChannelDown) extends Command
private[payment] case class WrappedAvailableBalanceChanged(availableBalanceChanged: AvailableBalanceChanged) extends Command
private[payment] case class WrappedShortChannelIdAssigned(shortChannelIdAssigned: ShortChannelIdAssigned) extends Command
// @formatter:on

def mdc: Command => Map[String, String] = {
case c: Relay => Logs.mdc(
paymentHash_opt = Some(c.channelRelayPacket.add.paymentHash))
case c: Relay => Logs.mdc(paymentHash_opt = Some(c.channelRelayPacket.add.paymentHash))
case _ => Map.empty
}

Expand All @@ -62,73 +61,72 @@ object ChannelRelayer {
def apply(nodeParams: NodeParams,
register: ActorRef,
channelUpdates: ChannelUpdates = Map.empty,
node2channels: NodeChannels = mutable.MultiDict.empty[PublicKey, ShortChannelId]
): Behavior[Command] =
node2channels: NodeChannels = mutable.MultiDict.empty[PublicKey, ShortChannelId]): Behavior[Command] =
Behaviors.setup { context =>
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[LocalChannelUpdate](WrappedLocalChannelUpdate))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[LocalChannelDown](WrappedLocalChannelDown))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[AvailableBalanceChanged](WrappedAvailableBalanceChanged))
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ShortChannelIdAssigned](WrappedShortChannelIdAssigned))
context.messageAdapter[IncomingPacket.ChannelRelayPacket](Relay)
Behaviors.withMdc(Logs.mdc(category_opt = Some(Logs.LogCategory.PAYMENT)), mdc) {
Behaviors.receiveMessage {
case Relay(channelRelayPacket) =>
val relayId = UUID.randomUUID()
val nextNodeId_opt: Option[PublicKey] = channelUpdates.get(channelRelayPacket.payload.outgoingChannelId) match {
case Some(channel) => Some(channel.nextNodeId)
case None => None
}
val channels: Map[ShortChannelId, Relayer.OutgoingChannel] = nextNodeId_opt match {
case Some(nextNodeId) => node2channels.get(nextNodeId).map(channelUpdates).map(c => c.channelUpdate.shortChannelId -> c).toMap
case None => Map.empty
}
Behaviors.receiveMessage {
case Relay(channelRelayPacket) =>
val relayId = UUID.randomUUID()
val nextNodeId_opt: Option[PublicKey] = channelUpdates.get(channelRelayPacket.payload.outgoingChannelId) match {
case Some(channel) => Some(channel.nextNodeId)
case None => None
}
val channels: Map[ShortChannelId, Relayer.OutgoingChannel] = nextNodeId_opt match {
case Some(nextNodeId) => node2channels.get(nextNodeId).map(channelUpdates).map(c => c.channelUpdate.shortChannelId -> c).toMap
case None => Map.empty
}
context.log.debug(s"spawning a new handler with relayId=$relayId to nextNodeId={} with channels={}", nextNodeId_opt.getOrElse(""), channels.keys.mkString(","))
context.spawn(ChannelRelay.apply(nodeParams, register, channels, relayId, channelRelayPacket), name = relayId.toString)
Behaviors.same
context.spawn(ChannelRelay.apply(nodeParams, register, channels, relayId, channelRelayPacket), name = relayId.toString)
Behaviors.same

case GetOutgoingChannels(replyTo, Relayer.GetOutgoingChannels(enabledOnly)) =>
val channels = if (enabledOnly) {
channelUpdates.values.filter(o => Announcements.isEnabled(o.channelUpdate.channelFlags))
} else {
channelUpdates.values
}
replyTo ! Relayer.OutgoingChannels(channels.toSeq)
Behaviors.same
case GetOutgoingChannels(replyTo, Relayer.GetOutgoingChannels(enabledOnly)) =>
val channels = if (enabledOnly) {
channelUpdates.values.filter(o => Announcements.isEnabled(o.channelUpdate.channelFlags))
} else {
channelUpdates.values
}
replyTo ! Relayer.OutgoingChannels(channels.toSeq)
Behaviors.same

case WrappedLocalChannelUpdate(LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments)) =>
context.log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments)
val channelUpdates1 = channelUpdates + (channelUpdate.shortChannelId -> Relayer.OutgoingChannel(remoteNodeId, channelUpdate, commitments))
val node2channels1 = node2channels.addOne(remoteNodeId, channelUpdate.shortChannelId)
apply(nodeParams, register, channelUpdates1, node2channels1)
case WrappedLocalChannelUpdate(LocalChannelUpdate(_, channelId, shortChannelId, remoteNodeId, _, channelUpdate, commitments)) =>
context.log.debug(s"updating local channel info for channelId=$channelId shortChannelId=$shortChannelId remoteNodeId=$remoteNodeId channelUpdate={} commitments={}", channelUpdate, commitments)
val channelUpdates1 = channelUpdates + (channelUpdate.shortChannelId -> Relayer.OutgoingChannel(remoteNodeId, channelUpdate, commitments))
val node2channels1 = node2channels.addOne(remoteNodeId, channelUpdate.shortChannelId)
apply(nodeParams, register, channelUpdates1, node2channels1)

case WrappedLocalChannelDown(LocalChannelDown(_, channelId, shortChannelId, remoteNodeId)) =>
context.log.debug(s"removed local channel info for channelId=$channelId shortChannelId=$shortChannelId")
val node2channels1 = node2channels.subtractOne(remoteNodeId, shortChannelId)
apply(nodeParams, register, channelUpdates - shortChannelId, node2channels1)
case WrappedLocalChannelDown(LocalChannelDown(_, channelId, shortChannelId, remoteNodeId)) =>
context.log.debug(s"removed local channel info for channelId=$channelId shortChannelId=$shortChannelId")
val node2channels1 = node2channels.subtractOne(remoteNodeId, shortChannelId)
apply(nodeParams, register, channelUpdates - shortChannelId, node2channels1)

case WrappedAvailableBalanceChanged(AvailableBalanceChanged(_, channelId, shortChannelId, commitments)) =>
val channelUpdates1 = channelUpdates.get(shortChannelId) match {
case Some(c: Relayer.OutgoingChannel) =>
context.log.debug(s"available balance changed for channelId=$channelId shortChannelId=$shortChannelId availableForSend={} availableForReceive={}", commitments.availableBalanceForSend, commitments.availableBalanceForReceive)
channelUpdates + (shortChannelId -> c.copy(commitments = commitments))
case None => channelUpdates // we only consider the balance if we have the channel_update
}
apply(nodeParams, register, channelUpdates1, node2channels)
case WrappedAvailableBalanceChanged(AvailableBalanceChanged(_, channelId, shortChannelId, commitments)) =>
val channelUpdates1 = channelUpdates.get(shortChannelId) match {
case Some(c: Relayer.OutgoingChannel) =>
context.log.debug(s"available balance changed for channelId=$channelId shortChannelId=$shortChannelId availableForSend={} availableForReceive={}", commitments.availableBalanceForSend, commitments.availableBalanceForReceive)
channelUpdates + (shortChannelId -> c.copy(commitments = commitments))
case None => channelUpdates // we only consider the balance if we have the channel_update
}
apply(nodeParams, register, channelUpdates1, node2channels)

case WrappedShortChannelIdAssigned(ShortChannelIdAssigned(_, channelId, shortChannelId, previousShortChannelId_opt)) =>
val (channelUpdates1, node2channels1) = previousShortChannelId_opt match {
case Some(previousShortChannelId) if previousShortChannelId != shortChannelId =>
context.log.debug(s"shortChannelId changed for channelId=$channelId ($previousShortChannelId->$shortChannelId, probably due to chain re-org)")
// We simply remove the old entry: we should receive a LocalChannelUpdate with the new shortChannelId shortly.
val node2channels1 = channelUpdates.get(previousShortChannelId).map(_.nextNodeId) match {
case Some(remoteNodeId) => node2channels.subtractOne(remoteNodeId, previousShortChannelId)
case None => node2channels
}
(channelUpdates - previousShortChannelId, node2channels1)
case _ => (channelUpdates, node2channels)
}
apply(nodeParams, register, channelUpdates1, node2channels1)
}
case WrappedShortChannelIdAssigned(ShortChannelIdAssigned(_, channelId, shortChannelId, previousShortChannelId_opt)) =>
val (channelUpdates1, node2channels1) = previousShortChannelId_opt match {
case Some(previousShortChannelId) if previousShortChannelId != shortChannelId =>
context.log.debug(s"shortChannelId changed for channelId=$channelId ($previousShortChannelId->$shortChannelId, probably due to chain re-org)")
// We simply remove the old entry: we should receive a LocalChannelUpdate with the new shortChannelId shortly.
val node2channels1 = channelUpdates.get(previousShortChannelId).map(_.nextNodeId) match {
case Some(remoteNodeId) => node2channels.subtractOne(remoteNodeId, previousShortChannelId)
case None => node2channels
}
(channelUpdates - previousShortChannelId, node2channels1)
case _ => (channelUpdates, node2channels)
}
apply(nodeParams, register, channelUpdates1, node2channels1)
}
}
}
}
Expand Up @@ -21,16 +21,16 @@ import java.util.UUID
import akka.actor.ActorRef
import fr.acinq.bitcoin.Crypto.PublicKey
import fr.acinq.bitcoin.DeterministicWallet.ExtendedPrivateKey
import fr.acinq.bitcoin.{Block, ByteVector32, Crypto, DeterministicWallet}
import fr.acinq.bitcoin.{Block, ByteVector32, Crypto, DeterministicWallet, OutPoint, Satoshi, TxOut}
import fr.acinq.eclair.FeatureSupport.Optional
import fr.acinq.eclair.Features._
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.Sphinx
import fr.acinq.eclair.payment.IncomingPacket.{ChannelRelayPacket, FinalPacket, NodeRelayPacket, decrypt}
import fr.acinq.eclair.payment.OutgoingPacket._
import fr.acinq.eclair.payment.PaymentRequest.PaymentRequestFeatures
import fr.acinq.eclair.payment.OutgoingPacket.Upstream
import fr.acinq.eclair.router.Router.{ChannelHop, NodeHop}
import fr.acinq.eclair.transactions.Transactions.InputInfo
import fr.acinq.eclair.wire.Onion.{FinalLegacyPayload, FinalTlvPayload, RelayLegacyPayload}
import fr.acinq.eclair.wire.OnionTlv.{AmountToForward, OutgoingCltv, PaymentData}
import fr.acinq.eclair.wire._
Expand Down Expand Up @@ -396,9 +396,10 @@ object PaymentPacketSpec {
packetType.create(sessionKey, nodes, payloadsBin, associatedData).packet
}

def makeCommitments(channelId: ByteVector32, testAvailableBalanceForSend: MilliSatoshi = 50000000 msat, testAvailableBalanceForReceive: MilliSatoshi = 50000000 msat): Commitments = {
def makeCommitments(channelId: ByteVector32, testAvailableBalanceForSend: MilliSatoshi = 50000000 msat, testAvailableBalanceForReceive: MilliSatoshi = 50000000 msat, testCapacity: Satoshi = 100000 sat): Commitments = {
val params = LocalParams(null, null, null, null, null, null, null, 0, isFunder = true, null, None, null)
new Commitments(ChannelVersion.STANDARD, params, null, 0.toByte, null, null, null, null, 0, 0, Map.empty, null, null, null, channelId) {
val commitInput = InputInfo(OutPoint(randomBytes32, 1), TxOut(testCapacity, Nil), Nil)
new Commitments(ChannelVersion.STANDARD, params, null, 0.toByte, null, null, null, null, 0, 0, Map.empty, null, commitInput, null, channelId) {
override lazy val availableBalanceForSend: MilliSatoshi = testAvailableBalanceForSend.max(0 msat)
override lazy val availableBalanceForReceive: MilliSatoshi = testAvailableBalanceForReceive.max(0 msat)
}
Expand Down

0 comments on commit 382868d

Please sign in to comment.