Skip to content

Commit

Permalink
Issue #878: replace GroupedAckLabels by Grouped for use in topics ddata.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Nov 20, 2020
1 parent 43e6203 commit 928792b
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.json.JsonValue;
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
Expand All @@ -39,7 +40,6 @@
import org.eclipse.ditto.services.utils.pubsub.ddata.DData;
import org.eclipse.ditto.services.utils.pubsub.ddata.DDataWriter;
import org.eclipse.ditto.services.utils.pubsub.ddata.ack.Grouped;
import org.eclipse.ditto.services.utils.pubsub.ddata.ack.GroupedAckLabels;
import org.eclipse.ditto.services.utils.pubsub.ddata.ack.GroupedRelation;
import org.eclipse.ditto.services.utils.pubsub.ddata.literal.LiteralUpdate;

Expand Down Expand Up @@ -185,9 +185,9 @@ private void tick(Clock tick) {
}

private void onChanged(final Replicator.Changed<?> event) {
final Map<Address, List<GroupedAckLabels>> mmap =
GroupedAckLabels.deserializeORMultiMap(event.get(ackDData.getReader().getKey()));
final List<GroupedAckLabels> remoteGroupedAckLabels = getRemoteGroupedAckLabelsOrderByAddress(mmap);
final Map<Address, List<Grouped<String>>> mmap =
Grouped.deserializeORMultiMap(event.get(ackDData.getReader().getKey()), JsonValue::asString);
final List<Grouped<String>> remoteGroupedAckLabels = getRemoteGroupedAckLabelsOrderByAddress(mmap);
remoteGroups = getRemoteGroups(remoteGroupedAckLabels);
remoteAckLabels = getRemoteAckLabels(remoteGroupedAckLabels);

Expand All @@ -200,8 +200,8 @@ private void onChanged(final Replicator.Changed<?> event) {
ddataChangeRecipients.forEach(recipient -> recipient.tell(ddataChanged, getSelf()));
}

private List<GroupedAckLabels> getRemoteGroupedAckLabelsOrderByAddress(
final Map<Address, List<GroupedAckLabels>> mmap) {
private List<Grouped<String>> getRemoteGroupedAckLabelsOrderByAddress(
final Map<Address, List<Grouped<String>>> mmap) {

return mmap.entrySet()
.stream()
Expand All @@ -216,18 +216,18 @@ private boolean isNotOwnAddress(final Map.Entry<Address, ?> entry) {
return !ownAddress.equals(entry.getKey());
}

private Map<String, Set<String>> getRemoteGroups(final List<GroupedAckLabels> remoteGroupedAckLabels) {
private Map<String, Set<String>> getRemoteGroups(final List<Grouped<String>> remoteGroupedAckLabels) {
final Map<String, Set<String>> result = new HashMap<>();
remoteGroupedAckLabels.stream()
.flatMap(GroupedAckLabels::streamAsGroupedPair)
.flatMap(Grouped<String>::streamAsGroupedPair)
// do not set a group of ack labels if already set by a member of smaller address
.forEach(pair -> result.computeIfAbsent(pair.first(), group -> pair.second()));
return Collections.unmodifiableMap(result);
}

private Set<String> getRemoteAckLabels(final List<GroupedAckLabels> remoteGroupedAckLabels) {
private Set<String> getRemoteAckLabels(final List<Grouped<String>> remoteGroupedAckLabels) {
return remoteGroupedAckLabels.stream()
.flatMap(GroupedAckLabels::streamAckLabels)
.flatMap(Grouped<String>::streamValues)
.collect(Collectors.toSet());
}

Expand Down Expand Up @@ -263,8 +263,7 @@ private void writeLocalDData() {

private LiteralUpdate createDDataUpdate() {
final Set<String> groupedAckLabels = localAckLabels.streamGroupedValues()
.map(GroupedAckLabels::fromGrouped)
.map(GroupedAckLabels::toJsonString)
.map(Grouped::toJsonString)
.collect(Collectors.toSet());
return LiteralUpdate.replaceAll(groupedAckLabels);
}
Expand All @@ -275,12 +274,12 @@ private void failSubscribe(final ActorRef sender) {
sender.tell(error, getSelf());
}

private List<ActorRef> getLocalLosers(final Map<Address, List<GroupedAckLabels>> mmap) {
final Map<Address, List<GroupedAckLabels>> moreImportantEntries = mmap.entrySet()
private List<ActorRef> getLocalLosers(final Map<Address, List<Grouped<String>>> mmap) {
final Map<Address, List<Grouped<String>>> moreImportantEntries = mmap.entrySet()
.stream()
.filter(entry -> Address.addressOrdering().compare(entry.getKey(), ownAddress) < 0)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
final List<GroupedAckLabels> moreImportantGroupedAckLabels =
final List<Grouped<String>> moreImportantGroupedAckLabels =
getRemoteGroupedAckLabelsOrderByAddress(moreImportantEntries);
final Map<String, Set<String>> moreImportantRemoteGroups = getRemoteGroups(moreImportantGroupedAckLabels);
final Set<String> moreImportantAckLabels = getRemoteAckLabels(moreImportantGroupedAckLabels);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import org.eclipse.ditto.services.utils.pubsub.ddata.ack.GroupedAckLabels;
import org.eclipse.ditto.services.utils.pubsub.ddata.ack.Grouped;

import akka.actor.Address;
import akka.japi.Pair;
Expand All @@ -28,9 +28,9 @@
public final class RemoteAcksChanged {

// TODO: store in format for fast lookups by Publisher
private final Map<Address, List<GroupedAckLabels>> mmap;
private final Map<Address, List<Grouped<String>>> mmap;

private RemoteAcksChanged(final Map<Address, List<GroupedAckLabels>> mmap) {
private RemoteAcksChanged(final Map<Address, List<Grouped<String>>> mmap) {
this.mmap = mmap;
}

Expand All @@ -40,7 +40,7 @@ private RemoteAcksChanged(final Map<Address, List<GroupedAckLabels>> mmap) {
* @param mmap the multimap.
* @return the change notification.
*/
public static RemoteAcksChanged of(final Map<Address, List<GroupedAckLabels>> mmap) {
public static RemoteAcksChanged of(final Map<Address, List<Grouped<String>>> mmap) {
return new RemoteAcksChanged(mmap);
}

Expand All @@ -51,7 +51,7 @@ public Map<Address, Set<String>> getMultiMap() {
.map(entry -> Pair.create(entry.getKey(),
entry.getValue()
.stream()
.flatMap(GroupedAckLabels::streamAckLabels)
.flatMap(Grouped<String>::streamValues)
.collect(Collectors.toSet())
))
.collect(Collectors.toMap(Pair::first, Pair::second));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,38 @@

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import javax.annotation.Nullable;

import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonCollectors;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonFieldDefinition;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonValue;

import akka.cluster.ddata.ORMultiMap;
import akka.japi.Pair;
import scala.jdk.javaapi.CollectionConverters;

/**
* Model of a set of values with an optional group.
*
* @param <T> type of the (optionally grouped) values.
*/
public final class Grouped<T> {

@Nullable private final String group;
private final Set<T> values;
@Nullable final String group;
final Set<T> values;

private Grouped(@Nullable final String group, final Set<T> values) {
Grouped(@Nullable final String group, final Set<T> values) {
this.group = group;
this.values = checkNotNull(values, "values");
}
Expand Down Expand Up @@ -74,4 +90,105 @@ public Optional<String> getGroup() {
public Set<T> getValues() {
return values;
}

/**
* Return a stream containing a pair with guaranteed group name and values if this object has a group name,
* or an empty stream otherwise.
*
* @return the stream.
*/
public Stream<Pair<String, Set<T>>> streamAsGroupedPair() {
if (group != null) {
return Stream.of(Pair.create(group, values));
} else {
return Stream.empty();
}
}

/**
* Stream values of this object.
*
* @return a stream of values.
*/
public Stream<T> streamValues() {
return values.stream();
}

/**
* Serialize this object as a JSON object.
*
* @return JSON representation of this object.
*/
public JsonObject toJson() {
return JsonFactory.newObjectBuilder()
.set(JsonFields.GROUP, group, field -> !field.getValue().isNull())
.set(JsonFields.ACK_LABELS, values.stream()
.map(JsonValue::of)
.collect(JsonCollectors.valuesToArray()))
.build();
}

/**
* Serialize this object as a JSON string.
*
* @return JSON representation of this object.
*/
public String toJsonString() {
return toJson().toString();
}

/**
* Deserialize grouped values from a JSON object.
*
* @param jsonObject the JSON object.
* @param deserializeValue deserializer of values.
* @param <T> type of values.
* @return grouped values.
*/
public static <T> Grouped<T> fromJson(final JsonObject jsonObject, final Function<JsonValue, T> deserializeValue) {
final String group = jsonObject.getValue(JsonFields.GROUP).orElse(null);
final Set<T> ackLabels = jsonObject.getValue(JsonFields.ACK_LABELS)
.stream()
.flatMap(JsonArray::stream)
.map(deserializeValue)
.collect(Collectors.toSet());
return new Grouped<>(group, ackLabels);
}

/**
* Deserialize string values of an {@code ORMultiMap} as grouped acknowledgement labels in JSON format.
*
* @param orMultiMap the ORMultiMap.
* @param valueDeserializer deserializer of values.
* @param <K> the type of keys in the ORMultiMap.
* @param <T> the type of values.
* @return a multi-map from keys to grouped ack labels deserialized from each binding.
*/
public static <K, T> Map<K, List<Grouped<T>>> deserializeORMultiMap(final ORMultiMap<K, String> orMultiMap,
final Function<JsonValue, T> valueDeserializer) {
final Map<K, scala.collection.immutable.Set<String>> entries =
CollectionConverters.asJava(orMultiMap.entries());
return entries.entrySet()
.stream()
.map(entry -> Pair.create(entry.getKey(), deserializeGroupedList(entry.getValue(), valueDeserializer)))
.collect(Collectors.toMap(Pair::first, Pair::second));
}

private static <T> List<Grouped<T>> deserializeGroupedList(
final scala.collection.immutable.Set<String> bindingValues,
final Function<JsonValue, T> valueDeserializer) {
return CollectionConverters.asJava(bindingValues)
.stream()
.map(s -> Grouped.fromJson(JsonObject.of(s), valueDeserializer))
.collect(Collectors.toList());
}

// JSON field names are 1-character long to conserve space in the distributed data.
private static final class JsonFields {

private static final JsonFieldDefinition<String> GROUP = JsonFactory.newStringFieldDefinition("g");

private static final JsonFieldDefinition<JsonArray> ACK_LABELS = JsonFactory.newJsonArrayFieldDefinition("a");

}
}
Loading

0 comments on commit 928792b

Please sign in to comment.