Skip to content

Commit

Permalink
Add offer manager (#2566)
Browse files Browse the repository at this point in the history
Because offers are a very generic mechanism, handling them can require interacting with an inventory system (do we actually have the quantity that the payer is requesting) or other such systems which do not have their place inside eclair. For this reason offer handlers must be implemented as plugins that communicate with the offer manager.
On startup, the offer handlers must register their offers with the offer manager, the offer manager will then forward the invoice requests and blinded payments to the relevant offer handler for approval.
  • Loading branch information
thomash-acinq committed Mar 27, 2023
1 parent 732eb31 commit df0e712
Show file tree
Hide file tree
Showing 32 changed files with 1,452 additions and 660 deletions.
14 changes: 13 additions & 1 deletion docs/release-notes/eclair-vnext.md
Expand Up @@ -6,7 +6,8 @@

### Offers

Eclair now supports paying offers:
#### Paying offers

```shell
$ ./eclair-cli payoffer --offer=<offer-to-pay> --amountMsat=<amountToPay>
```
Expand All @@ -17,6 +18,17 @@ Eclair will request an invoice and pay it (assuming it matches our request) with

Offers are still experimental and some details could still change before they are widely supported.

#### Receiving payments for offers

To be able to receive payments for offers, you will need to use a plugin.
The plugin needs to create the offer and register a handler that will accept or reject the invoice requests and the payments.
Eclair will check that these satisfy all the protocol requirements and the handler only needs to consider whether the item on offer can be delivered or not.

Invoices generated for offers are not stored in the database to prevent a DoS vector.
Instead, all the relevant data (offer id, preimage, amount, quantity, creation date and payer id) is included in the blinded route that will be used for payment.
The handler can also add its own data.
All this data is signed and encrypted so that it can not be read or forged by the payer.

### API changes

- `audit` now accepts `--count` and `--skip` parameters to limit the number of retrieved items (#2474, #2487)
Expand Down
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Expand Up @@ -279,7 +279,7 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {

override def receive(description: Either[String, ByteVector32], amount_opt: Option[MilliSatoshi], expire_opt: Option[Long], fallbackAddress_opt: Option[String], paymentPreimage_opt: Option[ByteVector32])(implicit timeout: Timeout): Future[Bolt11Invoice] = {
fallbackAddress_opt.map { fa => fr.acinq.eclair.addressToPublicKeyScript(fa, appKit.nodeParams.chainHash) } // if it's not a bitcoin address throws an exception
(appKit.paymentHandler ? ReceiveStandardPayment(amount_opt, description, expire_opt, fallbackAddress_opt = fallbackAddress_opt, paymentPreimage_opt = paymentPreimage_opt)).mapTo[Bolt11Invoice]
appKit.paymentHandler.toTyped.ask(ref => ReceiveStandardPayment(ref, amount_opt, description, expire_opt, fallbackAddress_opt = fallbackAddress_opt, paymentPreimage_opt = paymentPreimage_opt))
}

override def newAddress(): Future[String] = {
Expand Down
8 changes: 6 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Expand Up @@ -40,6 +40,7 @@ import fr.acinq.eclair.db.FileBackupHandler.FileBackupParams
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
import fr.acinq.eclair.io.{ClientSpawner, Peer, PendingChannelsRateLimiter, Server, Switchboard}
import fr.acinq.eclair.message.Postman
import fr.acinq.eclair.payment.offer.OfferManager
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, PostRestartHtlcCleaner, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
Expand Down Expand Up @@ -349,7 +350,8 @@ class Setup(val datadir: File,
}
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register), "payment-handler", SupervisorStrategy.Resume))
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router, paymentTimeout = 1 minute)).onFailure(typed.SupervisorStrategy.resume), name = "offer-manager")
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
_ = relayer ! PostRestartHtlcCleaner.Init(channels)
Expand All @@ -372,7 +374,7 @@ class Setup(val datadir: File,
_ = triggerer ! AsyncPaymentTriggerer.Start(switchboard.toTyped)
balanceActor = system.spawn(BalanceActor(nodeParams.db, bitcoinClient, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor")

postman = system.spawn(Behaviors.supervise(Postman(nodeParams, switchboard.toTyped)).onFailure(typed.SupervisorStrategy.restart), name = "postman")
postman = system.spawn(Behaviors.supervise(Postman(nodeParams, switchboard.toTyped, offerManager)).onFailure(typed.SupervisorStrategy.restart), name = "postman")

kit = Kit(
nodeParams = nodeParams,
Expand All @@ -388,6 +390,7 @@ class Setup(val datadir: File,
channelsListener = channelsListener,
balanceActor = balanceActor,
postman = postman,
offerManager = offerManager,
wallet = bitcoinClient)

zmqBlockTimeout = after(5 seconds, using = system.scheduler)(Future.failed(BitcoinZMQConnectionTimeoutException))
Expand Down Expand Up @@ -456,6 +459,7 @@ case class Kit(nodeParams: NodeParams,
channelsListener: typed.ActorRef[ChannelsListener.Command],
balanceActor: typed.ActorRef[BalanceActor.Command],
postman: typed.ActorRef[Postman.Command],
offerManager: typed.ActorRef[OfferManager.Command],
wallet: OnChainWallet with OnchainPubkeyCache)

object Kit {
Expand Down
@@ -1,7 +1,6 @@
package fr.acinq.eclair.db

import com.google.common.util.concurrent.ThreadFactoryBuilder
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Crypto, Satoshi}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.db.Databases.{FileBackup, PostgresDatabases, SqliteDatabases}
Expand All @@ -13,7 +12,6 @@ import fr.acinq.eclair.router.Router
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli}
import grizzled.slf4j.Logging
import scodec.bits.ByteVector

import java.io.File
import java.util.UUID
Expand Down Expand Up @@ -286,16 +284,16 @@ case class DualPaymentsDb(primary: PaymentsDb, secondary: PaymentsDb) extends Pa
primary.addIncomingPayment(pr, preimage, paymentType)
}

override def addIncomingBlindedPayment(pr: Bolt12Invoice, preimage: ByteVector32, pathIds: Map[PublicKey, ByteVector], paymentType: String): Unit = {
runAsync(secondary.addIncomingBlindedPayment(pr, preimage, pathIds, paymentType))
primary.addIncomingBlindedPayment(pr, preimage, pathIds, paymentType)
}

override def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli): Boolean = {
runAsync(secondary.receiveIncomingPayment(paymentHash, amount, receivedAt))
primary.receiveIncomingPayment(paymentHash, amount, receivedAt)
}

override def receiveIncomingOfferPayment(pr: MinimalBolt12Invoice, preimage: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli, paymentType: String): Unit = {
runAsync(secondary.receiveIncomingOfferPayment(pr, preimage, amount, receivedAt, paymentType))
primary.receiveIncomingOfferPayment(pr, preimage, amount, receivedAt, paymentType)
}

override def getIncomingPayment(paymentHash: ByteVector32): Option[IncomingPayment] = {
runAsync(secondary.getIncomingPayment(paymentHash))
primary.getIncomingPayment(paymentHash)
Expand Down
34 changes: 8 additions & 26 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/PaymentsDb.scala
Expand Up @@ -21,7 +21,6 @@ import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey}
import fr.acinq.eclair.payment._
import fr.acinq.eclair.router.Router.{BlindedHop, ChannelHop, Hop, NodeHop}
import fr.acinq.eclair.{MilliSatoshi, Paginated, ShortChannelId, TimestampMilli}
import scodec.bits.ByteVector

import java.util.UUID
import scala.util.Try
Expand All @@ -33,15 +32,18 @@ trait IncomingPaymentsDb {
/** Add a new expected standard incoming payment (not yet received). */
def addIncomingPayment(pr: Bolt11Invoice, preimage: ByteVector32, paymentType: String = PaymentType.Standard): Unit

/** Add a new expected blinded incoming payment (not yet received). */
def addIncomingBlindedPayment(pr: Bolt12Invoice, preimage: ByteVector32, pathIds: Map[PublicKey, ByteVector], paymentType: String = PaymentType.Blinded): Unit

/**
* Mark an incoming payment as received (paid). The received amount may exceed the invoice amount.
* If there was no matching invoice in the DB, this will return false.
*/
def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli = TimestampMilli.now()): Boolean

/**
* Add a new incoming offer payment as received.
* If the invoice is already paid, adds `amount` to the amount paid.
*/
def receiveIncomingOfferPayment(pr: MinimalBolt12Invoice, preimage: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli = TimestampMilli.now(), paymentType: String = PaymentType.Blinded): Unit

/** Get information about the incoming payment (paid or not) for the given payment hash, if any. */
def getIncomingPayment(paymentHash: ByteVector32): Option[IncomingPayment]

Expand Down Expand Up @@ -128,15 +130,10 @@ case class IncomingStandardPayment(invoice: Bolt11Invoice,
createdAt: TimestampMilli,
status: IncomingPaymentStatus) extends IncomingPayment

/**
* A blinded incoming payment received by this node.
*
* @param pathIds map the last blinding point of a blinded path to the corresponding pathId.
*/
case class IncomingBlindedPayment(invoice: Bolt12Invoice,
/** A blinded incoming payment received by this node. */
case class IncomingBlindedPayment(invoice: MinimalBolt12Invoice,
paymentPreimage: ByteVector32,
paymentType: String,
pathIds: Map[PublicKey, ByteVector],
createdAt: TimestampMilli,
status: IncomingPaymentStatus) extends IncomingPayment

Expand Down Expand Up @@ -302,19 +299,4 @@ object PaymentsDb {
case Attempt.Failure(_) => Nil
}
}

private val pathIdCodec = (("blinding_key" | CommonCodecs.publicKey) :: ("path_id" | variableSizeBytes(uint16, bytes))).as[(PublicKey, ByteVector)]
private val pathIdsCodec = "path_ids" | listOfN(uint16, pathIdCodec)

def encodePathIds(pathIds: Map[PublicKey, ByteVector]): Array[Byte] = {
pathIdsCodec.encode(pathIds.toList).require.toByteArray
}

def decodePathIds(b: BitVector): Map[PublicKey, ByteVector] = {
pathIdsCodec.decode(b) match {
case Attempt.Successful(pathIds) => pathIds.value.toMap
case Attempt.Failure(_) => Map.empty
}
}

}
39 changes: 20 additions & 19 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPaymentsDb.scala
Expand Up @@ -26,7 +26,7 @@ import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.payment._
import fr.acinq.eclair.{MilliSatoshi, Paginated, TimestampMilli, TimestampMilliLong}
import grizzled.slf4j.Logging
import scodec.bits.{BitVector, ByteVector}
import scodec.bits.BitVector

import java.sql.{Connection, ResultSet, Statement, Timestamp}
import java.time.Instant
Expand Down Expand Up @@ -268,21 +268,6 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
}
}

override def addIncomingBlindedPayment(invoice: Bolt12Invoice, preimage: ByteVector32, pathIds: Map[PublicKey, ByteVector], paymentType: String): Unit = withMetrics("payments/add-incoming-blinded", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("INSERT INTO payments.received (payment_hash, payment_preimage, path_ids, payment_type, payment_request, created_at, expire_at) VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, invoice.paymentHash.toHex)
statement.setString(2, preimage.toHex)
statement.setBytes(3, encodePathIds(pathIds))
statement.setString(4, paymentType)
statement.setString(5, invoice.toString)
statement.setTimestamp(6, invoice.createdAt.toSqlTimestamp)
statement.setTimestamp(7, (invoice.createdAt + invoice.relativeExpiry.toSeconds).toSqlTimestamp)
statement.executeUpdate()
}
}
}

override def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli): Boolean = withMetrics("payments/receive-incoming", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("UPDATE payments.received SET (received_msat, received_at) = (? + COALESCE(received_msat, 0), ?) WHERE payment_hash = ?")) { update =>
Expand All @@ -295,6 +280,23 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
}
}

override def receiveIncomingOfferPayment(invoice: MinimalBolt12Invoice, preimage: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli, paymentType: String): Unit = withMetrics("payments/receive-incoming-offer", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("INSERT INTO payments.received (payment_hash, payment_preimage, payment_type, payment_request, created_at, expire_at, received_msat, received_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)" +
"ON CONFLICT (payment_hash) DO UPDATE SET (received_msat, received_at) = (payments.received.received_msat + EXCLUDED.received_msat, EXCLUDED.received_at)")) { statement =>
statement.setString(1, invoice.paymentHash.toHex)
statement.setString(2, preimage.toHex)
statement.setString(3, paymentType)
statement.setString(4, invoice.toString)
statement.setTimestamp(5, invoice.createdAt.toSqlTimestamp)
statement.setTimestamp(6, (invoice.createdAt + invoice.relativeExpiry.toSeconds).toSqlTimestamp)
statement.setLong(7, amount.toLong)
statement.setTimestamp(8, receivedAt.toSqlTimestamp)
statement.executeUpdate()
}
}
}

private def parseIncomingPayment(rs: ResultSet): Option[IncomingPayment] = {
val invoice = rs.getString("payment_request")
val preimage = rs.getByteVector32FromHex("payment_preimage")
Expand All @@ -304,10 +306,9 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
case Success(invoice: Bolt11Invoice) =>
val status = buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), invoice, rs.getTimestampNullable("received_at").map(TimestampMilli.fromSqlTimestamp))
Some(IncomingStandardPayment(invoice, preimage, paymentType, createdAt, status))
case Success(invoice: Bolt12Invoice) =>
case Success(invoice: MinimalBolt12Invoice) =>
val status = buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), invoice, rs.getTimestampNullable("received_at").map(TimestampMilli.fromSqlTimestamp))
val pathIds = decodePathIds(BitVector(rs.getBytes("path_ids")))
Some(IncomingBlindedPayment(invoice, preimage, paymentType, pathIds, createdAt, status))
Some(IncomingBlindedPayment(invoice, preimage, paymentType, createdAt, status))
case _ =>
logger.error(s"could not parse DB invoice=$invoice, this should not happen")
None
Expand Down
Expand Up @@ -26,7 +26,7 @@ import fr.acinq.eclair.db.sqlite.SqliteUtils._
import fr.acinq.eclair.payment._
import fr.acinq.eclair.{MilliSatoshi, Paginated, TimestampMilli, TimestampMilliLong}
import grizzled.slf4j.Logging
import scodec.bits.{BitVector, ByteVector}
import scodec.bits.BitVector

import java.sql.{Connection, ResultSet, Statement}
import java.util.UUID
Expand Down Expand Up @@ -279,19 +279,6 @@ class SqlitePaymentsDb(val sqlite: Connection) extends PaymentsDb with Logging {
}
}

override def addIncomingBlindedPayment(invoice: Bolt12Invoice, preimage: ByteVector32, pathIds: Map[PublicKey, ByteVector], paymentType: String): Unit = withMetrics("payments/add-incoming-blinded", DbBackends.Sqlite) {
using(sqlite.prepareStatement("INSERT INTO received_payments (payment_hash, payment_preimage, path_ids, payment_type, payment_request, created_at, expire_at) VALUES (?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setBytes(1, invoice.paymentHash.toArray)
statement.setBytes(2, preimage.toArray)
statement.setBytes(3, encodePathIds(pathIds))
statement.setString(4, paymentType)
statement.setString(5, invoice.toString)
statement.setLong(6, invoice.createdAt.toTimestampMilli.toLong)
statement.setLong(7, (invoice.createdAt + invoice.relativeExpiry).toLong.seconds.toMillis)
statement.executeUpdate()
}
}

override def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli): Boolean = withMetrics("payments/receive-incoming", DbBackends.Sqlite) {
using(sqlite.prepareStatement("UPDATE received_payments SET (received_msat, received_at) = (? + COALESCE(received_msat, 0), ?) WHERE payment_hash = ?")) { update =>
update.setLong(1, amount.toLong)
Expand All @@ -302,6 +289,27 @@ class SqlitePaymentsDb(val sqlite: Connection) extends PaymentsDb with Logging {
}
}

override def receiveIncomingOfferPayment(invoice: MinimalBolt12Invoice, preimage: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli, paymentType: String): Unit = withMetrics("payments/receive-incoming-offer", DbBackends.Sqlite) {
if (using(sqlite.prepareStatement("INSERT OR IGNORE INTO received_payments (payment_hash, payment_preimage, payment_type, payment_request, created_at, expire_at, received_msat, received_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setBytes(1, invoice.paymentHash.toArray)
statement.setBytes(2, preimage.toArray)
statement.setString(3, paymentType)
statement.setString(4, invoice.toString)
statement.setLong(5, invoice.createdAt.toTimestampMilli.toLong)
statement.setLong(6, (invoice.createdAt + invoice.relativeExpiry).toLong.seconds.toMillis)
statement.setLong(7, amount.toLong)
statement.setLong(8, receivedAt.toLong)
statement.executeUpdate()
} == 0) {
using(sqlite.prepareStatement("UPDATE received_payments SET (received_msat, received_at) = (received_msat + ?, ?) WHERE payment_hash = ?")) { statement =>
statement.setLong(1, amount.toLong)
statement.setLong(2, receivedAt.toLong)
statement.setBytes(3, invoice.paymentHash.toArray)
statement.executeUpdate()
}
}
}

private def parseIncomingPayment(rs: ResultSet): Option[IncomingPayment] = {
val invoice = rs.getString("payment_request")
val preimage = rs.getByteVector32("payment_preimage")
Expand All @@ -311,10 +319,9 @@ class SqlitePaymentsDb(val sqlite: Connection) extends PaymentsDb with Logging {
case Success(invoice: Bolt11Invoice) =>
val status = buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), invoice, rs.getLongNullable("received_at").map(TimestampMilli(_)))
Some(IncomingStandardPayment(invoice, preimage, paymentType, createdAt, status))
case Success(invoice: Bolt12Invoice) =>
case Success(invoice: MinimalBolt12Invoice) =>
val status = buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), invoice, rs.getLongNullable("received_at").map(TimestampMilli(_)))
val pathIds = decodePathIds(BitVector(rs.getBytes("path_ids")))
Some(IncomingBlindedPayment(invoice, preimage, paymentType, pathIds, createdAt, status))
Some(IncomingBlindedPayment(invoice, preimage, paymentType, createdAt, status))
case _ =>
logger.error(s"could not parse DB invoice=$invoice, this should not happen")
None
Expand Down

0 comments on commit df0e712

Please sign in to comment.