From 42ea8030327276348cc4cb3928924adb566ea179 Mon Sep 17 00:00:00 2001 From: Joanne Wang Date: Mon, 26 Feb 2024 16:21:03 -0800 Subject: [PATCH] clean up doc level queries on dry run (#1430) Signed-off-by: Joanne Wang Signed-off-by: Chase Engelbrecht --- .../alerting/DocumentLevelMonitorRunner.kt | 3 + .../alerting/util/DocLevelMonitorQueries.kt | 46 +++++++ .../alerting/DocumentMonitorRunnerIT.kt | 129 ++++++++++++++++++ 3 files changed, 178 insertions(+) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 55000fd36..611d518cf 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -334,6 +334,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata.copy(lastRunContext = updatedLastRunContext), true ) + } else { + // Clean up any queries created by the dry run monitor + monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata) } // TODO: Update the Document as part of the Trigger and return back the trigger action result diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 3b586b759..109b2e710 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -13,6 +13,8 @@ import org.opensearch.action.admin.indices.alias.Alias import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.rollover.RolloverRequest import org.opensearch.action.admin.indices.rollover.RolloverResponse @@ -38,8 +40,16 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.core.action.ActionListener import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.BulkByScrollResponse +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java) @@ -134,6 +144,42 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return true } + suspend fun deleteDocLevelQueriesOnDryRun(monitorMetadata: MonitorMetadata) { + try { + monitorMetadata.sourceToQueryIndexMapping.forEach { (_, queryIndex) -> + val indicesExistsResponse: IndicesExistsResponse = + client.suspendUntil { + client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + } + if (indicesExistsResponse.isExists == false) { + return + } + + val queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.existsQuery("monitor_id")) + .mustNot(QueryBuilders.wildcardQuery("monitor_id", "*")) + + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(queryIndex) + .filter(queryBuilder) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) = cont.resumeWithException(t) + } + ) + } + response.bulkFailures.forEach { + log.error("Failed deleting queries while removing dry run queries: [${it.id}] cause: [${it.cause}] ") + } + } + } catch (e: Exception) { + log.error("Failed to delete doc level queries on dry run", e) + } + } + fun docLevelQueryIndexExists(dataSources: DataSources): Boolean { val clusterState = clusterService.state() return clusterState.metadata.hasAlias(dataSources.queryIndex) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 407089fd2..48e54d4ba 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -74,6 +74,21 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val alerts = searchAlerts(monitor) assertEquals("Alert saved for test monitor", 0, alerts.size) + + // ensure doc level query is deleted on dry run + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) } } fun `test dryrun execute monitor with queryFieldNames set up with correct field`() { @@ -297,6 +312,120 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex")) assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex")) + + // ensure doc level query is deleted on dry run + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) } + } + + fun `test execute monitor returns search result with dryrun then without dryrun ensure dry run query not saved`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "2", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex")) + assertTrue("Incorrect search result", matchingDocsToQuery.contains("2|$testIndex")) + + // ensure doc level query is deleted on dry run + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(0L, it.value) } + + // create and execute second monitor not as dryrun + val testIndex2 = createTestIndex("test1") + val testTime2 = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc2 = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime2", + "test_field" : "us-east-1" + }""" + + val docQuery2 = DocLevelQuery(query = "test_field:\"us-east-1\"", name = "3", fields = listOf()) + val docLevelInput2 = DocLevelMonitorInput("description", listOf(testIndex2), listOf(docQuery2)) + + val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor2 = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput2), triggers = listOf(trigger2))) + assertNotNull(monitor2.id) + + indexDoc(testIndex2, "1", testDoc2) + indexDoc(testIndex2, "5", testDoc2) + + val response2 = executeMonitor(monitor2.id) + val output2 = entityAsMap(response2) + + assertEquals(monitor2.name, output2["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult2 = (output2.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery2 = searchResult2[docQuery2.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery2.size) + assertTrue("Incorrect search result", matchingDocsToQuery2.containsAll(listOf("1|$testIndex2", "5|$testIndex2"))) + + val alerts = searchAlertsWithFilter(monitor2) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor2) + assertEquals("Findings saved for test monitor", 2, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + + // ensure query from second monitor was saved + val expectedQueries = listOf("test_field_test1_${monitor2.id}:\"us-east-1\"") + httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(expectedQueries.contains(query)) + } + searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 1L, it.value) } } fun `test execute monitor generates alerts and findings`() {