Skip to content

Commit

Permalink
Name BulkItemResponse ctors (backport of #76439) (#76460)
Browse files Browse the repository at this point in the history
`BulkItemResponse` can contain either a success or failure. This
replaces the two constructors used to build either case with named
static methods. So instead of
```
return new BulkItemResponse(0, OpType.CREATE, createResponse);
return new BulkItemResponse(0, OpType.CREATE, failure);
```
you now use
```
return BulkItemResponse.success(0, OpType.CREATE, createResponse);
return BulkItemResponse.failure(0, OpType.CREATE, failure);
```

This makes it marginally easier to read code building these things - you
don't have to know the type of the parameter to know if its a failure
or success.
  • Loading branch information
nik9000 committed Aug 12, 2021
1 parent 9b6084b commit ecf5317
Show file tree
Hide file tree
Showing 28 changed files with 157 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
}

private static class BulkRestBuilderListener extends RestBuilderListener<BulkRequest> {
private final BulkItemResponse ITEM_RESPONSE = new BulkItemResponse(1, DocWriteRequest.OpType.UPDATE,
private final BulkItemResponse ITEM_RESPONSE = BulkItemResponse.success(1, DocWriteRequest.OpType.UPDATE,
new UpdateResponse(new ShardId("mock", "", 1), "mock_type", "1", 0L, 1L, 1L, DocWriteResponse.Result.CREATED));

private final RestRequest request;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.elasticsearch.transport.TransportService;

public class TransportNoopBulkAction extends HandledTransportAction<BulkRequest, BulkResponse> {
private static final BulkItemResponse ITEM_RESPONSE = new BulkItemResponse(1, DocWriteRequest.OpType.UPDATE,
private static final BulkItemResponse ITEM_RESPONSE = BulkItemResponse.success(1, DocWriteRequest.OpType.UPDATE,
new UpdateResponse(new ShardId("mock", "", 1), "mock_type", "1", 0L, 1L, 1L, DocWriteResponse.Result.CREATED));

@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public void testBulkResponseSetsLotsOfStatus() throws Exception {
ShardId shardId = new ShardId(new Index("name", "uid"), 0);
if (rarely()) {
versionConflicts++;
responses[i] = new BulkItemResponse(i, randomFrom(DocWriteRequest.OpType.values()),
responses[i] = BulkItemResponse.failure(i, randomFrom(DocWriteRequest.OpType.values()),
new Failure(shardId.getIndexName(), "type", "id" + i,
new VersionConflictEngineException(shardId, "id", "test")));
continue;
Expand Down Expand Up @@ -310,7 +310,7 @@ public void testBulkResponseSetsLotsOfStatus() throws Exception {
final int primaryTerm = randomIntBetween(1, 16);
final IndexResponse response =
new IndexResponse(shardId, "type", "id" + i, seqNo, primaryTerm, randomInt(), createdResponse);
responses[i] = new BulkItemResponse(i, opType, response);
responses[i] = BulkItemResponse.success(i, opType, response);
}
assertExactlyOnce(onSuccess ->
new DummyAsyncBulkByScrollAction().onBulkResponse(new BulkResponse(responses, 0),
Expand Down Expand Up @@ -392,8 +392,10 @@ public void testSearchTimeoutsAbortRequest() throws Exception {
public void testBulkFailuresAbortRequest() throws Exception {
Failure failure = new Failure("index", "type", "id", new RuntimeException("test"));
DummyAsyncBulkByScrollAction action = new DummyAsyncBulkByScrollAction();
BulkResponse bulkResponse = new BulkResponse(new BulkItemResponse[]
{new BulkItemResponse(0, DocWriteRequest.OpType.CREATE, failure)}, randomLong());
BulkResponse bulkResponse = new BulkResponse(
new BulkItemResponse[] { BulkItemResponse.failure(0, DocWriteRequest.OpType.CREATE, failure) },
randomLong()
);
action.onBulkResponse(bulkResponse, Assert::fail);
BulkByScrollResponse response = listener.get();
assertThat(response.getBulkFailures(), contains(failure));
Expand Down Expand Up @@ -979,10 +981,10 @@ void doExecute(ActionType<Response> action, Request request, ActionListener<Resp
throw new RuntimeException("Unknown request: " + item);
}
if (i == toReject) {
responses[i] = new BulkItemResponse(i, item.opType(),
responses[i] = BulkItemResponse.failure(i, item.opType(),
new Failure(response.getIndex(), response.getType(), response.getId(), new EsRejectedExecutionException()));
} else {
responses[i] = new BulkItemResponse(i, item.opType(), response);
responses[i] = BulkItemResponse.success(i, item.opType(), response);
}
}
listener.onResponse((Response) new BulkResponse(responses, 1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void abort(String index, Exception cause) {
if (primaryResponse == null) {
final BulkItemResponse.Failure failure = new BulkItemResponse.Failure(index, request.type(), request.id(),
Objects.requireNonNull(cause), true);
setPrimaryResponse(new BulkItemResponse(id, request.opType(), failure));
setPrimaryResponse(BulkItemResponse.failure(id, request.opType(), failure));
} else {
assert primaryResponse.isFailed() && primaryResponse.getFailure().isAborted()
: "response [" + Strings.toString(primaryResponse) + "]; cause [" + cause + "]";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ public static BulkItemResponse fromXContent(XContentParser parser, int id) throw
BulkItemResponse bulkItemResponse;
if (exception != null) {
Failure failure = new Failure(builder.getShardId().getIndexName(), builder.getType(), builder.getId(), exception, status);
bulkItemResponse = new BulkItemResponse(id, opType, failure);
bulkItemResponse = BulkItemResponse.failure(id, opType, failure);
} else {
bulkItemResponse = new BulkItemResponse(id, opType, builder.build());
bulkItemResponse = BulkItemResponse.success(id, opType, builder.build());
}
return bulkItemResponse;
}
Expand Down Expand Up @@ -346,66 +346,48 @@ public String toString() {
}
}

private int id;
private final int id;

private OpType opType;
private final OpType opType;

private DocWriteResponse response;
private final DocWriteResponse response;

private Failure failure;

BulkItemResponse() {}
private final Failure failure;

BulkItemResponse(ShardId shardId, StreamInput in) throws IOException {
id = in.readVInt();
opType = OpType.fromId(in.readByte());

byte type = in.readByte();
if (type == 0) {
response = new IndexResponse(shardId, in);
} else if (type == 1) {
response = new DeleteResponse(shardId, in);
} else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
response = new UpdateResponse(shardId, in);
} else if (type != 2) {
throw new IllegalArgumentException("Unexpected type [" + type + "]");
}

if (in.readBoolean()) {
failure = new Failure(in);
}
response = readResponse(shardId, in);
failure = in.readBoolean() ? new Failure(in) : null;
assertConsistent();
}

BulkItemResponse(StreamInput in) throws IOException {
id = in.readVInt();
opType = OpType.fromId(in.readByte());

byte type = in.readByte();
if (type == 0) {
response = new IndexResponse(in);
} else if (type == 1) {
response = new DeleteResponse(in);
} else if (type == 3) { // make 3 instead of 2, because 2 is already in use for 'no responses'
response = new UpdateResponse(in);
} else if (type != 2) {
throw new IllegalArgumentException("Unexpected type [" + type + "]");
}

if (in.readBoolean()) {
failure = new Failure(in);
}
response = readResponse(in);
failure = in.readBoolean() ? new Failure(in) : null;
assertConsistent();
}

public BulkItemResponse(int id, OpType opType, DocWriteResponse response) {
private BulkItemResponse(int id, OpType opType, DocWriteResponse response, Failure failure) {
this.id = id;
this.response = response;
this.opType = opType;
this.failure = failure;
assertConsistent();
}

public BulkItemResponse(int id, OpType opType, Failure failure) {
this.id = id;
this.opType = opType;
this.failure = failure;
private void assertConsistent() {
assert (response == null) ^ (failure == null) : "only one of response or failure may be set";
}

public static BulkItemResponse success(int id, OpType opType, DocWriteResponse response) {
return new BulkItemResponse(id, opType, response, null);
}

public static BulkItemResponse failure(int id, OpType opType, Failure failure) {
return new BulkItemResponse(id, opType, null, failure);
}

/**
Expand Down Expand Up @@ -543,4 +525,36 @@ private void writeResponseType(StreamOutput out) throws IOException {
throw new IllegalStateException("Unexpected response type found [" + response.getClass() + "]");
}
}

private static DocWriteResponse readResponse(ShardId shardId, StreamInput in) throws IOException {
int type = in.readByte();
switch (type) {
case 0:
return new IndexResponse(shardId, in);
case 1:
return new DeleteResponse(shardId, in);
case 2:
return null;
case 3:
return new UpdateResponse(shardId, in);
default:
throw new IllegalArgumentException("Unexpected type [" + type + "]");
}
}

private static DocWriteResponse readResponse(StreamInput in) throws IOException {
int type = in.readByte();
switch (type) {
case 0:
return new IndexResponse(in);
case 1:
return new DeleteResponse(in);
case 2:
return null;
case 3:
return new UpdateResponse(in);
default:
throw new IllegalArgumentException("Unexpected type [" + type + "]");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void resetForExecutionForRetry() {
/** completes the operation without doing anything on the primary */
public void markOperationAsNoOp(DocWriteResponse response) {
assertInvariants(ItemProcessingState.INITIAL);
executionResult = new BulkItemResponse(getCurrentItem().id(), getCurrentItem().request().opType(), response);
executionResult = BulkItemResponse.success(getCurrentItem().id(), getCurrentItem().request().opType(), response);
currentItemState = ItemProcessingState.EXECUTED;
assertInvariants(ItemProcessingState.EXECUTED);
}
Expand All @@ -226,7 +226,7 @@ public void failOnMappingUpdate(Exception cause) {
assert assertInvariants(ItemProcessingState.WAIT_FOR_MAPPING_UPDATE);
currentItemState = ItemProcessingState.EXECUTED;
final DocWriteRequest<?> docWriteRequest = getCurrentItem().request();
executionResult = new BulkItemResponse(getCurrentItem().id(), docWriteRequest.opType(),
executionResult = BulkItemResponse.failure(getCurrentItem().id(), docWriteRequest.opType(),
// Make sure to use getCurrentItem().index() here, if you use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
new BulkItemResponse.Failure(getCurrentItem().index(), docWriteRequest.type(), docWriteRequest.id(), cause));
Expand All @@ -253,13 +253,13 @@ public void markOperationAsExecuted(Engine.Result result) {
} else {
throw new AssertionError("unknown result type :" + result.getResultType());
}
executionResult = new BulkItemResponse(current.id(), current.request().opType(), response);
executionResult = BulkItemResponse.success(current.id(), current.request().opType(), response);
// set a blank ShardInfo so we can safely send it to the replicas. We won't use it in the real response though.
executionResult.getResponse().setShardInfo(new ReplicationResponse.ShardInfo());
locationToSync = TransportWriteAction.locationToSync(locationToSync, result.getTranslogLocation());
break;
case FAILURE:
executionResult = new BulkItemResponse(current.id(), docWriteRequest.opType(),
executionResult = BulkItemResponse.failure(current.id(), docWriteRequest.opType(),
// Make sure to use request.index() here, if you
// use docWriteRequest.index() it will use the
// concrete index instead of an alias if used!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.IndexingPressure;
Expand Down Expand Up @@ -376,7 +376,7 @@ void createIndex(String index,
private boolean setResponseFailureIfIndexMatches(AtomicArray<BulkItemResponse> responses, int idx, DocWriteRequest<?> request,
String index, Exception e) {
if (index.equals(request.index())) {
responses.set(idx, new BulkItemResponse(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(),
responses.set(idx, BulkItemResponse.failure(idx, request.opType(), new BulkItemResponse.Failure(request.index(), request.type(),
request.id(), e)));
return true;
}
Expand Down Expand Up @@ -479,7 +479,7 @@ protected void doRun() {
} catch (ElasticsearchParseException | IllegalArgumentException | RoutingMissingException e) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(concreteIndex.getName(), docWriteRequest.type(),
docWriteRequest.id(), e);
BulkItemResponse bulkItemResponse = new BulkItemResponse(i, docWriteRequest.opType(), failure);
BulkItemResponse bulkItemResponse = BulkItemResponse.failure(i, docWriteRequest.opType(), failure);
responses.set(i, bulkItemResponse);
// make sure the request gets never processed again
bulkRequest.requests.set(i, null);
Expand Down Expand Up @@ -526,8 +526,13 @@ public void onFailure(Exception e) {
for (BulkItemRequest request : requests) {
final String indexName = concreteIndices.getConcreteIndex(request.index()).getName();
DocWriteRequest<?> docWriteRequest = request.request();
responses.set(request.id(), new BulkItemResponse(request.id(), docWriteRequest.opType(),
new BulkItemResponse.Failure(indexName, docWriteRequest.type(), docWriteRequest.id(), e)));
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(
indexName,
docWriteRequest.type(),
docWriteRequest.id(),
e
);
responses.set(request.id(), BulkItemResponse.failure(request.id(), docWriteRequest.opType(), failure));
}
if (counter.decrementAndGet() == 0) {
finishHim();
Expand Down Expand Up @@ -624,7 +629,7 @@ private boolean addFailureIfIndexIsUnavailable(DocWriteRequest<?> request, int i
private void addFailure(DocWriteRequest<?> request, int idx, Exception unavailableException) {
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(request.index(), request.type(), request.id(),
unavailableException);
BulkItemResponse bulkItemResponse = new BulkItemResponse(idx, request.opType(), failure);
BulkItemResponse bulkItemResponse = BulkItemResponse.failure(idx, request.opType(), failure);
responses.set(idx, bulkItemResponse);
// make sure the request gets never processed again
bulkRequest.requests.set(idx, null);
Expand Down Expand Up @@ -796,7 +801,7 @@ synchronized void markItemAsDropped(int slot) {
failedSlots.set(slot);
final String id = indexRequest.id() == null ? DROPPED_ITEM_WITH_AUTO_GENERATED_ID : indexRequest.id();
itemResponses.add(
new BulkItemResponse(slot, indexRequest.opType(),
BulkItemResponse.success(slot, indexRequest.opType(),
new UpdateResponse(
new ShardId(indexRequest.index(), IndexMetadata.INDEX_UUID_NA_VALUE, 0),
indexRequest.type(), id, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM,
Expand All @@ -818,7 +823,7 @@ synchronized void markItemAsFailed(int slot, Exception e) {
failedSlots.set(slot);
BulkItemResponse.Failure failure = new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(),
indexRequest.id(), e);
itemResponses.add(new BulkItemResponse(slot, indexRequest.opType(), failure));
itemResponses.add(BulkItemResponse.failure(slot, indexRequest.opType(), failure));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,11 @@ static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest,
final UpdateHelper.Result translate) {
final BulkItemResponse response;
if (operationResponse.isFailed()) {
response = new BulkItemResponse(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, operationResponse.getFailure());
response = BulkItemResponse.failure(
operationResponse.getItemId(),
DocWriteRequest.OpType.UPDATE,
operationResponse.getFailure()
);
} else {
final DocWriteResponse.Result translatedResult = translate.getResponseResult();
final UpdateResponse updateResponse;
Expand Down Expand Up @@ -400,7 +404,7 @@ static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest,
} else {
throw new IllegalArgumentException("unknown operation type: " + translatedResult);
}
response = new BulkItemResponse(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, updateResponse);
response = BulkItemResponse.success(operationResponse.getItemId(), DocWriteRequest.OpType.UPDATE, updateResponse);
}
return response;
}
Expand Down

0 comments on commit ecf5317

Please sign in to comment.