48 changes: 28 additions & 20 deletions external/storm-elasticsearch/README.md
@@ -6,37 +6,53 @@
## EsIndexBolt (org.apache.storm.elasticsearch.bolt.EsIndexBolt)

EsIndexBolt streams tuples directly into Elasticsearch. Tuples are indexed in the specified index & type combination.
User should make sure that there are "source", "index","type", and "id" fields declared in preceding bolts or spout.
"index" and "type" fields are used for identifying target index and type.
Users should make sure that ```EsTupleMapper``` can extract "source", "index", "type", and "id" from the input tuple.
"index" and "type" are used for identifying the target index and type.
"source" is a JSON-formatted document string that will be indexed in Elasticsearch.

```java
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(clusterName);
esConfig.setNodes(new String[]{"localhost:9300"});
EsIndexBolt indexBolt = new EsIndexBolt(esConfig);
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsIndexBolt indexBolt = new EsIndexBolt(esConfig, tupleMapper);
```
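
As a usage sketch, the bolt can be wired into a topology like any other Storm bolt. The spout below is hypothetical; any spout whose tuples carry the "source", "index", "type", and "id" fields expected by ```DefaultEsTupleMapper``` will do.

```java
// Minimal wiring sketch: UserDataSpout is a placeholder for a spout that emits
// tuples containing the "source", "index", "type" and "id" fields.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("user-data-spout", new UserDataSpout(), 1);
builder.setBolt("es-index-bolt", indexBolt, 1).shuffleGrouping("user-data-spout");

Config conf = new Config();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("es-index-topology", conf, builder.createTopology());
```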

## EsPercolateBolt (org.apache.storm.elasticsearch.bolt.EsPercolateBolt)

EsPercolateBolt streams tuples directly into Elasticsearch. Tuples are used to send percolate requests to the specified index & type combination.
User should make sure that there are "source", "index", and "type" fields declared in preceding bolts or spout.
"index" and "type" fields are used for identifying target index and type.
Users should make sure that ```EsTupleMapper``` can extract "source", "index", and "type" from the input tuple.
"index" and "type" are used for identifying the target index and type.
"source" is a JSON-formatted document string that will be sent in the percolate request to Elasticsearch.

```java
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(clusterName);
esConfig.setNodes(new String[]{"localhost:9300"});
EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig);
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();
EsPercolateBolt percolateBolt = new EsPercolateBolt(esConfig, tupleMapper);
```

If there is a non-empty percolate response, EsPercolateBolt will emit a tuple with the original source and the Percolate.Match
for each Percolate.Match in the PercolateResponse.

## EsState (org.apache.storm.elasticsearch.trident.EsState)

The Elasticsearch Trident state follows a pattern similar to the EsBolts. It takes an EsConfig and an EsTupleMapper as constructor args.

```java
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(clusterName);
esConfig.setNodes(new String[]{"localhost:9300"});
EsTupleMapper tupleMapper = new DefaultEsTupleMapper();

StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
```
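
For context, here is a minimal sketch of the surrounding Trident topology. The spout is again a placeholder, and ```esFields``` simply has to list fields that the configured ```EsTupleMapper``` can read (the names below match ```DefaultEsTupleMapper```).

```java
// Sketch only: UserDataSpout is a placeholder spout emitting "source", "index", "type" and "id".
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("es-trident-spout", new UserDataSpout());

// Fields handed to partitionPersist; DefaultEsTupleMapper reads these names from each tuple.
Fields esFields = new Fields("source", "index", "type", "id");
StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
```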

## EsConfig (org.apache.storm.elasticsearch.common.EsConfig)

Two bolts above takes in EsConfig as a constructor arg.
The provided components (Bolt, State) take in EsConfig as a constructor arg.

```java
EsConfig esConfig = new EsConfig();
@@ -51,20 +67,12 @@ Two bolts above takes in EsConfig as a constructor arg.
|clusterName | Elasticsearch cluster name | String (required) |
|nodes | Elasticsearch nodes in a String array, each element should follow {host}:{port} pattern | String array (required) |

## EsTupleMapper (org.apache.storm.elasticsearch.common.EsTupleMapper)


## EsState (org.apache.storm.elasticsearch.trident.EsState)

Elasticsearch Trident state also follows similar pattern to EsBolts. It takes in EsConfig as an arg.

```code
EsConfig esConfig = new EsConfig();
esConfig.setClusterName(clusterName);
esConfig.setNodes(new String[]{"localhost:9300"});

StateFactory factory = new EsStateFactory(esConfig);
TridentState state = stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());
```
To store a tuple to Elasticsearch or percolate a tuple against Elasticsearch, we need to define which tuple fields are used for what.
Users need to define their own mapper by implementing ```EsTupleMapper```.
Storm-elasticsearch provides a default mapper, ```org.apache.storm.elasticsearch.common.DefaultEsTupleMapper```, which extracts the source, index, type, and id values from the identically named fields.
You can refer to the implementation of DefaultEsTupleMapper to see how to implement your own.
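
For illustration, a custom mapper could look like the sketch below. The "doc" field name and the fixed index name are made up for this example; the only requirement is that each method returns the corresponding value for the given tuple.

```java
public class MyEsTupleMapper implements EsTupleMapper {
    @Override
    public String getSource(ITuple tuple) {
        // in this example the JSON document body is carried in a field named "doc"
        return tuple.getStringByField("doc");
    }

    @Override
    public String getIndex(ITuple tuple) {
        // route every document to a fixed index instead of reading it from the tuple
        return "storm-example-index";
    }

    @Override
    public String getType(ITuple tuple) {
        return tuple.getStringByField("type");
    }

    @Override
    public String getId(ITuple tuple) {
        return tuple.getStringByField("id");
    }
}
```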

## Committer Sponsors

EsIndexBolt.java
@@ -22,19 +22,24 @@
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.storm.elasticsearch.common.EsTupleMapper;

import java.util.Map;

/**
* Basic bolt for storing tuples to ES documents.
*/
public class EsIndexBolt extends AbstractEsBolt {
private final EsTupleMapper tupleMapper;

/**
* EsIndexBolt constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
* @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
public EsIndexBolt(EsConfig esConfig) {
public EsIndexBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
super(esConfig);
this.tupleMapper = tupleMapper;
}

@Override
@@ -43,16 +48,16 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou
}

/**
* Executes index request for given tuple.
* @param tuple should contain string values of 4 declared fields: "source", "index", "type", "id"
* {@inheritDoc}
* The tuple should have the relevant fields (source, index, type, id) so that the tupleMapper can extract the ES document.
*/
@Override
public void execute(Tuple tuple) {
try {
String source = tuple.getStringByField("source");
String index = tuple.getStringByField("index");
String type = tuple.getStringByField("type");
String id = tuple.getStringByField("id");
String source = tupleMapper.getSource(tuple);
String index = tupleMapper.getIndex(tuple);
String type = tupleMapper.getType(tuple);
String id = tupleMapper.getId(tuple);

client.prepareIndex(index, type, id).setSource(source).execute().actionGet();
collector.ack(tuple);
EsPercolateBolt.java
@@ -24,21 +24,27 @@
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.elasticsearch.action.percolate.PercolateResponse;
import org.elasticsearch.action.percolate.PercolateSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Reviewer comment (Contributor): Trivial, these two import lines could be removed.
import java.util.Map;

/**
* Basic bolt for retrieving matched percolate queries.
*/
public class EsPercolateBolt extends AbstractEsBolt {

private final EsTupleMapper tupleMapper;

/**
* EsPercolateBolt constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
* @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
public EsPercolateBolt(EsConfig esConfig) {
public EsPercolateBolt(EsConfig esConfig, EsTupleMapper tupleMapper) {
super(esConfig);
this.tupleMapper = tupleMapper;
}

@Override
@@ -47,15 +53,17 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou
}

/**
* Executes percolate request for given tuple.
* @param tuple should contain string values of 3 declared fields: "source", "index", "type"
* {@inheritDoc}
* The tuple should have the relevant fields (source, index, type) so that the tupleMapper can extract the ES document.<br/>
* If there is a non-empty percolate response, EsPercolateBolt will emit a tuple with the original source
* and the Percolate.Match for each Percolate.Match in the PercolateResponse.
*/
@Override
public void execute(Tuple tuple) {
try {
String source = tuple.getStringByField("source");
String index = tuple.getStringByField("index");
String type = tuple.getStringByField("type");
String source = tupleMapper.getSource(tuple);
String index = tupleMapper.getIndex(tuple);
String type = tupleMapper.getType(tuple);

PercolateResponse response = client.preparePercolate().setIndices(index).setDocumentType(type)
.setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc(source)).execute().actionGet();
DefaultEsTupleMapper.java
@@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.elasticsearch.common;

import backtype.storm.tuple.ITuple;

public class DefaultEsTupleMapper implements EsTupleMapper {
@Override
public String getSource(ITuple tuple) {
return tuple.getStringByField("source");
}

@Override
public String getIndex(ITuple tuple) {
return tuple.getStringByField("index");
}

@Override
public String getType(ITuple tuple) {
return tuple.getStringByField("type");
}

@Override
public String getId(ITuple tuple) {
return tuple.getStringByField("id");
}
}
EsTupleMapper.java
@@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.elasticsearch.common;

import backtype.storm.tuple.ITuple;

import java.io.Serializable;

/**
* EsTupleMapper defines how to extract source, index, type, and id from a tuple for Elasticsearch.
*/
public interface EsTupleMapper extends Serializable {
/**
* Extracts source from tuple.
* @param tuple source tuple
* @return source
*/
String getSource(ITuple tuple);

/**
* Extracts index from tuple.
* @param tuple source tuple
* @return index
*/
String getIndex(ITuple tuple);

/**
* Extracts type from tuple.
* @param tuple source tuple
* @return type
*/
String getType(ITuple tuple);

/**
* Extracts id from tuple.
* @param tuple source tuple
* @return id
*/
String getId(ITuple tuple);
}
EsState.java
@@ -20,6 +20,7 @@
import backtype.storm.task.IMetricsContext;
import backtype.storm.topology.FailedException;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.Client;
@@ -37,17 +38,23 @@
import java.util.List;
import java.util.Map;

/**
* Trident State for storing tuples to ES documents.
*/
public class EsState implements State {
private static final Logger LOG = LoggerFactory.getLogger(EsState.class);
private static Client client;
private EsConfig esConfig;
private EsTupleMapper tupleMapper;

/**
* EsState constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
* @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
public EsState(EsConfig esConfig) {
public EsState(EsConfig esConfig, EsTupleMapper tupleMapper) {
this.esConfig = esConfig;
this.tupleMapper = tupleMapper;
}

/**
@@ -98,13 +105,20 @@ public void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int n
}
}

/**
* Stores the current state to Elasticsearch.
*
* @param tuples list of tuples to store to ES.
* Each tuple should have the relevant fields (source, index, type, id) so that EsState's tupleMapper can extract the ES document.
* @param collector the Trident collector
*/
public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (TridentTuple tuple : tuples) {
String source = tuple.getStringByField("source");
String index = tuple.getStringByField("index");
String type = tuple.getStringByField("type");
String id = tuple.getStringByField("id");
String source = tupleMapper.getSource(tuple);
String index = tupleMapper.getIndex(tuple);
String type = tupleMapper.getType(tuple);
String id = tupleMapper.getId(tuple);

bulkRequest.add(client.prepareIndex(index, type, id).setSource(source));
}
EsStateFactory.java
@@ -19,15 +19,18 @@

import backtype.storm.task.IMetricsContext;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
import storm.trident.state.State;
import storm.trident.state.StateFactory;

import java.util.Map;

/**
* StateFactory for providing EsState.
*/
public class EsStateFactory implements StateFactory {
private EsConfig esConfig;
private EsTupleMapper tupleMapper;

public EsStateFactory(){

@@ -36,14 +39,16 @@ public EsStateFactory(){
/**
* EsStateFactory constructor
* @param esConfig Elasticsearch configuration containing node addresses and cluster name {@link EsConfig}
* @param tupleMapper Tuple to ES document mapper {@link EsTupleMapper}
*/
public EsStateFactory(EsConfig esConfig){
public EsStateFactory(EsConfig esConfig, EsTupleMapper tupleMapper){
this.esConfig = esConfig;
this.tupleMapper = tupleMapper;
}

@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
EsState esState = new EsState(esConfig);
EsState esState = new EsState(esConfig, tupleMapper);
esState.prepare(conf, metrics, partitionIndex, numPartitions);
return esState;
}
EsUpdater.java
@@ -24,6 +24,10 @@
import java.util.List;

public class EsUpdater extends BaseStateUpdater<EsState> {
/**
* {@inheritDoc}
* Each tuple should have the relevant fields (source, index, type, id) so that EsState's tupleMapper can extract the ES document.
*/
@Override
public void updateState(EsState state, List<TridentTuple> tuples, TridentCollector collector) {
state.updateState(tuples, collector);