Skip to content

Commit

Permalink
Remove write consistency from PubSub requests; read write consistency…
Browse files Browse the repository at this point in the history
… from ddata config.

Reason: Write consistency was not guaranteed in the presence of parallel requests.

Example:

Start with state S0.

Request R1 changes state to S1 with local write concern.

Local write succeeds.

Request R2 changes state to S1 from S0, but with write concern ALL.

Request R2 is considered NOOP, no cluster write is initiated.

Request R2 is acknowledged immediately before cluster write happens.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Jul 26, 2022
1 parent a6b0ed9 commit 33c60ee
Show file tree
Hide file tree
Showing 13 changed files with 57 additions and 93 deletions.
Expand Up @@ -24,7 +24,7 @@
/**
* This class is the default implementation of the distributed data config.
*/
class DefaultDistributedDataConfig implements DistributedDataConfig {
final class DefaultDistributedDataConfig implements DistributedDataConfig {

private static final String CONFIG_PATH = "ddata";

Expand Down
Expand Up @@ -65,6 +65,8 @@ public abstract class DistributedData<R extends ReplicatedData> implements Exten

private final Executor ddataExecutor;

private final DistributedDataConfig config;

/**
* Create a wrapper of distributed data replicator.
*
Expand All @@ -80,6 +82,7 @@ protected DistributedData(final DistributedDataConfig config, final ActorRefFact
readTimeout = config.getReadTimeout();
writeTimeout = config.getWriteTimeout();
numberOfShards = config.getNumberOfShards();
this.config = config;
}

/**
Expand Down Expand Up @@ -171,6 +174,16 @@ public ActorRef getReplicator() {
return replicator;
}

/**
* Get the config of this distributed data.
*
* @return The config.
* @since 3.0.0
*/
public DistributedDataConfig getConfig() {
return config;
}

private Void handleUpdateResponse(final Object reply, final Key<R> key) {
if (reply instanceof Replicator.UpdateSuccess) {
return null;
Expand Down
Expand Up @@ -61,7 +61,7 @@ public CompletionStage<SubAck> subscribeWithFilterAndGroup(final Collection<Stri
if (group != null) {
checkNotEmpty(group, "group");
}
final Subscribe subscribe = Subscribe.of(topics, subscriber, writeConsistency, true, filter, group, resubscribe);
final Subscribe subscribe = Subscribe.of(topics, subscriber, true, filter, group, resubscribe);
final CompletionStage<SubAck> subAckFuture = askSubSupervisor(subscribe);

if (ddataDelayInMillis <= 0) {
Expand All @@ -79,7 +79,7 @@ public CompletionStage<SubAck> subscribeWithFilterAndGroup(final Collection<Stri
@Override
public CompletionStage<SubAck> unsubscribeWithAck(final Collection<String> topics,
final ActorRef subscriber) {
return askSubSupervisor(Unsubscribe.of(topics, subscriber, writeConsistency, true));
return askSubSupervisor(Unsubscribe.of(topics, subscriber, true));
}

private CompletionStage<SubAck> askSubSupervisor(final Request request) {
Expand All @@ -89,24 +89,19 @@ private CompletionStage<SubAck> askSubSupervisor(final Request request) {

@Override
public void subscribeWithoutAck(final Collection<String> topics, final ActorRef subscriber) {
final Request request =
Subscribe.of(topics, subscriber,
(Replicator.WriteConsistency) Replicator.writeLocal(), false, null);
final Request request = Subscribe.of(topics, subscriber, false, null);
subSupervisor.tell(request, subscriber);
}

@Override
public void unsubscribeWithoutAck(final Collection<String> topics, final ActorRef subscriber) {
final Request request =
Unsubscribe.of(topics, subscriber,
(Replicator.WriteConsistency) Replicator.writeLocal(), false);
final Request request = Unsubscribe.of(topics, subscriber, false);
subSupervisor.tell(request, subscriber);
}

@Override
public void removeSubscriber(final ActorRef subscriber) {
final Request request =
RemoveSubscriber.of(subscriber, (Replicator.WriteConsistency) Replicator.writeLocal(), false);
final Request request = RemoveSubscriber.of(subscriber, false);
subSupervisor.tell(request, subscriber);
}

Expand Down
Expand Up @@ -93,7 +93,7 @@ public final class SubUpdater extends akka.actor.AbstractActorWithTimers
/**
* Write consistency of the next message to the replicator.
*/
private Replicator.WriteConsistency nextWriteConsistency = defaultWriteConsistency();
private final Replicator.WriteConsistency writeConsistency;

/**
* Whether local subscriptions changed.
Expand All @@ -113,6 +113,7 @@ private SubUpdater(final PubSubConfig config,
this.ddata = ddata;
cluster = Cluster.get(getContext().getSystem());
resetProbability = config.getResetProbability();
writeConsistency = ddata.getConfig().getSubscriptionWriteConsistency();

// tag metrics by parent name + this name prefix
// so that the tag is finite and distinct between twin and live topics and declared ack labels.
Expand Down Expand Up @@ -154,25 +155,6 @@ public Receive createReceive() {
.orElse(ReceiveBuilder.create().matchAny(this::logUnhandled).build());
}

private boolean isMoreConsistent(final Replicator.WriteConsistency a, final Replicator.WriteConsistency b) {
return rank(a) > rank(b);
}

// roughly rank write consistency from the most local to the most global.
private int rank(final Replicator.WriteConsistency a) {
if (writeLocal().equals(a)) {
return Integer.MIN_VALUE;
} else if (a instanceof Replicator.WriteAll) {
return Integer.MAX_VALUE;
} else if (a instanceof Replicator.WriteMajority) {
return ((Replicator.WriteMajority) a).minCap();
} else if (a instanceof Replicator.WriteTo) {
return ((Replicator.WriteTo) a).n();
} else {
return 0;
}
}

private void subscribe(final Subscribe subscribe) {
final boolean changed =
subscriptions.subscribe(subscribe.getSubscriber(), subscribe.getTopics(), subscribe.getFilter(),
Expand Down Expand Up @@ -213,12 +195,11 @@ private void ddataOpSuccess(final DDataOpSuccess<SubscriptionsReader> opSuccess)
// reset changed flags if there are no more pending changes
if (awaitSubAck.isEmpty() && awaitUpdate.isEmpty()) {
localSubscriptionsChanged = false;
nextWriteConsistency = writeLocal();
}
}

private void tick(final Clock tick) {
performDDataOp(localSubscriptionsChanged, nextWriteConsistency)
performDDataOp(localSubscriptionsChanged, writeConsistency)
.handle(handleDDataWriteResult(getSeqNr()));
moveAwaitUpdateToAwaitAcknowledge();
}
Expand All @@ -234,7 +215,7 @@ private CompletionStage<SubscriptionsReader> performDDataOp(final boolean localS
final SubscriptionsReader snapshot = subscriptions.snapshot();
final CompletionStage<Void> ddataOp;
log().debug("Tick seq=<{}> changed=<{}> empty=<{}> writeConsistency=<{}>", seqNr, localSubscriptionsChanged,
subscriptions.isEmpty(), nextWriteConsistency);
subscriptions.isEmpty(), writeConsistency);
if (resetProbability > 0 && Math.random() < resetProbability) {
log().debug("Resetting ddata topics: <{}>", getSelf());
ddataOp = ddata.getWriter().reset(subscriber, subscriptions.export(), writeConsistency);
Expand Down Expand Up @@ -290,7 +271,6 @@ private void updateFailure(final Status.Failure failure) {
private void enqueueRequest(final Request request, final boolean changed, final ActorRef sender,
final Collection<SubAck> queue, final Gauge queueSizeMetric, final boolean consistent) {
localSubscriptionsChanged |= changed;
upgradeWriteConsistency(request.getWriteConsistency());
if (request.shouldAcknowledge()) {
final SubAck subAck = SubAck.of(request, sender, ++seqNr, consistent);
queue.add(subAck);
Expand Down Expand Up @@ -391,17 +371,10 @@ private void pubSubTerminated(final ActorEvent terminated) {
subscriptions.clear();
awaitUpdate.clear();
awaitSubAck.clear();
nextWriteConsistency = writeLocal();

// subscriber will recover from this error on its own.
}

private void upgradeWriteConsistency(final Replicator.WriteConsistency nextWriteConsistency) {
if (isMoreConsistent(nextWriteConsistency, this.nextWriteConsistency)) {
this.nextWriteConsistency = nextWriteConsistency;
}
}

@Override
public Cluster getCluster() {
return cluster;
Expand Down
Expand Up @@ -25,17 +25,14 @@ abstract class AbstractRequest implements Request {

private final Set<String> topics;
private final ActorRef subscriber;
private final Replicator.WriteConsistency writeConsistency;
private final boolean acknowledge;

protected AbstractRequest(final Collection<String> topics,
final ActorRef subscriber,
final Replicator.WriteConsistency writeConsistency,
final boolean acknowledge) {

this.topics = Set.copyOf(topics);
this.subscriber = subscriber;
this.writeConsistency = writeConsistency;
this.acknowledge = acknowledge;
}

Expand All @@ -53,13 +50,6 @@ public ActorRef getSubscriber() {
return subscriber;
}

/**
* @return write consistency for the request.
*/
public Replicator.WriteConsistency getWriteConsistency() {
return writeConsistency;
}

/**
* @return whether acknowledgement is expected.
*/
Expand All @@ -72,7 +62,6 @@ public String toString() {
return getClass().getSimpleName() + " [" +
"topics=" + topics +
", subscriber=" + subscriber +
", writeConsistency=" + writeConsistency +
", acknowledge=" + acknowledge +
"]";
}
Expand Down
Expand Up @@ -15,31 +15,25 @@
import java.util.Collections;

import akka.actor.ActorRef;
import akka.cluster.ddata.Replicator;

/**
* Request to remove a subscriber.
*/
public final class RemoveSubscriber extends AbstractRequest {

private RemoveSubscriber(final ActorRef subscriber,
final Replicator.WriteConsistency writeConsistency,
final boolean acknowledge) {
super(Collections.emptySet(), subscriber, writeConsistency, acknowledge);
private RemoveSubscriber(final ActorRef subscriber, final boolean acknowledge) {
super(Collections.emptySet(), subscriber, acknowledge);
}

/**
* Create an "unsubscribe" request.
*
* @param subscriber who is subscribing.
* @param writeConsistency with which write consistency should this subscription be updated.
* @param acknowledge whether acknowledgement is desired.
* @return the request.
*/
public static RemoveSubscriber of(final ActorRef subscriber,
final Replicator.WriteConsistency writeConsistency,
final boolean acknowledge) {
return new RemoveSubscriber(subscriber, writeConsistency, acknowledge);
public static RemoveSubscriber of(final ActorRef subscriber, final boolean acknowledge) {
return new RemoveSubscriber(subscriber, acknowledge);
}

}
Expand Up @@ -26,11 +26,6 @@ public interface Request {
*/
Set<String> getTopics();

/**
* @return write consistency for the request.
*/
Replicator.WriteConsistency getWriteConsistency();

/**
* @return whether acknowledgement is expected.
*/
Expand Down
Expand Up @@ -19,7 +19,6 @@
import javax.annotation.Nullable;

import akka.actor.ActorRef;
import akka.cluster.ddata.Replicator;

/**
* Request to subscribe to topics.
Expand All @@ -32,12 +31,11 @@ public final class Subscribe extends AbstractRequest {

private Subscribe(final Collection<String> topics,
final ActorRef subscriber,
final Replicator.WriteConsistency writeConsistency,
final boolean acknowledge,
@Nullable final Predicate<Collection<String>> filter,
@Nullable final String group,
final boolean resubscribe) {
super(topics, subscriber, writeConsistency, acknowledge);
super(topics, subscriber, acknowledge);
this.filter = filter;
this.group = group;
this.resubscribe = resubscribe;
Expand All @@ -48,38 +46,34 @@ private Subscribe(final Collection<String> topics,
*
* @param topics the topics to subscribe to.
* @param subscriber who is subscribing.
* @param writeConsistency with which write consistency should this subscription be updated.
* @param acknowledge whether acknowledgement is desired.
* @param group any group the subscriber belongs to, or null.
* @return the request.
*/
public static Subscribe of(final Collection<String> topics,
final ActorRef subscriber,
final Replicator.WriteConsistency writeConsistency,
final boolean acknowledge,
@Nullable final String group) {
return new Subscribe(topics, subscriber, writeConsistency, acknowledge, null, group, false);
return new Subscribe(topics, subscriber, acknowledge, null, group, false);
}

/**
* Create a "subscribe" request.
*
* @param topics the topics to subscribe to.
* @param subscriber who is subscribing.
* @param writeConsistency with which write consistency should this subscription be updated.
* @param acknowledge whether acknowledgement is desired.
* @param filter local filter for incoming messages.
* @param group any group the subscriber belongs to, or null.
* @return the request.
*/
public static Subscribe of(final Collection<String> topics,
final ActorRef subscriber,
final Replicator.WriteConsistency writeConsistency,
final boolean acknowledge,
@Nullable final Predicate<Collection<String>> filter,
@Nullable final String group,
final boolean resubscribe) {
return new Subscribe(topics, subscriber, writeConsistency, acknowledge, filter, group, resubscribe);
return new Subscribe(topics, subscriber, acknowledge, filter, group, resubscribe);
}

/**
Expand Down
Expand Up @@ -15,33 +15,26 @@
import java.util.Collection;

import akka.actor.ActorRef;
import akka.cluster.ddata.Replicator;

/**
* Request to unsubscribe to topics.
*/
public final class Unsubscribe extends AbstractRequest {

private Unsubscribe(final Collection<String> topics,
final ActorRef subscriber,
final Replicator.WriteConsistency writeConsistency,
final boolean acknowledge) {
super(topics, subscriber, writeConsistency, acknowledge);
private Unsubscribe(final Collection<String> topics, final ActorRef subscriber, final boolean acknowledge) {
super(topics, subscriber, acknowledge);
}

/**
* Create an "unsubscribe" request.
*
* @param topics the set of topics to subscribe.
* @param subscriber who is subscribing.
* @param writeConsistency with which write consistency should this subscription be updated.
* @param acknowledge whether acknowledgement is desired.
* @return the request.
*/
public static Unsubscribe of(final Collection<String> topics,
final ActorRef subscriber,
final Replicator.WriteConsistency writeConsistency,
public static Unsubscribe of(final Collection<String> topics, final ActorRef subscriber,
final boolean acknowledge) {
return new Unsubscribe(topics, subscriber, writeConsistency, acknowledge);
return new Unsubscribe(topics, subscriber, acknowledge);
}
}
Expand Up @@ -12,6 +12,8 @@
*/
package org.eclipse.ditto.internal.utils.pubsub.ddata;

import org.eclipse.ditto.internal.utils.ddata.DistributedDataConfig;

/**
* A package of ddata reader, writer, and creator of local subscriptions to plug into a pub-sub framework.
*
Expand All @@ -31,4 +33,10 @@ public interface DData<K, R, W extends DDataUpdate<?>> {
*/
DDataWriter<K, W> getWriter();

/**
* @return the config of the distributed data.
* @since 3.0.0
*/
DistributedDataConfig getConfig();

}

0 comments on commit 33c60ee

Please sign in to comment.