/
LedgerClientJwt.scala
125 lines (101 loc) · 4.63 KB
/
LedgerClientJwt.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
// Copyright (c) 2020 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package com.daml.http
import akka.NotUsed
import akka.stream.scaladsl.Source
import com.daml.lf.data.Ref
import com.daml.jwt.domain.Jwt
import com.daml.ledger.api
import com.daml.ledger.api.v1.active_contracts_service.GetActiveContractsResponse
import com.daml.ledger.api.v1.command_service.{
SubmitAndWaitForTransactionResponse,
SubmitAndWaitForTransactionTreeResponse,
SubmitAndWaitRequest
}
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.transaction.Transaction
import com.daml.ledger.api.v1.transaction_filter.TransactionFilter
import com.daml.ledger.client.LedgerClient
import scalaz.OneAnd
import scala.concurrent.{ExecutionContext, Future}
object LedgerClientJwt {
type SubmitAndWaitForTransaction =
(Jwt, SubmitAndWaitRequest) => Future[SubmitAndWaitForTransactionResponse]
type SubmitAndWaitForTransactionTree =
(Jwt, SubmitAndWaitRequest) => Future[SubmitAndWaitForTransactionTreeResponse]
type GetTermination =
Jwt => Future[Option[Terminates.AtAbsolute]]
type GetActiveContracts =
(Jwt, TransactionFilter, Boolean) => Source[GetActiveContractsResponse, NotUsed]
type GetCreatesAndArchivesSince =
(Jwt, TransactionFilter, LedgerOffset, Terminates) => Source[Transaction, NotUsed]
type ListKnownParties =
Jwt => Future[List[api.domain.PartyDetails]]
type GetParties =
(Jwt, OneAnd[Set, Ref.Party]) => Future[List[api.domain.PartyDetails]]
type AllocateParty =
(Jwt, Option[Ref.Party], Option[String]) => Future[api.domain.PartyDetails]
private def bearer(jwt: Jwt): Some[String] = Some(jwt.value: String)
def submitAndWaitForTransaction(client: LedgerClient): SubmitAndWaitForTransaction =
(jwt, req) => client.commandServiceClient.submitAndWaitForTransaction(req, bearer(jwt))
def submitAndWaitForTransactionTree(client: LedgerClient): SubmitAndWaitForTransactionTree =
(jwt, req) => client.commandServiceClient.submitAndWaitForTransactionTree(req, bearer(jwt))
def getTermination(client: LedgerClient)(implicit ec: ExecutionContext): GetTermination =
jwt =>
client.transactionClient.getLedgerEnd(bearer(jwt)).map {
_.offset flatMap {
_.value match {
case off @ LedgerOffset.Value.Absolute(_) => Some(Terminates.AtAbsolute(off))
case LedgerOffset.Value.Boundary(_) | LedgerOffset.Value.Empty => None // at beginning
}
}
}
@SuppressWarnings(Array("org.wartremover.warts.Any"))
def getActiveContracts(client: LedgerClient): GetActiveContracts =
(jwt, filter, verbose) =>
client.activeContractSetClient
.getActiveContracts(filter, verbose, bearer(jwt))
.mapMaterializedValue(_ => NotUsed)
sealed abstract class Terminates extends Product with Serializable {
import Terminates._
def toOffset: Option[LedgerOffset] = this match {
case AtLedgerEnd => Some(ledgerEndOffset)
case Never => None
case AtAbsolute(off) => Some(LedgerOffset(off))
}
}
object Terminates {
case object AtLedgerEnd extends Terminates
case object Never extends Terminates
final case class AtAbsolute(off: LedgerOffset.Value.Absolute) extends Terminates
}
private val ledgerEndOffset =
LedgerOffset(LedgerOffset.Value.Boundary(LedgerOffset.LedgerBoundary.LEDGER_END))
def getCreatesAndArchivesSince(client: LedgerClient): GetCreatesAndArchivesSince =
(jwt, filter, offset, terminates) => {
val end = terminates.toOffset
if (skipRequest(offset, end))
Source.empty[Transaction]
else
client.transactionClient
.getTransactions(offset, terminates.toOffset, filter, verbose = true, token = bearer(jwt))
}
private def skipRequest(start: LedgerOffset, end: Option[LedgerOffset]): Boolean = {
import com.daml.http.util.LedgerOffsetUtil.AbsoluteOffsetOrdering
(start.value, end.map(_.value)) match {
case (s: LedgerOffset.Value.Absolute, Some(e: LedgerOffset.Value.Absolute)) =>
AbsoluteOffsetOrdering.gteq(s, e)
case _ => false
}
}
def listKnownParties(client: LedgerClient): ListKnownParties =
jwt => client.partyManagementClient.listKnownParties(bearer(jwt))
def getParties(client: LedgerClient): GetParties =
(jwt, partyIds) => client.partyManagementClient.getParties(partyIds, bearer(jwt))
def allocateParty(client: LedgerClient): AllocateParty =
(jwt, identifierHint, displayName) =>
client.partyManagementClient.allocateParty(
hint = identifierHint,
displayName = displayName,
token = bearer(jwt))
}