Skip to content

Commit

Permalink
Ensure Ack.LabelNotUniqueException and Ack.LabelNotDeclaredException …
Browse files Browse the repository at this point in the history
…are given to connection error handlers.

- Added the error-code classification to filter for errors.

- Added client-wide subscription on acknowledgement label related
  exceptions and call the connection error handler with them.

- Added a delay before a WebSocketMessagingProvider destroys itself
  after the server severs the connection with reconnect disabled,
  in order to handle the final error message.

- Fix another NullPointerException in ClientShutdownTest.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Oct 20, 2020
1 parent 92efb4f commit 3bfbec1
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.client.internal;

import java.text.MessageFormat;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;
Expand All @@ -24,7 +25,9 @@
import org.eclipse.ditto.client.changes.internal.ImmutableFeatureChange;
import org.eclipse.ditto.client.changes.internal.ImmutableFeaturesChange;
import org.eclipse.ditto.client.changes.internal.ImmutableThingChange;
import org.eclipse.ditto.client.internal.bus.AdaptableBus;
import org.eclipse.ditto.client.internal.bus.BusFactory;
import org.eclipse.ditto.client.internal.bus.Classification;
import org.eclipse.ditto.client.internal.bus.JsonPointerSelectors;
import org.eclipse.ditto.client.internal.bus.PointerBus;
import org.eclipse.ditto.client.internal.bus.SelectorUtil;
Expand All @@ -37,6 +40,8 @@
import org.eclipse.ditto.client.twin.Twin;
import org.eclipse.ditto.client.twin.internal.TwinImpl;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotDeclaredException;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.eclipse.ditto.model.base.headers.DittoHeadersBuilder;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;
Expand All @@ -47,9 +52,11 @@
import org.eclipse.ditto.protocoladapter.DittoProtocolAdapter;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.protocoladapter.ProtocolAdapter;
import org.eclipse.ditto.protocoladapter.ProtocolFactory;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.signals.acks.base.Acknowledgement;
import org.eclipse.ditto.signals.base.Signal;
import org.eclipse.ditto.signals.commands.base.ErrorResponse;
import org.eclipse.ditto.signals.events.things.AclEntryCreated;
import org.eclipse.ditto.signals.events.things.AclEntryDeleted;
import org.eclipse.ditto.signals.events.things.AclEntryModified;
Expand Down Expand Up @@ -107,6 +114,7 @@ private DefaultDittoClient(final TwinImpl twin, final LiveImpl live, final Polic
this.live = live;
this.policies = policies;
logVersionInformation();
handleSpontaneousErrors();
}

/**
Expand Down Expand Up @@ -587,4 +595,48 @@ public CompletionStage<DittoClient> connect() {
.thenCompose(result -> policies.messagingProvider.initializeAsync())
.thenApply(result -> this);
}

private void handleSpontaneousErrors() {
handleSpontaneousErrors(twin.messagingProvider);
if (live.messagingProvider != twin.messagingProvider) {
handleSpontaneousErrors(live.messagingProvider);
}
if (policies.messagingProvider != twin.messagingProvider) {
handleSpontaneousErrors(policies.messagingProvider);
}
}

/**
* Handle {@code DittoRuntimeException}s from the back-end that are not replies of anything.
*
* @param provider the messaging provider.
*/
private static void handleSpontaneousErrors(final MessagingProvider provider) {
final Optional<Consumer<Throwable>> connectionErrorHandler =
provider.getMessagingConfiguration().getConnectionErrorHandler();
if (connectionErrorHandler.isPresent()) {
final AdaptableBus adaptableBus = provider.getAdaptableBus();
final Consumer<Throwable> consumer = connectionErrorHandler.get();

final Classification ackLabelNotUnique =
Classification.forErrorCode(AcknowledgementLabelNotUniqueException.ERROR_CODE);
final Classification ackLabelNotDeclared =
Classification.forErrorCode(AcknowledgementLabelNotDeclaredException.ERROR_CODE);

adaptableBus.subscribeForAdaptableExclusively(ackLabelNotUnique,
adaptable -> consumer.accept(asDittoRuntimeException(adaptable)));
adaptableBus.subscribeForAdaptableExclusively(ackLabelNotDeclared,
adaptable -> consumer.accept(asDittoRuntimeException(adaptable)));
}
}

private static Throwable asDittoRuntimeException(final Adaptable adaptable) {
final Signal<?> signal = AbstractHandle.PROTOCOL_ADAPTER.fromAdaptable(adaptable);
if (signal instanceof ErrorResponse) {
return ((ErrorResponse<?>) signal).getDittoRuntimeException();
} else {
return new ClassCastException("Expect an error response, got: " +
ProtocolFactory.wrapAsJsonifiableAdaptable(adaptable).toJsonString());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Predicate;

Expand Down Expand Up @@ -83,6 +84,16 @@ public interface AdaptableBus {
*/
SubscriptionId subscribeForAdaptable(Classification tag, Consumer<Adaptable> adaptableConsumer);

/**
* Add a persistent subscriber for an adaptable message and remove all other subscribers.
* Only effective if no one-time string or adaptable subscriber matches.
*
* @param tag the adaptable classification.
* @param adaptableConsumer the consumer of the adaptable message.
* @return the subscription ID.
*/
SubscriptionId subscribeForAdaptableExclusively(Classification tag, Consumer<Adaptable> adaptableConsumer);

/**
* Add a persistent subscriber for an adaptable message that are removed after a timeout.
* If tag requires sequentialization, take care that all consumer and predicate parameters are fast,
Expand All @@ -109,6 +120,11 @@ SubscriptionId subscribeForAdaptableWithTimeout(Classification tag,
*/
boolean unsubscribe(@Nullable SubscriptionId subscriptionId);

/**
* @return the scheduled executor service of this adaptable bus.
*/
ScheduledExecutorService getScheduledExecutor();

/**
* Closes the executor of the adaptable bus .
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public static AdaptableBus createAdaptableBus() {
.addStringClassifier(Classifiers.identity())
.addAdaptableClassifier(Classifiers.correlationId())
.addAdaptableClassifier(Classifiers.streamingType())
.addAdaptableClassifier(Classifiers.thingsSearch());
.addAdaptableClassifier(Classifiers.thingsSearch())
.addAdaptableClassifier(Classifiers.errorCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ static Classification forThingsSearch(final String searchSubscriptionId) {
return new SearchSubscriptionId(searchSubscriptionId);
}

/**
* Create an error-code classification key.
*
* @param errorCode the error code.
* @return the key.
*/
static Classification forErrorCode(final String errorCode) {
return new ErrorCode(errorCode);
}

/**
* Check whether subscribers for this classification requires sequential dispatching.
*
Expand Down Expand Up @@ -192,4 +202,11 @@ static <T> Optional<Classification> of(final T value) {
return Optional.of(new Identity<>(value));
}
}

final class ErrorCode extends Literal<String> {

private ErrorCode(final String errorCode) {
super(errorCode);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Optional;

import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.signals.events.thingsearch.SubscriptionEvent;
Expand Down Expand Up @@ -66,6 +67,15 @@ public static Classifier<Adaptable> thingsSearch() {
return Instances.THINGS_SEARCH_CLASSIFIER;
}

/**
* Classify Ditto protocol errors by error codes.
*
* @return the error code classifier.
*/
public static Classifier<Adaptable> errorCode() {
return Instances.ERROR_CODE_CLASSIFIER;
}

private static final class StreamingTypeClassifier implements Classifier<Adaptable> {

@Override
Expand Down Expand Up @@ -116,6 +126,22 @@ public Optional<Classification> classify(final Adaptable message) {
}
}

private static final class ErrorCodeClassifier implements Classifier<Adaptable> {

@Override
public Optional<Classification> classify(final Adaptable message) {
if (message.getTopicPath().getCriterion() == TopicPath.Criterion.ERRORS) {
return message.getPayload()
.getValue()
.filter(JsonValue::isObject)
.flatMap(value -> value.asObject().getValue(DittoRuntimeException.JsonFields.ERROR_CODE))
.map(Classification::forErrorCode);
} else {
return Optional.empty();
}
}
}

private static final class Instances {

private static final Classifier<Adaptable> CORRELATION_ID_CLASSIFIER = adaptable ->
Expand All @@ -126,5 +152,7 @@ private static final class Instances {
private static final Classifier<Adaptable> STREAMING_TYPE_CLASSIFIER = new StreamingTypeClassifier();

private static final Classifier<Adaptable> THINGS_SEARCH_CLASSIFIER = new ThingsSearchClassifier();

private static final Classifier<Adaptable> ERROR_CODE_CLASSIFIER = new ErrorCodeClassifier();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,15 @@ public SubscriptionId subscribeForAdaptable(final Classification tag,
return entry;
}

@Override
public SubscriptionId subscribeForAdaptableExclusively(final Classification tag,
final Consumer<Adaptable> adaptableConsumer) {

final Entry<Consumer<Adaptable>> entry = new Entry<>(tag, adaptableConsumer);
replaceEntry(persistentAdaptableConsumers, entry);
return entry;
}

@Override
public SubscriptionId subscribeForAdaptableWithTimeout(final Classification tag, final Duration timeout,
final Consumer<Adaptable> adaptableConsumer, final Predicate<Adaptable> terminationPredicate,
Expand Down Expand Up @@ -136,6 +145,11 @@ public boolean unsubscribe(@Nullable final SubscriptionId subscriptionId) {
}
}

@Override
public ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutorService;
}

@Override
public void publish(final String message) {
singleThreadedExecutorService.submit(() -> doPublish(message));
Expand Down Expand Up @@ -302,6 +316,12 @@ private static <T> void addEntry(final Map<Classification, Set<Entry<T>>> regist
});
}

private static <T> void replaceEntry(final Map<Classification, Set<Entry<T>>> registry, final Entry<T> entry) {
final Set<Entry<T>> set = ConcurrentHashMap.newKeySet();
set.add(entry);
registry.put(entry.key, set);
}

private Optional<Adaptable> parseAsAdaptable(final String message) {
try {
final JsonObject jsonObject = JsonObject.of(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -69,6 +70,9 @@
*/
public final class WebSocketMessagingProvider extends WebSocketAdapter implements MessagingProvider {

// how long this object survives after the websocket connection is closed by server and reconnect is disabled
private static final Duration ZOMBIE_LIFETIME = Duration.ofSeconds(3L);

private static final String DITTO_CLIENT_USER_AGENT = "DittoClient/" + VersionReader.determineClientVersion();
private static final Logger LOGGER = LoggerFactory.getLogger(WebSocketMessagingProvider.class);
private static final int CONNECTION_TIMEOUT_MS = 5000;
Expand Down Expand Up @@ -361,7 +365,9 @@ private void handleReconnectionIfEnabled() {
}
} else {
LOGGER.info("Client <{}>: Reconnection is NOT enabled. Closing client ...", sessionId);
close();
// delay self destruction in order to handle any final error message
adaptableBus.getScheduledExecutor()
.schedule(this::close, ZOMBIE_LIFETIME.toMillis(), TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ private static List<String> getActiveThreads(final Collection<String> startingTh

// filter out main thread and monitor thread
return Stream.of(threads)
.filter(Objects::nonNull)
.map(Thread::getName)
.filter(Objects::nonNull)
.filter(name -> !ALLOWED_THREADS.contains(name) && !startingThreadNames.contains(name))
Expand Down

0 comments on commit 3bfbec1

Please sign in to comment.