-
Notifications
You must be signed in to change notification settings - Fork 199
/
ApiSubmissionService.scala
278 lines (255 loc) · 10.8 KB
/
ApiSubmissionService.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
// Copyright (c) 2022 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.platform.apiserver.services
import com.daml.api.util.TimeProvider
import com.daml.error.ErrorCode.LoggedApiException
import com.daml.error.definitions.{ErrorCause, LedgerApiErrors, RejectionGenerators}
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
import com.daml.ledger.api.SubmissionIdGenerator
import com.daml.ledger.api.domain.{LedgerId, SubmissionId, Commands => ApiCommands}
import com.daml.ledger.api.messages.command.submission.SubmitRequest
import com.daml.ledger.configuration.Configuration
import com.daml.ledger.participant.state.index.v2._
import com.daml.ledger.participant.state.{v2 => state}
import com.daml.lf.crypto
import com.daml.lf.data.Ref
import com.daml.lf.transaction.SubmittedTransaction
import com.daml.logging.LoggingContext.withEnrichedLoggingContext
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.metrics.Metrics
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.SeedService
import com.daml.platform.apiserver.configuration.LedgerConfigurationSubscription
import com.daml.platform.apiserver.execution.{CommandExecutionResult, CommandExecutor}
import com.daml.platform.server.api.services.domain.CommandSubmissionService
import com.daml.platform.server.api.services.grpc.GrpcCommandSubmissionService
import com.daml.platform.services.time.TimeProviderType
import com.daml.scalautil.future.FutureConversion.CompletionStageConversionOps
import com.daml.telemetry.TelemetryContext
import com.daml.timer.Delayed
import java.time.{Duration, Instant}
import java.util.UUID
import scala.concurrent.{ExecutionContext, Future}
import scala.jdk.FutureConverters.CompletionStageOps
import scala.util.{Failure, Success, Try}
private[apiserver] object ApiSubmissionService {

  /** Settings of the submission service.
    *
    * @param implicitPartyAllocation when `true`, the service looks up the informees of each
    *                                interpreted transaction and allocates the ones that the party
    *                                management service does not return, before submitting
    */
  final case class Configuration(
      implicitPartyAllocation: Boolean
  )

  /** Builds an [[ApiSubmissionService]] and wraps it in a [[GrpcCommandSubmissionService]].
    *
    * The wrapper is given the ledger id, a ledger-time supplier backed by `timeProvider`, a UTC
    * supplier backed by `Instant.now`, a supplier of the maximum deduplication duration read from
    * the latest ledger configuration (if any), a random submission id generator and the metrics.
    */
  def create(
      ledgerId: LedgerId,
      writeService: state.WriteService,
      partyManagementService: IndexPartyManagementService,
      timeProvider: TimeProvider,
      timeProviderType: TimeProviderType,
      ledgerConfigurationSubscription: LedgerConfigurationSubscription,
      seedService: SeedService,
      commandExecutor: CommandExecutor,
      checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
      configuration: ApiSubmissionService.Configuration,
      metrics: Metrics,
  )(implicit
      executionContext: ExecutionContext,
      loggingContext: LoggingContext,
  ): GrpcCommandSubmissionService with GrpcApiService = {
    // The domain-level service that the gRPC wrapper delegates to.
    val submissionService = new ApiSubmissionService(
      writeService = writeService,
      partyManagementService = partyManagementService,
      timeProvider = timeProvider,
      timeProviderType = timeProviderType,
      ledgerConfigurationSubscription = ledgerConfigurationSubscription,
      seedService = seedService,
      commandExecutor = commandExecutor,
      checkOverloaded = checkOverloaded,
      configuration = configuration,
      metrics = metrics,
    )

    new GrpcCommandSubmissionService(
      service = submissionService,
      ledgerId = ledgerId,
      currentLedgerTime = () => timeProvider.getCurrentTime,
      currentUtcTime = () => Instant.now(),
      // Re-read on every call, so configuration changes are picked up.
      maxDeduplicationDuration = () =>
        ledgerConfigurationSubscription.latestConfiguration().map(_.maxDeduplicationDuration),
      submissionIdGenerator = SubmissionIdGenerator.Random,
      metrics = metrics,
    )
  }
}
/** [[CommandSubmissionService]] implementation that interprets the submitted commands with the
  * `commandExecutor` and hands the resulting transaction to the `writeService`.
  */
private[apiserver] final class ApiSubmissionService private[services] (
    writeService: state.WriteService,
    partyManagementService: IndexPartyManagementService,
    timeProvider: TimeProvider,
    timeProviderType: TimeProviderType,
    ledgerConfigurationSubscription: LedgerConfigurationSubscription,
    seedService: SeedService,
    commandExecutor: CommandExecutor,
    checkOverloaded: TelemetryContext => Option[state.SubmissionResult],
    configuration: ApiSubmissionService.Configuration,
    metrics: Metrics,
)(implicit executionContext: ExecutionContext, loggingContext: LoggingContext)
    extends CommandSubmissionService
    with AutoCloseable {

  private val logger = ContextualizedLogger.get(getClass)

  /** Interprets and submits the commands of `request`.
    *
    * Fails with a `LedgerConfiguration` not-found rejection when no ledger configuration is
    * currently available. Failures of the returned future are passed to
    * `logger.logErrorsOnCall`.
    */
  override def submit(
      request: SubmitRequest
  )(implicit telemetryContext: TelemetryContext): Future[Unit] =
    withEnrichedLoggingContext(logging.commands(request.commands)) { implicit loggingContext =>
      logger.info("Submitting commands for interpretation")
      logger.trace(s"Commands: ${request.commands.commands.commands}")

      implicit val contextualizedErrorLogger: ContextualizedErrorLogger =
        new DamlContextualizedErrorLogger(
          logger,
          loggingContext,
          request.commands.submissionId.map(SubmissionId.unwrap),
        )

      val outcome = ledgerConfigurationSubscription
        .latestConfiguration()
        .fold[Future[Unit]](
          // Evaluated only when there is no configuration (by-name argument).
          Future.failed(
            LedgerApiErrors.RequestValidation.NotFound.LedgerConfiguration
              .Reject()
              .asGrpcError
          )
        ) { ledgerConfiguration =>
          val seed = seedService.nextSeed()
          evaluateAndSubmit(seed, request.commands, ledgerConfiguration)
            .transform(handleSubmissionResult)
        }

      outcome.andThen(logger.logErrorsOnCall[Unit])
    }

  /** Maps the outcome of a submission to the outcome reported to the caller: an acknowledged
    * submission becomes a success, a synchronous error becomes a failure carrying its exception,
    * and failures are passed through.
    */
  private def handleSubmissionResult(result: Try[state.SubmissionResult])(implicit
      loggingContext: LoggingContext
  ): Try[Unit] =
    result match {
      // Errors of this type are logged on creation, so they are not logged again here.
      case Failure(alreadyLogged: LoggedApiException) =>
        Failure(alreadyLogged)
      case Failure(cause) =>
        logger.info(s"Rejected: ${cause.getMessage}")
        Failure(cause)
      case Success(state.SubmissionResult.Acknowledged) =>
        logger.debug("Success")
        Success(())
      case Success(rejection: state.SubmissionResult.SynchronousError) =>
        logger.info(s"Rejected: ${rejection.description}")
        Failure(rejection.exception)
    }

  /** Lifts the result of command interpretation into a future. An interpretation error marks the
    * `failedCommandInterpretations` metric and yields a failed future.
    */
  private def handleCommandExecutionResult(
      result: Either[ErrorCause, CommandExecutionResult]
  )(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] =
    result match {
      case Right(executed) =>
        Future.successful(executed)
      case Left(cause) =>
        metrics.daml.commands.failedCommandInterpretations.mark()
        failedOnCommandExecution(cause)
    }

  /** Returns the result produced by `checkOverloaded` if there is one; otherwise interprets the
    * commands and submits the resulting transaction.
    */
  private def evaluateAndSubmit(
      submissionSeed: crypto.Hash,
      commands: ApiCommands,
      ledgerConfig: Configuration,
  )(implicit
      loggingContext: LoggingContext,
      telemetryContext: TelemetryContext,
      contextualizedErrorLogger: ContextualizedErrorLogger,
  ): Future[state.SubmissionResult] =
    checkOverloaded(telemetryContext).fold(
      // Evaluated only when `checkOverloaded` returned nothing (by-name argument).
      interpretAndSubmit(submissionSeed, commands, ledgerConfig)
    )(overloadResult => Future.successful(overloadResult))

  /** Runs the pipeline: execute the commands, unwrap the execution result, allocate missing
    * informee parties, then submit the transaction.
    */
  private def interpretAndSubmit(
      submissionSeed: crypto.Hash,
      commands: ApiCommands,
      ledgerConfig: Configuration,
  )(implicit
      loggingContext: LoggingContext,
      telemetryContext: TelemetryContext,
      contextualizedErrorLogger: ContextualizedErrorLogger,
  ): Future[state.SubmissionResult] =
    for {
      interpretation <- commandExecutor.execute(commands, submissionSeed, ledgerConfig)
      executed <- handleCommandExecutionResult(interpretation)
      allocations <- allocateMissingInformees(executed.transaction)
      submitted <- submitTransaction(executed, allocations, ledgerConfig)
    } yield submitted

  /** Allocates every informee of `transaction` that the party management service does not
    * return, provided implicit party allocation is enabled; otherwise yields an empty result.
    *
    * The whole transaction is accepted (rather than its informees) so that the transaction is
    * traversed only when that is actually necessary.
    */
  private[services] def allocateMissingInformees(
      transaction: SubmittedTransaction
  )(implicit
      loggingContext: LoggingContext,
      telemetryContext: TelemetryContext,
  ): Future[Seq[state.SubmissionResult]] =
    if (!configuration.implicitPartyAllocation) Future.successful(Seq.empty)
    else {
      val informees = transaction.informees.toSeq
      partyManagementService.getParties(informees).flatMap { fetched =>
        val alreadyKnown = fetched.iterator.map(_.party).toSet
        val toAllocate = informees.filterNot(alreadyKnown)
        Future.sequence(toAllocate.map(allocateParty))
      }
    }

  /** Asks the write service to allocate the party `name`, using the name as both hint and
    * display name, under a freshly generated random submission id.
    */
  private def allocateParty(
      name: Ref.Party
  )(implicit telemetryContext: TelemetryContext): Future[state.SubmissionResult] = {
    val submissionId = Ref.SubmissionId.assertFromString(UUID.randomUUID().toString)
    val allocation =
      withEnrichedLoggingContext(logging.party(name), logging.submissionId(submissionId)) {
        implicit loggingContext =>
          logger.info("Implicit party allocation")
          writeService
            .allocateParty(
              hint = Some(name),
              displayName = Some(name),
              submissionId = submissionId,
            )
      }
    // NOTE(review): this uses `asScala` while the transaction submission below uses
    // `toScalaUnwrapped` -- confirm that the difference is intentional.
    allocation.asScala
  }

  /** Submits the transaction unless one of the party allocations was not acknowledged, in which
    * case the first such allocation result is returned instead and nothing is submitted.
    */
  private def submitTransaction(
      transactionInfo: CommandExecutionResult,
      partyAllocationResults: Seq[state.SubmissionResult],
      ledgerConfig: Configuration,
  )(implicit telemetryContext: TelemetryContext): Future[state.SubmissionResult] =
    partyAllocationResults.find(_ != state.SubmissionResult.Acknowledged) match {
      case None =>
        timeProviderType match {
          case TimeProviderType.Static =>
            // In static time mode, record time is always equal to ledger time.
            submitTransaction(transactionInfo)
          case TimeProviderType.WallClock =>
            submitWithWallClockDelay(transactionInfo, ledgerConfig)
        }
      case Some(unacknowledged) =>
        Future.successful(unacknowledged)
    }

  /** Submits the transaction so that it arrives at the ledger sequencer when record time equals
    * ledger time: if the ledger time lies further in the future than the expected latency, the
    * submission to the write service is delayed by the difference.
    */
  private def submitWithWallClockDelay(
      transactionInfo: CommandExecutionResult,
      ledgerConfig: Configuration,
  )(implicit telemetryContext: TelemetryContext): Future[state.SubmissionResult] = {
    val targetInstant = transactionInfo.transactionMeta.ledgerEffectiveTime.toInstant
      .minus(ledgerConfig.timeModel.avgTransactionLatency)
    val submissionDelay = Duration.between(timeProvider.getCurrentTime, targetInstant)
    // NOTE(review): a delay of exactly zero takes the "delayed" branch -- confirm this is intended.
    if (!submissionDelay.isNegative) {
      logger.info(s"Delaying submission by $submissionDelay")
      metrics.daml.commands.delayedSubmissions.mark()
      val waitFor = scala.concurrent.duration.Duration.fromNanos(submissionDelay.toNanos)
      Delayed.Future.by(waitFor)(submitTransaction(transactionInfo))
    } else submitTransaction(transactionInfo)
  }

  /** Marks the `validSubmissions` metric and passes the interpreted transaction to the write
    * service.
    */
  private def submitTransaction(
      result: CommandExecutionResult
  )(implicit telemetryContext: TelemetryContext): Future[state.SubmissionResult] = {
    metrics.daml.commands.validSubmissions.mark()
    logger.debug("Submitting transaction to ledger")
    val pendingSubmission = writeService.submitTransaction(
      result.submitterInfo,
      result.transactionMeta,
      result.transaction,
      result.interpretationTimeNanos,
      result.globalKeyMapping,
      result.usedDisclosedContracts,
    )
    pendingSubmission.toScalaUnwrapped
  }

  /** Turns an interpretation error into a failed future carrying the corresponding gRPC error. */
  private def failedOnCommandExecution(
      error: ErrorCause
  )(implicit contextualizedErrorLogger: ContextualizedErrorLogger): Future[CommandExecutionResult] = {
    val rejection = RejectionGenerators.commandExecutorError(error)
    Future.failed(rejection.asGrpcError)
  }

  /** Nothing to release. */
  override def close(): Unit = {}
}