Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

`if_seq_no` and `if_primary_term` parameters aren't wired correctly in REST Client's CRUD API #38411

Merged
merged 5 commits into from Feb 5, 2019
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

@@ -191,6 +191,11 @@ static Request bulk(BulkRequest bulkRequest) throws IOException {
}
}

if (action.ifSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
metadata.field("if_seq_no", action.ifSeqNo());
metadata.field("if_primary_term", action.ifPrimaryTerm());
}

if (opType == DocWriteRequest.OpType.INDEX || opType == DocWriteRequest.OpType.CREATE) {
IndexRequest indexRequest = (IndexRequest) action;
if (Strings.hasLength(indexRequest.getPipeline())) {
@@ -104,11 +104,13 @@ public void testDelete() throws IOException {
{
// Testing deletion
String docId = "id";
highLevelClient().index(
IndexResponse indexResponse = highLevelClient().index(
new IndexRequest("index").id(docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
assertThat(indexResponse.getSeqNo(), greaterThanOrEqualTo(0L));
DeleteRequest deleteRequest = new DeleteRequest("index", docId);
if (randomBoolean()) {
deleteRequest.version(1L);
deleteRequest.setIfSeqNo(indexResponse.getSeqNo());
deleteRequest.setIfPrimaryTerm(indexResponse.getPrimaryTerm());
}
DeleteResponse deleteResponse = execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync);
assertEquals("index", deleteResponse.getIndex());
@@ -131,7 +133,7 @@ public void testDelete() throws IOException {
String docId = "version_conflict";
highLevelClient().index(
new IndexRequest("index").id( docId).source(Collections.singletonMap("foo", "bar")), RequestOptions.DEFAULT);
DeleteRequest deleteRequest = new DeleteRequest("index", docId).version(2);
DeleteRequest deleteRequest = new DeleteRequest("index", docId).setIfSeqNo(2).setIfPrimaryTerm(2);
ElasticsearchException exception = expectThrows(ElasticsearchException.class,
() -> execute(deleteRequest, highLevelClient()::delete, highLevelClient()::deleteAsync));
assertEquals(RestStatus.CONFLICT, exception.status());
@@ -519,7 +521,7 @@ public void testIndex() throws IOException {
ElasticsearchStatusException exception = expectThrows(ElasticsearchStatusException.class, () -> {
IndexRequest wrongRequest = new IndexRequest("index").id("id");
wrongRequest.source(XContentBuilder.builder(xContentType.xContent()).startObject().field("field", "test").endObject());
wrongRequest.version(5L);
wrongRequest.setIfSeqNo(1L).setIfPrimaryTerm(5L);

execute(wrongRequest, highLevelClient()::index, highLevelClient()::indexAsync);
});
@@ -820,7 +822,8 @@ public void testBulk() throws IOException {
if (opType == DocWriteRequest.OpType.INDEX) {
IndexRequest indexRequest = new IndexRequest("index").id(id).source(source, xContentType);
if (erroneous) {
indexRequest.version(12L);
indexRequest.setIfSeqNo(12L);
indexRequest.setIfPrimaryTerm(12L);
}
bulkRequest.add(indexRequest);

@@ -1130,7 +1133,8 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
if (opType == DocWriteRequest.OpType.INDEX) {
IndexRequest indexRequest = new IndexRequest("index").id(id).source(xContentType, "id", i);
if (erroneous) {
indexRequest.version(12L);
indexRequest.setIfSeqNo(12L);
indexRequest.setIfPrimaryTerm(12L);
}
processor.add(indexRequest);

@@ -892,10 +892,15 @@ public void testBulk() throws IOException {
docWriteRequest.routing(randomAlphaOfLength(10));
}
if (randomBoolean()) {
docWriteRequest.version(randomNonNegativeLong());
}
if (randomBoolean()) {
docWriteRequest.versionType(randomFrom(VersionType.values()));
if (randomBoolean()) {
docWriteRequest.version(randomNonNegativeLong());
}
if (randomBoolean()) {
docWriteRequest.versionType(randomFrom(VersionType.values()));
}
} else if (randomBoolean()) {
docWriteRequest.setIfSeqNo(randomNonNegativeLong());
docWriteRequest.setIfPrimaryTerm(randomLongBetween(1, 200));
}
bulkRequest.add(docWriteRequest);
}
@@ -925,6 +930,8 @@ public void testBulk() throws IOException {
assertEquals(originalRequest.routing(), parsedRequest.routing());
assertEquals(originalRequest.version(), parsedRequest.version());
assertEquals(originalRequest.versionType(), parsedRequest.versionType());
assertEquals(originalRequest.ifSeqNo(), parsedRequest.ifSeqNo());
assertEquals(originalRequest.ifPrimaryTerm(), parsedRequest.ifPrimaryTerm());

DocWriteRequest.OpType opType = originalRequest.opType();
if (opType == DocWriteRequest.OpType.INDEX) {
@@ -0,0 +1,42 @@
---
"Compare And Swap Sequence Numbers":

- skip:
version: " - 6.99.99"
reason: typeless API are add in 7.0.0

- do:
index:
index: test_1
id: 1
body: { foo: bar }
- match: { _version: 1}
- set: { _seq_no: seqno }
- set: { _primary_term: primary_term }

- do:
bulk:
body:
- index:
_index: test_1
_id: 1
if_seq_no: 10000
if_primary_term: $primary_term
- foo: bar2

- match: { errors: true }
- match: { items.0.index.status: 409 }
- match: { items.0.index.error.type: version_conflict_engine_exception }

- do:
bulk:
body:
- index:
_index: test_1
_id: 1
if_seq_no: $seqno
if_primary_term: $primary_term
- foo: bar2

- match: { errors: false}
- match: { items.0.index.status: 200 }
@@ -0,0 +1,45 @@
---
"Compare And Swap Sequence Numbers":

- skip:
version: " - 6.6.99"
reason: cas operations with sequence numbers was added in 6.7

- do:
index:
index: test_1
type: _doc
id: 1
body: { foo: bar }
- match: { _version: 1}
- set: { _seq_no: seqno }
- set: { _primary_term: primary_term }

- do:
bulk:
body:
- index:
_index: test_1
_type: _doc
_id: 1
if_seq_no: 10000
if_primary_term: $primary_term
- foo: bar2

- match: { errors: true }
- match: { items.0.index.status: 409 }
- match: { items.0.index.error.type: version_conflict_engine_exception }

- do:
bulk:
body:
- index:
_index: test_1
_type: _doc
_id: 1
if_seq_no: $seqno
if_primary_term: $primary_term
- foo: bar2

- match: { errors: false}
- match: { items.0.index.status: 200 }
@@ -3,7 +3,7 @@

- skip:
version: " - 6.99.99"
reason: cas ops are introduced in 7.0.0
reason: typesless api was introduces in 7.0

This comment has been minimized.

Copy link
@javanna

javanna Feb 5, 2019

Member

s/typesless/typeless s/introduces/introduced


- do:
index:
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.