Skip to content

Commit

Permalink
Allow to declare acknowledgements via ditto client
Browse files Browse the repository at this point in the history
Signed-off-by: Yannic Klem <yannic.klem@bosch.io>
  • Loading branch information
Yannic92 committed Oct 2, 2020
1 parent 3d257c7 commit f853baf
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Optional;
import java.util.function.Consumer;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;

/**
Expand Down Expand Up @@ -49,6 +51,13 @@ public interface MessagingConfiguration {
*/
URI getEndpointUri();

/**
* Returns the labels of all acknowledgements that are declared to be provided by this connection.
*
* @return the acknowledgment labels.
*/
Collection<AcknowledgementLabel> getDeclaredAcknowledgements();

/**
* @return {@code true} if client should try to reconnect when connection is lost.
*/
Expand Down Expand Up @@ -107,6 +116,14 @@ interface Builder {
*/
Builder endpoint(String endpoint);

/**
* Sets the labels of all acknowledgements that are declared to be provided by this connection.
*
* @param acknowledgementLabels the acknowledgement labels
* @return this builder.
*/
Builder declaredAcknowledgements(Collection<AcknowledgementLabel> acknowledgementLabels);

/**
* Sets if {@code reconnectEnabled}.
* <p> Default is enabled. If a connection was established once, the client tries to reconnect <em>every 5
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,22 @@
import java.text.MessageFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.annotation.Nullable;

import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.json.JsonSchemaVersion;

/**
Expand All @@ -43,6 +51,7 @@ public final class WebSocketMessagingConfiguration implements MessagingConfigura
@Nullable private final ProxyConfiguration proxyConfiguration;
@Nullable private final TrustStoreConfiguration trustStoreConfiguration;
@Nullable private final Consumer<Throwable> connectionErrorHandler;
private final Collection<AcknowledgementLabel> declaredAcknowledgements;

public WebSocketMessagingConfiguration(final WebSocketMessagingConfigurationBuilder builder,
final URI endpointUri) {
Expand All @@ -53,6 +62,7 @@ public WebSocketMessagingConfiguration(final WebSocketMessagingConfigurationBuil
trustStoreConfiguration = builder.trustStoreConfiguration;
connectionErrorHandler = builder.connectionErrorHandler;
this.timeout = builder.timeout;
this.declaredAcknowledgements = Collections.unmodifiableSet(builder.declaredAcknowledgements);
this.endpointUri = endpointUri;
}

Expand All @@ -75,6 +85,11 @@ public URI getEndpointUri() {
return endpointUri;
}

@Override
public Collection<AcknowledgementLabel> getDeclaredAcknowledgements() {
return declaredAcknowledgements;
}

@Override
public boolean isReconnectEnabled() {
return reconnectEnabled;
Expand Down Expand Up @@ -108,6 +123,7 @@ private static final class WebSocketMessagingConfigurationBuilder implements Mes
@Nullable private ProxyConfiguration proxyConfiguration;
private TrustStoreConfiguration trustStoreConfiguration;
@Nullable private Consumer<Throwable> connectionErrorHandler;
private final Set<AcknowledgementLabel> declaredAcknowledgements = new HashSet<>();

private WebSocketMessagingConfigurationBuilder() {
jsonSchemaVersion = JsonSchemaVersion.LATEST;
Expand Down Expand Up @@ -141,6 +157,13 @@ public MessagingConfiguration.Builder endpoint(final String endpoint) {
return this;
}

@Override
public Builder declaredAcknowledgements(final Collection<AcknowledgementLabel> acknowledgementLabels) {
this.declaredAcknowledgements.clear();
this.declaredAcknowledgements.addAll(acknowledgementLabels);
return this;
}

@Override
public MessagingConfiguration.Builder reconnectEnabled(final boolean reconnectEnabled) {
this.reconnectEnabled = reconnectEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
Expand All @@ -43,7 +44,11 @@
import org.eclipse.ditto.client.messaging.AuthenticationProvider;
import org.eclipse.ditto.client.messaging.MessagingException;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.common.HttpStatusCode;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -181,7 +186,14 @@ private WebSocket createWebsocket() {
final WebSocketFactory webSocketFactory = WebSocketFactoryFactory.newWebSocketFactory(messagingConfiguration);
final WebSocket ws;
try {
ws = webSocketFactory.createSocket(messagingConfiguration.getEndpointUri());
final String declaredAcksJsonArrayString = messagingConfiguration.getDeclaredAcknowledgements()
.stream()
.map(AcknowledgementLabel::toString)
.map(JsonValue::of)
.collect(JsonCollectors.valuesToArray())
.toString();
ws = webSocketFactory.createSocket(messagingConfiguration.getEndpointUri())
.addHeader(DittoHeaderDefinition.DECLARED_ACKS.getKey(), declaredAcksJsonArrayString);
} catch (final IOException e) {
throw MessagingException.connectFailed(sessionId, e);
}
Expand Down

0 comments on commit f853baf

Please sign in to comment.