Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
add specialized distributed data for blocked namespaces
Signed-off-by: Cai Yufei (INST/ECS1) <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Oct 17, 2018
1 parent f544074 commit e3f142f
Show file tree
Hide file tree
Showing 8 changed files with 386 additions and 63 deletions.
6 changes: 6 additions & 0 deletions services/utils/ddata/pom.xml
Expand Up @@ -37,6 +37,12 @@
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-distributed-data_${scala.version}</artifactId>
</dependency>

<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2017-2018 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.utils.ddata;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ORSet;
import akka.cluster.ddata.ORSetKey;
import akka.cluster.ddata.Replicator;
import scala.concurrent.duration.FiniteDuration;

/**
* Distributed data for blocking of messages addressed entities in certain namespaces.
*/
public final class BlockedNamespaces extends DistributedData<ORSet<String>> {

/**
* Key of the distributed data. Should be unique among ORSets.
*/
public static Key<ORSet<String>> KEY = ORSetKey.create("BlockedNamespaces");

private Cluster node;

private BlockedNamespaces(final DDataConfigReader configReader, final ActorSystem system) {
super(configReader, system);
node = Cluster.get(system);
}

/**
* Create an instance of this distributed data.
*
* @param configReader the configuration.
* @param system the actor system where the replicator actor will be created.
* @return a new instance of the distributed data.
*/
public static BlockedNamespaces of(final DDataConfigReader configReader, final ActorSystem system) {
return new BlockedNamespaces(configReader, system);
}

/**
* Test whether a namespace is stored in the local replica with the configured READ timeout.
*
* @param namespace the namespace.
* @return whether the local replica is retrieved successfully and contains the namespace.
*/
public CompletionStage<Boolean> contains(final String namespace) {
return get(Replicator.readLocal())
.thenApply(maybeORSet -> maybeORSet.orElse(ORSet.empty()).contains(namespace))
.exceptionally(error -> false);
}

/**
* Write a namespace to ALL replicas with the configured WRITE timeout.
*
* @param namespace the namespace.
* @return future that completes after the update propagates to all replicas, exceptionally if there is any error.
*/
public CompletionStage<Void> add(final String namespace) {
return update(writeAll(), orSet -> orSet.add(node, namespace));
}

/**
* Remove a namespace from ALL replicas with the configured WRITE timeout.
*
* @param namespace the namespace to remove.
* @return future that completes after the removal propagates to all replicas, exceptionally if there is any error.
*/
public CompletionStage<Void> remove(final String namespace) {
return update(writeAll(), orSet -> orSet.remove(node, namespace));
}

@Override
protected Key<ORSet<String>> key() {
return KEY;
}

@Override
protected ORSet<String> initialValue() {
return ORSet.empty();
}

private Replicator.WriteConsistency writeAll() {
return new Replicator.WriteAll(FiniteDuration.apply(writeTimeout.toMillis(), TimeUnit.MILLISECONDS));
}
}
Expand Up @@ -40,7 +40,9 @@ public final class DDataConfigReader extends AbstractConfigReader {
*/
private static final String DITTO_CONFIG_PREFIX = "ditto-replicator-facade.";

private static final String ASK_TIMEOUT_KEY = DITTO_CONFIG_PREFIX + "ask-timeout";
private static final String READ_TIMEOUT_KEY = DITTO_CONFIG_PREFIX + "read-timeout";

private static final String WRITE_TIMEOUT_KEY = DITTO_CONFIG_PREFIX + "write-timeout";

private static final Duration DEFAULT_ASK_TIMEOUT = Duration.ofSeconds(5L);

Expand All @@ -55,6 +57,11 @@ public static DDataConfigReader of(final ActorSystem system) {
return new DDataConfigReader(system.settings().config().getConfig(FALLBACK_CONFIG_PATH));
}

/**
* Convert this object into replicator settings.
*
* @return replicator settings.
*/
public ReplicatorSettings toReplicatorSettings() {
return ReplicatorSettings.apply(config);
}
Expand Down Expand Up @@ -94,21 +101,37 @@ public DDataConfigReader withRole(final String role) {
}

/**
* @return timeout of messages to the replicator.
* @return timeout of reads.
*/
public Duration readTimeout() {
return getIfPresent(READ_TIMEOUT_KEY, config::getDuration).orElse(DEFAULT_ASK_TIMEOUT);
}

/**
* Set the timeout of GET-messages to the replicator.
*
* @param askTimeout the new timeout of messages to the replicator.
* @return a copy of this object with a new timeout of messages to the replicator.
*/
public Duration askTimeout() {
return getIfPresent(ASK_TIMEOUT_KEY, config::getDuration).orElse(DEFAULT_ASK_TIMEOUT);
public DDataConfigReader withReadTimeout(final Duration askTimeout) {
return with(READ_TIMEOUT_KEY, askTimeout);
}

/**
* @return timeout of writes.
*/
public Duration writeTimeout() {
return getIfPresent(WRITE_TIMEOUT_KEY, config::getDuration).orElse(DEFAULT_ASK_TIMEOUT);
}

/**
* Set the timeout of messages to the replicator.
* Set the timeout of UPDATE-messages to the replicator.
*
* @param askTimeout the new timeout of messages to the replicator.
* @return a copy of this object with a new timeout of messages to the replicator.
*/
public DDataConfigReader withAskTimeout(final Duration askTimeout) {
return with(ASK_TIMEOUT_KEY, askTimeout);
public DDataConfigReader withWriteTimeout(final Duration askTimeout) {
return with(WRITE_TIMEOUT_KEY, askTimeout);
}

private DDataConfigReader with(final String configKey, final Object configValue) {
Expand Down
@@ -0,0 +1,153 @@
/*
* Copyright (c) 2017-2018 Bosch Software Innovations GmbH.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v2.0
* which accompanies this distribution, and is available at
* https://www.eclipse.org/org/documents/epl-2.0/index.php
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.utils.ddata;

import java.text.MessageFormat;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

import akka.actor.ActorRef;
import akka.actor.ActorRefFactory;
import akka.cluster.ddata.Key;
import akka.cluster.ddata.ReplicatedData;
import akka.cluster.ddata.Replicator;
import akka.pattern.PatternsCS;
import akka.util.Timeout;
import scala.concurrent.duration.FiniteDuration;

/**
* Supertype of typed interfaces for distributed data. Each instance corresponds to one distributed data object.
* Each instance starts its own replicator so that it has its own configuration.
*
* @param <R> type of replicated data.
*/
public abstract class DistributedData<R extends ReplicatedData> {

/**
* Default timeout of read operations.
*/
protected final Duration readTimeout;

/**
* Default timeout of write operations.
*/
protected final Duration writeTimeout;

/**
* Reference of the distributed data replicator.
*/
protected final ActorRef replicator;

/**
* Create a wrapper of distributed data replicator.
*
* @param configReader specific config for this replicator.
* @param factory creator of this replicator.
*/
protected DistributedData(final DDataConfigReader configReader, final ActorRefFactory factory) {
replicator = createReplicator(configReader, factory);
readTimeout = configReader.readTimeout();
writeTimeout = configReader.writeTimeout();
}

/**
* @return key of the distributed collection. Should be unique among collections of the same type.
*/
protected abstract Key<R> key();

/**
* @return initial value of the distributed data.
*/
protected abstract R initialValue();

/**
* Retrieve the replicated data.
*
* @param readConsistency how many replicas to consult.
* @return future of the replicated data that completes exceptionally on error.
*/
public CompletionStage<Optional<R>> get(final Replicator.ReadConsistency readConsistency) {
final Replicator.Get<R> replicatorGet = new Replicator.Get<>(key(), readConsistency);
return PatternsCS.ask(replicator, replicatorGet, getAskTimeout(readConsistency.timeout(), readTimeout))
.thenApply(this::handleGetResponse);
}

/**
* Modify the replicated data.
*
* @param writeConsistency how many replicas to update.
* @param updateFunction what to do to the replicas.
* @return future that completes when the update completes, exceptionally when any error is encountered.
*/
public CompletionStage<Void> update(final Replicator.WriteConsistency writeConsistency,
final Function<R, R> updateFunction) {

final Replicator.Update<R> replicatorUpdate =
new Replicator.Update<>(key(), initialValue(), writeConsistency, updateFunction);
return PatternsCS.ask(replicator, replicatorUpdate, getAskTimeout(writeConsistency.timeout(), writeTimeout))
.thenApply(this::handleUpdateResponse);
}

private Void handleUpdateResponse(final Object reply) {
if (reply instanceof Replicator.UpdateSuccess) {
return null;
} else {
final String errorMessage =
MessageFormat.format("Expect Replicator.UpdateSuccess for key ''{2}'' from ''{1}'', Got: ''{0}''",
reply, replicator, key());
throw new IllegalArgumentException(errorMessage);
}
}

@SuppressWarnings("unchecked")
private Optional<R> handleGetResponse(final Object reply) {
if (reply instanceof Replicator.GetSuccess) {
final Replicator.GetSuccess<R> getSuccess = (Replicator.GetSuccess<R>) reply;
return Optional.of(getSuccess.dataValue());
} else if (reply instanceof Replicator.NotFound) {
return Optional.empty();
} else {
final String errorMessage =
MessageFormat.format("Expect Replicator.GetResponse for key ''{2}'' from ''{1}'', Got: ''{0}''",
reply, replicator, key());
throw new IllegalArgumentException(errorMessage);
}
}

/**
* Compute timeout from a given read/write consistency and a default. If the timeout from the read/write
* consistency is positive, then it is taken. Otherwise the default timeout is taken.
*
* @param givenTimeout timeout from a read/write consistency.
* @param defaultTimeout default timeout.
* @return the timeout.
*/
private static Timeout getAskTimeout(final FiniteDuration givenTimeout, final Duration defaultTimeout) {
if (givenTimeout.gt(FiniteDuration.Zero())) {
return Timeout.durationToTimeout(givenTimeout);
} else {
return Timeout.create(defaultTimeout);
}
}

/**
* Create a distributed data replicator in an actor system.
*
* @param configReader distributed data configuration reader.
* @param factory creator of this replicator.
* @return reference to the created replicator.
*/
private static ActorRef createReplicator(final DDataConfigReader configReader, final ActorRefFactory factory) {
return factory.actorOf(Replicator.props(configReader.toReplicatorSettings()), configReader.name());
}
}

This file was deleted.

0 comments on commit e3f142f

Please sign in to comment.