Skip to content

Commit

Permalink
[eclipse-ditto#964] add notifications to Ditto protocol; fix ShardReg…
Browse files Browse the repository at this point in the history
…ionExtractorTest.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Feb 12, 2021
1 parent c613db9 commit 5c1d54c
Show file tree
Hide file tree
Showing 28 changed files with 536 additions and 70 deletions.
4 changes: 4 additions & 0 deletions protocol-adapter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-signals-events-thingsearch</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-signals-notifications-policies</artifactId>
</dependency>

<!-- ### Provided ### -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.eclipse.ditto.signals.events.things.ThingEvent;
import org.eclipse.ditto.signals.events.things.ThingMerged;
import org.eclipse.ditto.signals.events.thingsearch.SubscriptionEvent;
import org.eclipse.ditto.signals.notifications.policies.PolicyNotification;

/**
* Adapter for the Ditto protocol.
Expand Down Expand Up @@ -168,6 +169,8 @@ public Adaptable toAdaptable(final Signal<?> signal, final TopicPath.Channel cha
return toAdaptable((CommandResponse<?>) signal, channel);
} else if (signal instanceof Event) {
return toAdaptable((Event<?>) signal, channel);
} else if (signal instanceof PolicyNotification) {
return adaptPolicyNotification((PolicyNotification<?>) signal);
}
throw UnknownSignalException.newBuilder(signal.getName()).dittoHeaders(signal.getDittoHeaders()).build();
}
Expand Down Expand Up @@ -298,10 +301,8 @@ public Adaptable toAdaptable(final Event<?> event, final TopicPath.Channel chann
return toAdaptable((ThingEvent<?>) event, channel);
} else if (event instanceof SubscriptionEvent) {
validateChannel(channel, event, TWIN);
return toAdaptable((SubscriptionEvent<?>) event, channel);
}

else {
return toAdaptable((SubscriptionEvent<?>) event, channel);
} else {
throw UnknownEventException.newBuilder(event.getName()).build();
}
}
Expand All @@ -316,7 +317,11 @@ public Adaptable toAdaptable(final ThingEvent<?> thingEvent, final TopicPath.Cha
}
}

public Adaptable toAdaptable(final SubscriptionEvent<?> subscriptionEvent, final TopicPath.Channel channel){
private Adaptable adaptPolicyNotification(final PolicyNotification<?> notification) {
return policiesAdapters.getNotificationAdapter().toAdaptable(notification);
}

public Adaptable toAdaptable(final SubscriptionEvent<?> subscriptionEvent, final TopicPath.Channel channel) {
validateNotLive(subscriptionEvent);
return thingsAdapters.getSubscriptionEventAdapter().toAdaptable(subscriptionEvent, channel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
*/
@NotThreadSafe
final class ImmutableTopicPathBuilder implements TopicPathBuilder, MessagesTopicPathBuilder, EventsTopicPathBuilder,
CommandsTopicPathBuilder, AcknowledgementTopicPathBuilder, SearchTopicPathBuilder {
CommandsTopicPathBuilder, AcknowledgementTopicPathBuilder, SearchTopicPathBuilder,
NotificationsTopicPathBuilder {

private final String namespace;
private final String name;
Expand Down Expand Up @@ -106,6 +107,12 @@ public CommandsTopicPathBuilder commands() {
return this;
}

@Override
public NotificationsTopicPathBuilder notifications() {
this.criterion = TopicPath.Criterion.NOTIFICATIONS;
return this;
}

@Override
public TopicPathBuilder twin() {
this.channel = TopicPath.Channel.TWIN;
Expand Down Expand Up @@ -250,6 +257,12 @@ public MessagesTopicPathBuilder subject(final String subject) {
return this;
}

@Override
public NotificationsTopicPathBuilder name(final String name) {
this.subject = checkNotNull(name, "name");
return this;
}

@Override
public AcknowledgementTopicPathBuilder label(final CharSequence label) {
subject = checkNotNull(label, "label").toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.protocoladapter;

/**
* Builder to create a topic path for notifications.
*
* @since 2.0.0
*/
public interface NotificationsTopicPathBuilder extends TopicPathBuildable {

/**
* Set the notification name on the topic path.
*
* @return this builder.
*/
NotificationsTopicPathBuilder name(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,16 @@ public static TopicPath newTopicPath(final String path) {
return ImmutableTopicPath.of(namespace, id, group, channel, criterion);
case MESSAGES:
case ACKS:
// messages should always contain a non-empty subject:
case NOTIFICATIONS:
// messages and notifications should always contain a non-empty subject:
// ACK Paths contain a custom acknowledgement label or an empty subject for aggregated ACKs:
final String subject = String.join(TopicPath.PATH_DELIMITER, parts);
if (subject.isEmpty()) {
return ImmutableTopicPath.of(namespace, id, group, channel, criterion);
} else {
return ImmutableTopicPath.of(namespace, id, group, channel, criterion, subject);
}

default:
throw UnknownTopicPathException.newBuilder(path).build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,14 @@ enum Criterion {
*
* @since 1.1.0
*/
ACKS("acks");
ACKS("acks"),

/**
* Criterion for notifications.
*
* @since 2.0.0
*/
NOTIFICATIONS("notifications");

private final String name;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ public interface TopicPathBuilder {
*/
CommandsTopicPathBuilder commands();

/**
* Sets the {@code Criterion} of this builder to {@link TopicPath.Criterion#NOTIFICATIONS}.
*
* @return this builder.
* @since 2.0.0
*/
NotificationsTopicPathBuilder notifications();

/**
* Sets the {@code Channel} of this builder to {@link TopicPath.Channel#TWIN}. A previously set channel is
* replaced.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public static PolicyQueryCommandResponseMappingStrategies getPolicyQueryCommandR
return PolicyQueryCommandResponseMappingStrategies.getInstance();
}

public static PolicyNotificationMappingStrategies getPolicyNotificationMappingStrategies() {
return PolicyNotificationMappingStrategies.getInstance();
}

public static ThingMergeCommandMappingStrategies getThingMergeCommandMappingStrategies() {
return ThingMergeCommandMappingStrategies.getInstance();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.protocoladapter.adaptables;

import java.time.Instant;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;

import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.policies.PolicyId;
import org.eclipse.ditto.model.policies.SubjectId;
import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.JsonifiableMapper;
import org.eclipse.ditto.signals.notifications.policies.PolicyNotification;
import org.eclipse.ditto.signals.notifications.policies.SubjectExpiryNotification;

/**
* Defines mapping strategies (map from signal type to JsonifiableMapper) for policy notifications.
*
* @since 2.0.0
*/
final class PolicyNotificationMappingStrategies extends AbstractPolicyMappingStrategies<PolicyNotification<?>> {

private static final PolicyNotificationMappingStrategies INSTANCE = new PolicyNotificationMappingStrategies();

private PolicyNotificationMappingStrategies() {
super(initMappingStrategies());
}

/**
* Get the unique instance of this class.
*
* @return the instance.
*/
public static PolicyNotificationMappingStrategies getInstance() {
return INSTANCE;
}

private static Map<String, JsonifiableMapper<PolicyNotification<?>>> initMappingStrategies() {
final Map<String, JsonifiableMapper<PolicyNotification<?>>> mappingStrategies = new HashMap<>();
mappingStrategies.put(SubjectExpiryNotification.TYPE,
PolicyNotificationMappingStrategies::toSubjectExpiryNotification);
return mappingStrategies;
}

private static SubjectExpiryNotification toSubjectExpiryNotification(final Adaptable adaptable) {
final PolicyId policyId = policyIdFromTopicPath(adaptable.getTopicPath());
final DittoHeaders dittoHeaders = dittoHeadersFrom(adaptable);
final JsonObject payload = adaptable.getPayload()
.getValue()
.filter(JsonValue::isObject)
.map(JsonValue::asObject)
.orElseThrow(NoSuchElementException::new);
final Instant expiry = Instant.parse(payload.getValueOrThrow(SubjectExpiryNotification.JsonFields.EXPIRY));
final Collection<SubjectId> expiringSubjectIds =
payload.getValueOrThrow(SubjectExpiryNotification.JsonFields.EXPIRING_SUBJECTS)
.stream()
.map(JsonValue::asString)
.map(SubjectId::newInstance)
.collect(Collectors.toList());
return SubjectExpiryNotification.of(policyId, expiry, expiringSubjectIds, dittoHeaders);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@
import org.eclipse.ditto.signals.commands.policies.modify.PolicyModifyCommandResponse;
import org.eclipse.ditto.signals.commands.policies.query.PolicyQueryCommand;
import org.eclipse.ditto.signals.commands.policies.query.PolicyQueryCommandResponse;
import org.eclipse.ditto.signals.notifications.policies.PolicyNotification;

/**
* Instantiates and provides {@link Adapter}s used to process Policy commands, responses and errors.
* Instantiates and provides {@link Adapter}s used to process Policy commands, responses and errors and notifications.
*
* @since 1.1.0
*/
Expand All @@ -38,6 +39,7 @@ public final class DefaultPolicyCommandAdapterProvider implements PolicyCommandA
private final PolicyQueryCommandAdapter policyQueryCommandAdapter;
private final PolicyModifyCommandResponseAdapter policyModifyCommandResponseAdapter;
private final PolicyQueryCommandResponseAdapter policyQueryCommandResponseAdapter;
private final PolicyNotificationAdapter policyNotificationAdapter;

public DefaultPolicyCommandAdapterProvider(final ErrorRegistry<DittoRuntimeException> errorRegistry,
final HeaderTranslator headerTranslator) {
Expand All @@ -46,6 +48,7 @@ public DefaultPolicyCommandAdapterProvider(final ErrorRegistry<DittoRuntimeExcep
policyQueryCommandAdapter = PolicyQueryCommandAdapter.of(headerTranslator);
policyModifyCommandResponseAdapter = PolicyModifyCommandResponseAdapter.of(headerTranslator);
policyQueryCommandResponseAdapter = PolicyQueryCommandResponseAdapter.of(headerTranslator);
policyNotificationAdapter = PolicyNotificationAdapter.of(headerTranslator);
}

public Adapter<PolicyErrorResponse> getErrorResponseAdapter() {
Expand All @@ -60,22 +63,27 @@ public Adapter<PolicyModifyCommandResponse<?>> getModifyCommandResponseAdapter()
return policyModifyCommandResponseAdapter;
}

public Adapter<PolicyQueryCommand<?>> getQueryCommandAdapter() {
return policyQueryCommandAdapter;
}

public Adapter<PolicyQueryCommandResponse<?>> getQueryCommandResponseAdapter() {
return policyQueryCommandResponseAdapter;
}

public Adapter<PolicyNotification<?>> getNotificationAdapter() {
return policyNotificationAdapter;
}

@Override
public List<Adapter<?>> getAdapters() {
return Arrays.asList(
policyErrorResponseAdapter,
policyModifyCommandAdapter,
policyQueryCommandAdapter,
policyModifyCommandResponseAdapter,
policyQueryCommandResponseAdapter
policyQueryCommandResponseAdapter,
policyNotificationAdapter
);
}

public Adapter<PolicyQueryCommand<?>> getQueryCommandAdapter() {
return policyQueryCommandAdapter;
}

public Adapter<PolicyQueryCommandResponse<?>> getQueryCommandResponseAdapter() {
return policyQueryCommandResponseAdapter;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.protocoladapter.policies;

import static java.util.Objects.requireNonNull;

import java.util.Collections;
import java.util.Set;

import org.eclipse.ditto.protocoladapter.Adaptable;
import org.eclipse.ditto.protocoladapter.HeaderTranslator;
import org.eclipse.ditto.protocoladapter.TopicPath;
import org.eclipse.ditto.protocoladapter.UnknownTopicPathException;
import org.eclipse.ditto.protocoladapter.adaptables.MappingStrategiesFactory;
import org.eclipse.ditto.protocoladapter.signals.SignalMapperFactory;
import org.eclipse.ditto.signals.notifications.policies.PolicyNotification;

/**
* Adapter for mapping a {@link PolicyNotification} to and from an {@link org.eclipse.ditto.protocoladapter.Adaptable}.
*/
final class PolicyNotificationAdapter extends AbstractPolicyAdapter<PolicyNotification<?>> {

private PolicyNotificationAdapter(final HeaderTranslator headerTranslator) {
super(MappingStrategiesFactory.getPolicyNotificationMappingStrategies(),
SignalMapperFactory.newPolicyNotificationSignalMapper(),
headerTranslator);
}

/**
* Returns a new PolicyNotificationAdapter.
*
* @param headerTranslator translator between external and Ditto headers.
* @return the adapter.
*/
public static PolicyNotificationAdapter of(final HeaderTranslator headerTranslator) {
return new PolicyNotificationAdapter(requireNonNull(headerTranslator));
}

@Override
public Set<TopicPath.Criterion> getCriteria() {
return Collections.singleton(TopicPath.Criterion.NOTIFICATIONS);
}

@Override
public Set<TopicPath.Action> getActions() {
return Collections.emptySet();
}

@Override
public boolean isForResponses() {
return false;
}

@Override
protected String getType(final Adaptable adaptable) {
final TopicPath topicPath = adaptable.getTopicPath();
final String commandName =
topicPath.getSubject().orElseThrow(() -> UnknownTopicPathException.newBuilder(topicPath).build());
return topicPath.getGroup() + "." + getTypeCriterionAsString(topicPath) + ":" + commandName;
}

}

0 comments on commit 5c1d54c

Please sign in to comment.