Skip to content

Commit

Permalink
CR: handle bulk rejections after mapping updates
Browse files Browse the repository at this point in the history
  • Loading branch information
original-brownbear committed Mar 22, 2019
1 parent b47e5f7 commit f3b59c2
Showing 1 changed file with 20 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -158,18 +158,37 @@ public static void performOnPrimary(
protected void doRun() {
while (context.hasMoreOperationsToExecute()) {
if (executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate,
ActionListener.wrap(v -> executor.execute(this), listener::onFailure)) == false) {
ActionListener.wrap(v -> executor.execute(this), this::onRejection)) == false) {
// We are waiting for a mapping update on another thread, that will invoke this action again once its done
// so we just break out here.
return;
}
assert context.isInitial(); // either completed and moved to next or reset
}
// We're done, there's no more operations to execute so we resolve the wrapped listener
finishRequest();
}

@Override
public void onRejection(Exception e) {
while (context.hasMoreOperationsToExecute()) {
context.setRequestToExecute(context.getCurrent());
final long version = context.getRequestToExecute().version();
final Engine.Result result = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE
? primary.getFailedDeleteResult(e, version)
: primary.getFailedIndexResult(e, version);
onComplete(result, context, null);
}
// We're done, there's no more operations to execute so we resolve the wrapped listener
finishRequest();
}

private void finishRequest() {
listener.onResponse(
new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(),
null, context.getPrimary(), logger));
}

}.doRun();
}

Expand Down

0 comments on commit f3b59c2

Please sign in to comment.