Skip to content

Commit

Permalink
Always use the latest ledger config (#5669)
Browse files Browse the repository at this point in the history
CHANGELOG_BEGIN
- [Sandbox] The ledger API server will now always use the most recent ledger configuration.
  Until a ledger configuration is read from the ledger, command submissions will fail with the UNAVAILABLE error.
CHANGELOG_END

In kvutils, the first ledger configuration change needs
to have a generation one higher than the one returned
by getLedgerInitialConditions().

Remove initial config writing from sandbox as it's now written by the ledger API server
  • Loading branch information
rautenrieth-da authored May 6, 2020
1 parent c21768c commit 7e448d8
Show file tree
Hide file tree
Showing 34 changed files with 487 additions and 244 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ final class CommandsValidator(ledgerId: LedgerId) {
commands: ProtoCommands,
currentLedgerTime: Instant,
currentUtcTime: Instant,
maxDeduplicationTime: Duration): Either[StatusRuntimeException, domain.Commands] =
maxDeduplicationTime: Option[Duration]): Either[StatusRuntimeException, domain.Commands] =
for {
cmdLegerId <- requireLedgerString(commands.ledgerId, "ledger_id")
ledgerId <- matchLedgerId(ledgerId)(LedgerId(cmdLegerId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ class SubmitAndWaitRequestValidator(commandsValidator: CommandsValidator) {
req: SubmitAndWaitRequest,
currentLedgerTime: Instant,
currentUtcTime: Instant,
maxDeduplicationTime: Duration): Either[StatusRuntimeException, submission.SubmitRequest] =
maxDeduplicationTime: Option[Duration])
: Either[StatusRuntimeException, submission.SubmitRequest] =
for {
commands <- requirePresence(req.commands, "commands")
validatedCommands <- commandsValidator.validateCommands(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ class SubmitRequestValidator(commandsValidator: CommandsValidator) {
req: SubmitRequest,
currentLedgerTime: Instant,
currentUtcTime: Instant,
maxDeduplicationTime: Duration): Either[StatusRuntimeException, submission.SubmitRequest] =
maxDeduplicationTime: Option[Duration])
: Either[StatusRuntimeException, submission.SubmitRequest] =
for {
commands <- requirePresence(req.commands, "commands")
validatedCommands <- commandsValidator.validateCommands(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class GrpcCommandService(
val ledgerId: LedgerId,
currentLedgerTime: () => Instant,
currentUtcTime: () => Instant,
maxDeduplicationTime: () => Duration
maxDeduplicationTime: () => Option[Duration]
) extends CommandService
with GrpcApiService
with ProxyCloseable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class GrpcCommandSubmissionService(
ledgerId: LedgerId,
currentLedgerTime: () => Instant,
currentUtcTime: () => Instant,
maxDeduplicationTime: () => Duration,
maxDeduplicationTime: () => Option[Duration],
metrics: Metrics,
) extends ApiCommandSubmissionService
with ProxyCloseable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ trait ErrorFactories {
def unauthenticated(): StatusRuntimeException =
grpcError(Status.UNAUTHENTICATED)

def missingLedgerConfig(): StatusRuntimeException =
grpcError(Status.UNAVAILABLE.withDescription("The ledger configuration is not available."))

def resourceExhausted(description: String): StatusRuntimeException =
grpcError(Status.RESOURCE_EXHAUSTED.withDescription(description))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,21 +92,24 @@ trait FieldValidations {

def validateDeduplicationTime(
durationO: Option[com.google.protobuf.duration.Duration],
maxDeduplicationTime: Duration,
fieldName: String): Either[StatusRuntimeException, Duration] = durationO match {
case None =>
Right(maxDeduplicationTime)
case Some(duration) =>
val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong)
if (result.isNegative)
Left(invalidField(fieldName, "Duration must be positive"))
else if (result.compareTo(maxDeduplicationTime) > 0)
Left(invalidField(
fieldName,
s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationTime"))
else
Right(result)
}
maxDeduplicationTimeO: Option[Duration],
fieldName: String): Either[StatusRuntimeException, Duration] =
maxDeduplicationTimeO.fold[Either[StatusRuntimeException, Duration]](
Left(missingLedgerConfig()))(maxDeduplicationTime =>
durationO match {
case None =>
Right(maxDeduplicationTime)
case Some(duration) =>
val result = Duration.ofSeconds(duration.seconds, duration.nanos.toLong)
if (result.isNegative)
Left(invalidField(fieldName, "Duration must be positive"))
else if (result.compareTo(maxDeduplicationTime) > 0)
Left(invalidField(
fieldName,
s"The given deduplication time of $result exceeds the maximum deduplication time of $maxDeduplicationTime"))
else
Right(result)
})

def validateIdentifier(identifier: Identifier): Either[StatusRuntimeException, Ref.Identifier] =
for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import com.daml.ledger.api.v1.value.Value.Sum
import com.daml.ledger.api.v1.value.{List => ApiList, Map => ApiMap, Optional => ApiOptional, _}
import com.google.protobuf.duration.Duration
import com.google.protobuf.empty.Empty
import io.grpc.Status.Code.INVALID_ARGUMENT
import io.grpc.Status.Code.{INVALID_ARGUMENT, UNAVAILABLE}
import org.scalatest.WordSpec
import org.scalatest.prop.TableDrivenPropertyChecks
import scalaz.syntax.tag._
Expand Down Expand Up @@ -124,7 +124,7 @@ class SubmitRequestValidatorTest
api.commands.withCommands(Seq.empty),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"Missing field: commands"
)
Expand All @@ -137,7 +137,7 @@ class SubmitRequestValidatorTest
api.commands.withLedgerId(""),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"Missing field: ledger_id"
)
Expand All @@ -148,7 +148,7 @@ class SubmitRequestValidatorTest
api.commands.withWorkflowId(""),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime) shouldEqual Right(
Some(internal.maxDeduplicationTime)) shouldEqual Right(
internal.emptyCommands.copy(
workflowId = None,
commands = internal.emptyCommands.commands.copy(commandsReference = "")))
Expand All @@ -160,7 +160,7 @@ class SubmitRequestValidatorTest
api.commands.withApplicationId(""),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"Missing field: application_id"
)
Expand All @@ -172,7 +172,7 @@ class SubmitRequestValidatorTest
api.commands.withCommandId(""),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"Missing field: command_id"
)
Expand All @@ -185,7 +185,7 @@ class SubmitRequestValidatorTest
api.commands.withParty(""),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"""Missing field: party"""
)
Expand All @@ -198,7 +198,7 @@ class SubmitRequestValidatorTest
minLedgerTimeAbs = Some(TimestampConversion.fromInstant(minLedgerTimeAbs))),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime
Some(internal.maxDeduplicationTime)
) shouldEqual Right(withLedgerTime(internal.emptyCommands, minLedgerTimeAbs))
}

Expand All @@ -209,7 +209,7 @@ class SubmitRequestValidatorTest
minLedgerTimeRel = Some(DurationConversion.toProto(internal.timeDelta))),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime
Some(internal.maxDeduplicationTime)
) shouldEqual Right(withLedgerTime(internal.emptyCommands, minLedgerTimeAbs))
}

Expand All @@ -219,7 +219,7 @@ class SubmitRequestValidatorTest
api.commands.copy(deduplicationTime = Some(Duration.of(-1, 0))),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
"Invalid field deduplication_time: Duration must be positive"
)
Expand All @@ -232,7 +232,7 @@ class SubmitRequestValidatorTest
api.commands.copy(deduplicationTime = Some(Duration.of(manySeconds, 0))),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime),
Some(internal.maxDeduplicationTime)),
INVALID_ARGUMENT,
s"Invalid field deduplication_time: The given deduplication time of ${java.time.Duration
.ofSeconds(manySeconds)} exceeds the maximum deduplication time of ${internal.maxDeduplicationTime}"
Expand All @@ -244,11 +244,20 @@ class SubmitRequestValidatorTest
api.commands.copy(deduplicationTime = None),
internal.ledgerTime,
internal.submittedAt,
internal.maxDeduplicationTime) shouldEqual Right(
Some(internal.maxDeduplicationTime)) shouldEqual Right(
internal.emptyCommands.copy(
deduplicateUntil = internal.submittedAt.plus(internal.maxDeduplicationTime)))
}

"not allow missing ledger configuration" in {
requestMustFailWith(
commandsValidator
.validateCommands(api.commands, internal.ledgerTime, internal.submittedAt, None),
UNAVAILABLE,
"The ledger configuration is not available."
)
}

}

"validating contractId values" should {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.akkastreams.dispatcher.Dispatcher
import com.daml.platform.apiserver.ApiServerConfig
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.resources.{ProgramResource, ResourceOwner}
import scopt.OptionParser

Expand Down Expand Up @@ -66,6 +67,9 @@ object Main {
engine = engine,
)

override def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration =
LedgerConfiguration.defaultLocalLedger

override val defaultExtraConfig: ExtraConfig = ExtraConfig.default

override final def extraConfigParser(parser: OptionParser[Config[ExtraConfig]]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

package com.daml.ledger.on.sql

import java.time.Duration

import akka.stream.Materializer
import com.daml.ledger.participant.state.kvutils.api.KeyValueParticipantState
import com.daml.ledger.participant.state.kvutils.app.{
Expand All @@ -16,6 +18,7 @@ import com.daml.ledger.participant.state.v1.SeedService
import com.daml.lf.engine.Engine
import com.daml.logging.LoggingContext
import com.daml.metrics.Metrics
import com.daml.platform.configuration.LedgerConfiguration
import com.daml.resources.{Resource, ResourceOwner}
import scopt.OptionParser

Expand All @@ -26,6 +29,9 @@ object SqlLedgerFactory extends LedgerFactory[ReadWriteService, ExtraConfig] {
jdbcUrl = None,
)

override def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration =
super.ledgerConfig(config).copy(initialConfigurationSubmitDelay = Duration.ZERO)

override def extraConfigParser(parser: OptionParser[Config[ExtraConfig]]): Unit = {
parser
.opt[String]("jdbc-url")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ trait IndexConfigManagementService {
def lookupConfiguration(): Future[Option[(LedgerOffset.Absolute, Configuration)]]

/** Retrieve configuration entries. */
def configurationEntries(
startExclusive: Option[LedgerOffset.Absolute]): Source[ConfigurationEntry, NotUsed]
def configurationEntries(startExclusive: Option[LedgerOffset.Absolute])
: Source[(LedgerOffset.Absolute, ConfigurationEntry), NotUsed]

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

package com.daml.ledger.participant.state.kvutils.app

import java.time.Duration

import akka.stream.Materializer
import com.codahale.metrics.{MetricRegistry, SharedMetricRegistries}
import com.daml.ledger.api.auth.{AuthService, AuthServiceWildcard}
Expand All @@ -17,8 +15,7 @@ import com.daml.platform.apiserver.{ApiServerConfig, TimeServiceBackend}
import com.daml.platform.configuration.{
CommandConfiguration,
LedgerConfiguration,
PartyConfiguration,
SubmissionConfiguration
PartyConfiguration
}
import com.daml.platform.indexer.{IndexerConfig, IndexerStartupMode}
import com.daml.resources.ResourceOwner
Expand Down Expand Up @@ -73,13 +70,8 @@ trait ConfigProvider[ExtraConfig] {
def partyConfig(config: Config[ExtraConfig]): PartyConfiguration =
PartyConfiguration.default

def submissionConfig(config: Config[ExtraConfig]): SubmissionConfiguration =
SubmissionConfiguration.default

def ledgerConfig(config: Config[ExtraConfig]): LedgerConfiguration =
LedgerConfiguration.default.copy(
initialConfigurationSubmitDelay = Duration.ofSeconds(5)
)
LedgerConfiguration.defaultRemote

def timeServiceBackend(config: Config[ExtraConfig]): Option[TimeServiceBackend] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ final class Runner[T <: ReadWriteService, Extra](
config = factory.apiServerConfig(participantConfig, config),
commandConfig = factory.commandConfig(participantConfig, config),
partyConfig = factory.partyConfig(config),
submissionConfig = factory.submissionConfig(config),
ledgerConfig = factory.ledgerConfig(config),
readService = readService,
writeService = writeService,
Expand Down
Loading

0 comments on commit 7e448d8

Please sign in to comment.