Skip to content

Commit

Permalink
Add filter support to search types (Graylog2/graylog-plugin-enterpris…
Browse files Browse the repository at this point in the history
…e#217)

new filter type: query_string

for the ES backend, aggregation based search types will add a top-level filter aggregation with the corresponding expression
message list does not support filters yet, because we would need to differentiate between have a single message list (top level hits)
and multiple ones (filter aggs + top_hist subaggs) and we aren't sure about performance impacts yet.

fixes Graylog2/graylog-plugin-enterprise#170 

An example for a query string filter:

{
	"queries": [
		{
			"id": "1",
			"timerange": {
				"type": "absolute",
				"from": "2018-01-16T12:00:00.000Z",
				"to": "2018-01-16T14:00:00.000Z"
			},
			"query": {
				"type": "elasticsearch",
				"query_string": "*"
			},
			"search_types": [
				{
					"type": "aggregation",
					"id": "vals",
					"filter": {
						"type":"query_string",
						"query":"nf_version:9"
					},
					"groups": [
						{
							"type": "values",
							"field": "nf_version",
							"metrics": [
								{
									"type": "count"
								}
							]
						}
					]
				}
			]
		}
	]
}
  • Loading branch information
kroepke authored and dennisoelkers committed Jun 11, 2019
1 parent 7bfbd26 commit c1254af
Show file tree
Hide file tree
Showing 19 changed files with 243 additions and 44 deletions.
Expand Up @@ -7,11 +7,12 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import javax.annotation.Nullable;
import java.util.Objects;

/**
* A search type represents parts of a query that generates a {@see Result result}.
*
* <p>
* Plain queries only select a set of data but by themselves do not return any specific parts from it.
* Typical search types are aggregations across fields, a list of messages and other metadata.
*/
Expand All @@ -30,10 +31,30 @@ public interface SearchType {
@JsonProperty("id")
String id();

@Nullable
@JsonProperty("filter")
Filter filter();

SearchType withId(String id);

SearchType applyExecutionContext(ObjectMapper objectMapper, JsonNode state);

/**
* Each search type should declare an implementation of its result conforming to this interface.
* <p>
* The frontend components then make use of the structured data to display it.
*/
interface Result {
@JsonProperty("id")
String id();

/**
* The json type info property of the surrounding SearchType class. Must be set manually by subclasses.
*/
@JsonProperty("type")
String type();
}

@JsonAutoDetect
class Fallback implements SearchType {

Expand All @@ -43,6 +64,10 @@ class Fallback implements SearchType {
@JsonProperty
private String id;

@Nullable
@JsonProperty
private Filter filter;

@Override
public String type() {
return type;
Expand All @@ -53,6 +78,11 @@ public String id() {
return id;
}

@Override
public Filter filter() {
return filter;
}

@Override
public SearchType withId(String id) {
this.id = id;
Expand Down Expand Up @@ -87,20 +117,4 @@ public int hashCode() {
return Objects.hash(type, id);
}
}

/**
* Each search type should declare an implementation of its result conforming to this interface.
*
* The frontend components then make use of the structured data to display it.
*/
interface Result {
@JsonProperty("id")
String id();

/**
* The json type info property of the surrounding SearchType class. Must be set manually by subclasses.
*/
@JsonProperty("type")
String type();
}
}
Expand Up @@ -3,25 +3,35 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.Maps;
import io.searchbox.core.search.aggregation.Aggregation;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.filter.FilterAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.graylog.plugins.enterprise.search.Filter;
import org.graylog.plugins.enterprise.search.SearchType;
import org.graylog.plugins.enterprise.search.engine.GeneratedQueryContext;
import org.graylog.plugins.enterprise.search.searchtypes.aggregation.AggregationSpec;
import org.graylog.plugins.enterprise.search.util.UniqueNamer;
import org.jooq.lambda.tuple.Tuple;
import org.jooq.lambda.tuple.Tuple2;

import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Optional;

public class ESGeneratedQueryContext implements GeneratedQueryContext {

private final ElasticsearchBackend elasticsearchBackend;
private SearchSourceBuilder ssb;
// do _NOT_ turn this into a regular hashmap!
private IdentityHashMap<AggregationSpec, Tuple2<String, Class<? extends Aggregation>>> aggResultTypes = Maps.newIdentityHashMap();
private Map<Object, Object> contextMap = Maps.newHashMap();
private final UniqueNamer uniqueNamer = new UniqueNamer("agg-");

public ESGeneratedQueryContext(SearchSourceBuilder ssb) {
public ESGeneratedQueryContext(ElasticsearchBackend elasticsearchBackend, SearchSourceBuilder ssb) {
this.elasticsearchBackend = elasticsearchBackend;
this.ssb = ssb;
}

Expand Down Expand Up @@ -53,7 +63,27 @@ public String nextName() {
return uniqueNamer.nextName();
}

public String currentName() {
return uniqueNamer.currentName();
public Optional<QueryBuilder> generateFilterClause(Filter filter) {
return elasticsearchBackend.generateFilterClause(filter);
}

public void addFilteredAggregation(AggregationBuilder builder, SearchType searchType) {
final Optional<QueryBuilder> filterClause = generateFilterClause(searchType.filter());
if (filterClause.isPresent()) {
builder = AggregationBuilders.filter("filtered-" + searchType.id(), filterClause.get())
.subAggregation(builder);
}
ssb.aggregation(builder);
}

public void addFilteredAggregations(Collection<AggregationBuilder> builders, SearchType searchType) {
final Optional<QueryBuilder> filterClause = generateFilterClause(searchType.filter());
if (filterClause.isPresent()) {
final FilterAggregationBuilder filter = AggregationBuilders.filter("filtered-" + searchType.id(), filterClause.get());
builders.forEach(filter::subAggregation);
ssb.aggregation(filter);
} else {
builders.forEach(ssb::aggregation);
}
}
}
Expand Up @@ -3,7 +3,9 @@
import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.Bucket;
import io.searchbox.core.search.aggregation.DateHistogramAggregation;
import io.searchbox.core.search.aggregation.MetricAggregation;
import one.util.streamex.StreamEx;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.graylog.plugins.enterprise.search.Query;
import org.graylog.plugins.enterprise.search.SearchJob;
Expand All @@ -20,16 +22,16 @@
public class ESDateHistogram implements ESSearchTypeHandler<DateHistogram> {
@Override
public void doGenerateQueryPart(SearchJob job, Query query, DateHistogram dateHistogram, ESGeneratedQueryContext queryContext) {
queryContext.searchSourceBuilder().aggregation(
AggregationBuilders.dateHistogram(dateHistogram.id())
.field(Message.FIELD_TIMESTAMP)
.dateHistogramInterval(dateHistogram.interval().toESInterval()));
AggregationBuilder builder = AggregationBuilders.dateHistogram(dateHistogram.id())
.field(Message.FIELD_TIMESTAMP)
.dateHistogramInterval(dateHistogram.interval().toESInterval());
queryContext.addFilteredAggregation(builder, dateHistogram);
}

@SuppressWarnings("unchecked")
@Override
public SearchType.Result doExtractResult(SearchJob job, Query query, DateHistogram searchType, SearchResult result, ESGeneratedQueryContext queryContext) {
final DateHistogramAggregation dateHistogramAggregation = result.getAggregations().getDateHistogramAggregation(searchType.id());
public SearchType.Result doExtractResult(SearchJob job, Query query, DateHistogram searchType, SearchResult result, MetricAggregation aggregations, ESGeneratedQueryContext queryContext) {
final DateHistogramAggregation dateHistogramAggregation = aggregations.getDateHistogramAggregation(searchType.id());
final Map<Long, Long> buckets = StreamEx.of(dateHistogramAggregation.getBuckets())
.mapToEntry(bucket -> new DateTime(bucket.getKey()).getMillis() / 1000L,
Bucket::getCount)
Expand Down
Expand Up @@ -42,8 +42,7 @@ public void doGenerateQueryPart(SearchJob job, Query query, FieldMetric fieldMet
}

@Override
public SearchType.Result doExtractResult(SearchJob job, Query query, FieldMetric fieldMetric, SearchResult queryResult, ESGeneratedQueryContext queryContext) {
final MetricAggregation aggregations = queryResult.getAggregations();
public SearchType.Result doExtractResult(SearchJob job, Query query, FieldMetric fieldMetric, SearchResult queryResult, MetricAggregation aggregations, ESGeneratedQueryContext queryContext) {
final String id = fieldMetric.id();
final SearchType.Result result;

Expand Down
Expand Up @@ -4,6 +4,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.MetricAggregation;
import io.searchbox.core.search.aggregation.TermsAggregation;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
Expand Down Expand Up @@ -80,8 +81,8 @@ public void doGenerateQueryPart(SearchJob job, Query query, GroupBy groupBy, ESG
}

@Override
public SearchType.Result doExtractResult(SearchJob job, Query query, GroupBy groupBy, SearchResult queryResult, ESGeneratedQueryContext queryContext) {
final TermsAggregation termsAggregation = queryResult.getAggregations()
public SearchType.Result doExtractResult(SearchJob job, Query query, GroupBy groupBy, SearchResult queryResult, MetricAggregation aggregations, ESGeneratedQueryContext queryContext) {
final TermsAggregation termsAggregation = aggregations
.getFilterAggregation(filterAggName(groupBy))
.getTermsAggregation(termsAggName(groupBy));

Expand Down
Expand Up @@ -3,6 +3,7 @@
import com.google.common.collect.ImmutableMap;
import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.DateHistogramAggregation;
import io.searchbox.core.search.aggregation.MetricAggregation;
import io.searchbox.core.search.aggregation.TermsAggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
Expand Down Expand Up @@ -51,8 +52,8 @@ public void doGenerateQueryPart(SearchJob job, Query query, GroupByHistogram gro
}

@Override
public SearchType.Result doExtractResult(SearchJob job, Query query, GroupByHistogram groupByHistogram, SearchResult queryResult, ESGeneratedQueryContext queryContext) {
final DateHistogramAggregation aggregation = queryResult.getAggregations().getDateHistogramAggregation(histogramAggName(groupByHistogram));
public SearchType.Result doExtractResult(SearchJob job, Query query, GroupByHistogram groupByHistogram, SearchResult queryResult, MetricAggregation aggregations, ESGeneratedQueryContext queryContext) {
final DateHistogramAggregation aggregation = aggregations.getDateHistogramAggregation(histogramAggName(groupByHistogram));
final GroupBy groupBy = createGroupBy(groupByHistogram);
final ESGroupBy esGroupBy = new ESGroupBy();
final ImmutableMap.Builder<Long, GroupByHistogram.Bucket> buckets = ImmutableMap.builder();
Expand Down
@@ -1,6 +1,7 @@
package org.graylog.plugins.enterprise.search.elasticsearch.searchtypes;

import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.MetricAggregation;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.graylog.plugins.enterprise.search.Query;
Expand All @@ -12,14 +13,21 @@
import org.graylog2.indexer.results.ResultMessage;
import org.graylog2.plugin.Message;
import org.graylog2.rest.models.messages.responses.ResultMessageSummary;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ESMessageList implements ESSearchTypeHandler<MessageList> {
private static final Logger LOG = LoggerFactory.getLogger(ESMessageList.class);

@Override
public void doGenerateQueryPart(SearchJob job, Query query, MessageList messageList, ESGeneratedQueryContext queryContext) {
if (messageList.filter() != null) {
LOG.warn("Search type messages currently does not support filters yet.");
}
final SearchSourceBuilder searchSourceBuilder = queryContext.searchSourceBuilder();
searchSourceBuilder
.size(messageList.limit() - messageList.offset())
Expand All @@ -35,7 +43,7 @@ public void doGenerateQueryPart(SearchJob job, Query query, MessageList messageL
}

@Override
public SearchType.Result doExtractResult(SearchJob job, Query query, MessageList searchType, SearchResult result, ESGeneratedQueryContext queryContext) {
public SearchType.Result doExtractResult(SearchJob job, Query query, MessageList searchType, SearchResult result, MetricAggregation aggregations, ESGeneratedQueryContext queryContext) {
//noinspection unchecked
final List<ResultMessageSummary> messages = result.getHits(Map.class, false).stream()
.map(hit -> ResultMessage.parseFromSource(hit.id, hit.index, (Map<String, Object>) hit.source, hit.highlight))
Expand Down
@@ -1,6 +1,10 @@
package org.graylog.plugins.enterprise.search.elasticsearch.searchtypes;

import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.FilterAggregation;
import io.searchbox.core.search.aggregation.MetricAggregation;
import org.graylog.plugins.enterprise.search.Query;
import org.graylog.plugins.enterprise.search.SearchJob;
import org.graylog.plugins.enterprise.search.SearchType;
import org.graylog.plugins.enterprise.search.elasticsearch.ESGeneratedQueryContext;
import org.graylog.plugins.enterprise.search.engine.SearchTypeHandler;
Expand All @@ -12,4 +16,16 @@
* @param <S> the {@link org.graylog.plugins.enterprise.search.SearchType SearchType} this handler deals with
*/
public interface ESSearchTypeHandler<S extends SearchType> extends SearchTypeHandler<S, ESGeneratedQueryContext, SearchResult> {
@Override
default SearchType.Result doExtractResultImpl(SearchJob job, Query query, S searchType, SearchResult queryResult, ESGeneratedQueryContext queryContext) {
// if the search type was filtered, extract the sub-aggregation before passing it to the handler
// this way we don't have to duplicate this step everywhere
MetricAggregation aggregations = queryResult.getAggregations();
if (searchType.filter() != null) {
aggregations = queryResult.getAggregations().getAggregation("filtered-" + searchType.id(), FilterAggregation.class);
}
return doExtractResult(job, query, searchType, queryResult, aggregations, queryContext);
}

SearchType.Result doExtractResult(SearchJob job, Query query, S searchType, SearchResult queryResult, MetricAggregation aggregations, ESGeneratedQueryContext queryContext);
}
@@ -1,7 +1,9 @@
package org.graylog.plugins.enterprise.search.elasticsearch.searchtypes.aggregation;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import io.searchbox.core.SearchResult;
import io.searchbox.core.search.aggregation.MetricAggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.graylog.plugins.enterprise.search.Query;
Expand Down Expand Up @@ -41,14 +43,16 @@ public void doGenerateQueryPart(SearchJob job, Query query, Aggregation searchTy
// thus we ask the spec handlers to consume as much, or as little of the tree as they need to and they will call into
// our generation methods as needed
// the top level aggregations we'll ask directly to generate themselves
List<AggregationBuilder> builders = Lists.newArrayList();
searchType.aggregations().forEach(aggSpec -> {
final Optional<AggregationBuilder> aggregation = handlerForType(aggSpec.type())
.createAggregation(namer.nextName(), aggSpec, this, queryContext);
aggregation.ifPresent(agg -> {
LOG.debug("Adding top level aggregation {}: {}", agg.getName(), agg.getType());
searchSourceBuilder.aggregation(agg);
builders.add(agg);
});
});
queryContext.addFilteredAggregations(builders, searchType);
}

public ESAggregationSpecHandler<? extends AggregationSpec, ? extends io.searchbox.core.search.aggregation.Aggregation> handlerForType(String specType) {
Expand All @@ -60,17 +64,17 @@ public void doGenerateQueryPart(SearchJob job, Query query, Aggregation searchTy
}

@Override
public SearchType.Result doExtractResult(SearchJob job, Query query, Aggregation searchType, SearchResult queryResult, ESGeneratedQueryContext queryContext) {
public SearchType.Result doExtractResult(SearchJob job, Query query, Aggregation searchType, SearchResult queryResult, MetricAggregation aggregations, ESGeneratedQueryContext queryContext) {
final ImmutableList.Builder<Object> metrics = ImmutableList.builder();
final ImmutableList.Builder<Object> groups = ImmutableList.builder();
searchType.metrics().forEach(aggSpec -> {
final ESAggregationSpecHandler<? extends AggregationSpec, ? extends io.searchbox.core.search.aggregation.Aggregation> handler = handlerForType(aggSpec.type());
final Object result = handler.handleResult(aggSpec, queryResult, handler.extractAggregationFromResult(aggSpec, queryResult.getAggregations(), queryContext), this, queryContext);
final Object result = handler.handleResult(aggSpec, queryResult, handler.extractAggregationFromResult(aggSpec, aggregations, queryContext), this, queryContext);
metrics.add(result);
});
searchType.groups().forEach(aggSpec -> {
final ESAggregationSpecHandler<? extends AggregationSpec, ? extends io.searchbox.core.search.aggregation.Aggregation> handler = handlerForType(aggSpec.type());
final Object result = handler.handleResult(aggSpec, queryResult, handler.extractAggregationFromResult(aggSpec, queryResult.getAggregations(), queryContext), this, queryContext);
final Object result = handler.handleResult(aggSpec, queryResult, handler.extractAggregationFromResult(aggSpec, aggregations, queryContext), this, queryContext);
groups.add(result);
});
return new Aggregation.Result() {
Expand Down

0 comments on commit c1254af

Please sign in to comment.