diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java index b71b925b95e8f..10699022417ca 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConfiguration.java @@ -43,7 +43,7 @@ public class ElasticsearchConfiguration { @UriPath @Metadata(required = "true") private String clusterName; - @UriParam(enums = "INDEX,BULK,BULK_INDEX,GET_BY_ID,DELETE") @Metadata(required = "true") + @UriParam(enums = "INDEX,UPDATE,BULK,BULK_INDEX,GET_BY_ID,DELETE") @Metadata(required = "true") private String operation; @UriParam private String indexName; diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java index 66388bea15900..a8f79e121e0de 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchConstants.java @@ -24,6 +24,7 @@ public interface ElasticsearchConstants { String PARAM_OPERATION = "operation"; String OPERATION_INDEX = "INDEX"; + String OPERATION_UPDATE = "UPDATE"; String OPERATION_BULK = "BULK"; String OPERATION_BULK_INDEX = "BULK_INDEX"; String OPERATION_GET_BY_ID = "GET_BY_ID"; diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java index 3a1afdb62d1d3..476f7b710b4f5 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/ElasticsearchProducer.java @@ -28,6 +28,7 @@ import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.Client; /** @@ -60,6 +61,8 @@ private String resolveOperation(Exchange exchange) { return ElasticsearchConstants.OPERATION_INDEX; } else if (request instanceof GetRequest) { return ElasticsearchConstants.OPERATION_GET_BY_ID; + } else if (request instanceof UpdateRequest) { + return ElasticsearchConstants.OPERATION_UPDATE; } else if (request instanceof BulkRequest) { // do we want bulk or bulk_index? if ("BULK_INDEX".equals(getEndpoint().getConfig().getOperation())) { @@ -131,6 +134,9 @@ public void process(Exchange exchange) throws Exception { if (ElasticsearchConstants.OPERATION_INDEX.equals(operation)) { IndexRequest indexRequest = message.getBody(IndexRequest.class); message.setBody(client.index(indexRequest).actionGet().getId()); + } else if (ElasticsearchConstants.OPERATION_UPDATE.equals(operation)) { + UpdateRequest updateRequest = message.getBody(UpdateRequest.class); + message.setBody(client.update(updateRequest).actionGet().getId()); } else if (ElasticsearchConstants.OPERATION_GET_BY_ID.equals(operation)) { GetRequest getRequest = message.getBody(GetRequest.class); message.setBody(client.get(getRequest)); diff --git a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java index 3763ff1ec7464..cc6e9bd30b7b5 100644 --- a/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java +++ b/components/camel-elasticsearch/src/main/java/org/apache/camel/component/elasticsearch/converter/ElasticsearchActionRequestConverter.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.replication.ReplicationType; +import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.common.xcontent.XContentBuilder; @Converter @@ -37,6 +38,38 @@ public final class ElasticsearchActionRequestConverter { private ElasticsearchActionRequestConverter() { } + // Update requests + private static UpdateRequest createUpdateRequest(Object document, Exchange exchange) { + UpdateRequest updateRequest = new UpdateRequest(); + if (document instanceof byte[]) { + updateRequest.doc((byte[]) document); + } else if (document instanceof Map) { + updateRequest.doc((Map) document); + } else if (document instanceof String) { + updateRequest.doc((String) document); + } else if (document instanceof XContentBuilder) { + updateRequest.doc((XContentBuilder) document); + } else { + return null; + } + + return updateRequest + .consistencyLevel(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_CONSISTENCY_LEVEL, WriteConsistencyLevel.class)) + .replicationType(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_REPLICATION_TYPE, ReplicationType.class)) + .parent(exchange.getIn().getHeader( + ElasticsearchConstants.PARENT, String.class)) + .index(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_NAME, String.class)) + .type(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_TYPE, String.class)) + .id(exchange.getIn().getHeader( + ElasticsearchConstants.PARAM_INDEX_ID, String.class)); + } + + + // Index requests private static IndexRequest createIndexRequest(Object document, Exchange exchange) { IndexRequest indexRequest = new IndexRequest(); @@ -71,6 +104,12 @@ public static IndexRequest toIndexRequest(Object document, Exchange exchange) { .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class)); } + @Converter + public static UpdateRequest toUpdateRequest(Object document, Exchange exchange) { + return createUpdateRequest(document, exchange) + .id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class)); + } + @Converter public static GetRequest toGetRequest(String id, Exchange exchange) { return new GetRequest(exchange.getIn().getHeader( diff --git a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java index 78d31963157db..0006920c64f2c 100644 --- a/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java +++ b/components/camel-elasticsearch/src/test/java/org/apache/camel/component/elasticsearch/ElasticsearchComponentTest.java @@ -31,6 +31,7 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.update.UpdateRequest; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Ignore; @@ -93,6 +94,20 @@ public void testIndex() throws Exception { assertNotNull("indexId should be set", indexId); } + @Test + public void testUpdate() throws Exception { + Map map = createIndexedData(); + String indexId = template.requestBody("direct:index", map, String.class); + assertNotNull("indexId should be set", indexId); + + Map newMap = new HashMap<>(); + newMap.put(createPrefix() + "key2", createPrefix() + "value2"); + Map headers = new HashMap<>(); + headers.put(ElasticsearchConstants.PARAM_INDEX_ID, indexId); + indexId = template.requestBodyAndHeaders("direct:update", newMap, headers, String.class); + assertNotNull("indexId should be set", indexId); + } + @Test public void testIndexWithReplication() throws Exception { Map map = createIndexedData(); @@ -202,6 +217,26 @@ public void testIndexWithIDInHeader() throws Exception { assertEquals("indexId should be equals to the provided id", "123", indexId); } + @Test + public void testUpdateWithIDInHeader() throws Exception { + Map map = createIndexedData(); + Map headers = new HashMap(); + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX); + headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter"); + headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet"); + headers.put(ElasticsearchConstants.PARAM_INDEX_ID, "123"); + + String indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class); + assertNotNull("indexId should be set", indexId); + assertEquals("indexId should be equals to the provided id", "123", indexId); + + headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_UPDATE); + + indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class); + assertNotNull("indexId should be set", indexId); + assertEquals("indexId should be equals to the provided id", "123", indexId); + } + @Test @Ignore("need to setup the cluster IP for this test") public void indexWithIp() throws Exception { @@ -317,6 +352,23 @@ public void indexRequestBody() throws Exception { assertThat(documentId, equalTo(prefix + "testId")); } + @Test + public void updateRequestBody() throws Exception { + String prefix = createPrefix(); + + // first index data + IndexRequest indexRequest = new IndexRequest(prefix + "foo", prefix + "bar", prefix + "testId"); + indexRequest.source("{\"" + prefix + "content\": \"" + prefix + "hello\"}"); + template.requestBody("direct:index", indexRequest, String.class); + + // then update + UpdateRequest request = new UpdateRequest(prefix + "foo", prefix + "bar", prefix + "testId"); + request.doc("{\"" + prefix + "content2\": \"" + prefix + "hello2\"}"); + String documentId = template.requestBody("direct:update", request, String.class); + + assertThat(documentId, equalTo(prefix + "testId")); + } + @Test public void getRequestBody() throws Exception { String prefix = createPrefix(); @@ -398,6 +450,7 @@ protected RouteBuilder createRouteBuilder() throws Exception { public void configure() { from("direct:start").to("elasticsearch://local"); from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet"); + from("direct:update").to("elasticsearch://local?operation=UPDATE&indexName=twitter&indexType=tweet"); from("direct:indexWithReplication").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&replicationType=SYNC"); from("direct:indexWithWriteConsistency").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&consistencyLevel=ONE"); from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");