Skip to content

Commit

Permalink
Review: Acknowledgement aggregation changes
Browse files Browse the repository at this point in the history
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
DerSchwilk committed Jul 1, 2022
1 parent 36e7507 commit b591839
Show file tree
Hide file tree
Showing 9 changed files with 18 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
/**
* Provides {@link Acknowledgement}s based on {@link CommandResponse}s abstracting away model-specific dependencies.
*
* @param <R> the type of the command response which to provide the Acknowledgements for.
* @param <R> the type of the command response for which to provide the Acknowledgements.
* @since 3.0.0
*/
@Immutable
Expand Down Expand Up @@ -52,5 +52,4 @@ public interface CommandResponseAcknowledgementProvider<R extends CommandRespons
*/
Class<R> getMatchedClass();


}
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,13 @@ static boolean isCommand(@Nullable final Signal<?> signal) {
enum Category {
/**
* Category of commands that do not change the state of any entity.
* @since 3.0.0
*/
QUERY,

/**
* Category of commands that creates entities.
*
* @since 3.0.0
*/
CREATE,

Expand Down Expand Up @@ -164,7 +165,7 @@ enum Category {
* Determines whether the passed {@code category} effectively modifies the targeted entity.
*
* @param category the category to check.
* @return whether the passed {@code category} effectively modifies the targeted entity
* @return whether the passed {@code category} effectively modifies the targeted entity.
* @since 3.0.0
*/
public static boolean isEntityModifyingCommand(final Category category) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ private Function<Acknowledgements, DittoRuntimeException> getDefaultGetAsTimeout

private static DittoHeaders calculateHeadersWithEntityId(@Nullable final EntityId entityId,
final WithDittoHeaders withDittoHeaders) {

if (null != entityId) {
return withDittoHeaders.getDittoHeaders()
.toBuilder()
Expand Down Expand Up @@ -306,13 +307,13 @@ private MatchingValidationResult validateResponse(final CommandResponse<?> comma

private static MatchingValidationResult validateLiveResponse(final Command<?> command,
final CommandResponse<?> commandResponse) {

final var responseMatchingValidator =
CommandAndCommandResponseMatchingValidator.getInstance();

return responseMatchingValidator.apply(command, commandResponse);
}


private void handleReceiveTimeout(final Control receiveTimeout) {
log.withCorrelationId(correlationId).info("Timed out waiting for all requested acknowledgements, " +
"completing Acknowledgements with timeouts...");
Expand Down Expand Up @@ -382,6 +383,7 @@ private static boolean containsOnlyTwinPersistedOrLiveResponse(final Acknowledge

private static Duration getTimeout(final Signal<?> originatingSignal, final Duration maxTimeout,
@Nullable final Duration specifiedTimeout) {

if (specifiedTimeout != null) {
return specifiedTimeout;
} else if (Signal.isChannelSmart(originatingSignal)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public static AcknowledgementAggregatorActorStarter of(final ActorRefFactory act
* Public because this is also used in unit tests.
*
* @param signal the signal to check for whether the ack aggregator actor should be started.
* @return whether the ack aggregator actor should be started
* @return whether the ack aggregator actor should be started.
*/
public static boolean shouldStartForIncoming(final Signal<?> signal) {
final boolean result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ protected boolean isStartChildImmediately() {

protected Receive activeBehaviour(final FI.UnitApply<Control> matchProcessNextTwinMessageBehavior,
final FI.UnitApply<Object> matchAnyBehavior) {

return ReceiveBuilder.create()
.match(Terminated.class, this::childTerminated)
.matchEquals(Control.START_CHILDREN, this::startChildren)
Expand Down Expand Up @@ -194,6 +195,7 @@ protected CompletionStage<Object> modifyEnforcerActorEnforcedSignalResponse(fina
*/
protected CompletionStage<Object> modifyTargetActorCommandResponse(final Signal<?> enforcedSignal,
final Object persistenceCommandResponse) {

return CompletableFuture.completedStage(persistenceCommandResponse);
}

Expand Down Expand Up @@ -279,6 +281,7 @@ protected CompletionStage<Object> askEnforcerChild(final Signal<?> signal) {
*/
protected <T> CompletionStage<Object> askTargetActor(final T message, final boolean shouldSendResponse,
final ActorRef sender) {

return getTargetActorForSendingEnforcedMessageTo(message, shouldSendResponse, sender)
.thenCompose(this::askOrForwardToTargetActor)
.thenApply(response -> {
Expand All @@ -294,6 +297,7 @@ protected <T> CompletionStage<Object> askTargetActor(final T message, final bool

private CompletionStage<Object> askOrForwardToTargetActor(
@Nullable final TargetActorWithMessage targetActorWithMessage) {

if (null != targetActorWithMessage) {
if (!targetActorWithMessage.messageTimeout().isZero()) {
return Patterns.ask(
Expand Down Expand Up @@ -328,6 +332,7 @@ private CompletionStage<Object> askOrForwardToTargetActor(
protected <T> CompletionStage<TargetActorWithMessage> getTargetActorForSendingEnforcedMessageTo(final T message,
final boolean shouldSendResponse,
final ActorRef sender) {

if (null != persistenceActorChild) {
return CompletableFuture.completedStage(
new TargetActorWithMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ final class PolicyIdReferencePlaceholderResolver implements ReferencePlaceholder

private PolicyIdReferencePlaceholderResolver(final ActorRef thingsShardRegion,
final AskWithRetryConfig askWithRetryConfig, final ActorSystem actorSystem) {

this.thingsShardRegion = thingsShardRegion;
this.askWithRetryConfig = askWithRetryConfig;
this.actorSystem = actorSystem;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ private LiveResponseAndAcknowledgementForwarder(final ThingQueryCommand<?> thing
* @param messageReceiver Receiver of the message to publish.
* @return The Props object.
*/
public static Props props(final ThingQueryCommand<?> thingQueryCommand,
final ActorRef messageReceiver) {
public static Props props(final ThingQueryCommand<?> thingQueryCommand, final ActorRef messageReceiver) {
return Props.create(LiveResponseAndAcknowledgementForwarder.class, thingQueryCommand, messageReceiver);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,8 @@ protected ActorRef startEntityActor(final ActorSystem system, final ActorRef pub

final LiveSignalPub liveSignalPub = new TestSetup.DummyLiveSignalPub(pubSubMediator);

final Props props =
ThingSupervisorActor.props(pubSubMediator,
new DistributedPub<>() {
final Props props = ThingSupervisorActor.props(pubSubMediator,
new DistributedPub<>() {

@Override
public ActorRef getPublisher() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ public Flow<ThingUpdater.Data, MongoWriteModel, NotUsed> create(final SearchUpda

private Source<Pair<ThingId, JsonObject>, NotUsed> retrieveThingFromCachingFacade(final ThingId thingId,
final Metadata metadata, final int leftRetryAttempts) {

ConsistencyLag.startS3RetrieveThing(metadata);
final CompletionStage<JsonObject> thingFuture = provideThingFuture(thingId, metadata);

Expand Down

0 comments on commit b591839

Please sign in to comment.