From afa87ae912f2139d87ac3723f82be60b7c72074b Mon Sep 17 00:00:00 2001 From: merrimanr Date: Wed, 2 Aug 2017 09:47:45 -0500 Subject: [PATCH] initial commit --- .../impl/IndexDaoSearchServiceImpl.java | 5 +- .../elasticsearch/dao/ElasticsearchDao.java | 70 +++++++- .../indexing/dao/search/SearchRequest.java | 11 ++ .../indexing/dao/search/SearchResponse.java | 13 ++ .../dao/IndexingDaoIntegrationTest.java | 155 +++++++++++++++++- 5 files changed, 243 insertions(+), 11 deletions(-) diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java index 5ff22c96a2..b93a5fc35d 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java @@ -25,6 +25,7 @@ import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.SearchService; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; import org.springframework.stereotype.Service; import java.io.IOException; @@ -34,10 +35,12 @@ @Service public class IndexDaoSearchServiceImpl implements SearchService { private IndexDao dao; + private Environment environment; @Autowired - public IndexDaoSearchServiceImpl(IndexDao dao) { + public IndexDaoSearchServiceImpl(IndexDao dao, Environment environment) { this.dao = dao; + this.environment = environment; } @Override diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index cb2b1ca7be..64bf4b6337 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -22,10 +22,18 @@ import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.search.*; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.index.mapper.ip.IpFieldMapper; import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms; +import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; +import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.*; @@ -37,6 +45,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; public class ElasticsearchDao implements IndexDao { @@ -92,9 +101,18 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx } searchSourceBuilder = searchSourceBuilder.sort(fieldSortBuilder); } + Optional> facetFields = searchRequest.getFacetFields(); + if (facetFields.isPresent()) { + addFacetFields(searchSourceBuilder, facetFields.get()); + } String[] wildcardIndices = searchRequest.getIndices().stream().map(index -> String.format("%s*", index)).toArray(value -> new String[searchRequest.getIndices().size()]); - org.elasticsearch.action.search.SearchResponse elasticsearchResponse = client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices) - .source(searchSourceBuilder)).actionGet(); + org.elasticsearch.action.search.SearchResponse elasticsearchResponse; + try { + elasticsearchResponse = client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices) + .source(searchSourceBuilder)).actionGet(); + } catch (SearchPhaseExecutionException e) { + throw new InvalidSearchException("Could not execute search", e); + } SearchResponse searchResponse = new SearchResponse(); searchResponse.setTotal(elasticsearchResponse.getHits().getTotalHits()); searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit -> { @@ -104,6 +122,15 @@ public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchEx searchResult.setScore(searchHit.getScore()); return searchResult; }).collect(Collectors.toList())); + if (facetFields.isPresent()) { + Map commonColumnMetadata; + try { + commonColumnMetadata = getCommonColumnMetadata(searchRequest.getIndices()); + } catch (IOException e) { + throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", Arrays.toString(searchRequest.getIndices().toArray()))); + } + searchResponse.setFacetCounts(getFacetCounts(facetFields.get(), elasticsearchResponse.getAggregations(), commonColumnMetadata )); + } return searchResponse; } @@ -179,4 +206,43 @@ protected String[] getLatestIndices(List includeIndices) { return latestIndices.values().toArray(new String[latestIndices.size()]); } + public void addFacetFields(SearchSourceBuilder searchSourceBuilder, List fields) { + for(String field: fields) { + searchSourceBuilder = searchSourceBuilder.aggregation(new TermsBuilder(getAggregationName(field)).field(field)); + } + } + + public Map> getFacetCounts(List fields, Aggregations aggregations, Map commonColumnMetadata) { + Map> fieldCounts = new HashMap<>(); + for (String field: fields) { + Map valueCounts = new HashMap<>(); + Aggregation aggregation = aggregations.get(getAggregationName(field)); + if (aggregation instanceof LongTerms) { + LongTerms longTerms = (LongTerms) aggregation; + FieldType type = commonColumnMetadata.get(field); + if (FieldType.IP.equals(type)) { + longTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(IpFieldMapper.longToIp((Long) bucket.getKey()), bucket.getDocCount())); + } else if (FieldType.BOOLEAN.equals(type)) { + longTerms.getBuckets().stream().forEach(bucket -> { + String key = (Long) bucket.getKey() == 1 ? "true" : "false"; + valueCounts.put(key, bucket.getDocCount()); + }); + } else { + longTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount())); + } + } else if (aggregation instanceof DoubleTerms) { + DoubleTerms doubleTerms = (DoubleTerms) aggregation; + doubleTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount())); + } else if (aggregation instanceof StringTerms) { + StringTerms stringTerms = (StringTerms) aggregation; + stringTerms.getBuckets().stream().forEach(bucket -> valueCounts.put(bucket.getKeyAsString(), bucket.getDocCount())); + } + fieldCounts.put(field, valueCounts); + } + return fieldCounts; + } + + private String getAggregationName(String field) { + return String.format("%s_count", field); + } } diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java index ecf6b57f4f..b92b36ded0 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchRequest.java @@ -19,6 +19,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Optional; public class SearchRequest { @@ -27,6 +28,7 @@ public class SearchRequest { private int size; private int from; private List sort; + private List facetFields; public SearchRequest() { SortField defaultSortField = new SortField(); @@ -34,6 +36,7 @@ public SearchRequest() { defaultSortField.setSortOrder(SortOrder.DESC.toString()); sort = new ArrayList<>(); sort.add(defaultSortField); + facetFields = new ArrayList<>(); } public List getIndices() { @@ -75,4 +78,12 @@ public List getSort() { public void setSort(List sort) { this.sort = sort; } + + public Optional> getFacetFields() { + return facetFields == null || facetFields.size() == 0 ? Optional.empty() : Optional.of(facetFields); + } + + public void setFacetFields(List facetFields) { + this.facetFields = facetFields; + } } diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java index 7f616940b6..24163579bf 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchResponse.java @@ -17,13 +17,17 @@ */ package org.apache.metron.indexing.dao.search; +import com.fasterxml.jackson.annotation.JsonInclude; + import java.util.ArrayList; import java.util.List; +import java.util.Map; public class SearchResponse { private long total; private List results = new ArrayList<>(); + private Map> facetCounts; public long getTotal() { return total; @@ -40,4 +44,13 @@ public List getResults() { public void setResults(List results) { this.results = results; } + + @JsonInclude(JsonInclude.Include.NON_NULL) + public Map> getFacetCounts() { + return facetCounts; + } + + public void setFacetCounts(Map> facetCounts) { + this.facetCounts = facetCounts; + } } diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java index 209c2349c2..eb809f9fd3 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java @@ -25,10 +25,9 @@ import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.integration.InMemoryComponent; -import org.json.simple.parser.ParseException; import org.junit.*; -import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -39,9 +38,9 @@ public abstract class IndexingDaoIntegrationTest { * [ * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "duplicate_name_field": "data 1"}, * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "double_field": 1.00002, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "duplicate_name_field": "data 2"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 1.00003, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "double_field": 1.00004, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"}, - * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 1.00005, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"} + * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, "timestamp":4, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"} * ] */ @Multiline @@ -51,9 +50,9 @@ public abstract class IndexingDaoIntegrationTest { * [ * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1}, * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 1.00003, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "double_field": 1.00004, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 1.00005, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5} + * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5} * ] */ @Multiline @@ -144,6 +143,59 @@ public abstract class IndexingDaoIntegrationTest { @Multiline public static String indexQuery; + /** + * { + * "facetFields": ["source:type", "ip_src_addr", "ip_src_port", "long_field", "timestamp", "latitude", "double_field", "is_alert"], + * "indices": ["bro", "snort"], + * "query": "*", + * "from": 0, + * "size": 10, + * "sort": [ + * { + * "field": "timestamp", + * "sortOrder": "desc" + * } + * ] + * } + */ + @Multiline + public static String facetQuery; + + /** + * { + * "facetFields": ["location_point"], + * "indices": ["bro", "snort"], + * "query": "*", + * "from": 0, + * "size": 10, + * "sort": [ + * { + * "field": "timestamp", + * "sortOrder": "desc" + * } + * ] + * } + */ + @Multiline + public static String badFacetQuery; + + /** + * { + * "indices": ["bro", "snort"], + * "query": "*", + * "from": 0, + * "size": 10, + * "sort": [ + * { + * "field": "timestamp", + * "sortOrder": "desc" + * } + * ] + * } + */ + @Multiline + public static String disabledFacetQuery; + /** * { * "indices": ["bro", "snort"], @@ -236,6 +288,93 @@ public void test() throws Exception { Assert.assertEquals(i, results.get(j).getSource().get("timestamp")); } } + //Facet query including all field types + { + SearchRequest request = JSONUtils.INSTANCE.load(facetQuery, SearchRequest.class); + SearchResponse response = dao.search(request); + Assert.assertEquals(10, response.getTotal()); + Map> facetCounts = response.getFacetCounts(); + Assert.assertEquals(8, facetCounts.size()); + Map sourceTypeCounts = facetCounts.get("source:type"); + Assert.assertEquals(2, sourceTypeCounts.size()); + Assert.assertEquals(new Long(5), sourceTypeCounts.get("bro")); + Assert.assertEquals(new Long(5), sourceTypeCounts.get("snort")); + Map ipSrcAddrCounts = facetCounts.get("ip_src_addr"); + Assert.assertEquals(8, ipSrcAddrCounts.size()); + Assert.assertEquals(new Long(3), ipSrcAddrCounts.get("192.168.1.1")); + Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.2")); + Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.3")); + Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.4")); + Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.5")); + Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.6")); + Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.7")); + Assert.assertEquals(new Long(1), ipSrcAddrCounts.get("192.168.1.8")); + Map ipSrcPortCounts = facetCounts.get("ip_src_port"); + Assert.assertEquals(10, ipSrcPortCounts.size()); + Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8001")); + Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8002")); + Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8003")); + Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8004")); + Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8005")); + Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8006")); + Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8007")); + Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8008")); + Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8009")); + Assert.assertEquals(new Long(1), ipSrcPortCounts.get("8010")); + Map longFieldCounts = facetCounts.get("long_field"); + Assert.assertEquals(2, longFieldCounts.size()); + Assert.assertEquals(new Long(8), longFieldCounts.get("10000")); + Assert.assertEquals(new Long(2), longFieldCounts.get("20000")); + Map timestampCounts = facetCounts.get("timestamp"); + Assert.assertEquals(10, timestampCounts.size()); + Assert.assertEquals(new Long(1), timestampCounts.get("1")); + Assert.assertEquals(new Long(1), timestampCounts.get("2")); + Assert.assertEquals(new Long(1), timestampCounts.get("3")); + Assert.assertEquals(new Long(1), timestampCounts.get("4")); + Assert.assertEquals(new Long(1), timestampCounts.get("5")); + Assert.assertEquals(new Long(1), timestampCounts.get("6")); + Assert.assertEquals(new Long(1), timestampCounts.get("7")); + Assert.assertEquals(new Long(1), timestampCounts.get("8")); + Assert.assertEquals(new Long(1), timestampCounts.get("9")); + Assert.assertEquals(new Long(1), timestampCounts.get("10")); + Map latitudeCounts = facetCounts.get("latitude"); + Assert.assertEquals(2, latitudeCounts.size()); + List latitudeKeys = new ArrayList<>(latitudeCounts.keySet()); + Collections.sort(latitudeKeys); + Assert.assertEquals(48.0001, Double.parseDouble(latitudeKeys.get(0)), 0.00001); + Assert.assertEquals(48.5839, Double.parseDouble(latitudeKeys.get(1)), 0.00001); + Assert.assertEquals(new Long(2), latitudeCounts.get(latitudeKeys.get(0))); + Assert.assertEquals(new Long(8), latitudeCounts.get(latitudeKeys.get(1))); + Map doubleFieldCounts = facetCounts.get("double_field"); + Assert.assertEquals(2, doubleFieldCounts.size()); + List doubleFieldKeys = new ArrayList<>(doubleFieldCounts.keySet()); + Collections.sort(doubleFieldKeys); + Assert.assertEquals(1.00001, Double.parseDouble(doubleFieldKeys.get(0)), 0.00001); + Assert.assertEquals(1.00002, Double.parseDouble(doubleFieldKeys.get(1)), 0.00001); + Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(0))); + Assert.assertEquals(new Long(5), doubleFieldCounts.get(doubleFieldKeys.get(1))); + Map isAlertCounts = facetCounts.get("is_alert"); + Assert.assertEquals(2, isAlertCounts.size()); + Assert.assertEquals(new Long(6), isAlertCounts.get("true")); + Assert.assertEquals(new Long(4), isAlertCounts.get("false")); + } + //Bad facet query + { + SearchRequest request = JSONUtils.INSTANCE.load(badFacetQuery, SearchRequest.class); + try { + dao.search(request); + Assert.fail("Exception expected, but did not come."); + } + catch(InvalidSearchException ise) { + Assert.assertEquals("Could not execute search", ise.getMessage()); + } + } + //Disabled facet query + { + SearchRequest request = JSONUtils.INSTANCE.load(disabledFacetQuery, SearchRequest.class); + SearchResponse response = dao.search(request); + Assert.assertNull(response.getFacetCounts()); + } //Exceeded maximum results query { SearchRequest request = JSONUtils.INSTANCE.load(exceededMaxResultsQuery, SearchRequest.class);