Skip to content

Commit

Permalink
version-service - Move command deduplication feature descriptors into…
Browse files Browse the repository at this point in the history
… the experimental features [kvl-1218] (#12318)

Co-authored-by: fabiotudone-da <fabio.tudone@digitalasset.com>
Co-authored-by: Simon Meier <simon@digitalasset.com>
  • Loading branch information
3 people committed Jan 11, 2022
1 parent 4211557 commit a7c51ef
Show file tree
Hide file tree
Showing 12 changed files with 78 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,46 @@ option csharp_namespace = "Com.Daml.Ledger.Api.V1";
message ExperimentalFeatures {
ExperimentalSelfServiceErrorCodes self_service_error_codes = 1;
ExperimentalStaticTime static_time = 2;
CommandDeduplicationFeatures command_deduplication = 3;
}

// GRPC self-service error codes are returned by the Ledger API.
message ExperimentalSelfServiceErrorCodes {}

// Ledger is in the static time mode and exposes a time service.
message ExperimentalStaticTime {}

// Feature descriptors for command deduplication intended to be used for adapting Ledger API tests.
message CommandDeduplicationFeatures {
CommandDeduplicationPeriodSupport deduplication_period_support = 1;
CommandDeduplicationType deduplication_type = 2;
}

// Feature descriptor specifying how deduplication periods can be specified and how they are handled by the participant
// node.
message CommandDeduplicationPeriodSupport {
// How the participant node supports deduplication periods specified using offsets.
enum OffsetSupport {
OFFSET_NOT_SUPPORTED = 0;
OFFSET_NATIVE_SUPPORT = 1;
OFFSET_CONVERT_TO_DURATION = 2;
}
// How the participant node supports deduplication periods specified as durations.
enum DurationSupport {
DURATION_NATIVE_SUPPORT = 0;
DURATION_CONVERT_TO_OFFSET = 1;
}
OffsetSupport offset_support = 1;
DurationSupport duration_support = 2;
}

// How the participant node reports duplicate command submissions.
enum CommandDeduplicationType {
// Duplicate commands are exclusively reported asynchronously via completions.
ASYNC_ONLY = 0;
// Commands that are duplicates of concurrently submitted commands are reported synchronously via a gRPC error on the
// command submission, while all other duplicate commands are reported asynchronously via completions.
ASYNC_AND_CONCURRENT_SYNC = 1;
// Duplicate commands are always reported synchronously via a synchronous gRPC error on the command submission.
SYNC_ONLY = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,34 +55,8 @@ message FeaturesDescriptor {
// Daml applications SHOULD not depend on these in production.
ExperimentalFeatures experimental = 1;

// Features related to command deduplication
CommandDeduplicationFeatures command_deduplication = 3;
}

// Used to signal the presence of the user management service.
// Defined as a message to enable future addition of individual per-service features.
message UserManagementFeature {}

message CommandDeduplicationFeatures {
DeduplicationPeriodSupport deduplication_period_support = 1;
ParticipantDeduplicationSupport participant_deduplication_support = 2;
}

message DeduplicationPeriodSupport {
enum OffsetSupport {
OFFSET_NATIVE_SUPPORT = 0;
OFFSET_CONVERT_TO_DURATION = 1;
OFFSET_NOT_SUPPORTED = 2;
}
enum DurationSupport {
DURATION_NATIVE_SUPPORT = 0;
DURATION_CONVERT_TO_OFFSET = 1;
}
OffsetSupport offset_support = 1;
DurationSupport duration_support = 2;
}
enum ParticipantDeduplicationSupport {
PARTICIPANT_DEDUPLICATION_NOT_SUPPORTED = 0;
PARTICIPANT_DEDUPLICATION_PARALLEL_ONLY = 1;
PARTICIPANT_DEDUPLICATION_SUPPORTED = 2;
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@

package com.daml.ledger.api.testtool.infrastructure.participant

import com.daml.ledger.api.v1.version_service.{
CommandDeduplicationFeatures,
GetLedgerApiVersionResponse,
}
import com.daml.ledger.api.v1.experimental_features.CommandDeduplicationFeatures
import com.daml.ledger.api.v1.version_service.GetLedgerApiVersionResponse

final case class Features(
selfServiceErrorCodes: Boolean = false,
Expand All @@ -26,8 +24,8 @@ object Features {
Features(
selfServiceErrorCodes = experimental.flatMap(_.selfServiceErrorCodes).isDefined,
userManagement = features.flatMap(_.userManagement).isDefined,
commandDeduplicationFeatures = response.getFeatures.getCommandDeduplication,
staticTime = experimental.flatMap(_.staticTime).isDefined,
commandDeduplicationFeatures = response.getFeatures.getExperimental.getCommandDeduplication,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import com.daml.ledger.api.v1.completion.Completion
import com.daml.ledger.api.v1.completion.Completion.{
DeduplicationPeriod => CompletionDeduplicationPeriod
}
import com.daml.ledger.api.v1.experimental_features.CommandDeduplicationPeriodSupport.OffsetSupport
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.version_service.DeduplicationPeriodSupport.OffsetSupport
import com.daml.ledger.client.binding.Primitive.Party
import com.daml.ledger.test.model.DA.Types.Tuple2
import com.daml.ledger.test.model.Test.{Dummy, DummyWithAnnotation, TextKey, TextKeyOperations}
Expand Down Expand Up @@ -328,7 +328,7 @@ final class CommandDeduplicationIT(
submitAndAssertAccepted(thirdCall)
}
_ = if ( // participant deduplication is based on submittedAt, and thus the delta between record times can actually be smaller than the deduplication duration
!ledger.features.commandDeduplicationFeatures.participantDeduplicationSupport.isParticipantDeduplicationSupported
!ledger.features.commandDeduplicationFeatures.deduplicationType.isSyncOnly
)
assert(
time.Duration
Expand Down Expand Up @@ -586,9 +586,7 @@ final class CommandDeduplicationIT(
)(implicit
ec: ExecutionContext
): Future[Option[Completion]] =
if (
ledger.features.commandDeduplicationFeatures.participantDeduplicationSupport.isParticipantDeduplicationSupported
)
if (ledger.features.commandDeduplicationFeatures.deduplicationType.isSyncOnly)
submitRequestAndAssertSyncDeduplication(ledger, request, acceptedSubmissionId, acceptedOffset)
.map(_ => None)
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,8 @@ class CommandDeduplicationParallelIT extends LedgerTestSuite {
// Canton can return ABORTED for parallel in-flight duplicate submissions
val abortedResponses = responses.getOrElse(Code.ABORTED, 0)
val duplicateResponses =
if (
ledger.features.commandDeduplicationFeatures.participantDeduplicationSupport.isParticipantDeduplicationParallelOnly
) alreadyExistsResponses + abortedResponses
if (ledger.features.commandDeduplicationFeatures.deduplicationType.isAsyncAndConcurrentSync)
alreadyExistsResponses + abortedResponses
else alreadyExistsResponses
assert(
okResponses == 1 && duplicateResponses == numberOfParallelRequests - 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import com.daml.ledger.api.auth.Authorizer
import com.daml.ledger.api.auth.services._
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.health.HealthChecks
import com.daml.ledger.api.v1.version_service.CommandDeduplicationFeatures
import com.daml.ledger.api.v1.experimental_features.CommandDeduplicationFeatures
import com.daml.ledger.client.services.commands.CommandSubmissionFlow
import com.daml.ledger.participant.state.index.v2._
import com.daml.ledger.participant.state.{v2 => state}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import com.daml.error.ErrorCodesVersionSwitcher
import com.daml.ledger.api.auth.interceptor.AuthorizationInterceptor
import com.daml.ledger.api.auth.{AuthService, Authorizer}
import com.daml.ledger.api.health.HealthChecks
import com.daml.ledger.api.v1.version_service.CommandDeduplicationFeatures
import com.daml.ledger.api.v1.experimental_features.CommandDeduplicationFeatures
import com.daml.ledger.configuration.LedgerId
import com.daml.ledger.participant.state.index.v2.{IndexService, UserManagementStore}
import com.daml.ledger.participant.state.{v2 => state}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import com.daml.error.{
ErrorCodesVersionSwitcher,
}
import com.daml.ledger.api.v1.experimental_features.{
CommandDeduplicationFeatures,
ExperimentalFeatures,
ExperimentalSelfServiceErrorCodes,
ExperimentalStaticTime,
}
import com.daml.ledger.api.v1.version_service.VersionServiceGrpc.VersionService
import com.daml.ledger.api.v1.version_service.{
CommandDeduplicationFeatures,
FeaturesDescriptor,
GetLedgerApiVersionRequest,
GetLedgerApiVersionResponse,
Expand Down Expand Up @@ -59,9 +59,9 @@ private[apiserver] final class ApiVersionService private (
selfServiceErrorCodes =
Option.when(enableSelfServiceErrorCodes)(ExperimentalSelfServiceErrorCodes()),
staticTime = Option.when(enableStaticTime)(ExperimentalStaticTime()),
commandDeduplication = Some(commandDeduplicationFeatures),
)
),
commandDeduplication = Some(commandDeduplicationFeatures),
)

override def getLedgerApiVersion(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import akka.stream.Materializer
import com.codahale.metrics.InstrumentedExecutorService
import com.daml.error.ErrorCodesVersionSwitcher
import com.daml.ledger.api.health.HealthChecks
import com.daml.ledger.api.v1.version_service.{
import com.daml.ledger.api.v1.experimental_features.{
CommandDeduplicationFeatures,
DeduplicationPeriodSupport,
ParticipantDeduplicationSupport,
CommandDeduplicationPeriodSupport,
CommandDeduplicationType,
}
import com.daml.ledger.participant.state.index.impl.inmemory.InMemoryUserManagementStore
import com.daml.ledger.participant.state.v2.WritePackagesService
Expand Down Expand Up @@ -201,14 +201,14 @@ final class Runner[T <: ReadWriteService, Extra](
servicesExecutionContext = servicesExecutionContext,
commandDeduplicationFeatures = CommandDeduplicationFeatures.of(
Some(
DeduplicationPeriodSupport.of(
CommandDeduplicationPeriodSupport.of(
offsetSupport =
DeduplicationPeriodSupport.OffsetSupport.OFFSET_CONVERT_TO_DURATION,
CommandDeduplicationPeriodSupport.OffsetSupport.OFFSET_CONVERT_TO_DURATION,
durationSupport =
DeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT,
CommandDeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT,
)
),
ParticipantDeduplicationSupport.PARTICIPANT_DEDUPLICATION_NOT_SUPPORTED,
CommandDeduplicationType.ASYNC_ONLY,
),
).acquire()
} yield {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import com.daml.ledger.api.auth.interceptor.AuthorizationInterceptor
import com.daml.ledger.api.auth.{AuthService, AuthServiceWildcard, Authorizer}
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.health.HealthChecks
import com.daml.ledger.api.v1.version_service.{
import com.daml.ledger.api.v1.experimental_features.{
CommandDeduplicationFeatures,
DeduplicationPeriodSupport,
ParticipantDeduplicationSupport,
CommandDeduplicationPeriodSupport,
CommandDeduplicationType,
}
import com.daml.ledger.participant.state.index.impl.inmemory.InMemoryUserManagementStore
import com.daml.ledger.participant.state.v2.metrics.TimedWriteService
Expand Down Expand Up @@ -428,12 +428,13 @@ final class SandboxServer(
userManagementStore = userManagementStore,
commandDeduplicationFeatures = CommandDeduplicationFeatures.of(
Some(
DeduplicationPeriodSupport.of(
offsetSupport = DeduplicationPeriodSupport.OffsetSupport.OFFSET_NOT_SUPPORTED,
durationSupport = DeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT,
CommandDeduplicationPeriodSupport.of(
offsetSupport = CommandDeduplicationPeriodSupport.OffsetSupport.OFFSET_NOT_SUPPORTED,
durationSupport =
CommandDeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT,
)
),
ParticipantDeduplicationSupport.PARTICIPANT_DEDUPLICATION_SUPPORTED,
CommandDeduplicationType.SYNC_ONLY,
),
)(materializer, executionSequencerFactory, loggingContext)
.map(_.withServices(List(resetService)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package com.daml.ledger.sandbox

import java.util.UUID
import java.util.concurrent.{Executors, TimeUnit}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.Materializer
Expand All @@ -13,10 +14,10 @@ import com.codahale.metrics.InstrumentedExecutorService
import com.daml.api.util.TimeProvider
import com.daml.error.ErrorCodesVersionSwitcher
import com.daml.ledger.api.health.HealthChecks
import com.daml.ledger.api.v1.version_service.{
import com.daml.ledger.api.v1.experimental_features.{
CommandDeduplicationFeatures,
DeduplicationPeriodSupport,
ParticipantDeduplicationSupport,
CommandDeduplicationPeriodSupport,
CommandDeduplicationType,
}
import com.daml.ledger.offset.Offset
import com.daml.ledger.participant.state.index.impl.inmemory.InMemoryUserManagementStore
Expand Down Expand Up @@ -266,12 +267,12 @@ object SandboxOnXRunner {
userManagementStore = new InMemoryUserManagementStore, // TODO persistence wiring comes here
commandDeduplicationFeatures = CommandDeduplicationFeatures.of(
Some(
DeduplicationPeriodSupport.of(
DeduplicationPeriodSupport.OffsetSupport.OFFSET_NOT_SUPPORTED,
DeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT,
CommandDeduplicationPeriodSupport.of(
CommandDeduplicationPeriodSupport.OffsetSupport.OFFSET_NOT_SUPPORTED,
CommandDeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT,
)
),
ParticipantDeduplicationSupport.PARTICIPANT_DEDUPLICATION_SUPPORTED,
CommandDeduplicationType.SYNC_ONLY,
),
)

Expand Down
14 changes: 7 additions & 7 deletions ledger/sandbox/src/main/scala/platform/sandboxnext/Runner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import com.daml.error.ErrorCodesVersionSwitcher
import com.daml.ledger.api.auth.{AuthServiceWildcard, Authorizer}
import com.daml.ledger.api.domain
import com.daml.ledger.api.health.HealthChecks
import com.daml.ledger.api.v1.version_service.{
import com.daml.ledger.api.v1.experimental_features.{
CommandDeduplicationFeatures,
DeduplicationPeriodSupport,
ParticipantDeduplicationSupport,
CommandDeduplicationPeriodSupport,
CommandDeduplicationType,
}
import com.daml.ledger.configuration.LedgerId
import com.daml.ledger.on.sql.Database.InvalidDatabaseException
Expand Down Expand Up @@ -319,14 +319,14 @@ class Runner(config: SandboxConfig) extends ResourceOwner[Port] {
servicesExecutionContext = servicesExecutionContext,
commandDeduplicationFeatures = CommandDeduplicationFeatures.of(
Some(
DeduplicationPeriodSupport.of(
CommandDeduplicationPeriodSupport.of(
offsetSupport =
DeduplicationPeriodSupport.OffsetSupport.OFFSET_CONVERT_TO_DURATION,
CommandDeduplicationPeriodSupport.OffsetSupport.OFFSET_CONVERT_TO_DURATION,
durationSupport =
DeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT,
CommandDeduplicationPeriodSupport.DurationSupport.DURATION_NATIVE_SUPPORT,
)
),
ParticipantDeduplicationSupport.PARTICIPANT_DEDUPLICATION_NOT_SUPPORTED,
CommandDeduplicationType.ASYNC_ONLY,
),
)
_ = apiServerServicesClosed.completeWith(apiServer.servicesClosed())
Expand Down

0 comments on commit a7c51ef

Please sign in to comment.