Skip to content

Commit

Permalink
reject concurrent consumption requests earlier, add correlation-id to…
Browse files Browse the repository at this point in the history
… consumption requests to allow request/(error-)response correlation, add more consumption tests

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch.io>
  • Loading branch information
dguggemos committed Nov 23, 2020
1 parent 14b9665 commit 787fc0d
Show file tree
Hide file tree
Showing 14 changed files with 588 additions and 165 deletions.
14 changes: 8 additions & 6 deletions java/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
<equals-verifier.version>3.0.3</equals-verifier.version>
<mockito.version>3.1.0</mockito.version>
<jsonassert.version>1.2.3</jsonassert.version>
<awaitility.version>4.0.3</awaitility.version>

<!-- reactive streams versions -->
<reactive-streams.version>1.0.3</reactive-streams.version>
Expand Down Expand Up @@ -332,12 +333,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M4</version>
<configuration>
<systemProperties>
<kamon.auto-start>true</kamon.auto-start>
</systemProperties>
</configuration>
<version>3.0.0-M5</version>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand Down Expand Up @@ -920,6 +916,12 @@
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mutabilitydetector</groupId>
<artifactId>MutabilityDetector</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,12 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;

Expand All @@ -52,11 +51,13 @@
import org.eclipse.ditto.client.options.Option;
import org.eclipse.ditto.client.options.OptionName;
import org.eclipse.ditto.client.options.internal.OptionsEvaluator;
import org.eclipse.ditto.client.twin.internal.UncompletedConsumptionRequestException;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.messages.Message;
import org.eclipse.ditto.model.messages.MessageDirection;
import org.eclipse.ditto.model.messages.MessageHeaders;
Expand All @@ -71,6 +72,7 @@
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.base.WithOptionalEntity;
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.things.ThingErrorResponse;
import org.eclipse.ditto.signals.commands.things.modify.CreateThing;
import org.eclipse.ditto.signals.commands.things.modify.DeleteThing;
import org.eclipse.ditto.signals.commands.things.modify.ModifyThing;
Expand All @@ -97,6 +99,7 @@ public abstract class CommonManagementImpl<T extends ThingHandle<F>, F extends F

protected final OutgoingMessageFactory outgoingMessageFactory;

private final AtomicBoolean subscriptionRequestPending = new AtomicBoolean(false);
private final HandlerRegistry<T, F> handlerRegistry;
private final PointerBus bus;

Expand All @@ -115,12 +118,20 @@ protected CommonManagementImpl(

@Override
public CompletableFuture<Void> startConsumption() {
return doStartConsumption(Collections.emptyMap());
// do not call doStartConsumption directly
return startConsumption(new Option[]{});
}

@Override
public CompletableFuture<Void> startConsumption(final Option<?>... consumptionOptions) {

// concurrent consumption requests can have strange effects, so better avoid it
if (!subscriptionRequestPending.compareAndSet(false, true)) {
final CompletableFuture<Void> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(new ConcurrentConsumptionRequestException());
return failedFuture;
}

// only accept "Consumption" related options here:
final Optional<Option<?>> unknownOptionIncluded = Arrays.stream(consumptionOptions)
.filter(option -> !option.getName().equals(OptionName.Consumption.NAMESPACES))
Expand All @@ -143,7 +154,8 @@ public CompletableFuture<Void> startConsumption(final Option<?>... consumptionOp
options.getExtraFields().ifPresent(extraFields ->
subscriptionConfig.put(CONSUMPTION_PARAM_EXTRA_FIELDS, extraFields.toString()));

return doStartConsumption(subscriptionConfig);
// make sure to reset the flag when consumption request completes
return doStartConsumption(subscriptionConfig).whenComplete((v, t) -> subscriptionRequestPending.set(false));
}

/**
Expand Down Expand Up @@ -643,17 +655,11 @@ protected AdaptableBus.SubscriptionId subscribeAndPublishMessage(
final CompletableFuture<Void> futureToCompleteOrFailAfterAck,
final Function<Adaptable, NotifyMessage> adaptableToNotifier) {

LOGGER.trace("Sending {} and waiting for {}", protocolCommand, protocolCommandAck);
final String correlationId = UUID.randomUUID().toString();
final String protocolCommandWithCorrelationId = appendCorrelationIdParameter(protocolCommand, correlationId);
LOGGER.trace("Sending {} and waiting for {}", protocolCommandWithCorrelationId, protocolCommandAck);
final AdaptableBus adaptableBus = messagingProvider.getAdaptableBus();

try {
if (previousSubscriptionId != null
&& checkIfTwinEventIsInsertedTwiceElseThrow(adaptableBus, futureToCompleteOrFailAfterAck)) {
return previousSubscriptionId;
}
} catch (UncompletedConsumptionRequestException e) {
LOGGER.error(e.getMessage());
}
if (previousSubscriptionId != null) {
// remove previous subscription without going through back-end because subscription will be replaced
adaptableBus.unsubscribe(previousSubscriptionId);
Expand All @@ -662,28 +668,40 @@ && checkIfTwinEventIsInsertedTwiceElseThrow(adaptableBus, futureToCompleteOrFail
adaptableBus.subscribeForAdaptable(streamingType,
adaptable -> adaptableToNotifier.apply(adaptable).accept(getBus()));
final Classification tag = Classification.forString(protocolCommandAck);
adjoin(adaptableBus.subscribeOnceForString(tag, getTimeout()), futureToCompleteOrFailAfterAck);
messagingProvider.emit(protocolCommand);

// subscribe exclusively because we allow only one request at a time
final CompletionStage<String> ackStage = adaptableBus.subscribeOnceForStringExclusively(tag, getTimeout());
final CompletableFuture<String> ackFuture = ackStage.toCompletableFuture();

// subscribe for possible error responses by correlationId
final Classification correlationIdTag = Classification.forCorrelationId(correlationId);
adaptableBus.subscribeOnceForAdaptable(correlationIdTag, getTimeout())
.thenAccept(adaptable -> {
final Signal<?> signal = AbstractHandle.PROTOCOL_ADAPTER.fromAdaptable(adaptable);
if (signal instanceof ThingErrorResponse) {
ackFuture.completeExceptionally(((ThingErrorResponse) signal).getDittoRuntimeException());
} else {
ackFuture.completeExceptionally(getUnexpectedSignalException(signal));
}
});

adjoin(ackFuture, futureToCompleteOrFailAfterAck);
messagingProvider.emit(protocolCommandWithCorrelationId);

return subscriptionId;
}

private boolean checkIfTwinEventIsInsertedTwiceElseThrow(final AdaptableBus adaptableBus,
final CompletableFuture<Void> futureToCompleteOrFailAfterAck) {
private DittoRuntimeException getUnexpectedSignalException(final Signal<?> signal) {
return DittoRuntimeException
.newBuilder("signal.unexpected", HttpStatusCode.BAD_REQUEST)
.message(() -> String.format("Received unexpected response of type '%s'.", signal.getType()))
.build();
}

final Set<Classification> stringList = Stream.of(
Classification.forString(Classification.StreamingType.TWIN_EVENT.startAck()),
Classification.forString(Classification.StreamingType.LIVE_COMMAND.startAck()),
Classification.forString(Classification.StreamingType.LIVE_EVENT.startAck()),
Classification.forString(Classification.StreamingType.LIVE_MESSAGE.startAck()))
.collect(Collectors.toSet());

if (adaptableBus.getUnmodifiableOneTimeStringConsumers().keySet().stream().anyMatch(stringList::contains)) {
LOGGER.warn("First consumption request on this channel must be completed first");
futureToCompleteOrFailAfterAck.completeExceptionally(new UncompletedConsumptionRequestException());
return true;
}
return false;
private String appendCorrelationIdParameter(final String protocolCommand, final String correlationId) {
final String separator = protocolCommand.contains("?") ? "&" : "?";
return String.format("%s%s%s=%s", protocolCommand, separator,
DittoHeaderDefinition.CORRELATION_ID.getKey(), correlationId);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,20 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.client.twin.internal;
package org.eclipse.ditto.client.internal;

public class UncompletedConsumptionRequestException extends RuntimeException {
/**
* This exception may be thrown if concurrent consumption requests are detected.
*/
public class ConcurrentConsumptionRequestException extends RuntimeException {

private static final long serialVersionUID = -565137801315595348L;
private static final String MESSAGE = "First consumption request on this channel must be completed first";
private static final String MESSAGE = "Concurrent consumption requests are not allowed on one channel.";

/**
* Constructs a new {@code UncompletedTwinConsumptionRequestException} object.
*/
public UncompletedConsumptionRequestException() {
public ConcurrentConsumptionRequestException() {
super(MESSAGE, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@
package org.eclipse.ditto.client.internal.bus;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
Expand Down Expand Up @@ -55,23 +53,25 @@ public interface AdaptableBus {
*/
AdaptableBus addAdaptableClassifier(Classifier<Adaptable> adaptableClassifier);


/**
* Get oneTimeStringConsumers but unmodifiable to only grant read access.
* Add a one-time subscriber for a string message.
*
* @return a {@code UnmodifiableMap}
* @param tag the string classification, usually itself.
* @param timeout how long to wait for a match.
* @return a future string matching the tag according to the classifiers, or a failed future
* if no string is matched within the timeout.
*/
Map<Classification, Set<Entry<Consumer<String>>>> getUnmodifiableOneTimeStringConsumers();
CompletionStage<String> subscribeOnceForString(Classification tag, Duration timeout);

/**
* Add a one-time subscriber for a string message.
* Add a one-time subscriber for a string message by replacing an existing with the same string value.
*
* @param tag the string classification, usually itself.
* @param timeout how long to wait for a match.
* @return a future adaptable matching the tag according to the classifiers, or a failed future
* if no adaptable is matched within the timeout.
* @return a future string matching the tag according to the classifiers, or a failed future
* if no string is matched within the timeout.
*/
CompletionStage<String> subscribeOnceForString(Classification tag, Duration timeout);
CompletionStage<String> subscribeOnceForStringExclusively(Classification tag, Duration timeout);

/**
* Add a one-time subscriber for an adaptable message. Only effective if no one-time string subscriber matches.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -86,13 +85,13 @@ public AdaptableBus addAdaptableClassifier(final Classifier<Adaptable> adaptable
}

@Override
public final Map<Classification, Set<Entry<Consumer<String>>>> getUnmodifiableOneTimeStringConsumers() {
return Collections.unmodifiableMap(oneTimeStringConsumers);
public CompletionStage<String> subscribeOnceForString(final Classification tag, final Duration timeout) {
return subscribeOnce(oneTimeStringConsumers, tag, timeout);
}

@Override
public CompletionStage<String> subscribeOnceForString(final Classification tag, final Duration timeout) {
return subscribeOnce(oneTimeStringConsumers, tag, timeout);
public CompletionStage<String> subscribeOnceForStringExclusively(final Classification tag, final Duration timeout) {
return subscribeOnce(oneTimeStringConsumers, tag, timeout, true);
}

@Override
Expand Down Expand Up @@ -164,8 +163,14 @@ public void publish(final String message) {
@Override
public void shutdownExecutor() {
LOGGER.trace("Shutting down AdaptableBus Executors");
singleThreadedExecutorService.shutdownNow();
scheduledExecutorService.shutdownNow();
try {
singleThreadedExecutorService.shutdownNow();
scheduledExecutorService.shutdownNow();
singleThreadedExecutorService.awaitTermination(2, TimeUnit.SECONDS);
scheduledExecutorService.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOGGER.info("Waiting for termination was interrupted.");
}
}

// call this in a single-threaded executor so that ordering is preserved
Expand Down Expand Up @@ -212,9 +217,21 @@ private <T> CompletionStage<T> subscribeOnce(
final Map<Classification, Set<Entry<Consumer<T>>>> registry,
final Classification tag,
final Duration timeout) {
return subscribeOnce(registry, tag, timeout, false);
}

private <T> CompletionStage<T> subscribeOnce(
final Map<Classification, Set<Entry<Consumer<T>>>> registry,
final Classification tag,
final Duration timeout,
final boolean exclusively) {
final CompletableFuture<T> resultFuture = new CompletableFuture<>();
final Entry<Consumer<T>> subscriber = new Entry<>(tag, resultFuture::complete);
addEntry(registry, subscriber);
if (exclusively) {
replaceEntry(registry, subscriber);
} else {
addEntry(registry, subscriber);
}
removeAfter(registry, subscriber, timeout, resultFuture);
return resultFuture;
}
Expand Down Expand Up @@ -262,7 +279,7 @@ private boolean publishToPersistentAdaptableSubscribers(final Adaptable adaptabl
if (persistentConsumers != null && !persistentConsumers.isEmpty()) {
publishedToPersistentSubscribers = true;
for (final Entry<Consumer<Adaptable>> entry : persistentConsumers) {
runConsumerAsync(entry.getValue(), adaptable, tag);
runConsumerAsync(entry.value, adaptable, tag);
}
}
}
Expand Down Expand Up @@ -314,7 +331,7 @@ private <T> void removeAfterIdle(

private static <T> void addEntry(final Map<Classification, Set<Entry<T>>> registry,
final Entry<T> entry) {
registry.compute(entry.getKey(), (key, previousSet) -> {
registry.compute(entry.key, (key, previousSet) -> {
final Set<Entry<T>> concurrentHashSet =
previousSet != null ? previousSet : ConcurrentHashMap.newKeySet();
concurrentHashSet.add(entry);
Expand All @@ -325,7 +342,7 @@ private static <T> void addEntry(final Map<Classification, Set<Entry<T>>> regist
private static <T> void replaceEntry(final Map<Classification, Set<Entry<T>>> registry, final Entry<T> entry) {
final Set<Entry<T>> set = ConcurrentHashMap.newKeySet();
set.add(entry);
registry.put(entry.getKey(), set);
registry.put(entry.key, set);
}

private Optional<Adaptable> parseAsAdaptable(final String message) {
Expand All @@ -345,7 +362,7 @@ private Optional<Adaptable> parseAsAdaptable(final String message) {
private <T> void removeEntry(final Map<Classification, Set<Entry<T>>> registry,
final Entry<?> entry,
final Runnable onRemove) {
registry.computeIfPresent(entry.getKey(), (key, set) -> {
registry.computeIfPresent(entry.key, (key, set) -> {
if (set.remove(entry)) {
onRemove.run();
}
Expand All @@ -365,7 +382,7 @@ private static <T> T removeOne(final Map<Classification, Set<Entry<T>>> registry
.findAny()
.map(entry -> {
if (set.remove(entry)) {
result.set(entry.getValue());
result.set(entry.value);
}
return set.isEmpty() ? null : set;
})
Expand All @@ -377,4 +394,18 @@ private static Throwable timeout(final Duration duration) {
return new TimeoutException("Timed out after " + duration);
}

/**
* Similar to Map.Entry but with object reference identity and fixed key type to act as identifier for
* a subscription.
*/
private static final class Entry<T> implements SubscriptionId {

private final Classification key;
private final T value;

private Entry(final Classification key, final T value) {
this.key = key;
this.value = value;
}
}
}
Loading

0 comments on commit 787fc0d

Please sign in to comment.