Skip to content

Commit

Permalink
Fix rescans that are larger than the batch size (#1916)
Browse files Browse the repository at this point in the history
* Fix rescans that are larger than the batch size

* Add test
  • Loading branch information
benthecarman committed Aug 27, 2020
1 parent e8c28de commit 4104e0c
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ object FutureUtil {
}

/** Batches the elements by batchSize, executes f, and then aggregates all of the results
* into a vector and returns it. This is is the synchrononous version of [[batchAndParallelExecute()]]
* into a vector and returns it. This is is the synchronous version of [[batchAndParallelExecute()]]
*/
def batchAndSyncExecute[T, U](
elements: Vector[T],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package org.bitcoins.wallet

import org.bitcoins.core.currency.{Bitcoins, CurrencyUnits, Satoshis}
import org.bitcoins.core.protocol.BlockStamp
import org.bitcoins.core.protocol.script.ScriptPubKey
import org.bitcoins.core.util.FutureUtil
import org.bitcoins.server.BitcoinSAppConfig
import org.bitcoins.testkit.BitcoinSTestAppConfig
Expand All @@ -12,6 +13,8 @@ import org.bitcoins.testkit.wallet.{
}
import org.scalatest.FutureOutcome

import scala.concurrent.Future

class RescanHandlingTest extends BitcoinSWalletTest {

/** Wallet config with data directory set to user temp directory */
Expand Down Expand Up @@ -131,6 +134,9 @@ class RescanHandlingTest extends BitcoinSWalletTest {
initBlockHeight <- initBlockHeightF
txInBlockHeight = initBlockHeight + numBlocks
txInBlockHeightOpt = Some(BlockStamp.BlockHeight(txInBlockHeight))
_ <- newTxWallet.clearAllUtxosAndAddresses()
zeroBalance <- newTxWallet.getBalance()
_ = assert(zeroBalance == Satoshis.zero)
_ <- newTxWallet.rescanNeutrinoWallet(startOpt = txInBlockHeightOpt,
endOpt = None,
addressBatchSize =
Expand All @@ -142,6 +148,66 @@ class RescanHandlingTest extends BitcoinSWalletTest {
}
}

it must "be able to discover funds using multiple batches" in {
fixture: WalletWithBitcoind =>
val WalletWithBitcoindV19(wallet, bitcoind) = fixture

val amt = Bitcoins.one
val numBlocks = 1

//send funds to a fresh wallet address
val addrF = wallet.getNewAddress()
val bitcoindAddrF = bitcoind.getNewAddress
val newTxWalletF = for {
addr <- addrF
txid <- bitcoind.sendToAddress(addr, amt)
tx <- bitcoind.getRawTransactionRaw(txid)
bitcoindAddr <- bitcoindAddrF
blockHashes <-
bitcoind.generateToAddress(blocks = numBlocks, address = bitcoindAddr)
newTxWallet <- wallet.processTransaction(transaction = tx,
blockHashOpt =
blockHashes.headOption)
balance <- newTxWallet.getBalance()
unconfirmedBalance <- newTxWallet.getUnconfirmedBalance()
} yield {
//balance doesn't have to exactly equal, as there was money in the
//wallet before hand.
assert(balance >= amt)
assert(balance == unconfirmedBalance)
newTxWallet
}

for {
newTxWallet <- newTxWalletF

account <- newTxWallet.getDefaultAccount()
blocks <-
newTxWallet.spendingInfoDAO
.findAllForAccount(account.hdAccount)
.map(_.flatMap(_.blockHash).distinct)

_ <- newTxWallet.clearAllUtxosAndAddresses()
scriptPubKeys <-
1.to(10).foldLeft(Future.successful(Vector.empty[ScriptPubKey])) {
(prevFuture, _) =>
for {
prev <- prevFuture
address <- newTxWallet.getNewAddress(account)
changeAddress <- newTxWallet.getNewChangeAddress(account)
} yield prev :+ address.scriptPubKey :+ changeAddress.scriptPubKey
}
matches <- newTxWallet.getMatchingBlocks(scriptPubKeys,
None,
None,
batchSize = 1)
} yield {
assert(matches.size == blocks.size)
assert(
matches.forall(blockMatch => blocks.contains(blockMatch.blockHash)))
}
}

it must "be able to discover funds that occurred from the wallet creation time" in {
fixture: WalletWithBitcoind =>
val WalletWithBitcoindV19(wallet, bitcoind) = fixture
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,9 @@ private[wallet] trait RescanHandling extends WalletLogger {
_ = logger.info(
s"Beginning to search for matches between ${startHeight}:${endHeight} against ${scripts.length} spks")
range = startHeight.to(endHeight)
matched <- FutureUtil.batchExecute(
matched <- FutureUtil.batchAndSyncExecute(
elements = range.toVector,
f = fetchFiltersInRange(scripts, parallelismLevel),
init = Vector.empty,
batchSize = batchSize)
} yield {
logger.info(s"Matched ${matched.length} blocks on rescan")
Expand Down Expand Up @@ -264,20 +263,28 @@ private[wallet] trait RescanHandling extends WalletLogger {
startHeight = startHeight,
endHeight = endHeight)
filtered <- findMatches(filtersResponse, scripts, parallelismLevel)
} yield filtered.toVector
} yield {
logger.info(
s"Found ${filtered.length} matches from start=$startHeight to end=$endHeight")
filtered
}
}

private def findMatches(
filters: Vector[FilterResponse],
scripts: Vector[ScriptPubKey],
parallelismLevel: Int): Future[Iterator[BlockMatchingResponse]] = {
if (filters.isEmpty)
Future.successful(Iterator.empty)
else {
parallelismLevel: Int): Future[Vector[BlockMatchingResponse]] = {
if (filters.isEmpty) {
logger.info("No Filters to check against")
Future.successful(Vector.empty)
} else if (scripts.isEmpty) {
logger.info("No scripts to check against")
Future.successful(Vector.empty)
} else {
val bytes = scripts.map(_.asmBytes)
/* Iterates over the grouped vector of filters to find matches with the given [[bytes]]. */
val groupSize = calcGroupSize(filters.size, parallelismLevel)
val filterGroups = filters.grouped(groupSize)
val filterGroups = filters.grouped(groupSize).toVector
// Sequence on the filter groups making sure the number of threads doesn't exceed [[parallelismLevel]].
Future
.sequence(filterGroups.map { filterGroup =>
Expand All @@ -290,6 +297,7 @@ private[wallet] trait RescanHandling extends WalletLogger {
(blocks, filter) =>
val matcher = SimpleFilterMatcher(filter.compactFilter)
if (matcher.matchesAny(bytes)) {
logger.info(s"Found a match in block ${filter.blockHeight}")
blocks :+ BlockMatchingResponse(filter.blockHash,
filter.blockHeight)
} else {
Expand Down

0 comments on commit 4104e0c

Please sign in to comment.