Skip to content

Commit

Permalink
fixed behavior switch which did not handle too many messages in the p…
Browse files Browse the repository at this point in the history
…revious "awaitingBulkWriteResult"

* now only 2 behaviors exist: one waiting for bulkWrite result and one not, while the one waiting for bulkWrite result only stashes writeModels which occur during waiting for the bulkWrite result

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
  • Loading branch information
thjaeckle committed Feb 23, 2022
1 parent ce57d24 commit b44ebcb
Showing 1 changed file with 21 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -200,55 +200,55 @@ private void recoveryComplete(final AbstractWriteModel writeModel) {
thingRevision = writeModel.getMetadata().getThingRevision();
writeModel.getMetadata().getPolicyRevision().ifPresent(rev -> this.policyRevision = rev);
lastWriteModel = writeModel;
getContext().become(recoveredBehavior());
getContext().become(recoveredBehavior(), true);
unstashAll();
recoveryCompleteConsumer.accept(writeModel);
}

private Receive recoveredBehavior() {
return recoveredBehavior("recoveredBehavior");
}

private Receive recoveredBehavior(final String currentBehaviorHintForLogging) {
return shutdownBehaviour.createReceive()
.match(ThingEvent.class, this::processThingEvent)
.match(AbstractWriteModel.class, this::onNextWriteModel)
.match(PolicyReferenceTag.class, this::processPolicyReferenceTag)
.match(UpdateThing.class, this::updateThing)
.match(UpdateThingResponse.class, this::processUpdateThingResponse)
.match(ReceiveTimeout.class, this::stopThisActor)
.matchEquals(FORCE_UPDATE_AFTER_START, this::forceUpdateAfterStart)
.build();
}

private Receive recoveredAcceptingWriteModelsBehavior() {
return recoveredBehavior().orElse(ReceiveBuilder.create()
.match(AbstractWriteModel.class, this::onNextWriteModel)
.matchAny(m -> {
log.warning("Unknown message in 'recoveredAcceptingWriteModelsBehavior': {}", m);
log.warning("Unknown message in '{}': {}", currentBehaviorHintForLogging, m);
unhandled(m);
})
.build());
.build();
}

private Receive awaitingBulkWriteResult() {
private Receive recoveredAwaitingBulkWriteResultBehavior() {
getContext().setReceiveTimeout(BULK_RESULT_AWAITING_TIMEOUT);
return ReceiveBuilder.create()
.match(AbstractWriteModel.class, writeModel -> {
log.info("Stashing received writeModel while being in 'recoveredAwaitingBulkWriteResultBehavior': {}",
writeModel);
stash();
})
.match(BulkWriteComplete.class, bulkWriteComplete -> {
log.withCorrelationId(bulkWriteComplete.getBulkWriteCorrelationId())
.debug("Got confirmation bulkWrite was performed - switching to 'recoveredBehavior'");
getContext().cancelReceiveTimeout();
getContext().become(recoveredBehavior());
getContext().become(recoveredBehavior(), true);
unstashAll();
})
.match(ReceiveTimeout.class, rt -> {
log.warning("Encountered ReceiveTimeout being in 'awaitingBulkWriteResult' - " +
log.warning("Encountered ReceiveTimeout being in 'recoveredAwaitingBulkWriteResultBehavior' - " +
"switching back to 'recoveredBehavior'");
getContext().cancelReceiveTimeout();
getContext().become(recoveredBehavior());
getContext().become(recoveredBehavior(), true);
unstashAll();
})
.matchAny(m -> {
log.info("Received unexpected message being in 'awaitingBulkWriteResult' - " +
"going to stash: <{}> from sender: <{}>", m, getSender());
stash();
})
.build();
.build()
.orElse(recoveredBehavior("recoveredAwaitingBulkWriteResultBehavior"));
}

private void matchAnyDuringRecovery(final Object message) {
Expand Down Expand Up @@ -314,8 +314,8 @@ private void onNextWriteModel(final AbstractWriteModel nextWriteModel) {
getSender().tell(mongoWriteModel, getSelf());
lastWriteModel = nextWriteModel;

log.debug("Responded with mongoWriteModel - switching to 'awaitingBulkWriteResult'");
getContext().become(awaitingBulkWriteResult());
log.debug("Responded with mongoWriteModel - switching to 'recoveredAwaitingBulkWriteResultBehavior'");
getContext().become(recoveredAwaitingBulkWriteResultBehavior(), true);
}

private Optional<BsonDiff> tryComputeDiff(final BsonDocument minuend, final BsonDocument subtrahend) {
Expand Down Expand Up @@ -366,8 +366,6 @@ private void enqueueMetadata(final UpdateReason updateReason) {

private void enqueueMetadata(final Metadata metadata) {
changeQueueActor.tell(metadata.withOrigin(getSelf()), getSelf());
log.debug("Enqueued metadata - switching to 'recoveredAcceptingWriteModelsBehavior'");
getContext().become(recoveredAcceptingWriteModelsBehavior());
}

private void updateThing(final UpdateThing updateThing) {
Expand Down

0 comments on commit b44ebcb

Please sign in to comment.