Skip to content

Commit

Permalink
Issue #878: move AckUpdater messages to package /api.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Nov 20, 2020
1 parent bfff23a commit 43e6203
Show file tree
Hide file tree
Showing 13 changed files with 405 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.services.utils.pubsub.actors.AckUpdater;
import org.eclipse.ditto.services.utils.pubsub.api.AcksDeclared;

import akka.actor.ActorContext;
import akka.actor.ActorRef;
Expand Down Expand Up @@ -62,7 +62,7 @@ public interface DistributedAcks {
* @param subscriber the subscriber.
* @return a future SubAck if the declaration succeeded, or a failed future if it failed.
*/
CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
CompletionStage<AcksDeclared> declareAcknowledgementLabels(
@Nullable String group,
Collection<AcknowledgementLabel> acknowledgementLabels,
ActorRef subscriber);
Expand All @@ -76,7 +76,7 @@ CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
* @param subscriber the subscriber.
* @return a future SubAck if the declaration succeeded, or a failed future if it failed.
*/
default CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
default CompletionStage<AcksDeclared> declareAcknowledgementLabels(
Collection<AcknowledgementLabel> acknowledgementLabels,
ActorRef subscriber) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import javax.annotation.Nullable;

import org.eclipse.ditto.model.base.acks.AcknowledgementLabel;
import org.eclipse.ditto.services.utils.pubsub.actors.AckUpdater;
import org.eclipse.ditto.services.utils.pubsub.api.AcksDeclared;

import akka.actor.ActorRef;

Expand All @@ -44,13 +44,13 @@ public void removeSubscriber(final ActorRef subscriber) {
}

@Override
public CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(@Nullable final String group,
public CompletionStage<AcksDeclared> declareAcknowledgementLabels(@Nullable final String group,
final Collection<AcknowledgementLabel> acknowledgementLabels, final ActorRef subscriber) {
throw new UnsupportedOperationException();
}

@Override
public CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
public CompletionStage<AcksDeclared> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels, final ActorRef subscriber) {
throw new UnsupportedOperationException();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@
import org.eclipse.ditto.model.base.common.ConditionChecker;
import org.eclipse.ditto.services.utils.ddata.DistributedDataConfig;
import org.eclipse.ditto.services.utils.pubsub.actors.AckSupervisor;
import org.eclipse.ditto.services.utils.pubsub.actors.AckUpdater;
import org.eclipse.ditto.services.utils.pubsub.api.AckRequest;
import org.eclipse.ditto.services.utils.pubsub.api.AcksDeclared;
import org.eclipse.ditto.services.utils.pubsub.api.DeclareAcks;
import org.eclipse.ditto.services.utils.pubsub.api.ReceiveLocalAcks;
import org.eclipse.ditto.services.utils.pubsub.api.ReceiveRemoteAcks;
import org.eclipse.ditto.services.utils.pubsub.api.RemoveSubscriberAcks;
import org.eclipse.ditto.services.utils.pubsub.ddata.literal.LiteralDData;

import akka.actor.ActorContext;
Expand Down Expand Up @@ -62,40 +67,40 @@ static DistributedAcks create(final ActorContext actorContext,
return new DistributedAcksImpl(config, supervisor);
}

private CompletionStage<AckUpdater.AcksDeclared> askSupervisor(final AckUpdater.AckRequest request) {
private CompletionStage<AcksDeclared> askSupervisor(final AckRequest request) {
return Patterns.ask(ackSupervisor, request, config.getWriteTimeout())
.thenCompose(DistributedAcksImpl::processAskResponse);
}

@Override
public void receiveLocalDeclaredAcks(final ActorRef receiver) {
ackSupervisor.tell(AckUpdater.ReceiveLocalChanges.of(receiver), ActorRef.noSender());
ackSupervisor.tell(ReceiveLocalAcks.of(receiver), ActorRef.noSender());
}

@Override
public void receiveDistributedDeclaredAcks(final ActorRef receiver) {
ackSupervisor.tell(AckUpdater.ReceiveDDataChanges.of(receiver), ActorRef.noSender());
ackSupervisor.tell(ReceiveRemoteAcks.of(receiver), ActorRef.noSender());
}

@Override
public void removeSubscriber(final ActorRef subscriber) {
ackSupervisor.tell(AckUpdater.RemoveSubscriberAcks.of(subscriber), subscriber);
ackSupervisor.tell(RemoveSubscriberAcks.of(subscriber), subscriber);
}

@Override
public CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
public CompletionStage<AcksDeclared> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels,
final ActorRef subscriber) {
final Set<String> ackLabelStrings = acknowledgementLabels.stream()
.map(AcknowledgementLabel::toString)
.collect(Collectors.toSet());
final AckUpdater.AckRequest request =
AckUpdater.DeclareAcks.of(subscriber, null, ackLabelStrings);
final AckRequest request =
DeclareAcks.of(subscriber, null, ackLabelStrings);
return askSupervisor(request);
}

@Override
public CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
public CompletionStage<AcksDeclared> declareAcknowledgementLabels(
@Nullable final String group,
final Collection<AcknowledgementLabel> acknowledgementLabels,
final ActorRef subscriber) {
Expand All @@ -105,17 +110,17 @@ public CompletionStage<AckUpdater.AcksDeclared> declareAcknowledgementLabels(
final Set<String> ackLabelStrings = acknowledgementLabels.stream()
.map(AcknowledgementLabel::toString)
.collect(Collectors.toSet());
return askSupervisor(AckUpdater.DeclareAcks.of(subscriber, group, ackLabelStrings));
return askSupervisor(DeclareAcks.of(subscriber, group, ackLabelStrings));
}

@Override
public void removeAcknowledgementLabelDeclaration(final ActorRef subscriber) {
ackSupervisor.tell(AckUpdater.RemoveSubscriberAcks.of(subscriber), ActorRef.noSender());
ackSupervisor.tell(RemoveSubscriberAcks.of(subscriber), ActorRef.noSender());
}

private static CompletionStage<AckUpdater.AcksDeclared> processAskResponse(final Object askResponse) {
if (askResponse instanceof AckUpdater.AcksDeclared) {
return CompletableFuture.completedStage((AckUpdater.AcksDeclared) askResponse);
private static CompletionStage<AcksDeclared> processAskResponse(final Object askResponse) {
if (askResponse instanceof AcksDeclared) {
return CompletableFuture.completedStage((AcksDeclared) askResponse);
} else if (askResponse instanceof Throwable) {
return CompletableFuture.failedStage((Throwable) askResponse);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import javax.annotation.Nullable;

import org.eclipse.ditto.services.utils.pubsub.api.AckRequest;
import org.eclipse.ditto.services.utils.pubsub.ddata.DData;
import org.eclipse.ditto.services.utils.pubsub.ddata.literal.LiteralUpdate;

Expand Down Expand Up @@ -53,8 +54,8 @@ public static Props props(final DData<Address, String, LiteralUpdate> ackDData)
@Override
protected Receive createPubSubBehavior() {
return ReceiveBuilder.create()
.match(AckUpdater.AckRequest.class, this::isAckUpdaterAvailable, this::forwardRequest)
.match(AckUpdater.AckRequest.class, this::ackUpdaterUnavailable)
.match(AckRequest.class, this::isAckUpdaterAvailable, this::forwardRequest)
.match(AckRequest.class, this::ackUpdaterUnavailable)
.build();
}

Expand All @@ -76,11 +77,11 @@ private boolean isAckUpdaterAvailable() {
}

@SuppressWarnings("ConstantConditions")
private void forwardRequest(final AckUpdater.AckRequest request) {
private void forwardRequest(final AckRequest request) {
acksUpdater.tell(request, getSender());
}

private void ackUpdaterUnavailable(final AckUpdater.AckRequest request) {
private void ackUpdaterUnavailable(final AckRequest request) {
log.error("AcksUpdater unavailable. Failing <{}>", request);
getSender().tell(new IllegalStateException("AcksUpdater not available"), getSelf());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@
import org.eclipse.ditto.model.base.acks.AcknowledgementLabelNotUniqueException;
import org.eclipse.ditto.services.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.services.utils.akka.logging.ThreadSafeDittoLoggingAdapter;
import org.eclipse.ditto.services.utils.pubsub.api.AcksDeclared;
import org.eclipse.ditto.services.utils.pubsub.api.DeclareAcks;
import org.eclipse.ditto.services.utils.pubsub.api.LocalAcksChanged;
import org.eclipse.ditto.services.utils.pubsub.api.ReceiveLocalAcks;
import org.eclipse.ditto.services.utils.pubsub.api.ReceiveRemoteAcks;
import org.eclipse.ditto.services.utils.pubsub.api.RemoteAcksChanged;
import org.eclipse.ditto.services.utils.pubsub.api.RemoveSubscriberAcks;
import org.eclipse.ditto.services.utils.pubsub.config.PubSubConfig;
import org.eclipse.ditto.services.utils.pubsub.ddata.DData;
import org.eclipse.ditto.services.utils.pubsub.ddata.DDataWriter;
Expand Down Expand Up @@ -103,8 +108,8 @@ public Receive createReceive() {
.match(DeclareAcks.class, this::declare)
.match(Terminated.class, this::terminated)
.match(RemoveSubscriberAcks.class, this::removeSubscriber)
.match(ReceiveDDataChanges.class, this::onReceiveDDataChanges)
.match(ReceiveLocalChanges.class, this::onReceiveLocalChanges)
.match(ReceiveRemoteAcks.class, this::onReceiveDDataChanges)
.match(ReceiveLocalAcks.class, this::onReceiveLocalChanges)
.matchEquals(Clock.TICK, this::tick)
.match(Replicator.Changed.class, this::onChanged)
.matchAny(this::logUnhandled)
Expand All @@ -124,9 +129,12 @@ public LoggingAdapter log() {

private void declare(final DeclareAcks request) {
final ActorRef sender = getSender();
if (isAllowedLocally(request.group, request.ackLabels) && isAllowedRemotely(request.group, request.ackLabels)) {
localAckLabels.put(request.subscriber, request.group, request.ackLabels);
getContext().watch(request.subscriber);
final ActorRef subscriber = request.getSubscriber();
final String group = request.getGroup().orElse(null);
final Set<String> ackLabels = request.getAckLabels();
if (isAllowedLocally(group, ackLabels) && isAllowedRemotely(group, ackLabels)) {
localAckLabels.put(subscriber, group, ackLabels);
getContext().watch(subscriber);
getSender().tell(AcksDeclared.of(request, sender), getSelf());
} else {
failSubscribe(sender);
Expand Down Expand Up @@ -235,7 +243,7 @@ private void terminated(final Terminated terminated) {
}

private void removeSubscriber(final RemoveSubscriberAcks request) {
doRemoveSubscriber(request.subscriber);
doRemoveSubscriber(request.getSubscriber());
}

private void doRemoveSubscriber(final ActorRef subscriber) {
Expand Down Expand Up @@ -284,14 +292,14 @@ private List<ActorRef> getLocalLosers(final Map<Address, List<GroupedAckLabels>>
.collect(Collectors.toList());
}

private void onReceiveDDataChanges(final ReceiveDDataChanges request) {
ddataChangeRecipients.add(request.receiver);
getContext().watch(request.receiver);
private void onReceiveDDataChanges(final ReceiveRemoteAcks request) {
ddataChangeRecipients.add(request.getReceiver());
getContext().watch(request.getReceiver());
}

private void onReceiveLocalChanges(final ReceiveLocalChanges request) {
localChangeRecipients.add(request.receiver);
getContext().watch(request.receiver);
private void onReceiveLocalChanges(final ReceiveLocalAcks request) {
localChangeRecipients.add(request.getReceiver());
getContext().watch(request.getReceiver());
}

private static <T> Comparator<Map.Entry<Address, T>> entryKeyAddressComparator() {
Expand All @@ -302,133 +310,4 @@ private enum Clock {
TICK
}

// TODO: javadoc
// TODO: move to /api
public interface AckRequest {
}

public static final class DeclareAcks implements AckRequest {

private final ActorRef subscriber;
@Nullable private final String group;
private final Set<String> ackLabels;

private DeclareAcks(final ActorRef subscriber,
@Nullable final String group,
final Set<String> ackLabels) {
this.subscriber = subscriber;
this.group = group;
this.ackLabels = ackLabels;
}

// TODO: javadoc
public static AckRequest of(final ActorRef subscriber,
@Nullable final String group,
final Set<String> ackLabels) {
return new DeclareAcks(subscriber, group, ackLabels);
}

@Override
public String toString() {
return getClass().getSimpleName() +
"[subscriber=" + subscriber +
",group=" + group +
",ackLabels=" + ackLabels +
"]";
}
}

public static final class RemoveSubscriberAcks implements AckRequest {

private final ActorRef subscriber;

private RemoveSubscriberAcks(final ActorRef subscriber) {
this.subscriber = subscriber;
}

// TODO: move to model.
public static AckRequest of(final ActorRef subscriber) {
return new RemoveSubscriberAcks(subscriber);
}

@Override
public String toString() {
return getClass().getSimpleName() +
"[subscriber=" + subscriber +
"]";
}
}

/**
* Acknowledgement for requests.
*/
public static final class AcksDeclared {

private final AckRequest request;
private final ActorRef sender;

private AcksDeclared(final AckRequest request, final ActorRef sender) {
this.request = request;
this.sender = sender;
}

static AcksDeclared of(final AckRequest request, final ActorRef sender) {
return new AcksDeclared(request, sender);
}

/**
* @return the request this object is acknowledging.
*/
public AckRequest getRequest() {
return request;
}

/**
* @return sender of the request.
*/
public ActorRef getSender() {
return sender;
}

@Override
public String toString() {
return getClass().getSimpleName() +
"[request=" + request +
",sender=" + sender +
"]";
}
}

private abstract static class ReceiveChanges implements AckRequest {

protected final ActorRef receiver;

private ReceiveChanges(final ActorRef receiver) {
this.receiver = receiver;
}
}

public static final class ReceiveDDataChanges extends ReceiveChanges {

private ReceiveDDataChanges(final ActorRef receiver) {
super(receiver);
}

// TODO: javadoc
public static AckRequest of(final ActorRef receiver) {
return new ReceiveDDataChanges(receiver);
}
}

public static final class ReceiveLocalChanges extends ReceiveChanges {

private ReceiveLocalChanges(final ActorRef receiver) {
super(receiver);
}

// TODO: javadoc
public static AckRequest of(final ActorRef receiver) {
return new ReceiveLocalChanges(receiver);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.utils.pubsub.api;

/**
* Super class of all requests for the {@code AckUpdater}.
*/
public interface AckRequest {
}
Loading

0 comments on commit 43e6203

Please sign in to comment.