Skip to content

Commit

Permalink
AggregationSpout reuses nextFetchDate until no buckets are returned. F…
Browse files Browse the repository at this point in the history
…ixes #429
  • Loading branch information
jnioche committed Apr 19, 2017
1 parent 3860d07 commit 67e483f
Showing 1 changed file with 12 additions and 3 deletions.
Expand Up @@ -69,6 +69,8 @@ public class AggregationSpout extends AbstractSpout implements

private boolean sample = false;

private String lastDate;

@Override
public void open(Map stormConf, TopologyContext context,
SpoutOutputCollector collector) {
Expand All @@ -80,13 +82,15 @@ public void open(Map stormConf, TopologyContext context,
@Override
protected void populateBuffer() {

Date now = new Date();
if (lastDate == null) {
lastDate = String.format(DATEFORMAT, new Date());
}

LOG.info("{} Populating buffer with nextFetchDate <= {}", logIdprefix,
now);
lastDate);

QueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery(
"nextFetchDate").lte(String.format(DATEFORMAT, now));
"nextFetchDate").lte(lastDate);

SearchRequestBuilder srb = client.prepareSearch(indexName)
.setTypes(docType).setSearchType(SearchType.QUERY_THEN_FETCH)
Expand Down Expand Up @@ -216,6 +220,11 @@ public void onResponse(SearchResponse response) {
eventCounter.scope("ES_queries").incrBy(1);
eventCounter.scope("ES_docs").incrBy(numhits);

// change the date only if we don't get any results at all
if (numBuckets == 0) {
lastDate = null;
}

// remove lock
isInESQuery.set(false);
}
Expand Down

0 comments on commit 67e483f

Please sign in to comment.