Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

I2066 indexer rework #2078

Merged
merged 15 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -606,4 +606,20 @@ api-dispatcher {
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 4
}

indexer-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 1
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 1.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 4
}
}
14 changes: 7 additions & 7 deletions src/main/scala/org/ergoplatform/ErgoApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,13 @@ class ErgoApp(args: Args) extends ScorexLogging {
None
}

if(ergoSettings.nodeSettings.extraIndex)
require(
ergoSettings.nodeSettings.stateType.holdsUtxoSet && !ergoSettings.nodeSettings.isFullBlocksPruned,
"Node must store full UTXO set and all blocks to run extra indexer."
)
// Create an instance of ExtraIndexer actor (will start if "extraIndex = true" in config)
private val indexer: ActorRef = ExtraIndexer(ergoSettings.chainSettings, ergoSettings.cacheSettings)
private val indexerOpt: Option[ActorRef] =
if (ergoSettings.nodeSettings.extraIndex) {
Some(ExtraIndexer(ergoSettings.chainSettings, ergoSettings.cacheSettings))
} else {
None
}

private val syncTracker = ErgoSyncTracker(scorexSettings.network)

Expand Down Expand Up @@ -183,7 +183,7 @@ class ErgoApp(args: Args) extends ScorexLogging {
private val apiRoutes: Seq[ApiRoute] = Seq(
EmissionApiRoute(ergoSettings),
ErgoUtilsApiRoute(ergoSettings),
BlockchainApiRoute(readersHolderRef, ergoSettings, indexer),
BlockchainApiRoute(readersHolderRef, ergoSettings, indexerOpt),
ErgoPeersApiRoute(
peerManagerRef,
networkControllerRef,
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/org/ergoplatform/GlobalConstants.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ object GlobalConstants {
* (to avoid clashing between blockchain processing and API actors)
*/
val ApiDispatcher = "api-dispatcher"

val IndexerDispatcher = "indexer-dispatcher"
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ trait ApiExtraCodecs extends JsonCodecs {
"inputs" -> iEt.inputs.asJson,
"dataInputs" -> iEt.dataInputs.asJson,
"outputs" -> iEt.outputs.asJson,
"size" -> iEt.txSize.asJson
"size" -> iEt.size.asJson
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.ergoplatform.http.api.SortDirection.{ASC, DESC, Direction, INVALID}
import org.ergoplatform.{ErgoAddress, ErgoAddressEncoder}
import org.ergoplatform.nodeView.ErgoReadersHolder.{GetDataFromHistory, GetReaders, Readers}
import org.ergoplatform.nodeView.history.ErgoHistoryReader
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.GetSegmentTreshold
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.ReceivableMessages.GetSegmentThreshold
import org.ergoplatform.nodeView.history.extra.ExtraIndexer.{GlobalBoxIndexKey, GlobalTxIndexKey, getIndex}
import org.ergoplatform.nodeView.history.extra.IndexedErgoAddressSerializer.hashErgoTree
import org.ergoplatform.nodeView.history.extra.IndexedTokenSerializer.uniqueId
Expand All @@ -27,13 +27,14 @@ import scala.concurrent.duration.{Duration, SECONDS}
import scala.concurrent.{Await, Future}
import scala.util.Success

case class BlockchainApiRoute(readersHolder: ActorRef, ergoSettings: ErgoSettings, indexer: ActorRef)
case class BlockchainApiRoute(readersHolder: ActorRef, ergoSettings: ErgoSettings, indexerOpt: Option[ActorRef])
(implicit val context: ActorRefFactory) extends ErgoBaseApiRoute with ApiCodecs with ApiExtraCodecs {

val settings: RESTApiSettings = ergoSettings.scorexSettings.restApi

private implicit val segmentTreshold: Int =
Await.result[Int]((indexer ? GetSegmentTreshold).asInstanceOf[Future[Int]], Duration(3, SECONDS))
private implicit val segmentTreshold: Int = indexerOpt.map { indexer =>
Await.result[Int]((indexer ? GetSegmentThreshold).asInstanceOf[Future[Int]], Duration(3, SECONDS))
}.getOrElse(0)

private val paging: Directive[(Int, Int)] = parameters("offset".as[Int] ? 0, "limit".as[Int] ? 5)

Expand Down