diff --git a/external/storm-elasticsearch/README.md b/external/storm-elasticsearch/README.md
index 4fa10982ea8..24355db80a8 100644
--- a/external/storm-elasticsearch/README.md
+++ b/external/storm-elasticsearch/README.md
@@ -6,37 +6,53 @@
## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)
EsIndexBolt streams tuples directly into Elasticsearch. Tuples are indexed in specified index & type combination.
-User should make sure that there are "source", "index","type", and "id" fields declared in preceding bolts or spout.
-"index" and "type" fields are used for identifying target index and type.
+Users should make sure that ```EsTupleMapper``` can extract "source", "index", "type", and "id" from input tuple.
+"index" and "type" are used for identifying target index and type.
"source" is a document in JSON format string that will be indexed in Elasticsearch.
```java
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(clusterName);
esConfig.setNodes(new String[]{"localhost:9300"});
-EsIndexBolt indexBolt = new EsIndexBolt(esConfig);
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
```
## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)
EsPercolateBolt streams tuples directly into Elasticsearch. Tuples are used to send percolate request to specified index & type combination.
-User should make sure that there are "source", "index", and "type" fields declared in preceding bolts or spout.
-"index" and "type" fields are used for identifying target index and type.
+User should make sure ```EsTupleMapper``` can extract "source", "index", "type" from input tuple.
+"index" and "type" are used for identifying target index and type.
"source" is a document in JSON format string that will be sent in percolate request to Elasticsearch.
```java
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(clusterName);
esConfig.setNodes(new String[]{"localhost:9300"});
-EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig);
+EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
```
If there exists non-empty percolate response, EsPercolateBolt will emit tuple with original source and Percolate.Match
for each Percolate.Match in PercolateResponse.
+## EsState (org.apache.storm.elasticsearch.trident.EsState)
+
+Elasticsearch Trident state also follows similar pattern to EsBolts. It takes in EsConfig and EsTupleMapper as an arg.
+
+```code
+ EsConfig esConfig = new EsConfig();
+ esConfig.setClusterName(clusterName);
+ esConfig.setNodes(new String[]{"localhost:9300"});
+ EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
+
+ StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
+ TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
+ ```
+
## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)
-Two bolts above takes in EsConfig as a constructor arg.
+Provided components (Bolt, State) takes in EsConfig as a constructor arg.
```java
EsConfig esConfig = new EsConfig();
@@ -51,20 +67,12 @@ Two bolts above takes in EsConfig as a constructor arg.
|clusterName | Elasticsearch cluster name | String (required) |
|nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |
+## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)
-
-## EsState (org.apache.storm.elasticsearch.trident.EsState)
-
-Elasticsearch Trident state also follows similar pattern to EsBolts. It takes in EsConfig as an arg.
-
-```code
- EsConfig esConfig = new EsConfig();
- esConfig.setClusterName(clusterName);
- esConfig.setNodes(new String[]{"localhost:9300"});
-
- StateFactory factory = new EsStateFactory(esConfig);
- TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
- ```
+For storing tuple to Elasticsearch or percolating tuple from Elasticsearch, we need to define which fields are used for.
+Users need to define your own by implementing ```EsTupleMapper```.
+Storm-elasticsearch presents default mapper ```org.apache.storm.elasticsearch.common.DefaultEsTupleMapper```, which extracts its source, index, type, id values from identical fields.
+You can refer implementation of DefaultEsTupleMapper to see how to implement your own.
## Committer Sponsors
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
index 0d5cff85365..36b9d28a988 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsIndexBolt.java
@@ -22,19 +22,24 @@
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.storm.elasticsearch.common.EsConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
import java.util.Map;
+/**
+ * Basic bolt for storing tuple to ES document.
+ */
public class EsIndexBolt extends AbstractEsBolt {
+ private final EsTupleMapper tupleMapper;
/**
* EsIndexBolt constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+ * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
- public EsIndexBolt(EsConfig esConfig) {
+ public EsIndexBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
super(esConfig);
+ this.tupleMapper = tupleMapper;
}
@Override
@@ -43,16 +48,16 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou
}
/**
- * Executes index request for given tuple.
- * @param tuple should contain string values of 4 declared fields: "source", "index", "type", "id"
+ * {@inheritDoc}
+ * Tuple should have relevant fields (source, index, type, id) for tupleMapper to extract ES document.
*/
@Override
public void execute(Tuple tuple) {
try {
- String source = tuple.getStringByField("source");
- String index = tuple.getStringByField("index");
- String type = tuple.getStringByField("type");
- String id = tuple.getStringByField("id");
+ String source = tupleMapper.getSource(tuple);
+ String index = tupleMapper.getIndex(tuple);
+ String type = tupleMapper.getType(tuple);
+ String id = tupleMapper.getId(tuple);
client.prepareIndex(index, type, id).setSource(source).execute().actionGet();
collector.ack(tuple);
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
index 394462edf20..27e5e008d55 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/bolt/EsPercolateBolt.java
@@ -24,21 +24,27 @@
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.percolate.PercolateSourceBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Map;
+/**
+ * Basic bolt for retrieve matched percolate queries.
+ */
public class EsPercolateBolt extends AbstractEsBolt {
+ private final EsTupleMapper tupleMapper;
+
/**
* EsPercolateBolt constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+ * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
- public EsPercolateBolt(EsConfig esConfig) {
+ public EsPercolateBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
super(esConfig);
+ this.tupleMapper = tupleMapper;
}
@Override
@@ -47,15 +53,17 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou
}
/**
- * Executes percolate request for given tuple.
- * @param tuple should contain string values of 3 declared fields: "source", "index", "type"
+ * {@inheritDoc}
+ * Tuple should have relevant fields (source, index, type) for storeMapper to extract ES document.
+ * If there exists non-empty percolate response, EsPercolateBolt will emit tuple with original source
+ * and Percolate.Match for each Percolate.Match in PercolateResponse.
*/
@Override
public void execute(Tuple tuple) {
try {
- String source = tuple.getStringByField("source");
- String index = tuple.getStringByField("index");
- String type = tuple.getStringByField("type");
+ String source = tupleMapper.getSource(tuple);
+ String index = tupleMapper.getIndex(tuple);
+ String type = tupleMapper.getType(tuple);
PercolateResponse response = client.preparePercolate().setIndices(index).setDocumentType(type)
.setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc(source)).execute().actionGet();
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java
new file mode 100644
index 00000000000..fd0bbcc5f90
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/DefaultEsTupleMapper.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import backtype.storm.tuple.ITuple;
+
+public class DefaultEsTupleMapper implements EsTupleMapper {
+ @Override
+ public String getSource(ITuple tuple) {
+ return tuple.getStringByField("source");
+ }
+
+ @Override
+ public String getIndex(ITuple tuple) {
+ return tuple.getStringByField("index");
+ }
+
+ @Override
+ public String getType(ITuple tuple) {
+ return tuple.getStringByField("type");
+ }
+
+ @Override
+ public String getId(ITuple tuple) {
+ return tuple.getStringByField("id");
+ }
+}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java
new file mode 100644
index 00000000000..f8a66bdb31c
--- /dev/null
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/common/EsTupleMapper.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.elasticsearch.common;
+
+import backtype.storm.tuple.ITuple;
+
+import java.io.Serializable;
+
+/**
+ * TupleMapper defines how to extract source, index, type, and id from tuple for ElasticSearch.
+ */
+public interface EsTupleMapper extends Serializable {
+ /**
+ * Extracts source from tuple.
+ * @param tuple source tuple
+ * @return source
+ */
+ String getSource(ITuple tuple);
+
+ /**
+ * Extracts index from tuple.
+ * @param tuple source tuple
+ * @return index
+ */
+ String getIndex(ITuple tuple);
+
+ /**
+ * Extracts type from tuple.
+ * @param tuple source tuple
+ * @return type
+ */
+ String getType(ITuple tuple);
+
+ /**
+ * Extracts id from tuple.
+ * @param tuple source tuple
+ * @return id
+ */
+ String getId(ITuple tuple);
+}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
index e804084d0ce..e3865e5ef9a 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsState.java
@@ -20,6 +20,7 @@
import backtype.storm.task.IMetricsContext;
import backtype.storm.topology.FailedException;
import org.apache.storm.elasticsearch.common.EsConfig;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
@@ -37,17 +38,23 @@
import java.util.List;
import java.util.Map;
+/**
+ * Trident State for storing tuple to ES document.
+ */
public class EsState implements State {
private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
private static Client client;
private EsConfig esConfig;
+ private EsTupleMapper tupleMapper;
/**
* EsState constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+ * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
- public EsState(EsConfig esConfig) {
+ public EsState(EsConfig esConfig, EsTupleMapper tupleMapper) {
this.esConfig = esConfig;
+ this.tupleMapper = tupleMapper;
}
/**
@@ -98,13 +105,20 @@ public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int n
}
}
+ /**
+ * Store current state to ElasticSearch.
+ *
+ * @param tuples list of tuples for storing to ES.
+ * Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
+ * @param collector
+ */
public void updateState(List tuples, TridentCollector collector) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (TridentTuple tuple : tuples) {
- String source = tuple.getStringByField("source");
- String index = tuple.getStringByField("index");
- String type = tuple.getStringByField("type");
- String id = tuple.getStringByField("id");
+ String source = tupleMapper.getSource(tuple);
+ String index = tupleMapper.getIndex(tuple);
+ String type = tupleMapper.getType(tuple);
+ String id = tupleMapper.getId(tuple);
bulkRequest.add(client.prepareIndex(index, type, id).setSource(source));
}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
index c3a2e6cbd7d..9fdf7c69714 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsStateFactory.java
@@ -19,15 +19,18 @@
import backtype.storm.task.IMetricsContext;
import org.apache.storm.elasticsearch.common.EsConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
import storm.trident.state.State;
import storm.trident.state.StateFactory;
import java.util.Map;
+/**
+ * StateFactory for providing EsState.
+ */
public class EsStateFactory implements StateFactory {
private EsConfig esConfig;
+ private EsTupleMapper tupleMapper;
public EsStateFactory(){
@@ -36,14 +39,16 @@ public EsStateFactory(){
/**
* EsStateFactory constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
+ * @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
- public EsStateFactory(EsConfig esConfig){
+ public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper){
this.esConfig = esConfig;
+ this.tupleMapper = tupleMapper;
}
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
- EsState esState = new EsState(esConfig);
+ EsState esState = new EsState(esConfig, tupleMapper);
esState.prepare(conf, metrics, partitionIndex, numPartitions);
return esState;
}
diff --git a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
index 6fa42f37bf7..935c92eb26a 100644
--- a/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
+++ b/external/storm-elasticsearch/src/main/java/org/apache/storm/elasticsearch/trident/EsUpdater.java
@@ -24,6 +24,10 @@
import java.util.List;
public class EsUpdater extends BaseStateUpdater {
+ /**
+ * {@inheritDoc}
+ * Each tuple should have relevant fields (source, index, type, id) for EsState's tupleMapper to extract ES document.
+ */
@Override
public void updateState(EsState state, List tuples, TridentCollector collector) {
state.updateState(tuples, collector);
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
index ae6b321bff1..0ad549d6ee4 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/AbstractEsBoltTest.java
@@ -31,9 +31,7 @@
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.node.Node;
import org.elasticsearch.node.NodeBuilder;
-import org.junit.After;
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
import java.io.File;
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
index dd4b088b4d0..a4f2db28810 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexBoltTest.java
@@ -20,20 +20,15 @@
import backtype.storm.tuple.Tuple;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.elasticsearch.action.count.CountRequest;
-import org.elasticsearch.action.count.CountRequestBuilder;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.elasticsearch.action.count.CountResponse;
-import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.mockito.Mockito.verify;
public class EsIndexBoltTest extends AbstractEsBoltTest{
- private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
private EsIndexBolt bolt;
@Test
@@ -43,7 +38,9 @@ public void testEsIndexBolt()
esConfig.setClusterName("test-cluster");
esConfig.setNodes(new String[]{"127.0.0.1:9300"});
- bolt = new EsIndexBolt(esConfig);
+ EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+
+ bolt = new EsIndexBolt(esConfig, tupleMapper);
bolt.prepare(config, null, collector);
String source = "{\"user\":\"user1\"}";
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
index fc9c17896f0..1f0118b2152 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsIndexTopology.java
@@ -29,6 +29,7 @@
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsConstants;
import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
import java.util.Map;
import java.util.UUID;
@@ -49,7 +50,8 @@ public static void main(String[] args) throws Exception {
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(EsConstants.clusterName);
esConfig.setNodes(new String[]{"localhost:9300"});
- builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig), 1).shuffleGrouping(SPOUT_ID);
+ EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+ builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);
EsTestUtil.startEsNode();
EsTestUtil.waitForSeconds(5);
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
index fd4fa4f6abc..da80204b2ca 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/bolt/EsPercolateBoltTest.java
@@ -21,19 +21,14 @@
import backtype.storm.tuple.Values;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTestUtil;
-import org.elasticsearch.action.count.CountResponse;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.elasticsearch.action.percolate.PercolateResponse;
-import org.elasticsearch.index.query.TermQueryBuilder;
-import org.junit.Assert;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.any;
public class EsPercolateBoltTest extends AbstractEsBoltTest {
- private static final Logger LOG = LoggerFactory.getLogger(EsIndexBoltTest.class);
private EsPercolateBolt bolt;
@Test
@@ -42,7 +37,8 @@ public void testEsPercolateBolt()
EsConfig esConfig = new EsConfig();
esConfig.setClusterName("test-cluster");
esConfig.setNodes(new String[]{"localhost:9300"});
- bolt = new EsPercolateBolt(esConfig);
+ EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+ bolt = new EsPercolateBolt(esConfig, tupleMapper);
bolt.prepare(config, null, collector);
String source = "{\"user\":\"user1\"}";
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
index 3b2038384cc..30e684f2837 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/common/EsTestUtil.java
@@ -21,6 +21,7 @@
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImpl;
import backtype.storm.tuple.Values;
@@ -46,6 +47,10 @@ public Fields getComponentOutputFields(String componentId, String streamId) {
return new TupleImpl(topologyContext, new Values(source, index, type, id), 1, "");
}
+ public static EsTupleMapper generateDefaultTupleMapper() {
+ return new DefaultEsTupleMapper();
+ }
+
public static Node startEsNode(){
Node node = NodeBuilder.nodeBuilder().data(true).settings(
ImmutableSettings.builder()
diff --git a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
index 2c951f89a92..ee5e607759b 100644
--- a/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
+++ b/external/storm-elasticsearch/src/test/java/org/apache/storm/elasticsearch/trident/TridentEsTopology.java
@@ -25,6 +25,7 @@
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsConstants;
import org.apache.storm.elasticsearch.common.EsTestUtil;
+import org.apache.storm.elasticsearch.common.EsTupleMapper;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
@@ -48,8 +49,9 @@ public static void main(String[] args) {
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(EsConstants.clusterName);
esConfig.setNodes(new String[]{"localhost:9300"});
- Fields esFields = new Fields("index", "type", "source");
- StateFactory factory = new EsStateFactory(esConfig);
+ Fields esFields = new Fields("index", "type", "source", "id");
+ EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
+ StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
EsTestUtil.startEsNode();