
ElasticSearchSpout processes results in separate thread; nextTuple and isInESQuery moved to abstract class
jnioche committed Mar 6, 2017
1 parent 4feb344 commit 1b0fb42023165f01a95be5ccbd87943886f4cd8f
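
The key change, visible in the ElasticSearchSpout hunks below, is that the spout no longer blocks on srb.execute().actionGet(): it now passes itself as an ActionListener<SearchResponse> to srb.execute(this) and receives the results in onResponse() on the client's callback thread, with the isInESQuery flag preventing overlapping queries. A minimal sketch of that pattern, using simplified names rather than the actual StormCrawler classes:

import java.util.concurrent.atomic.AtomicBoolean;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;

/** Illustrative sketch of the non-blocking query pattern adopted by this commit. */
class AsyncQueryingSpoutSketch implements ActionListener<SearchResponse> {

    // guards against firing a new query while one is still in flight
    protected final AtomicBoolean isInESQuery = new AtomicBoolean(false);

    protected void runQuery(SearchRequestBuilder srb) {
        // set the flag before submitting so a concurrent nextTuple() backs off
        isInESQuery.set(true);
        srb.execute(this); // returns immediately; results arrive in onResponse()
    }

    @Override
    public void onResponse(SearchResponse response) {
        try {
            // fill the internal buffer from the hits (omitted here)
        } finally {
            isInESQuery.set(false); // release the lock in all cases
        }
    }

    @Override
    public void onFailure(Exception e) {
        isInESQuery.set(false); // mirrors the commit's onFailure()
    }
}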
@@ -26,6 +26,7 @@
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.MultiCountMetric;
@@ -35,6 +36,7 @@
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsGroup;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
@@ -105,6 +107,8 @@
private long minDelayBetweenQueries = 2000;
protected AtomicBoolean isInESQuery = new AtomicBoolean(false);
/** Map which holds elements for some additional time after their removal. */
public class InProcessMap<K, V> extends HashMap<K, V> {
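
The body of InProcessMap falls outside this hunk, so only its declaration is visible. Going by the javadoc, a map of this kind keeps reporting recently removed keys for a grace period; a purely hypothetical sketch of that behaviour (not the actual implementation) could look like:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch only: the real InProcessMap body is not shown in this diff. */
class GracePeriodMapSketch<K, V> extends HashMap<K, V> {

    private final long gracePeriodMillis;
    // timestamps of keys that were recently removed
    private final Map<Object, Long> removedAt = new HashMap<>();

    GracePeriodMapSketch(long duration, TimeUnit unit) {
        this.gracePeriodMillis = unit.toMillis(duration);
    }

    @Override
    public boolean containsKey(Object key) {
        if (super.containsKey(key)) {
            return true;
        }
        Long ts = removedAt.get(key);
        return ts != null && System.currentTimeMillis() - ts < gracePeriodMillis;
    }

    @Override
    public V remove(Object key) {
        removedAt.put(key, System.currentTimeMillis());
        return super.remove(key);
    }
}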
@@ -240,6 +244,42 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("url", "metadata"));
}
@Override
public void nextTuple() {
// inactive?
if (active == false)
return;
synchronized (buffer) {
// have anything in the buffer?
if (!buffer.isEmpty()) {
Values fields = buffer.remove();
String url = fields.get(0).toString();
beingProcessed.put(url, null);
_collector.emit(fields, url);
eventCounter.scope("emitted").incrBy(1);
return;
}
}
// check that we allowed some time between queries
if (throttleESQueries()) {
// sleep for a bit but not too much in order to give ack/fail a
// chance
Utils.sleep(10);
return;
}
// re-populate the buffer
if (!isInESQuery.get()) {
populateBuffer();
}
}
/** Builds a query and uses it to retrieve the results from ES **/
protected abstract void populateBuffer();
protected final Metadata fromKeyValues(Map<String, Object> keyValues) {
Map<String, List<String>> mdAsMap = (Map<String, List<String>>) keyValues
.get("metadata");
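
nextTuple() above also relies on throttleESQueries(), whose body is not part of these hunks. Given the minDelayBetweenQueries and timeStartESQuery fields that do appear in the diff, a plausible implementation, shown only as a sketch under those assumptions, compares the time elapsed since the last query was started:

/** Sketch only: throttleESQueries() is referenced in nextTuple() but not shown here. */
class ThrottlingSketch {

    private long minDelayBetweenQueries = 2000; // as declared in AbstractSpout
    protected long timeStartESQuery;            // set just before each ES query is submitted

    /** Returns true while the last query started less than minDelayBetweenQueries ago. */
    protected boolean throttleESQueries() {
        return System.currentTimeMillis() - timeStartESQuery < minDelayBetweenQueries;
    }
}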
@@ -21,13 +21,11 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
@@ -99,8 +97,6 @@
private int sampleSize = -1;
protected AtomicBoolean isInESQuery = new AtomicBoolean(false);
@Override
public void open(Map stormConf, TopologyContext context,
SpoutOutputCollector collector) {
@@ -124,42 +120,6 @@ public void open(Map stormConf, TopologyContext context,
}
@Override
public void nextTuple() {
// inactive?
if (active == false)
return;
synchronized (buffer) {
// have anything in the buffer?
if (!buffer.isEmpty()) {
Values fields = buffer.remove();
String url = fields.get(0).toString();
beingProcessed.put(url, null);
_collector.emit(fields, url);
eventCounter.scope("emitted").incrBy(1);
return;
}
}
// check that we allowed some time between queries
if (throttleESQueries()) {
// sleep for a bit but not too much in order to give ack/fail a
// chance
Utils.sleep(10);
return;
}
// re-populate the buffer
if (!isInESQuery.get()) {
populateBuffer();
}
}
/** run a query on ES to populate the internal buffer **/
protected void populateBuffer() {
Date now = new Date();
@@ -25,7 +25,7 @@
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
@@ -48,7 +48,8 @@
* have exactly the same number of spout instances as ES shards. Collapses
* results to implement politeness and ensure a good diversity of sources.
**/
public class ElasticSearchSpout extends AbstractSpout {
public class ElasticSearchSpout extends AbstractSpout implements
ActionListener<SearchResponse> {
private static final Logger LOG = LoggerFactory
.getLogger(ElasticSearchSpout.class);
@@ -65,7 +66,8 @@
private Date lastDate;
private int maxSecSinceQueriedDate = -1;
private int maxURLsPerBucket = -1;
// TODO implement this
private int maxURLsPerBucket = 1;
// when using multiple instances - each one is in charge of a specific shard
// useful when sharding based on host or domain to guarantee a good mix of
@@ -98,35 +100,7 @@ public void open(Map stormConf, TopologyContext context,
}
@Override
public void nextTuple() {
// inactive?
if (active == false)
return;
// have anything in the buffer?
if (!buffer.isEmpty()) {
Values fields = buffer.remove();
String url = fields.get(0).toString();
this._collector.emit(fields, url);
eventCounter.scope("emitted").incrBy(1);
return;
}
// check that we allowed some time between queries
if (throttleESQueries()) {
// sleep for a bit but not too much in order to give ack/fail a
// chance
Utils.sleep(10);
return;
}
// re-populate the buffer
populateBuffer();
}
/** run a query on ES to populate the internal buffer **/
private void populateBuffer() {
protected void populateBuffer() {
Date now = new Date();
if (lastDate == null) {
@@ -168,8 +142,23 @@ else if (maxSecSinceQueriedDate != -1) {
CollapseBuilder collapse = new CollapseBuilder(partitionField);
srb.setCollapse(collapse);
// dump query to log
LOG.debug("{} ES query {}", logIdprefix, srb.toString());
timeStartESQuery = System.currentTimeMillis();
SearchResponse response = srb.execute().actionGet();
isInESQuery.set(true);
srb.execute(this);
}
@Override
public void onFailure(Exception e) {
LOG.error("Exception with ES query", e);
isInESQuery.set(false);
}
@Override
public void onResponse(SearchResponse response) {
long end = System.currentTimeMillis();
eventCounter.scope("ES_query_time_msec").incrBy(end - timeStartESQuery);
@@ -191,27 +180,42 @@ else if (maxSecSinceQueriedDate != -1) {
lastStartOffset += numhits;
}
// filter results so that we don't include URLs that are already
// being processed, or skip those for which we already have enough
//
for (int i = 0; i < hits.getHits().length; i++) {
Map<String, Object> keyValues = hits.getHits()[i].sourceAsMap();
String url = (String) keyValues.get("url");
// is already being processed - skip it!
if (beingProcessed.containsKey(url)) {
eventCounter.scope("already_being_processed").incrBy(1);
continue;
int alreadyprocessed = 0;
int numBuckets = 0;
synchronized (buffer) {
// filter results so that we don't include URLs that are already
// being processed, or skip those for which we already have enough
//
for (int i = 0; i < hits.getHits().length; i++) {
Map<String, Object> keyValues = hits.getHits()[i].sourceAsMap();
String url = (String) keyValues.get("url");
numBuckets++;
// is already being processed - skip it!
if (beingProcessed.containsKey(url)) {
alreadyprocessed++;
eventCounter.scope("already_being_processed").incrBy(1);
continue;
}
Metadata metadata = fromKeyValues(keyValues);
buffer.add(new Values(url, metadata));
}
Metadata metadata = fromKeyValues(keyValues);
buffer.add(new Values(url, metadata));
// Shuffle the URLs so that we don't get blocks of URLs from the
// same host or domain
Collections.shuffle((List) buffer);
}
// Shuffle the URLs so that we don't get blocks of URLs from the
// same host or domain
Collections.shuffle((List) buffer);
LOG.info(
"{} ES query returned {} hits from {} buckets in {} msec with {} already being processed",
logIdprefix, numhits, numBuckets, end - timeStartESQuery,
alreadyprocessed);
// remove lock
isInESQuery.set(false);
}
}
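
With this commit the buffer is filled on the Elasticsearch callback thread (onResponse) while nextTuple() drains it on the spout executor thread, which is why both sides synchronize on the buffer, and why Collections.shuffle needs the (List) cast: the buffer must be a Queue that is also a List, such as a LinkedList. A stripped-down sketch of that handoff, with hypothetical names rather than the actual spout code:

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.storm.tuple.Values;

/** Hypothetical sketch of the producer/consumer handoff between onResponse() and nextTuple(). */
class BufferHandoffSketch {

    // LinkedList implements both Queue and List, which the shuffle cast relies on
    private final Queue<Values> buffer = new LinkedList<>();

    /** Producer side: called from the ES callback thread, cf. onResponse() above. */
    void onResults(List<Values> results) {
        synchronized (buffer) {
            buffer.addAll(results);
            // avoid emitting long runs of URLs from the same host or domain
            Collections.shuffle((List<Values>) buffer);
        }
    }

    /** Consumer side: called from the spout executor thread, cf. nextTuple() above. */
    Values poll() {
        synchronized (buffer) {
            return buffer.poll(); // null when empty; the caller simply does not emit
        }
    }
}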
