Skip to content

Commit

Permalink
Add connectivity announcements to the ditto protocol
Browse files Browse the repository at this point in the history
Signed-off-by: Florian Fendt <Florian.Fendt@bosch.io>
  • Loading branch information
ffendt committed May 5, 2021
1 parent 486d3d2 commit 296e2dc
Show file tree
Hide file tree
Showing 32 changed files with 961 additions and 40 deletions.
Expand Up @@ -107,7 +107,7 @@ public String getResourceType() {

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Predicate<JsonField> predicate) {
jsonObjectBuilder.set(ConnectivityAnnouncement.JsonFields.JSON_CONNECTION_ID, connectionId.toString());
jsonObjectBuilder.set(ConnectivityAnnouncement.JsonFields.JSON_CONNECTION_ID, connectionId.toString(), predicate);
appendConnectivityAnnouncementPayload(jsonObjectBuilder, predicate);
}

Expand Down
Expand Up @@ -29,10 +29,15 @@
public interface ConnectivityAnnouncement<T extends ConnectivityAnnouncement<T>>
extends Announcement<T>, SignalWithEntityId<T> {

/**
* The service prefix for connectivity announcement commands.
*/
String SERVICE_PREFIX = "connectivity";

/**
* Type prefix of connection announcements.
*/
String TYPE_PREFIX = "connectivity." + TYPE_QUALIFIER + ":";
String TYPE_PREFIX = SERVICE_PREFIX + "." + TYPE_QUALIFIER + ":";

/**
* Connection resource type.
Expand Down
Expand Up @@ -72,7 +72,7 @@ public String getResourceType() {

@Override
protected void appendPayload(final JsonObjectBuilder jsonObjectBuilder, final Predicate<JsonField> predicate) {
jsonObjectBuilder.set(PolicyAnnouncement.JsonFields.JSON_POLICY_ID, policyId.toString());
jsonObjectBuilder.set(PolicyAnnouncement.JsonFields.JSON_POLICY_ID, policyId.toString(), predicate);
appendPolicyAnnouncementPayload(jsonObjectBuilder, predicate);
}

Expand Down
4 changes: 4 additions & 0 deletions protocol/pom.xml
Expand Up @@ -41,6 +41,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-thingsearch-model</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-connectivity-model</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-messages-model</artifactId>
Expand Down
Expand Up @@ -65,10 +65,10 @@ private ImmutableTopicPath(final String namespace,
}

private Channel checkChannelArgument(final Channel channel, final Group group) {
if (group == Group.POLICIES) {
if (group == Group.POLICIES || group == Group.CONNECTIONS) {
// for policies group no channel is required/allowed
checkArgument(channel, ch -> ch == null || ch == Channel.NONE,
() -> "The policies group requires no channel.");
() -> "The policies and connections groups require no channel.");
return Channel.NONE;
} else {
// for other groups just check that a channel is there
Expand Down
Expand Up @@ -95,6 +95,12 @@ public TopicPathBuilder policies() {
return this;
}

@Override
public TopicPathBuilder connections() {
this.group = TopicPath.Group.CONNECTIONS;
return this;
}

@Override
public SearchTopicPathBuilder search() {
this.criterion = TopicPath.Criterion.SEARCH;
Expand Down
Expand Up @@ -112,6 +112,7 @@ public static TopicPath newTopicPath(final String path) {
final TopicPath.Channel channel;
switch (group) {
case POLICIES:
case CONNECTIONS:
channel = TopicPath.Channel.NONE;
break;
case THINGS:
Expand Down Expand Up @@ -204,7 +205,7 @@ public static TopicPathBuilder newTopicPathBuilder(final PolicyId policyId) {
}

/**
* Returns a new {@code TopicPathBuilder}. The {@code id} part of the {@code TopicPath} is set to
* Returns a new {@code TopicPathBuilder}. The {@code name} part of the {@code TopicPath} is set to
* {@link TopicPath#ID_PLACEHOLDER}.
*
* @param namespace the namespace.
Expand All @@ -215,6 +216,18 @@ public static TopicPathBuilder newTopicPathBuilderFromNamespace(final String nam
return ImmutableTopicPathBuilder.of(namespace, TopicPath.ID_PLACEHOLDER).things();
}

/**
* Returns a new {@code TopicPathBuilder}. The {@code namespace} part of the {@code TopicPath} is set to
* {@link TopicPath#ID_PLACEHOLDER}.
*
* @param name the name.
* @return the builder.
* @throws NullPointerException if {@code name} is {@code null}.
*/
public static TopicPathBuilder newTopicPathBuilderFromName(final String name) {
return ImmutableTopicPathBuilder.of(TopicPath.ID_PLACEHOLDER, name);
}

/**
* Returns a new {@code Payload} from the specified {@code jsonString}.
*
Expand Down
Expand Up @@ -17,6 +17,7 @@
import java.util.stream.Stream;

import org.eclipse.ditto.base.model.entity.type.EntityType;
import org.eclipse.ditto.connectivity.model.ConnectivityConstants;
import org.eclipse.ditto.policies.model.PolicyConstants;
import org.eclipse.ditto.policies.model.PolicyId;
import org.eclipse.ditto.things.model.ThingConstants;
Expand Down Expand Up @@ -150,7 +151,14 @@ enum Group {

POLICIES("policies", PolicyConstants.ENTITY_TYPE),

THINGS("things", ThingConstants.ENTITY_TYPE);
THINGS("things", ThingConstants.ENTITY_TYPE),

/**
* Connections group.
*
* @since 2.1.0
*/
CONNECTIONS("connections", ConnectivityConstants.ENTITY_TYPE);

private final String name;
private final EntityType entityType;
Expand Down
Expand Up @@ -32,7 +32,16 @@ public interface TopicPathBuilder {
TopicPathBuilder policies();

/**
* Sets the {@code Group} of this builder to {@link TopicPath.Criterion#SEARCH}. A previously set group is replaced.
* Sets the {@code Group} of this builder to {@link TopicPath.Group#CONNECTIONS}. A previously set group is
* replaced.
*
* @return this builder to allow method chaining.
*/
TopicPathBuilder connections();

/**
* Sets the {@code Group} of this builder to {@link TopicPath.Criterion#SEARCH}. A previously set group is
* replaced.
*
* @return this builder to allow method chaining.
*/
Expand Down
Expand Up @@ -19,6 +19,7 @@

import java.util.Arrays;

import org.eclipse.ditto.connectivity.model.signals.announcements.ConnectivityAnnouncement;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.base.model.exceptions.DittoRuntimeException;
Expand All @@ -36,6 +37,8 @@
import org.eclipse.ditto.protocol.UnknownEventException;
import org.eclipse.ditto.protocol.UnknownSignalException;
import org.eclipse.ditto.protocol.adapter.acknowledgements.DefaultAcknowledgementsAdapterProvider;
import org.eclipse.ditto.protocol.adapter.connectivity.ConnectivityCommandAdapterProvider;
import org.eclipse.ditto.protocol.adapter.connectivity.DefaultConnectivityCommandAdapterProvider;
import org.eclipse.ditto.protocol.adapter.policies.DefaultPolicyCommandAdapterProvider;
import org.eclipse.ditto.protocol.adapter.provider.AcknowledgementAdapterProvider;
import org.eclipse.ditto.protocol.adapter.provider.PolicyCommandAdapterProvider;
Expand Down Expand Up @@ -82,6 +85,7 @@ public final class DittoProtocolAdapter implements ProtocolAdapter {
private final HeaderTranslator headerTranslator;
private final ThingCommandAdapterProvider thingsAdapters;
private final PolicyCommandAdapterProvider policiesAdapters;
private final ConnectivityCommandAdapterProvider connectivityAdapters;
private final AcknowledgementAdapterProvider acknowledgementAdapters;
private final AdapterResolver adapterResolver;

Expand All @@ -90,16 +94,19 @@ private DittoProtocolAdapter(final ErrorRegistry<DittoRuntimeException> errorReg
this.headerTranslator = checkNotNull(headerTranslator, "headerTranslator");
this.thingsAdapters = new DefaultThingCommandAdapterProvider(errorRegistry, headerTranslator);
this.policiesAdapters = new DefaultPolicyCommandAdapterProvider(errorRegistry, headerTranslator);
this.connectivityAdapters = new DefaultConnectivityCommandAdapterProvider(headerTranslator);
this.acknowledgementAdapters = new DefaultAcknowledgementsAdapterProvider(errorRegistry, headerTranslator);
this.adapterResolver = new DefaultAdapterResolver(thingsAdapters, policiesAdapters, acknowledgementAdapters);
}

private DittoProtocolAdapter(final HeaderTranslator headerTranslator,
final ThingCommandAdapterProvider thingsAdapters, final PolicyCommandAdapterProvider policiesAdapters,
final ConnectivityCommandAdapterProvider connectivityAdapters,
final AcknowledgementAdapterProvider acknowledgementAdapters, final AdapterResolver adapterResolver) {
this.headerTranslator = checkNotNull(headerTranslator, "headerTranslator");
this.thingsAdapters = checkNotNull(thingsAdapters, "thingsAdapters");
this.policiesAdapters = checkNotNull(policiesAdapters, "policiesAdapters");
this.connectivityAdapters = checkNotNull(connectivityAdapters, "connectivityAdapters");
this.acknowledgementAdapters = checkNotNull(acknowledgementAdapters, "acknowledgementAdapters");
this.adapterResolver = checkNotNull(adapterResolver, "adapterResolver");
}
Expand Down Expand Up @@ -138,17 +145,19 @@ public static HeaderTranslator getHeaderTranslator() {
* @param headerTranslator translator between external and Ditto headers
* @param thingCommandAdapterProvider command adapters for thing commands
* @param policyCommandAdapterProvider command adapters for policy commands
* @param connectivityAdapters adapters for connectivity commands.
* @param acknowledgementAdapters adapters for acknowledgements.
* @param adapterResolver resolves the correct adapter from a command
* @return new instance of {@link DittoProtocolAdapter}
*/
static DittoProtocolAdapter newInstance(final HeaderTranslator headerTranslator,
final ThingCommandAdapterProvider thingCommandAdapterProvider,
final PolicyCommandAdapterProvider policyCommandAdapterProvider,
final ConnectivityCommandAdapterProvider connectivityAdapters,
final AcknowledgementAdapterProvider acknowledgementAdapters,
final AdapterResolver adapterResolver) {
return new DittoProtocolAdapter(headerTranslator, thingCommandAdapterProvider, policyCommandAdapterProvider,
acknowledgementAdapters, adapterResolver
connectivityAdapters, acknowledgementAdapters, adapterResolver
);
}

Expand Down Expand Up @@ -181,6 +190,8 @@ public Adaptable toAdaptable(final Signal<?> signal, final TopicPath.Channel cha
return toAdaptable((Event<?>) signal, channel);
} else if (signal instanceof PolicyAnnouncement) {
return adaptPolicyAnnouncement((PolicyAnnouncement<?>) signal);
} else if (signal instanceof ConnectivityAnnouncement) {
return adaptConnectivityAnnouncement((ConnectivityAnnouncement<?>) signal);
}
throw UnknownSignalException.newBuilder(signal.getName()).dittoHeaders(signal.getDittoHeaders()).build();
}
Expand Down Expand Up @@ -333,6 +344,10 @@ private Adaptable adaptPolicyAnnouncement(final PolicyAnnouncement<?> announceme
return policiesAdapters.getAnnouncementAdapter().toAdaptable(announcement);
}

private Adaptable adaptConnectivityAnnouncement(final ConnectivityAnnouncement<?> announcement) {
return connectivityAdapters.getAnnouncementAdapter().toAdaptable(announcement);
}

public Adaptable toAdaptable(final SubscriptionEvent<?> subscriptionEvent, final TopicPath.Channel channel) {
validateNotLive(subscriptionEvent);
return thingsAdapters.getSubscriptionEventAdapter().toAdaptable(subscriptionEvent, channel);
Expand Down
Expand Up @@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.protocol.adapter.things;
package org.eclipse.ditto.protocol.adapter;

import org.eclipse.ditto.json.JsonPointer;
import org.eclipse.ditto.protocol.PayloadPathMatcher;
Expand All @@ -21,14 +21,14 @@
* <p>
* @since 2.0.0
*/
final class EmptyPathMatcher implements PayloadPathMatcher {
public final class EmptyPathMatcher implements PayloadPathMatcher {

private static final EmptyPathMatcher INSTANCE = new EmptyPathMatcher();

private EmptyPathMatcher() {
}

static EmptyPathMatcher getInstance() {
public static EmptyPathMatcher getInstance() {
return INSTANCE;
}

Expand Down
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.protocol.adapter.connectivity;

import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.protocol.Adaptable;
import org.eclipse.ditto.protocol.HeaderTranslator;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.AbstractAdapter;
import org.eclipse.ditto.protocol.adapter.EmptyPathMatcher;
import org.eclipse.ditto.protocol.mapper.SignalMapper;
import org.eclipse.ditto.protocol.mappingstrategies.MappingStrategies;

/**
* Base class for {@link org.eclipse.ditto.protocol.adapter.Adapter}s that handle connectivity commands.
*
* @param <T> the type of the connectivity command
* @since 2.1.0
*/
abstract class AbstractConnectivityAdapter<T extends Signal<?>> extends AbstractAdapter<T> implements ConnectivityAdapter<T> {

private final SignalMapper<T> signalMapper;

/**
* @param mappingStrategies the {@link MappingStrategies} used to convert {@link Adaptable}s to
* {@link Signal}s
* @param signalMapper the {@link SignalMapper} used to convert from a
* {@link Signal} to an {@link Adaptable}
* @param headerTranslator the header translator
*/
protected AbstractConnectivityAdapter(final MappingStrategies<T> mappingStrategies,
final SignalMapper<T> signalMapper, final HeaderTranslator headerTranslator) {
super(mappingStrategies, headerTranslator, EmptyPathMatcher.getInstance());
this.signalMapper = signalMapper;
}

@Override
protected Adaptable mapSignalToAdaptable(final T signal, final TopicPath.Channel channel) {
return signalMapper.mapSignalToAdaptable(signal, channel);
}

@Override
public Adaptable toAdaptable(final T t) {
return super.toAdaptable(t, TopicPath.Channel.NONE);
}

}
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.protocol.adapter.connectivity;

import java.util.EnumSet;
import java.util.Set;

import org.eclipse.ditto.base.model.signals.Signal;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.protocol.adapter.Adapter;

/**
* Mixin for connectivity adapters.
*
* @since 2.1.0
*/
interface ConnectivityAdapter<T extends Signal<?>> extends Adapter<T> {

@Override
default Set<TopicPath.Group> getGroups() {
return EnumSet.of(TopicPath.Group.CONNECTIONS);
}

@Override
default Set<TopicPath.Channel> getChannels() {
return EnumSet.of(TopicPath.Channel.NONE);
}

}

0 comments on commit 296e2dc

Please sign in to comment.