diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdater.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdater.java
index 9008d811bd..5108fc84e8 100644
--- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdater.java
+++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdater.java
@@ -25,6 +25,7 @@
import javax.annotation.Nullable;
+import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
@@ -39,7 +40,6 @@
import org.eclipse.ditto.services.utils.pubsub.ddata.DData;
import org.eclipse.ditto.services.utils.pubsub.ddata.DDataWriter;
import org.eclipse.ditto.services.utils.pubsub.ddata.ack.Grouped;
-import org.eclipse.ditto.services.utils.pubsub.ddata.ack.GroupedAckLabels;
import org.eclipse.ditto.services.utils.pubsub.ddata.ack.GroupedRelation;
import org.eclipse.ditto.services.utils.pubsub.ddata.literal.LiteralUpdate;
@@ -185,9 +185,9 @@ private void tick(Clock tick) {
}
private void onChanged(final Replicator.Changed> event) {
- final Map
> mmap =
- GroupedAckLabels.deserializeORMultiMap(event.get(ackDData.getReader().getKey()));
- final List remoteGroupedAckLabels = getRemoteGroupedAckLabelsOrderByAddress(mmap);
+ final Map>> mmap =
+ Grouped.deserializeORMultiMap(event.get(ackDData.getReader().getKey()), JsonValue::asString);
+ final List> remoteGroupedAckLabels = getRemoteGroupedAckLabelsOrderByAddress(mmap);
remoteGroups = getRemoteGroups(remoteGroupedAckLabels);
remoteAckLabels = getRemoteAckLabels(remoteGroupedAckLabels);
@@ -200,8 +200,8 @@ private void onChanged(final Replicator.Changed> event) {
ddataChangeRecipients.forEach(recipient -> recipient.tell(ddataChanged, getSelf()));
}
- private List getRemoteGroupedAckLabelsOrderByAddress(
- final Map> mmap) {
+ private List> getRemoteGroupedAckLabelsOrderByAddress(
+ final Map>> mmap) {
return mmap.entrySet()
.stream()
@@ -216,18 +216,18 @@ private boolean isNotOwnAddress(final Map.Entry entry) {
return !ownAddress.equals(entry.getKey());
}
- private Map> getRemoteGroups(final List remoteGroupedAckLabels) {
+ private Map> getRemoteGroups(final List> remoteGroupedAckLabels) {
final Map> result = new HashMap<>();
remoteGroupedAckLabels.stream()
- .flatMap(GroupedAckLabels::streamAsGroupedPair)
+ .flatMap(Grouped::streamAsGroupedPair)
// do not set a group of ack labels if already set by a member of smaller address
.forEach(pair -> result.computeIfAbsent(pair.first(), group -> pair.second()));
return Collections.unmodifiableMap(result);
}
- private Set getRemoteAckLabels(final List remoteGroupedAckLabels) {
+ private Set getRemoteAckLabels(final List> remoteGroupedAckLabels) {
return remoteGroupedAckLabels.stream()
- .flatMap(GroupedAckLabels::streamAckLabels)
+ .flatMap(Grouped::streamValues)
.collect(Collectors.toSet());
}
@@ -263,8 +263,7 @@ private void writeLocalDData() {
private LiteralUpdate createDDataUpdate() {
final Set groupedAckLabels = localAckLabels.streamGroupedValues()
- .map(GroupedAckLabels::fromGrouped)
- .map(GroupedAckLabels::toJsonString)
+ .map(Grouped::toJsonString)
.collect(Collectors.toSet());
return LiteralUpdate.replaceAll(groupedAckLabels);
}
@@ -275,12 +274,12 @@ private void failSubscribe(final ActorRef sender) {
sender.tell(error, getSelf());
}
- private List getLocalLosers(final Map> mmap) {
- final Map> moreImportantEntries = mmap.entrySet()
+ private List getLocalLosers(final Map>> mmap) {
+ final Map>> moreImportantEntries = mmap.entrySet()
.stream()
.filter(entry -> Address.addressOrdering().compare(entry.getKey(), ownAddress) < 0)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- final List moreImportantGroupedAckLabels =
+ final List> moreImportantGroupedAckLabels =
getRemoteGroupedAckLabelsOrderByAddress(moreImportantEntries);
final Map> moreImportantRemoteGroups = getRemoteGroups(moreImportantGroupedAckLabels);
final Set moreImportantAckLabels = getRemoteAckLabels(moreImportantGroupedAckLabels);
diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/api/RemoteAcksChanged.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/api/RemoteAcksChanged.java
index 58bc0bd4d2..1131f2911d 100644
--- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/api/RemoteAcksChanged.java
+++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/api/RemoteAcksChanged.java
@@ -17,7 +17,7 @@
import java.util.Set;
import java.util.stream.Collectors;
-import org.eclipse.ditto.services.utils.pubsub.ddata.ack.GroupedAckLabels;
+import org.eclipse.ditto.services.utils.pubsub.ddata.ack.Grouped;
import akka.actor.Address;
import akka.japi.Pair;
@@ -28,9 +28,9 @@
public final class RemoteAcksChanged {
// TODO: store in format for fast lookups by Publisher
- private final Map> mmap;
+ private final Map>> mmap;
- private RemoteAcksChanged(final Map> mmap) {
+ private RemoteAcksChanged(final Map>> mmap) {
this.mmap = mmap;
}
@@ -40,7 +40,7 @@ private RemoteAcksChanged(final Map> mmap) {
* @param mmap the multimap.
* @return the change notification.
*/
- public static RemoteAcksChanged of(final Map> mmap) {
+ public static RemoteAcksChanged of(final Map>> mmap) {
return new RemoteAcksChanged(mmap);
}
@@ -51,7 +51,7 @@ public Map> getMultiMap() {
.map(entry -> Pair.create(entry.getKey(),
entry.getValue()
.stream()
- .flatMap(GroupedAckLabels::streamAckLabels)
+ .flatMap(Grouped::streamValues)
.collect(Collectors.toSet())
))
.collect(Collectors.toMap(Pair::first, Pair::second));
diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/ack/Grouped.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/ack/Grouped.java
index 2cfb0e68b9..d9425678a2 100644
--- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/ack/Grouped.java
+++ b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/ack/Grouped.java
@@ -14,11 +14,27 @@
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import javax.annotation.Nullable;
+import org.eclipse.ditto.json.JsonArray;
+import org.eclipse.ditto.json.JsonCollectors;
+import org.eclipse.ditto.json.JsonFactory;
+import org.eclipse.ditto.json.JsonFieldDefinition;
+import org.eclipse.ditto.json.JsonObject;
+import org.eclipse.ditto.json.JsonValue;
+
+import akka.cluster.ddata.ORMultiMap;
+import akka.japi.Pair;
+import scala.jdk.javaapi.CollectionConverters;
+
/**
* Model of a set of values with an optional group.
*
@@ -26,10 +42,10 @@
*/
public final class Grouped {
- @Nullable private final String group;
- private final Set values;
+ @Nullable final String group;
+ final Set values;
- private Grouped(@Nullable final String group, final Set values) {
+ Grouped(@Nullable final String group, final Set values) {
this.group = group;
this.values = checkNotNull(values, "values");
}
@@ -74,4 +90,105 @@ public Optional getGroup() {
public Set getValues() {
return values;
}
+
+ /**
+ * Return a stream containing a pair with guaranteed group name and values if this object has a group name,
+ * or an empty stream otherwise.
+ *
+ * @return the stream.
+ */
+ public Stream>> streamAsGroupedPair() {
+ if (group != null) {
+ return Stream.of(Pair.create(group, values));
+ } else {
+ return Stream.empty();
+ }
+ }
+
+ /**
+ * Stream values of this object.
+ *
+ * @return a stream of values.
+ */
+ public Stream streamValues() {
+ return values.stream();
+ }
+
+ /**
+ * Serialize this object as a JSON object.
+ *
+ * @return JSON representation of this object.
+ */
+ public JsonObject toJson() {
+ return JsonFactory.newObjectBuilder()
+ .set(JsonFields.GROUP, group, field -> !field.getValue().isNull())
+ .set(JsonFields.ACK_LABELS, values.stream()
+ .map(JsonValue::of)
+ .collect(JsonCollectors.valuesToArray()))
+ .build();
+ }
+
+ /**
+ * Serialize this object as a JSON string.
+ *
+ * @return JSON representation of this object.
+ */
+ public String toJsonString() {
+ return toJson().toString();
+ }
+
+ /**
+ * Deserialize grouped values from a JSON object.
+ *
+ * @param jsonObject the JSON object.
+ * @param deserializeValue deserializer of values.
+ * @param type of values.
+ * @return grouped values.
+ */
+ public static Grouped fromJson(final JsonObject jsonObject, final Function deserializeValue) {
+ final String group = jsonObject.getValue(JsonFields.GROUP).orElse(null);
+ final Set ackLabels = jsonObject.getValue(JsonFields.ACK_LABELS)
+ .stream()
+ .flatMap(JsonArray::stream)
+ .map(deserializeValue)
+ .collect(Collectors.toSet());
+ return new Grouped<>(group, ackLabels);
+ }
+
+ /**
+ * Deserialize string values of an {@code ORMultiMap} as grouped acknowledgement labels in JSON format.
+ *
+ * @param orMultiMap the ORMultiMap.
+ * @param valueDeserializer deserializer of values.
+ * @param the type of keys in the ORMultiMap.
+ * @param the type of values.
+ * @return a multi-map from keys to grouped ack labels deserialized from each binding.
+ */
+ public static Map>> deserializeORMultiMap(final ORMultiMap orMultiMap,
+ final Function valueDeserializer) {
+ final Map> entries =
+ CollectionConverters.asJava(orMultiMap.entries());
+ return entries.entrySet()
+ .stream()
+ .map(entry -> Pair.create(entry.getKey(), deserializeGroupedList(entry.getValue(), valueDeserializer)))
+ .collect(Collectors.toMap(Pair::first, Pair::second));
+ }
+
+ private static List> deserializeGroupedList(
+ final scala.collection.immutable.Set bindingValues,
+ final Function valueDeserializer) {
+ return CollectionConverters.asJava(bindingValues)
+ .stream()
+ .map(s -> Grouped.fromJson(JsonObject.of(s), valueDeserializer))
+ .collect(Collectors.toList());
+ }
+
+ // JSON field names are 1-character long to conserve space in the distributed data.
+ private static final class JsonFields {
+
+ private static final JsonFieldDefinition GROUP = JsonFactory.newStringFieldDefinition("g");
+
+ private static final JsonFieldDefinition ACK_LABELS = JsonFactory.newJsonArrayFieldDefinition("a");
+
+ }
}
diff --git a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/ack/GroupedAckLabels.java b/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/ack/GroupedAckLabels.java
deleted file mode 100644
index fc571ac30a..0000000000
--- a/services/utils/pubsub/src/main/java/org/eclipse/ditto/services/utils/pubsub/ddata/ack/GroupedAckLabels.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Copyright (c) 2020 Contributors to the Eclipse Foundation
- *
- * See the NOTICE file(s) distributed with this work for additional
- * information regarding copyright ownership.
- *
- * This program and the accompanying materials are made available under the
- * terms of the Eclipse Public License 2.0 which is available at
- * http://www.eclipse.org/legal/epl-2.0
- *
- * SPDX-License-Identifier: EPL-2.0
- */
-package org.eclipse.ditto.services.utils.pubsub.ddata.ack;
-
-import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
-
-import java.util.AbstractMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import javax.annotation.Nullable;
-
-import org.eclipse.ditto.json.JsonArray;
-import org.eclipse.ditto.json.JsonCollectors;
-import org.eclipse.ditto.json.JsonFactory;
-import org.eclipse.ditto.json.JsonFieldDefinition;
-import org.eclipse.ditto.json.JsonObject;
-import org.eclipse.ditto.json.JsonValue;
-
-import akka.cluster.ddata.ORMultiMap;
-import akka.japi.Pair;
-import scala.jdk.javaapi.CollectionConverters;
-
-/**
- * A set of acknowledgement labels together with an optional group name for storage in an {@code ORMultiMap}.
- */
-public final class GroupedAckLabels {
-
- @Nullable private final String group;
- private final Set ackLabels;
-
- private GroupedAckLabels(@Nullable final String group, final Set ackLabels) {
- this.group = group;
- this.ackLabels = checkNotNull(ackLabels, "values");
- }
-
- /**
- * Create grouped ack labels from grouped strings.
- *
- * @param grouped a set of strings with an optional group name.
- * @return Grouped ack labels.
- */
- public static GroupedAckLabels fromGrouped(final Grouped grouped) {
- return new GroupedAckLabels(grouped.getGroup().orElse(null), grouped.getValues());
- }
-
- /**
- * Deserialize string values of an {@code ORMultiMap} as grouped acknowledgement labels in JSON format.
- *
- * @param orMultiMap the ORMultiMap.
- * @param the type of keys in the ORMultiMap.
- * @return a multi-map from keys to grouped ack labels deserialized from each binding.
- */
- public static Map> deserializeORMultiMap(final ORMultiMap orMultiMap) {
- final Map> entries =
- CollectionConverters.asJava(orMultiMap.entries());
- return entries.entrySet()
- .stream()
- .map(entry -> new AbstractMap.SimpleEntry<>(entry.getKey(), deserializeAckGroups(entry.getValue())))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- }
-
- /**
- * Deserialize grouped ack labels from a JSON string.
- *
- * @param jsonString the JSON string.
- * @return grouped ack labels.
- */
- public static GroupedAckLabels fromJsonString(final String jsonString) {
- return fromJson(JsonFactory.newObject(jsonString));
- }
-
- /**
- * Deserialize grouped ack labels from a JSON object.
- *
- * @param jsonObject the JSON object.
- * @return grouped ack labels.
- */
- public static GroupedAckLabels fromJson(final JsonObject jsonObject) {
- final String group = jsonObject.getValue(JsonFields.GROUP).orElse(null);
- final Set ackLabels = jsonObject.getValue(JsonFields.ACK_LABELS)
- .stream()
- .flatMap(JsonArray::stream)
- .map(JsonValue::asString)
- .collect(Collectors.toSet());
- return new GroupedAckLabels(group, ackLabels);
- }
-
- /**
- * Return a stream containing a pair with guaranteed group name and ack labels if this object has a group name,
- * or an empty stream otherwise.
- *
- * @return the stream.
- */
- public Stream>> streamAsGroupedPair() {
- if (group != null) {
- return Stream.of(Pair.create(group, ackLabels));
- } else {
- return Stream.empty();
- }
- }
-
- /**
- * Stream ack labels of this object.
- *
- * @return a stream of ack labels.
- */
- public Stream streamAckLabels() {
- return ackLabels.stream();
- }
-
-
- /**
- * Get the group name if any exists.
- *
- * @return the group name, or an empty optional.
- */
- public Optional getGroup() {
- return Optional.ofNullable(group);
- }
-
- /**
- * Serialize this object as a JSON object.
- *
- * @return JSON representation of this object.
- */
- public JsonObject toJson() {
- return JsonFactory.newObjectBuilder()
- .set(JsonFields.GROUP, group, field -> !field.getValue().isNull())
- .set(JsonFields.ACK_LABELS, ackLabels.stream()
- .map(JsonFactory::newValue)
- .collect(JsonCollectors.valuesToArray()))
- .build();
- }
-
- /**
- * Serialize this object as a JSON string.
- *
- * @return JSON representation of this object.
- */
- public String toJsonString() {
- return toJson().toString();
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(group, ackLabels);
- }
-
- @Override
- public boolean equals(final Object other) {
- if (other instanceof GroupedAckLabels) {
- final GroupedAckLabels that = (GroupedAckLabels) other;
- return Objects.equals(group, that.group) && Objects.equals(ackLabels, that.ackLabels);
- } else {
- return false;
- }
- }
-
- @Override
- public String toString() {
- return getClass().getSimpleName() + toJsonString();
- }
-
- private static List deserializeAckGroups(
- final scala.collection.immutable.Set bindingValues) {
- return CollectionConverters.asJava(bindingValues)
- .stream()
- .map(GroupedAckLabels::fromJsonString)
- .collect(Collectors.toList());
- }
-
- // JSON field names are 1-character long to conserve space in the distributed data.
- private static final class JsonFields {
-
- private static final JsonFieldDefinition GROUP = JsonFactory.newStringFieldDefinition("g");
-
- private static final JsonFieldDefinition ACK_LABELS = JsonFactory.newJsonArrayFieldDefinition("a");
-
- }
-}
diff --git a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdaterTest.java b/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdaterTest.java
index f056d06036..b95383559d 100644
--- a/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdaterTest.java
+++ b/services/utils/pubsub/src/test/java/org/eclipse/ditto/services/utils/pubsub/actors/AckUpdaterTest.java
@@ -138,7 +138,7 @@ public void localDeclarationsWithoutGroupDoNotOverrideEachOther() {
final TestProbe s2 = TestProbe.apply("s2", system1);
final TestProbe s3 = TestProbe.apply("s3", system1);
- // GIVEN: aack labels are declared
+ // GIVEN: ack labels are declared
underTest.tell(DeclareAcks.of(s1.ref(), null, Set.of("a1")), getRef());
expectMsgClass(AcksDeclared.class);