Skip to content

Commit

Permalink
feat: making pre,post,deploy triggers flows idempotent (#4486)
Browse files Browse the repository at this point in the history
* update common-lib

* added idempotency code and common-lib version bump

* added the optionsal validations pubsub msg pre-processing-hooks logic and added duplicate trigger check as a validation for pre,post,deploy trigger flows

* bump common-lib

* added callback logger func

* sql scripts added

* query fix

* query fix

* fix

* refactoring

* remove dag exececutor dependency from ciEeventHalndler Sevice

* bump common-lib

* move context to trigger context

* add logs

* migration version fix

* remove test data

* update script number

---------

Co-authored-by: Kripansh <kripansh@devtron.ai>
  • Loading branch information
gireesh-naidu and kripanshdevtron committed Jan 19, 2024
1 parent 83889d6 commit 7b71af7
Show file tree
Hide file tree
Showing 25 changed files with 752 additions and 316 deletions.
10 changes: 8 additions & 2 deletions api/restHandler/PipelineTriggerRestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ func (handler PipelineTriggerRestHandlerImpl) OverrideConfig(w http.ResponseWrit
}
ctx := context.WithValue(r.Context(), "token", acdToken)
_, span := otel.Tracer("orchestrator").Start(ctx, "workflowDagExecutor.ManualCdTrigger")
mergeResp, err := handler.workflowDagExecutor.ManualCdTrigger(&overrideRequest, ctx)
triggerContext := pipeline.TriggerContext{
Context: ctx,
}
mergeResp, err := handler.workflowDagExecutor.ManualCdTrigger(triggerContext, &overrideRequest)
span.End()
if err != nil {
handler.logger.Errorw("request err, OverrideConfig", "err", err, "payload", overrideRequest)
Expand Down Expand Up @@ -224,7 +227,10 @@ func (handler PipelineTriggerRestHandlerImpl) StartStopApp(w http.ResponseWriter
return
}
ctx := context.WithValue(r.Context(), "token", acdToken)
mergeResp, err := handler.workflowDagExecutor.StopStartApp(&overrideRequest, ctx)
triggerContext := pipeline.TriggerContext{
Context: ctx,
}
mergeResp, err := handler.workflowDagExecutor.StopStartApp(triggerContext, &overrideRequest)
if err != nil {
handler.logger.Errorw("service err, StartStopApp", "err", err, "payload", overrideRequest)
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
Expand Down
45 changes: 32 additions & 13 deletions api/router/pubsub/ApplicationStatusHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"github.com/devtron-labs/common-lib/pubsub-lib/model"
"github.com/devtron-labs/devtron/pkg/app"
"k8s.io/utils/pointer"
"time"

"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
Expand Down Expand Up @@ -74,7 +75,7 @@ func NewApplicationStatusHandlerImpl(logger *zap.SugaredLogger, pubsubClient *pu
}
err := appStatusUpdateHandlerImpl.Subscribe()
if err != nil {
//logger.Error("err", err)
// logger.Error("err", err)
return nil
}
err = appStatusUpdateHandlerImpl.SubscribeDeleteStatus()
Expand All @@ -91,7 +92,6 @@ type ApplicationDetail struct {

func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
callback := func(msg *model.PubSubMsg) {
impl.logger.Debugw("APP_STATUS_UPDATE_REQ", "stage", "raw", "data", msg.Data)
applicationDetail := ApplicationDetail{}
err := json.Unmarshal([]byte(msg.Data), &applicationDetail)
if err != nil {
Expand All @@ -109,10 +109,10 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
_, err = impl.pipelineRepository.GetArgoPipelineByArgoAppName(app.ObjectMeta.Name)
if err != nil && err == pg.ErrNoRows {
impl.logger.Infow("this app not found in pipeline table looking in installed_apps table", "appName", app.ObjectMeta.Name)
//if not found in pipeline table then search in installed_apps table
// if not found in pipeline table then search in installed_apps table
gitOpsDeployedAppNames, err := impl.installedAppRepository.GetAllGitOpsDeploymentAppName()
if err != nil && err == pg.ErrNoRows {
//no installed_apps found
// no installed_apps found
impl.logger.Errorw("no installed apps found", "err", err)
return
} else if err != nil {
Expand All @@ -127,17 +127,17 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
devtronGitOpsAppName = app.ObjectMeta.Name
}
if slices.Contains(gitOpsDeployedAppNames, devtronGitOpsAppName) {
//app found in installed_apps table hence setting flag to true
// app found in installed_apps table hence setting flag to true
isAppStoreApplication = true
} else {
//app neither found in installed_apps nor in pipeline table hence returning
// app neither found in installed_apps nor in pipeline table hence returning
return
}
}
isSucceeded, pipelineOverride, err := impl.appService.UpdateDeploymentStatusAndCheckIsSucceeded(app, applicationDetail.StatusTime, isAppStoreApplication)
if err != nil {
impl.logger.Errorw("error on application status update", "err", err, "msg", string(msg.Data))
//TODO - check update for charts - fix this call
// TODO - check update for charts - fix this call
if err == pg.ErrNoRows {
// if not found in charts (which is for devtron apps) try to find in installed app (which is for devtron charts)
_, err := impl.installedAppService.UpdateInstalledAppVersionStatus(app)
Expand All @@ -153,7 +153,10 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
// invoke DagExecutor, for cd success which will trigger post stage if exist.
if isSucceeded {
impl.logger.Debugw("git hash history", "list", app.Status.History)
err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(pipelineOverride)
triggerContext := pipeline.TriggerContext{
ReferenceId: pointer.String(msg.MsgId),
}
err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(triggerContext, pipelineOverride)
if err != nil {
impl.logger.Errorw("deployment success event error", "pipelineOverride", pipelineOverride, "err", err)
return
Expand All @@ -162,7 +165,13 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
impl.logger.Debugw("application status update completed", "app", app.Name)
}

err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_UPDATE_TOPIC, callback)
// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
return "", nil
}

validations := impl.workflowDagExecutor.GetTriggerValidateFuncs()
err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_UPDATE_TOPIC, callback, loggerFunc, validations...)
if err != nil {
impl.logger.Error(err)
return err
Expand All @@ -173,7 +182,6 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
func (impl *ApplicationStatusHandlerImpl) SubscribeDeleteStatus() error {
callback := func(msg *model.PubSubMsg) {

impl.logger.Debugw("APP_STATUS_DELETE_REQ", "stage", "raw", "data", msg.Data)
applicationDetail := ApplicationDetail{}
err := json.Unmarshal([]byte(msg.Data), &applicationDetail)
if err != nil {
Expand All @@ -191,7 +199,18 @@ func (impl *ApplicationStatusHandlerImpl) SubscribeDeleteStatus() error {
impl.logger.Errorw("error in updating pipeline delete status", "err", err, "appName", app.Name)
}
}
err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_DELETE_TOPIC, callback)

// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
applicationDetail := ApplicationDetail{}
err := json.Unmarshal([]byte(msg.Data), &applicationDetail)
if err != nil {
return "unmarshal error on app delete status", []interface{}{"err", err}
}
return "got message for application status delete", []interface{}{"appName", applicationDetail.Application.Name, "namespace", applicationDetail.Application.Namespace, "deleteTimestamp", applicationDetail.Application.DeletionTimestamp}
}

err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_DELETE_TOPIC, callback, loggerFunc)
if err != nil {
impl.logger.Errorw("error in subscribing to argo application status delete topic", "err", err)
return err
Expand All @@ -210,7 +229,7 @@ func (impl *ApplicationStatusHandlerImpl) updateArgoAppDeleteStatus(app *v1alpha
return errors.New("invalid nats message, pipeline already deleted")
}
if err == pg.ErrNoRows {
//Helm app deployed using argocd
// Helm app deployed using argocd
var gitHash string
if app.Operation != nil && app.Operation.Sync != nil {
gitHash = app.Operation.Sync.Revision
Expand All @@ -229,7 +248,7 @@ func (impl *ApplicationStatusHandlerImpl) updateArgoAppDeleteStatus(app *v1alpha
impl.logger.Errorw("App not found in database", "installedAppId", model.InstalledAppId, "err", err)
return fmt.Errorf("app not found in database %s", err)
} else if installedApp.DeploymentAppDeleteRequest == false {
//TODO 4465 remove app from log after final RCA
// TODO 4465 remove app from log after final RCA
impl.logger.Infow("Deployment delete not requested for app, not deleting app from DB", "appName", app.Name, "app", app)
return nil
}
Expand Down
29 changes: 22 additions & 7 deletions api/router/pubsub/CiEventHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/devtron-labs/devtron/util"
"go.uber.org/zap"
"gopkg.in/go-playground/validator.v9"
"k8s.io/utils/pointer"
"time"
)

Expand Down Expand Up @@ -100,8 +101,6 @@ func NewCiEventHandlerImpl(logger *zap.SugaredLogger, pubsubClient *pubsub.PubSu

func (impl *CiEventHandlerImpl) Subscribe() error {
callback := func(msg *model.PubSubMsg) {
impl.logger.Debugw("ci complete event received")
//defer msg.Ack()
ciCompleteEvent := CiCompleteEvent{}
err := json.Unmarshal([]byte(string(msg.Data)), &ciCompleteEvent)
if err != nil {
Expand All @@ -114,6 +113,10 @@ func (impl *CiEventHandlerImpl) Subscribe() error {
return
}

triggerContext := pipeline.TriggerContext{
ReferenceId: pointer.String(msg.MsgId),
}

if ciCompleteEvent.FailureReason != "" {
req.FailureReason = ciCompleteEvent.FailureReason
err := impl.webhookService.HandleCiStepFailedEvent(ciCompleteEvent.PipelineId, req)
Expand All @@ -136,7 +139,7 @@ func (impl *CiEventHandlerImpl) Subscribe() error {
impl.logger.Error("Error while creating request for pipelineID", "pipelineId", ciCompleteEvent.PipelineId, "err", err)
return
}
resp, err := impl.ValidateAndHandleCiSuccessEvent(ciCompleteEvent.PipelineId, request, detail.ImagePushedAt)
resp, err := impl.ValidateAndHandleCiSuccessEvent(triggerContext, ciCompleteEvent.PipelineId, request, detail.ImagePushedAt)
if err != nil {
return
}
Expand All @@ -146,28 +149,40 @@ func (impl *CiEventHandlerImpl) Subscribe() error {

} else {
util.TriggerCIMetrics(ciCompleteEvent.Metrics, impl.ciEventConfig.ExposeCiMetrics, ciCompleteEvent.PipelineName, ciCompleteEvent.AppName)
resp, err := impl.ValidateAndHandleCiSuccessEvent(ciCompleteEvent.PipelineId, req, &time.Time{})
resp, err := impl.ValidateAndHandleCiSuccessEvent(triggerContext, ciCompleteEvent.PipelineId, req, &time.Time{})
if err != nil {
return
}
impl.logger.Debug(resp)
}
}
err := impl.pubsubClient.Subscribe(pubsub.CI_COMPLETE_TOPIC, callback)

// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
ciCompleteEvent := CiCompleteEvent{}
err := json.Unmarshal([]byte(string(msg.Data)), &ciCompleteEvent)
if err != nil {
return "error while unmarshalling json data", []interface{}{"error", err}
}
return "got message for ci-completion", []interface{}{"ciPipelineId", ciCompleteEvent.PipelineId, "workflowId", ciCompleteEvent.WorkflowId}
}

validations := impl.webhookService.GetTriggerValidateFuncs()
err := impl.pubsubClient.Subscribe(pubsub.CI_COMPLETE_TOPIC, callback, loggerFunc, validations...)
if err != nil {
impl.logger.Error(err)
return err
}
return nil
}

func (impl *CiEventHandlerImpl) ValidateAndHandleCiSuccessEvent(ciPipelineId int, request *pipeline.CiArtifactWebhookRequest, imagePushedAt *time.Time) (int, error) {
func (impl *CiEventHandlerImpl) ValidateAndHandleCiSuccessEvent(triggerContext pipeline.TriggerContext, ciPipelineId int, request *pipeline.CiArtifactWebhookRequest, imagePushedAt *time.Time) (int, error) {
validationErr := impl.validator.Struct(request)
if validationErr != nil {
impl.logger.Errorw("validation err, HandleCiSuccessEvent", "err", validationErr, "payload", request)
return 0, validationErr
}
buildArtifactId, err := impl.webhookService.HandleCiSuccessEvent(ciPipelineId, request, imagePushedAt)
buildArtifactId, err := impl.webhookService.HandleCiSuccessEvent(triggerContext, ciPipelineId, request, imagePushedAt)
if err != nil {
impl.logger.Error("Error while sending event for CI success for pipelineID",
ciPipelineId, "request", request, "error", err)
Expand Down
13 changes: 12 additions & 1 deletion api/router/pubsub/GitWebhookHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,18 @@ func (impl *GitWebhookHandlerImpl) Subscribe() error {
return
}
}
err := impl.pubsubClient.Subscribe(pubsub.NEW_CI_MATERIAL_TOPIC, callback)

// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
ciPipelineMaterial := gitSensor.CiPipelineMaterial{}
err := json.Unmarshal([]byte(string(msg.Data)), &ciPipelineMaterial)
if err != nil {
return "error while unmarshalling json response", []interface{}{"error", err}
}
return "got message for about new ci material", []interface{}{"ciPipelineMaterialId", ciPipelineMaterial.Id, "gitMaterialId", ciPipelineMaterial.GitMaterialId, "type", ciPipelineMaterial.Type}
}

err := impl.pubsubClient.Subscribe(pubsub.NEW_CI_MATERIAL_TOPIC, callback, loggerFunc)
if err != nil {
impl.logger.Error("err", err)
return err
Expand Down
34 changes: 27 additions & 7 deletions api/router/pubsub/WorkflowStatusUpdateHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ func NewWorkflowStatusUpdateHandlerImpl(logger *zap.SugaredLogger, pubsubClient

func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error {
callback := func(msg *model.PubSubMsg) {
impl.logger.Debug("received wf update request")
//defer msg.Ack()
wfStatus := v1alpha1.WorkflowStatus{}
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
if err != nil {
Expand All @@ -93,7 +91,20 @@ func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error {
}

}
err := impl.pubsubClient.Subscribe(pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, callback)

// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
wfStatus := v1alpha1.WorkflowStatus{}
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
if err != nil {
return "error while unmarshalling wf status update", []interface{}{"err", err, "msg", string(msg.Data)}
}

workflowName, status, _, message, _, _ := pipeline.ExtractWorkflowStatus(wfStatus)
return "got message for ci workflow status update ", []interface{}{"workflowName", workflowName, "status", status, "message", message}
}

err := impl.pubsubClient.Subscribe(pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, callback, loggerFunc)

if err != nil {
impl.logger.Error("err", err)
Expand All @@ -104,16 +115,13 @@ func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error {

func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error {
callback := func(msg *model.PubSubMsg) {
impl.logger.Debug("received cd wf update request")
//defer msg.Ack()
wfStatus := v1alpha1.WorkflowStatus{}
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
if err != nil {
impl.logger.Error("Error while unmarshalling wfStatus json object", "error", err)
return
}

impl.logger.Debugw("received cd wf update request body", "body", wfStatus)
wfrId, wfrStatus, err := impl.cdHandler.UpdateWorkflow(wfStatus)
impl.logger.Debugw("UpdateWorkflow", "wfrId", wfrId, "wfrStatus", wfrStatus)
if err != nil {
Expand Down Expand Up @@ -170,7 +178,19 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error {
}
}
}
err := impl.pubsubClient.Subscribe(pubsub.CD_WORKFLOW_STATUS_UPDATE, callback)

// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
wfStatus := v1alpha1.WorkflowStatus{}
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
if err != nil {
return "error while unmarshalling wfStatus json object", []interface{}{"error", err}
}
workflowName, status, _, message, _, _ := pipeline.ExtractWorkflowStatus(wfStatus)
return "got message for cd workflow status", []interface{}{"workflowName", workflowName, "status", status, "message", message}
}

err := impl.pubsubClient.Subscribe(pubsub.CD_WORKFLOW_STATUS_UPDATE, callback, loggerFunc)
if err != nil {
impl.logger.Error("err", err)
return err
Expand Down
Loading

0 comments on commit 7b71af7

Please sign in to comment.