Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public class ElasticsearchConfiguration {

@UriPath @Metadata(required = "true")
private String clusterName;
@UriParam(enums = "INDEX,BULK,BULK_INDEX,GET_BY_ID,DELETE") @Metadata(required = "true")
@UriParam(enums = "INDEX,UPDATE,BULK,BULK_INDEX,GET_BY_ID,DELETE") @Metadata(required = "true")
private String operation;
@UriParam
private String indexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public interface ElasticsearchConstants {

String PARAM_OPERATION = "operation";
String OPERATION_INDEX = "INDEX";
String OPERATION_UPDATE = "UPDATE";
String OPERATION_BULK = "BULK";
String OPERATION_BULK_INDEX = "BULK_INDEX";
String OPERATION_GET_BY_ID = "GET_BY_ID";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;

/**
Expand Down Expand Up @@ -60,6 +61,8 @@ private String resolveOperation(Exchange exchange) {
return ElasticsearchConstants.OPERATION_INDEX;
} else if (request instanceof GetRequest) {
return ElasticsearchConstants.OPERATION_GET_BY_ID;
} else if (request instanceof UpdateRequest) {
return ElasticsearchConstants.OPERATION_UPDATE;
} else if (request instanceof BulkRequest) {
// do we want bulk or bulk_index?
if ("BULK_INDEX".equals(getEndpoint().getConfig().getOperation())) {
Expand Down Expand Up @@ -131,6 +134,9 @@ public void process(Exchange exchange) throws Exception {
if (ElasticsearchConstants.OPERATION_INDEX.equals(operation)) {
IndexRequest indexRequest = message.getBody(IndexRequest.class);
message.setBody(client.index(indexRequest).actionGet().getId());
} else if (ElasticsearchConstants.OPERATION_UPDATE.equals(operation)) {
UpdateRequest updateRequest = message.getBody(UpdateRequest.class);
message.setBody(client.update(updateRequest).actionGet().getId());
} else if (ElasticsearchConstants.OPERATION_GET_BY_ID.equals(operation)) {
GetRequest getRequest = message.getBody(GetRequest.class);
message.setBody(client.get(getRequest));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.replication.ReplicationType;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.common.xcontent.XContentBuilder;

@Converter
Expand All @@ -37,6 +38,38 @@ public final class ElasticsearchActionRequestConverter {
private ElasticsearchActionRequestConverter() {
}

// Update requests
private static UpdateRequest createUpdateRequest(Object document, Exchange exchange) {
UpdateRequest updateRequest = new UpdateRequest();
if (document instanceof byte[]) {
updateRequest.doc((byte[]) document);
} else if (document instanceof Map) {
updateRequest.doc((Map<String, Object>) document);
} else if (document instanceof String) {
updateRequest.doc((String) document);
} else if (document instanceof XContentBuilder) {
updateRequest.doc((XContentBuilder) document);
} else {
return null;
}

return updateRequest
.consistencyLevel(exchange.getIn().getHeader(
ElasticsearchConstants.PARAM_CONSISTENCY_LEVEL, WriteConsistencyLevel.class))
.replicationType(exchange.getIn().getHeader(
ElasticsearchConstants.PARAM_REPLICATION_TYPE, ReplicationType.class))
.parent(exchange.getIn().getHeader(
ElasticsearchConstants.PARENT, String.class))
.index(exchange.getIn().getHeader(
ElasticsearchConstants.PARAM_INDEX_NAME, String.class))
.type(exchange.getIn().getHeader(
ElasticsearchConstants.PARAM_INDEX_TYPE, String.class))
.id(exchange.getIn().getHeader(
ElasticsearchConstants.PARAM_INDEX_ID, String.class));
}



// Index requests
private static IndexRequest createIndexRequest(Object document, Exchange exchange) {
IndexRequest indexRequest = new IndexRequest();
Expand Down Expand Up @@ -71,6 +104,12 @@ public static IndexRequest toIndexRequest(Object document, Exchange exchange) {
.id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class));
}

@Converter
public static UpdateRequest toUpdateRequest(Object document, Exchange exchange) {
return createUpdateRequest(document, exchange)
.id(exchange.getIn().getHeader(ElasticsearchConstants.PARAM_INDEX_ID, String.class));
}

@Converter
public static GetRequest toGetRequest(String id, Exchange exchange) {
return new GetRequest(exchange.getIn().getHeader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
Expand Down Expand Up @@ -93,6 +94,20 @@ public void testIndex() throws Exception {
assertNotNull("indexId should be set", indexId);
}

@Test
public void testUpdate() throws Exception {
Map<String, String> map = createIndexedData();
String indexId = template.requestBody("direct:index", map, String.class);
assertNotNull("indexId should be set", indexId);

Map<String, String> newMap = new HashMap<>();
newMap.put(createPrefix() + "key2", createPrefix() + "value2");
Map<String, Object> headers = new HashMap<>();
headers.put(ElasticsearchConstants.PARAM_INDEX_ID, indexId);
indexId = template.requestBodyAndHeaders("direct:update", newMap, headers, String.class);
assertNotNull("indexId should be set", indexId);
}

@Test
public void testIndexWithReplication() throws Exception {
Map<String, String> map = createIndexedData();
Expand Down Expand Up @@ -202,6 +217,26 @@ public void testIndexWithIDInHeader() throws Exception {
assertEquals("indexId should be equals to the provided id", "123", indexId);
}

@Test
public void testUpdateWithIDInHeader() throws Exception {
Map<String, String> map = createIndexedData();
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_INDEX);
headers.put(ElasticsearchConstants.PARAM_INDEX_NAME, "twitter");
headers.put(ElasticsearchConstants.PARAM_INDEX_TYPE, "tweet");
headers.put(ElasticsearchConstants.PARAM_INDEX_ID, "123");

String indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class);
assertNotNull("indexId should be set", indexId);
assertEquals("indexId should be equals to the provided id", "123", indexId);

headers.put(ElasticsearchConstants.PARAM_OPERATION, ElasticsearchConstants.OPERATION_UPDATE);

indexId = template.requestBodyAndHeaders("direct:start", map, headers, String.class);
assertNotNull("indexId should be set", indexId);
assertEquals("indexId should be equals to the provided id", "123", indexId);
}

@Test
@Ignore("need to setup the cluster IP for this test")
public void indexWithIp() throws Exception {
Expand Down Expand Up @@ -317,6 +352,23 @@ public void indexRequestBody() throws Exception {
assertThat(documentId, equalTo(prefix + "testId"));
}

@Test
public void updateRequestBody() throws Exception {
String prefix = createPrefix();

// first index data
IndexRequest indexRequest = new IndexRequest(prefix + "foo", prefix + "bar", prefix + "testId");
indexRequest.source("{\"" + prefix + "content\": \"" + prefix + "hello\"}");
template.requestBody("direct:index", indexRequest, String.class);

// then update
UpdateRequest request = new UpdateRequest(prefix + "foo", prefix + "bar", prefix + "testId");
request.doc("{\"" + prefix + "content2\": \"" + prefix + "hello2\"}");
String documentId = template.requestBody("direct:update", request, String.class);

assertThat(documentId, equalTo(prefix + "testId"));
}

@Test
public void getRequestBody() throws Exception {
String prefix = createPrefix();
Expand Down Expand Up @@ -398,6 +450,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
public void configure() {
from("direct:start").to("elasticsearch://local");
from("direct:index").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet");
from("direct:update").to("elasticsearch://local?operation=UPDATE&indexName=twitter&indexType=tweet");
from("direct:indexWithReplication").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&replicationType=SYNC");
from("direct:indexWithWriteConsistency").to("elasticsearch://local?operation=INDEX&indexName=twitter&indexType=tweet&consistencyLevel=ONE");
from("direct:get").to("elasticsearch://local?operation=GET_BY_ID&indexName=twitter&indexType=tweet");
Expand Down