Skip to content

Commit

Permalink
Calling twin().startConsumption twice for the same channel, will lead…
Browse files Browse the repository at this point in the history
… to an UncompletedTwinConsumptionRequestException

* It is possible to start twin().startConsumption multiple times but not in parallel

Signed-off-by: Vadim Guenther <vadim.guenther@bosch.io>
  • Loading branch information
VadimGue committed Nov 16, 2020
1 parent 95b4f2c commit 9bfbac8
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.eclipse.ditto.client.options.Option;
import org.eclipse.ditto.client.options.OptionName;
import org.eclipse.ditto.client.options.internal.OptionsEvaluator;
import org.eclipse.ditto.client.twin.internal.UncompletedTwinConsumptionRequestException;
import org.eclipse.ditto.json.JsonFieldSelector;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonPointer;
Expand Down Expand Up @@ -641,19 +642,43 @@ protected AdaptableBus.SubscriptionId subscribeAndPublishMessage(

LOGGER.trace("Sending {} and waiting for {}", protocolCommand, protocolCommandAck);
final AdaptableBus adaptableBus = messagingProvider.getAdaptableBus();

try {
if (previousSubscriptionId != null
&& checkIfTwinEventIsInsertedTwiceElseThrow(adaptableBus, futureToCompleteOrFailAfterAck)) {
return previousSubscriptionId;
}
} catch (UncompletedTwinConsumptionRequestException e) {
LOGGER.error(e.getMessage());
}
if (previousSubscriptionId != null) {
// remove previous subscription without going through back-end because subscription will be replaced
adaptableBus.unsubscribe(previousSubscriptionId);
}
final AdaptableBus.SubscriptionId subscriptionId =
AdaptableBus.SubscriptionId subscriptionId =
adaptableBus.subscribeForAdaptable(streamingType,
adaptable -> adaptableToNotifier.apply(adaptable).accept(getBus()));
final Classification tag = Classification.forString(protocolCommandAck);
adjoin(adaptableBus.subscribeOnceForString(tag, getTimeout()), futureToCompleteOrFailAfterAck);
messagingProvider.emit(protocolCommand);

return subscriptionId;
}

private boolean checkIfTwinEventIsInsertedTwiceElseThrow(
final AdaptableBus adaptableBus,
final CompletableFuture<Void> futureToCompleteOrFailAfterAck) {

if (adaptableBus.getUnmodifiableOneTimeStringConsumers()
.containsKey(Classification.forString(Classification.StreamingType.TWIN_EVENT.startAck()))) {

LOGGER.warn("First consumption request on this channel must be completed first");
futureToCompleteOrFailAfterAck.completeExceptionally(new UncompletedTwinConsumptionRequestException());
return true;
}
return false;
}

/**
* Remove a subscription.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
package org.eclipse.ditto.client.internal.bus;

import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
Expand Down Expand Up @@ -53,6 +55,14 @@ public interface AdaptableBus {
*/
AdaptableBus addAdaptableClassifier(Classifier<Adaptable> adaptableClassifier);


/**
* Get oneTimeStringConsumers but unmodifiable to only grant read access.
*
* @return a {@code UnmodifiableMap}
*/
Map<Classification, Set<Entry<Consumer<String>>>> getUnmodifiableOneTimeStringConsumers();

/**
* Add a one-time subscriber for a string message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -84,6 +85,12 @@ public AdaptableBus addAdaptableClassifier(final Classifier<Adaptable> adaptable
return this;
}

@Override
public final Map<Classification, Set<Entry<Consumer<String>>>> getUnmodifiableOneTimeStringConsumers() {
return Collections.unmodifiableMap(oneTimeStringConsumers);
}


@Override
public CompletionStage<String> subscribeOnceForString(final Classification tag, final Duration timeout) {
return subscribeOnce(oneTimeStringConsumers, tag, timeout);
Expand Down Expand Up @@ -256,7 +263,7 @@ private boolean publishToPersistentAdaptableSubscribers(final Adaptable adaptabl
if (persistentConsumers != null && !persistentConsumers.isEmpty()) {
publishedToPersistentSubscribers = true;
for (final Entry<Consumer<Adaptable>> entry : persistentConsumers) {
runConsumerAsync(entry.value, adaptable, tag);
runConsumerAsync(entry.getValue(), adaptable, tag);
}
}
}
Expand Down Expand Up @@ -308,7 +315,7 @@ private <T> void removeAfterIdle(

private static <T> void addEntry(final Map<Classification, Set<Entry<T>>> registry,
final Entry<T> entry) {
registry.compute(entry.key, (key, previousSet) -> {
registry.compute(entry.getKey(), (key, previousSet) -> {
final Set<Entry<T>> concurrentHashSet =
previousSet != null ? previousSet : ConcurrentHashMap.newKeySet();
concurrentHashSet.add(entry);
Expand All @@ -319,7 +326,7 @@ private static <T> void addEntry(final Map<Classification, Set<Entry<T>>> regist
private static <T> void replaceEntry(final Map<Classification, Set<Entry<T>>> registry, final Entry<T> entry) {
final Set<Entry<T>> set = ConcurrentHashMap.newKeySet();
set.add(entry);
registry.put(entry.key, set);
registry.put(entry.getKey(), set);
}

private Optional<Adaptable> parseAsAdaptable(final String message) {
Expand All @@ -339,7 +346,7 @@ private Optional<Adaptable> parseAsAdaptable(final String message) {
private <T> void removeEntry(final Map<Classification, Set<Entry<T>>> registry,
final Entry<?> entry,
final Runnable onRemove) {
registry.computeIfPresent(entry.key, (key, set) -> {
registry.computeIfPresent(entry.getKey(), (key, set) -> {
if (set.remove(entry)) {
onRemove.run();
}
Expand All @@ -359,7 +366,7 @@ private static <T> T removeOne(final Map<Classification, Set<Entry<T>>> registry
.findAny()
.map(entry -> {
if (set.remove(entry)) {
result.set(entry.value);
result.set(entry.getValue());
}
return set.isEmpty() ? null : set;
})
Expand All @@ -371,18 +378,4 @@ private static Throwable timeout(final Duration duration) {
return new TimeoutException("Timed out after " + duration);
}

/**
* Similar to Map.Entry but with object reference identity and fixed key type to act as identifier for
* a subscription.
*/
private static final class Entry<T> implements SubscriptionId {

private final Classification key;
private final T value;

private Entry(final Classification key, final T value) {
this.key = key;
this.value = value;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package org.eclipse.ditto.client.internal.bus;

/**
* Similar to Map.Entry but with object reference identity and fixed key type to act as identifier for
* a subscription.
*/
final class Entry<T> implements AdaptableBus.SubscriptionId {

private final Classification key;
private final T value;

public Entry(final Classification key, final T value) {
this.key = key;
this.value = value;
}

public Classification getKey() {
return key;
}

public T getValue() {
return value;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.eclipse.ditto.client.twin.internal;

public class UncompletedTwinConsumptionRequestException extends RuntimeException {

private static final long serialVersionUID = -565137801315595348L;
private static final String MESSAGE = "First consumption request on this channel must be completed first";

/**
* Constructs a new {@code UncompletedTwinConsumptionRequestException} object.
*/
public UncompletedTwinConsumptionRequestException() {
super(MESSAGE, null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,18 @@
package org.eclipse.ditto.client;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.eclipse.ditto.client.TestConstants.Thing.THING_ID;
import static org.eclipse.ditto.model.base.acks.AcknowledgementRequest.parseAcknowledgementRequest;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.assertj.core.api.Assertions;
import org.eclipse.ditto.client.internal.AbstractDittoClientTest;
import org.eclipse.ditto.client.twin.internal.UncompletedTwinConsumptionRequestException;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
Expand Down Expand Up @@ -97,6 +104,21 @@ public void testAttributeEventAcknowledgement() {
.isEqualTo(HttpStatusCode.INTERNAL_SERVER_ERROR);
}

@Test
public void startConsumptionParallelOnSameTwinChannelShouldThrowException()
throws InterruptedException, ExecutionException, TimeoutException {

final CompletableFuture<Void> voidCompletableFuture1 = client.twin().startConsumption();
final CompletableFuture<Void> voidCompletableFuture2 = client.twin().startConsumption();

messaging.receivePlainString("START-SEND-EVENTS:ACK");

voidCompletableFuture1.get(10, TimeUnit.SECONDS);
assertThatExceptionOfType(ExecutionException.class)
.isThrownBy(() -> voidCompletableFuture2.get(10, TimeUnit.SECONDS))
.withCauseInstanceOf(UncompletedTwinConsumptionRequestException.class);
}

@Test
public void testFeatureEventAcknowledgement() {
client.twin().startConsumption();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ public void receiveEvent(final Message<ThingEvent> message) {
adaptableBus.publish(ProtocolFactory.wrapAsJsonifiableAdaptable(adaptable).toJsonString());
}

public void receivePlainString(final String plain) {
adaptableBus.publish(plain);
}

@Override
public void close() {
executor.shutdownNow();
Expand Down

0 comments on commit 9bfbac8

Please sign in to comment.