Skip to content
This repository was archived by the owner on Aug 20, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.service.SearchService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;

import java.io.IOException;
Expand All @@ -34,10 +35,12 @@
@Service
public class IndexDaoSearchServiceImpl implements SearchService {
private IndexDao dao;
private Environment environment;

// Spring injects the IndexDao backing all searches and the Environment used
// to read REST-layer configuration properties.
// NOTE(review): the scraped diff showed both the old one-arg and new two-arg
// constructor signatures; only the two-arg form is kept here.
@Autowired
public IndexDaoSearchServiceImpl(IndexDao dao, Environment environment) {
  this.dao = dao;
  this.environment = environment;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,18 @@
import org.apache.metron.indexing.dao.IndexDao;
import org.apache.metron.indexing.dao.search.*;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.mapper.ip.IpFieldMapper;
import org.elasticsearch.index.query.QueryStringQueryBuilder;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.LongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.*;

Expand All @@ -37,6 +45,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

public class ElasticsearchDao implements IndexDao {
Expand Down Expand Up @@ -92,9 +101,18 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx
}
searchSourceBuilder = searchSourceBuilder.sort(fieldSortBuilder);
}
Optional<List<String>> facetFields = searchRequest.getFacetFields();
if (facetFields.isPresent()) {
addFacetFields(searchSourceBuilder, facetFields.get());
}
String[] wildcardIndices = searchRequest.getIndices().stream().map(index -> String.format("%s*", index)).toArray(value -> new String[searchRequest.getIndices().size()]);
org.elasticsearch.action.search.SearchResponse elasticsearchResponse = client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices)
.source(searchSourceBuilder)).actionGet();
org.elasticsearch.action.search.SearchResponse elasticsearchResponse;
try {
elasticsearchResponse = client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices)
.source(searchSourceBuilder)).actionGet();
} catch (SearchPhaseExecutionException e) {
throw new InvalidSearchException("Could not execute search", e);
}
SearchResponse searchResponse = new SearchResponse();
searchResponse.setTotal(elasticsearchResponse.getHits().getTotalHits());
searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit -> {
Expand All @@ -104,6 +122,15 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx
searchResult.setScore(searchHit.getScore());
return searchResult;
}).collect(Collectors.toList()));
if (facetFields.isPresent()) {
Map<String, FieldType> commonColumnMetadata;
try {
commonColumnMetadata = getCommonColumnMetadata(searchRequest.getIndices());
} catch (IOException e) {
throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", Arrays.toString(searchRequest.getIndices().toArray())));
}
searchResponse.setFacetCounts(getFacetCounts(facetFields.get(), elasticsearchResponse.getAggregations(), commonColumnMetadata ));
}
return searchResponse;
}

Expand Down Expand Up @@ -179,4 +206,43 @@ protected String[] getLatestIndices(List<String> includeIndices) {
return latestIndices.values().toArray(new String[latestIndices.size()]);
}

/**
 * Adds a terms aggregation to the search for each requested facet field so
 * that per-value document counts come back with the search results.
 *
 * @param searchSourceBuilder the Elasticsearch search being built
 * @param fields the fields to facet on
 */
public void addFacetFields(SearchSourceBuilder searchSourceBuilder, List<String> fields) {
  for (String field : fields) {
    // The fluent aggregation(...) call mutates the builder in place, so the
    // returned reference does not need to be reassigned to the parameter.
    searchSourceBuilder.aggregation(new TermsBuilder(getAggregationName(field)).field(field));
  }
}

public Map<String, Map<String, Long>> getFacetCounts(List<String> fields, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) {
Map<String, Map<String, Long>> fieldCounts = new HashMap<>();
for (String field: fields) {
Map<String, Long> valueCounts = new HashMap<>();
Aggregation aggregation = aggregations.get(getAggregationName(field));
if (aggregation instanceof LongTerms) {
LongTerms longTerms = (LongTerms) aggregation;
FieldType type = commonColumnMetadata.get(field);
if (FieldType.IP.equals(type)) {
longTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(IpFieldMapper.longToIp((Long) bucket.getKey()), bucket.getDocCount()));
} else if (FieldType.BOOLEAN.equals(type)) {
longTerms.getBuckets().stream().forEach(bucket -> {
String key = (Long) bucket.getKey() == 1 ? "true" : "false";
valueCounts.put(key, bucket.getDocCount());
});
} else {
longTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount()));
}
} else if (aggregation instanceof DoubleTerms) {
DoubleTerms doubleTerms = (DoubleTerms) aggregation;
doubleTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount()));
} else if (aggregation instanceof StringTerms) {
StringTerms stringTerms = (StringTerms) aggregation;
stringTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount()));
}
fieldCounts.put(field, valueCounts);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any other *Terms here or does this cover our "Other" type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ES java api can be pretty awkward to use. This is a good example of that. The Aggregation object is abstract and has 3 subclasses related to term aggregations: LongTerms, DoubleTerms and StringTerms. All types fall into one of these (as far as I could tell) which is one reason we needed a function to get the fields types. For example, an aggregation for a field of type "ip" is represented as a LongTerms object and the value returned is also a long. To get it to display correct we need to convert it to a string representation of the ip address. Same thing for booleans. There are returned as LongTerms with a value of 1 or 0.

Hopefully I covered all the different types in the integration tests.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

}
return fieldCounts;
}

/**
 * Name used for a facet field's terms aggregation, e.g. "ip_src_addr_count".
 */
private String getAggregationName(String field) {
  return field + "_count";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class SearchRequest {

Expand All @@ -27,13 +28,15 @@ public class SearchRequest {
private int size;
private int from;
private List<SortField> sort;
private List<String> facetFields;

public SearchRequest() {
SortField defaultSortField = new SortField();
defaultSortField.setField("timestamp");
defaultSortField.setSortOrder(SortOrder.DESC.toString());
sort = new ArrayList<>();
sort.add(defaultSortField);
facetFields = new ArrayList<>();
}

public List<String> getIndices() {
Expand Down Expand Up @@ -75,4 +78,12 @@ public List<SortField> getSort() {
public void setSort(List<SortField> sort) {
this.sort = sort;
}

/**
 * The fields to facet on, or {@link Optional#empty()} when faceting is
 * disabled (no facet fields were supplied, or the list is null/empty).
 *
 * @return the facet fields wrapped in an Optional
 */
public Optional<List<String>> getFacetFields() {
  return facetFields == null || facetFields.isEmpty() ? Optional.empty() : Optional.of(facetFields);
}

// Setter used by JSON deserialization of a search request; passing null or an
// empty list disables faceting (getFacetFields then returns Optional.empty()).
public void setFacetFields(List<String> facetFields) {
this.facetFields = facetFields;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@
*/
package org.apache.metron.indexing.dao.search;

import com.fasterxml.jackson.annotation.JsonInclude;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class SearchResponse {

private long total;
private List<SearchResult> results = new ArrayList<>();
private Map<String, Map<String, Long>> facetCounts;

public long getTotal() {
return total;
Expand All @@ -40,4 +44,13 @@ public List<SearchResult> getResults() {
public void setResults(List<SearchResult> results) {
this.results = results;
}

// NON_NULL keeps facetCounts out of the serialized JSON response entirely when
// faceting was not requested (the field is only populated for facet searches).
@JsonInclude(JsonInclude.Include.NON_NULL)
public Map<String, Map<String, Long>> getFacetCounts() {
return facetCounts;
}

// Facet field name -> (bucket value -> document count); left null when the
// search request included no facet fields.
public void setFacetCounts(Map<String, Map<String, Long>> facetCounts) {
this.facetCounts = facetCounts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@
import org.apache.metron.indexing.dao.search.SearchResponse;
import org.apache.metron.indexing.dao.search.SearchResult;
import org.apache.metron.integration.InMemoryComponent;
import org.json.simple.parser.ParseException;
import org.junit.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -39,9 +38,9 @@ public abstract class IndexingDaoIntegrationTest {
* [
* {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "duplicate_name_field": "data 1"},
* {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "double_field": 1.00002, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "duplicate_name_field": "data 2"},
* {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 1.00003, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"},
* {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "double_field": 1.00004, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"},
* {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 1.00005, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"}
* {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"},
* {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"},
* {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"}
* ]
*/
@Multiline
Expand All @@ -51,9 +50,9 @@ public abstract class IndexingDaoIntegrationTest {
* [
* {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1},
* {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2},
* {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 1.00003, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3},
* {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "double_field": 1.00004, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4},
* {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 1.00005, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5}
* {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3},
* {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4},
* {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5}
* ]
*/
@Multiline
Expand Down Expand Up @@ -144,6 +143,59 @@ public abstract class IndexingDaoIntegrationTest {
@Multiline
public static String indexQuery;

/**
* {
* "facetFields": ["source:type", "ip_src_addr", "ip_src_port", "long_field", "timestamp", "latitude", "double_field", "is_alert"],
* "indices": ["bro", "snort"],
* "query": "*",
* "from": 0,
* "size": 10,
* "sort": [
* {
* "field": "timestamp",
* "sortOrder": "desc"
* }
* ]
* }
*/
@Multiline
public static String facetQuery;

/**
* {
* "facetFields": ["location_point"],
* "indices": ["bro", "snort"],
* "query": "*",
* "from": 0,
* "size": 10,
* "sort": [
* {
* "field": "timestamp",
* "sortOrder": "desc"
* }
* ]
* }
*/
@Multiline
public static String badFacetQuery;

/**
* {
* "indices": ["bro", "snort"],
* "query": "*",
* "from": 0,
* "size": 10,
* "sort": [
* {
* "field": "timestamp",
* "sortOrder": "desc"
* }
* ]
* }
*/
@Multiline
public static String disabledFacetQuery;

/**
* {
* "indices": ["bro", "snort"],
Expand Down Expand Up @@ -236,6 +288,93 @@ public void test() throws Exception {
Assert.assertEquals(i, results.get(j).getSource().get("timestamp"));
}
}
//Facet query including all field types
{
SearchRequest request = JSONUtils.INSTANCE.load(facetQuery, SearchRequest.class);
SearchResponse response = dao.search(request);
Assert.assertEquals(10, response.getTotal());
Map<String, Map<String, Long>> facetCounts = response.getFacetCounts();
Assert.assertEquals(8, facetCounts.size());
Map<String, Long> sourceTypeCounts = facetCounts.get("source:type");
Assert.assertEquals(2, sourceTypeCounts.size());
Assert.assertEquals(new Long(5), sourceTypeCounts.get("bro"));
Assert.assertEquals(new Long(5), sourceTypeCounts.get("snort"));
Map<String, Long> ipSrcAddrCounts = facetCounts.get("ip_src_addr");
Assert.assertEquals(8, ipSrcAddrCounts.size());
Assert.assertEquals(new Long(3), ipSrcAddrCounts.get("192.168.1.1"));
Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.2"));
Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.3"));
Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.4"));
Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.5"));
Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.6"));
Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.7"));
Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.8"));
Map<String, Long> ipSrcPortCounts = facetCounts.get("ip_src_port");
Assert.assertEquals(10, ipSrcPortCounts.size());
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8001"));
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8002"));
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8003"));
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8004"));
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8005"));
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8006"));
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8007"));
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8008"));
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8009"));
Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8010"));
Map<String, Long> longFieldCounts = facetCounts.get("long_field");
Assert.assertEquals(2, longFieldCounts.size());
Assert.assertEquals(new Long(8), longFieldCounts.get("10000"));
Assert.assertEquals(new Long(2), longFieldCounts.get("20000"));
Map<String, Long> timestampCounts = facetCounts.get("timestamp");
Assert.assertEquals(10, timestampCounts.size());
Assert.assertEquals(new Long(1), timestampCounts.get("1"));
Assert.assertEquals(new Long(1), timestampCounts.get("2"));
Assert.assertEquals(new Long(1), timestampCounts.get("3"));
Assert.assertEquals(new Long(1), timestampCounts.get("4"));
Assert.assertEquals(new Long(1), timestampCounts.get("5"));
Assert.assertEquals(new Long(1), timestampCounts.get("6"));
Assert.assertEquals(new Long(1), timestampCounts.get("7"));
Assert.assertEquals(new Long(1), timestampCounts.get("8"));
Assert.assertEquals(new Long(1), timestampCounts.get("9"));
Assert.assertEquals(new Long(1), timestampCounts.get("10"));
Map<String, Long> latitudeCounts = facetCounts.get("latitude");
Assert.assertEquals(2, latitudeCounts.size());
List<String> latitudeKeys = new ArrayList<>(latitudeCounts.keySet());
Collections.sort(latitudeKeys);
Assert.assertEquals(48.0001, Double.parseDouble(latitudeKeys.get(0)), 0.00001);
Assert.assertEquals(48.5839, Double.parseDouble(latitudeKeys.get(1)), 0.00001);
Assert.assertEquals(new Long(2), latitudeCounts.get(latitudeKeys.get(0)));
Assert.assertEquals(new Long(8), latitudeCounts.get(latitudeKeys.get(1)));
Map<String, Long> doubleFieldCounts = facetCounts.get("double_field");
Assert.assertEquals(2, doubleFieldCounts.size());
List<String> doubleFieldKeys = new ArrayList<>(doubleFieldCounts.keySet());
Collections.sort(doubleFieldKeys);
Assert.assertEquals(1.00001, Double.parseDouble(doubleFieldKeys.get(0)), 0.00001);
Assert.assertEquals(1.00002, Double.parseDouble(doubleFieldKeys.get(1)), 0.00001);
Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(0)));
Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(1)));
Map<String, Long> isAlertCounts = facetCounts.get("is_alert");
Assert.assertEquals(2, isAlertCounts.size());
Assert.assertEquals(new Long(6), isAlertCounts.get("true"));
Assert.assertEquals(new Long(4), isAlertCounts.get("false"));
}
//Bad facet query
{
SearchRequest request = JSONUtils.INSTANCE.load(badFacetQuery, SearchRequest.class);
try {
dao.search(request);
Assert.fail("Exception expected, but did not come.");
}
catch(InvalidSearchException ise) {
Assert.assertEquals("Could not execute search", ise.getMessage());
}
}
//Disabled facet query
{
SearchRequest request = JSONUtils.INSTANCE.load(disabledFacetQuery, SearchRequest.class);
SearchResponse response = dao.search(request);
Assert.assertNull(response.getFacetCounts());
}
//Exceeded maximum results query
{
SearchRequest request = JSONUtils.INSTANCE.load(exceededMaxResultsQuery, SearchRequest.class);
Expand Down