Skip to content

Commit

Permalink
first check for incorrect patch update in the if branch;
Browse files Browse the repository at this point in the history
Signed-off-by: Stefan Maute <stefan.maute@bosch.io>
  • Loading branch information
Stefan Maute committed Feb 7, 2022
1 parent 5e6b089 commit 06f47b3
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@

/**
* Command response to acknowledge a search index update.
* Currently a Ditto-internal message, but could become public API at some point.
* Currently, a Ditto-internal message, but could become public API at some point.
*/
@Immutable
@AllValuesAreNonnullByDefault
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,11 @@ private Iterable<String> checkBulkWriteResult(final WriteResultAndErrors writeRe
case CONSISTENCY_ERROR:
// write result is not consistent; there is a bug with Ditto or with its environment
acknowledgeFailures(getAllMetadata(writeResultAndErrors));

return Collections.singleton(consistencyError.message);
case INCORRECT_PATCH:
reportIncorrectPatch(writeResultAndErrors);

return getConsistencyOKResult(writeResultAndErrors);
case OK:
default:
Expand All @@ -98,7 +100,7 @@ private Iterable<String> getConsistencyOKResult(final WriteResultAndErrors write
private void reportIncorrectPatch(final WriteResultAndErrors writeResultAndErrors) {
// Some patches are not applied due to inconsistent sequence number in the search index.
// It is not possible to identify which patches are not applied; therefore request all patch updates to retry.
writeResultAndErrors.getWriteModels().stream().forEach(model -> {
writeResultAndErrors.getWriteModels().forEach(model -> {
final var response =
createFailureResponse(model.getMetadata()).setDittoHeaders(INCORRECT_PATCH_HEADERS);
model.getMetadata().getOrigin().ifPresent(updater -> updater.tell(response, ActorRef.noSender()));
Expand All @@ -122,6 +124,7 @@ private Collection<String> acknowledgeSuccessesAndFailures(final WriteResultAndE
}
acknowledgeFailures(failedMetadata);
acknowledgeSuccesses(failedIndices, writeResultAndErrors.getWriteModels());

return logEntries;
}

Expand Down Expand Up @@ -190,6 +193,7 @@ private static ConsistencyCheckResult checkForConsistencyError(final WriteResult
if (!areAllIndexesWithinBounds(resultAndErrors.getBulkWriteErrors(), requested)) {
// some indexes not within bounds
final var message = String.format("ConsistencyError[indexOutOfBound]: %s", resultAndErrors);

return new ConsistencyCheckResult(ConsistencyStatus.CONSISTENCY_ERROR, message);
} else if (areUpdatesMissing(resultAndErrors)) {
return new ConsistencyCheckResult(ConsistencyStatus.INCORRECT_PATCH, "");
Expand All @@ -205,12 +209,12 @@ private static boolean areUpdatesMissing(final WriteResultAndErrors resultAndErr
.count();
final long matchedCount = result.getMatchedCount();
final long upsertCount = result.getUpserts().size();

return matchedCount + upsertCount < writeModelCount;
}

private static boolean areAllIndexesWithinBounds(final Collection<BulkWriteError> bulkWriteErrors,
final int requested) {

return bulkWriteErrors.stream().mapToInt(BulkWriteError::getIndex).allMatch(i -> 0 <= i && i < requested);
}

Expand All @@ -232,6 +236,7 @@ private static String logResult(final String status, final WriteResultAndErrors
return stackTraceWriter.append("]").toString();
} else if (isCompleteSuccess) {
final BulkWriteResult bulkWriteResult = writeResultAndErrors.getBulkWriteResult();

return String.format(
"%s: Success[ack=%b,errors=%d,matched=%d,upserts=%d,inserted=%d,modified=%d,deleted=%d]",
status,
Expand All @@ -245,6 +250,7 @@ private static String logResult(final String status, final WriteResultAndErrors
} else {
// partial success
final BulkWriteResult bulkWriteResult = writeResultAndErrors.getBulkWriteResult();

return String.format(
"%s: PartialSuccess[ack=%b,errorCount=%d,matched=%d,upserts=%d,inserted=%d,modified=%d," +
"deleted=%d,errors=%s]",
Expand Down Expand Up @@ -277,4 +283,5 @@ private enum ConsistencyStatus {
INCORRECT_PATCH,
OK
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ public static SearchUpdaterStream of(final UpdaterConfig updaterConfig,
public KillSwitch start(final ActorContext actorContext) {
final Source<Source<AbstractWriteModel, NotUsed>, NotUsed> restartSource = createRestartSource();
final Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> restartSink = createRestartSink();

return restartSource.viaMat(KillSwitches.single(), Keep.right())
.toMat(restartSink, Keep.left())
.run(actorContext.system());
Expand All @@ -149,6 +150,7 @@ private Source<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSource
writeModelSource.via(blockNamespaceFlow(SearchUpdaterStream::namespaceOfWriteModel)));

final var backOffConfig = retrievalConfig.getExponentialBackOffConfig();

return RestartSource.withBackoff(
RestartSettings.create(backOffConfig.getMin(), backOffConfig.getMax(), backOffConfig.getRandomFactor()),
() -> source);
Expand All @@ -172,6 +174,7 @@ private Sink<Source<AbstractWriteModel, NotUsed>, NotUsed> createRestartSink() {
.to(Sink.ignore());

final var backOffConfig = persistenceConfig.getExponentialBackOffConfig();

return RestartSink.withBackoff(
RestartSettings.create(backOffConfig.getMin(), backOffConfig.getMax(), backOffConfig.getRandomFactor()),
() -> sink);
Expand Down Expand Up @@ -206,5 +209,4 @@ private static String namespaceOfWriteModel(final AbstractWriteModel writeModel)
return writeModel.getMetadata().getThingId().getNamespace();
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ static Props props(final ActorRef pubSubMediator, final ActorRef changeQueueActo
final var effectiveForceUpdateAfterStartTimeout = updaterConfig.isForceUpdateAfterStartEnabled()
? updaterConfig.getForceUpdateAfterStartTimeout()
: Duration.ZERO;

return Props.create(ThingUpdater.class, pubSubMediator, changeQueueActor,
updaterConfig.getForceUpdateProbability(),
effectiveForceUpdateAfterStartTimeout,
Expand Down Expand Up @@ -223,6 +224,7 @@ private void onNextWriteModel(final AbstractWriteModel nextWriteModel) {
log.debug("Skipping update due to empty diff <{}>", nextWriteModel);
getSender().tell(Done.getInstance(), getSelf());
PATCH_SKIP_COUNT.increment();

return;
}
final var filter = ((ThingWriteModel) nextWriteModel)
Expand Down Expand Up @@ -259,6 +261,7 @@ private Optional<BsonDiff> tryComputeDiff(final BsonDocument minuend, final Bson
return Optional.of(BsonDiff.minusThingDocs(minuend, subtrahend));
} catch (BsonInvalidOperationException e) {
log.error(e, "Failed to compute BSON diff between <{}> and <{}>", minuend, subtrahend);

return Optional.empty();
}
}
Expand Down Expand Up @@ -327,14 +330,16 @@ private void processUpdateThingResponse(final UpdateThingResponse response) {
if (isFailure || isIncorrectPatch) {
// discard last write model: index document is not known
lastWriteModel = null;
final Metadata metadata = exportMetadata(null, null).invalidateCaches(true, true);
final Metadata metadata =
exportMetadata(null, null).invalidateCaches(true, true);
final String warningTemplate;
if (isFailure) {
warningTemplate = "Got negative acknowledgement for <{}>; updating to <{}>.";
UPDATE_FAILURE_COUNT.increment();
} else {
// check first for incorrect patch update otherwise the else branch is never triggered.
if (isIncorrectPatch) {
warningTemplate = "Inconsistent patch update detected for <{}>; updating to <{}>.";
INCORRECT_PATCH_UPDATE_COUNT.increment();
} else {
warningTemplate = "Got negative acknowledgement for <{}>; updating to <{}>.";
UPDATE_FAILURE_COUNT.increment();
}
log.warning(warningTemplate, Metadata.fromResponse(response), metadata);
enqueueMetadata(metadata.withUpdateReason(UpdateReason.RETRY));
Expand Down

0 comments on commit 06f47b3

Please sign in to comment.