-
Notifications
You must be signed in to change notification settings - Fork 8
/
PingPongProcessor.java
116 lines (92 loc) · 4.38 KB
/
PingPongProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
package examples.pingpong.reactive;
import com.daml.ledger.javaapi.data.*;
import com.daml.ledger.rxjava.LedgerClient;
import com.google.protobuf.Empty;
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.subjects.SingleSubject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
* This class subscribes to the stream of transactions for a given party and reacts to Ping or Pong contracts.
*/
public class PingPongProcessor {
private static final Logger logger = LoggerFactory.getLogger(PingPongProcessor.class);
private final String party;
private final String ledgerId;
private LedgerClient client;
private final Identifier pingIdentifier;
private final Identifier pongIdentifier;
public PingPongProcessor(String party, LedgerClient client, Identifier pingIdentifier, Identifier pongIdentifier) {
this.party = party;
this.ledgerId = client.getLedgerId();
this.client = client;
this.pingIdentifier = pingIdentifier;
this.pongIdentifier = pongIdentifier;
}
public void runIndefinitely() {
// assemble the request for the transaction stream
Flowable<Transaction> transactions = client.getTransactionsClient().getTransactions(
LedgerOffset.LedgerEnd.getInstance(),
new FiltersByParty(Collections.singletonMap(party, NoFilter.instance)), true);
transactions.forEach(this::processTransaction);
}
/**
* Processes a transaction and sends the resulting commands to the Command Submission Service
*
* @param tx the Transaction to process
*/
private Single<Empty> processTransaction(Transaction tx) {
List<Command> exerciseCommands = tx.getEvents().stream()
.filter(e -> {
return e instanceof CreatedEvent;
}).map(e -> (CreatedEvent) e)
.flatMap(e -> processEvent(tx.getWorkflowId(), e))
.collect(Collectors.toList());
if (!exerciseCommands.isEmpty()) {
return client.getCommandClient().submitAndWait(
tx.getWorkflowId(),
PingPongReactiveMain.APP_ID,
UUID.randomUUID().toString(),
party,
exerciseCommands);
} else return SingleSubject.create();
}
/**
* For each {@link CreatedEvent} where the <code>receiver</code> is
* the current party, exercise the <code>Pong</code> choice of <code>Ping</code> contracts, or the <code>Ping</code>
* choice of <code>Pong</code> contracts.
*
* @param workflowId the workflow the event is part of
* @param event the {@link CreatedEvent} to process
* @return an empty <code>Stream</code> if this event doesn't trigger any action for this {@link PingPongProcessor}'s
* party
*/
private Stream<Command> processEvent(String workflowId, CreatedEvent event) {
Identifier template = event.getTemplateId();
boolean isPing = template.equals(pingIdentifier);
boolean isPong = template.equals(pongIdentifier);
if (!isPing && !isPong) return Stream.empty();
Map<String, Value> fields = event.getArguments().getFieldsMap();
// check that this party is set as the receiver of the contract
boolean thisPartyIsReceiver = fields.get("receiver").asParty().map(receiver -> receiver.getValue().equals(party))
.orElseThrow(() -> new IllegalStateException("expected 'receiver' to be a party, found " + fields.get("receiver")));
if (!thisPartyIsReceiver) return Stream.empty();
String contractId = event.getContractId();
String choice = isPing ? "RespondPong" : "RespondPing";
Optional<Long> count = fields.get("count").asInt64().map(Int64::getValue);
logger.info("{} is exercising {} on {} in workflow {} at count {}", party, choice, contractId, workflowId, count.orElse(-1L));
// assemble the exercise command
Command cmd = new ExerciseCommand(
template,
contractId,
choice,
new DamlRecord(Collections.emptyList()));
return Stream.of(cmd);
}
}