Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added integrations tests for checking workflow creation and update sc… #1

Open
wants to merge 4 commits into
base: composite-workflow
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,16 @@ import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
import org.opensearch.alerting.transport.TransportGetAlertsAction
import org.opensearch.alerting.transport.TransportGetDestinationsAction
import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportIndexCompositeWorkflowAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
Expand Down Expand Up @@ -80,6 +83,7 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.env.Environment
import org.opensearch.env.NodeEnvironment
import org.opensearch.index.IndexModule
Expand Down Expand Up @@ -180,8 +184,10 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java),
ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_ALERTS_ACTION_TYPE, TransportGetAlertsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java)

ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexCompositeWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java)
)
}

Expand All @@ -193,7 +199,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY
DocumentLevelTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
package org.opensearch.alerting.model.workflow

data class WorkflowRunResult {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should return list of monitor run results within this class with info from each delegate monitor run.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should return list of monitor run results within this class with info from each delegate monitor run.

Yeah - I agree. That's in another PR that contains execution of the workflow changes. Check it out here

}
data class WorkflowRunResult(private val someArg: String)
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to make changes in TransportDeleteMonitorAction to stop a monitor from being deleted if its a part of an existing composite workflow sequence as a delegate monitor.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good point Sashank!

* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.transport

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionListener
import org.opensearch.action.ActionRequest
import org.opensearch.action.delete.DeleteRequest
import org.opensearch.action.delete.DeleteResponse
import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.authuser.User
import org.opensearch.commons.utils.recreateObject
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's add java docs for classes and comments for methods having complicated logic

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. This class is pretty much obvious and the methods inside it are pretty much descriptive. So probably for indexing the workflow

class TransportDeleteWorkflowAction @Inject constructor(
transportService: TransportService,
val client: Client,
actionFilters: ActionFilters,
val clusterService: ClusterService,
settings: Settings,
val xContentRegistry: NamedXContentRegistry
) : HandledTransportAction<ActionRequest, DeleteWorkflowResponse>(
AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest
),
SecureTransportAction {

@Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)

init {
listenFilterBySettingChange(clusterService)
}

override fun doExecute(task: Task, request: ActionRequest, actionListener: ActionListener<DeleteWorkflowResponse>) {
val transformedRequest = request as? DeleteWorkflowRequest
?: recreateObject(request) { DeleteWorkflowRequest(it) }

val user = readUserFromThreadContext(client)
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, transformedRequest.workflowId)
.setRefreshPolicy(transformedRequest.refreshPolicy)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz remove refresh policy
we will always choose to refresh immediately.


if (!validateUserBackendRoles(user, actionListener)) {
return
}

GlobalScope.launch(Dispatchers.IO + CoroutineName("DeleteWorkflowAction")) {
DeleteWorkflowHandler(client, actionListener, deleteRequest, user, transformedRequest.workflowId).resolveUserAndStart()
}
}

inner class DeleteWorkflowHandler(
private val client: Client,
private val actionListener: ActionListener<DeleteWorkflowResponse>,
private val deleteRequest: DeleteRequest,
private val user: User?,
private val workflowId: String
) {
suspend fun resolveUserAndStart() {
try {
val workflow = getWorkflow()

val canDelete = user == null ||
!doFilterForUser(user) ||
checkUserPermissionsWithResource(
user,
workflow.user,
actionListener,
"workflow",
workflowId
)

if (canDelete) {
val deleteResponse = deleteWorkflow(workflow)
// TODO - uncomment once the workflow metadata is added
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be uncommented?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well no - because the workflow metadata are not added still

// deleteMetadata(workflow)
actionListener.onResponse(DeleteWorkflowResponse(deleteResponse.id, deleteResponse.version))
} else {
actionListener.onFailure(
AlertingException(
"Not allowed to delete this workflow!",
RestStatus.FORBIDDEN,
IllegalStateException()
)
)
}
} catch (t: Exception) {
actionListener.onFailure(AlertingException.wrap(t))
}
}

private suspend fun getWorkflow(): Workflow {
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, workflowId)

val getResponse: GetResponse = client.suspendUntil { get(getRequest, it) }
if (getResponse.isExists == false) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Workflow with $workflowId is not found", RestStatus.NOT_FOUND)
)
)
}
val xcp = XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
getResponse.sourceAsBytesRef, XContentType.JSON
)
return ScheduledJob.parse(xcp, getResponse.id, getResponse.version) as Workflow
}

private suspend fun deleteWorkflow(workflow: Workflow): DeleteResponse {
return client.suspendUntil { delete(deleteRequest, it) }
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are we handling failure in deletion?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add debug logs "deleted monitor $monitorId"

Copy link
Author

@stevanbz stevanbz Feb 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are we handling failure in deletion?

Well we are calling delete(deleteRequest, it) as a lambda. You can see that second parameter of delete function call is it -> which is ActionListener forwarded to a delete function.
Something similar we are doing here

}

private suspend fun deleteMetadata(workflow: Workflow) {
val deleteRequest = DeleteRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, "${workflow.id}-metadata")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

plz add error handling

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add debug logs "deleted monitor $monitorMetadata"

Copy link
Author

@stevanbz stevanbz Feb 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So just FYI - right now there is no existing workflow metadata. This method is not called at all. Once we introduce workflow metadata I will add error handling. How this sounds?

val deleteResponse: DeleteResponse = client.suspendUntil { delete(deleteRequest, it) }
}
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.transport

import org.opensearch.OpenSearchStatusException
Expand All @@ -6,9 +11,6 @@ import org.opensearch.action.get.GetRequest
import org.opensearch.action.get.GetResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.action.GetMonitorAction
import org.opensearch.alerting.action.GetMonitorRequest
import org.opensearch.alerting.action.GetMonitorResponse
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.client.Client
Expand All @@ -19,8 +21,11 @@ import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.NamedXContentRegistry
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.GetWorkflowRequest
import org.opensearch.commons.alerting.action.GetWorkflowResponse
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
Expand All @@ -32,8 +37,8 @@ class TransportGetWorkflowAction @Inject constructor(
val xContentRegistry: NamedXContentRegistry,
val clusterService: ClusterService,
settings: Settings
) : HandledTransportAction<GetMonitorRequest, GetMonitorResponse>(
GetMonitorAction.NAME, transportService, actionFilters, ::GetMonitorRequest
) : HandledTransportAction<GetWorkflowRequest, GetWorkflowResponse>(
AlertingActions.GET_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::GetWorkflowRequest
),
SecureTransportAction {

Expand All @@ -43,12 +48,12 @@ class TransportGetWorkflowAction @Inject constructor(
listenFilterBySettingChange(clusterService)
}

override fun doExecute(task: Task, getMonitorRequest: GetMonitorRequest, actionListener: ActionListener<GetMonitorResponse>) {
override fun doExecute(task: Task, getWorkflowRequest: GetWorkflowRequest, actionListener: ActionListener<GetWorkflowResponse>) {
val user = readUserFromThreadContext(client)

val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getMonitorRequest.monitorId)
.version(getMonitorRequest.version)
.fetchSourceContext(getMonitorRequest.srcContext)
val getRequest = GetRequest(ScheduledJob.SCHEDULED_JOBS_INDEX, getWorkflowRequest.workflowId)
.version(getWorkflowRequest.version)
.fetchSourceContext(getWorkflowRequest.srcContext)

if (!validateUserBackendRoles(user, actionListener)) {
return
Expand All @@ -69,29 +74,29 @@ class TransportGetWorkflowAction @Inject constructor(
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException(
"Monitor not found.",
"Workflow not found.",
RestStatus.NOT_FOUND
)
)
)
return
}

var monitor: Monitor? = null
var workflow: Workflow? = null
if (!response.isSourceEmpty) {
XContentHelper.createParser(
xContentRegistry, LoggingDeprecationHandler.INSTANCE,
response.sourceAsBytesRef, XContentType.JSON
).use { xcp ->
monitor = ScheduledJob.parse(xcp, response.id, response.version) as Monitor
workflow = ScheduledJob.parse(xcp, response.id, response.version) as Workflow

// security is enabled and filterby is enabled
if (!checkUserPermissionsWithResource(
user,
monitor?.user,
workflow?.user,
actionListener,
"monitor",
getMonitorRequest.monitorId
"workflow",
getWorkflowRequest.workflowId
)
) {
return
Expand All @@ -100,13 +105,13 @@ class TransportGetWorkflowAction @Inject constructor(
}

actionListener.onResponse(
GetMonitorResponse(
GetWorkflowResponse(
response.id,
response.version,
response.seqNo,
response.primaryTerm,
RestStatus.OK,
monitor
workflow
)
)
}
Expand Down
Loading