-
Notifications
You must be signed in to change notification settings - Fork 214
/
BulkWriteResultAckFlow.java
277 lines (246 loc) · 12.5 KB
/
BulkWriteResultAckFlow.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
/*
* Copyright (c) 2020 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.thingsearch.service.persistence.write.streaming;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Duration;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.signals.ShardedMessageEnvelope;
import org.eclipse.ditto.internal.utils.metrics.DittoMetrics;
import org.eclipse.ditto.internal.utils.metrics.instruments.counter.Counter;
import org.eclipse.ditto.thingsearch.api.commands.sudo.UpdateThingResponse;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.AbstractWriteModel;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.Metadata;
import org.eclipse.ditto.thingsearch.service.persistence.write.model.WriteResultAndErrors;
import com.mongodb.ErrorCategory;
import com.mongodb.bulk.BulkWriteError;
import com.mongodb.bulk.BulkWriteResult;
import akka.NotUsed;
import akka.actor.ActorRef;
import akka.stream.DelayOverflowStrategy;
import akka.stream.javadsl.Flow;
/**
* Flow that sends acknowledgements to ThingUpdater according to bulk write results.
*/
final class BulkWriteResultAckFlow {
// Name of the metrics counter that tracks failed search index updates.
private static final String ERRORS_COUNTER_NAME = "search-index-update-errors";
// Headers attached to failure responses that are sent when an incorrect patch is reported.
private static final DittoHeaders INCORRECT_PATCH_HEADERS = DittoHeaders.newBuilder()
.putHeader(SearchUpdaterStream.FORCE_UPDATE_INCORRECT_PATCH, "true")
.build();
// Fallback recipient of failure responses for metadata that carries no origin.
private final ActorRef updaterShard;
// Incremented by the number of failed updates each time failures are acknowledged.
private final Counter errorsCounter;
/**
 * Create the flow helper and obtain the counter used to record failed updates.
 *
 * @param updaterShard recipient of failure responses when a write model's metadata has no origin.
 */
private BulkWriteResultAckFlow(final ActorRef updaterShard) {
    this.errorsCounter = DittoMetrics.counter(ERRORS_COUNTER_NAME);
    this.updaterShard = updaterShard;
}
/**
 * Create a new instance of this class.
 *
 * @param updaterShard actor reference handed to the constructor.
 * @return the new instance.
 */
static BulkWriteResultAckFlow of(final ActorRef updaterShard) {
return new BulkWriteResultAckFlow(updaterShard);
}
/**
 * Build the flow that processes bulk write results and emits the resulting log entries.
 *
 * @param delay delay applied to incoming results; no delay stage is added unless it is positive.
 * @return flow from bulk write results to log entries.
 */
Flow<WriteResultAndErrors, String, NotUsed> start(final Duration delay) {
    final Flow<WriteResultAndErrors, WriteResultAndErrors, NotUsed> delayedResults = getDelayFlow(delay);
    return delayedResults.mapConcat(this::checkBulkWriteResult);
}
/**
 * Inspect one bulk write result, send the appropriate acknowledgements and produce log entries.
 *
 * @param writeResultAndErrors input and output of one bulk write operation.
 * @return the log entries describing the outcome.
 */
private Iterable<String> checkBulkWriteResult(final WriteResultAndErrors writeResultAndErrors) {
    if (wasNotAcknowledged(writeResultAndErrors)) {
        // Not acknowledged: every write model is treated as failed.
        acknowledgeFailures(getAllMetadata(writeResultAndErrors));
        return Collections.singleton(logResult("NotAcknowledged", writeResultAndErrors, false));
    }

    final var consistencyCheck = checkForConsistencyError(writeResultAndErrors);
    switch (consistencyCheck.status) {
        case CONSISTENCY_ERROR:
            // write result is not consistent; there is a bug with Ditto or with its environment
            acknowledgeFailures(getAllMetadata(writeResultAndErrors));
            return Collections.singleton(consistencyCheck.message);
        case INCORRECT_PATCH:
            // request retries first, then continue with the regular acknowledgements below
            reportIncorrectPatch(writeResultAndErrors);
            break;
        case OK:
        default:
            break;
    }
    return getConsistencyOKResult(writeResultAndErrors);
}
/**
 * Handle a result that passed the consistency check by delegating to
 * {@code acknowledgeSuccessesAndFailures}.
 *
 * @param writeResultAndErrors input and output of one bulk write operation.
 * @return the log entries produced by the delegate.
 */
private Iterable<String> getConsistencyOKResult(final WriteResultAndErrors writeResultAndErrors) {
return acknowledgeSuccessesAndFailures(writeResultAndErrors);
}
/**
 * Send a failure response carrying {@code INCORRECT_PATCH_HEADERS} to the origin of every write model
 * that has one.
 *
 * @param writeResultAndErrors input and output of one bulk write operation.
 */
private void reportIncorrectPatch(final WriteResultAndErrors writeResultAndErrors) {
    // Some patches are not applied due to inconsistent sequence number in the search index.
    // It is not possible to identify which patches are not applied; therefore request all patch updates to retry.
    for (final AbstractWriteModel writeModel : writeResultAndErrors.getWriteModels()) {
        final Metadata metadata = writeModel.getMetadata();
        final var retryRequest = createFailureResponse(metadata).setDittoHeaders(INCORRECT_PATCH_HEADERS);
        metadata.getOrigin().ifPresent(origin -> origin.tell(retryRequest, ActorRef.noSender()));
    }
}
/**
 * Acknowledge each write model individually according to the reported bulk write errors.
 * Write models with an error other than a duplicate key error are negatively acknowledged;
 * all others are positively acknowledged.
 *
 * @param writeResultAndErrors input and output of one bulk write operation.
 * @return one summary log entry followed by one entry per reported error.
 */
private Collection<String> acknowledgeSuccessesAndFailures(final WriteResultAndErrors writeResultAndErrors) {
    final List<BulkWriteError> bulkWriteErrors = writeResultAndErrors.getBulkWriteErrors();
    final int errorCount = bulkWriteErrors.size();

    final Collection<String> logEntries = new ArrayList<>(errorCount + 1);
    logEntries.add(logResult("Acknowledged", writeResultAndErrors, bulkWriteErrors.isEmpty()));

    final List<AbstractWriteModel> writeModels = writeResultAndErrors.getWriteModels();
    final Collection<Metadata> failures = new ArrayList<>(errorCount);
    final BitSet failedPositions = new BitSet(writeModels.size());
    for (final BulkWriteError bulkWriteError : bulkWriteErrors) {
        final int position = bulkWriteError.getIndex();
        final Metadata metadata = writeModels.get(position).getMetadata();
        logEntries.add(String.format("UpdateFailed for %s due to %s", metadata, bulkWriteError));
        if (bulkWriteError.getCategory() == ErrorCategory.DUPLICATE_KEY) {
            // duplicate key error is considered success
            continue;
        }
        failedPositions.set(position);
        failures.add(metadata);
    }

    acknowledgeFailures(failures);
    acknowledgeSuccesses(failedPositions, writeModels);
    return logEntries;
}
/**
 * Send a positive acknowledgement for every write model whose index is not marked as failed.
 *
 * @param failedIndices bit set in which the indexes of failed write models are set.
 * @param writeModels the write models of the bulk write, in request order.
 */
private static void acknowledgeSuccesses(final BitSet failedIndices, final List<AbstractWriteModel> writeModels) {
    // visit only the clear bits, i.e. the indexes that were not marked as failed
    for (int index = failedIndices.nextClearBit(0);
            index < writeModels.size();
            index = failedIndices.nextClearBit(index + 1)) {
        writeModels.get(index).getMetadata().sendAck();
    }
}
/**
 * Record the failures in the error counter, send a negative acknowledgement for each of them and
 * deliver a failure response either to the origin of the update or, if there is none, to the updater shard.
 *
 * @param metadataList metadata of the write models considered failed.
 */
private void acknowledgeFailures(final Collection<Metadata> metadataList) {
    errorsCounter.increment(metadataList.size());
    for (final Metadata failed : metadataList) {
        failed.sendNAck(); // also stops timer even if no acknowledgement is requested
        final UpdateThingResponse failureResponse = createFailureResponse(failed);
        final var origin = failed.getOrigin();
        if (origin.isPresent()) {
            origin.get().tell(failureResponse, ActorRef.noSender());
        } else {
            // no origin known: wrap the response into an envelope and send it to the shard
            final ShardedMessageEnvelope envelope = ShardedMessageEnvelope.of(
                    failureResponse.getEntityId(),
                    failureResponse.getType(),
                    failureResponse.toJson(),
                    failureResponse.getDittoHeaders());
            updaterShard.tell(envelope, ActorRef.noSender());
        }
    }
}
/**
 * Create the pass-through flow placed in front of the result check, adding a delay stage with
 * backpressure overflow strategy if and only if the delay is positive.
 *
 * @param delay the requested delay.
 * @return the possibly delaying identity flow.
 */
private static Flow<WriteResultAndErrors, WriteResultAndErrors, NotUsed> getDelayFlow(final Duration delay) {
    final Flow<WriteResultAndErrors, WriteResultAndErrors, NotUsed> passThrough = Flow.create();
    if (!isPositive(delay)) {
        return passThrough;
    }
    // delay required to delay the first result. delay applied for each buffered batch.
    return passThrough.delay(delay, DelayOverflowStrategy.backpressure());
}
/**
 * Test whether a duration is strictly greater than zero.
 * <p>
 * The sign is inspected directly instead of negating the duration, because negation overflows with an
 * {@code ArithmeticException} for a duration of {@code Long.MIN_VALUE} seconds.
 *
 * @param duration the duration to test.
 * @return {@code true} if the duration is neither negative nor zero.
 */
private static boolean isPositive(final Duration duration) {
    return !duration.isNegative() && !duration.isZero();
}
/**
 * Create an {@code UpdateThingResponse} with its boolean flag set to {@code false} and empty headers
 * for the thing described by the metadata.
 *
 * @param metadata metadata of the failed write model.
 * @return the failure response.
 */
private static UpdateThingResponse createFailureResponse(final Metadata metadata) {
    final var optionalPolicyId = metadata.getPolicyId();
    final var policyId = optionalPolicyId.orElse(null);
    // the policy revision is only reported together with a policy ID
    final var policyRevision = optionalPolicyId.flatMap(id -> metadata.getPolicyRevision()).orElse(null);
    return UpdateThingResponse.of(
            metadata.getThingId(),
            metadata.getThingRevision(),
            policyId,
            policyRevision,
            false,
            DittoHeaders.empty()
    );
}
/**
 * Test whether the bulk write result reports that the write was not acknowledged.
 *
 * @param writeResultAndErrors input and output of one bulk write operation.
 * @return {@code true} if the bulk write result was not acknowledged.
 */
private static boolean wasNotAcknowledged(final WriteResultAndErrors writeResultAndErrors) {
    final var bulkWriteResult = writeResultAndErrors.getBulkWriteResult();
    return !bulkWriteResult.wasAcknowledged();
}
/**
 * Check if the bulk write result is consistent with the requested write models.
 *
 * @param resultAndErrors data structure containing input and output of the bulk write operation.
 * @return the check result: {@code CONSISTENCY_ERROR} with a message if an error index is out of bounds,
 * {@code INCORRECT_PATCH} if updates are missing, {@code OK} otherwise; the message is empty for the
 * latter two.
 */
private static ConsistencyCheckResult checkForConsistencyError(final WriteResultAndErrors resultAndErrors) {
    final int writeModelCount = resultAndErrors.getWriteModels().size();
    final boolean indexesWithinBounds =
            areAllIndexesWithinBounds(resultAndErrors.getBulkWriteErrors(), writeModelCount);
    if (!indexesWithinBounds) {
        // some indexes not within bounds
        return new ConsistencyCheckResult(ConsistencyStatus.CONSISTENCY_ERROR,
                String.format("ConsistencyError[indexOutOfBound]: %s", resultAndErrors));
    }
    final ConsistencyStatus status = areUpdatesMissing(resultAndErrors)
            ? ConsistencyStatus.INCORRECT_PATCH
            : ConsistencyStatus.OK;
    return new ConsistencyCheckResult(status, "");
}
/**
 * Test whether fewer documents were matched or upserted than write models were requested.
 *
 * @param resultAndErrors data structure containing input and output of the bulk write operation.
 * @return {@code true} if matched count plus upsert count is smaller than the number of write models.
 */
private static boolean areUpdatesMissing(final WriteResultAndErrors resultAndErrors) {
    final int requestedCount = resultAndErrors.getWriteModels().size();
    final var bulkWriteResult = resultAndErrors.getBulkWriteResult();
    final int appliedCount = bulkWriteResult.getMatchedCount() + bulkWriteResult.getUpserts().size();
    return appliedCount < requestedCount;
}
/**
 * Test whether the index of every bulk write error lies in the range {@code [0, requested)}.
 *
 * @param bulkWriteErrors the reported bulk write errors.
 * @param requested the number of requested write models.
 * @return {@code true} if no error index is out of range; trivially {@code true} for no errors.
 */
private static boolean areAllIndexesWithinBounds(final Collection<BulkWriteError> bulkWriteErrors,
        final int requested) {
    for (final BulkWriteError bulkWriteError : bulkWriteErrors) {
        final int index = bulkWriteError.getIndex();
        if (index < 0 || index >= requested) {
            return false;
        }
    }
    return true;
}
/**
 * Collect the metadata of all write models in request order.
 *
 * @param writeResultAndErrors input and output of one bulk write operation.
 * @return list containing the metadata of each write model.
 */
private static List<Metadata> getAllMetadata(final WriteResultAndErrors writeResultAndErrors) {
    final List<AbstractWriteModel> writeModels = writeResultAndErrors.getWriteModels();
    final List<Metadata> allMetadata = new ArrayList<>(writeModels.size());
    for (final AbstractWriteModel writeModel : writeModels) {
        allMetadata.add(writeModel.getMetadata());
    }
    return allMetadata;
}
/**
 * Render the outcome of a bulk write operation as one log entry.
 * <ul>
 * <li>If an unexpected error is present, the entry contains its stack trace.</li>
 * <li>Otherwise, if the write was not acknowledged, only the reported errors are included, because the
 * count getters of an unacknowledged {@code BulkWriteResult} throw
 * {@code UnsupportedOperationException}.</li>
 * <li>Otherwise the entry contains the counts of the result, plus the errors for a partial success.</li>
 * </ul>
 *
 * @param status prefix of the log entry.
 * @param writeResultAndErrors input and output of one bulk write operation.
 * @param isCompleteSuccess whether to render the result as complete success instead of partial success.
 * @return the log entry.
 */
private static String logResult(final String status, final WriteResultAndErrors writeResultAndErrors,
        final boolean isCompleteSuccess) {
    final Optional<Throwable> unexpectedError = writeResultAndErrors.getUnexpectedError();
    if (unexpectedError.isPresent()) {
        final StringWriter stackTraceWriter = new StringWriter();
        stackTraceWriter.append(String.format("%s: UnexpectedError[stacktrace=", status));
        unexpectedError.get().printStackTrace(new PrintWriter(stackTraceWriter));
        return stackTraceWriter.append("]").toString();
    }

    final BulkWriteResult bulkWriteResult = writeResultAndErrors.getBulkWriteResult();
    final List<BulkWriteError> bulkWriteErrors = writeResultAndErrors.getBulkWriteErrors();
    if (!bulkWriteResult.wasAcknowledged()) {
        // No counts are available for an unacknowledged write; asking for them would throw.
        return String.format("%s: Unacknowledged[ack=%b,errorCount=%d,errors=%s]",
                status,
                false,
                bulkWriteErrors.size(),
                bulkWriteErrors);
    }

    if (isCompleteSuccess) {
        return String.format(
                "%s: Success[ack=%b,errors=%d,matched=%d,upserts=%d,inserted=%d,modified=%d,deleted=%d]",
                status,
                bulkWriteResult.wasAcknowledged(),
                bulkWriteErrors.size(),
                bulkWriteResult.getMatchedCount(),
                bulkWriteResult.getUpserts().size(),
                bulkWriteResult.getInsertedCount(),
                bulkWriteResult.getModifiedCount(),
                bulkWriteResult.getDeletedCount());
    }

    // partial success
    return String.format(
            "%s: PartialSuccess[ack=%b,errorCount=%d,matched=%d,upserts=%d,inserted=%d,modified=%d," +
                    "deleted=%d,errors=%s]",
            status,
            bulkWriteResult.wasAcknowledged(),
            bulkWriteErrors.size(),
            bulkWriteResult.getMatchedCount(),
            bulkWriteResult.getUpserts().size(),
            bulkWriteResult.getInsertedCount(),
            bulkWriteResult.getModifiedCount(),
            bulkWriteResult.getDeletedCount(),
            bulkWriteErrors);
}
/**
 * Outcome of the consistency check: a status together with a message.
 * The message is only non-empty for {@code CONSISTENCY_ERROR}.
 */
private static final class ConsistencyCheckResult {

    private final ConsistencyStatus status;
    private final String message;

    private ConsistencyCheckResult(final ConsistencyStatus checkStatus, final String checkMessage) {
        status = checkStatus;
        message = checkMessage;
    }
}
/**
 * Possible outcomes of the consistency check of a bulk write result.
 */
private enum ConsistencyStatus {
// The index of at least one bulk write error lies outside the range of the requested write models.
CONSISTENCY_ERROR,
// Fewer documents were matched or upserted than write models were requested.
INCORRECT_PATCH,
// Neither of the above was detected.
OK
}
}