Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Luegg committed Mar 14, 2022
1 parent d1b34a1 commit c587609
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
Expand Up @@ -141,15 +141,15 @@ public void onResponse(SearchResponse response) {
makeCursor(),
() -> client.search(request, this),
delegate,
mightProducePartialPages(getCompositeBuilder(next()))
couldProducePartialPages(getCompositeBuilder(next()))
);
}
});
}

protected Supplier<CompositeAggRowSet> makeRowSet(SearchResponse response) {
CompositeAggregationBuilder aggregation = getCompositeBuilder(nextQuery);
return () -> new CompositeAggRowSet(extractors, mask, response, aggregation.size(), limit, mightProducePartialPages(aggregation));
return () -> new CompositeAggRowSet(extractors, mask, response, aggregation.size(), limit, couldProducePartialPages(aggregation));
}

protected BiFunction<SearchSourceBuilder, CompositeAggRowSet, CompositeAggCursor> makeCursor() {
Expand All @@ -163,15 +163,15 @@ static void handle(
BiFunction<SearchSourceBuilder, CompositeAggRowSet, CompositeAggCursor> makeCursor,
Runnable retry,
ActionListener<Page> listener,
boolean mightProducePartialPages
boolean couldProducePartialPages
) {

if (log.isTraceEnabled()) {
logSearchResponse(response, log);
}

// retry
if (mightProducePartialPages && shouldRetryDueToEmptyPage(response)) {
if (couldProducePartialPages && shouldRetryDueToEmptyPage(response)) {
updateCompositeAfterKey(response, source);
retry.run();
return;
Expand Down Expand Up @@ -208,8 +208,13 @@ static CompositeAggregationBuilder getCompositeBuilder(SearchSourceBuilder sourc
return (CompositeAggregationBuilder) aggregation;
}

static boolean mightProducePartialPages(CompositeAggregationBuilder aggregation) {
return aggregation.getPipelineAggregations().stream().anyMatch(a -> a instanceof BucketSelectorPipelineAggregationBuilder);
static boolean couldProducePartialPages(CompositeAggregationBuilder aggregation) {
for (var agg : aggregation.getPipelineAggregations()) {
if (agg instanceof BucketSelectorPipelineAggregationBuilder) {
return true;
}
}
return false;
}

static CompositeAggregation getComposite(SearchResponse response) {
Expand Down
Expand Up @@ -74,7 +74,7 @@ protected Supplier<CompositeAggRowSet> makeRowSet(SearchResponse response) {
aggregation.size(),
limit(),
previousKey,
mightProducePartialPages(aggregation)
couldProducePartialPages(aggregation)
);
}

Expand Down
Expand Up @@ -454,7 +454,7 @@ static class CompositeActionListener extends BaseAggActionListener {
@Override
protected void handleResponse(SearchResponse response, ActionListener<Page> listener) {
CompositeAggregationBuilder aggregation = CompositeAggCursor.getCompositeBuilder(request.source());
boolean mightProducePartialPages = CompositeAggCursor.mightProducePartialPages(aggregation);
boolean mightProducePartialPages = CompositeAggCursor.couldProducePartialPages(aggregation);

Supplier<CompositeAggRowSet> makeRowSet = isPivot
? () -> new PivotRowSet(
Expand Down

0 comments on commit c587609

Please sign in to comment.