Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework ElectrumClient #512

Merged
merged 16 commits into from Sep 11, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/test.yml
Expand Up @@ -65,3 +65,12 @@ jobs:
- name: Check without integration
if: matrix.os == 'macOS-latest'
run: ./gradlew build -x jvmTest

# Uncomment the lines below to store test results for debugging failed tests (useful for iOS)
# - name: Store test results
# if: always()
# uses: actions/upload-artifact@v3
# with:
# name: test results
# path: build/reports
# retention-days: 1

Large diffs are not rendered by default.

Expand Up @@ -10,26 +10,21 @@ import fr.acinq.lightning.transactions.Transactions
import fr.acinq.lightning.utils.MDCLogger
import fr.acinq.lightning.utils.sat


suspend fun IElectrumClient.getConfirmations(txId: ByteVector32): Int? {
val tx = kotlin.runCatching { getTx(txId) }.getOrNull()
return tx?.let { getConfirmations(tx) }
}
suspend fun IElectrumClient.getConfirmations(txId: ByteVector32, currentBlockHeight: Int): Int? = getTx(txId)?.let { tx -> getConfirmations(tx, currentBlockHeight) }

/**
* @return the number of confirmations, zero if the transaction is in the mempool, null if the transaction is not found
* @return the number of confirmations, zero if the transaction is in the mempool, null if the transaction is not found
*/
suspend fun IElectrumClient.getConfirmations(tx: Transaction): Int? {
suspend fun IElectrumClient.getConfirmations(tx: Transaction, currentBlockHeight: Int): Int? {
pm47 marked this conversation as resolved.
Show resolved Hide resolved
val scriptHash = ElectrumClient.computeScriptHash(tx.txOut.first().publicKeyScript)
val scriptHashHistory = getScriptHashHistory(scriptHash)
val item = scriptHashHistory.find { it.txid == tx.txid }
val blockHeight = startHeaderSubscription().blockHeight
return item?.let { if (item.blockHeight > 0) blockHeight - item.blockHeight + 1 else 0 }
return item?.let { if (item.blockHeight > 0) currentBlockHeight - item.blockHeight + 1 else 0 }
}

suspend fun IElectrumClient.computeSpliceCpfpFeerate(commitments: Commitments, targetFeerate: FeeratePerKw, spliceWeight: Int, logger: MDCLogger): Pair<FeeratePerKw, Satoshi> {
suspend fun IElectrumClient.computeSpliceCpfpFeerate(commitments: Commitments, targetFeerate: FeeratePerKw, spliceWeight: Int, currentBlockHeight: Int, logger: MDCLogger): Pair<FeeratePerKw, Satoshi> {
val (parentsWeight, parentsFees) = commitments.all
.takeWhile { getConfirmations(it.fundingTxId).let { confirmations -> confirmations == null || confirmations == 0 } } // we check for null in case the tx has been evicted
.takeWhile { getConfirmations(it.fundingTxId, currentBlockHeight).let { confirmations -> confirmations == null || confirmations == 0 } } // we check for null in case the tx has been evicted
.fold(Pair(0, 0.sat)) { (parentsWeight, parentsFees), commitment ->
val weight = when (commitment.localFundingStatus) {
// weight will be underestimated if the transaction is not fully signed
Expand Down
Expand Up @@ -18,7 +18,7 @@ import kotlinx.serialization.json.*

/**
* Common communication objects between [ElectrumClient] and external ressources (e.g. [ElectrumWatcher])
* See the documentation for the ElectrumX protocol there: https://electrumx.readthedocs.io
* See the documentation for the ElectrumX protocol there: https://electrumx-spesmilo.readthedocs.io
*/

/**
Expand Down
@@ -1,7 +1,6 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.*
import fr.acinq.lightning.utils.Connection
import fr.acinq.lightning.utils.sum
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
Expand Down Expand Up @@ -138,11 +137,8 @@ class ElectrumMiniWallet(
val unspents = client.getScriptHashUnspents(msg.scriptHash)
val newUtxos = unspents.minus((_walletStateFlow.value.addresses[bitcoinAddress] ?: emptyList()).toSet())
// request new parent txs
val parentTxs = newUtxos.map { utxo ->
val tx = client.getTx(utxo.txid)
logger.mdcinfo { "received parent transaction with txid=${tx.txid}" }
tx
}
val parentTxs = newUtxos.mapNotNull { utxo -> client.getTx(utxo.txid) }
parentTxs.forEach { tx -> logger.mdcinfo { "received parent transaction with txid=${tx.txid}" } }
val nextWalletState = this.copy(addresses = this.addresses + (bitcoinAddress to unspents), parentTxs = this.parentTxs + parentTxs.associateBy { it.txid })
logger.mdcinfo { "${unspents.size} utxo(s) for address=$bitcoinAddress balance=${nextWalletState.totalBalance}" }
unspents.forEach { logger.debug { "utxo=${it.outPoint.txid}:${it.outPoint.index} amount=${it.value} sat" } }
Expand All @@ -162,7 +158,6 @@ class ElectrumMiniWallet(
logger.error { "cannot subscribe to $bitcoinAddress ($result)" }
null
}

is AddressToPublicKeyScriptResult.Success -> {
val pubkeyScript = ByteVector(Script.write(result.script))
val scriptHash = ElectrumClient.computeScriptHash(pubkeyScript)
Expand All @@ -178,7 +173,7 @@ class ElectrumMiniWallet(
job = launch {
launch {
// listen to connection events
client.connectionState.filterIsInstance<Connection.ESTABLISHED>().collect { mailbox.send(WalletCommand.Companion.ElectrumConnected) }
client.connectionStatus.filterIsInstance<ElectrumConnectionStatus.Connected>().collect { mailbox.send(WalletCommand.Companion.ElectrumConnected) }
}
launch {
// listen to subscriptions events
Expand All @@ -191,13 +186,11 @@ class ElectrumMiniWallet(
logger.mdcinfo { "electrum connected" }
scriptHashes.values.forEach { scriptHash -> subscribe(scriptHash) }
}

is WalletCommand.Companion.ElectrumNotification -> {
if (it.msg is ScriptHashSubscriptionResponse) {
_walletStateFlow.value = _walletStateFlow.value.processSubscriptionResponse(it.msg)
}
}

is WalletCommand.Companion.AddAddress -> {
logger.mdcinfo { "adding new address=${it.bitcoinAddress}" }
subscribe(it.bitcoinAddress)?.let {
Expand Down
Expand Up @@ -67,7 +67,7 @@ class ElectrumWatcher(val client: IElectrumClient, val scope: CoroutineScope, lo
logger.info { "initializing electrum watcher" }

suspend fun processScripHashHistory(history: List<TransactionHistoryItem>) = runCatching {
val txs = history.filter { it.blockHeight >= -1 }.map { client.getTx(it.txid) }
val txs = history.filter { it.blockHeight >= -1 }.mapNotNull { client.getTx(it.txid) }

// WatchSpent
txs.forEach { tx ->
Expand All @@ -91,30 +91,30 @@ class ElectrumWatcher(val client: IElectrumClient, val scope: CoroutineScope, lo
.filter { it.txId == item.txid }
.filter { state.height - item.blockHeight + 1 >= it.minDepth }
triggered.forEach { w ->
val merkle = client.getMerkle(w.txId, item.blockHeight)
val confirmations = state.height - merkle.block_height + 1
logger.info { "txid=${w.txId} had confirmations=$confirmations in block=${merkle.block_height} pos=${merkle.pos}" }
_notificationsFlow.emit(WatchEventConfirmed(w.channelId, w.event, merkle.block_height, merkle.pos, txMap[w.txId]!!))

// check whether we have transactions to publish
when (val event = w.event) {
is BITCOIN_PARENT_TX_CONFIRMED -> {
val tx = event.childTx
logger.info { "parent tx of txid=${tx.txid} has been confirmed" }
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
val absTimeout = max(merkle.block_height + csvTimeout, cltvTimeout)
state = if (absTimeout > state.height) {
logger.info { "delaying publication of txid=${tx.txid} until block=$absTimeout (curblock=${state.height})" }
val block2tx = state.block2tx + (absTimeout to state.block2tx.getOrElse(absTimeout) { setOf() } + tx)
state.copy(block2tx = block2tx)
} else {
client.broadcastTransaction(tx)
state.copy(sent = state.sent + tx)
client.getMerkle(w.txId, item.blockHeight)?.let { merkle ->
val confirmations = state.height - merkle.block_height + 1
logger.info { "txid=${w.txId} had confirmations=$confirmations in block=${merkle.block_height} pos=${merkle.pos}" }
_notificationsFlow.emit(WatchEventConfirmed(w.channelId, w.event, merkle.block_height, merkle.pos, txMap[w.txId]!!))

// check whether we have transactions to publish
when (val event = w.event) {
is BITCOIN_PARENT_TX_CONFIRMED -> {
val tx = event.childTx
logger.info { "parent tx of txid=${tx.txid} has been confirmed" }
val cltvTimeout = Scripts.cltvTimeout(tx)
val csvTimeout = Scripts.csvTimeout(tx)
val absTimeout = max(merkle.block_height + csvTimeout, cltvTimeout)
state = if (absTimeout > state.height) {
logger.info { "delaying publication of txid=${tx.txid} until block=$absTimeout (curblock=${state.height})" }
val block2tx = state.block2tx + (absTimeout to state.block2tx.getOrElse(absTimeout) { setOf() } + tx)
state.copy(block2tx = block2tx)
} else {
client.broadcastTransaction(tx)
state.copy(sent = state.sent + tx)
}
}
else -> {}
}

else -> {}
}
}
state = state.copy(watches = state.watches - triggered.toSet())
Expand Down
@@ -1,35 +1,49 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.BlockHeader
import fr.acinq.bitcoin.ByteVector32
import fr.acinq.bitcoin.Transaction
import fr.acinq.lightning.utils.Connection
import kotlinx.coroutines.CompletableDeferred
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.StateFlow

/** Note to implementers: methods exposed through this interface must *not* throw exceptions. */
interface IElectrumClient {
val notifications: Flow<ElectrumSubscriptionResponse>
val connectionStatus: StateFlow<ElectrumConnectionStatus>

/** Return the transaction matching the txId provided, if it can be found. */
suspend fun getTx(txid: ByteVector32): Transaction?

suspend fun send(request: ElectrumRequest, replyTo: CompletableDeferred<ElectrumResponse>)
/** Return the block header at the given height, if it exists. */
suspend fun getHeader(blockHeight: Int): BlockHeader?

suspend fun getTx(txid: ByteVector32): Transaction
/** Return the block headers starting at the given height, if they exist (empty list otherwise). */
suspend fun getHeaders(startHeight: Int, count: Int): List<BlockHeader>

suspend fun getMerkle(txid: ByteVector32, blockHeight: Int, contextOpt: Transaction? = null): GetMerkleResponse
/** Return a merkle proof for the given transaction, if it can be found. */
suspend fun getMerkle(txid: ByteVector32, blockHeight: Int, contextOpt: Transaction? = null): GetMerkleResponse?

/** Return the transaction history for a given script, or an empty list if the script is unknown. */
suspend fun getScriptHashHistory(scriptHash: ByteVector32): List<TransactionHistoryItem>

/** Return the utxos matching a given script, or an empty list if the script is unknown. */
suspend fun getScriptHashUnspents(scriptHash: ByteVector32): List<UnspentItem>

suspend fun startScriptHashSubscription(scriptHash: ByteVector32): ScriptHashSubscriptionResponse

suspend fun startHeaderSubscription(): HeaderSubscriptionResponse
/**
* Try broadcasting a transaction: we cannot know whether the remote server really broadcast the transaction,
* so we always consider it to be a success. The client should regularly retry transactions that don't confirm.
*/
suspend fun broadcastTransaction(tx: Transaction): ByteVector32

suspend fun broadcastTransaction(tx: Transaction): BroadcastTransactionResponse
/** Estimate the feerate required for a transaction to be confirmed in the next [confirmations] blocks. */
suspend fun estimateFees(confirmations: Int): FeeratePerKw?

suspend fun estimateFees(confirmations: Int): EstimateFeeResponse
/******************** Subscriptions ********************/

val notifications: Flow<ElectrumSubscriptionResponse>

val connectionStatus: StateFlow<ElectrumConnectionStatus>
/** Subscribe to changes to a given script. */
suspend fun startScriptHashSubscription(scriptHash: ByteVector32): ScriptHashSubscriptionResponse

val connectionState: StateFlow<Connection>
/** Subscribe to headers for new blocks found. */
suspend fun startHeaderSubscription(): HeaderSubscriptionResponse
}
21 changes: 12 additions & 9 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Expand Up @@ -193,7 +193,7 @@ class Peer(
}
}
launch {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect {
watcher.client.connectionStatus.filter { it is ElectrumConnectionStatus.Connected }.collect {
// onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED
// since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough.
// (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis)
Expand Down Expand Up @@ -256,7 +256,7 @@ class Peer(
}

private suspend fun updateEstimateFees() {
watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first()
watcher.client.connectionStatus.filter { it is ElectrumConnectionStatus.Connected }.first()
val sortedFees = listOf(
watcher.client.estimateFees(2),
watcher.client.estimateFees(6),
Expand All @@ -266,10 +266,10 @@ class Peer(
logger.info { "on-chain fees: $sortedFees" }
// TODO: If some feerates are null, we may implement a retry
onChainFeeratesFlow.value = OnChainFeerates(
fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)),
mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat))
fundingFeerate = sortedFees[3] ?: FeeratePerKw(FeeratePerByte(2.sat)),
mutualCloseFeerate = sortedFees[2] ?: FeeratePerKw(FeeratePerByte(10.sat)),
claimMainFeerate = sortedFees[1] ?: FeeratePerKw(FeeratePerByte(20.sat)),
fastFeerate = sortedFees[0] ?: FeeratePerKw(FeeratePerByte(50.sat))
)
}

Expand Down Expand Up @@ -411,12 +411,13 @@ class Peer(
* for a splice out, taking into account potential unconfirmed parent splices.
*/
suspend fun estimateFeeForSpliceOut(amount: Satoshi, scriptPubKey: ByteVector, targetFeerate: FeeratePerKw): Pair<FeeratePerKw, Satoshi>? {
val currentBlockHeight = currentTipFlow.filterNotNull().first().first
return channels.values
.filterIsInstance<Normal>()
.firstOrNull { it.commitments.availableBalanceForSend() > amount }
?.let { channel ->
val weight = FundingContributions.computeWeightPaid(isInitiator = true, commitment = channel.commitments.active.first(), walletInputs = emptyList(), localOutputs = listOf(TxOut(amount, scriptPubKey)))
watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger)
watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, currentBlockHeight, logger)
}
}

Expand All @@ -429,12 +430,13 @@ class Peer(
* should not be attempted.
*/
suspend fun estimateFeeForSpliceCpfp(channelId: ByteVector32, targetFeerate: FeeratePerKw): Pair<FeeratePerKw, Satoshi>? {
val currentBlockHeight = currentTipFlow.filterNotNull().first().first
return channels.values
.filterIsInstance<Normal>()
.find { it.channelId == channelId }
?.let { channel ->
val weight = FundingContributions.computeWeightPaid(isInitiator = true, commitment = channel.commitments.active.first(), walletInputs = emptyList(), localOutputs = emptyList())
watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger)
watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, currentBlockHeight, logger)
}
}

Expand Down Expand Up @@ -948,9 +950,10 @@ class Peer(
is RequestChannelOpen -> {
when (val channel = channels.values.firstOrNull { it is Normal }) {
is ChannelStateWithCommitments -> {
val currentBlockHeight = currentTipFlow.filterNotNull().first().first
val targetFeerate = swapInFeeratesFlow.filterNotNull().first()
val weight = FundingContributions.computeWeightPaid(isInitiator = true, commitment = channel.commitments.active.first(), walletInputs = cmd.walletInputs, localOutputs = emptyList())
val (feerate, fee) = watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger)
val (feerate, fee) = watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, currentBlockHeight, logger)

logger.info { "requesting splice-in using balance=${cmd.walletInputs.balance} feerate=$feerate fee=$fee" }

Expand Down
13 changes: 0 additions & 13 deletions src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt
@@ -1,10 +1,5 @@
package fr.acinq.lightning.io

import fr.acinq.lightning.utils.decodeToString
import fr.acinq.lightning.utils.splitByLines
import fr.acinq.lightning.utils.subArray
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.kodein.log.LoggerFactory

interface TcpSocket {
Expand Down Expand Up @@ -69,11 +64,3 @@ suspend fun TcpSocket.receiveAvailable(buffer: ByteArray) = receiveAvailable(buf
internal expect object PlatformSocketBuilder : TcpSocket.Builder

suspend fun TcpSocket.receiveFully(size: Int): ByteArray = ByteArray(size).also { receiveFully(it) }

fun TcpSocket.linesFlow(): Flow<String> = flow {
val buffer = ByteArray(8192)
while (true) {
val size = receiveAvailable(buffer)
emit(buffer.subArray(size))
}
}.decodeToString().splitByLines()