fix sending BulkWriteComplete message for all cases
* send it where the acknowledgements are issued
* change the "ReceiveTimeout" in ThingUpdater to a separate timer, fixing that any consumed message would prolong the 2-minute timeout again (see the sketch below)

Signed-off-by: Thomas Jaeckle <thomas.jaeckle@bosch.io>
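
The second bullet boils down to a difference in Akka timeout semantics. A minimal sketch of the two mechanisms, not part of this commit and with an illustrative actor name and message, assuming an actor extending AbstractActorWithTimers:

    import java.time.Duration;

    import akka.actor.AbstractActorWithTimers;
    import akka.actor.ReceiveTimeout;

    final class TimeoutSketchActor extends AbstractActorWithTimers {

        private static final String AWAIT_TIMEOUT = "AWAIT_TIMEOUT";

        @Override
        public void preStart() {
            // Variant A (old approach): the ReceiveTimeout deadline is reset by every
            // consumed message, so steady traffic keeps pushing it into the future.
            getContext().setReceiveTimeout(Duration.ofMinutes(2));

            // Variant B (new approach): a named single timer fires once after 2 minutes,
            // no matter how many other messages arrive; only an explicit cancel stops it.
            getTimers().startSingleTimer(AWAIT_TIMEOUT, AWAIT_TIMEOUT, Duration.ofMinutes(2));
        }

        @Override
        public Receive createReceive() {
            return receiveBuilder()
                    .match(ReceiveTimeout.class, rt -> getContext().cancelReceiveTimeout())
                    .matchEquals(AWAIT_TIMEOUT, msg -> getTimers().cancel(AWAIT_TIMEOUT))
                    .build();
        }
    }

Because ThingUpdater keeps consuming messages while it waits for the bulk-write result, variant A could postpone the two-minute deadline indefinitely, which is what the separate timer introduced by this commit avoids.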
thjaeckle committed Mar 9, 2022
1 parent 5429603 commit 7355752
Showing 6 changed files with 46 additions and 28 deletions.
@@ -12,6 +12,9 @@
*/
package org.eclipse.ditto.thingsearch.service.persistence;

import java.util.Optional;

import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;

/**
@@ -20,9 +23,9 @@
@Immutable
public final class BulkWriteComplete {

private final String bulkWriteCorrelationId;
@Nullable private final String bulkWriteCorrelationId;

private BulkWriteComplete(final String bulkWriteCorrelationId) {
private BulkWriteComplete(@Nullable final String bulkWriteCorrelationId) {
this.bulkWriteCorrelationId = bulkWriteCorrelationId;
}

@@ -32,14 +35,14 @@ private BulkWriteComplete(final String bulkWriteCorrelationId) {
* @param bulkWriteCorrelationId the correlationId of the bulkWrite.
* @return the instance.
*/
public static BulkWriteComplete of(final String bulkWriteCorrelationId) {
public static BulkWriteComplete of(@Nullable final String bulkWriteCorrelationId) {
return new BulkWriteComplete(bulkWriteCorrelationId);
}

/**
* @return the correlationId of the bulkWrite.
*/
public String getBulkWriteCorrelationId() {
return bulkWriteCorrelationId;
public Optional<String> getBulkWriteCorrelationId() {
return Optional.ofNullable(bulkWriteCorrelationId);
}
}
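
For orientation, a minimal usage sketch of the relaxed factory and the new Optional-returning getter; the wrapper class below is illustrative and not part of the diff:

    import org.eclipse.ditto.thingsearch.service.persistence.BulkWriteComplete;

    public final class BulkWriteCompleteUsageSketch {

        public static void main(final String[] args) {
            // A null correlation-id is now accepted, e.g. when no bulk write happened at all.
            final BulkWriteComplete complete = BulkWriteComplete.of(null);

            // Callers read the correlation-id back as an Optional instead of a nullable String.
            complete.getBulkWriteCorrelationId().ifPresentOrElse(
                    id -> System.out.println("bulkWrite correlation-id: " + id),
                    () -> System.out.println("bulkWrite completed without a correlation-id"));
        }
    }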
@@ -35,6 +35,7 @@
import org.eclipse.ditto.things.model.signals.events.ThingEvent;
import org.eclipse.ditto.thingsearch.api.UpdateReason;
import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThingResponse;
import org.eclipse.ditto.thingsearch.service.persistence.BulkWriteComplete;

import akka.actor.ActorRef;

@@ -419,6 +420,17 @@ public void sendWeakAck(@Nullable final JsonValue payload) {
send(Acknowledgement.weak(DittoAcknowledgementLabel.SEARCH_PERSISTED, thingId, DittoHeaders.empty(), payload));
}

/**
* Send {@link BulkWriteComplete} message to ThingUpdater {@code origin}.
*
* @param bulkWriteCorrelationId the correlation-id of the bulk write.
*/
public void sendBulkWriteCompleteToOrigin(@Nullable final String bulkWriteCorrelationId) {
if (null != origin) {
origin.tell(BulkWriteComplete.of(bulkWriteCorrelationId), ActorRef.noSender());
}
}

private void send(final Acknowledgement ack) {
timers.forEach(timer -> {
if (timer.isRunning()) {
@@ -77,15 +77,16 @@ Flow<WriteResultAndErrors, String, NotUsed> start(final Duration delay) {
private Iterable<String> checkBulkWriteResult(final WriteResultAndErrors writeResultAndErrors) {
if (wasNotAcknowledged(writeResultAndErrors)) {
// All failed.
acknowledgeFailures(getAllMetadata(writeResultAndErrors));
acknowledgeFailures(getAllMetadata(writeResultAndErrors), writeResultAndErrors.getBulkWriteCorrelationId());
return Collections.singleton(logResult("NotAcknowledged", writeResultAndErrors, false,
false));
} else {
final var consistencyError = checkForConsistencyError(writeResultAndErrors);
switch (consistencyError.status) {
case CONSISTENCY_ERROR:
// write result is not consistent; there is a bug with Ditto or with its environment
acknowledgeFailures(getAllMetadata(writeResultAndErrors));
acknowledgeFailures(getAllMetadata(writeResultAndErrors),
writeResultAndErrors.getBulkWriteCorrelationId());

return Collections.singleton(consistencyError.message);
case INCORRECT_PATCH:
@@ -134,24 +135,28 @@ private Collection<String> acknowledgeSuccessesAndFailures(final WriteResultAndE
// duplicate key error is considered success
}
}
acknowledgeFailures(failedMetadata);
acknowledgeSuccesses(failedIndices, writeResultAndErrors.getWriteModels());
acknowledgeFailures(failedMetadata, writeResultAndErrors.getBulkWriteCorrelationId());
acknowledgeSuccesses(failedIndices, writeResultAndErrors.getBulkWriteCorrelationId(),
writeResultAndErrors.getWriteModels());

return logEntries;
}

private static void acknowledgeSuccesses(final BitSet failedIndices, final List<AbstractWriteModel> writeModels) {
private static void acknowledgeSuccesses(final BitSet failedIndices, final String bulkWriteCorrelationId,
final List<AbstractWriteModel> writeModels) {
for (int i = 0; i < writeModels.size(); ++i) {
if (!failedIndices.get(i)) {
writeModels.get(i).getMetadata().sendAck();
writeModels.get(i).getMetadata().sendBulkWriteCompleteToOrigin(bulkWriteCorrelationId);
}
}
}

private void acknowledgeFailures(final Collection<Metadata> metadataList) {
private void acknowledgeFailures(final Collection<Metadata> metadataList, final String bulkWriteCorrelationId) {
errorsCounter.increment(metadataList.size());
for (final Metadata metadata : metadataList) {
metadata.sendNAck(); // also stops timer even if no acknowledgement is requested
metadata.sendBulkWriteCompleteToOrigin(bulkWriteCorrelationId);
final UpdateThingResponse response = createFailureResponse(metadata, DittoHeaders.empty());
metadata.getOrigin().ifPresentOrElse(
origin -> origin.tell(response, ActorRef.noSender()),
@@ -23,7 +23,6 @@
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.timer.StartedTimer;
import org.eclipse.ditto.thingsearch.service.common.config.PersistenceStreamConfig;
import org.eclipse.ditto.thingsearch.service.persistence.BulkWriteComplete;
import org.eclipse.ditto.thingsearch.service.persistence.PersistenceConstants;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.WriteResultAndErrors;
@@ -40,7 +39,6 @@
import com.mongodb.reactivestreams.client.MongoDatabase;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.japi.pf.PFBuilder;
import akka.stream.javadsl.Flow;
@@ -141,8 +139,7 @@ private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(final boolean sho
.debug("Requested to make empty update by write models <{}>", abstractWriteModels);
for (final var abstractWriteModel : abstractWriteModels) {
abstractWriteModel.getMetadata().sendWeakAck(null);
abstractWriteModel.getMetadata().getOrigin().ifPresent(origin ->
origin.tell(BulkWriteComplete.of(bulkWriteCorrelationId), ActorRef.noSender()));
abstractWriteModel.getMetadata().sendBulkWriteCompleteToOrigin(bulkWriteCorrelationId);
}
return Source.empty();
}
@@ -179,11 +176,8 @@ private Source<WriteResultAndErrors, NotUsed> executeBulkWrite(final boolean sho
)
.map(resultAndErrors -> {
stopBulkWriteTimer(bulkWriteTimer);
abstractWriteModels.forEach(writeModel -> {
writeModel.getMetadata().getOrigin().ifPresent(origin ->
origin.tell(BulkWriteComplete.of(bulkWriteCorrelationId), ActorRef.noSender()));
ConsistencyLag.startS6Acknowledge(writeModel.getMetadata());
}
abstractWriteModels.forEach(writeModel ->
ConsistencyLag.startS6Acknowledge(writeModel.getMetadata())
);
return resultAndErrors;
});
@@ -89,6 +89,7 @@ public static SearchUpdateMapper get(final ActorSystem actorSystem) {
if (mongoWriteModelOpt.isEmpty()) {
logger.debug("Write model is unchanged, skipping update: <{}>", model);
model.getMetadata().sendWeakAck(null);
model.getMetadata().sendBulkWriteCompleteToOrigin(null);
return List.<Pair<AbstractWriteModel, WriteModel<BsonDocument>>>of();
} else {
ConsistencyLag.startS5MongoBulkWrite(model.getMetadata());
@@ -86,14 +86,16 @@ final class ThingUpdater extends AbstractActorWithStashWithTimers {
AcknowledgementRequest.of(DittoAcknowledgementLabel.SEARCH_PERSISTED);

static final String FORCE_UPDATE_AFTER_START = "FORCE_UPDATE_AFTER_START";
static final String BULK_RESULT_AWAITING_TIMEOUT = "BULK_RESULT_AWAITING_TIMEOUT";

private static final Duration BULK_RESULT_AWAITING_TIMEOUT_DURATION = Duration.ofMinutes(2);

private static final Counter INCORRECT_PATCH_UPDATE_COUNT = DittoMetrics.counter("search_incorrect_patch_updates");
private static final Counter UPDATE_FAILURE_COUNT = DittoMetrics.counter("search_update_failures");
private static final Counter PATCH_UPDATE_COUNT = DittoMetrics.counter("search_patch_updates");
private static final Counter PATCH_SKIP_COUNT = DittoMetrics.counter("search_patch_skips");
private static final Counter FULL_UPDATE_COUNT = DittoMetrics.counter("search_full_updates");

private static final Duration BULK_RESULT_AWAITING_TIMEOUT = Duration.ofMinutes(2);
private static final Duration THING_DELETION_TIMEOUT = Duration.ofMinutes(5);

private static final DittoLogger LOGGER = DittoLoggerFactory.getLogger(ThingUpdater.class);
@@ -227,25 +229,26 @@ private Receive recoveredBehavior(final String currentBehaviorHintForLogging) {
}

private Receive recoveredAwaitingBulkWriteResultBehavior() {
getContext().setReceiveTimeout(BULK_RESULT_AWAITING_TIMEOUT);
getTimers().startSingleTimer(BULK_RESULT_AWAITING_TIMEOUT, BULK_RESULT_AWAITING_TIMEOUT,
BULK_RESULT_AWAITING_TIMEOUT_DURATION);
return ReceiveBuilder.create()
.match(AbstractWriteModel.class, writeModel -> {
log.debug("Stashing received writeModel while being in " +
log.info("Stashing received writeModel while being in " +
"'recoveredAwaitingBulkWriteResultBehavior': <{}> with revision: <{}>",
writeModel.getClass().getSimpleName(), writeModel.getMetadata().getThingRevision());
stash();
})
.match(BulkWriteComplete.class, bulkWriteComplete -> {
log.withCorrelationId(bulkWriteComplete.getBulkWriteCorrelationId())
log.withCorrelationId(bulkWriteComplete.getBulkWriteCorrelationId().orElse(null))
.debug("Got confirmation bulkWrite was performed - switching to 'recoveredBehavior'");
getContext().cancelReceiveTimeout();
getTimers().cancel(BULK_RESULT_AWAITING_TIMEOUT);
getContext().become(recoveredBehavior(), true);
unstashAll();
})
.match(ReceiveTimeout.class, rt -> {
log.warning("Encountered ReceiveTimeout being in 'recoveredAwaitingBulkWriteResultBehavior' - " +
.matchEquals(BULK_RESULT_AWAITING_TIMEOUT, bra -> {
log.warning("Encountered timeout being in 'recoveredAwaitingBulkWriteResultBehavior' - " +
"switching back to 'recoveredBehavior'");
getContext().cancelReceiveTimeout();
getTimers().cancel(BULK_RESULT_AWAITING_TIMEOUT);
getContext().become(recoveredBehavior(), true);
unstashAll();
})
