Skip to content

Commit

Permalink
Merge pull request #2078 from ergoplatform/i2066-indexer-rework
Browse files Browse the repository at this point in the history
I2066 indexer rework
  • Loading branch information
kushti committed Dec 18, 2023
2 parents 1dea1ef + c9b38db commit f0b1b84
Show file tree
Hide file tree
Showing 14 changed files with 731 additions and 534 deletions.
16 changes: 16 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -606,4 +606,20 @@ api-dispatcher {
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 4
}

indexer-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 1.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 4
}
}
14 changes: 7 additions & 7 deletions src/main/scala/org/ergoplatform/ErgoApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ class ErgoApp(args: Args) extends ScorexLogging {
None
}

if(ergoSettings.nodeSettings.extraIndex)
require(
ergoSettings.nodeSettings.stateType.holdsUtxoSet && !ergoSettings.nodeSettings.isFullBlocksPruned,
"Node must store full UTXO set and all blocks to run extra indexer."
)
// Create an instance of ExtraIndexer actor (will start if "extraIndex = true" in config)
private val indexer: ActorRef = ExtraIndexer(ergoSettings.chainSettings, ergoSettings.cacheSettings)
private val indexerOpt: Option[ActorRef] =
if (ergoSettings.nodeSettings.extraIndex) {
Some(ExtraIndexer(ergoSettings.chainSettings, ergoSettings.cacheSettings))
} else {
None
}

private val syncTracker = ErgoSyncTracker(scorexSettings.network)

Expand Down Expand Up @@ -183,7 +183,7 @@ class ErgoApp(args: Args) extends ScorexLogging {
private val apiRoutes: Seq[ApiRoute] = Seq(
EmissionApiRoute(ergoSettings),
ErgoUtilsApiRoute(ergoSettings),
BlockchainApiRoute(readersHolderRef, ergoSettings, indexer),
BlockchainApiRoute(readersHolderRef, ergoSettings, indexerOpt),
ErgoPeersApiRoute(
peerManagerRef,
networkControllerRef,
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/org/ergoplatform/GlobalConstants.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ object GlobalConstants {
* (to avoid clashing between blockchain processing and API actors)
*/
val ApiDispatcher = "api-dispatcher"

val IndexerDispatcher = "indexer-dispatcher"
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ trait ApiExtraCodecs extends JsonCodecs {
"inputs" -> iEt.inputs.asJson,
"dataInputs" -> iEt.dataInputs.asJson,
"outputs" -> iEt.outputs.asJson,
"size" -> iEt.txSize.asJson
"size" -> iEt.size.asJson
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.ergoplatform.http.api.SortDirection.{ASC, DESC, Direction, INVALID}
import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder}
import org.ergoplatform.nodeView.ErgoReadersHolder.{GetDataFromHistory, GetReaders, Readers}
import org.ergoplatform.nodeView.history.ErgoHistoryReader
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.GetSegmentTreshold
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.GetSegmentThreshold
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{GlobalBoxIndexKey, GlobalTxIndexKey, getIndex}
import org.ergoplatform.nodeView.history.extra.IndexedErgoAddressSerializer.hashErgoTree
import org.ergoplatform.nodeView.history.extra.IndexedTokenSerializer.uniqueId
Expand All @@ -27,13 +27,14 @@ import scala.concurrent.duration.{Duration, SECONDS}
import scala.concurrent.{Await, Future}
import scala.util.Success

case class BlockchainApiRoute(readersHolder: ActorRef, ergoSettings: ErgoSettings, indexer: ActorRef)
case class BlockchainApiRoute(readersHolder: ActorRef, ergoSettings: ErgoSettings, indexerOpt: Option[ActorRef])
(implicit val context: ActorRefFactory) extends ErgoBaseApiRoute with ApiCodecs with ApiExtraCodecs {

val settings: RESTApiSettings = ergoSettings.scorexSettings.restApi

private implicit val segmentTreshold: Int =
Await.result[Int]((indexer ? GetSegmentTreshold).asInstanceOf[Future[Int]], Duration(3, SECONDS))
private implicit val segmentTreshold: Int = indexerOpt.map { indexer =>
Await.result[Int]((indexer ? GetSegmentThreshold).asInstanceOf[Future[Int]], Duration(3, SECONDS))
}.getOrElse(0)

private val paging: Directive[(Int, Int)] = parameters("offset".as[Int] ? 0, "limit".as[Int] ? 5)

Expand Down

0 comments on commit f0b1b84

Please sign in to comment.