Skip to content

Commit

Permalink
Add draft of ThingUpdater without bulk updates.
Browse files Browse the repository at this point in the history
Signed-off-by: Yufei Cai <yufei.cai@bosch.io>
  • Loading branch information
yufei-cai committed Apr 5, 2022
1 parent 6b168bc commit 1da0076
Show file tree
Hide file tree
Showing 8 changed files with 921 additions and 393 deletions.
6 changes: 6 additions & 0 deletions thingsearch/service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@
</dependency>

<!-- ### Testing ### -->
<dependency>
<groupId>org.eclipse.ditto</groupId>
<artifactId>ditto-internal-utils-akka</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,15 @@ public static Metadata fromResponse(final UpdateThingResponse updateThingRespons
null);
}

/**
* Create a copy of this object containing only the IDs and revisions of the thing and policy.
*
* @return the exported metadata.
*/
public Metadata export() {
return Metadata.of(thingId, thingRevision, policyId, policyRevision, null);
}

/**
* Create a copy of this metadata requesting cache invalidation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ public static WriteResultAndErrors failure(final Collection<MongoWriteModel> wri
mongoBulkWriteException.getWriteErrors(), null, bulkWriteCorrelationId);
}

public static WriteResultAndErrors failure(final Throwable error) {
return new WriteResultAndErrors(List.of(), BulkWriteResult.unacknowledged(), List.of(), error, "");
}

/**
* Create a WriteResultAndErrors from an unexpected error. Getting called suggests a bug in Ditto or in its
* environment.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import javax.annotation.Nullable;

import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.ShardedMessageEnvelope;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
Expand All @@ -42,29 +43,28 @@

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.stream.DelayOverflowStrategy;
import akka.stream.javadsl.Flow;

/**
* Flow that sends acknowledgements to ThingUpdater according to bulk write results.
*/
final class BulkWriteResultAckFlow {
public final class BulkWriteResultAckFlow {

private static final Counter ERRORS_COUNTER = DittoMetrics.counter("search-index-update-errors");

private static final ThreadSafeDittoLogger LOGGER =
DittoLoggerFactory.getThreadSafeLogger(BulkWriteResultAckFlow.class);

private static final String ERRORS_COUNTER_NAME = "search-index-update-errors";

private static final DittoHeaders INCORRECT_PATCH_HEADERS = DittoHeaders.newBuilder()
.putHeader(SearchUpdaterStream.FORCE_UPDATE_INCORRECT_PATCH, "true")
.build();

private final ActorRef updaterShard;
private final Counter errorsCounter;

private BulkWriteResultAckFlow(final ActorRef updaterShard) {
this.updaterShard = updaterShard;
errorsCounter = DittoMetrics.counter(ERRORS_COUNTER_NAME);
}

static BulkWriteResultAckFlow of(final ActorRef updaterShard) {
Expand All @@ -76,37 +76,48 @@ Flow<WriteResultAndErrors, String, NotUsed> start(final Duration delay) {
}

private Iterable<String> checkBulkWriteResult(final WriteResultAndErrors writeResultAndErrors) {
return checkBulkWriteResult(writeResultAndErrors, updaterShard).second();
}

// TODO
public static Pair<Status, List<String>> checkBulkWriteResult(final WriteResultAndErrors writeResultAndErrors,
@Nullable final ActorRef updaterShard) {

if (wasNotAcknowledged(writeResultAndErrors)) {
// All failed.
acknowledgeFailures(getAllMetadata(writeResultAndErrors), writeResultAndErrors.getBulkWriteCorrelationId());
return Collections.singleton(logResult("NotAcknowledged", writeResultAndErrors, false,
false));
acknowledgeFailures(getAllMetadata(writeResultAndErrors), writeResultAndErrors.getBulkWriteCorrelationId(),
updaterShard);
return Pair.create(Status.UNACKNOWLEDGED,
List.of(logResult("NotAcknowledged", writeResultAndErrors, false, false)));
} else {
final var consistencyError = checkForConsistencyError(writeResultAndErrors);
switch (consistencyError.status) {
case CONSISTENCY_ERROR:
// write result is not consistent; there is a bug with Ditto or with its environment
acknowledgeFailures(getAllMetadata(writeResultAndErrors),
writeResultAndErrors.getBulkWriteCorrelationId());
writeResultAndErrors.getBulkWriteCorrelationId(), updaterShard);

return Collections.singleton(consistencyError.message);
return Pair.create(Status.CONSISTENCY_ERROR, List.of(consistencyError.message));
case INCORRECT_PATCH:
reportIncorrectPatch(writeResultAndErrors);

return getConsistencyOKResult(writeResultAndErrors, true);
return Pair.create(Status.INCORRECT_PATCH,
getConsistencyOKResult(writeResultAndErrors, true, updaterShard));
case OK:
default:
return getConsistencyOKResult(writeResultAndErrors, false);
return Pair.create(Status.OK,
getConsistencyOKResult(writeResultAndErrors, false, updaterShard));
}
}
}

private Iterable<String> getConsistencyOKResult(final WriteResultAndErrors writeResultAndErrors,
final boolean containsIncorrectPatch) {
return acknowledgeSuccessesAndFailures(writeResultAndErrors, containsIncorrectPatch);
private static List<String> getConsistencyOKResult(final WriteResultAndErrors writeResultAndErrors,
final boolean containsIncorrectPatch,
@Nullable final ActorRef updaterShard) {
return acknowledgeSuccessesAndFailures(writeResultAndErrors, containsIncorrectPatch, updaterShard);
}

private void reportIncorrectPatch(final WriteResultAndErrors writeResultAndErrors) {
private static void reportIncorrectPatch(final WriteResultAndErrors writeResultAndErrors) {
// Some patches are not applied due to inconsistent sequence number in the search index.
// It is not possible to identify which patches are not applied; therefore request all patch updates to retry.
writeResultAndErrors.getWriteModels().forEach(model -> {
Expand All @@ -129,10 +140,11 @@ private void reportIncorrectPatch(final WriteResultAndErrors writeResultAndError
});
}

private Collection<String> acknowledgeSuccessesAndFailures(final WriteResultAndErrors writeResultAndErrors,
final boolean containsIncorrectPatch) {
private static List<String> acknowledgeSuccessesAndFailures(final WriteResultAndErrors writeResultAndErrors,
final boolean containsIncorrectPatch,
@Nullable final ActorRef updaterShard) {
final List<BulkWriteError> errors = writeResultAndErrors.getBulkWriteErrors();
final Collection<String> logEntries = new ArrayList<>(errors.size() + 1);
final List<String> logEntries = new ArrayList<>(errors.size() + 1);
final Collection<Metadata> failedMetadata = new ArrayList<>(errors.size());
logEntries.add(logResult("Acknowledged", writeResultAndErrors, errors.isEmpty(), containsIncorrectPatch));
final BitSet failedIndices = new BitSet(writeResultAndErrors.getWriteModels().size());
Expand All @@ -146,7 +158,7 @@ private Collection<String> acknowledgeSuccessesAndFailures(final WriteResultAndE
// duplicate key error is considered success
}
}
acknowledgeFailures(failedMetadata, writeResultAndErrors.getBulkWriteCorrelationId());
acknowledgeFailures(failedMetadata, writeResultAndErrors.getBulkWriteCorrelationId(), updaterShard);
acknowledgeSuccesses(failedIndices, writeResultAndErrors.getBulkWriteCorrelationId(),
writeResultAndErrors.getWriteModels());

Expand All @@ -164,19 +176,24 @@ private static void acknowledgeSuccesses(final BitSet failedIndices, final Strin
}
}

private void acknowledgeFailures(final Collection<Metadata> metadataList, final String bulkWriteCorrelationId) {
errorsCounter.increment(metadataList.size());
private static void acknowledgeFailures(final Collection<Metadata> metadataList,
final String bulkWriteCorrelationId,
@Nullable final ActorRef updaterShard) {
ERRORS_COUNTER.increment(metadataList.size());
for (final Metadata metadata : metadataList) {
metadata.sendNAck(); // also stops timer even if no acknowledgement is requested
metadata.sendBulkWriteCompleteToOrigin(bulkWriteCorrelationId);
final UpdateThingResponse response = createFailureResponse(metadata, DittoHeaders.empty());
metadata.getOrigin().ifPresentOrElse(
origin -> origin.tell(response, ActorRef.noSender()),
() -> {
final ShardedMessageEnvelope envelope =
ShardedMessageEnvelope.of(response.getEntityId(), response.getType(), response.toJson(),
response.getDittoHeaders());
updaterShard.tell(envelope, ActorRef.noSender());
if (updaterShard != null) {
final ShardedMessageEnvelope envelope =
ShardedMessageEnvelope.of(response.getEntityId(), response.getType(),
response.toJson(),
response.getDittoHeaders());
updaterShard.tell(envelope, ActorRef.noSender());
}
}
);
}
Expand Down Expand Up @@ -223,11 +240,11 @@ private static ConsistencyCheckResult checkForConsistencyError(final WriteResult
// some indexes not within bounds
final var message = String.format("ConsistencyError[indexOutOfBound]: %s", resultAndErrors);

return new ConsistencyCheckResult(ConsistencyStatus.CONSISTENCY_ERROR, message);
return new ConsistencyCheckResult(Status.CONSISTENCY_ERROR, message);
} else if (areUpdatesMissing(resultAndErrors)) {
return new ConsistencyCheckResult(ConsistencyStatus.INCORRECT_PATCH, "");
return new ConsistencyCheckResult(Status.INCORRECT_PATCH, "");
} else {
return new ConsistencyCheckResult(ConsistencyStatus.OK, "");
return new ConsistencyCheckResult(Status.OK, "");
}
}

Expand Down Expand Up @@ -302,18 +319,13 @@ private static String logResult(final String status, final WriteResultAndErrors
}
}

private static final class ConsistencyCheckResult {
private record ConsistencyCheckResult(Status status, String message) {}

private final ConsistencyStatus status;
private final String message;

private ConsistencyCheckResult(final ConsistencyStatus status, final String message) {
this.status = status;
this.message = message;
}
}

private enum ConsistencyStatus {
/**
* Summary of the write result status.
*/
public enum Status {
UNACKNOWLEDGED,
CONSISTENCY_ERROR,
INCORRECT_PATCH,
OK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public Flow<ThingUpdater.Data, MongoWriteModel, NotUsed> create() {
return computeWriteModel(data.metadata(), thing);
})
// TODO: searchUpdateMapper
.flatMapConcat(writeModel -> writeModel.toIncrementalMongo(data.lastWriteModel().orElse(null))
.flatMapConcat(writeModel -> writeModel.toIncrementalMongo(data.lastWriteModel())
.map(Source::single)
.orElseGet(() -> {
data.metadata().sendWeakAck(null);
Expand Down
Loading

0 comments on commit 1da0076

Please sign in to comment.