Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ import org.lfdecentralizedtrust.splice.integration.tests.SpliceTests.{
SpliceTestConsoleEnvironment,
}
import org.lfdecentralizedtrust.splice.scan.admin.http.ProtobufJsonScanHttpEncodings
import org.lfdecentralizedtrust.splice.scan.automation.ScanHistoryBackfillingTrigger
import org.lfdecentralizedtrust.splice.scan.automation.{
DeleteCorruptAcsSnapshotTrigger,
ScanHistoryBackfillingTrigger,
}
import org.lfdecentralizedtrust.splice.store.{PageLimit, TreeUpdateWithMigrationId}
import org.lfdecentralizedtrust.splice.sv.automation.delegatebased.AdvanceOpenMiningRoundTrigger
import org.lfdecentralizedtrust.splice.util.{EventId, UpdateHistoryTestUtil, WalletTestUtil}
Expand Down Expand Up @@ -57,6 +60,7 @@ class ScanHistoryBackfillingIntegrationTest
updateAutomationConfig(ConfigurableApp.Scan)(
_.withPausedTrigger[ScanHistoryBackfillingTrigger]
.withPausedTrigger[TxLogBackfillingTrigger[TxLogEntry]]
.withPausedTrigger[DeleteCorruptAcsSnapshotTrigger]
)(config)
)
.addConfigTransforms((_, config) =>
Expand Down Expand Up @@ -258,6 +262,28 @@ class ScanHistoryBackfillingIntegrationTest
}
}

clue(
"Backfilling triggers state. All idle while waiting for corrupt snapshots to be deleted."
) {
sv1BackfillTrigger.retrieveTasks().futureValue should be(empty)
sv2BackfillTrigger.retrieveTasks().futureValue should be(empty)
sv1ScanTxLogBackfillTrigger.retrieveTasks().futureValue should be(empty)
sv2ScanTxLogBackfillTrigger.retrieveTasks().futureValue should be(empty)
}

actAndCheck(
"Run trigger that checks for corrupt snapshots once on all scans", {
sv1DeleteSnapshotsTrigger.runOnce().futureValue
sv2DeleteSnapshotsTrigger.runOnce().futureValue
},
)(
"History marked as free of corrupt snapshots",
_ => {
sv1ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
sv2ScanBackend.appState.store.updateHistory.corruptAcsSnapshotsDeleted shouldBe true
},
)

clue(
"Backfilling triggers state. SV1+SV2: update is about to initialize, txlog is not doing anything yet"
) {
Expand Down Expand Up @@ -521,6 +547,10 @@ class ScanHistoryBackfillingIntegrationTest
sv1ScanBackend.automation.trigger[TxLogBackfillingTrigger[TxLogEntry]]
private def sv2ScanTxLogBackfillTrigger(implicit env: SpliceTestConsoleEnvironment) =
sv2ScanBackend.automation.trigger[TxLogBackfillingTrigger[TxLogEntry]]
private def sv1DeleteSnapshotsTrigger(implicit env: SpliceTestConsoleEnvironment) =
sv1ScanBackend.automation.trigger[DeleteCorruptAcsSnapshotTrigger]
private def sv2DeleteSnapshotsTrigger(implicit env: SpliceTestConsoleEnvironment) =
sv2ScanBackend.automation.trigger[DeleteCorruptAcsSnapshotTrigger]

private def allUpdatesFromScanBackend(scanBackend: ScanAppBackendReference) = {
// Need to use the store directly, as the HTTP endpoint refuses to return data unless it's completely backfilled
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,41 @@ class HistoryMetrics(metricsFactory: LabeledMetricsFactory)(implicit
)(metricsContext)
}

object CorruptAcsSnapshots {
private val corruptAcsSnapshotsPrefix: MetricName = prefix :+ "corrupt-acs-snapshots"

type CantonTimestampMicros =
Long // OpenTelemetry Gauges only allow numeric types and there's no way to map it
val latestRecordTime: Gauge[CantonTimestampMicros] =
metricsFactory.gauge(
MetricInfo(
name = corruptAcsSnapshotsPrefix :+ "latest-record-time",
summary = "The record time of the latest corrupt snapshot that has been deleted",
Traffic,
),
initial = CantonTimestamp.MinValue.toMicros,
)(metricsContext)

val count: Counter =
metricsFactory.counter(
MetricInfo(
name = corruptAcsSnapshotsPrefix :+ "count",
summary = "The number of corrupt ACS snapshots deleted",
Traffic,
)
)(metricsContext)

val completed: Gauge[Int] =
metricsFactory.gauge(
MetricInfo(
name = corruptAcsSnapshotsPrefix :+ "completed",
summary = "Whether all corrupt snapshots are deleted (1) or not (0)",
Debug,
),
initial = 0,
)(metricsContext)
}

object UpdateHistory {
private val updateHistoryPrefix: MetricName = prefix :+ "updates"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ import scala.jdk.CollectionConverters.*
import scala.jdk.OptionConverters.*
import org.lfdecentralizedtrust.splice.util.FutureUnlessShutdownUtil.futureUnlessShutdownToFuture

/** Stores all original daml updates visible to `updateStreamParty`.
*
* ==Related triggers==
*
* The following triggers perform long-running background tasks related to [[UpdateHistory]],
* and must complete in the following order:
*
* 1. [[DeleteCorruptAcsSnapshotTrigger]] deletes all ACS snapshots that were computed from this UpdateHistory while
* it was missing import updates. Such snapshots are easily identified by the trigger.
* UpdateHistory has an in-memory flag (as part of [[UpdateHistory.State]]) that stores
* whether all corrupt updates have been deleted.
* See [[corruptAcsSnapshotsDeleted]] and [[markCorruptAcsSnapshotsDeleted]].
* 1. [[ScanHistoryBackfillingTrigger]] backfills missing updates from peer scan applications.
* Information on the progress of this backfilling process is stored in the database.
* See [[destinationHistory.markBackfillingComplete]] and [[destinationHistory.markImportUpdatesBackfillingComplete]].
* 1. [[AcsSnapshotTrigger]] backfills ACS snapshots.
*/
class UpdateHistory(
storage: DbStorage,
val domainMigrationInfo: DomainMigrationInfo,
Expand Down Expand Up @@ -246,14 +263,6 @@ class UpdateHistory(
.map(_.map(LegacyOffset.Api.assertFromStringToLong))

_ <- cleanUpDataAfterDomainMigration(newHistoryId)

_ <-
if (enableImportUpdateBackfill) {
deleteInvalidAcsSnapshots(newHistoryId)
} else {
logger.info(s"Not deleting invalid ACS snapshots for history $newHistoryId")
Future.unit
}
} yield {
state.updateAndGet(
_.copy(
Expand Down Expand Up @@ -654,21 +663,17 @@ class UpdateHistory(
"""
}

private[this] def deleteInvalidAcsSnapshots(
historyId: Long
)(implicit tc: TraceContext): Future[Unit] = {
assert(enableImportUpdateBackfill)
def migrationsWithCorruptSnapshots(): Future[Set[Long]] = {
for {
migrationsWithImportUpdates <- storage
.query(
// The following is equivalent to:
// """select distinct migration_id
// from update_history_transactions
// where history_id = $historyId
// and record_time = ${CantonTimestamp.MinValue}"""
// but it uses a recursive CTE to implement a loose index scan
sql"""
def migrationsWithCorruptSnapshots()(implicit tc: TraceContext): Future[Set[Long]] = {
for {
migrationsWithImportUpdates <- storage
.query(
// The following is equivalent to:
// """select distinct migration_id
// from update_history_transactions
// where history_id = $historyId
// and record_time = ${CantonTimestamp.MinValue}"""
// but it uses a recursive CTE to implement a loose index scan
sql"""
with recursive t as (
(
select migration_id
Expand All @@ -688,56 +693,34 @@ class UpdateHistory(
)
select migration_id from t where migration_id is not null
""".as[Long],
"deleteInvalidAcsSnapshots.1",
)
firstMigrationId <- getFirstMigrationId(historyId)
migrationsWithSnapshots <- storage
.query(
sql"""
"deleteInvalidAcsSnapshots.1",
)
firstMigrationIdO <- getFirstMigrationId(historyId)
migrationsWithSnapshots <- storage
.query(
sql"""
select distinct migration_id
from acs_snapshot
where history_id = $historyId
""".as[Long],
"deleteInvalidAcsSnapshots.2",
)
} yield {
val migrationsThatNeedImportUpdates: Set[Long] =
migrationsWithSnapshots.toSet - firstMigrationId.getOrElse(
throw new RuntimeException("No first migration found")
)
migrationsThatNeedImportUpdates -- migrationsWithImportUpdates.toSet
"deleteInvalidAcsSnapshots.2",
)
} yield {
firstMigrationIdO match {
case None =>
Set.empty
case Some(firstMigrationId) =>
val migrationsThatNeedImportUpdates: Set[Long] =
migrationsWithSnapshots.toSet - firstMigrationId
migrationsThatNeedImportUpdates -- migrationsWithImportUpdates.toSet
}
}
}

for {
state <- getBackfillingStateForHistory(historyId)
_ <- state match {
// Note: we want to handle the case where backfilling finished before import update backfilling was implemented,
// because in that case UpdateHistory signalled that history was complete when it fact it was missing import updates,
// which caused [[AcsSnapshotTrigger]] to compute corrupt snapshots.
// It is fine to run the code below on each application startup because it only deletes corrupt snapshots.
case BackfillingState.InProgress(_, _) =>
logger.info(
s"This update history may be missing import updates, checking for corrupt ACS snapshots"
)
migrationsWithCorruptSnapshots()
.flatMap { migrations =>
Future.sequence(migrations.map { migrationId =>
deleteAcsSnapshotsAfter(historyId, migrationId, CantonTimestamp.MinValue)
})
}
.andThen { case _ =>
logger.info(s"Finished checking for corrupt ACS snapshots")
}
case _ =>
logger.debug(
s"History is in backfilling state $state, no need to check for corrupt ACS snapshots"
)
Future.unit
}
} yield {
()
}
def corruptAcsSnapshotsDeleted: Boolean = state.get.corruptSnapshotsDeleted
def markCorruptAcsSnapshotsDeleted(): Unit = {
state.updateAndGet(_.copy(corruptSnapshotsDeleted = true))
()
}

private[this] def cleanUpDataAfterDomainMigration(
Expand Down Expand Up @@ -2232,11 +2215,15 @@ object UpdateHistory {
)

case class State(
historyId: Option[Long]
historyId: Option[Long],
corruptSnapshotsDeleted: Boolean,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could also remove this flag and have ScanHistoryBackfillingTrigger independently check for corrupt snapshots, at the cost of calling migrationsWithCorruptSnapshots more often.

migrationsWithCorruptSnapshots executes 3 independent SQL queries, each of them taking <10ms to run on CILR.

) {}

object State {
def empty(): State = State(None)
def empty(): State = State(
historyId = None,
corruptSnapshotsDeleted = false,
)
}

sealed trait BackfillingState
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// Copyright (c) 2024 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package org.lfdecentralizedtrust.splice.scan.automation

import com.daml.metrics.api.MetricsContext
import org.lfdecentralizedtrust.splice.automation.{
PollingParallelTaskExecutionTrigger,
TaskOutcome,
TaskSuccess,
TriggerContext,
}
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore
import org.lfdecentralizedtrust.splice.store.{HistoryMetrics, UpdateHistory}
import com.digitalasset.canton.data.CantonTimestamp
import com.digitalasset.canton.logging.pretty.{Pretty, PrettyPrinting}
import com.digitalasset.canton.tracing.TraceContext
import io.opentelemetry.api.trace.Tracer
import org.apache.pekko.stream.Materializer

import scala.concurrent.{ExecutionContext, Future}

class DeleteCorruptAcsSnapshotTrigger(
store: AcsSnapshotStore,
updateHistory: UpdateHistory,
protected val context: TriggerContext,
)(implicit
ec: ExecutionContext,
tracer: Tracer,
mat: Materializer,
// we always return 1 task, so PollingParallelTaskExecutionTrigger in effect does nothing in parallel
) extends PollingParallelTaskExecutionTrigger[DeleteCorruptAcsSnapshotTrigger.Task] {

private val historyMetrics = new HistoryMetrics(context.metricsFactory)(MetricsContext.Empty)

override def retrieveTasks()(implicit
tc: TraceContext
): Future[Seq[DeleteCorruptAcsSnapshotTrigger.Task]] = {
if (!updateHistory.isReady) {
Future.successful(Seq.empty)
} else if (updateHistory.corruptAcsSnapshotsDeleted) {
Future.successful(Seq.empty)
} else {
for {
migrations <- updateHistory.migrationsWithCorruptSnapshots()
} yield migrations.lastOption match {
case Some(migrationToClean) =>
historyMetrics.CorruptAcsSnapshots.completed.updateValue(0)
Seq(DeleteCorruptAcsSnapshotTrigger.Task(migrationToClean))
case None =>
updateHistory.markCorruptAcsSnapshotsDeleted()
historyMetrics.CorruptAcsSnapshots.completed.updateValue(1)
Seq.empty
}
}
}

override protected def completeTask(task: DeleteCorruptAcsSnapshotTrigger.Task)(implicit
tc: TraceContext
): Future[TaskOutcome] = task match {
case DeleteCorruptAcsSnapshotTrigger.Task(migrationId) =>
for {
lastSnapshotO <- store.lookupSnapshotBefore(migrationId, CantonTimestamp.MaxValue)
lastSnapshot = lastSnapshotO.getOrElse(
throw new RuntimeException("Task should never become stale")
)
_ <- store.deleteSnapshot(lastSnapshot)
} yield {
historyMetrics.CorruptAcsSnapshots.count.inc()
historyMetrics.CorruptAcsSnapshots.latestRecordTime.updateValue(
lastSnapshot.snapshotRecordTime.toMicros
)
TaskSuccess(
s"Successfully deleted snapshot $lastSnapshot."
)
}
}

override protected def isStaleTask(task: DeleteCorruptAcsSnapshotTrigger.Task)(implicit
tc: TraceContext
): Future[Boolean] = Future.successful(false)
}

object DeleteCorruptAcsSnapshotTrigger {

case class Task(
migrationId: Long
) extends PrettyPrinting {
import org.lfdecentralizedtrust.splice.util.PrettyInstances.*

override def pretty: Pretty[this.type] = prettyOfClass(
param("migrationId", _.migrationId)
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,15 @@ class ScanAutomationService(
triggerContext,
)
)
if (config.updateHistoryBackfillImportUpdatesEnabled) {
registerTrigger(
new DeleteCorruptAcsSnapshotTrigger(
snapshotStore,
store.updateHistory,
triggerContext,
)
)
}
if (config.txLogBackfillEnabled) {
registerTrigger(
new TxLogBackfillingTrigger(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ class ScanHistoryBackfillingTrigger(
if (!store.updateHistory.isReady) {
logger.debug("UpdateHistory is not yet ready")
Future.successful(Seq.empty)
} else if (importUpdateBackfillingEnabled && !store.updateHistory.corruptAcsSnapshotsDeleted) {
logger.debug("There may be corrupt ACS snapshots that need to be deleted")
Future.successful(Seq.empty)
} else {
store.updateHistory.getBackfillingState().map {
case BackfillingState.Complete =>
Expand Down
Loading
Loading