diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index de235ba60..de63ab727 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -324,6 +324,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata.copy(lastRunContext = updatedLastRunContext), true ) + } else { + // Clean up any queries created by the dry run monitor + monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata) } // TODO: Update the Document as part of the Trigger and return back the trigger action result diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 3b586b759..5bb11e769 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -9,10 +9,13 @@ import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException import org.opensearch.ResourceAlreadyExistsException +import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.alias.Alias import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.rollover.RolloverRequest import org.opensearch.action.admin.indices.rollover.RolloverResponse @@ -39,7 +42,14 @@ import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.BulkByScrollResponse +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java) @@ -134,6 +144,42 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return true } + suspend fun deleteDocLevelQueriesOnDryRun(monitorMetadata: MonitorMetadata) { + try { + monitorMetadata.sourceToQueryIndexMapping.forEach { (_, queryIndex) -> + val indicesExistsResponse: IndicesExistsResponse = + client.suspendUntil { + client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + } + if (indicesExistsResponse.isExists == false) { + return + } + + val queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.existsQuery("monitor_id")) + .mustNot(QueryBuilders.wildcardQuery("monitor_id", "*")) + + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(queryIndex) + .filter(queryBuilder) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) = cont.resumeWithException(t) + } + ) + } + response.bulkFailures.forEach { + log.error("Failed deleting queries while removing dry run queries: [${it.id}] cause: [${it.cause}] ") + } + } + } catch (e: Exception) { + log.error("Failed to delete doc level queries on dry run", e) + } + } + fun docLevelQueryIndexExists(dataSources: DataSources): Boolean { val clusterState = clusterService.state() return clusterState.metadata.hasAlias(dataSources.queryIndex) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 839227b8e..c4106bef4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -41,7 +41,168 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val index = createTestIndex() - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(1, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + + // ensure doc level query is deleted on dry run + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) } + } + + fun `test dryrun execute monitor with queryFieldNames set up with correct field`() { + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + + val docQuery = + DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf(), queryFieldNames = listOf("test_field")) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(1, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + } + + fun `test dryrun execute monitor with queryFieldNames set up with wrong field`() { + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + // using wrong field name + val docQuery = DocLevelQuery( + query = "test_field:\"us-west-2\"", + name = "3", + fields = listOf(), + queryFieldNames = listOf("wrong_field") + ) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(0, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + } + + fun `test fetch_query_field_names setting is disabled by configuring queryFieldNames set up with wrong field still works`() { + adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.key, "false") + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + // using wrong field name + val docQuery = DocLevelQuery( + query = "test_field:\"us-west-2\"", + name = "3", + fields = listOf(), + queryFieldNames = listOf("wrong_field") + ) val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) @@ -105,6 +266,120 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex")) assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex")) + + // ensure doc level query is deleted on dry run + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) } + } + + fun `test execute monitor returns search result with dryrun then without dryrun ensure dry run query not saved`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "2", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex")) + assertTrue("Incorrect search result", matchingDocsToQuery.contains("2|$testIndex")) + + // ensure doc level query is deleted on dry run + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(0L, it.value) } + + // create and execute second monitor not as dryrun + val testIndex2 = createTestIndex("test1") + val testTime2 = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc2 = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime2", + "test_field" : "us-east-1" + }""" + + val docQuery2 = DocLevelQuery(query = "test_field:\"us-east-1\"", name = "3", fields = listOf()) + val docLevelInput2 = DocLevelMonitorInput("description", listOf(testIndex2), listOf(docQuery2)) + + val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor2 = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput2), triggers = listOf(trigger2))) + assertNotNull(monitor2.id) + + indexDoc(testIndex2, "1", testDoc2) + indexDoc(testIndex2, "5", testDoc2) + + val response2 = executeMonitor(monitor2.id) + val output2 = entityAsMap(response2) + + assertEquals(monitor2.name, output2["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult2 = (output2.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery2 = searchResult2[docQuery2.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery2.size) + assertTrue("Incorrect search result", matchingDocsToQuery2.containsAll(listOf("1|$testIndex2", "5|$testIndex2"))) + + val alerts = searchAlertsWithFilter(monitor2) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor2) + assertEquals("Findings saved for test monitor", 2, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + + // ensure query from second monitor was saved + val expectedQueries = listOf("test_field_test1_${monitor2.id}:\"us-east-1\"") + httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(expectedQueries.contains(query)) + } + searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 1L, it.value) } } fun `test execute monitor generates alerts and findings`() {