Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class SingleScanConnection private[client] (
with ScanConnection
with BackfillingScanConnection
with HasUrl {
import ScanRoundAggregatesDecoder.*

def url = config.adminApi.url

// cached DSO reference. Never changes.
Expand Down Expand Up @@ -392,76 +394,6 @@ class SingleScanConnection private[client] (
}
}

private def decodeRoundTotal(
rt: org.lfdecentralizedtrust.splice.http.v0.definitions.RoundTotals
): Either[String, ScanAggregator.RoundTotals] = {
(for {
closedRoundEffectiveAt <- CantonTimestamp.fromInstant(rt.closedRoundEffectiveAt.toInstant)
appRewards <- Codec.decode(Codec.BigDecimal)(rt.appRewards)
validatorRewards <- Codec.decode(Codec.BigDecimal)(rt.validatorRewards)
changeToInitialAmountAsOfRoundZero <- Codec
.decode(Codec.BigDecimal)(rt.changeToInitialAmountAsOfRoundZero)
changeToHoldingFeesRate <- Codec.decode(Codec.BigDecimal)(rt.changeToHoldingFeesRate)
cumulativeAppRewards <- Codec.decode(Codec.BigDecimal)(rt.cumulativeAppRewards)
cumulativeValidatorRewards <- Codec
.decode(Codec.BigDecimal)(rt.cumulativeValidatorRewards)
cumulativeChangeToInitialAmountAsOfRoundZero <- Codec
.decode(Codec.BigDecimal)(rt.cumulativeChangeToInitialAmountAsOfRoundZero)
cumulativeChangeToHoldingFeesRate <- Codec
.decode(Codec.BigDecimal)(rt.cumulativeChangeToHoldingFeesRate)
totalAmuletBalance <- Codec.decode(Codec.BigDecimal)(rt.totalAmuletBalance)
} yield {
ScanAggregator.RoundTotals(
closedRound = rt.closedRound,
closedRoundEffectiveAt = closedRoundEffectiveAt,
appRewards = appRewards,
validatorRewards = validatorRewards,
changeToInitialAmountAsOfRoundZero = changeToInitialAmountAsOfRoundZero,
changeToHoldingFeesRate = changeToHoldingFeesRate,
cumulativeAppRewards = cumulativeAppRewards,
cumulativeValidatorRewards = cumulativeValidatorRewards,
cumulativeChangeToInitialAmountAsOfRoundZero = cumulativeChangeToInitialAmountAsOfRoundZero,
cumulativeChangeToHoldingFeesRate = cumulativeChangeToHoldingFeesRate,
totalAmuletBalance = totalAmuletBalance,
)
})
}

private def decodeRoundPartyTotals(
rt: org.lfdecentralizedtrust.splice.http.v0.definitions.RoundPartyTotals
): Either[String, ScanAggregator.RoundPartyTotals] = {
(for {
appRewards <- Codec.decode(Codec.BigDecimal)(rt.appRewards)
validatorRewards <- Codec.decode(Codec.BigDecimal)(rt.validatorRewards)
trafficPurchasedCcSpent <- Codec.decode(Codec.BigDecimal)(rt.trafficPurchasedCcSpent)
cumulativeAppRewards <- Codec.decode(Codec.BigDecimal)(rt.cumulativeAppRewards)
cumulativeValidatorRewards <- Codec.decode(Codec.BigDecimal)(rt.cumulativeValidatorRewards)
cumulativeChangeToInitialAmountAsOfRoundZero <- Codec
.decode(Codec.BigDecimal)(rt.cumulativeChangeToInitialAmountAsOfRoundZero)
cumulativeChangeToHoldingFeesRate <- Codec
.decode(Codec.BigDecimal)(rt.cumulativeChangeToHoldingFeesRate)
cumulativeTrafficPurchasedCcSpent <- Codec
.decode(Codec.BigDecimal)(rt.cumulativeTrafficPurchasedCcSpent)
} yield {
ScanAggregator.RoundPartyTotals(
closedRound = rt.closedRound,
party = rt.party,
appRewards = appRewards,
validatorRewards = validatorRewards,
trafficPurchased = rt.trafficPurchased,
trafficPurchasedCcSpent = trafficPurchasedCcSpent,
trafficNumPurchases = rt.trafficNumPurchases,
cumulativeAppRewards = cumulativeAppRewards,
cumulativeValidatorRewards = cumulativeValidatorRewards,
cumulativeChangeToInitialAmountAsOfRoundZero = cumulativeChangeToInitialAmountAsOfRoundZero,
cumulativeChangeToHoldingFeesRate = cumulativeChangeToHoldingFeesRate,
cumulativeTrafficPurchased = rt.cumulativeTrafficPurchased,
cumulativeTrafficPurchasedCcSpent = cumulativeTrafficPurchasedCcSpent,
cumulativeTrafficNumPurchases = rt.cumulativeTrafficNumPurchases,
)
})
}

override def getMigrationSchedule()(implicit
ec: ExecutionContext,
tc: TraceContext,
Expand Down Expand Up @@ -878,3 +810,63 @@ class CachedScanConnection private[client] (
)
)
}

object ScanRoundAggregatesDecoder {
def decodeRoundTotal(
rt: org.lfdecentralizedtrust.splice.http.v0.definitions.RoundTotals
): Either[String, ScanAggregator.RoundTotals] = {
(for {
closedRoundEffectiveAt <- CantonTimestamp.fromInstant(rt.closedRoundEffectiveAt.toInstant)
appRewards <- Codec.decode(Codec.BigDecimal)(rt.appRewards)
validatorRewards <- Codec.decode(Codec.BigDecimal)(rt.validatorRewards)
cumulativeAppRewards <- Codec.decode(Codec.BigDecimal)(rt.cumulativeAppRewards)
cumulativeValidatorRewards <- Codec
.decode(Codec.BigDecimal)(rt.cumulativeValidatorRewards)
} yield {
// changeToInitialAmountAsOfRoundZero, changeToHoldingFeesRate, cumulativeChangeToInitialAmountAsOfRoundZero,
// cumulativeChangeToHoldingFeesRate and totalAmuletBalance are intentionally left out
// since these do not match up anymore because amulet expires are attributed to the closed round at a later stage
// in scan_txlog_store, at a time that can easily differ between SVs.
ScanAggregator.RoundTotals(
closedRound = rt.closedRound,
closedRoundEffectiveAt = closedRoundEffectiveAt,
appRewards = appRewards,
validatorRewards = validatorRewards,
cumulativeAppRewards = cumulativeAppRewards,
cumulativeValidatorRewards = cumulativeValidatorRewards,
)
})
}

def decodeRoundPartyTotals(
rt: org.lfdecentralizedtrust.splice.http.v0.definitions.RoundPartyTotals
): Either[String, ScanAggregator.RoundPartyTotals] = {
(for {
appRewards <- Codec.decode(Codec.BigDecimal)(rt.appRewards)
validatorRewards <- Codec.decode(Codec.BigDecimal)(rt.validatorRewards)
trafficPurchasedCcSpent <- Codec.decode(Codec.BigDecimal)(rt.trafficPurchasedCcSpent)
cumulativeAppRewards <- Codec.decode(Codec.BigDecimal)(rt.cumulativeAppRewards)
cumulativeValidatorRewards <- Codec.decode(Codec.BigDecimal)(rt.cumulativeValidatorRewards)
cumulativeTrafficPurchasedCcSpent <- Codec
.decode(Codec.BigDecimal)(rt.cumulativeTrafficPurchasedCcSpent)
} yield {
// cumulativeChangeToInitialAmountAsOfRoundZero and cumulativeChangeToHoldingFeesRate are intentionally left out
// since these do not match up anymore because amulet expires are attributed to the closed round at a later stage
// in scan_txlog_store, at a time that can easily differ between SVs.
ScanAggregator.RoundPartyTotals(
closedRound = rt.closedRound,
party = rt.party,
appRewards = appRewards,
validatorRewards = validatorRewards,
trafficPurchased = rt.trafficPurchased,
trafficPurchasedCcSpent = trafficPurchasedCcSpent,
trafficNumPurchases = rt.trafficNumPurchases,
cumulativeAppRewards = cumulativeAppRewards,
cumulativeValidatorRewards = cumulativeValidatorRewards,
cumulativeTrafficPurchased = rt.cumulativeTrafficPurchased,
cumulativeTrafficPurchasedCcSpent = cumulativeTrafficPurchasedCcSpent,
cumulativeTrafficNumPurchases = rt.cumulativeTrafficNumPurchases,
)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ import com.digitalasset.canton.util.ErrorUtil
import org.lfdecentralizedtrust.splice.environment.TopologyAdminConnection.TopologyTransactionType.AuthorizedState
import org.lfdecentralizedtrust.splice.scan.config.BftSequencerConfig
import org.lfdecentralizedtrust.splice.scan.store.AcsSnapshotStore.QueryAcsSnapshotResult
import org.lfdecentralizedtrust.splice.scan.store.db.ScanAggregator.{RoundPartyTotals, RoundTotals}
import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.TxLogBackfillingState
import org.lfdecentralizedtrust.splice.store.UpdateHistory.BackfillingState
import org.lfdecentralizedtrust.splice.store.UpdateHistory
Expand Down Expand Up @@ -125,7 +126,7 @@ class HttpScanHandler(
with HttpVotesHandler
with HttpValidatorLicensesHandler
with HttpFeatureSupportHandler {

import HttpScanHandler.*
override protected val workflowId: String = this.getClass.getSimpleName
override protected val votesStore: VotesStore = store
override protected val validatorLicensesStore: AppStore = store
Expand Down Expand Up @@ -1853,25 +1854,7 @@ class HttpScanHandler(
ensureValidRange(request.startRound, request.endRound, 200) {
for {
roundTotals <- store.getRoundTotals(request.startRound, request.endRound)
entries = roundTotals.map { roundTotal =>
definitions.RoundTotals(
closedRound = roundTotal.closedRound,
closedRoundEffectiveAt = java.time.OffsetDateTime
.ofInstant(roundTotal.closedRoundEffectiveAt.toInstant, ZoneOffset.UTC),
appRewards = Codec.encode(roundTotal.appRewards),
validatorRewards = Codec.encode(roundTotal.validatorRewards),
changeToInitialAmountAsOfRoundZero =
Codec.encode(roundTotal.changeToInitialAmountAsOfRoundZero),
changeToHoldingFeesRate = Codec.encode(roundTotal.changeToHoldingFeesRate),
cumulativeAppRewards = Codec.encode(roundTotal.cumulativeAppRewards),
cumulativeValidatorRewards = Codec.encode(roundTotal.cumulativeValidatorRewards),
cumulativeChangeToInitialAmountAsOfRoundZero =
Codec.encode(roundTotal.cumulativeChangeToInitialAmountAsOfRoundZero),
cumulativeChangeToHoldingFeesRate =
Codec.encode(roundTotal.cumulativeChangeToHoldingFeesRate),
totalAmuletBalance = Codec.encode(roundTotal.totalAmuletBalance),
)
}
entries = roundTotals.map(encodeRoundTotals)
} yield v0.ScanResource.ListRoundTotalsResponse.OK(
definitions.ListRoundTotalsResponse(entries.toVector)
)
Expand All @@ -1888,27 +1871,7 @@ class HttpScanHandler(
ensureValidRange(request.startRound, request.endRound, 50) {
for {
roundPartyTotals <- store.getRoundPartyTotals(request.startRound, request.endRound)
entries = roundPartyTotals.map { roundPartyTotal =>
definitions.RoundPartyTotals(
closedRound = roundPartyTotal.closedRound,
party = roundPartyTotal.party,
appRewards = Codec.encode(roundPartyTotal.appRewards),
validatorRewards = Codec.encode(roundPartyTotal.validatorRewards),
trafficPurchased = roundPartyTotal.trafficPurchased,
trafficPurchasedCcSpent = Codec.encode(roundPartyTotal.trafficPurchasedCcSpent),
trafficNumPurchases = roundPartyTotal.trafficNumPurchases,
cumulativeAppRewards = Codec.encode(roundPartyTotal.cumulativeAppRewards),
cumulativeValidatorRewards = Codec.encode(roundPartyTotal.cumulativeValidatorRewards),
cumulativeChangeToInitialAmountAsOfRoundZero =
Codec.encode(roundPartyTotal.cumulativeChangeToInitialAmountAsOfRoundZero),
cumulativeChangeToHoldingFeesRate =
Codec.encode(roundPartyTotal.cumulativeChangeToHoldingFeesRate),
cumulativeTrafficPurchased = roundPartyTotal.cumulativeTrafficPurchased,
cumulativeTrafficPurchasedCcSpent =
Codec.encode(roundPartyTotal.cumulativeTrafficPurchasedCcSpent),
cumulativeTrafficNumPurchases = roundPartyTotal.cumulativeTrafficNumPurchases,
)
}
entries = roundPartyTotals.map(encodeRoundPartyTotals)
} yield v0.ScanResource.ListRoundPartyTotalsResponse.OK(
definitions.ListRoundPartyTotalsResponse(entries.toVector)
)
Expand Down Expand Up @@ -2303,4 +2266,46 @@ object HttpScanHandler {
// We expect a handful at most but want to somewhat guard against attacks
// so we just hardcode a limit of 100.
private val MAX_TRANSFER_COMMAND_CONTRACTS: Int = 100

def encodeRoundTotals(roundTotal: RoundTotals): definitions.RoundTotals = {
definitions.RoundTotals(
closedRound = roundTotal.closedRound,
closedRoundEffectiveAt = java.time.OffsetDateTime
.ofInstant(roundTotal.closedRoundEffectiveAt.toInstant, ZoneOffset.UTC),
appRewards = Codec.encode(roundTotal.appRewards),
validatorRewards = Codec.encode(roundTotal.validatorRewards),
changeToInitialAmountAsOfRoundZero =
Codec.encode(roundTotal.changeToInitialAmountAsOfRoundZero),
changeToHoldingFeesRate = Codec.encode(roundTotal.changeToHoldingFeesRate),
cumulativeAppRewards = Codec.encode(roundTotal.cumulativeAppRewards),
cumulativeValidatorRewards = Codec.encode(roundTotal.cumulativeValidatorRewards),
cumulativeChangeToInitialAmountAsOfRoundZero =
Codec.encode(roundTotal.cumulativeChangeToInitialAmountAsOfRoundZero),
cumulativeChangeToHoldingFeesRate =
Codec.encode(roundTotal.cumulativeChangeToHoldingFeesRate),
totalAmuletBalance = Codec.encode(roundTotal.totalAmuletBalance),
)
}

def encodeRoundPartyTotals(roundPartyTotal: RoundPartyTotals): definitions.RoundPartyTotals = {
definitions.RoundPartyTotals(
closedRound = roundPartyTotal.closedRound,
party = roundPartyTotal.party,
appRewards = Codec.encode(roundPartyTotal.appRewards),
validatorRewards = Codec.encode(roundPartyTotal.validatorRewards),
trafficPurchased = roundPartyTotal.trafficPurchased,
trafficPurchasedCcSpent = Codec.encode(roundPartyTotal.trafficPurchasedCcSpent),
trafficNumPurchases = roundPartyTotal.trafficNumPurchases,
cumulativeAppRewards = Codec.encode(roundPartyTotal.cumulativeAppRewards),
cumulativeValidatorRewards = Codec.encode(roundPartyTotal.cumulativeValidatorRewards),
cumulativeChangeToInitialAmountAsOfRoundZero =
Codec.encode(roundPartyTotal.cumulativeChangeToInitialAmountAsOfRoundZero),
cumulativeChangeToHoldingFeesRate =
Codec.encode(roundPartyTotal.cumulativeChangeToHoldingFeesRate),
cumulativeTrafficPurchased = roundPartyTotal.cumulativeTrafficPurchased,
cumulativeTrafficPurchasedCcSpent =
Codec.encode(roundPartyTotal.cumulativeTrafficPurchasedCcSpent),
cumulativeTrafficNumPurchases = roundPartyTotal.cumulativeTrafficNumPurchases,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ import org.lfdecentralizedtrust.splice.environment.{
SpliceLedgerClient,
}
import org.lfdecentralizedtrust.splice.http.v0.definitions.ErrorResponse

import org.lfdecentralizedtrust.splice.scan.admin.api.client.BftScanConnection.Bft
import org.lfdecentralizedtrust.splice.scan.admin.api.client.commands.HttpScanAppClient.{
DomainScans,
DsoScan,
}
import org.lfdecentralizedtrust.splice.scan.admin.http.HttpScanHandler
import org.lfdecentralizedtrust.splice.scan.config.ScanAppClientConfig
import org.lfdecentralizedtrust.splice.store.HistoryBackfilling.SourceMigrationInfo
import org.lfdecentralizedtrust.splice.store.MultiDomainAcsStore.ContractState
Expand All @@ -54,6 +56,7 @@ import org.slf4j.event.Level
import java.time.{Duration, Instant}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random

// mock verification triggers this
@SuppressWarnings(Array("com.digitalasset.canton.DiscardedFuture"))
Expand Down Expand Up @@ -922,6 +925,93 @@ class BftScanConnectionTest
result shouldBe Some(roundAggregate)
}

"get BFT round aggregates from scans, ignoring balance fields" in {
val round = 0L
def randomValue = BigDecimal(Random.nextInt(50) + 1)
def mkRoundTotals() = RoundTotals(
closedRound = round,
closedRoundEffectiveAt = CantonTimestamp.MinValue,
appRewards = BigDecimal(100),
validatorRewards = BigDecimal(150),
changeToInitialAmountAsOfRoundZero = randomValue,
changeToHoldingFeesRate = randomValue,
cumulativeAppRewards = BigDecimal(1100),
cumulativeValidatorRewards = BigDecimal(1150),
cumulativeChangeToInitialAmountAsOfRoundZero = randomValue,
cumulativeChangeToHoldingFeesRate = randomValue,
totalAmuletBalance = randomValue,
)
def mkRoundPartyTotals() = RoundPartyTotals(
closedRound = round,
party = "party-id",
appRewards = BigDecimal(10),
validatorRewards = BigDecimal(20),
trafficPurchased = 10L,
trafficPurchasedCcSpent = BigDecimal(30),
trafficNumPurchases = 30L,
cumulativeAppRewards = BigDecimal(40),
cumulativeValidatorRewards = BigDecimal(50),
cumulativeChangeToInitialAmountAsOfRoundZero = randomValue,
cumulativeChangeToHoldingFeesRate = randomValue,
cumulativeTrafficPurchased = 50L,
cumulativeTrafficPurchasedCcSpent = BigDecimal(70),
cumulativeTrafficNumPurchases = 70L,
)
def mkRoundAggregateUsingDecoder() = RoundAggregate(
ScanRoundAggregatesDecoder
.decodeRoundTotal(HttpScanHandler.encodeRoundTotals(mkRoundTotals()))
.value,
Vector(
ScanRoundAggregatesDecoder
.decodeRoundPartyTotals(HttpScanHandler.encodeRoundPartyTotals(mkRoundPartyTotals()))
.value
),
)
def mkRoundAggregateWithoutDecoder() = RoundAggregate(
mkRoundTotals(),
Vector(mkRoundPartyTotals()),
)
val roundAggregateZeroBalanceValues = mkRoundAggregateWithoutDecoder().copy(
roundTotals = mkRoundAggregateWithoutDecoder().roundTotals.copy(
changeToInitialAmountAsOfRoundZero = zero,
changeToHoldingFeesRate = zero,
cumulativeChangeToInitialAmountAsOfRoundZero = zero,
cumulativeChangeToHoldingFeesRate = zero,
totalAmuletBalance = zero,
),
roundPartyTotals = mkRoundAggregateWithoutDecoder().roundPartyTotals.map(
_.copy(
cumulativeChangeToInitialAmountAsOfRoundZero = zero,
cumulativeChangeToHoldingFeesRate = zero,
)
),
)

def getConnections(roundAggregateResponse: () => RoundAggregate) = {
val connections = getMockedConnections(n = 10)
connections.foreach { mock =>
when(mock.getAggregatedRounds())
.thenReturn(Future.successful(Some(RoundRange(round, round))))
when(mock.getRoundAggregate(round))
.thenReturn(Future.successful(Some(roundAggregateResponse())))
}
connections
}

val bft = getBft(getConnections(() => mkRoundAggregateUsingDecoder()))
val con =
new ScanAggregatesConnection(bft, retryProvider, retryProvider.loggerFactory)
val result = con.getRoundAggregate(round).futureValue
result shouldBe Some(roundAggregateZeroBalanceValues)

// not using the decoder should fail on the randomized balance values.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why even test/commit this? That's a code path that doesn't exist like this anymore, isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to be sure the decode stuff does something

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well if it didn't the previous should should fail? But I don't mind keeping it

Copy link
Contributor Author

@ray-roestenburg-da ray-roestenburg-da Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah true, I could remove it, but liked to see it was failing the consensus, so I didn't do some encode/decode that did nothing

val bftFail = getBft(getConnections(() => mkRoundAggregateWithoutDecoder()))
val conFail =
new ScanAggregatesConnection(bftFail, retryProvider, retryProvider.loggerFactory)
val resultFail = conFail.getRoundAggregate(round).failed.futureValue
resultFail shouldBe an[BftScanConnection.ConsensusNotReached]
}

"Not get round aggregates from scans that report having the round aggregate if too many fail" in {
val connections = getMockedConnections(n = 10)
connections.zipWithIndex.foreach { case (mock, index) =>
Expand Down
Loading
Loading