Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2019 08 16 process header optimization #701

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -25,14 +25,14 @@ class BlockchainTest extends ChainUnitTest {
val newHeader =
BlockHeaderHelper.buildNextHeader(ChainUnitTest.genesisHeaderDb)

val connectTip = Blockchain.connectTip(header = newHeader.blockHeader,
Vector(blockchain))
val connectTip =
Blockchain.connectTip(header = newHeader.blockHeader, blockchain)

connectTip match {
case BlockchainUpdate.Successful(_, connectedHeader) =>
assert(newHeader == connectedHeader)
case ConnectTipResult.ExtendChain(_, newChain) =>
assert(newHeader == newChain.tip)

case fail: BlockchainUpdate.Failed =>
case fail @ (_: ConnectTipResult.Reorg | _: ConnectTipResult.BadTip) =>
assert(false)
}
}
Expand Down
Expand Up @@ -38,6 +38,13 @@ class ChainHandlerTest extends ChainUnitTest {
mainnetAppConfig.withOverrides(memoryDb)
}

val source = FileUtil.getFileAsSource("block_headers.json")
val arrStr = source.getLines.next
source.close()

import org.bitcoins.rpc.serializers.JsonReaders.BlockHeaderReads
val headersResult = Json.parse(arrStr).validate[Vector[BlockHeader]].get

override val defaultTag: ChainFixtureTag = ChainFixtureTag.GenisisChainHandler

override def withFixture(test: OneArgAsyncTest): FutureOutcome =
Expand Down Expand Up @@ -83,18 +90,8 @@ class ChainHandlerTest extends ChainUnitTest {

it must "be able to process and fetch real headers from mainnet" in {
chainHandler: ChainHandler =>
val source = FileUtil.getFileAsSource("block_headers.json")
val arrStr = source.getLines.next
source.close()

import org.bitcoins.rpc.serializers.JsonReaders.BlockHeaderReads
val headersResult = Json.parse(arrStr).validate[Vector[BlockHeader]]
if (headersResult.isError) {
fail(headersResult.toString)
}

val blockHeaders =
headersResult.get.drop(
headersResult.drop(
ChainUnitTest.FIRST_POW_CHANGE - ChainUnitTest.FIRST_BLOCK_HEIGHT)

val firstBlockHeaderDb =
Expand Down Expand Up @@ -135,6 +132,52 @@ class ChainHandlerTest extends ChainUnitTest {
}
}

it must "benchmark ChainHandler.processHeaders()" in {
chainHandler: ChainHandler =>
val blockHeaders =
headersResult.drop(
ChainUnitTest.FIRST_POW_CHANGE - ChainUnitTest.FIRST_BLOCK_HEIGHT)

val firstBlockHeaderDb =
BlockHeaderDbHelper.fromBlockHeader(ChainUnitTest.FIRST_POW_CHANGE - 2,
ChainTestUtil.blockHeader562462)

val secondBlockHeaderDb =
BlockHeaderDbHelper.fromBlockHeader(ChainUnitTest.FIRST_POW_CHANGE - 1,
ChainTestUtil.blockHeader562463)

val thirdBlockHeaderDb =
BlockHeaderDbHelper.fromBlockHeader(ChainUnitTest.FIRST_POW_CHANGE,
ChainTestUtil.blockHeader562464)

/*
* We need to insert one block before the first POW check because it is used on the next
* POW check. We then need to insert the next to blocks to circumvent a POW check since
* that would require we have an old block in the Blockchain that we don't have.
*/
val firstThreeBlocks =
Vector(firstBlockHeaderDb, secondBlockHeaderDb, thirdBlockHeaderDb)

val createdF = chainHandler.blockHeaderDAO.createAll(firstThreeBlocks)

createdF.flatMap { _ =>
val blockchain = Blockchain.fromHeaders(firstThreeBlocks.reverse)
val handler = ChainHandler(chainHandler.blockHeaderDAO, blockchain)

// Takes way too long to do all blocks
val blockHeadersToTest = blockHeaders.tail
.take(
(2 * chainHandler.chainConfig.chain.difficultyChangeInterval + 1))

val processedF = handler.processHeaders(blockHeadersToTest)

for {
ch <- processedF
bestHash <- ch.getBestBlockHash
} yield assert(bestHash == blockHeadersToTest.last.hashBE)
}
}

it must "handle a very basic reorg where one chain is one block behind the best chain" in {
chainHandler: ChainHandler =>
val reorgFixtureF = buildChainHandlerCompetingHeaders(chainHandler)
Expand Down Expand Up @@ -214,7 +257,7 @@ class ChainHandlerTest extends ChainUnitTest {
}

final def processHeaders(
processorF: Future[ChainHandler],
processorF: Future[ChainApi],
remainingHeaders: List[BlockHeader],
height: Int): Future[Assertion] = {
remainingHeaders match {
Expand Down
11 changes: 4 additions & 7 deletions chain/src/main/scala/org/bitcoins/chain/api/ChainApi.scala
Expand Up @@ -20,7 +20,9 @@ trait ChainApi {
* @return
*/
def processHeader(header: BlockHeader)(
implicit ec: ExecutionContext): Future[ChainApi]
implicit ec: ExecutionContext): Future[ChainApi] = {
processHeaders(Vector(header))
}

/** Process all of the given headers and returns a new [[ChainApi chain api]]
* that contains these headers. This method processes headers in the order
Expand All @@ -29,12 +31,7 @@ trait ChainApi {
* @return
*/
def processHeaders(headers: Vector[BlockHeader])(
implicit ec: ExecutionContext): Future[ChainApi] = {
headers.foldLeft(Future.successful(this)) {
case (chainF, header) =>
chainF.flatMap(_.processHeader(header))
}
}
implicit ec: ExecutionContext): Future[ChainApi]

/** Get's a [[org.bitcoins.chain.models.BlockHeaderDb]] from the chain's database */
def getHeader(hash: DoubleSha256DigestBE)(
Expand Down
198 changes: 125 additions & 73 deletions chain/src/main/scala/org/bitcoins/chain/blockchain/Blockchain.scala
Expand Up @@ -6,6 +6,7 @@ import org.bitcoins.chain.validation.{TipUpdateResult, TipValidation}
import org.bitcoins.core.protocol.blockchain.BlockHeader
import org.bitcoins.chain.ChainVerificationLogger

import scala.annotation.tailrec
import scala.collection.{IndexedSeqLike, mutable}

/**
Expand Down Expand Up @@ -53,103 +54,154 @@ case class Blockchain(headers: Vector[BlockHeaderDb])
}
}

/** Unsafe version for [[org.bitcoins.chain.blockchain.Blockchain.fromHeader() fromHeader]] that can throw [[NoSuchElementException]] */
def fromValidHeader(header: BlockHeaderDb): Blockchain = {
fromHeader(header).get
}

/** The height of the chain */
def height: Int = tip.height

}

object Blockchain extends ChainVerificationLogger {

def fromHeaders(headers: Vector[BlockHeaderDb]): Blockchain = {

Blockchain(headers)
}

/**
* Attempts to connect the given block header with the given blockchain
* This is done via the companion object for blockchain because
* we query [[org.bitcoins.chain.models.BlockHeaderDAO BlockHeaderDAO]] for the chain tips
* We then attempt to connect this block header to all of our current
* chain tips.
* @param header the block header to connect to our chain
* @param blockchains the blockchain we are attempting to connect to
* @return a [[scala.concurrent.Future Future]] that contains a [[org.bitcoins.chain.blockchain.BlockchainUpdate BlockchainUpdate]] indicating
* we [[org.bitcoins.chain.blockchain.BlockchainUpdate.Successful successful]] connected the tip,
* or [[org.bitcoins.chain.blockchain.BlockchainUpdate.Failed Failed]] to connect to a tip
* @param blockchain the blockchain we are attempting to connect to
*/
def connectTip(header: BlockHeader, blockchains: Vector[Blockchain])(
implicit conf: ChainAppConfig): BlockchainUpdate = {
def connectTip(header: BlockHeader, blockchain: Blockchain)(
implicit conf: ChainAppConfig): ConnectTipResult = {
logger.debug(
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain")

val tipResult: BlockchainUpdate = {
val nested: Vector[BlockchainUpdate] = blockchains.map { blockchain =>
val prevBlockHeaderIdxOpt =
blockchain.headers.zipWithIndex.find {
case (headerDb, _) =>
headerDb.hashBE == header.previousBlockHashBE
}
prevBlockHeaderIdxOpt match {
case None =>
logger.warn(
s"No common ancestor found in the chain to connect to ${header.hashBE}")
val err = TipUpdateResult.BadPreviousBlockHash(header)
val failed = BlockchainUpdate.Failed(blockchain = blockchain,
failedHeader = header,
tipUpdateFailure = err)
failed

case Some((prevBlockHeader, prevHeaderIdx)) =>
//found a header to connect to!
logger.debug(
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain")
val chain = blockchain.fromHeader(prevBlockHeader)
val tipResult =
TipValidation.checkNewTip(newPotentialTip = header, chain.get)

tipResult match {
case TipUpdateResult.Success(headerDb) =>
logger.debug(
s"Successfully verified=${headerDb.hashBE.hex}, connecting to chain")
val oldChain =
blockchain.takeRight(blockchain.length - prevHeaderIdx)
val newChain =
Blockchain.fromHeaders(headerDb +: oldChain)
BlockchainUpdate.Successful(newChain, headerDb)
case fail: TipUpdateResult.Failure =>
logger.warn(
s"Could not verify header=${header.hashBE.hex}, reason=$fail")
BlockchainUpdate.Failed(blockchain, header, fail)
}
val tipResult: ConnectTipResult = {
val prevBlockHeaderIdxOpt =
blockchain.headers.zipWithIndex.find {
case (headerDb, _) =>
headerDb.hashBE == header.previousBlockHashBE
}
prevBlockHeaderIdxOpt match {
case None =>
logger.warn(
s"No common ancestor found in the chain to connect to ${header.hashBE}")
val err = TipUpdateResult.BadPreviousBlockHash(header)
val failed = ConnectTipResult.BadTip(err)
failed

case Some((prevBlockHeader, prevHeaderIdx)) =>
//found a header to connect to!
logger.debug(
s"Attempting to add new tip=${header.hashBE.hex} with prevhash=${header.previousBlockHashBE.hex} to chain")
val chain = blockchain.fromValidHeader(prevBlockHeader)
val tipResult =
TipValidation.checkNewTip(newPotentialTip = header, chain)

tipResult match {
case success: TipUpdateResult.Success =>
logger.debug(
s"Successfully verified=${success.header.hashBE.hex}, connecting to chain")
val connectionIdx = blockchain.length - prevHeaderIdx

val oldChain =
blockchain.takeRight(connectionIdx)
val newChain =
Blockchain.fromHeaders(success.headerDb +: oldChain)

if (connectionIdx != blockchain.length) {
//means we have a reorg, since we aren't connecting to latest tip
ConnectTipResult.Reorg(success, newChain)
} else {
//we just extended the latest tip
ConnectTipResult.ExtendChain(success, newChain)
}
case fail: TipUpdateResult.Failure =>
logger.warn(
s"Could not verify header=${header.hashBE.hex}, reason=$fail")
ConnectTipResult.BadTip(fail)
}
}
parseSuccessOrFailure(nested)
}

tipResult
}

/** Takes in a vector of blockchain updates being executed asynchronously, we can only connect one [[BlockHeader header]]
* to a tip successfully, which means _all_ other [[BlockchainUpdate updates]] must fail. This is a helper method
* to find the one [[BlockchainUpdate.Successful successful]] update, or else returns one of the [[BlockchainUpdate.Failed failures]]
* @return
*/
private def parseSuccessOrFailure(
updates: Vector[BlockchainUpdate]): BlockchainUpdate = {
require(updates.nonEmpty,
s"Cannot parse success or failure if we don't have any updates!")
val successfulTipOpt: Option[BlockchainUpdate] = {
updates.find {
case update: BlockchainUpdate =>
update.isInstanceOf[BlockchainUpdate.Successful]
/** Iterates through each given blockchains attempting to connect the given headers to that chain
* @return The final updates for each chain
*
* */
def connectHeadersToChains(
headers: Vector[BlockHeader],
blockchains: Vector[Blockchain])(
implicit chainAppConfig: ChainAppConfig): Vector[BlockchainUpdate] = {
logger.debug(
s"Attempting to connect ${headers.length} headers to ${blockchains.length} blockchains")

@tailrec
def loop(
headersToProcess: Vector[BlockHeader],
lastUpdates: Vector[BlockchainUpdate]): Vector[BlockchainUpdate] = {
headersToProcess match {
case h +: t =>
val newUpdates: Vector[BlockchainUpdate] = lastUpdates
.flatMap { lastUpdate =>
val connectTipResult =
Blockchain.connectTip(header = h,
blockchain = lastUpdate.blockchain)
parseConnectTipResult(connectTipResult, lastUpdate)
}

loop(headersToProcess = t, lastUpdates = newUpdates)
case Vector() =>
lastUpdates
}
}

successfulTipOpt match {
case Some(update) => update
case None =>
//if we didn't successfully connect a tip, just take the first failure we see
updates.find {
case update: BlockchainUpdate =>
update.isInstanceOf[BlockchainUpdate.Failed]
}.get
val initUpdates = blockchains.map { blockchain =>
BlockchainUpdate.Successful(blockchain, Vector.empty)
}

loop(headers, initUpdates)
}

/** Parses a connect tip result, and depending on the result it
* 1. Extends the current chain by one block
* 2. Causes a re-org, which returns the old best tip and the new competing chain
* 3. Fails to connect tip, in which case it returns the old best chain
* */
private def parseConnectTipResult(
connectTipResult: ConnectTipResult,
lastUpdate: BlockchainUpdate): Vector[BlockchainUpdate] = {
lastUpdate match {
case _: BlockchainUpdate.Successful =>
connectTipResult match {
case ConnectTipResult.ExtendChain(tipUpdateResult, newChain) =>
val update = BlockchainUpdate.Successful(
newChain,
tipUpdateResult.headerDb +: lastUpdate.successfulHeaders)
Vector(update)

case ConnectTipResult.Reorg(tipUpdateResult, newChain) =>
val competingUpdate = BlockchainUpdate.Successful(
newChain,
tipUpdateResult.headerDb +: lastUpdate.successfulHeaders)
Vector(lastUpdate, competingUpdate)
case ConnectTipResult.BadTip(tipUpdateResult) =>
val failedUpdate = BlockchainUpdate.Failed(
lastUpdate.blockchain,
lastUpdate.successfulHeaders,
tipUpdateResult.header,
tipUpdateResult)
Vector(failedUpdate)

}

case f: BlockchainUpdate.Failed => Vector(f)
}

}
}