Skip to content

Commit

Permalink
Merge pull request #1065 from bosch-io/feature/connection-announcement
Browse files Browse the repository at this point in the history
Inform target endpoint when managed connection is opend or closed #1052
  • Loading branch information
ffendt committed May 26, 2021
2 parents 159953b + ce6f482 commit 30167dc
Show file tree
Hide file tree
Showing 75 changed files with 2,857 additions and 285 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public static ConnectionId generateRandom() {
* it is already an instance of ConnectionId.
*
* @param connectionId the connection id.
* @throws ConnectionIdInvalidException if the provided {@code connectionId} is of invalid format.
* @return the connection ID.
*/
public static ConnectionId of(final CharSequence connectionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ public ConnectionBuilder payloadMappingDefinition(final PayloadMappingDefinition
public Connection build() {
checkSourceAndTargetAreValid();
checkAuthorizationContextsAreValid();
checkConnectionAnnouncementsOnlySetIfClientCount1();
migrateLegacyConfigurationOnTheFly();
return new ImmutableConnection(this);
}
Expand Down Expand Up @@ -768,6 +769,23 @@ private void checkAuthorizationContextsAreValid() {
}
}

private void checkConnectionAnnouncementsOnlySetIfClientCount1() {
if (clientCount > 1 && containsTargetWithConnectionAnnouncementsTopic()) {
final String message = MessageFormat.format("Connection announcements (topic {0}) can" +
" only be used with client count 1.", Topic.CONNECTION_ANNOUNCEMENTS.getName());
throw ConnectionConfigurationInvalidException.newBuilder(message)
.build();
}
}

private boolean containsTargetWithConnectionAnnouncementsTopic() {
return targets.stream()
.map(Target::getTopics)
.flatMap(Set::stream)
.map(FilteredTopic::getTopic)
.anyMatch(Topic.CONNECTION_ANNOUNCEMENTS::equals);
}

}

@Immutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,23 +198,23 @@ private ImmutableFilteredTopicBuilder(final Topic topic) {

@Override
public ImmutableFilteredTopicBuilder withNamespaces(@Nullable final Collection<String> namespaces) {
this.namespaces = namespaces;
if (supportsNamespaces()) {
this.namespaces = namespaces;
}
return this;
}

@Override
public ImmutableFilteredTopicBuilder withFilter(@Nullable final CharSequence filter) {
// policy announcements do not support filter.
if (topic != Topic.POLICY_ANNOUNCEMENTS) {
if (supportsFilters()) {
this.filter = filter;
}
return this;
}

@Override
public ImmutableFilteredTopicBuilder withExtraFields(@Nullable final ThingFieldSelector extraFields) {
// policy announcements do not support extra fields.
if (topic != Topic.POLICY_ANNOUNCEMENTS) {
if (supportsExtraFields()) {
this.extraFields = extraFields;
}
return this;
Expand All @@ -225,6 +225,18 @@ public ImmutableFilteredTopic build() {
return new ImmutableFilteredTopic(this);
}

private boolean supportsNamespaces() {
return Topic.CONNECTION_ANNOUNCEMENTS != topic;
}

private boolean supportsFilters() {
return Topic.POLICY_ANNOUNCEMENTS != topic && Topic.CONNECTION_ANNOUNCEMENTS != topic;
}

private boolean supportsExtraFields() {
return Topic.POLICY_ANNOUNCEMENTS != topic && Topic.CONNECTION_ANNOUNCEMENTS != topic;
}

}

@Immutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,14 @@ public enum Topic {
*
* @since 2.0.0
*/
POLICY_ANNOUNCEMENTS("_/_/policies/announcements", "policy-announcements");
POLICY_ANNOUNCEMENTS("_/_/policies/announcements", "policy-announcements"),

/**
* Connection target topic for connection announcements.
*
* @since 2.1.0
*/
CONNECTION_ANNOUNCEMENTS("_/_/connections/announcements", "connection-announcements");

private final String name;
private final String pubSubTopic;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.model.signals.announcements;


import static org.eclipse.ditto.base.model.common.ConditionChecker.checkNotNull;

import java.text.MessageFormat;
import java.util.Objects;
import java.util.function.Predicate;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.announcements.AbstractAnnouncement;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionIdInvalidException;
import org.eclipse.ditto.json.JsonExceptionBuilder;
import org.eclipse.ditto.json.JsonField;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonParseException;
import org.eclipse.ditto.json.JsonPointer;

/**
* Abstract superclass of connectivity announcements.
*
* @param <T> type of a concrete subclass.
* @since 2.1.0
*/
public abstract class AbstractConnectivityAnnouncement<T extends AbstractConnectivityAnnouncement<T>>
extends AbstractAnnouncement<T>
implements ConnectivityAnnouncement<T> {

private final ConnectionId connectionId;

/**
* Create a connectivity announcement object.
*
* @param connectionId the connection ID.
* @param dittoHeaders the Ditto headers.
*/
protected AbstractConnectivityAnnouncement(final ConnectionId connectionId, final DittoHeaders dittoHeaders) {
super(dittoHeaders);
this.connectionId = checkNotNull(connectionId, "connectionId");
}

protected static ConnectionId deserializeConnectionId(final JsonObject jsonObject) {
final JsonFieldDefinition<String> fieldDefinition = ConnectivityAnnouncement.JsonFields.JSON_CONNECTION_ID;
final String connectionIdJsonValue = jsonObject.getValueOrThrow(fieldDefinition);
try {
return ConnectionId.of(connectionIdJsonValue);
} catch (final ConnectionIdInvalidException e) {
throw getJsonParseException(fieldDefinition, ConnectionId.class, e);
}
}

protected static JsonParseException getJsonParseException(final JsonFieldDefinition<?> fieldDefinition,
final Class<?> targetClass, final Throwable cause) {
return getJsonParseExceptionBuilder(fieldDefinition, targetClass, cause)
.build();
}

protected static JsonExceptionBuilder<JsonParseException> getJsonParseExceptionBuilder(
final JsonFieldDefinition<?> fieldDefinition, final Class<?> targetClass, final Throwable cause) {

return JsonParseException.newBuilder()
.message(MessageFormat.format("Failed to deserialize field <{0}> as {1}: {2}",
fieldDefinition.getPointer(),
targetClass.getName(),
cause.getMessage()))
.cause(cause);
}

/**
* Append {@code ConnectivityAnnouncement}-specific payload to the passed {@code jsonObjectBuilder}.
*
* @param jsonObjectBuilder the JsonObjectBuilder to add the payload to.
* @param predicate the predicate to evaluate when adding the payload.
*/
protected abstract void appendConnectivityAnnouncementPayload(JsonObjectBuilder jsonObjectBuilder,
Predicate<JsonField> predicate);

@Override
public ConnectionId getEntityId() {
return connectionId;
}

@Override
public JsonPointer getResourcePath() {
return JsonPointer.empty();
}

@Override
public String getResourceType() {
return RESOURCE_TYPE;
}

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Predicate<JsonField> predicate) {
jsonObjectBuilder.set(ConnectivityAnnouncement.JsonFields.JSON_CONNECTION_ID, connectionId.toString(), predicate);
appendConnectivityAnnouncementPayload(jsonObjectBuilder, predicate);
}

@Override
public String toString() {
return super.toString() + ", connectionId=" + connectionId;
}

@Override
public boolean equals(final Object other) {
if (super.equals(other)) {
return Objects.equals(((AbstractConnectivityAnnouncement<?>) other).connectionId, connectionId);
} else {
return false;
}
}

@Override
public int hashCode() {
return Objects.hash(connectionId, super.hashCode());
}
}
Loading

0 comments on commit 30167dc

Please sign in to comment.