-
Notifications
You must be signed in to change notification settings - Fork 215
/
AcknowledgementForwarderActorStarter.java
282 lines (253 loc) · 12.7 KB
/
AcknowledgementForwarderActorStarter.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.internal.models.acks;
import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.eclipse.ditto.base.model.acks.AcknowledgementLabel;
import org.eclipse.ditto.base.model.acks.AcknowledgementRequest;
import org.eclipse.ditto.base.model.acks.DittoAcknowledgementLabel;
import org.eclipse.ditto.base.model.entity.id.EntityId;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.DittoHeadersBuilder;
import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.base.model.signals.acks.Acknowledgement;
import org.eclipse.ditto.base.model.signals.acks.AcknowledgementRequestDuplicateCorrelationIdException;
import org.eclipse.ditto.base.model.signals.announcements.Announcement;
import org.eclipse.ditto.base.model.signals.commands.Command;
import org.eclipse.ditto.base.model.signals.events.Event;
import org.eclipse.ditto.internal.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLogger;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.actor.ActorSelection;
import akka.actor.InvalidActorNameException;
import akka.actor.Props;
import akka.japi.Pair;
/**
 * Starting an acknowledgement forwarder actor is more complex than simply calling {@code actorOf}.
 * Thus the starting logic is worth being handled within its own class.
 * <p>
 * An instance is bound to one signal. On construction the acknowledgement requests of the signal's headers are
 * filtered by the "allowed label" predicate; only the remaining requests are considered when deciding whether a
 * forwarder actor has to be started.
 * </p>
 *
 * @since 1.1.0
 */
final class AcknowledgementForwarderActorStarter implements Supplier<Optional<ActorRef>> {

    private static final DittoLogger LOGGER =
            DittoLoggerFactory.getLogger(AcknowledgementForwarderActorStarter.class);

    /** Separates the original correlation ID (prefix) from the counter appended to make it conflict free. */
    private static final String PREFIX_COUNTER_SEPARATOR = "#";
    private static final String LIVE_CHANNEL = "live";

    private final ActorRefFactory actorRefFactory;
    private final ActorRef parent;
    private final EntityId entityId;
    private final Signal<?> signal;
    private final DittoHeaders dittoHeaders;
    private final AcknowledgementConfig acknowledgementConfig;
    private final Set<AcknowledgementRequest> acknowledgementRequests;

    private AcknowledgementForwarderActorStarter(final ActorRefFactory actorRefFactory,
            final ActorRef parent,
            final EntityId entityId,
            final Signal<?> signal,
            final AcknowledgementConfig acknowledgementConfig,
            final Predicate<AcknowledgementLabel> isAckLabelAllowed) {

        this.actorRefFactory = checkNotNull(actorRefFactory, "actorRefFactory");
        // parent is deliberately stored unchecked; it is only used as sender of NACKs.
        this.parent = parent;
        this.entityId = checkNotNull(entityId, "entityId");
        this.signal = checkNotNull(signal, "signal");
        dittoHeaders = signal.getDittoHeaders();
        this.acknowledgementConfig = checkNotNull(acknowledgementConfig, "acknowledgementConfig");
        // Keep only those requested acknowledgements whose label passes the predicate.
        acknowledgementRequests = dittoHeaders.getAcknowledgementRequests()
                .stream()
                .filter(request -> isAckLabelAllowed.test(request.getLabel()))
                .collect(Collectors.toSet());
    }

    /**
     * Returns an instance of {@code AcknowledgementForwarderActorStarter}.
     *
     * @param actorRefFactory the factory to start the forwarder actor in.
     * @param parent the parent of the forwarder actor; used as sender of NACKs.
     * @param entityId is used for the NACKs if the forwarder actor cannot be started.
     * @param signal the signal for which the forwarder actor is to start.
     * @param acknowledgementConfig provides configuration setting regarding acknowledgement handling.
     * @param isAckLabelAllowed predicate for whether an ack label is allowed for publication at this channel.
     * "live-response" is always allowed.
     * @return a means to start an acknowledgement forwarder actor.
     * @throws NullPointerException if any argument other than {@code parent} is {@code null}.
     */
    static AcknowledgementForwarderActorStarter getInstance(final ActorRefFactory actorRefFactory,
            final ActorRef parent,
            final EntityId entityId,
            final Signal<?> signal,
            final AcknowledgementConfig acknowledgementConfig,
            final Predicate<AcknowledgementLabel> isAckLabelAllowed) {

        return new AcknowledgementForwarderActorStarter(actorRefFactory, parent, entityId, signal,
                acknowledgementConfig,
                // live-response is always allowed
                isAckLabelAllowed.or(DittoAcknowledgementLabel.LIVE_RESPONSE::equals));
    }

    /**
     * Retrieve the acknowledgement requests allowed for this actor starter.
     * Any acknowledgement requests in the original signal are discarded if this channel is not allowed to fulfill
     * them.
     *
     * @return the meaningful acknowledgement requests.
     */
    public Set<AcknowledgementRequest> getAllowedAckRequests() {
        return acknowledgementRequests;
    }

    /**
     * Tries to start an acknowledgement forwarder actor for the signal's original headers.
     * If starting fails with an {@code InvalidActorNameException}, all allowed acknowledgement requests with
     * non-Ditto labels are declined with a "duplicate correlation ID" error instead.
     *
     * @return the reference of the started forwarder actor, or an empty optional if no effective acknowledgement
     * was requested or the actor could not be started.
     */
    @Override
    public Optional<ActorRef> get() {
        ActorRef actorRef = null;
        if (hasEffectiveAckRequests(signal, acknowledgementRequests)) {
            try {
                actorRef = startAckForwarderActor(dittoHeaders);
            } catch (final InvalidActorNameException e) {
                // In case that the actor with that name already existed, the correlation-id was already used recently:
                declineAllNonDittoAckRequests(getDuplicateCorrelationIdException(e));
            }
        }
        return Optional.ofNullable(actorRef);
    }

    /**
     * Start an acknowledgement forwarder.
     * On an actor name conflict a new correlation ID of the form {@code <prefix>#<counter>} is generated and
     * starting is retried until it succeeds.
     *
     * @return the new correlation ID if an ack forwarder started, or an empty optional if the ack forwarder did not
     * start because no acknowledgement was requested.
     */
    public Optional<String> getConflictFree() {
        if (!hasEffectiveAckRequests(signal, acknowledgementRequests)) {
            return Optional.empty();
        }

        final DittoHeadersBuilder<?, ?> builder = dittoHeaders.toBuilder()
                .acknowledgementRequests(acknowledgementRequests);
        final Pair<String, Integer> prefixPair = parseCorrelationId(dittoHeaders);
        final String prefix = prefixPair.first();
        int counter = prefixPair.second();
        // First attempt uses the provided correlation ID unchanged (or the generated prefix if there is none).
        String correlationId = dittoHeaders.getCorrelationId().orElse(prefix);
        // NOTE(review): every InvalidActorNameException is treated as a name collision. Should the name be
        // rejected for another reason this loop would not terminate - confirm that determineActorName always
        // yields a valid actor name.
        while (true) {
            try {
                builder.correlationId(correlationId);
                startAckForwarderActor(builder.build());
                return Optional.of(correlationId);
            } catch (final InvalidActorNameException e) {
                // generate a new ID
                correlationId = joinPrefixAndCounter(prefix, ++counter);
            }
        }
    }

    /**
     * Builds a correlation ID candidate of the form {@code <prefix>#<counter>}.
     * Plain concatenation is used so that the digits do not depend on the default locale.
     */
    private static String joinPrefixAndCounter(final String prefix, final int counter) {
        return prefix + PREFIX_COUNTER_SEPARATOR + counter;
    }

    /**
     * Splits the correlation ID of the passed headers into a prefix and a counter.
     *
     * @param dittoHeaders the headers to read the correlation ID from.
     * @return the pair of prefix and counter. The counter is {@code -1} if the correlation ID carries no parsable
     * counter suffix. If no correlation ID is present, the prefix is a random UUID.
     */
    private Pair<String, Integer> parseCorrelationId(final DittoHeaders dittoHeaders) {
        final Optional<String> providedCorrelationId = dittoHeaders.getCorrelationId();
        if (providedCorrelationId.isPresent()) {
            final String correlationId = providedCorrelationId.get();
            final int separatorIndex = correlationId.lastIndexOf(PREFIX_COUNTER_SEPARATOR);
            if (separatorIndex >= 0 && isNumber(correlationId, separatorIndex + 1)) {
                final String prefix = correlationId.substring(0, separatorIndex);
                final String number = correlationId.substring(separatorIndex + 1);
                try {
                    return Pair.create(prefix, Integer.valueOf(number));
                } catch (final NumberFormatException e) {
                    // Looks like a number but does not fit into an int: restart counting for this prefix.
                    return Pair.create(prefix, -1);
                }
            } else {
                return Pair.create(correlationId, -1);
            }
        }
        return Pair.create(UUID.randomUUID().toString(), -1);
    }

    /**
     * Starts the forwarder actor via the actor ref factory, named after the passed headers.
     *
     * @throws InvalidActorNameException propagated from {@code actorOf}; callers treat it as a name collision.
     */
    private ActorRef startAckForwarderActor(final DittoHeaders dittoHeaders) {
        final Props props = AcknowledgementForwarderActor.props(dittoHeaders,
                acknowledgementConfig.getForwarderFallbackTimeout());
        return actorRefFactory.actorOf(props, AcknowledgementForwarderActor.determineActorName(dittoHeaders));
    }

    /**
     * Creates the exception signalling a duplicate correlation ID; {@code "?"} is used if the headers have none.
     */
    private DittoRuntimeException getDuplicateCorrelationIdException(final Throwable cause) {
        return AcknowledgementRequestDuplicateCorrelationIdException
                .newBuilder(dittoHeaders.getCorrelationId().orElse("?"))
                .dittoHeaders(dittoHeaders)
                .cause(cause)
                .build();
    }

    /**
     * Sends a NACK built from the passed exception for each allowed acknowledgement request whose label is not
     * Ditto-defined. The receiver is the actor selected by the "ackregator address" header of the exception's
     * headers; if that header is missing, an error is logged and nothing is sent.
     */
    private void declineAllNonDittoAckRequests(final DittoRuntimeException dittoRuntimeException) {
        final DittoHeaders headers = dittoRuntimeException.getDittoHeaders();
        final String ackregatorAddress = headers.get(DittoHeaderDefinition.DITTO_ACKREGATOR_ADDRESS.getKey());
        if (null != ackregatorAddress) {
            final ActorSelection acknowledgementRequester = actorRefFactory.actorSelection(ackregatorAddress);
            // answer NACKs for all AcknowledgementRequests with labels which were not Ditto-defined
            acknowledgementRequests.stream()
                    .map(AcknowledgementRequest::getLabel)
                    .filter(Predicate.not(DittoAcknowledgementLabel::contains))
                    .map(label -> getNack(label, dittoRuntimeException))
                    .forEach(nack -> acknowledgementRequester.tell(nack, parent));
        } else {
            LOGGER.withCorrelationId(headers)
                    .error("Received DittoRuntimeException <{}> did not contain header of Ackgregator address: {}",
                            dittoRuntimeException.getClass().getSimpleName(), headers);
        }
    }

    /**
     * Creates an acknowledgement for the label carrying the HTTP status and JSON of the passed exception.
     */
    private Acknowledgement getNack(final AcknowledgementLabel label,
            final DittoRuntimeException dittoRuntimeException) {

        return Acknowledgement.of(label, entityId, dittoRuntimeException.getHttpStatus(), dittoHeaders,
                dittoRuntimeException.toJson());
    }

    /**
     * @return {@code true} if the request's label is neither "twin-persisted" nor "live-response".
     */
    static boolean isNotTwinPersistedOrLiveResponse(final AcknowledgementRequest request) {
        return isNotLiveResponse(request) && isNotTwinPersisted(request);
    }

    /**
     * @return {@code true} if the request's label is not "twin-persisted".
     */
    static boolean isNotTwinPersisted(final AcknowledgementRequest request) {
        return !DittoAcknowledgementLabel.TWIN_PERSISTED.equals(request.getLabel());
    }

    /**
     * @return {@code true} if the request's label is not "live-response".
     */
    static boolean isNotLiveResponse(final AcknowledgementRequest request) {
        return !DittoAcknowledgementLabel.LIVE_RESPONSE.equals(request.getLabel());
    }

    /**
     * @return {@code true} if the channel header of the signal is present and equals {@code "live"}.
     */
    static boolean isLiveSignal(final Signal<?> signal) {
        return signal.getDittoHeaders().getChannel().stream().anyMatch(LIVE_CHANNEL::equals);
    }

    /**
     * Determines whether the passed acknowledgement requests require a forwarder actor for the signal.
     * <ul>
     * <li>Non-live thing events: at least one request is neither "twin-persisted" nor "live-response".</li>
     * <li>Message commands and live thing commands: at least one request is not "twin-persisted", or the
     * headers state that a response is required.</li>
     * <li>Policy announcements: at least one request exists.</li>
     * <li>Any other signal: never.</li>
     * </ul>
     *
     * @param signal the signal to check.
     * @param ackRequests the acknowledgement requests to consider for the signal.
     * @return whether there are effective acknowledgement requests.
     */
    static boolean hasEffectiveAckRequests(final Signal<?> signal, final Set<AcknowledgementRequest> ackRequests) {
        final boolean isLiveSignal = isLiveSignal(signal);
        if (Event.isThingEvent(signal) && !isLiveSignal) {
            return ackRequests.stream()
                    .anyMatch(AcknowledgementForwarderActorStarter::isNotTwinPersistedOrLiveResponse);
        } else if (Command.isMessageCommand(signal) || (isLiveSignal && Command.isThingCommand(signal))) {
            return ackRequests.stream().anyMatch(AcknowledgementForwarderActorStarter::isNotTwinPersisted) ||
                    signal.getDittoHeaders().isResponseRequired();
        } else if (Announcement.isPolicyAnnouncement(signal)) {
            return !ackRequests.isEmpty();
        } else {
            return false;
        }
    }

    /**
     * Checks whether the suffix of {@code string} beginning at {@code startIndex} looks like an integer: an
     * optional leading {@code '-'} followed by at least one digit and nothing else.
     *
     * @param string the string to inspect.
     * @param startIndex the non-negative index at which the candidate number starts.
     * @return {@code true} if the suffix is a non-empty sequence of digits with an optional minus sign;
     * {@code false} otherwise, in particular for an empty suffix or a singular {@code "-"}.
     */
    private static boolean isNumber(final String string, final int startIndex) {
        final int length = string.length();
        if (startIndex >= length) {
            // empty suffix, e.g. the string ends with the separator
            return false;
        }
        final char firstChar = string.charAt(startIndex);
        if (!Character.isDigit(firstChar) && (firstChar != '-' || startIndex + 1 >= length)) {
            // neither a digit nor a minus sign followed by something; singular "-" is not a number.
            return false;
        }
        for (int i = startIndex + 1; i < length; ++i) {
            if (!Character.isDigit(string.charAt(i))) {
                return false;
            }
        }
        return true;
    }

}