// BaseConsumerActor.java
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.connectivity.messaging;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.ConnectivityStatus;
import org.eclipse.ditto.model.connectivity.ResourceStatus;
import org.eclipse.ditto.model.connectivity.Source;
import org.eclipse.ditto.services.connectivity.messaging.config.ConnectivityConfig;
import org.eclipse.ditto.services.connectivity.messaging.config.DittoConnectivityConfig;
import org.eclipse.ditto.services.connectivity.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.services.connectivity.messaging.monitoring.DefaultConnectionMonitorRegistry;
import org.eclipse.ditto.services.models.acks.config.AcknowledgementConfig;
import org.eclipse.ditto.services.models.connectivity.ExternalMessage;
import org.eclipse.ditto.services.models.connectivity.ExternalMessageBuilder;
import org.eclipse.ditto.services.models.connectivity.ExternalMessageFactory;
import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter;
import org.eclipse.ditto.services.utils.config.DefaultScopedConfig;
import org.eclipse.ditto.services.utils.config.InstanceIdentifierSupplier;
import org.eclipse.ditto.signals.acks.base.Acknowledgements;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionUnavailableException;
import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.pattern.Patterns;
/**
* Base class for consumer actors that holds common fields and handles the address status.
*/
public abstract class BaseConsumerActor extends AbstractActorWithTimers {
protected final String sourceAddress;
protected final Source source;
protected final ConnectionMonitor inboundMonitor;
protected final ConnectionMonitor inboundAcknowledgedMonitor;
protected final ConnectionId connectionId;
private final ActorRef inboundMappingProcessor;
private final AcknowledgementConfig acknowledgementConfig;
@Nullable private ResourceStatus resourceStatus;
/**
 * Initialises the state shared by all consumer actors: stores the arguments after passing each of them
 * through {@code checkNotNull}, sets the initial source status and creates the acknowledgement config and
 * the two inbound connection monitors from the connectivity config of the actor system.
 *
 * @param connectionId the ID of the connection this consumer belongs to.
 * @param sourceAddress the address this consumer is consuming from.
 * @param inboundMappingProcessor the actor to which consumed messages and errors are forwarded.
 * @param source the source of the connection this consumer was created for.
 */
protected BaseConsumerActor(final ConnectionId connectionId, final String sourceAddress,
        final ActorRef inboundMappingProcessor, final Source source) {
    this.connectionId = checkNotNull(connectionId, "connectionId");
    this.sourceAddress = checkNotNull(sourceAddress, "sourceAddress");
    // the label must match the parameter name so that a failing check points at the right argument
    this.inboundMappingProcessor = checkNotNull(inboundMappingProcessor, "inboundMappingProcessor");
    this.source = checkNotNull(source, "source");

    // must run after sourceAddress is assigned because resetResourceStatus() reads it
    resetResourceStatus();

    final ConnectivityConfig connectivityConfig = DittoConnectivityConfig.of(
            DefaultScopedConfig.dittoScoped(getContext().getSystem().settings().config()));

    this.acknowledgementConfig = connectivityConfig.getAcknowledgementConfig();

    this.inboundMonitor = DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig.getMonitoringConfig())
            .forInboundConsumed(connectionId, sourceAddress);
    this.inboundAcknowledgedMonitor =
            DefaultConnectionMonitorRegistry.fromConfig(connectivityConfig.getMonitoringConfig())
                    .forInboundAcknowledged(connectionId, sourceAddress);
}
/**
 * Provides the logger this base class uses for its diagnostic output.
 * This class calls it for debug and error messages, including from within the future callbacks
 * registered in {@code forwardToMappingActor}.
 *
 * @return the logging adapter of this actor.
 */
protected abstract DittoDiagnosticLoggingAdapter log();
/**
 * Hand an external message over to the mapping processor actor and, as soon as the collected responses
 * are available, technically settle or reject the incoming message.
 * NOT thread-safe!
 * <ul>
 * <li>The message is settled if all expected responses arrived and none of them failed.</li>
 * <li>Otherwise it is rejected; redelivery is requested if there is no failed response at all or if at
 * least one failed response carries a status code that requires redelivery.</li>
 * <li>If the collection ended with an error, the message is rejected and the error is reported as
 * inbound failure. Redelivery is decided by the status code when
 * {@code DittoRuntimeException.asDittoRuntimeException} yields a DittoRuntimeException; for any other
 * error redelivery is always requested.</li>
 * </ul>
 * Errors thrown while settling or rejecting are logged and not propagated.
 *
 * @param message the external message
 * @param settle technically settle the incoming message. MUST be thread-safe.
 * @param reject technically reject the incoming message. MUST be thread-safe.
 */
protected final void forwardToMappingActor(final ExternalMessage message, final Runnable settle,
        final Reject reject) {
    final ExternalMessage enrichedMessage = addSourceAndReplyTarget(message);
    forwardAndAwaitAck(enrichedMessage)
            .handle((collected, failure) -> {
                if (collected == null) {
                    // no output means the collection failed
                    final DittoRuntimeException knownError =
                            DittoRuntimeException.asDittoRuntimeException(failure, cause -> {
                                // Redeliver and pray this unexpected error goes away
                                log().debug("Rejecting [redeliver=true] due to error <{}>", cause);
                                reject.reject(true);
                                inboundFailure(cause);
                                return null;
                            });
                    if (knownError == null) {
                        // already rejected by the fallback above
                        return null;
                    }
                    final boolean redeliver = requiresRedelivery(knownError.getStatusCode());
                    log().debug("Rejecting [redeliver={}] due to error <{}>",
                            redeliver, knownError);
                    reject.reject(redeliver);
                    inboundFailure(knownError);
                    return null;
                }

                final List<CommandResponse<?>> failed = collected.getFailedResponses();
                if (collected.allExpectedResponsesArrived() && failed.isEmpty()) {
                    settle.run();
                    return null;
                }

                // empty failed responses indicate that SetCount was missing
                final boolean redeliver = failed.isEmpty() || someFailedResponseRequiresRedelivery(failed);
                log().debug("Rejecting [redeliver={}] due to failed responses <{}>",
                        redeliver, failed);
                reject.reject(redeliver);
                return null;
            })
            .exceptionally(unexpected -> {
                log().error(unexpected, "Unexpected error during manual acknowledgement.");
                return null;
            });
}
/**
 * Send an error to the mapping processor actor to be published in the reply-target.
 * Before sending, the headers of the error are enriched with the reply information of the source.
 * The error is sent without a sender.
 *
 * @param message the error.
 */
protected final void forwardToMappingActor(final DittoRuntimeException message) {
    final DittoHeaders headersWithReplyInformation =
            enrichHeadersWithReplyInformation(message.getDittoHeaders());
    final DittoRuntimeException errorToPublish = message.setDittoHeaders(headersWithReplyInformation);
    inboundMappingProcessor.tell(errorToPublish, ActorRef.noSender());
}
/**
 * Replaces the stored resource status by a fresh source status with status {@code OPEN} for the
 * source address of this consumer. The status details contain the current instant.
 */
protected void resetResourceStatus() {
    final String instanceIdentifier = getInstanceIdentifier();
    final String statusDetails = "Started at " + Instant.now();
    resourceStatus = ConnectivityModelFactory.newSourceStatus(instanceIdentifier, ConnectivityStatus.OPEN,
            sourceAddress, statusDetails);
}
/**
 * Builds a new source status for the source address of this consumer from the stored resource status.
 * If no resource status is stored, the status is {@code UNKNOWN} and there are no status details.
 *
 * @return the current status of the source.
 */
protected ResourceStatus getCurrentSourceStatus() {
    final String instanceIdentifier = getInstanceIdentifier();
    final ConnectivityStatus status;
    final String statusDetails;
    if (resourceStatus == null) {
        status = ConnectivityStatus.UNKNOWN;
        statusDetails = null;
    } else {
        status = resourceStatus.getStatus();
        statusDetails = resourceStatus.getStatusDetails().orElse(null);
    }
    return ConnectivityModelFactory.newSourceStatus(instanceIdentifier, status, sourceAddress, statusDetails);
}
/**
 * Stores the given resource status as the status of this consumer.
 * A status whose resource type is {@code UNKNOWN} is not stored as is; it is converted into a source
 * status for the source address of this consumer, keeping its status and status details.
 *
 * @param resourceStatus the new status.
 */
protected void handleAddressStatus(final ResourceStatus resourceStatus) {
    if (resourceStatus.getResourceType() != ResourceStatus.ResourceType.UNKNOWN) {
        this.resourceStatus = resourceStatus;
        return;
    }
    this.resourceStatus = ConnectivityModelFactory.newSourceStatus(
            getInstanceIdentifier(),
            resourceStatus.getStatus(),
            sourceAddress,
            resourceStatus.getStatusDetails().orElse(null));
}
/**
 * Reports an error to the inbound monitor.
 * The error is passed through {@code DittoRuntimeException.asDittoRuntimeException}; the fallback used
 * there logs the error and substitutes a {@code ConnectionUnavailableException} for this connection.
 *
 * @param error the error to report.
 */
protected void inboundFailure(final Throwable error) {
    final DittoRuntimeException reportedFailure =
            DittoRuntimeException.asDittoRuntimeException(error, cause -> {
                log().error(cause, "Inbound failure");
                return ConnectionUnavailableException.newBuilder(connectionId).build();
            });
    inboundMonitor.failure(reportedFailure);
}
/**
 * Forwards a message to the mapping processor actor on behalf of a freshly started response collector
 * actor and asks that collector for the responses it collected.
 *
 * @param message the message to forward.
 * @return a stage completing with the collector output; it fails if the collector answers with a
 * {@code Throwable} or with anything else that is not an output.
 */
private CompletionStage<ResponseCollectorActor.Output> forwardAndAwaitAck(final Object message) {
    final Duration lifetime = acknowledgementConfig.getCollectorFallbackLifetime();
    final Duration timeout = acknowledgementConfig.getCollectorFallbackAskTimeout();

    // step 1: one collector actor per inbound signal gathers the acks of all thing-modify-commands
    // mapped from the incoming signal
    final ActorRef collector = getContext().actorOf(ResponseCollectorActor.props(lifetime));

    // step 2: the collector is the sender of the forwarded message; the message mapping processor actor
    // will set the number of expected acks (can be 0) and start the same amount of ack aggregator actors
    inboundMappingProcessor.tell(message, collector);

    // step 3: retrieve the collected responses from the collector as a future
    return Patterns.ask(collector, ResponseCollectorActor.query(), timeout).thenCompose(answer -> {
        if (answer instanceof ResponseCollectorActor.Output) {
            return CompletableFuture.completedFuture((ResponseCollectorActor.Output) answer);
        }
        if (answer instanceof Throwable) {
            return CompletableFuture.failedFuture((Throwable) answer);
        }
        log().error("Expect ResponseCollectorActor.Output, got: <{}>", answer);
        return CompletableFuture.failedFuture(new ClassCastException("Unexpected acknowledgement type."));
    });
}
/**
 * Creates a copy of an external message that carries the source of this consumer and whose internal
 * headers are enriched with the reply information of that source.
 *
 * @param message the consumed message.
 * @return the message built from the given one with source and enriched internal headers.
 */
private ExternalMessage addSourceAndReplyTarget(final ExternalMessage message) {
    final ExternalMessageBuilder builder = ExternalMessageFactory.newExternalMessageBuilder(message)
            .withSource(source);
    final DittoHeaders internalHeaders = enrichHeadersWithReplyInformation(message.getInternalHeaders());
    builder.withInternalHeaders(internalHeaders);
    return builder.build();
}
/**
 * Adds the reply information of the source to the given headers.
 * If the source has a reply target, the returned headers contain the index of the source as reply
 * target and the expected response types of the reply target. Without a reply target the headers are
 * returned as they are.
 *
 * @param headers the headers to enrich.
 * @return the enriched headers, or the given headers if the source has no reply target.
 */
protected DittoHeaders enrichHeadersWithReplyInformation(final DittoHeaders headers) {
    return source.getReplyTarget()
            .map(target -> {
                return headers.toBuilder()
                        .replyTarget(source.getIndex())
                        .expectedResponseTypes(target.getExpectedResponseTypes())
                        .build();
            })
            .orElse(headers);
}
/**
 * @return the identifier supplied by the {@code InstanceIdentifierSupplier} instance.
 */
private static String getInstanceIdentifier() {
    final InstanceIdentifierSupplier identifierSupplier = InstanceIdentifierSupplier.getInstance();
    return identifierSupplier.get();
}
/**
 * Decide whether a collection of failed responses calls for redelivery.
 * Aggregated acknowledgements are expanded so that each contained response is checked by its own
 * status code.
 *
 * @param failedResponses the failed responses.
 * @return {@code true} if the collection is empty or if the status code of at least one response
 * requires redelivery.
 */
private static boolean someFailedResponseRequiresRedelivery(final Collection<CommandResponse<?>> failedResponses) {
    if (failedResponses.isEmpty()) {
        return true;
    }
    return failedResponses.stream()
            .flatMap(BaseConsumerActor::extractAggregatedResponses)
            .map(CommandResponse::getStatusCode)
            .anyMatch(BaseConsumerActor::requiresRedelivery);
}
/**
 * Expands a response into the responses it stands for.
 *
 * @param response the response to expand.
 * @return the stream of the contained responses if the response is an {@code Acknowledgements},
 * otherwise a stream containing only the response itself.
 */
private static Stream<? extends CommandResponse<?>> extractAggregatedResponses(final CommandResponse<?> response) {
    if (!(response instanceof Acknowledgements)) {
        return Stream.of(response);
    }
    final Acknowledgements aggregated = (Acknowledgements) response;
    return aggregated.stream();
}
/**
 * Decide by status code whether an Acknowledgement or DittoRuntimeException requires redelivery.
 * Redelivery is required for request-timeout, for failed-dependency and for every status code that
 * reports itself as internal error. All other status codes are treated as unrecoverable
 * and no redelivery will be attempted.
 *
 * @param status Status code of the Acknowledgement or DittoRuntimeException.
 * @return whether it requires redelivery.
 */
private static boolean requiresRedelivery(final HttpStatusCode status) {
    final boolean isRetryableClientError =
            status == HttpStatusCode.REQUEST_TIMEOUT || status == HttpStatusCode.FAILED_DEPENDENCY;
    return isRetryableClientError || status.isInternalError();
}
/**
 * Callback for technically rejecting an incoming message.
 * The single argument tells whether redelivery of the rejected message is requested.
 */
@FunctionalInterface
public interface Reject {

    /**
     * Reject a message.
     *
     * @param shouldRedeliver whether the broker should redeliver.
     */
    void reject(boolean shouldRedeliver);
}
}