Skip to content

Commit

Permalink
Make submission ID optional and generate it in CommandTracker if em…
Browse files Browse the repository at this point in the history
…pty [KVL-1107]

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
hubert-da committed Sep 23, 2021
1 parent a043926 commit c663219
Show file tree
Hide file tree
Showing 26 changed files with 40 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ message Completion {

// The submission ID this completion refers to, as described in ``commands.proto``.
// Must be a valid LedgerString (as described in ``value.proto``).
// Optional for historic completions where this data is not available.
// Optional
string submission_id = 6;

reserved "submission_rank"; // For future use.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ private[commands] class CommandTracker[Context](
val submissionId = commands.submissionId
val commandId = commands.commandId
logger.trace(s"Begin tracking of command $commandId for submission $submissionId.")
if (commands.submissionId.isEmpty) {
if (submissionId.isEmpty) {
throw new IllegalArgumentException(
s"The submission ID for the command ID $commandId is empty. This should not happen."
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ final class CommandsValidator(ledgerId: LedgerId) {
appId <- requireLedgerString(commands.applicationId, "application_id")
.map(domain.ApplicationId(_))
commandId <- requireLedgerString(commands.commandId, "command_id").map(domain.CommandId(_))
submissionId <- requireSubmissionId(commands.submissionId)
submissionId <- validateSubmissionId(commands.submissionId)
submitters <- CommandsValidator.validateSubmitters(commands)
commandz <- requireNonEmpty(commands.commands, "commands")
validatedCommands <- validateInnerCommands(commandz)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

package com.daml.platform.server.api.services.grpc

import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.command_submission_service.CommandSubmissionServiceGrpc.{
CommandSubmissionService => ApiCommandSubmissionService
Expand Down Expand Up @@ -31,7 +30,6 @@ class GrpcCommandSubmissionService(
currentLedgerTime: () => Instant,
currentUtcTime: () => Instant,
maxDeduplicationTime: () => Option[Duration],
submissionIdGenerator: SubmissionIdGenerator,
metrics: Metrics,
)(implicit executionContext: ExecutionContext)
extends ApiCommandSubmissionService
Expand All @@ -51,37 +49,26 @@ class GrpcCommandSubmissionService(
telemetryContext.setAttribute(SpanAttribute.Submitter, commands.party)
telemetryContext.setAttribute(SpanAttribute.WorkflowId, commands.workflowId)
}
val requestWithSubmissionId = generateSubmissionIdIfEmpty(request)
Timed.timedAndTrackedFuture(
metrics.daml.commands.submissions,
metrics.daml.commands.submissionsRunning,
Timed
.value(
metrics.daml.commands.validation,
validator.validate(
requestWithSubmissionId,
request,
currentLedgerTime(),
currentUtcTime(),
maxDeduplicationTime(),
),
)
.fold(
t => Future.failed(ValidationLogger.logFailure(requestWithSubmissionId, t)),
t => Future.failed(ValidationLogger.logFailure(request, t)),
service.submit(_).map(_ => Empty.defaultInstance),
),
)
}

override def bindService(): ServerServiceDefinition =
CommandSubmissionServiceGrpc.bindService(this, executionContext)

private def generateSubmissionIdIfEmpty(request: ApiSubmitRequest): ApiSubmitRequest = {
if (request.commands.exists(_.submissionId.isEmpty)) {
val commandsWithSubmissionId =
request.commands.map(_.copy(submissionId = submissionIdGenerator.generate()))
request.copy(commands = commandsWithSubmissionId)
} else {
request
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,16 @@ trait FieldValidations {
def requireLedgerString(s: String): Either[StatusRuntimeException, Ref.LedgerString] =
Ref.LedgerString.fromString(s).left.map(invalidArgument(definiteAnswer = Some(false)))

def requireSubmissionId(s: String): Either[StatusRuntimeException, domain.SubmissionId] = {
val fieldName = "submission_id"
def validateSubmissionId(s: String): Either[StatusRuntimeException, Option[domain.SubmissionId]] =
if (s.isEmpty) {
Left(missingField(fieldName, definiteAnswer = Some(false)))
Right(None)
} else {
Ref.SubmissionId
.fromString(s)
.map(domain.SubmissionId(_))
.map(submissionId => Some(domain.SubmissionId(submissionId)))
.left
.map(invalidField(fieldName, _, definiteAnswer = Some(false)))
.map(invalidField("submission_id", _, definiteAnswer = Some(false)))
}
}

def requireContractId(
s: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class SubmitRequestValidatorTest
workflowId = Some(workflowId),
applicationId = applicationId,
commandId = commandId,
submissionId = submissionId,
submissionId = None,
actAs = Set(DomainMocks.party),
readAs = Set.empty,
submittedAt = submittedAt,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.testing.utils.MockMessages._
import com.daml.ledger.api.v1.commands.{Command, CreateCommand}
import com.daml.ledger.api.v1.value.{Identifier, Record, RecordField, Value}
import com.daml.lf.data.Ref
import com.daml.metrics.Metrics
import com.daml.platform.server.api.services.domain.CommandSubmissionService
import com.daml.telemetry.{SpanAttribute, TelemetryContext, TelemetrySpecBase}
Expand Down Expand Up @@ -41,7 +40,6 @@ class GrpcCommandSubmissionServiceSpec
currentLedgerTime = () => Instant.EPOCH,
currentUtcTime = () => Instant.EPOCH,
maxDeduplicationTime = () => Some(Duration.ZERO),
submissionIdGenerator = () => Ref.SubmissionId.assertFromString("submissionId"),
metrics = new Metrics(new MetricRegistry),
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ object domain {
workflowId: Option[WorkflowId],
applicationId: ApplicationId,
commandId: CommandId,
submissionId: SubmissionId,
submissionId: Option[SubmissionId],
actAs: Set[Ref.Party],
readAs: Set[Ref.Party],
submittedAt: Instant,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ private[migration] class V2_1__Rebuild_Acs extends BaseJavaMigration {
offset,
) =>
// We don't have a submission ID, so we need to generate one.
val submissionId = Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)
val submissionId = Some(Ref.SubmissionId.assertFromString(UUID.randomUUID().toString))
val rejectionReason = readRejectionReason(rejectionType, rejectionDescription)
offset -> LedgerEntry.Rejection(
recordTime = recordedAt.toInstant,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ private[apiserver] final class StoreBackedCommandExecutor(
commands.applicationId.unwrap,
commands.commandId.unwrap,
commands.deduplicationPeriod,
commands.submissionId.unwrap,
commands.submissionId.map(_.unwrap),
ledgerConfiguration,
),
transactionMeta = state.TransactionMeta(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
package com.daml.platform.apiserver.services

import com.daml.api.util.TimeProvider
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.domain.{LedgerId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.{DeduplicationPeriod, SubmissionIdGenerator}
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.index.v2._
import com.daml.ledger.participant.state.{v2 => state}
Expand Down Expand Up @@ -74,7 +74,6 @@ private[apiserver] object ApiSubmissionService {
currentUtcTime = () => Instant.now,
maxDeduplicationTime = () =>
ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationTime),
submissionIdGenerator = SubmissionIdGenerator.Random,
metrics = metrics,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,7 @@ private class JdbcLedgerDao(
state.Update.CommandRejected(
recordTime = Time.Timestamp.assertFromInstant(recordTime),
completionInfo = state
.CompletionInfo(actAs, applicationId, commandId, None, Some(submissionId)),
.CompletionInfo(actAs, applicationId, commandId, None, submissionId),
reasonTemplate = reason.toParticipantStateRejectionReason,
)
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ private class JdbcLedgerDao(
queries
.prepareRejectionInsert(
completionInfo =
state.CompletionInfo(actAs, applicationId, commandId, None, Some(submissionId)),
state.CompletionInfo(actAs, applicationId, commandId, None, submissionId),
offset = offset,
recordTime = recordTime,
reason = reason.toParticipantStateRejectionReason,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ private[platform] object LedgerEntry {
recordTime: Instant,
commandId: Ref.CommandId,
applicationId: Ref.ApplicationId,
submissionId: Ref.SubmissionId,
submissionId: Option[Ref.SubmissionId],
actAs: List[Ref.Party],
rejectionReason: RejectionReason,
) extends LedgerEntry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import java.time.{Duration, Instant}

import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.DeduplicationPeriod
import com.daml.ledger.api.domain.{ApplicationId, CommandId, Commands, LedgerId, SubmissionId}
import com.daml.ledger.api.domain.{ApplicationId, CommandId, Commands, LedgerId}
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.index.v2.{ContractStore, IndexPackagesService}
import com.daml.lf.crypto.Hash
Expand Down Expand Up @@ -60,7 +60,7 @@ class StoreBackedCommandExecutorSpec
workflowId = None,
applicationId = ApplicationId(Ref.ApplicationId.assertFromString("applicationId")),
commandId = CommandId(Ref.CommandId.assertFromString("commandId")),
submissionId = SubmissionId(Ref.SubmissionId.assertFromString("submissionId")),
submissionId = None,
actAs = Set.empty,
readAs = Set.empty,
submittedAt = Instant.EPOCH,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,11 @@

package com.daml.platform.apiserver.services

import java.time.{Duration, Instant}
import java.util.UUID
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.atomic.AtomicInteger

import com.codahale.metrics.MetricRegistry
import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks}
import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails, SubmissionId}
import com.daml.ledger.api.domain.{CommandId, Commands, LedgerId, PartyDetails}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.api.testing.utils.AkkaBeforeAndAfterAll
import com.daml.ledger.api.{DeduplicationPeriod, DomainMocks}
import com.daml.ledger.configuration.{Configuration, LedgerTimeModel}
import com.daml.ledger.participant.state.index.v2.{
CommandDeduplicationNew,
Expand Down Expand Up @@ -47,6 +42,9 @@ import org.scalatest.Inside
import org.scalatest.flatspec.AsyncFlatSpec
import org.scalatest.matchers.should.Matchers

import java.time.{Duration, Instant}
import java.util.concurrent.CompletableFuture.completedFuture
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

Expand Down Expand Up @@ -415,7 +413,7 @@ object ApiSubmissionServiceSpec {
commandId = CommandId(
Ref.CommandId.assertFromString(s"commandId-${commandId.incrementAndGet()}")
),
submissionId = SubmissionId(Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)),
submissionId = None,
actAs = Set.empty,
readAs = Set.empty,
submittedAt = Instant.MIN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ private[state] object Conversions {
.addAllSubmitters((subInfo.actAs: List[String]).asJava)
.setApplicationId(subInfo.applicationId)
.setCommandId(subInfo.commandId)
.setSubmissionId(subInfo.submissionId)
.setSubmissionId(subInfo.submissionId.getOrElse(""))
subInfo.deduplicationPeriod match {
case DeduplicationPeriod.DeduplicationDuration(duration) =>
submitterInfoBuilder.setDeduplicationDuration(buildDuration(duration))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,6 @@ object KVTest {
submitter,
commandId,
deduplicationTime,
randomLedgerString,
)
testState.keyValueSubmission.transactionToSubmission(
submitterInfo = submitterInfo,
Expand Down Expand Up @@ -431,14 +430,13 @@ object KVTest {
submitter: Ref.Party,
commandId: Ref.CommandId,
deduplicationTime: Duration,
submissionId: Ref.SubmissionId,
): SubmitterInfo = {
SubmitterInfo(
actAs = List(submitter),
applicationId = Ref.LedgerString.assertFromString("test"),
commandId = commandId,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(deduplicationTime),
submissionId = submissionId,
submissionId = None,
ledgerConfiguration =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -704,7 +704,7 @@ abstract class ParticipantStateIntegrationSpecBase(implementationName: String)(i
applicationId = Ref.LedgerString.assertFromString("tests"),
commandId = Ref.LedgerString.assertFromString(commandId),
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofSeconds(10)),
submissionId = Ref.LedgerString.assertFromString("submissionId"),
submissionId = None,
ledgerConfiguration =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class KeyValueCommittingSpec extends AnyWordSpec with Matchers {
applicationId = applicationId,
commandId = commandId,
deduplicationPeriod = ApiDeduplicationPeriod.DeduplicationDuration(Duration.ZERO),
submissionId = Ref.LedgerString.assertFromString("submission"),
submissionId = None,
ledgerConfiguration =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ object KeyValueParticipantStateWriterSpec {
applicationId = Ref.LedgerString.assertFromString("tests"),
commandId = Ref.LedgerString.assertFromString(commandId),
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofDays(1)),
submissionId = Ref.LedgerString.assertFromString("submission"),
submissionId = None,
ledgerConfiguration =
Configuration(1, LedgerTimeModel.reasonableDefault, Duration.ofSeconds(1)),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ final case class SubmitterInfo(
applicationId: Ref.ApplicationId,
commandId: Ref.CommandId,
deduplicationPeriod: DeduplicationPeriod,
submissionId: Ref.SubmissionId,
submissionId: Option[Ref.SubmissionId],
ledgerConfiguration: Configuration,
) {

Expand All @@ -44,7 +44,7 @@ final case class SubmitterInfo(
applicationId,
commandId,
Some(deduplicationPeriod),
Some(submissionId),
submissionId,
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ private[sandbox] final class InMemoryLedger(
Some(submitterInfo.commandId),
transactionId,
Some(submitterInfo.applicationId),
Some(submitterInfo.submissionId),
submitterInfo.submissionId,
submitterInfo.actAs,
transactionMeta.workflowId,
transactionMeta.ledgerEffectiveTime.toInstant,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ class TransactionTimeModelComplianceIT
ledger: Ledger,
ledgerTime: Instant,
commandId: String,
submissionId: String = UUID.randomUUID().toString,
configuration: Configuration,
) = {
val dummyTransaction = TransactionBuilder.EmptySubmitted
Expand All @@ -98,7 +97,7 @@ class TransactionTimeModelComplianceIT
applicationId = Ref.ApplicationId.assertFromString("appId"),
commandId = Ref.CommandId.assertFromString(commandId + UUID.randomUUID().toString),
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(JDuration.ZERO),
submissionId = Ref.SubmissionId.assertFromString(submissionId),
submissionId = None,
ledgerConfiguration = configuration,
)
val transactionMeta = state.TransactionMeta(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ final class SqlLedgerOnMutableIndexSpec
applicationId = applicationId,
commandId = commandId1,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofHours(1)),
submissionId = submissionId1,
submissionId = None,
ledgerConfiguration = Configuration.reasonableInitialConfiguration,
),
transactionMeta = emptyTransactionMeta(seedService, ledgerEffectiveTime = now),
Expand Down Expand Up @@ -313,7 +313,7 @@ final class SqlLedgerOnMutableIndexSpec
applicationId = applicationId,
commandId = commandId1,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofHours(1)),
submissionId = submissionId1,
submissionId = None,
ledgerConfiguration = Configuration.reasonableInitialConfiguration,
),
transactionMeta = emptyTransactionMeta(seedService, ledgerEffectiveTime = now),
Expand Down Expand Up @@ -371,7 +371,7 @@ final class SqlLedgerOnMutableIndexSpec
applicationId = applicationId,
commandId = commandId1,
deduplicationPeriod = DeduplicationPeriod.DeduplicationDuration(Duration.ofHours(1)),
submissionId = submissionId1,
submissionId = None,
ledgerConfiguration = configuration,
),
transactionMeta =
Expand Down

0 comments on commit c663219

Please sign in to comment.