Skip to content

Commit

Permalink
Added missing metadata fields to upserted documents (parent, routing,…
Browse files Browse the repository at this point in the history
… ttl, timestamp, version and versionType)

Closes #3444
  • Loading branch information
javanna committed Aug 6, 2013
1 parent 88a0e46 commit 636c35d
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 9 deletions.
25 changes: 23 additions & 2 deletions src/main/java/org/elasticsearch/action/bulk/BulkRequest.java
Expand Up @@ -340,9 +340,30 @@ public BulkRequest add(BytesReference data, boolean contentUnsafe, @Nullable Str
.create(true)
.source(data.slice(from, nextMarker - from), contentUnsafe), payload);
} else if ("update".equals(action)) {
internalAdd(new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict)
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict)
.version(version).versionType(versionType)
.source(data.slice(from, nextMarker - from)), payload);
.source(data.slice(from, nextMarker - from));

IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.routing(routing);
upsertRequest.parent(parent); // order is important, set it after routing, so it will set the routing
upsertRequest.timestamp(timestamp);
upsertRequest.ttl(ttl);
upsertRequest.version(version);
upsertRequest.versionType(versionType);
}
IndexRequest doc = updateRequest.doc();
if (doc != null) {
doc.routing(routing);
doc.parent(parent); // order is important, set it after routing, so it will set the routing
doc.timestamp(timestamp);
doc.ttl(ttl);
doc.version(version);
doc.versionType(versionType);
}

internalAdd(updateRequest, payload);
}
// move pointers
from = nextMarker + 1;
Expand Down
Expand Up @@ -4,19 +4,19 @@
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.junit.Test;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
*/
Expand All @@ -25,8 +25,6 @@ public class BulkTests extends AbstractSharedClusterTest {

@Test
public void testBulkUpdate_simple() throws Exception {
client().admin().indices().prepareDelete().execute().actionGet();

client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
Expand Down Expand Up @@ -199,8 +197,6 @@ public void testBulkUpdate_malformedScripts() throws Exception {

@Test
public void testBulkUpdate_largerVolume() throws Exception {
client().admin().indices().prepareDelete().execute().actionGet();

client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
Expand Down Expand Up @@ -337,4 +333,88 @@ public void testBulkUpdate_largerVolume() throws Exception {
}
}

//Test for https://github.com/elasticsearch/elasticsearch/issues/3444
@Test
public void testBulkUpdateDocAsUpsertWithParent() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
).addMapping("child", "{\"child\": {\"_parent\": {\"type\": \"parent\"}}}")
.execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();

BulkRequestBuilder builder = client().prepareBulk();


byte[] addParent = ("{\"index\" : { \"_index\" : \"test\", \"_type\" : \"parent\", \"_id\" : \"parent1\"}}\n" +
"{\"field1\" : \"value1\"}\n").getBytes("utf-8");

byte[] addChild = ("{ \"update\" : { \"_index\" : \"test\", \"_type\" : \"child\", \"_id\" : \"child1\", \"parent\" : \"parent1\"}}\n" +
"{\"doc\" : { \"field1\" : \"value1\"}, \"doc_as_upsert\" : \"true\"}\n").getBytes("utf-8");

builder.add(addParent, 0, addParent.length, false);
builder.add(addChild, 0, addChild.length, false);

BulkResponse bulkResponse = builder.get();
assertThat(bulkResponse.getItems().length, equalTo(2));
assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false));
assertThat(bulkResponse.getItems()[1].isFailed(), equalTo(false));

client().admin().indices().prepareRefresh("test").get();

//we check that the _parent field was set on the child document by using the has parent query
SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(QueryBuilders.hasParentQuery("parent", QueryBuilders.matchAllQuery()))
.get();

assertThat(searchResponse.getFailedShards(), equalTo(0));
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat(hits[0].getId(), equalTo("child1"));

}


@Test
public void testBulkUpdateUpsertWithParent() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
).addMapping("child", "{\"child\": {\"_parent\": {\"type\": \"parent\"}}}")
.execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();

BulkRequestBuilder builder = client().prepareBulk();


byte[] addParent = ("{\"index\" : { \"_index\" : \"test\", \"_type\" : \"parent\", \"_id\" : \"parent1\"}}\n" +
"{\"field1\" : \"value1\"}\n").getBytes("utf-8");

byte[] addChild = ("{\"update\" : { \"_id\" : \"child1\", \"_type\" : \"child\", \"_index\" : \"test\", \"parent\" : \"parent1\"} }\n" +
"{ \"script\" : \"ctx._source.field2 = 'value2'\", \"upsert\" : {\"field1\" : \"value1\"}}\n").getBytes("utf-8");

builder.add(addParent, 0, addParent.length, false);
builder.add(addChild, 0, addChild.length, false);

BulkResponse bulkResponse = builder.get();
assertThat(bulkResponse.getItems().length, equalTo(2));
assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false));
assertThat(bulkResponse.getItems()[1].isFailed(), equalTo(false));

client().admin().indices().prepareRefresh("test").get();

SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(QueryBuilders.hasParentQuery("parent", QueryBuilders.matchAllQuery()))
.get();

assertThat(searchResponse.getFailedShards(), equalTo(0));
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat(hits[0].getId(), equalTo("child1"));

}
}

1 comment on commit 636c35d

@poorvamandar
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Luca!

Please sign in to comment.