Skip to content

Commit

Permalink
Issue eclipse-ditto#878: Replace AcksUpdater by AckUpdater.
Browse files Browse the repository at this point in the history
- Add serialization and deserialization of grouped acknowledgement
  label declarations.

- Add typed change notifications from AckUpdater.

- Handle remote racing.

Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Nov 19, 2020
1 parent 8e683aa commit bfff23a
Show file tree
Hide file tree
Showing 17 changed files with 475 additions and 533 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import java.util.Collection;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.services.utils.pubsub.api.SubAck;
import org.eclipse.ditto.services.utils.pubsub.actors.AckUpdater;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
Expand Down Expand Up @@ -55,14 +57,32 @@ public interface DistributedAcks {
* Each subscriber's declared acknowledgment labels must be different from the labels declared by other subscribers.
* Subscribers relinquish their declared labels when they terminate.
*
* @param group the group in which the actor belongs.
* @param acknowledgementLabels the acknowledgement labels to declare.
* @param subscriber the subscriber.
* @return a future SubAck if the declaration succeeded, or a failed future if it failed.
*/
CompletionStage<SubAck> declareAcknowledgementLabels(
CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
@Nullable String group,
Collection<AcknowledgementLabel> acknowledgementLabels,
ActorRef subscriber);

/**
* Declare labels of acknowledgements that a subscriber may send.
* Each subscriber's declared acknowledgment labels must be different from the labels declared by other subscribers.
* Subscribers relinquish their declared labels when they terminate.
*
* @param acknowledgementLabels the acknowledgement labels to declare.
* @param subscriber the subscriber.
* @return a future SubAck if the declaration succeeded, or a failed future if it failed.
*/
default CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
Collection<AcknowledgementLabel> acknowledgementLabels,
ActorRef subscriber) {

return declareAcknowledgementLabels(null, acknowledgementLabels, subscriber);
}

/**
* Remove the acknowledgement label declaration of a subscriber.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import java.util.Collection;
import java.util.concurrent.CompletionStage;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.services.utils.pubsub.api.SubAck;
import org.eclipse.ditto.services.utils.pubsub.actors.AckUpdater;

import akka.actor.ActorRef;

Expand All @@ -42,7 +44,13 @@ public void removeSubscriber(final ActorRef subscriber) {
}

@Override
public CompletionStage<SubAck> declareAcknowledgementLabels(
public CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(@Nullable final String group,
final Collection<AcknowledgementLabel> acknowledgementLabels, final ActorRef subscriber) {
throw new UnsupportedOperationException();
}

@Override
public CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels, final ActorRef subscriber) {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,18 @@
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.services.utils.ddata.DistributedDataConfig;
import org.eclipse.ditto.services.utils.pubsub.actors.AcksSupervisor;
import org.eclipse.ditto.services.utils.pubsub.actors.AcksUpdater;
import org.eclipse.ditto.services.utils.pubsub.api.RemoveSubscriber;
import org.eclipse.ditto.services.utils.pubsub.api.Request;
import org.eclipse.ditto.services.utils.pubsub.api.SubAck;
import org.eclipse.ditto.services.utils.pubsub.api.Subscribe;
import org.eclipse.ditto.services.utils.pubsub.actors.AckSupervisor;
import org.eclipse.ditto.services.utils.pubsub.actors.AckUpdater;
import org.eclipse.ditto.services.utils.pubsub.ddata.literal.LiteralDData;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.cluster.ddata.Replicator;
import akka.pattern.Patterns;

/**
Expand All @@ -42,11 +40,11 @@ final class DistributedAcksImpl implements DistributedAcks {
private static final String CLUSTER_ROLE = "acks-aware";

private final DistributedDataConfig config;
private final ActorRef acksSupervisor;
private final ActorRef ackSupervisor;

private DistributedAcksImpl(final DistributedDataConfig config, final ActorRef acksSupervisor) {
private DistributedAcksImpl(final DistributedDataConfig config, final ActorRef ackSupervisor) {
this.config = config;
this.acksSupervisor = acksSupervisor;
this.ackSupervisor = ackSupervisor;
}

static DistributedAcks create(final ActorContext actorContext) {
Expand All @@ -57,62 +55,67 @@ static DistributedAcks create(final ActorContext actorContext) {
static DistributedAcks create(final ActorContext actorContext,
final String clusterRole,
final LiteralDDataProvider provider) {
final String supervisorName = clusterRole + "-acks-supervisor";
final Props props = AcksSupervisor.props(LiteralDData.of(actorContext.system(), provider));
final String supervisorName = clusterRole + "-ack-supervisor";
final Props props = AckSupervisor.props(LiteralDData.of(actorContext.system(), provider));
final ActorRef supervisor = actorContext.actorOf(props, supervisorName);
final DistributedDataConfig config = provider.getConfig(actorContext.system());
return new DistributedAcksImpl(config, supervisor);
}

private CompletionStage<SubAck> askSubSupervisor(final Request request) {
return Patterns.ask(acksSupervisor, request, config.getWriteTimeout())
private CompletionStage<AckUpdater.AcksDeclared> askSupervisor(final AckUpdater.AckRequest request) {
return Patterns.ask(ackSupervisor, request, config.getWriteTimeout())
.thenCompose(DistributedAcksImpl::processAskResponse);
}

@Override
public void receiveLocalDeclaredAcks(final ActorRef receiver) {
acksSupervisor.tell(AcksUpdater.receiveLocalChanges(receiver), ActorRef.noSender());
ackSupervisor.tell(AckUpdater.ReceiveLocalChanges.of(receiver), ActorRef.noSender());
}

@Override
public void receiveDistributedDeclaredAcks(final ActorRef receiver) {
acksSupervisor.tell(AcksUpdater.receiveDDataChanges(receiver), ActorRef.noSender());
ackSupervisor.tell(AckUpdater.ReceiveDDataChanges.of(receiver), ActorRef.noSender());
}

@Override
public void removeSubscriber(final ActorRef subscriber) {
final Request request =
RemoveSubscriber.of(subscriber, (Replicator.WriteConsistency) Replicator.writeLocal(),
false);
acksSupervisor.tell(request, subscriber);
ackSupervisor.tell(AckUpdater.RemoveSubscriberAcks.of(subscriber), subscriber);
}

@Override
public CompletionStage<SubAck> declareAcknowledgementLabels(
public CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels,
final ActorRef subscriber) {
final Set<String> ackLabelStrings = acknowledgementLabels.stream()
.map(AcknowledgementLabel::toString)
.collect(Collectors.toSet());
final Subscribe subscribe =
Subscribe.of(ackLabelStrings, subscriber, writeLocal(), true);
return askSubSupervisor(subscribe);
final AckUpdater.AckRequest request =
AckUpdater.DeclareAcks.of(subscriber, null, ackLabelStrings);
return askSupervisor(request);
}

@Override
public void removeAcknowledgementLabelDeclaration(final ActorRef subscriber) {
final RemoveSubscriber request =
RemoveSubscriber.of(subscriber, writeLocal(), false);
acksSupervisor.tell(request, ActorRef.noSender());
public CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
@Nullable final String group,
final Collection<AcknowledgementLabel> acknowledgementLabels,
final ActorRef subscriber) {
if (group != null) {
ConditionChecker.checkNotEmpty(group, "group");
}
final Set<String> ackLabelStrings = acknowledgementLabels.stream()
.map(AcknowledgementLabel::toString)
.collect(Collectors.toSet());
return askSupervisor(AckUpdater.DeclareAcks.of(subscriber, group, ackLabelStrings));
}

private static Replicator.WriteConsistency writeLocal() {
return (Replicator.WriteConsistency) Replicator.writeLocal();
@Override
public void removeAcknowledgementLabelDeclaration(final ActorRef subscriber) {
ackSupervisor.tell(AckUpdater.RemoveSubscriberAcks.of(subscriber), ActorRef.noSender());
}

private static CompletionStage<SubAck> processAskResponse(final Object askResponse) {
if (askResponse instanceof SubAck) {
return CompletableFuture.completedStage((SubAck) askResponse);
private static CompletionStage<AckUpdater.AcksDeclared> processAskResponse(final Object askResponse) {
if (askResponse instanceof AckUpdater.AcksDeclared) {
return CompletableFuture.completedStage((AckUpdater.AcksDeclared) askResponse);
} else if (askResponse instanceof Throwable) {
return CompletableFuture.failedStage((Throwable) askResponse);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ public static Props props(final DData<Address, String, LiteralUpdate> ackDData)
@Override
protected Receive createPubSubBehavior() {
return ReceiveBuilder.create()
.match(AckUpdater.Request.class, this::isAckUpdaterAvailable, this::forwardRequest)
.match(AckUpdater.Request.class, this::ackUpdaterUnavailable)
.match(AckUpdater.AckRequest.class, this::isAckUpdaterAvailable, this::forwardRequest)
.match(AckUpdater.AckRequest.class, this::ackUpdaterUnavailable)
.build();
}

Expand All @@ -68,19 +68,19 @@ protected void onChildFailure() {
@Override
protected void startChildren() {
final Props acksUpdaterProps = AckUpdater.props(config, selfAddress, ackDData);
acksUpdater = startChild(acksUpdaterProps, AcksUpdater.ACTOR_NAME_PREFIX);
acksUpdater = startChild(acksUpdaterProps, AckUpdater.ACTOR_NAME_PREFIX);
}

private boolean isAckUpdaterAvailable() {
return acksUpdater != null;
}

@SuppressWarnings("ConstantConditions")
private void forwardRequest(final AckUpdater.Request request) {
private void forwardRequest(final AckUpdater.AckRequest request) {
acksUpdater.tell(request, getSender());
}

private void ackUpdaterUnavailable(final AckUpdater.Request request) {
private void ackUpdaterUnavailable(final AckUpdater.AckRequest request) {
log.error("AcksUpdater unavailable. Failing <{}>", request);
getSender().tell(new IllegalStateException("AcksUpdater not available"), getSelf());
}
Expand Down
Loading

0 comments on commit bfff23a

Please sign in to comment.