Skip to content

Commit

Permalink
SubUpdater: Remove "localSubscriptionsChanged" flag; always reset on …
Browse files Browse the repository at this point in the history
…empty previous update.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Aug 3, 2022
1 parent 4b3b351 commit e17ff19
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 35 deletions.
Expand Up @@ -12,7 +12,6 @@
*/
package org.eclipse.ditto.internal.utils.pubsub.actors;

import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -95,10 +94,6 @@ public final class SubUpdater extends akka.actor.AbstractActorWithTimers
*/
private final Replicator.WriteConsistency writeConsistency;

/**
* Whether local subscriptions changed.
*/
private boolean localSubscriptionsChanged = false;
private int seqNr = 0;
private LiteralUpdate previousUpdate = LiteralUpdate.empty();
private int errorCounter = 0;
Expand Down Expand Up @@ -160,15 +155,15 @@ private void subscribe(final Subscribe subscribe) {
subscriptions.subscribe(subscribe.getSubscriber(), subscribe.getTopics(), subscribe.getFilter(),
subscribe.getGroup().orElse(null));
final var consistent = checkForLostSubscriber(subscribe, changed);
enqueueRequest(subscribe, changed, getSender(), awaitUpdate, awaitUpdateMetric, consistent);
enqueueRequest(subscribe, getSender(), awaitUpdate, awaitUpdateMetric, consistent);
if (changed) {
getContext().watch(subscribe.getSubscriber());
}
}

private void unsubscribe(final Unsubscribe unsubscribe) {
final boolean changed = subscriptions.unsubscribe(unsubscribe.getSubscriber(), unsubscribe.getTopics());
enqueueRequest(unsubscribe, changed, getSender(), awaitUpdate, awaitUpdateMetric, true);
enqueueRequest(unsubscribe, getSender(), awaitUpdate, awaitUpdateMetric, true);
if (changed && !subscriptions.contains(unsubscribe.getSubscriber())) {
getContext().unwatch(unsubscribe.getSubscriber());
}
Expand All @@ -191,16 +186,10 @@ private void ddataOpSuccess(final DDataOpSuccess<SubscriptionsReader> opSuccess)
// could solve it by having pubSubSubscriber forward acknowledgements. probably not worth it.
subscriber.tell(opSuccess.payload, getSelf());
flushSubAcks(opSuccess.seqNr);

// reset changed flags if there are no more pending changes
if (awaitSubAck.isEmpty() && awaitUpdate.isEmpty()) {
localSubscriptionsChanged = false;
}
}

private void tick(final Clock tick) {
performDDataOp(localSubscriptionsChanged, writeConsistency)
.handle(handleDDataWriteResult(getSeqNr()));
performDDataOp(writeConsistency).handle(handleDDataWriteResult(getSeqNr()));
moveAwaitUpdateToAwaitAcknowledge();
}

Expand All @@ -210,19 +199,18 @@ private void flushSubAcks(final int seqNr) {
}
}

private CompletionStage<SubscriptionsReader> performDDataOp(final boolean localSubscriptionsChanged,
final Replicator.WriteConsistency writeConsistency) {
private CompletionStage<SubscriptionsReader> performDDataOp(final Replicator.WriteConsistency writeConsistency) {
final SubscriptionsReader snapshot;
final CompletionStage<Void> ddataOp;
log().debug("Tick seq=<{}> changed=<{}> empty=<{}> writeConsistency=<{}>", seqNr, localSubscriptionsChanged,
subscriptions.isEmpty(), writeConsistency);
if (resetProbability > 0 && Math.random() < resetProbability) {
log().debug("Tick seq=<{}> empty=<{}> writeConsistency=<{}>", seqNr, subscriptions.isEmpty(), writeConsistency);
final boolean isReset = previousUpdate.isEmpty() && !subscriptions.isEmpty();
if (isReset || resetProbability > 0 && Math.random() < resetProbability) {
log().debug("Resetting ddata topics: <{}>", getSelf());
ddataOp = ddata.getWriter().reset(subscriber, subscriptions.export(), writeConsistency);
final var nextUpdate = subscriptions.export();
ddataOp = ddata.getWriter().reset(subscriber, nextUpdate, writeConsistency);
snapshot = subscriptions.snapshot();
// TODO client subscription delay
// } else if (!localSubscriptionsChanged) {
// ddataOp = CompletableFuture.completedStage(null);
previousUpdate = nextUpdate;
topicSizeMetric.set(subscriptions.estimateSize());
} else if (subscriptions.isEmpty()) {
ddataOp = ddata.getWriter().removeSubscriber(subscriber, writeConsistency);
snapshot = subscriptions.snapshot();
Expand Down Expand Up @@ -258,23 +246,20 @@ private void updateFailure(final Status.Failure failure) {
} else {
log.warning("Failure updating Ditto pub/sub subscription - trying again next clock tick");
}
// try again next clock tick
localSubscriptionsChanged = true;
previousUpdate = LiteralUpdate.empty();
}

/**
* Add a request to the queue to be handled after cluster update.
*
* @param request the request.
* @param changed whether the request changed ddata.
* @param sender sender of the request.
* @param queue the queue to enqueue the request.
* @param queueSizeMetric the metrics for the queue size.
* @param consistent whether the consistency check of a resubscription succeeded.
*/
private void enqueueRequest(final Request request, final boolean changed, final ActorRef sender,
private void enqueueRequest(final Request request, final ActorRef sender,
final Collection<SubAck> queue, final Gauge queueSizeMetric, final boolean consistent) {
localSubscriptionsChanged |= changed;
if (request.shouldAcknowledge()) {
final SubAck subAck = SubAck.of(request, sender, ++seqNr, consistent);
queue.add(subAck);
Expand Down Expand Up @@ -356,7 +341,7 @@ private void removeSubscriber(final RemoveSubscriber request) {
}

private void doRemoveSubscriber(final ActorRef subscriber) {
localSubscriptionsChanged |= subscriptions.removeSubscriber(subscriber);
subscriptions.removeSubscriber(subscriber);
getContext().unwatch(subscriber);
}

Expand Down Expand Up @@ -403,7 +388,6 @@ public ThreadSafeDittoLoggingAdapter log() {
public void verifyNoDDataForCurrentMember() {
if (!subscriptions.isEmpty()) {
previousUpdate = LiteralUpdate.empty();
localSubscriptionsChanged = true;
}
// Do nothing for empty subscriptions: No data is expected for the current member.
}
Expand Down Expand Up @@ -431,8 +415,4 @@ private DDataOpSuccess(final P payload, final int seqNr) {
this.seqNr = seqNr;
}
}

private static Replicator.WriteConsistency defaultWriteConsistency() {
return new Replicator.WriteAll(Duration.ofSeconds(25L));
}
}
Expand Up @@ -119,7 +119,7 @@ public void clusterStateInSync() throws Exception {
Mockito.verify(ddata.getReader(), Mockito.timeout(5000))
.getAllShards(eq((Replicator.ReadConsistency) Replicator.readLocal()));
Thread.sleep(3000);
Mockito.verify(ddata.getWriter(), Mockito.times(1)).put(any(), any(), any());
Mockito.verify(ddata.getWriter(), Mockito.times(1)).reset(any(), any(), any());
Mockito.verify(ddata.getWriter(), Mockito.never()).removeAddress(any(), any());
}};
}
Expand Down Expand Up @@ -196,6 +196,7 @@ private CompressedDData mockDistributedData(final Map<ActorRef, String> result)
Mockito.when(reader.get(any(), any())).thenReturn(CompletableFuture.completedStage(Optional.of(map)));
Mockito.when(reader.getAllShards(any())).thenReturn(CompletableFuture.completedStage(List.of(map)));
Mockito.when(writer.put(any(), any(), any())).thenReturn(CompletableFuture.completedStage(null));
Mockito.when(writer.reset(any(), any(), any())).thenReturn(CompletableFuture.completedStage(null));
return mock;
}

Expand Down

0 comments on commit e17ff19

Please sign in to comment.