From 351a5e4057f2b2a87034304472d45fbe182a2820 Mon Sep 17 00:00:00 2001 From: justinjleet Date: Thu, 12 Oct 2017 07:32:35 -0400 Subject: [PATCH] Updated to use Ids query and added test for getLatest --- metron-interface/metron-alerts/README.md | 6 +- metron-interface/metron-rest/README.md | 4 +- .../elasticsearch/dao/ElasticsearchDao.java | 60 ++++++----------- .../ElasticsearchSearchIntegrationTest.java | 65 +++++++++++++++++-- .../indexing/dao/SearchIntegrationTest.java | 20 ++++++ 5 files changed, 106 insertions(+), 49 deletions(-) diff --git a/metron-interface/metron-alerts/README.md b/metron-interface/metron-alerts/README.md index c312efa9bc..b0433d0444 100644 --- a/metron-interface/metron-alerts/README.md +++ b/metron-interface/metron-alerts/README.md @@ -6,7 +6,11 @@ - [Installing on an existing Cluster](#installing-on-an-existing-cluster) ## Caveats -* UI uses local storage to save all the data. A middleware needs to be designed and developed for persisting the data +### Local Storage +UI uses local storage to save all the data. A middleware needs to be designed and developed for persisting the data + +### Search for Alert GUIDs +Alert GUIDs must be double-quoted when being searched on to ensure correctness of results, e.g. guid:"id1". ## Prerequisites * The Metron REST application should be up and running and Elasticsearch should have some alerts populated by Metron topologies diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md index 394f4a8aca..382491752c 100644 --- a/metron-interface/metron-rest/README.md +++ b/metron-interface/metron-rest/README.md @@ -430,14 +430,14 @@ Request and Response objects are JSON formatted. The JSON schemas are available * 200 - The meta alert was created ### `POST /api/v1/search/search` - * Description: Searches the indexing store + * Description: Searches the indexing store. GUIDs must be quoted to ensure correct results. * Input: * searchRequest - Search request * Returns: * 200 - Search response ### `POST /api/v1/search/group` - * Description: Searches the indexing store and returns field groups. Groups are hierarchical and nested in the order the fields appear in the 'groups' request parameter. The default sorting within groups is by count descending. A groupOrder type of count will sort based on then number of documents in a group while a groupType of term will sort by the groupBy term. + * Description: Searches the indexing store and returns field groups. GUIDs must be quoted to ensure correct results. Groups are hierarchical and nested in the order the fields appear in the 'groups' request parameter. The default sorting within groups is by count descending. A groupOrder type of count will sort based on then number of documents in a group while a groupType of term will sort by the groupBy term. * Input: * groupRequest - Group request * indices - list of indices to search diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index e097a99ad5..aa56ed0485 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -32,7 +32,6 @@ import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.metron.common.Constants; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; @@ -46,20 +45,16 @@ import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; -import org.elasticsearch.action.ActionWriteResponse.ShardInfo; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.*; -import org.elasticsearch.action.update.UpdateRequest; import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.indexing.dao.search.SortOrder; import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.action.ActionWriteResponse.ShardInfo; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -78,24 +73,6 @@ import org.elasticsearch.search.aggregations.metrics.sum.Sum; import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.sort.*; -import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; -import java.io.IOException; -import java.util.Arrays; -import java.util.Date; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -253,30 +230,31 @@ public Document getLatest(final String guid, final String sensorType) throws IOE * Return the search hit based on the UUID and sensor type. * A callback can be specified to transform the hit into a type T. * If more than one hit happens, the first one will be returned. - * @throws IOException */ - Optional searchByGuid(String guid, String sensorType, Function> callback) throws IOException{ - QueryBuilder query = QueryBuilders.matchQuery(Constants.GUID, guid); + Optional searchByGuid(String guid, String sensorType, + Function> callback) { + QueryBuilder query = QueryBuilders.idsQuery(sensorType + "_doc").ids(guid); SearchRequestBuilder request = client.prepareSearch() - .setTypes(sensorType + "_doc") .setQuery(query) .setSource("message") ; - MultiSearchResponse response = client.prepareMultiSearch() - .add(request) - .get(); - for(MultiSearchResponse.Item i : response) { - org.elasticsearch.action.search.SearchResponse resp = i.getResponse(); - SearchHits hits = resp.getHits(); - for(SearchHit hit : hits) { - Optional ret = callback.apply(hit); - if(ret.isPresent()) { - return ret; - } + org.elasticsearch.action.search.SearchResponse response = request.get(); + SearchHits hits = response.getHits(); + long totalHits = hits.getTotalHits(); + if (totalHits > 1) { + LOG.warn("Encountered {} results for guid {} in sensor {}. Returning first hit.", + totalHits, + guid, + sensorType + ); + } + for (SearchHit hit : hits) { + Optional ret = callback.apply(hit); + if (ret.isPresent()) { + return ret; } } return Optional.empty(); - } @Override diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java index adb69ee5a3..e21bb1319a 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -18,6 +18,10 @@ package org.apache.metron.elasticsearch.integration; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.ExecutionException; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.elasticsearch.dao.ElasticsearchDao; import org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao; @@ -29,18 +33,21 @@ import org.apache.metron.integration.InMemoryComponent; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; -import java.io.File; -import java.util.HashMap; - public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { private static String indexDir = "target/elasticsearch_search"; private static String dateFormat = "yyyy.MM.dd.HH"; + private static final int MAX_RETRIES = 10; + private static final int SLEEP_MS = 500; /** * { @@ -120,7 +127,8 @@ protected InMemoryComponent startIndex() throws Exception { } @Override - protected void loadTestData() throws ParseException { + protected void loadTestData() + throws ParseException, IOException, ExecutionException, InterruptedException { ElasticSearchComponent es = (ElasticSearchComponent)indexComponent; es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01") .addMapping("bro_doc", broTypeMappings).get(); @@ -149,12 +157,59 @@ protected void loadTestData() throws ParseException { JSONObject jsonObject = (JSONObject) o; IndexRequestBuilder indexRequestBuilder = es.getClient().prepareIndex("metaalerts", "metaalert_doc"); indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString()); -// indexRequestBuilder = indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString()); bulkRequest.add(indexRequestBuilder); } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); if (bulkResponse.hasFailures()) { throw new RuntimeException("Failed to index test data"); } + + SearchResponse broDocs = es.getClient() + .prepareSearch("bro_index_2017.01.01.01") + .setTypes("bro_doc") + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + // We're changing the _id field, we need to create a copy and delete the original. + for (SearchHit hit : broDocs.getHits()) { + // Bro GUIDs to collide while using the standard analyzer + // Use timestamp as part of guid because query may not return in order each time + IndexRequest indexRequest = new IndexRequest() + .index("bro_index_2017.01.01.01") + .type("bro_doc") + .id("bro-" + hit.getSource().get("timestamp")) + .source(hit.getSource()); + es.getClient().index(indexRequest).get(); + + // Delete the original + es.getClient() + .prepareDelete("bro_index_2017.01.01.01", "bro_doc", hit.getId()) + .get(); + } + + // Wait until everything is updated + // Assume true until proven otherwise. + boolean allUpdated = true; + for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) { + allUpdated = true; + SearchResponse response = es.getClient() + .prepareSearch("bro_index_2017.01.01.01") + .setTypes("bro_doc") + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + if (response.getHits().getTotalHits() == 0) { + throw new IllegalStateException("Bro index is empty. No docs to validate were updated"); + } + for (SearchHit hit : response.getHits()) { + if (!hit.getId().startsWith("bro-")) { + allUpdated = false; + } + } + if (allUpdated) { + break; + } + } + if (!allUpdated) { + throw new IllegalStateException("Unable to update Elasticsearch ids properly"); + } } } diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java index 26d1a75336..e2a37f1d00 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java @@ -17,9 +17,11 @@ */ package org.apache.metron.indexing.dao; +import java.util.Optional; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.GroupRequest; import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; @@ -91,6 +93,15 @@ public abstract class SearchIntegrationTest { @Multiline public static String allQuery; + /** + * { + * "guid": "bro-3", + * "sensorType": "bro" + * } + */ + @Multiline + public static String findOneGuidQuery; + /** * { * "indices": ["bro", "snort"], @@ -370,6 +381,15 @@ public void test() throws Exception { Assert.assertEquals(10-i, results.get(i).getSource().get("timestamp")); } } + //Find One Guid Testcase + { + GetRequest request = JSONUtils.INSTANCE.load(findOneGuidQuery, GetRequest.class); + Optional> response = dao.getLatestResult(request); + Assert.assertTrue(response.isPresent()); + Map doc = response.get(); + Assert.assertEquals("bro", doc.get("source:type")); + Assert.assertEquals(3, doc.get("timestamp")); + } //Filter test case { SearchRequest request = JSONUtils.INSTANCE.load(filterQuery, SearchRequest.class);