Skip to content

Commit

Permalink
Add offer manager
Browse files Browse the repository at this point in the history
Because offers are a very generic mechanism, handling them can require interacting with an inventory system (do we actually have the quantity that the payer is requesting) or other such systems which do not have their place inside eclair. For this reason offer handlers must be implemented as plugins that communicate with the offer manager.
On startup, the offer handlers must register their offers with the offer manager, the offer manager will then forward the invoice requests and blinded payments to the relevant offer handler for approval.
  • Loading branch information
thomash-acinq committed Jan 20, 2023
1 parent d495617 commit fee48ff
Show file tree
Hide file tree
Showing 27 changed files with 839 additions and 282 deletions.
3 changes: 2 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Eclair.scala
Expand Up @@ -57,6 +57,7 @@ import java.nio.charset.StandardCharsets
import java.util.UUID
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.Try

case class GetInfoResponse(version: String, nodeId: PublicKey, alias: String, color: String, features: Features[Feature], chainHash: ByteVector32, network: String, blockHeight: Int, publicAddresses: Seq[NodeAddress], onionAddress: Option[NodeAddress], instanceId: String)

Expand Down Expand Up @@ -281,7 +282,7 @@ class EclairImpl(appKit: Kit) extends Eclair with Logging {

override def receive(description: Either[String, ByteVector32], amount_opt: Option[MilliSatoshi], expire_opt: Option[Long], fallbackAddress_opt: Option[String], paymentPreimage_opt: Option[ByteVector32])(implicit timeout: Timeout): Future[Bolt11Invoice] = {
fallbackAddress_opt.map { fa => fr.acinq.eclair.addressToPublicKeyScript(fa, appKit.nodeParams.chainHash) } // if it's not a bitcoin address throws an exception
(appKit.paymentHandler ? ReceiveStandardPayment(amount_opt, description, expire_opt, fallbackAddress_opt = fallbackAddress_opt, paymentPreimage_opt = paymentPreimage_opt)).mapTo[Bolt11Invoice]
(appKit.paymentHandler ? ReceiveStandardPayment(amount_opt, description, expire_opt, fallbackAddress_opt = fallbackAddress_opt, paymentPreimage_opt = paymentPreimage_opt)).mapTo[Try[Bolt11Invoice]].map(_.get)
}

override def newAddress(): Future[String] = {
Expand Down
8 changes: 6 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Expand Up @@ -39,6 +39,7 @@ import fr.acinq.eclair.db.FileBackupHandler.FileBackupParams
import fr.acinq.eclair.db.{Databases, DbEventHandler, FileBackupHandler}
import fr.acinq.eclair.io.{ClientSpawner, Peer, Server, Switchboard}
import fr.acinq.eclair.message.Postman
import fr.acinq.eclair.offer.OfferManager
import fr.acinq.eclair.payment.receive.PaymentHandler
import fr.acinq.eclair.payment.relay.{AsyncPaymentTriggerer, Relayer}
import fr.acinq.eclair.payment.send.{Autoprobe, PaymentInitiator}
Expand Down Expand Up @@ -335,7 +336,8 @@ class Setup(val datadir: File,
}
dbEventHandler = system.actorOf(SimpleSupervisor.props(DbEventHandler.props(nodeParams), "db-event-handler", SupervisorStrategy.Resume))
register = system.actorOf(SimpleSupervisor.props(Register.props(), "register", SupervisorStrategy.Resume))
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register), "payment-handler", SupervisorStrategy.Resume))
offerManager = system.spawn(Behaviors.supervise(OfferManager(nodeParams, router)).onFailure(typed.SupervisorStrategy.restart), name = "offer-manager")
paymentHandler = system.actorOf(SimpleSupervisor.props(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler", SupervisorStrategy.Resume))
triggerer = system.spawn(Behaviors.supervise(AsyncPaymentTriggerer()).onFailure(typed.SupervisorStrategy.resume), name = "async-payment-triggerer")
relayer = system.actorOf(SimpleSupervisor.props(Relayer.props(nodeParams, router, register, paymentHandler, triggerer, Some(postRestartCleanUpInitialized)), "relayer", SupervisorStrategy.Resume))
// Before initializing the switchboard (which re-connects us to the network) and the user-facing parts of the system,
Expand All @@ -355,7 +357,7 @@ class Setup(val datadir: File,
_ = triggerer ! AsyncPaymentTriggerer.Start(switchboard.toTyped)
balanceActor = system.spawn(BalanceActor(nodeParams.db, bitcoinClient, channelsListener, nodeParams.balanceCheckInterval), name = "balance-actor")

postman = system.spawn(Behaviors.supervise(Postman(switchboard.toTyped)).onFailure(typed.SupervisorStrategy.restart), name = "postman")
postman = system.spawn(Behaviors.supervise(Postman(switchboard.toTyped, offerManager)).onFailure(typed.SupervisorStrategy.restart), name = "postman")

kit = Kit(
nodeParams = nodeParams,
Expand All @@ -371,6 +373,7 @@ class Setup(val datadir: File,
channelsListener = channelsListener,
balanceActor = balanceActor,
postman = postman,
offerManager = offerManager,
wallet = bitcoinClient)

zmqBlockTimeout = after(5 seconds, using = system.scheduler)(Future.failed(BitcoinZMQConnectionTimeoutException))
Expand Down Expand Up @@ -439,6 +442,7 @@ case class Kit(nodeParams: NodeParams,
channelsListener: typed.ActorRef[ChannelsListener.Command],
balanceActor: typed.ActorRef[BalanceActor.Command],
postman: typed.ActorRef[Postman.Command],
offerManager: typed.ActorRef[OfferManager.Command],
wallet: OnChainWallet)

object Kit {
Expand Down
Expand Up @@ -299,6 +299,11 @@ case class DualPaymentsDb(primary: PaymentsDb, secondary: PaymentsDb) extends Pa
primary.receiveIncomingPayment(paymentHash, amount, receivedAt)
}

override def receiveAddIncomingBlindedPayment(pr: Bolt12Invoice, preimage: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli, paymentType: String): Unit = {
runAsync(secondary.receiveAddIncomingBlindedPayment(pr, preimage, amount, receivedAt, paymentType))
primary.receiveAddIncomingBlindedPayment(pr, preimage, amount, receivedAt, paymentType)
}

override def getIncomingPayment(paymentHash: ByteVector32): Option[IncomingPayment] = {
runAsync(secondary.getIncomingPayment(paymentHash))
primary.getIncomingPayment(paymentHash)
Expand Down
Expand Up @@ -42,6 +42,9 @@ trait IncomingPaymentsDb {
*/
def receiveIncomingPayment(paymentHash: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli = TimestampMilli.now()): Boolean

/** Add a new blinded incoming payment as it is received. */
def receiveAddIncomingBlindedPayment(pr: Bolt12Invoice, preimage: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli = TimestampMilli.now(), paymentType: String = PaymentType.Blinded): Unit

/** Get information about the incoming payment (paid or not) for the given payment hash, if any. */
def getIncomingPayment(paymentHash: ByteVector32): Option[IncomingPayment]

Expand Down Expand Up @@ -133,7 +136,7 @@ case class IncomingStandardPayment(invoice: Bolt11Invoice,
case class IncomingBlindedPayment(invoice: Bolt12Invoice,
paymentPreimage: ByteVector32,
paymentType: String,
pathIds: Map[PublicKey, ByteVector],
pathIds: Option[Map[PublicKey, ByteVector]],
createdAt: TimestampMilli,
status: IncomingPaymentStatus) extends IncomingPayment

Expand Down
Expand Up @@ -268,6 +268,22 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
}
}

override def receiveAddIncomingBlindedPayment(invoice: Bolt12Invoice, preimage: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli, paymentType: String): Unit = withMetrics("payments/receive-incoming-blinded", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("INSERT INTO payments.received (payment_hash, payment_preimage, payment_type, payment_request, created_at, expire_at, received_msat, received_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setString(1, invoice.paymentHash.toHex)
statement.setString(2, preimage.toHex)
statement.setString(3, paymentType)
statement.setString(4, invoice.toString)
statement.setTimestamp(5, invoice.createdAt.toSqlTimestamp)
statement.setTimestamp(6, (invoice.createdAt + invoice.relativeExpiry.toSeconds).toSqlTimestamp)
statement.setLong(7, amount.toLong)
statement.setTimestamp(8, receivedAt.toSqlTimestamp)
statement.executeUpdate()
}
}
}

private def parseIncomingPayment(rs: ResultSet): Option[IncomingPayment] = {
val invoice = rs.getString("payment_request")
val preimage = rs.getByteVector32FromHex("payment_preimage")
Expand All @@ -279,7 +295,7 @@ class PgPaymentsDb(implicit ds: DataSource, lock: PgLock) extends PaymentsDb wit
Some(IncomingStandardPayment(invoice, preimage, paymentType, createdAt, status))
case Success(invoice: Bolt12Invoice) =>
val status = buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), invoice, rs.getTimestampNullable("received_at").map(TimestampMilli.fromSqlTimestamp))
val pathIds = decodePathIds(BitVector(rs.getBytes("path_ids")))
val pathIds = Option(rs.getBytes("path_ids")).map(bytes => decodePathIds(BitVector(bytes)))
Some(IncomingBlindedPayment(invoice, preimage, paymentType, pathIds, createdAt, status))
case _ =>
logger.error(s"could not parse DB invoice=$invoice, this should not happen")
Expand Down
Expand Up @@ -278,6 +278,20 @@ class SqlitePaymentsDb(val sqlite: Connection) extends PaymentsDb with Logging {
}
}

override def receiveAddIncomingBlindedPayment(invoice: Bolt12Invoice, preimage: ByteVector32, amount: MilliSatoshi, receivedAt: TimestampMilli, paymentType: String): Unit = withMetrics("payments/receive-incoming-blinded", DbBackends.Sqlite) {
using(sqlite.prepareStatement("INSERT INTO received_payments (payment_hash, payment_preimage, payment_type, payment_request, created_at, expire_at, received_msat, received_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)")) { statement =>
statement.setBytes(1, invoice.paymentHash.toArray)
statement.setBytes(2, preimage.toArray)
statement.setString(3, paymentType)
statement.setString(4, invoice.toString)
statement.setLong(5, invoice.createdAt.toTimestampMilli.toLong)
statement.setLong(6, (invoice.createdAt + invoice.relativeExpiry).toLong.seconds.toMillis)
statement.setLong(7, amount.toLong)
statement.setLong(8, receivedAt.toLong)
println(Try{statement.executeUpdate()})
}
}

private def parseIncomingPayment(rs: ResultSet): Option[IncomingPayment] = {
val invoice = rs.getString("payment_request")
val preimage = rs.getByteVector32("payment_preimage")
Expand All @@ -289,7 +303,7 @@ class SqlitePaymentsDb(val sqlite: Connection) extends PaymentsDb with Logging {
Some(IncomingStandardPayment(invoice, preimage, paymentType, createdAt, status))
case Success(invoice: Bolt12Invoice) =>
val status = buildIncomingPaymentStatus(rs.getMilliSatoshiNullable("received_msat"), invoice, rs.getLongNullable("received_at").map(TimestampMilli(_)))
val pathIds = decodePathIds(BitVector(rs.getBytes("path_ids")))
val pathIds = Option(rs.getBytes("path_ids")).map(bytes => decodePathIds(BitVector(bytes)))
Some(IncomingBlindedPayment(invoice, preimage, paymentType, pathIds, createdAt, status))
case _ =>
logger.error(s"could not parse DB invoice=$invoice, this should not happen")
Expand Down
26 changes: 16 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/message/Postman.scala
Expand Up @@ -16,13 +16,15 @@

package fr.acinq.eclair.message

import akka.actor.typed
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.io.{MessageRelay, Switchboard}
import fr.acinq.eclair.message.OnionMessages.{Destination, ReceiveMessage}
import fr.acinq.eclair.offer.OfferManager
import fr.acinq.eclair.wire.protocol.MessageOnion.FinalPayload
import fr.acinq.eclair.wire.protocol.{OnionMessagePayloadTlv, TlvStream}
import fr.acinq.eclair.{randomBytes32, randomKey}
Expand Down Expand Up @@ -51,7 +53,7 @@ object Postman {
replyTo: ActorRef[OnionMessageResponse],
timeout: FiniteDuration) extends Command
private case class Unsubscribe(pathId: ByteVector32) extends Command
private case class WrappedMessage(finalPayload: FinalPayload) extends Command
case class WrappedMessage(finalPayload: FinalPayload) extends Command
case class SendingStatus(status: MessageRelay.Status) extends Command

sealed trait OnionMessageResponse
Expand All @@ -62,7 +64,7 @@ object Postman {
case class MessageFailed(reason: String) extends MessageStatus
// @formatter:on

def apply(switchboard: ActorRef[Switchboard.RelayMessage]): Behavior[Command] = {
def apply(switchboard: ActorRef[Switchboard.RelayMessage], offerManager: typed.ActorRef[OfferManager.RequestInvoice]): Behavior[Command] = {
Behaviors.setup(context => {
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[ReceiveMessage](r => WrappedMessage(r.finalPayload)))

Expand All @@ -76,14 +78,18 @@ object Postman {

Behaviors.receiveMessagePartial {
case WrappedMessage(finalPayload) =>
finalPayload.pathId_opt match {
case Some(pathId) if pathId.length == 32 =>
val id = ByteVector32(pathId)
subscribed.get(id).foreach(ref => {
subscribed -= id
ref ! Response(finalPayload)
})
case _ => // ignoring message with invalid or missing pathId
if (finalPayload.records.get[OnionMessagePayloadTlv.InvoiceRequest].nonEmpty) {
offerManager ! OfferManager.RequestInvoice(finalPayload, context.self)
} else {
finalPayload.pathId_opt match {
case Some(pathId) if pathId.length == 32 =>
val id = ByteVector32(pathId)
subscribed.get(id).foreach(ref => {
subscribed -= id
ref ! Response(finalPayload)
})
case _ => // ignoring message with invalid or missing pathId
}
}
Behaviors.same
case SendMessage(intermediateNodes, destination, replyPath, messageContent, replyTo, timeout) =>
Expand Down

0 comments on commit fee48ff

Please sign in to comment.