Skip to content

Commit

Permalink
Add "resubscribe" flag to subscribe and ack label declaration.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed May 18, 2022
1 parent 0c26901 commit 6b6ae4b
Show file tree
Hide file tree
Showing 16 changed files with 128 additions and 93 deletions.
Expand Up @@ -1183,7 +1183,7 @@ protected CompletionStage<InitializationResult> startPublisherAndConsumerActors(

return publisherReady
.thenCompose(unused -> consumersReady)
.thenCompose(unused -> subscribeAndDeclareAcknowledgementLabels(dryRun))
.thenCompose(unused -> subscribeAndDeclareAcknowledgementLabels(dryRun, false))
.thenApply(unused -> InitializationResult.success())
.exceptionally(InitializationResult::failed);
}
Expand Down Expand Up @@ -1953,27 +1953,29 @@ private SupervisorStrategy createSupervisorStrategy(final ActorRef self) {
* Subscribe for signals. NOT thread-safe due to querying actor state.
*
* @param isDryRun whether this is a dry run
* @param resubscribe whether this is a resubscription
* @return a future that completes when subscription and ack label declaration succeed and fails when either fails.
*/
private CompletionStage<Void> subscribeAndDeclareAcknowledgementLabels(final boolean isDryRun) {
private CompletionStage<Void> subscribeAndDeclareAcknowledgementLabels(final boolean isDryRun,
final boolean resubscribe) {
if (isDryRun) {
// no point writing to the distributed data in a dry run - this actor will stop right away
return CompletableFuture.completedFuture(null);
} else {
final String group = getPubsubGroup();
final CompletionStage<Void> subscribe = subscribeToStreamingTypes(group);
final CompletionStage<Void> subscribe = subscribeToStreamingTypes(group, resubscribe);
final CompletionStage<Void> declare =
dittoProtocolSub.declareAcknowledgementLabels(getDeclaredAcks(), getSelf(), group);
dittoProtocolSub.declareAcknowledgementLabels(getDeclaredAcks(), getSelf(), group, resubscribe);
return declare.thenCompose(unused -> subscribe);
}
}

private CompletionStage<Void> subscribeToStreamingTypes(final String pubSubGroup) {
private CompletionStage<Void> subscribeToStreamingTypes(final String pubSubGroup, final boolean resubscribe) {
final Set<StreamingType> streamingTypes = getUniqueStreamingTypes();
if (streamingTypes.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
return dittoProtocolSub.subscribe(streamingTypes, getTargetAuthSubjects(), getSelf(), pubSubGroup);
return dittoProtocolSub.subscribe(streamingTypes, getTargetAuthSubjects(), getSelf(), pubSubGroup, resubscribe);
}

private Set<AcknowledgementLabel> getDeclaredAcks() {
Expand Down
Expand Up @@ -255,7 +255,8 @@ public static DittoProtocolSub dummyDittoProtocolSub(final ActorRef pubSubMediat
return new DittoProtocolSub() {
@Override
public CompletionStage<Void> subscribe(final Collection<StreamingType> types,
final Collection<String> topics, final ActorRef subscriber, @Nullable final String group) {
final Collection<String> topics, final ActorRef subscriber, @Nullable final String group,
final boolean resubscribe) {
doDelegate(d -> d.subscribe(types, topics, subscriber));
return CompletableFuture.allOf(types.stream()
.map(type -> {
Expand Down Expand Up @@ -293,9 +294,9 @@ public CompletionStage<Void> removePolicyAnnouncementSubscriber(final ActorRef s
@Override
public CompletionStage<Void> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels, final ActorRef subscriber,
@Nullable final String group) {
@Nullable final String group, final boolean resubscribe) {
if (delegate != null) {
return delegate.declareAcknowledgementLabels(acknowledgementLabels, subscriber, group);
return delegate.declareAcknowledgementLabels(acknowledgementLabels, subscriber, group, resubscribe);
} else {
return CompletableFuture.completedStage(null);
}
Expand Down
Expand Up @@ -630,7 +630,7 @@ private void refreshWebSocketSession(final Jwt jwt) {
private void declareAcknowledgementLabels(final Collection<AcknowledgementLabel> acknowledgementLabels) {
final ActorRef self = getSelf();
logger.info("Declaring acknowledgement labels <{}>", acknowledgementLabels);
dittoProtocolSub.declareAcknowledgementLabels(acknowledgementLabels, self, null)
dittoProtocolSub.declareAcknowledgementLabels(acknowledgementLabels, self, null, false)
.thenAccept(unused -> logger.info("Acknowledgement label declaration successful for labels: <{}>",
acknowledgementLabels))
.exceptionally(error -> {
Expand Down
Expand Up @@ -14,6 +14,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.when;

import java.time.Duration;
Expand Down Expand Up @@ -139,7 +140,7 @@ public static void shutdown() {

@Before
public void setup() {
when(dittoProtocolSub.declareAcknowledgementLabels(any(), any(), any()))
when(dittoProtocolSub.declareAcknowledgementLabels(any(), any(), any(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));
}

Expand Down
Expand Up @@ -13,6 +13,7 @@
package org.eclipse.ditto.gateway.service.streaming.actors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.anyBoolean;

import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -123,7 +124,7 @@ public static void beforeClass() {
public void before() {
commandRouterProbe = actorSystemResource.newTestProbe("commandRouter");

Mockito.when(mockSub.declareAcknowledgementLabels(Mockito.any(), Mockito.any(), Mockito.any()))
Mockito.when(mockSub.declareAcknowledgementLabels(Mockito.any(), Mockito.any(), Mockito.any(), anyBoolean()))
.thenReturn(CompletableFuture.completedFuture(null));

final Sink<SessionedJsonifiable, TestSubscriber.Probe<SessionedJsonifiable>> sink =
Expand Down Expand Up @@ -381,7 +382,7 @@ private Props getProps(final String... declaredAcks) {
}

private void onDeclareAckLabels(final CompletionStage<Void> answer) {
Mockito.when(mockSub.declareAcknowledgementLabels(Mockito.any(), Mockito.any(), Mockito.any()))
Mockito.when(mockSub.declareAcknowledgementLabels(Mockito.any(), Mockito.any(), Mockito.any(), anyBoolean()))
.thenReturn(answer);
}

Expand Down
Expand Up @@ -66,7 +66,8 @@ public interface DistributedAcks extends Extension {
* @return a future SubAck if the declaration succeeded, or a failed future if it failed.
*/
CompletionStage<AcksDeclared> declareAcknowledgementLabels(
Collection<AcknowledgementLabel> acknowledgementLabels, ActorRef subscriber, @Nullable String group);
Collection<AcknowledgementLabel> acknowledgementLabels, ActorRef subscriber, @Nullable String group,
final boolean resubscribe);

/**
* Declare labels of acknowledgements that a subscriber may send.
Expand All @@ -81,7 +82,7 @@ default CompletionStage<AcksDeclared> declareAcknowledgementLabels(
Collection<AcknowledgementLabel> acknowledgementLabels,
ActorRef subscriber) {

return declareAcknowledgementLabels(acknowledgementLabels, subscriber, null);
return declareAcknowledgementLabels(acknowledgementLabels, subscriber, null, false);
}

/**
Expand Down
Expand Up @@ -46,7 +46,7 @@ public void removeSubscriber(final ActorRef subscriber) {
@Override
public CompletionStage<AcksDeclared> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels, final ActorRef subscriber,
@Nullable final String group) {
@Nullable final String group, final boolean resubscribe) {
throw new UnsupportedOperationException();
}

Expand Down
Expand Up @@ -104,21 +104,21 @@ public CompletionStage<AcksDeclared> declareAcknowledgementLabels(
.map(AcknowledgementLabel::toString)
.collect(Collectors.toSet());
final AckRequest request =
DeclareAcks.of(subscriber, null, ackLabelStrings);
DeclareAcks.of(subscriber, null, ackLabelStrings, false);
return askSupervisor(request);
}

@Override
public CompletionStage<AcksDeclared> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels, final ActorRef subscriber,
@Nullable final String group) {
@Nullable final String group, final boolean resubscribe) {
if (group != null) {
ConditionChecker.checkNotEmpty(group, "group");
}
final Set<String> ackLabelStrings = acknowledgementLabels.stream()
.map(AcknowledgementLabel::toString)
.collect(Collectors.toSet());
return askSupervisor(DeclareAcks.of(subscriber, group, ackLabelStrings));
return askSupervisor(DeclareAcks.of(subscriber, group, ackLabelStrings, resubscribe));
}

@Override
Expand Down
Expand Up @@ -35,10 +35,12 @@ public interface DistributedSub {
* @param subscriber who is subscribing.
* @param filter a local topic filter.
* @param group the subscriber's group, if any.
* @param resubscribe whether this is a resubscription.
* @return a future that completes after subscription becomes effective on all nodes.
*/
CompletionStage<SubAck> subscribeWithFilterAndGroup(Collection<String> topics,
ActorRef subscriber, @Nullable Predicate<Collection<String>> filter, @Nullable String group);
ActorRef subscriber, @Nullable Predicate<Collection<String>> filter, @Nullable String group,
final boolean resubscribe);

/**
* Unsubscribe for a collection of topics.
Expand Down
Expand Up @@ -56,11 +56,12 @@ final class DistributedSubImpl implements DistributedSub {
public CompletionStage<SubAck> subscribeWithFilterAndGroup(final Collection<String> topics,
final ActorRef subscriber,
@Nullable final Predicate<Collection<String>> filter,
@Nullable final String group) {
@Nullable final String group,
final boolean resubscribe) {
if (group != null) {
checkNotEmpty(group, "group");
}
final Subscribe subscribe = Subscribe.of(topics, subscriber, writeConsistency, true, filter, group);
final Subscribe subscribe = Subscribe.of(topics, subscriber, writeConsistency, true, filter, group, resubscribe);
final CompletionStage<SubAck> subAckFuture = askSubSupervisor(subscribe);

if (ddataDelayInMillis <= 0) {
Expand Down
Expand Up @@ -38,7 +38,7 @@ public interface DittoProtocolSub extends Extension {
*/
default CompletionStage<Void> subscribe(Collection<StreamingType> types, Collection<String> topics,
ActorRef subscriber) {
return subscribe(types, topics, subscriber, null);
return subscribe(types, topics, subscriber, null, false);
}

/**
Expand All @@ -51,7 +51,7 @@ default CompletionStage<Void> subscribe(Collection<StreamingType> types, Collect
* @return future that completes or fails according to the acknowledgement.
*/
CompletionStage<Void> subscribe(Collection<StreamingType> types, Collection<String> topics, ActorRef subscriber,
@Nullable String group);
@Nullable String group, final boolean resubscribe);

/**
* Remove a subscriber.
Expand Down Expand Up @@ -104,7 +104,7 @@ CompletionStage<Void> updateLiveSubscriptions(Collection<StreamingType> types, C
* {@code AcknowledgementLabelNotUniqueException} later.
*/
CompletionStage<Void> declareAcknowledgementLabels(Collection<AcknowledgementLabel> acknowledgementLabels,
ActorRef subscriber, @Nullable String group);
ActorRef subscriber, @Nullable String group, final boolean resubscribe);

/**
* Relinquish any acknowledgement labels declared by a subscriber.
Expand Down
Expand Up @@ -65,17 +65,20 @@ static DittoProtocolSubImpl of(final ActorSystem system, final DistributedAcks d
public CompletionStage<Void> subscribe(final Collection<StreamingType> types,
final Collection<String> topics,
final ActorRef subscriber,
@Nullable final String group) {
@Nullable final String group,
final boolean resubscribe) {
final CompletionStage<?> nop = CompletableFuture.completedFuture(null);
return partitionByStreamingTypes(types,
liveTypes -> !liveTypes.isEmpty()
? liveSignalSub.subscribeWithFilterAndGroup(topics, subscriber, toFilter(liveTypes), group)
? liveSignalSub.subscribeWithFilterAndGroup(topics, subscriber, toFilter(liveTypes), group,
resubscribe)
: nop,
hasTwinEvents -> hasTwinEvents
? twinEventSub.subscribeWithFilterAndGroup(topics, subscriber, null, group)
? twinEventSub.subscribeWithFilterAndGroup(topics, subscriber, null, group, resubscribe)
: nop,
hasPolicyAnnouncements -> hasPolicyAnnouncements
? policyAnnouncementSub.subscribeWithFilterAndGroup(topics, subscriber, null, group)
? policyAnnouncementSub.subscribeWithFilterAndGroup(topics, subscriber, null, group,
resubscribe)
: nop
);
}
Expand All @@ -95,7 +98,8 @@ public CompletionStage<Void> updateLiveSubscriptions(final Collection<StreamingT

return partitionByStreamingTypes(types,
liveTypes -> !liveTypes.isEmpty()
? liveSignalSub.subscribeWithFilterAndGroup(topics, subscriber, toFilter(liveTypes), null)
? liveSignalSub.subscribeWithFilterAndGroup(topics, subscriber, toFilter(liveTypes), null,
false)
: liveSignalSub.unsubscribeWithAck(topics, subscriber),
hasTwinEvents -> CompletableFuture.completedStage(null),
hasPolicyAnnouncements -> CompletableFuture.completedStage(null)
Expand All @@ -117,7 +121,8 @@ public CompletionStage<Void> removePolicyAnnouncementSubscriber(final ActorRef s
public CompletionStage<Void> declareAcknowledgementLabels(
final Collection<AcknowledgementLabel> acknowledgementLabels,
final ActorRef subscriber,
@Nullable final String group) {
@Nullable final String group,
final boolean resubscribe) {
if (acknowledgementLabels.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -126,7 +131,7 @@ public CompletionStage<Void> declareAcknowledgementLabels(
// via the actor supervision strategy
ensureAcknowledgementLabelsAreFullyResolved(acknowledgementLabels);

return distributedAcks.declareAcknowledgementLabels(acknowledgementLabels, subscriber, group)
return distributedAcks.declareAcknowledgementLabels(acknowledgementLabels, subscriber, group, resubscribe)
.thenApply(ack -> null);
}

Expand Down
Expand Up @@ -27,13 +27,14 @@ public final class DeclareAcks implements AckRequest {
private final ActorRef subscriber;
@Nullable private final String group;
private final Set<String> ackLabels;
private final boolean resubscribe;

private DeclareAcks(final ActorRef subscriber,
@Nullable final String group,
final Set<String> ackLabels) {
private DeclareAcks(final ActorRef subscriber, @Nullable final String group, final Set<String> ackLabels,
final boolean resubscribe) {
this.subscriber = subscriber;
this.group = group;
this.ackLabels = ackLabels;
this.resubscribe = resubscribe;
}

/**
Expand All @@ -44,10 +45,9 @@ private DeclareAcks(final ActorRef subscriber,
* @param ackLabels the set of acknowledgement labels being declared - may be empty.
* @return the request.
*/
public static AckRequest of(final ActorRef subscriber,
@Nullable final String group,
final Set<String> ackLabels) {
return new DeclareAcks(subscriber, group, ackLabels);
public static AckRequest of(final ActorRef subscriber, @Nullable final String group, final Set<String> ackLabels,
final boolean resubscribe) {
return new DeclareAcks(subscriber, group, ackLabels, resubscribe);
}

/**
Expand Down Expand Up @@ -77,12 +77,20 @@ public Set<String> getAckLabels() {
return ackLabels;
}

/**
* @return Whether this is a resubscribe request.
*/
public boolean isResubscribe() {
return resubscribe;
}

@Override
public String toString() {
return getClass().getSimpleName() +
"[subscriber=" + subscriber +
",group=" + group +
",ackLabels=" + ackLabels +
",resubscribe=" + resubscribe +
"]";
}
}
Expand Up @@ -28,16 +28,19 @@ public final class Subscribe extends AbstractRequest {

@Nullable private final Predicate<Collection<String>> filter;
@Nullable private final String group;
private final boolean resubscribe;

private Subscribe(final Collection<String> topics,
final ActorRef subscriber,
final Replicator.WriteConsistency writeConsistency,
final boolean acknowledge,
@Nullable final Predicate<Collection<String>> filter,
@Nullable final String group) {
@Nullable final String group,
final boolean resubscribe) {
super(topics, subscriber, writeConsistency, acknowledge);
this.filter = filter;
this.group = group;
this.resubscribe = resubscribe;
}

/**
Expand All @@ -55,7 +58,7 @@ public static Subscribe of(final Collection<String> topics,
final Replicator.WriteConsistency writeConsistency,
final boolean acknowledge,
@Nullable final String group) {
return new Subscribe(topics, subscriber, writeConsistency, acknowledge, null, group);
return new Subscribe(topics, subscriber, writeConsistency, acknowledge, null, group, false);
}

/**
Expand All @@ -74,8 +77,9 @@ public static Subscribe of(final Collection<String> topics,
final Replicator.WriteConsistency writeConsistency,
final boolean acknowledge,
@Nullable final Predicate<Collection<String>> filter,
@Nullable final String group) {
return new Subscribe(topics, subscriber, writeConsistency, acknowledge, filter, group);
@Nullable final String group,
final boolean resubscribe) {
return new Subscribe(topics, subscriber, writeConsistency, acknowledge, filter, group, resubscribe);
}

/**
Expand All @@ -93,4 +97,11 @@ public Optional<String> getGroup() {
return Optional.ofNullable(group);
}

/**
* @return Whether this is a resubscribe request.
*/
boolean isResubscribe() {
return resubscribe;
}

}

0 comments on commit 6b6ae4b

Please sign in to comment.