diff --git a/api/restHandler/PipelineTriggerRestHandler.go b/api/restHandler/PipelineTriggerRestHandler.go index ea67eade5b7..f019462e65a 100644 --- a/api/restHandler/PipelineTriggerRestHandler.go +++ b/api/restHandler/PipelineTriggerRestHandler.go @@ -129,7 +129,10 @@ func (handler PipelineTriggerRestHandlerImpl) OverrideConfig(w http.ResponseWrit } ctx := context.WithValue(r.Context(), "token", acdToken) _, span := otel.Tracer("orchestrator").Start(ctx, "workflowDagExecutor.ManualCdTrigger") - mergeResp, err := handler.workflowDagExecutor.ManualCdTrigger(&overrideRequest, ctx) + triggerContext := pipeline.TriggerContext{ + Context: ctx, + } + mergeResp, err := handler.workflowDagExecutor.ManualCdTrigger(triggerContext, &overrideRequest) span.End() if err != nil { handler.logger.Errorw("request err, OverrideConfig", "err", err, "payload", overrideRequest) @@ -224,7 +227,10 @@ func (handler PipelineTriggerRestHandlerImpl) StartStopApp(w http.ResponseWriter return } ctx := context.WithValue(r.Context(), "token", acdToken) - mergeResp, err := handler.workflowDagExecutor.StopStartApp(&overrideRequest, ctx) + triggerContext := pipeline.TriggerContext{ + Context: ctx, + } + mergeResp, err := handler.workflowDagExecutor.StopStartApp(triggerContext, &overrideRequest) if err != nil { handler.logger.Errorw("service err, StartStopApp", "err", err, "payload", overrideRequest) common.WriteJsonResp(w, err, nil, http.StatusInternalServerError) diff --git a/api/router/pubsub/ApplicationStatusHandler.go b/api/router/pubsub/ApplicationStatusHandler.go index fdee49452ec..3767a44f1ec 100644 --- a/api/router/pubsub/ApplicationStatusHandler.go +++ b/api/router/pubsub/ApplicationStatusHandler.go @@ -24,6 +24,7 @@ import ( "fmt" "github.com/devtron-labs/common-lib/pubsub-lib/model" "github.com/devtron-labs/devtron/pkg/app" + "k8s.io/utils/pointer" "time" "github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig" @@ -74,7 +75,7 @@ func NewApplicationStatusHandlerImpl(logger *zap.SugaredLogger, pubsubClient *pu } err := appStatusUpdateHandlerImpl.Subscribe() if err != nil { - //logger.Error("err", err) + // logger.Error("err", err) return nil } err = appStatusUpdateHandlerImpl.SubscribeDeleteStatus() @@ -91,7 +92,6 @@ type ApplicationDetail struct { func (impl *ApplicationStatusHandlerImpl) Subscribe() error { callback := func(msg *model.PubSubMsg) { - impl.logger.Debugw("APP_STATUS_UPDATE_REQ", "stage", "raw", "data", msg.Data) applicationDetail := ApplicationDetail{} err := json.Unmarshal([]byte(msg.Data), &applicationDetail) if err != nil { @@ -109,10 +109,10 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error { _, err = impl.pipelineRepository.GetArgoPipelineByArgoAppName(app.ObjectMeta.Name) if err != nil && err == pg.ErrNoRows { impl.logger.Infow("this app not found in pipeline table looking in installed_apps table", "appName", app.ObjectMeta.Name) - //if not found in pipeline table then search in installed_apps table + // if not found in pipeline table then search in installed_apps table gitOpsDeployedAppNames, err := impl.installedAppRepository.GetAllGitOpsDeploymentAppName() if err != nil && err == pg.ErrNoRows { - //no installed_apps found + // no installed_apps found impl.logger.Errorw("no installed apps found", "err", err) return } else if err != nil { @@ -127,17 +127,17 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error { devtronGitOpsAppName = app.ObjectMeta.Name } if slices.Contains(gitOpsDeployedAppNames, devtronGitOpsAppName) { - //app found in installed_apps table hence setting flag to true + // app found in installed_apps table hence setting flag to true isAppStoreApplication = true } else { - //app neither found in installed_apps nor in pipeline table hence returning + // app neither found in installed_apps nor in pipeline table hence returning return } } isSucceeded, pipelineOverride, err := impl.appService.UpdateDeploymentStatusAndCheckIsSucceeded(app, applicationDetail.StatusTime, isAppStoreApplication) if err != nil { impl.logger.Errorw("error on application status update", "err", err, "msg", string(msg.Data)) - //TODO - check update for charts - fix this call + // TODO - check update for charts - fix this call if err == pg.ErrNoRows { // if not found in charts (which is for devtron apps) try to find in installed app (which is for devtron charts) _, err := impl.installedAppService.UpdateInstalledAppVersionStatus(app) @@ -153,7 +153,10 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error { // invoke DagExecutor, for cd success which will trigger post stage if exist. if isSucceeded { impl.logger.Debugw("git hash history", "list", app.Status.History) - err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(pipelineOverride) + triggerContext := pipeline.TriggerContext{ + ReferenceId: pointer.String(msg.MsgId), + } + err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(triggerContext, pipelineOverride) if err != nil { impl.logger.Errorw("deployment success event error", "pipelineOverride", pipelineOverride, "err", err) return @@ -162,7 +165,13 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error { impl.logger.Debugw("application status update completed", "app", app.Name) } - err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_UPDATE_TOPIC, callback) + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + return "", nil + } + + validations := impl.workflowDagExecutor.GetTriggerValidateFuncs() + err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_UPDATE_TOPIC, callback, loggerFunc, validations...) if err != nil { impl.logger.Error(err) return err @@ -173,7 +182,6 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error { func (impl *ApplicationStatusHandlerImpl) SubscribeDeleteStatus() error { callback := func(msg *model.PubSubMsg) { - impl.logger.Debugw("APP_STATUS_DELETE_REQ", "stage", "raw", "data", msg.Data) applicationDetail := ApplicationDetail{} err := json.Unmarshal([]byte(msg.Data), &applicationDetail) if err != nil { @@ -191,7 +199,18 @@ func (impl *ApplicationStatusHandlerImpl) SubscribeDeleteStatus() error { impl.logger.Errorw("error in updating pipeline delete status", "err", err, "appName", app.Name) } } - err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_DELETE_TOPIC, callback) + + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + applicationDetail := ApplicationDetail{} + err := json.Unmarshal([]byte(msg.Data), &applicationDetail) + if err != nil { + return "unmarshal error on app delete status", []interface{}{"err", err} + } + return "got message for application status delete", []interface{}{"appName", applicationDetail.Application.Name, "namespace", applicationDetail.Application.Namespace, "deleteTimestamp", applicationDetail.Application.DeletionTimestamp} + } + + err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_DELETE_TOPIC, callback, loggerFunc) if err != nil { impl.logger.Errorw("error in subscribing to argo application status delete topic", "err", err) return err @@ -210,7 +229,7 @@ func (impl *ApplicationStatusHandlerImpl) updateArgoAppDeleteStatus(app *v1alpha return errors.New("invalid nats message, pipeline already deleted") } if err == pg.ErrNoRows { - //Helm app deployed using argocd + // Helm app deployed using argocd var gitHash string if app.Operation != nil && app.Operation.Sync != nil { gitHash = app.Operation.Sync.Revision @@ -229,7 +248,7 @@ func (impl *ApplicationStatusHandlerImpl) updateArgoAppDeleteStatus(app *v1alpha impl.logger.Errorw("App not found in database", "installedAppId", model.InstalledAppId, "err", err) return fmt.Errorf("app not found in database %s", err) } else if installedApp.DeploymentAppDeleteRequest == false { - //TODO 4465 remove app from log after final RCA + // TODO 4465 remove app from log after final RCA impl.logger.Infow("Deployment delete not requested for app, not deleting app from DB", "appName", app.Name, "app", app) return nil } diff --git a/api/router/pubsub/CiEventHandler.go b/api/router/pubsub/CiEventHandler.go index 53f95fc9387..a759854dc3f 100644 --- a/api/router/pubsub/CiEventHandler.go +++ b/api/router/pubsub/CiEventHandler.go @@ -31,6 +31,7 @@ import ( "github.com/devtron-labs/devtron/util" "go.uber.org/zap" "gopkg.in/go-playground/validator.v9" + "k8s.io/utils/pointer" "time" ) @@ -100,8 +101,6 @@ func NewCiEventHandlerImpl(logger *zap.SugaredLogger, pubsubClient *pubsub.PubSu func (impl *CiEventHandlerImpl) Subscribe() error { callback := func(msg *model.PubSubMsg) { - impl.logger.Debugw("ci complete event received") - //defer msg.Ack() ciCompleteEvent := CiCompleteEvent{} err := json.Unmarshal([]byte(string(msg.Data)), &ciCompleteEvent) if err != nil { @@ -114,6 +113,10 @@ func (impl *CiEventHandlerImpl) Subscribe() error { return } + triggerContext := pipeline.TriggerContext{ + ReferenceId: pointer.String(msg.MsgId), + } + if ciCompleteEvent.FailureReason != "" { req.FailureReason = ciCompleteEvent.FailureReason err := impl.webhookService.HandleCiStepFailedEvent(ciCompleteEvent.PipelineId, req) @@ -136,7 +139,7 @@ func (impl *CiEventHandlerImpl) Subscribe() error { impl.logger.Error("Error while creating request for pipelineID", "pipelineId", ciCompleteEvent.PipelineId, "err", err) return } - resp, err := impl.ValidateAndHandleCiSuccessEvent(ciCompleteEvent.PipelineId, request, detail.ImagePushedAt) + resp, err := impl.ValidateAndHandleCiSuccessEvent(triggerContext, ciCompleteEvent.PipelineId, request, detail.ImagePushedAt) if err != nil { return } @@ -146,14 +149,26 @@ func (impl *CiEventHandlerImpl) Subscribe() error { } else { util.TriggerCIMetrics(ciCompleteEvent.Metrics, impl.ciEventConfig.ExposeCiMetrics, ciCompleteEvent.PipelineName, ciCompleteEvent.AppName) - resp, err := impl.ValidateAndHandleCiSuccessEvent(ciCompleteEvent.PipelineId, req, &time.Time{}) + resp, err := impl.ValidateAndHandleCiSuccessEvent(triggerContext, ciCompleteEvent.PipelineId, req, &time.Time{}) if err != nil { return } impl.logger.Debug(resp) } } - err := impl.pubsubClient.Subscribe(pubsub.CI_COMPLETE_TOPIC, callback) + + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + ciCompleteEvent := CiCompleteEvent{} + err := json.Unmarshal([]byte(string(msg.Data)), &ciCompleteEvent) + if err != nil { + return "error while unmarshalling json data", []interface{}{"error", err} + } + return "got message for ci-completion", []interface{}{"ciPipelineId", ciCompleteEvent.PipelineId, "workflowId", ciCompleteEvent.WorkflowId} + } + + validations := impl.webhookService.GetTriggerValidateFuncs() + err := impl.pubsubClient.Subscribe(pubsub.CI_COMPLETE_TOPIC, callback, loggerFunc, validations...) if err != nil { impl.logger.Error(err) return err @@ -161,13 +176,13 @@ func (impl *CiEventHandlerImpl) Subscribe() error { return nil } -func (impl *CiEventHandlerImpl) ValidateAndHandleCiSuccessEvent(ciPipelineId int, request *pipeline.CiArtifactWebhookRequest, imagePushedAt *time.Time) (int, error) { +func (impl *CiEventHandlerImpl) ValidateAndHandleCiSuccessEvent(triggerContext pipeline.TriggerContext, ciPipelineId int, request *pipeline.CiArtifactWebhookRequest, imagePushedAt *time.Time) (int, error) { validationErr := impl.validator.Struct(request) if validationErr != nil { impl.logger.Errorw("validation err, HandleCiSuccessEvent", "err", validationErr, "payload", request) return 0, validationErr } - buildArtifactId, err := impl.webhookService.HandleCiSuccessEvent(ciPipelineId, request, imagePushedAt) + buildArtifactId, err := impl.webhookService.HandleCiSuccessEvent(triggerContext, ciPipelineId, request, imagePushedAt) if err != nil { impl.logger.Error("Error while sending event for CI success for pipelineID", ciPipelineId, "request", request, "error", err) diff --git a/api/router/pubsub/GitWebhookHandler.go b/api/router/pubsub/GitWebhookHandler.go index 9d5974b2d8f..32415cdf5c5 100644 --- a/api/router/pubsub/GitWebhookHandler.go +++ b/api/router/pubsub/GitWebhookHandler.go @@ -68,7 +68,18 @@ func (impl *GitWebhookHandlerImpl) Subscribe() error { return } } - err := impl.pubsubClient.Subscribe(pubsub.NEW_CI_MATERIAL_TOPIC, callback) + + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + ciPipelineMaterial := gitSensor.CiPipelineMaterial{} + err := json.Unmarshal([]byte(string(msg.Data)), &ciPipelineMaterial) + if err != nil { + return "error while unmarshalling json response", []interface{}{"error", err} + } + return "got message for about new ci material", []interface{}{"ciPipelineMaterialId", ciPipelineMaterial.Id, "gitMaterialId", ciPipelineMaterial.GitMaterialId, "type", ciPipelineMaterial.Type} + } + + err := impl.pubsubClient.Subscribe(pubsub.NEW_CI_MATERIAL_TOPIC, callback, loggerFunc) if err != nil { impl.logger.Error("err", err) return err diff --git a/api/router/pubsub/WorkflowStatusUpdateHandler.go b/api/router/pubsub/WorkflowStatusUpdateHandler.go index 99970f573e9..f44c2af4a3a 100644 --- a/api/router/pubsub/WorkflowStatusUpdateHandler.go +++ b/api/router/pubsub/WorkflowStatusUpdateHandler.go @@ -71,8 +71,6 @@ func NewWorkflowStatusUpdateHandlerImpl(logger *zap.SugaredLogger, pubsubClient func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error { callback := func(msg *model.PubSubMsg) { - impl.logger.Debug("received wf update request") - //defer msg.Ack() wfStatus := v1alpha1.WorkflowStatus{} err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus) if err != nil { @@ -93,7 +91,20 @@ func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error { } } - err := impl.pubsubClient.Subscribe(pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, callback) + + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + wfStatus := v1alpha1.WorkflowStatus{} + err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus) + if err != nil { + return "error while unmarshalling wf status update", []interface{}{"err", err, "msg", string(msg.Data)} + } + + workflowName, status, _, message, _, _ := pipeline.ExtractWorkflowStatus(wfStatus) + return "got message for ci workflow status update ", []interface{}{"workflowName", workflowName, "status", status, "message", message} + } + + err := impl.pubsubClient.Subscribe(pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, callback, loggerFunc) if err != nil { impl.logger.Error("err", err) @@ -104,8 +115,6 @@ func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error { func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error { callback := func(msg *model.PubSubMsg) { - impl.logger.Debug("received cd wf update request") - //defer msg.Ack() wfStatus := v1alpha1.WorkflowStatus{} err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus) if err != nil { @@ -113,7 +122,6 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error { return } - impl.logger.Debugw("received cd wf update request body", "body", wfStatus) wfrId, wfrStatus, err := impl.cdHandler.UpdateWorkflow(wfStatus) impl.logger.Debugw("UpdateWorkflow", "wfrId", wfrId, "wfrStatus", wfrStatus) if err != nil { @@ -170,7 +178,19 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error { } } } - err := impl.pubsubClient.Subscribe(pubsub.CD_WORKFLOW_STATUS_UPDATE, callback) + + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + wfStatus := v1alpha1.WorkflowStatus{} + err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus) + if err != nil { + return "error while unmarshalling wfStatus json object", []interface{}{"error", err} + } + workflowName, status, _, message, _, _ := pipeline.ExtractWorkflowStatus(wfStatus) + return "got message for cd workflow status", []interface{}{"workflowName", workflowName, "status", status, "message", message} + } + + err := impl.pubsubClient.Subscribe(pubsub.CD_WORKFLOW_STATUS_UPDATE, callback, loggerFunc) if err != nil { impl.logger.Error("err", err) return err diff --git a/client/cron/CdApplicationStatusUpdateHandler.go b/client/cron/CdApplicationStatusUpdateHandler.go index 42550080ebe..3645a6dda57 100644 --- a/client/cron/CdApplicationStatusUpdateHandler.go +++ b/client/cron/CdApplicationStatusUpdateHandler.go @@ -18,6 +18,7 @@ import ( "github.com/devtron-labs/devtron/util" "github.com/robfig/cron/v3" "go.uber.org/zap" + "k8s.io/utils/pointer" "strconv" "time" ) @@ -128,7 +129,6 @@ func (impl *CdApplicationStatusUpdateHandlerImpl) Subscribe() error { impl.logger.Errorw("unmarshal error on argo pipeline status update event", "err", err) return } - impl.logger.Debugw("ARGO_PIPELINE_STATUS_UPDATE_REQ", "stage", "subscribeDataUnmarshal", "data", statusUpdateEvent) if statusUpdateEvent.IsAppStoreApplication { installedApp, err = impl.installedAppVersionRepository.GetInstalledAppByInstalledAppVersionId(statusUpdateEvent.InstalledAppVersionId) @@ -144,13 +144,29 @@ func (impl *CdApplicationStatusUpdateHandlerImpl) Subscribe() error { } } - err, _ = impl.CdHandler.UpdatePipelineTimelineAndStatusByLiveApplicationFetch(cdPipeline, installedApp, statusUpdateEvent.UserId) + triggerContext := pipeline.TriggerContext{ + ReferenceId: pointer.String(msg.MsgId), + } + + err, _ = impl.CdHandler.UpdatePipelineTimelineAndStatusByLiveApplicationFetch(triggerContext, cdPipeline, installedApp, statusUpdateEvent.UserId) if err != nil { impl.logger.Errorw("error on argo pipeline status update", "err", err, "msg", string(msg.Data)) return } } - err := impl.pubsubClient.Subscribe(pubsub.ARGO_PIPELINE_STATUS_UPDATE_TOPIC, callback) + + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + statusUpdateEvent := pipeline.ArgoPipelineStatusSyncEvent{} + err := json.Unmarshal([]byte(string(msg.Data)), &statusUpdateEvent) + if err != nil { + return "unmarshal error on argo pipeline status update event", []interface{}{"err", err} + } + return "got message for argo pipeline status update", []interface{}{"pipelineId", statusUpdateEvent.PipelineId, "installedAppVersionId", statusUpdateEvent.InstalledAppVersionId, "isAppStoreApplication", statusUpdateEvent.IsAppStoreApplication} + } + + validations := impl.workflowDagExecutor.GetTriggerValidateFuncs() + err := impl.pubsubClient.Subscribe(pubsub.ARGO_PIPELINE_STATUS_UPDATE_TOPIC, callback, loggerFunc, validations...) if err != nil { impl.logger.Errorw("error in subscribing to argo application status update topic", "err", err) return err @@ -182,7 +198,7 @@ func (impl *CdApplicationStatusUpdateHandlerImpl) ArgoApplicationStatusUpdate() defer func() { middleware.DeploymentStatusCronDuration.WithLabelValues(pipeline.DEVTRON_APP_ARGO_PIPELINE_STATUS_UPDATE_CRON).Observe(time.Since(cronProcessStartTime).Seconds()) }() - //TODO: remove below cron with division of cron for argo pipelines of devtron-apps and helm-apps + // TODO: remove below cron with division of cron for argo pipelines of devtron-apps and helm-apps defer func() { middleware.DeploymentStatusCronDuration.WithLabelValues(pipeline.HELM_APP_ARGO_PIPELINE_STATUS_UPDATE_CRON).Observe(time.Since(cronProcessStartTime).Seconds()) }() @@ -227,7 +243,7 @@ func (impl *CdApplicationStatusUpdateHandlerImpl) SyncPipelineStatusForResourceT } func (impl *CdApplicationStatusUpdateHandlerImpl) SyncPipelineStatusForAppStoreForResourceTreeCall(installedAppVersion *repository2.InstalledAppVersions) error { - //find installed app version history using parameter obj + // find installed app version history using parameter obj installedAppVersionHistory, err := impl.installedAppVersionHistoryRepository.GetLatestInstalledAppVersionHistory(installedAppVersion.Id) if err != nil { impl.logger.Errorw("error in getting latest installedAppVersionHistory by installedAppVersionId", "err", err, "installedAppVersionId", installedAppVersion.Id) @@ -263,7 +279,7 @@ func (impl *CdApplicationStatusUpdateHandlerImpl) ManualSyncPipelineStatus(appId cdPipeline = cdPipelines[0] } - err, isTimelineUpdated := impl.CdHandler.UpdatePipelineTimelineAndStatusByLiveApplicationFetch(cdPipeline, installedApp, userId) + err, isTimelineUpdated := impl.CdHandler.UpdatePipelineTimelineAndStatusByLiveApplicationFetch(pipeline.TriggerContext{}, cdPipeline, installedApp, userId) if err != nil { impl.logger.Errorw("error on argo pipeline status update", "err", err) return nil diff --git a/go.mod b/go.mod index 12632838e5a..335d2a07149 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,7 @@ require ( github.com/davecgh/go-spew v1.1.1 github.com/deckarep/golang-set v1.8.0 github.com/devtron-labs/authenticator v0.4.33 - github.com/devtron-labs/common-lib v0.0.9-0.20231226070212-c47f7a07ebf5 + github.com/devtron-labs/common-lib v0.0.9-0.20240104121009-1052d04e42b1 github.com/devtron-labs/protos v0.0.0-20230503113602-282404f70fd2 github.com/evanphx/json-patch v5.6.0+incompatible github.com/ghodss/yaml v1.0.1-0.20190212211648-25d852aebe32 diff --git a/go.sum b/go.sum index 6cb0c4b6dce..e18020b3362 100644 --- a/go.sum +++ b/go.sum @@ -223,8 +223,8 @@ github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4 h1:YcpmyvADG github.com/denisenkom/go-mssqldb v0.0.0-20190707035753-2be1aa521ff4/go.mod h1:zAg7JM8CkOJ43xKXIj7eRO9kmWm/TW578qo+oDO6tuM= github.com/devtron-labs/authenticator v0.4.33 h1:FpAV3ZgFluaRFcMwPpwxr/mwSipJ16XRvgABq3BzP5Y= github.com/devtron-labs/authenticator v0.4.33/go.mod h1:ozNfT8WcruiSgnUbyp48WVfc41++W6xYXhKFp67lNTU= -github.com/devtron-labs/common-lib v0.0.9-0.20231226070212-c47f7a07ebf5 h1:+Nh2SMzAdgBr1tgdKAlF5cN0CvTPUj1V/sI5aRUrZnE= -github.com/devtron-labs/common-lib v0.0.9-0.20231226070212-c47f7a07ebf5/go.mod h1:pBThgympEjsza6GShqNNGCPBFXNDx0DGMc7ID/VHTAw= +github.com/devtron-labs/common-lib v0.0.9-0.20240104121009-1052d04e42b1 h1:V12SY+XsnyaD6CyGc+7Sma1WXdGC4pS5GrQk2Qpilh4= +github.com/devtron-labs/common-lib v0.0.9-0.20240104121009-1052d04e42b1/go.mod h1:pBThgympEjsza6GShqNNGCPBFXNDx0DGMc7ID/VHTAw= github.com/devtron-labs/protos v0.0.0-20230503113602-282404f70fd2 h1:/IEIsJTxDZ3hv8uOoCaqdWCXqcv7nCAgX9AP/v84dUY= github.com/devtron-labs/protos v0.0.0-20230503113602-282404f70fd2/go.mod h1:l85jxWHlcSo910hdUfRycL40yGzC6glE93V1sVxVPto= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= diff --git a/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go b/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go index d4ba0f9d12f..ed6be1c69fc 100644 --- a/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go +++ b/internal/sql/repository/pipelineConfig/CdWorfkflowRepository.go @@ -19,6 +19,7 @@ package pipelineConfig import ( "context" + "errors" "fmt" "github.com/devtron-labs/common-lib/utils/k8s/health" "github.com/devtron-labs/devtron/api/bean" @@ -35,6 +36,7 @@ import ( ) type CdWorkflowRepository interface { + CheckWorkflowRunnerByReferenceId(referenceId string) (bool, error) SaveWorkFlow(ctx context.Context, wf *CdWorkflow) error UpdateWorkFlow(wf *CdWorkflow) error FindById(wfId int) (*CdWorkflow, error) @@ -164,8 +166,8 @@ type CdWorkflowRunner struct { tableName struct{} `sql:"cd_workflow_runner" pg:",discard_unknown_columns"` Id int `sql:"id,pk"` Name string `sql:"name"` - WorkflowType bean.WorkflowType `sql:"workflow_type"` //pre,post,deploy - ExecutorType WorkflowExecutorType `sql:"executor_type"` //awf, system + WorkflowType bean.WorkflowType `sql:"workflow_type"` // pre,post,deploy + ExecutorType WorkflowExecutorType `sql:"executor_type"` // awf, system Status string `sql:"status"` PodStatus string `sql:"pod_status"` Message string `sql:"message"` @@ -179,6 +181,7 @@ type CdWorkflowRunner struct { BlobStorageEnabled bool `sql:"blob_storage_enabled,notnull"` RefCdWorkflowRunnerId int `sql:"ref_cd_workflow_runner_id,notnull"` ImagePathReservationIds []int `sql:"image_path_reservation_ids" pg:",array,notnull"` + ReferenceId *string `sql:"reference_id"` CdWorkflow *CdWorkflow sql.AuditLog } @@ -329,7 +332,7 @@ func (impl *CdWorkflowRepositoryImpl) FindLatestCdWorkflowByPipelineId(pipelineI func (impl *CdWorkflowRepositoryImpl) FindLatestCdWorkflowByPipelineIdV2(pipelineIds []int) ([]*CdWorkflow, error) { var cdWorkflow []*CdWorkflow - //err := impl.dbConnection.Model(&cdWorkflow).Where("pipeline_id in (?)", pg.In(pipelineIds)).Order("id DESC").Select() + // err := impl.dbConnection.Model(&cdWorkflow).Where("pipeline_id in (?)", pg.In(pipelineIds)).Order("id DESC").Select() query := "SELECT cdw.pipeline_id, cdw.workflow_status, MAX(id) as id from cd_workflow cdw" + " WHERE cdw.pipeline_id in(?)" + " GROUP by cdw.pipeline_id, cdw.workflow_status ORDER by id desc;" @@ -337,7 +340,7 @@ func (impl *CdWorkflowRepositoryImpl) FindLatestCdWorkflowByPipelineIdV2(pipelin if err != nil { return cdWorkflow, err } - //TODO - Group By Environment And Pipeline will get latest pipeline from top + // TODO - Group By Environment And Pipeline will get latest pipeline from top return cdWorkflow, err } @@ -353,7 +356,7 @@ func (impl *CdWorkflowRepositoryImpl) FindCdWorkflowMetaByEnvironmentId(appId in Join("inner join cd_workflow wf on wf.id = cd_workflow_runner.cd_workflow_id"). Join("inner join ci_artifact cia on cia.id = wf.ci_artifact_id"). Join("inner join pipeline p on p.id = wf.pipeline_id"). - //Join("left join users u on u.id = wfr.triggered_by"). + // Join("left join users u on u.id = wfr.triggered_by"). Offset(offset).Limit(limit). Select() if err != nil { @@ -392,11 +395,11 @@ func (impl *CdWorkflowRepositoryImpl) FindCdWorkflowMetaByPipelineId(pipelineId Column("cd_workflow_runner.*", "CdWorkflow", "CdWorkflow.Pipeline", "CdWorkflow.CiArtifact"). Where("cd_workflow.pipeline_id = ?", pipelineId). Order("cd_workflow_runner.id DESC"). - //Join("inner join cd_workflow wf on wf.id = cd_workflow_runner.cd_workflow_id"). - //Join("inner join ci_artifact cia on cia.id = wf.ci_artifact_id"). - //Join("inner join pipeline p on p.id = wf.pipeline_id"). - //Join("left join users u on u.id = wfr.triggered_by"). - //Order("ORDER BY cd_workflow_runner.started_on DESC"). + // Join("inner join cd_workflow wf on wf.id = cd_workflow_runner.cd_workflow_id"). + // Join("inner join ci_artifact cia on cia.id = wf.ci_artifact_id"). + // Join("inner join pipeline p on p.id = wf.pipeline_id"). + // Join("left join users u on u.id = wfr.triggered_by"). + // Order("ORDER BY cd_workflow_runner.started_on DESC"). Offset(offset).Limit(limit). Select() @@ -713,3 +716,13 @@ func (impl *CdWorkflowRepositoryImpl) GetLatestTriggersOfHelmPipelinesStuckInNon } return wfrList, err } + +func (impl *CdWorkflowRepositoryImpl) CheckWorkflowRunnerByReferenceId(referenceId string) (bool, error) { + exists, err := impl.dbConnection.Model((*CdWorkflowRunner)(nil)). + Where("cd_workflow_runner.reference_id = ?", referenceId). + Exists() + if errors.Is(err, pg.ErrNoRows) { + return false, nil + } + return exists, err +} diff --git a/pkg/appStore/deployment/fullMode/AppStoreDeploymentFullModeService.go b/pkg/appStore/deployment/fullMode/AppStoreDeploymentFullModeService.go index 0899e6d7d0f..09270dec201 100644 --- a/pkg/appStore/deployment/fullMode/AppStoreDeploymentFullModeService.go +++ b/pkg/appStore/deployment/fullMode/AppStoreDeploymentFullModeService.go @@ -488,7 +488,6 @@ func (impl AppStoreDeploymentFullModeServiceImpl) SubscribeHelmInstallStatus() e callback := func(msg *model.PubSubMsg) { - impl.logger.Debug("received helm install status event - HELM_INSTALL_STATUS", "data", msg.Data) helmInstallNatsMessage := &appStoreBean.HelmReleaseStatusConfig{} err := json.Unmarshal([]byte(msg.Data), helmInstallNatsMessage) if err != nil { @@ -514,7 +513,17 @@ func (impl AppStoreDeploymentFullModeServiceImpl) SubscribeHelmInstallStatus() e } } - err := impl.pubSubClient.Subscribe(pubsub_lib.HELM_CHART_INSTALL_STATUS_TOPIC, callback) + // add required logging here + var loggerFunc pubsub_lib.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + helmInstallNatsMessage := &appStoreBean.HelmReleaseStatusConfig{} + err := json.Unmarshal([]byte(msg.Data), helmInstallNatsMessage) + if err != nil { + return "error in unmarshalling helm install status nats message", []interface{}{"err", err} + } + return "got nats msg for helm chart install status", []interface{}{"InstallAppVersionHistoryId", helmInstallNatsMessage.InstallAppVersionHistoryId, "ErrorInInstallation", helmInstallNatsMessage.ErrorInInstallation, "IsReleaseInstalled", helmInstallNatsMessage.IsReleaseInstalled} + } + + err := impl.pubSubClient.Subscribe(pubsub_lib.HELM_CHART_INSTALL_STATUS_TOPIC, callback, loggerFunc) if err != nil { impl.logger.Error(err) return err diff --git a/pkg/appStore/deployment/service/InstalledAppService.go b/pkg/appStore/deployment/service/InstalledAppService.go index 58c54eb5a11..9f25ef5ace0 100644 --- a/pkg/appStore/deployment/service/InstalledAppService.go +++ b/pkg/appStore/deployment/service/InstalledAppService.go @@ -647,8 +647,6 @@ func (impl *InstalledAppServiceImpl) triggerDeploymentEvent(installAppVersions [ func (impl *InstalledAppServiceImpl) Subscribe() error { callback := func(msg *model.PubSubMsg) { - impl.logger.Debug("cd stage event received") - //defer msg.Ack() deployPayload := &appStoreBean.DeployPayload{} err := json.Unmarshal([]byte(string(msg.Data)), &deployPayload) if err != nil { @@ -662,7 +660,18 @@ func (impl *InstalledAppServiceImpl) Subscribe() error { impl.logger.Errorw("error in performing deploy stage", "deployPayload", deployPayload, "err", err) } } - err := impl.pubsubClient.Subscribe(pubsub.BULK_APPSTORE_DEPLOY_TOPIC, callback) + + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + deployPayload := &appStoreBean.DeployPayload{} + err := json.Unmarshal([]byte(string(msg.Data)), &deployPayload) + if err != nil { + return "error while unmarshalling deployPayload json object", []interface{}{"error", err} + } + return "got message for deploy app-store apps in bulk", []interface{}{"installedAppVersionId", deployPayload.InstalledAppVersionId, "installedAppVersionHistoryId", deployPayload.InstalledAppVersionHistoryId} + } + + err := impl.pubsubClient.Subscribe(pubsub.BULK_APPSTORE_DEPLOY_TOPIC, callback, loggerFunc) if err != nil { impl.logger.Error("err", err) return err diff --git a/pkg/bulkAction/BulkUpdateService.go b/pkg/bulkAction/BulkUpdateService.go index c60c415a0b7..d696620d5a3 100644 --- a/pkg/bulkAction/BulkUpdateService.go +++ b/pkg/bulkAction/BulkUpdateService.go @@ -39,6 +39,7 @@ import ( "github.com/tidwall/gjson" "github.com/tidwall/sjson" "go.uber.org/zap" + "k8s.io/utils/pointer" "net/http" "sort" "strings" @@ -1069,7 +1070,10 @@ func (impl BulkUpdateServiceImpl) BulkHibernate(request *BulkApplicationForEnvir UserId: request.UserId, RequestType: pipeline1.STOP, } - _, hibernateReqError = impl.workflowDagExecutor.StopStartApp(stopRequest, ctx) + triggerContext := pipeline1.TriggerContext{ + Context: ctx, + } + _, hibernateReqError = impl.workflowDagExecutor.StopStartApp(triggerContext, stopRequest) if hibernateReqError != nil { impl.logger.Errorw("error in hibernating application", "err", hibernateReqError, "pipeline", pipeline) pipelineResponse := response[appKey] @@ -1225,7 +1229,10 @@ func (impl BulkUpdateServiceImpl) BulkUnHibernate(request *BulkApplicationForEnv UserId: request.UserId, RequestType: pipeline1.START, } - _, hibernateReqError = impl.workflowDagExecutor.StopStartApp(stopRequest, ctx) + triggerContext := pipeline1.TriggerContext{ + Context: ctx, + } + _, hibernateReqError = impl.workflowDagExecutor.StopStartApp(triggerContext, stopRequest) if hibernateReqError != nil { impl.logger.Errorw("error in un-hibernating application", "err", hibernateReqError, "pipeline", pipeline) pipelineResponse := response[appKey] @@ -1423,9 +1430,6 @@ func (impl BulkUpdateServiceImpl) BulkDeploy(request *BulkApplicationForEnvironm func (impl BulkUpdateServiceImpl) SubscribeToCdBulkTriggerTopic() error { callback := func(msg *model.PubSubMsg) { - impl.logger.Infow("Event received", - "topic", pubsub.CD_BULK_DEPLOY_TRIGGER_TOPIC, - "msg", msg.Data) event := &bean.BulkCdDeployEvent{} err := json.Unmarshal([]byte(msg.Data), event) @@ -1446,7 +1450,12 @@ func (impl BulkUpdateServiceImpl) SubscribeToCdBulkTriggerTopic() error { return } - _, err = impl.workflowDagExecutor.ManualCdTrigger(event.ValuesOverrideRequest, ctx) + triggerContext := pipeline1.TriggerContext{ + ReferenceId: pointer.String(msg.MsgId), + Context: ctx, + } + + _, err = impl.workflowDagExecutor.ManualCdTrigger(triggerContext, event.ValuesOverrideRequest) if err != nil { impl.logger.Errorw("Error triggering CD", "topic", pubsub.CD_BULK_DEPLOY_TRIGGER_TOPIC, @@ -1454,7 +1463,19 @@ func (impl BulkUpdateServiceImpl) SubscribeToCdBulkTriggerTopic() error { "err", err) } } - err := impl.pubsubClient.Subscribe(pubsub.CD_BULK_DEPLOY_TRIGGER_TOPIC, callback) + + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + event := &bean.BulkCdDeployEvent{} + err := json.Unmarshal([]byte(msg.Data), event) + if err != nil { + return "error unmarshalling received event", []interface{}{"msg", msg.Data, "err", err} + } + return "got message for trigger cd in bulk", []interface{}{"pipelineId", event.ValuesOverrideRequest.PipelineId, "appId", event.ValuesOverrideRequest.AppId, "cdWorkflowType", event.ValuesOverrideRequest.CdWorkflowType, "ciArtifactId", event.ValuesOverrideRequest.CiArtifactId} + } + + validations := impl.workflowDagExecutor.GetTriggerValidateFuncs() + err := impl.pubsubClient.Subscribe(pubsub.CD_BULK_DEPLOY_TRIGGER_TOPIC, callback, loggerFunc, validations...) if err != nil { impl.logger.Error("failed to subscribe to NATS topic", "topic", pubsub.CD_BULK_DEPLOY_TRIGGER_TOPIC, diff --git a/pkg/pipeline/CdHandler.go b/pkg/pipeline/CdHandler.go index 7737a2668d1..7b75325dc65 100644 --- a/pkg/pipeline/CdHandler.go +++ b/pkg/pipeline/CdHandler.go @@ -84,7 +84,7 @@ type CdHandler interface { CheckHelmAppStatusPeriodicallyAndUpdateInDb(helmPipelineStatusCheckEligibleTime int, getPipelineDeployedWithinHours int) error CheckArgoAppStatusPeriodicallyAndUpdateInDb(getPipelineDeployedBeforeMinutes int, getPipelineDeployedWithinHours int) error CheckArgoPipelineTimelineStatusPeriodicallyAndUpdateInDb(pendingSinceSeconds int, timeForDegradation int) error - UpdatePipelineTimelineAndStatusByLiveApplicationFetch(pipeline *pipelineConfig.Pipeline, installedApp repository3.InstalledApps, userId int32) (err error, isTimelineUpdated bool) + UpdatePipelineTimelineAndStatusByLiveApplicationFetch(triggerContext TriggerContext, pipeline *pipelineConfig.Pipeline, installedApp repository3.InstalledApps, userId int32) (err error, isTimelineUpdated bool) CheckAndSendArgoPipelineStatusSyncEventIfNeeded(pipelineId int, installedAppVersionId int, userId int32, isAppStoreApplication bool) FetchAppWorkflowStatusForTriggerViewForEnvironment(request resourceGroup2.ResourceGroupingRequest, token string) ([]*pipelineConfig.CdWorkflowStatus, error) FetchAppDeploymentStatusForEnvironments(request resourceGroup2.ResourceGroupingRequest, token string) ([]*pipelineConfig.AppDeploymentStatus, error) @@ -204,7 +204,7 @@ func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkf impl.Logger.Infow("re triggering cd stage ", "runnerId", runner.Id) var err error - //add comment for this logic + // add comment for this logic if runner.RefCdWorkflowRunnerId != 0 { runner, err = impl.cdWorkflowRepository.FindWorkflowRunnerById(runner.RefCdWorkflowRunnerId) if err != nil { @@ -223,14 +223,26 @@ func (impl *CdHandlerImpl) HandleCdStageReTrigger(runner *pipelineConfig.CdWorkf return nil } + triggerRequest := TriggerRequest{ + CdWf: runner.CdWorkflow, + Pipeline: runner.CdWorkflow.Pipeline, + Artifact: runner.CdWorkflow.CiArtifact, + TriggeredBy: 1, + ApplyAuth: false, + RefCdWorkflowRunnerId: runner.Id, + TriggerContext: TriggerContext{ + Context: context.Background(), + }, + } + if runner.WorkflowType == bean.CD_WORKFLOW_TYPE_PRE { - err = impl.workflowDagExecutor.TriggerPreStage(context.Background(), runner.CdWorkflow, runner.CdWorkflow.CiArtifact, runner.CdWorkflow.Pipeline, 1, runner.Id) + err = impl.workflowDagExecutor.TriggerPreStage(triggerRequest) if err != nil { impl.Logger.Errorw("error in TriggerPreStage ", "err", err, "cdWorkflowRunnerId", runner.Id) return err } } else if runner.WorkflowType == bean.CD_WORKFLOW_TYPE_POST { - err = impl.workflowDagExecutor.TriggerPostStage(runner.CdWorkflow, runner.CdWorkflow.Pipeline, 1, runner.Id) + err = impl.workflowDagExecutor.TriggerPostStage(triggerRequest) if err != nil { impl.Logger.Errorw("error in TriggerPostStage ", "err", err, "cdWorkflowRunnerId", runner.Id) return err @@ -267,8 +279,8 @@ func (impl *CdHandlerImpl) CheckArgoAppStatusPeriodicallyAndUpdateInDb(getPipeli } func (impl *CdHandlerImpl) CheckArgoPipelineTimelineStatusPeriodicallyAndUpdateInDb(pendingSinceSeconds int, timeForDegradation int) error { - //getting all the progressing status that are stuck since some time after kubectl apply success sync stage - //and are not eligible for CheckArgoAppStatusPeriodicallyAndUpdateInDb + // getting all the progressing status that are stuck since some time after kubectl apply success sync stage + // and are not eligible for CheckArgoAppStatusPeriodicallyAndUpdateInDb pipelines, err := impl.pipelineRepository.GetArgoPipelinesHavingTriggersStuckInLastPossibleNonTerminalTimelines(pendingSinceSeconds, timeForDegradation) if err != nil && err != pg.ErrNoRows { impl.Logger.Errorw("err in GetArgoPipelinesHavingTriggersStuckInLastPossibleNonTerminalTimelines", "err", err) @@ -322,15 +334,15 @@ func (impl *CdHandlerImpl) CheckAndSendArgoPipelineStatusSyncEventIfNeeded(pipel } } - //pipelineId can be cdPipelineId or installedAppVersionId, using isAppStoreApplication flag to identify between them - if lastSyncTime.IsZero() || (!lastSyncTime.IsZero() && time.Since(lastSyncTime) > 5*time.Second) { //create new nats event + // pipelineId can be cdPipelineId or installedAppVersionId, using isAppStoreApplication flag to identify between them + if lastSyncTime.IsZero() || (!lastSyncTime.IsZero() && time.Since(lastSyncTime) > 5*time.Second) { // create new nats event statusUpdateEvent := ArgoPipelineStatusSyncEvent{ PipelineId: pipelineId, InstalledAppVersionId: installedAppVersionId, UserId: userId, IsAppStoreApplication: isAppStoreApplication, } - //write event + // write event err = impl.eventClient.WriteNatsEvent(pubub.ARGO_PIPELINE_STATUS_UPDATE_TOPIC, statusUpdateEvent) if err != nil { impl.Logger.Errorw("error in writing nats event", "topic", pubub.ARGO_PIPELINE_STATUS_UPDATE_TOPIC, "payload", statusUpdateEvent) @@ -338,7 +350,7 @@ func (impl *CdHandlerImpl) CheckAndSendArgoPipelineStatusSyncEventIfNeeded(pipel } } -func (impl *CdHandlerImpl) UpdatePipelineTimelineAndStatusByLiveApplicationFetch(pipeline *pipelineConfig.Pipeline, installedApp repository3.InstalledApps, userId int32) (error, bool) { +func (impl *CdHandlerImpl) UpdatePipelineTimelineAndStatusByLiveApplicationFetch(triggerContext TriggerContext, pipeline *pipelineConfig.Pipeline, installedApp repository3.InstalledApps, userId int32) (error, bool) { isTimelineUpdated := false isSucceeded := false var pipelineOverride *chartConfig.PipelineOverride @@ -351,7 +363,7 @@ func (impl *CdHandlerImpl) UpdatePipelineTimelineAndStatusByLiveApplicationFetch } impl.Logger.Debugw("ARGO_PIPELINE_STATUS_UPDATE_REQ", "stage", "checkingDeploymentStatus", "argoAppName", pipeline, "cdWfr", cdWfr) if util3.IsTerminalStatus(cdWfr.Status) { - //drop event + // drop event return nil, isTimelineUpdated } @@ -362,8 +374,8 @@ func (impl *CdHandlerImpl) UpdatePipelineTimelineAndStatusByLiveApplicationFetch return nil, isTimelineUpdated } } - //this should only be called when we have git-ops configured - //try fetching status from argo cd + // this should only be called when we have git-ops configured + // try fetching status from argo cd acdToken, err := impl.argoUserService.GetLatestDevtronArgoCdUserToken() if err != nil { impl.Logger.Errorw("error in getting acd token", "err", err) @@ -375,7 +387,7 @@ func (impl *CdHandlerImpl) UpdatePipelineTimelineAndStatusByLiveApplicationFetch app, err := impl.application.Get(ctx, query) if err != nil { impl.Logger.Errorw("error in getting acd application", "err", err, "argoAppName", pipeline) - //updating cdWfr status + // updating cdWfr status cdWfr.Status = pipelineConfig.WorkflowUnableToFetchState cdWfr.UpdatedOn = time.Now() cdWfr.UpdatedBy = 1 @@ -420,8 +432,8 @@ func (impl *CdHandlerImpl) UpdatePipelineTimelineAndStatusByLiveApplicationFetch } } if isSucceeded { - //handling deployment success event - err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(pipelineOverride) + // handling deployment success event + err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(triggerContext, pipelineOverride) if err != nil { impl.Logger.Errorw("error in handling deployment success event", "pipelineOverride", pipelineOverride, "err", err) return err, isTimelineUpdated @@ -436,7 +448,7 @@ func (impl *CdHandlerImpl) UpdatePipelineTimelineAndStatusByLiveApplicationFetch } impl.Logger.Debugw("ARGO_PIPELINE_STATUS_UPDATE_REQ", "stage", "checkingDeploymentStatus", "argoAppName", installedApp, "installedAppVersionHistory", installedAppVersionHistory) if util3.IsTerminalStatus(installedAppVersionHistory.Status) { - //drop event + // drop event return nil, isTimelineUpdated } if !impl.acdConfig.ArgoCDAutoSyncEnabled { @@ -450,7 +462,7 @@ func (impl *CdHandlerImpl) UpdatePipelineTimelineAndStatusByLiveApplicationFetch impl.Logger.Errorw("error in getting appDetails from appId", "err", err) return nil, isTimelineUpdated } - //TODO if Environment object in installedApp is nil then fetch envDetails also from envRepository + // TODO if Environment object in installedApp is nil then fetch envDetails also from envRepository envDetail, err := impl.envRepository.FindById(installedApp.EnvironmentId) if err != nil { impl.Logger.Errorw("error in getting envDetails from environment id", "err", err) @@ -463,8 +475,8 @@ func (impl *CdHandlerImpl) UpdatePipelineTimelineAndStatusByLiveApplicationFetch acdAppName = appDetails.AppName + "-" + envDetail.Name } - //this should only be called when we have git-ops configured - //try fetching status from argo cd + // this should only be called when we have git-ops configured + // try fetching status from argo cd acdToken, err := impl.argoUserService.GetLatestDevtronArgoCdUserToken() if err != nil { impl.Logger.Errorw("error in getting acd token", "err", err) @@ -477,7 +489,7 @@ func (impl *CdHandlerImpl) UpdatePipelineTimelineAndStatusByLiveApplicationFetch app, err := impl.application.Get(ctx, query) if err != nil { impl.Logger.Errorw("error in getting acd application", "err", err, "argoAppName", installedApp) - //updating cdWfr status + // updating cdWfr status installedAppVersionHistory.Status = pipelineConfig.WorkflowUnableToFetchState installedAppVersionHistory.UpdatedOn = time.Now() installedAppVersionHistory.UpdatedBy = 1 @@ -522,8 +534,8 @@ func (impl *CdHandlerImpl) UpdatePipelineTimelineAndStatusByLiveApplicationFetch } } if isSucceeded { - //handling deployment success event - //updating cdWfr status + // handling deployment success event + // updating cdWfr status installedAppVersionHistory.Status = pipelineConfig.WorkflowSucceeded installedAppVersionHistory.FinishedOn = time.Now() installedAppVersionHistory.UpdatedOn = time.Now() @@ -549,7 +561,7 @@ func (impl *CdHandlerImpl) CheckHelmAppStatusPeriodicallyAndUpdateInDb(helmPipel impl.Logger.Debugw("checking helm app status for non terminal deployment triggers", "wfrList", wfrList, "number of wfr", len(wfrList)) for _, wfr := range wfrList { if time.Now().Sub(wfr.StartedOn) <= time.Duration(helmPipelineStatusCheckEligibleTime)*time.Second { - //if wfr is updated within configured time then do not include for this cron cycle + // if wfr is updated within configured time then do not include for this cron cycle continue } appIdentifier := &client.AppIdentifier{ @@ -586,7 +598,7 @@ func (impl *CdHandlerImpl) CheckHelmAppStatusPeriodicallyAndUpdateInDb(helmPipel return err } go impl.appService.WriteCDSuccessEvent(pipelineOverride.Pipeline.AppId, pipelineOverride.Pipeline.EnvironmentId, pipelineOverride) - err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(pipelineOverride) + err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(TriggerContext{}, pipelineOverride) if err != nil { impl.Logger.Errorw("error on handling deployment success event", "wfr", wfr, "err", err) return err @@ -736,7 +748,7 @@ func (impl *CdHandlerImpl) extractWorkfowStatus(workflowStatus v1alpha1.Workflow logLocation := "" podName := "" for k, v := range workflowStatus.Nodes { - impl.Logger.Debugw("extractWorkflowStatus", "workflowName", k, "v", v) + impl.Logger.Debugw("ExtractWorkflowStatus", "workflowName", k, "v", v) if v.TemplateName == bean2.CD_WORKFLOW_NAME { if v.BoundaryID == "" { workflowName = k @@ -775,7 +787,7 @@ func (impl *CdHandlerImpl) stateChanged(status string, podStatus string, msg str func (impl *CdHandlerImpl) GetCdBuildHistory(appId int, environmentId int, pipelineId int, offset int, size int) ([]pipelineConfig.CdWorkflowWithArtifact, error) { var cdWorkflowArtifact []pipelineConfig.CdWorkflowWithArtifact - //this map contains artifactId -> array of tags of that artifact + // this map contains artifactId -> array of tags of that artifact imageTagsDataMap, err := impl.imageTaggingService.GetTagsDataMapByAppId(appId) if err != nil { impl.Logger.Errorw("error in fetching image tags with appId", "err", err, "appId", appId) @@ -967,7 +979,7 @@ func (impl *CdHandlerImpl) getLogsFromRepository(pipelineId int, cdWorkflow *pip } if cdConfig.LogsBucket == "" { - cdConfig.LogsBucket = impl.config.GetDefaultBuildLogsBucket() //TODO -fixme + cdConfig.LogsBucket = impl.config.GetDefaultBuildLogsBucket() // TODO -fixme } if cdConfig.CdCacheRegion == "" { cdConfig.CdCacheRegion = impl.config.GetDefaultCdLogsBucketRegion() @@ -1001,8 +1013,8 @@ func (impl *CdHandlerImpl) getLogsFromRepository(pipelineId int, cdWorkflow *pip } useExternalBlobStorage := isExternalBlobStorageEnabled(isExt, impl.config.UseBlobStorageConfigInCdWorkflow) if useExternalBlobStorage { - //fetch extClusterBlob cm and cs from k8s client, if they are present then read creds - //from them else return. + // fetch extClusterBlob cm and cs from k8s client, if they are present then read creds + // from them else return. cmConfig, secretConfig, err := impl.blobConfigStorageService.FetchCmAndSecretBlobConfigFromExternalCluster(clusterConfig, cdWorkflow.Namespace) if err != nil { impl.Logger.Errorw("error in fetching config map and secret from external cluster", "err", err, "clusterConfig", clusterConfig) @@ -1022,7 +1034,7 @@ func (impl *CdHandlerImpl) getLogsFromRepository(pipelineId int, cdWorkflow *pip return logReader, cleanUp, err } func isExternalBlobStorageEnabled(isExternalRun bool, useBlobStorageConfigInCdWorkflow bool) bool { - //TODO impl.config.UseBlobStorageConfigInCdWorkflow fetches the live status, we need to check from db as well, we should put useExternalBlobStorage in db + // TODO impl.config.UseBlobStorageConfigInCdWorkflow fetches the live status, we need to check from db as well, we should put useExternalBlobStorage in db return isExternalRun && !useBlobStorageConfigInCdWorkflow } @@ -1173,8 +1185,8 @@ func (impl *CdHandlerImpl) DownloadCdWorkflowArtifacts(pipelineId int, buildId i impl.Logger.Errorw("GetClusterConfigByClusterId, error in fetching clusterConfig", "err", err, "clusterId", wfr.CdWorkflow.Pipeline.Environment.ClusterId) return nil, err } - //fetch extClusterBlob cm and cs from k8s client, if they are present then read creds - //from them else return. + // fetch extClusterBlob cm and cs from k8s client, if they are present then read creds + // from them else return. cmConfig, secretConfig, err := impl.blobConfigStorageService.FetchCmAndSecretBlobConfigFromExternalCluster(clusterConfig, wfr.Namespace) if err != nil { impl.Logger.Errorw("error in fetching config map and secret from external cluster", "err", err, "clusterConfig", clusterConfig) @@ -1282,7 +1294,7 @@ func (impl *CdHandlerImpl) FetchAppWorkflowStatusForTriggerView(appId int) ([]*p } pipelineIds := make([]int, 0) partialDeletedPipelines := make(map[int]bool) - //pipelineIdsMap := make(map[int]int) + // pipelineIdsMap := make(map[int]int) for _, pipeline := range pipelines { pipelineIds = append(pipelineIds, pipeline.Id) partialDeletedPipelines[pipeline.Id] = pipeline.DeploymentAppDeleteRequest @@ -1389,7 +1401,7 @@ func (impl *CdHandlerImpl) FetchAppWorkflowStatusForTriggerViewForEnvironment(re if err != nil { return nil, err } - //override appIds if already provided app group id in request. + // override appIds if already provided app group id in request. request.ResourceIds = appIds } if len(request.ResourceIds) > 0 { @@ -1421,7 +1433,7 @@ func (impl *CdHandlerImpl) FetchAppWorkflowStatusForTriggerViewForEnvironment(re if len(pipelineIds) == 0 { return cdWorkflowStatus, nil } - //authorization block starts here + // authorization block starts here var appObjectArr []string var envObjectArr []string objects := impl.enforcerUtil.GetAppAndEnvObjectByPipelineIds(pipelineIds) @@ -1435,12 +1447,12 @@ func (impl *CdHandlerImpl) FetchAppWorkflowStatusForTriggerViewForEnvironment(re appObject := objects[pipeline.Id][0] envObject := objects[pipeline.Id][1] if !(appResults[appObject] && envResults[envObject]) { - //if user unauthorized, skip items + // if user unauthorized, skip items continue } pipelineIds = append(pipelineIds, pipeline.Id) } - //authorization block ends here + // authorization block ends here if len(pipelineIds) == 0 { return cdWorkflowStatus, nil } @@ -1544,7 +1556,7 @@ func (impl *CdHandlerImpl) FetchAppDeploymentStatusForEnvironments(request resou if err != nil { return nil, err } - //override appIds if already provided app group id in request. + // override appIds if already provided app group id in request. request.ResourceIds = appIds } if len(request.ResourceIds) > 0 { @@ -1564,7 +1576,7 @@ func (impl *CdHandlerImpl) FetchAppDeploymentStatusForEnvironments(request resou err = &util.ApiError{Code: "404", HttpStatusCode: 200, UserMessage: "no matching pipeline found"} return nil, err } - //authorization block starts here + // authorization block starts here var appObjectArr []string var envObjectArr []string objects := impl.enforcerUtil.GetAppAndEnvObjectByPipelineIds(pipelineIds) @@ -1578,14 +1590,14 @@ func (impl *CdHandlerImpl) FetchAppDeploymentStatusForEnvironments(request resou appObject := objects[pipeline.Id][0] envObject := objects[pipeline.Id][1] if !(appResults[appObject] && envResults[envObject]) { - //if user unauthorized, skip items + // if user unauthorized, skip items continue } pipelineIds = append(pipelineIds, pipeline.Id) pipelineAppMap[pipeline.Id] = pipeline.AppId } span.End() - //authorization block ends here + // authorization block ends here if len(pipelineIds) == 0 { return deploymentStatuses, nil @@ -1627,7 +1639,7 @@ func (impl *CdHandlerImpl) FetchAppDeploymentStatusForEnvironments(request resou } } } - //in case there is no workflow found for pipeline, set all the pipeline status - Not Deployed + // in case there is no workflow found for pipeline, set all the pipeline status - Not Deployed for _, pipelineId := range pipelineIds { if _, ok := deploymentStatusesMap[pipelineId]; !ok { deploymentStatus := &pipelineConfig.AppDeploymentStatus{} diff --git a/pkg/pipeline/CiHandler.go b/pkg/pipeline/CiHandler.go index 2ab9eff6477..c3d7e77f729 100644 --- a/pkg/pipeline/CiHandler.go +++ b/pkg/pipeline/CiHandler.go @@ -1062,7 +1062,7 @@ func (impl *CiHandlerImpl) GetHistoricBuildLogs(pipelineId int, workflowId int, return resp, err } -func (impl *CiHandlerImpl) extractWorkfowStatus(workflowStatus v1alpha1.WorkflowStatus) (string, string, string, string, string, string) { +func ExtractWorkflowStatus(workflowStatus v1alpha1.WorkflowStatus) (string, string, string, string, string, string) { workflowName := "" status := string(workflowStatus.Phase) podStatus := "" @@ -1071,7 +1071,6 @@ func (impl *CiHandlerImpl) extractWorkfowStatus(workflowStatus v1alpha1.Workflow logLocation := "" for k, v := range workflowStatus.Nodes { if v.TemplateName == bean3.CI_WORKFLOW_NAME { - impl.Logger.Infow("extractWorkflowStatus", "workflowName", k, "v", v) if v.BoundaryID == "" { workflowName = k } else { @@ -1096,7 +1095,7 @@ func (impl *CiHandlerImpl) extractWorkfowStatus(workflowStatus v1alpha1.Workflow const CiStageFailErrorCode = 2 func (impl *CiHandlerImpl) extractPodStatusAndWorkflow(workflowStatus v1alpha1.WorkflowStatus) (string, string, *pipelineConfig.CiWorkflow, error) { - workflowName, status, _, message, _, _ := impl.extractWorkfowStatus(workflowStatus) + workflowName, status, _, message, _, _ := ExtractWorkflowStatus(workflowStatus) if workflowName == "" { impl.Logger.Errorw("extract workflow status, invalid wf name", "workflowName", workflowName, "status", status, "message", message) return status, message, nil, errors.New("invalid wf name") @@ -1132,7 +1131,7 @@ func (impl *CiHandlerImpl) getRefWorkflowAndCiRetryCount(savedWorkflow *pipeline } func (impl *CiHandlerImpl) UpdateWorkflow(workflowStatus v1alpha1.WorkflowStatus) (int, error) { - workflowName, status, podStatus, message, _, podName := impl.extractWorkfowStatus(workflowStatus) + workflowName, status, podStatus, message, _, podName := ExtractWorkflowStatus(workflowStatus) if workflowName == "" { impl.Logger.Errorw("extract workflow status, invalid wf name", "workflowName", workflowName, "status", status, "podStatus", podStatus, "message", message) return 0, errors.New("invalid wf name") diff --git a/pkg/pipeline/WebhookService.go b/pkg/pipeline/WebhookService.go index f6960ddd63e..d09949749d0 100644 --- a/pkg/pipeline/WebhookService.go +++ b/pkg/pipeline/WebhookService.go @@ -24,6 +24,7 @@ import ( "fmt" "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1" "github.com/aws/aws-sdk-go-v2/service/ecr/types" + pubsub "github.com/devtron-labs/common-lib/pubsub-lib" "github.com/devtron-labs/devtron/client/events" "github.com/devtron-labs/devtron/internal/sql/repository" "github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig" @@ -60,10 +61,11 @@ type CiArtifactWebhookRequest struct { type WebhookService interface { AuthenticateExternalCiWebhook(apiKey string) (int, error) - HandleCiSuccessEvent(ciPipelineId int, request *CiArtifactWebhookRequest, imagePushedAt *time.Time) (id int, err error) + HandleCiSuccessEvent(triggerContext TriggerContext, ciPipelineId int, request *CiArtifactWebhookRequest, imagePushedAt *time.Time) (id int, err error) HandleExternalCiWebhook(externalCiId int, request *CiArtifactWebhookRequest, auth func(token string, projectObject string, envObject string) bool, token string) (id int, err error) HandleCiStepFailedEvent(ciPipelineId int, request *CiArtifactWebhookRequest) (err error) HandleMultipleImagesFromEvent(imageDetails []types.ImageDetail, ciWorkflowId int) (map[string]*pipelineConfig.CiWorkflow, error) + GetTriggerValidateFuncs() []pubsub.ValidateMsg } type WebhookServiceImpl struct { @@ -170,7 +172,7 @@ func (impl WebhookServiceImpl) HandleCiStepFailedEvent(ciPipelineId int, request return nil } -func (impl WebhookServiceImpl) HandleCiSuccessEvent(ciPipelineId int, request *CiArtifactWebhookRequest, imagePushedAt *time.Time) (id int, err error) { +func (impl WebhookServiceImpl) HandleCiSuccessEvent(triggerContext TriggerContext, ciPipelineId int, request *CiArtifactWebhookRequest, imagePushedAt *time.Time) (id int, err error) { impl.logger.Infow("webhook for artifact save", "req", request) if request.WorkflowId != nil { savedWorkflow, err := impl.ciWorkflowRepository.FindById(*request.WorkflowId) @@ -329,7 +331,7 @@ func (impl WebhookServiceImpl) HandleCiSuccessEvent(ciPipelineId int, request *C start := time.Now() impl.logger.Infow("Started: auto trigger for children Stage/CD pipelines", "Artifact count", totalCIArtifactCount) for i := 0; i < totalCIArtifactCount; { - //requests left to process + // requests left to process remainingBatch := totalCIArtifactCount - i if remainingBatch < batchSize { batchSize = remainingBatch @@ -342,7 +344,7 @@ func (impl WebhookServiceImpl) HandleCiSuccessEvent(ciPipelineId int, request *C defer wg.Done() ciArtifact := ciArtifactArr[index] // handle individual CiArtifact success event - err = impl.workflowDagExecutor.HandleCiSuccessEvent(ciArtifact, async, request.UserId) + err = impl.workflowDagExecutor.HandleCiSuccessEvent(triggerContext, ciArtifact, async, request.UserId) if err != nil { impl.logger.Errorw("error on handle ci success event", "ciArtifactId", ciArtifact.Id, "err", err) } @@ -471,7 +473,7 @@ func (impl *WebhookServiceImpl) HandleMultipleImagesFromEvent(imageDetails []typ return nil, err } - //creating n-1 workflows for rest images, oldest will be mapped to original workflow id. + // creating n-1 workflows for rest images, oldest will be mapped to original workflow id. digestWorkflowMap := make(map[string]*pipelineConfig.CiWorkflow) // mapping oldest to original ciworkflowId digestWorkflowMap[*imageDetails[0].ImageDigest] = ciWorkflow @@ -503,3 +505,7 @@ func (impl *WebhookServiceImpl) HandleMultipleImagesFromEvent(imageDetails []typ } return digestWorkflowMap, nil } + +func (impl *WebhookServiceImpl) GetTriggerValidateFuncs() []pubsub.ValidateMsg { + return impl.workflowDagExecutor.GetTriggerValidateFuncs() +} diff --git a/pkg/pipeline/WorkflowDagExecutor.go b/pkg/pipeline/WorkflowDagExecutor.go index c8df0a92ab9..385c5add351 100644 --- a/pkg/pipeline/WorkflowDagExecutor.go +++ b/pkg/pipeline/WorkflowDagExecutor.go @@ -69,6 +69,7 @@ import ( status2 "google.golang.org/grpc/status" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/helm/pkg/proto/hapi/chart" + "k8s.io/utils/pointer" "k8s.io/utils/strings/slices" "github.com/devtron-labs/devtron/internal/sql/repository/appWorkflow" @@ -96,18 +97,18 @@ import ( ) type WorkflowDagExecutor interface { - HandleCiSuccessEvent(artifact *repository.CiArtifact, async bool, triggeredBy int32) error + HandleCiSuccessEvent(triggerContext TriggerContext, artifact *repository.CiArtifact, async bool, triggeredBy int32) error HandleWebhookExternalCiEvent(artifact *repository.CiArtifact, triggeredBy int32, externalCiId int, auth func(token string, projectObject string, envObject string) bool, token string) (bool, error) - HandlePreStageSuccessEvent(cdStageCompleteEvent CdStageCompleteEvent) error - HandleDeploymentSuccessEvent(pipelineOverride *chartConfig.PipelineOverride) error - HandlePostStageSuccessEvent(cdWorkflowId int, cdPipelineId int, triggeredBy int32, pluginRegistryImageDetails map[string][]string) error + HandlePreStageSuccessEvent(triggerContext TriggerContext, cdStageCompleteEvent CdStageCompleteEvent) error + HandleDeploymentSuccessEvent(triggerContext TriggerContext, pipelineOverride *chartConfig.PipelineOverride) error + HandlePostStageSuccessEvent(triggerContext TriggerContext, cdWorkflowId int, cdPipelineId int, triggeredBy int32, pluginRegistryImageDetails map[string][]string) error Subscribe() error - TriggerPostStage(cdWf *pipelineConfig.CdWorkflow, cdPipeline *pipelineConfig.Pipeline, triggeredBy int32, refCdWorkflowRunnerId int) error - TriggerPreStage(ctx context.Context, cdWf *pipelineConfig.CdWorkflow, artifact *repository.CiArtifact, pipeline *pipelineConfig.Pipeline, triggeredBy int32, refCdWorkflowRunnerId int) error - TriggerDeployment(cdWf *pipelineConfig.CdWorkflow, artifact *repository.CiArtifact, pipeline *pipelineConfig.Pipeline, triggeredBy int32) error - ManualCdTrigger(overrideRequest *bean.ValuesOverrideRequest, ctx context.Context) (int, error) + TriggerPostStage(request TriggerRequest) error + TriggerPreStage(request TriggerRequest) error + TriggerDeployment(request TriggerRequest) error + ManualCdTrigger(triggerContext TriggerContext, overrideRequest *bean.ValuesOverrideRequest) (int, error) TriggerBulkDeploymentAsync(requests []*BulkTriggerRequest, UserId int32) (interface{}, error) - StopStartApp(stopRequest *StopAppRequest, ctx context.Context) (int, error) + StopStartApp(triggerContext TriggerContext, stopRequest *StopAppRequest) (int, error) TriggerBulkHibernateAsync(request StopDeploymentGroupRequest, ctx context.Context) (interface{}, error) RotatePods(ctx context.Context, podRotateRequest *PodRotateRequest) (*k8s.RotatePodResponse, error) MarkCurrentDeploymentFailed(runner *pipelineConfig.CdWorkflowRunner, releaseErr error, triggeredBy int32) error @@ -115,6 +116,7 @@ type WorkflowDagExecutor interface { OnDeleteCdPipelineEvent(pipelineId int, triggeredBy int32) MarkPipelineStatusTimelineFailed(runner *pipelineConfig.CdWorkflowRunner, releaseErr error) error UpdateTriggerCDMetricsOnFinish(runner *pipelineConfig.CdWorkflowRunner) + GetTriggerValidateFuncs() []pubsub.ValidateMsg } type WorkflowDagExecutorImpl struct { @@ -242,6 +244,25 @@ type CdStageCompleteEvent struct { PluginRegistryArtifactDetails map[string][]string `json:"PluginRegistryArtifactDetails"` } +type TriggerRequest struct { + CdWf *pipelineConfig.CdWorkflow + Pipeline *pipelineConfig.Pipeline + Artifact *repository.CiArtifact + ApplyAuth bool + TriggeredBy int32 + RefCdWorkflowRunnerId int + TriggerContext +} + +type TriggerContext struct { + // Context is a context object to be passed to the pipeline trigger + // +optional + Context context.Context + // ReferenceId is a unique identifier for the workflow runner + // refer pipelineConfig.CdWorkflowRunner + ReferenceId *string +} + func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pipelineConfig.PipelineRepository, cdWorkflowRepository pipelineConfig.CdWorkflowRepository, pubsubClient *pubsub.PubSubClientServiceImpl, @@ -417,37 +438,50 @@ func NewWorkflowDagExecutorImpl(Logger *zap.SugaredLogger, pipelineRepository pi func (impl *WorkflowDagExecutorImpl) Subscribe() error { callback := func(msg *model.PubSubMsg) { - impl.logger.Debug("cd stage event received") - //defer msg.Ack() cdStageCompleteEvent := CdStageCompleteEvent{} err := json.Unmarshal([]byte(string(msg.Data)), &cdStageCompleteEvent) if err != nil { impl.logger.Errorw("error while unmarshalling cdStageCompleteEvent object", "err", err, "msg", string(msg.Data)) return } - impl.logger.Debugw("cd stage event:", "workflowRunnerId", cdStageCompleteEvent.WorkflowRunnerId) wf, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(cdStageCompleteEvent.WorkflowRunnerId) if err != nil { impl.logger.Errorw("could not get wf runner", "err", err) return } + triggerContext := TriggerContext{ + ReferenceId: pointer.String(msg.MsgId), + } if wf.WorkflowType == bean.CD_WORKFLOW_TYPE_PRE { impl.logger.Debugw("received pre stage success event for workflow runner ", "wfId", strconv.Itoa(wf.Id)) - err = impl.HandlePreStageSuccessEvent(cdStageCompleteEvent) + err = impl.HandlePreStageSuccessEvent(triggerContext, cdStageCompleteEvent) if err != nil { impl.logger.Errorw("deployment success event error", "err", err) return } } else if wf.WorkflowType == bean.CD_WORKFLOW_TYPE_POST { impl.logger.Debugw("received post stage success event for workflow runner ", "wfId", strconv.Itoa(wf.Id)) - err = impl.HandlePostStageSuccessEvent(wf.CdWorkflowId, cdStageCompleteEvent.CdPipelineId, cdStageCompleteEvent.TriggeredBy, cdStageCompleteEvent.PluginRegistryArtifactDetails) + err = impl.HandlePostStageSuccessEvent(triggerContext, wf.CdWorkflowId, cdStageCompleteEvent.CdPipelineId, cdStageCompleteEvent.TriggeredBy, cdStageCompleteEvent.PluginRegistryArtifactDetails) if err != nil { impl.logger.Errorw("deployment success event error", "err", err) return } } } - err := impl.pubsubClient.Subscribe(pubsub.CD_STAGE_COMPLETE_TOPIC, callback) + + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + cdStageCompleteEvent := CdStageCompleteEvent{} + err := json.Unmarshal([]byte(string(msg.Data)), &cdStageCompleteEvent) + if err != nil { + return "error while unmarshalling cdStageCompleteEvent object", []interface{}{"err", err, "msg", string(msg.Data)} + } + return "got message for cd stage completion", []interface{}{"workflowRunnerId", cdStageCompleteEvent.WorkflowRunnerId, "workflowId", cdStageCompleteEvent.WorkflowId, "cdPipelineId", cdStageCompleteEvent.CdPipelineId} + } + + validations := impl.GetTriggerValidateFuncs() + + err := impl.pubsubClient.Subscribe(pubsub.CD_STAGE_COMPLETE_TOPIC, callback, loggerFunc, validations...) if err != nil { impl.logger.Error("error", "err", err) return err @@ -486,7 +520,7 @@ func (impl *WorkflowDagExecutorImpl) UpdateWorkflowRunnerStatusForDeployment(app impl.logger.Errorw("error in getting helm app release status", "appIdentifier", appIdentifier, "err", err) // Handle release not found errors if skipReleaseNotFound && util.GetGRPCErrorDetailedMessage(err) != client2.ErrReleaseNotFound { - //skip this error and continue for next workflow status + // skip this error and continue for next workflow status impl.logger.Warnw("found error, skipping helm apps status update for this trigger", "appIdentifier", appIdentifier, "err", err) return false } @@ -735,7 +769,6 @@ func (impl *WorkflowDagExecutorImpl) processDevtronAsyncHelmInstallRequest(CDAsy func (impl *WorkflowDagExecutorImpl) SubscribeDevtronAsyncHelmInstallRequest() error { callback := func(msg *model.PubSubMsg) { - impl.logger.Debug("received Devtron App helm async install request event, SubscribeDevtronAsyncHelmInstallRequest", "data", msg.Data) CDAsyncInstallNatsMessage, appIdentifier, err := impl.extractOverrideRequestFromCDAsyncInstallEvent(msg) if err != nil { impl.logger.Errorw("err on extracting override request, SubscribeDevtronAsyncHelmInstallRequest", "err", err) @@ -750,7 +783,17 @@ func (impl *WorkflowDagExecutorImpl) SubscribeDevtronAsyncHelmInstallRequest() e return } - err := impl.pubsubClient.Subscribe(pubsub.DEVTRON_CHART_INSTALL_TOPIC, callback) + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + CDAsyncInstallNatsMessage := &bean.AsyncCdDeployEvent{} + err := json.Unmarshal([]byte(msg.Data), CDAsyncInstallNatsMessage) + if err != nil { + return "error in unmarshalling CD async install request nats message", []interface{}{"err", err} + } + return "got message for devtron chart install", []interface{}{"appId", CDAsyncInstallNatsMessage.ValuesOverrideRequest.AppId, "pipelineId", CDAsyncInstallNatsMessage.ValuesOverrideRequest.PipelineId, "artifactId", CDAsyncInstallNatsMessage.ValuesOverrideRequest.CiArtifactId} + } + + err := impl.pubsubClient.Subscribe(pubsub.DEVTRON_CHART_INSTALL_TOPIC, callback, loggerFunc) if err != nil { impl.logger.Error(err) return err @@ -758,7 +801,7 @@ func (impl *WorkflowDagExecutorImpl) SubscribeDevtronAsyncHelmInstallRequest() e return nil } -func (impl *WorkflowDagExecutorImpl) HandleCiSuccessEvent(artifact *repository.CiArtifact, async bool, triggeredBy int32) error { +func (impl *WorkflowDagExecutorImpl) HandleCiSuccessEvent(triggerContext TriggerContext, artifact *repository.CiArtifact, async bool, triggeredBy int32) error { //1. get cd pipelines //2. get config //3. trigger wf/ deployment @@ -775,8 +818,14 @@ func (impl *WorkflowDagExecutorImpl) HandleCiSuccessEvent(artifact *repository.C return err } for _, pipeline := range pipelines { - //passing applyAuth as false since this event is for auto trigger and user who already has access to further stages can trigger ci also - err = impl.triggerIfAutoStageCdPipeline(nil, pipeline, artifact, triggeredBy) + triggerRequest := TriggerRequest{ + CdWf: nil, + Pipeline: pipeline, + Artifact: artifact, + TriggeredBy: triggeredBy, + TriggerContext: triggerContext, + } + err = impl.triggerIfAutoStageCdPipeline(triggerRequest) if err != nil { impl.logger.Debugw("error on trigger cd pipeline", "err", err) } @@ -814,7 +863,14 @@ func (impl *WorkflowDagExecutorImpl) HandleWebhookExternalCiEvent(artifact *repo for _, pipeline := range pipelines { //applyAuth=false, already auth applied for this flow - err = impl.triggerIfAutoStageCdPipeline(nil, pipeline, artifact, triggeredBy) + triggerRequest := TriggerRequest{ + CdWf: nil, + Pipeline: pipeline, + Artifact: artifact, + ApplyAuth: false, + TriggeredBy: triggeredBy, + } + err = impl.triggerIfAutoStageCdPipeline(triggerRequest) if err != nil { impl.logger.Debugw("error on trigger cd pipeline", "err", err) return hasAnyTriggered, err @@ -843,31 +899,32 @@ func (impl *WorkflowDagExecutorImpl) deleteCorruptedPipelineStage(pipelineStage return nil, false } -func (impl *WorkflowDagExecutorImpl) triggerIfAutoStageCdPipeline(cdWf *pipelineConfig.CdWorkflow, pipeline *pipelineConfig.Pipeline, artifact *repository.CiArtifact, triggeredBy int32) error { +func (impl *WorkflowDagExecutorImpl) triggerIfAutoStageCdPipeline(request TriggerRequest) error { - preStage, err := impl.getPipelineStage(pipeline.Id, repository4.PIPELINE_STAGE_TYPE_PRE_CD) + preStage, err := impl.getPipelineStage(request.Pipeline.Id, repository4.PIPELINE_STAGE_TYPE_PRE_CD) if err != nil { return err } //handle corrupt data (https://github.com/devtron-labs/devtron/issues/3826) - err, deleted := impl.deleteCorruptedPipelineStage(preStage, triggeredBy) + err, deleted := impl.deleteCorruptedPipelineStage(preStage, request.TriggeredBy) if err != nil { - impl.logger.Errorw("error in deleteCorruptedPipelineStage ", "cdPipelineId", pipeline.Id, "err", err, "preStage", preStage, "triggeredBy", triggeredBy) + impl.logger.Errorw("error in deleteCorruptedPipelineStage ", "cdPipelineId", request.Pipeline.Id, "err", err, "preStage", preStage, "triggeredBy", request.TriggeredBy) return err } - if len(pipeline.PreStageConfig) > 0 || (preStage != nil && !deleted) { + request.TriggerContext.Context = context.Background() + if len(request.Pipeline.PreStageConfig) > 0 || (preStage != nil && !deleted) { // pre stage exists - if pipeline.PreTriggerType == pipelineConfig.TRIGGER_TYPE_AUTOMATIC { - impl.logger.Debugw("trigger pre stage for pipeline", "artifactId", artifact.Id, "pipelineId", pipeline.Id) - err = impl.TriggerPreStage(context.Background(), cdWf, artifact, pipeline, artifact.UpdatedBy, 0) //TODO handle error here + if request.Pipeline.PreTriggerType == pipelineConfig.TRIGGER_TYPE_AUTOMATIC { + impl.logger.Debugw("trigger pre stage for pipeline", "artifactId", request.Artifact.Id, "pipelineId", request.Pipeline.Id) + err = impl.TriggerPreStage(request) // TODO handle error here return err } - } else if pipeline.TriggerType == pipelineConfig.TRIGGER_TYPE_AUTOMATIC { + } else if request.Pipeline.TriggerType == pipelineConfig.TRIGGER_TYPE_AUTOMATIC { // trigger deployment - impl.logger.Debugw("trigger cd for pipeline", "artifactId", artifact.Id, "pipelineId", pipeline.Id) - err = impl.TriggerDeployment(cdWf, artifact, pipeline, triggeredBy) + impl.logger.Debugw("trigger cd for pipeline", "artifactId", request.Artifact.Id, "pipelineId", request.Pipeline.Id) + err = impl.TriggerDeployment(request) return err } return nil @@ -882,33 +939,36 @@ func (impl *WorkflowDagExecutorImpl) getPipelineStage(pipelineId int, stageType return stage, nil } -func (impl *WorkflowDagExecutorImpl) triggerStageForBulk(cdWf *pipelineConfig.CdWorkflow, pipeline *pipelineConfig.Pipeline, artifact *repository.CiArtifact, async bool, triggeredBy int32) error { +func (impl *WorkflowDagExecutorImpl) triggerStageForBulk(triggerRequest TriggerRequest, async bool) error { - preStage, err := impl.getPipelineStage(pipeline.Id, repository4.PIPELINE_STAGE_TYPE_PRE_CD) + preStage, err := impl.getPipelineStage(triggerRequest.Pipeline.Id, repository4.PIPELINE_STAGE_TYPE_PRE_CD) if err != nil { return err } //handle corrupt data (https://github.com/devtron-labs/devtron/issues/3826) - err, deleted := impl.deleteCorruptedPipelineStage(preStage, triggeredBy) + err, deleted := impl.deleteCorruptedPipelineStage(preStage, triggerRequest.TriggeredBy) if err != nil { - impl.logger.Errorw("error in deleteCorruptedPipelineStage ", "cdPipelineId", pipeline.Id, "err", err, "preStage", preStage, "triggeredBy", triggeredBy) + impl.logger.Errorw("error in deleteCorruptedPipelineStage ", "cdPipelineId", triggerRequest.Pipeline.Id, "err", err, "preStage", preStage, "triggeredBy", triggerRequest.TriggeredBy) return err } - if len(pipeline.PreStageConfig) > 0 || (preStage != nil && !deleted) { + triggerRequest.TriggerContext.Context = context.Background() + if len(triggerRequest.Pipeline.PreStageConfig) > 0 || (preStage != nil && !deleted) { //pre stage exists - impl.logger.Debugw("trigger pre stage for pipeline", "artifactId", artifact.Id, "pipelineId", pipeline.Id) - err = impl.TriggerPreStage(context.Background(), cdWf, artifact, pipeline, artifact.UpdatedBy, 0) //TODO handle error here + impl.logger.Debugw("trigger pre stage for pipeline", "artifactId", triggerRequest.Artifact.Id, "pipelineId", triggerRequest.Pipeline.Id) + triggerRequest.RefCdWorkflowRunnerId = 0 + err = impl.TriggerPreStage(triggerRequest) // TODO handle error here return err } else { // trigger deployment - impl.logger.Debugw("trigger cd for pipeline", "artifactId", artifact.Id, "pipelineId", pipeline.Id) - err = impl.TriggerDeployment(cdWf, artifact, pipeline, triggeredBy) + impl.logger.Debugw("trigger cd for pipeline", "artifactId", triggerRequest.Artifact.Id, "pipelineId", triggerRequest.Pipeline.Id) + err = impl.TriggerDeployment(triggerRequest) return err } } -func (impl *WorkflowDagExecutorImpl) HandlePreStageSuccessEvent(cdStageCompleteEvent CdStageCompleteEvent) error { + +func (impl *WorkflowDagExecutorImpl) HandlePreStageSuccessEvent(triggerContext TriggerContext, cdStageCompleteEvent CdStageCompleteEvent) error { wfRunner, err := impl.cdWorkflowRepository.FindWorkflowRunnerById(cdStageCompleteEvent.WorkflowRunnerId) if err != nil { return err @@ -943,7 +1003,20 @@ func (impl *WorkflowDagExecutorImpl) HandlePreStageSuccessEvent(cdStageCompleteE return err } //passing applyAuth as false since this event is for auto trigger and user who already has access to this cd can trigger pre cd also - err = impl.TriggerDeployment(cdWorkflow, ciArtifact, pipeline, cdStageCompleteEvent.TriggeredBy) + applyAuth := false + if cdStageCompleteEvent.TriggeredBy != 1 { + applyAuth = true + } + triggerRequest := TriggerRequest{ + CdWf: cdWorkflow, + Pipeline: pipeline, + Artifact: ciArtifact, + ApplyAuth: applyAuth, + TriggeredBy: cdStageCompleteEvent.TriggeredBy, + TriggerContext: triggerContext, + } + triggerRequest.TriggerContext.Context = context.Background() + err = impl.TriggerDeployment(triggerRequest) if err != nil { return err } @@ -1003,10 +1076,15 @@ func (impl *WorkflowDagExecutorImpl) SavePluginArtifacts(ciArtifact *repository. return CDArtifacts, nil } -func (impl *WorkflowDagExecutorImpl) TriggerPreStage(ctx context.Context, cdWf *pipelineConfig.CdWorkflow, artifact *repository.CiArtifact, pipeline *pipelineConfig.Pipeline, triggeredBy int32, refCdWorkflowRunnerId int) error { +func (impl *WorkflowDagExecutorImpl) TriggerPreStage(request TriggerRequest) error { //setting triggeredAt variable to have consistent data for various audit log places in db for deployment time triggeredAt := time.Now() + triggeredBy := request.TriggeredBy + artifact := request.Artifact + pipeline := request.Pipeline + ctx := request.TriggerContext.Context //in case of pre stage manual trigger auth is already applied and for auto triggers there is no need for auth check here + cdWf := request.CdWf var err error if cdWf == nil { cdWf = &pipelineConfig.CdWorkflow{ @@ -1024,7 +1102,7 @@ func (impl *WorkflowDagExecutorImpl) TriggerPreStage(ctx context.Context, cdWf * Name: pipeline.Name, WorkflowType: bean.CD_WORKFLOW_TYPE_PRE, ExecutorType: cdWorkflowExecutorType, - Status: pipelineConfig.WorkflowStarting, //starting PreStage + Status: pipelineConfig.WorkflowStarting, // starting PreStage TriggeredBy: triggeredBy, StartedOn: triggeredAt, Namespace: impl.config.GetDefaultNamespace(), @@ -1032,7 +1110,8 @@ func (impl *WorkflowDagExecutorImpl) TriggerPreStage(ctx context.Context, cdWf * CdWorkflowId: cdWf.Id, LogLocation: fmt.Sprintf("%s/%s%s-%s/main.log", impl.config.GetDefaultBuildLogsKeyPrefix(), strconv.Itoa(cdWf.Id), string(bean.CD_WORKFLOW_TYPE_PRE), pipeline.Name), AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: 1, UpdatedOn: triggeredAt, UpdatedBy: 1}, - RefCdWorkflowRunnerId: refCdWorkflowRunnerId, + RefCdWorkflowRunnerId: request.RefCdWorkflowRunnerId, + ReferenceId: request.TriggerContext.ReferenceId, } var env *repository2.Environment if pipeline.RunPreStageInEnv { @@ -1228,15 +1307,18 @@ func convert(ts string) (*time.Time, error) { return &t, nil } -func (impl *WorkflowDagExecutorImpl) TriggerPostStage(cdWf *pipelineConfig.CdWorkflow, pipeline *pipelineConfig.Pipeline, triggeredBy int32, refCdWorkflowRunnerId int) error { +func (impl *WorkflowDagExecutorImpl) TriggerPostStage(request TriggerRequest) error { //setting triggeredAt variable to have consistent data for various audit log places in db for deployment time triggeredAt := time.Now() + triggeredBy := request.TriggeredBy + pipeline := request.Pipeline + cdWf := request.CdWf runner := &pipelineConfig.CdWorkflowRunner{ Name: pipeline.Name, WorkflowType: bean.CD_WORKFLOW_TYPE_POST, ExecutorType: impl.config.GetWorkflowExecutorType(), - Status: pipelineConfig.WorkflowStarting, //starting PostStage + Status: pipelineConfig.WorkflowStarting, // starting PostStage TriggeredBy: triggeredBy, StartedOn: triggeredAt, Namespace: impl.config.GetDefaultNamespace(), @@ -1244,7 +1326,8 @@ func (impl *WorkflowDagExecutorImpl) TriggerPostStage(cdWf *pipelineConfig.CdWor CdWorkflowId: cdWf.Id, LogLocation: fmt.Sprintf("%s/%s%s-%s/main.log", impl.config.GetDefaultBuildLogsKeyPrefix(), strconv.Itoa(cdWf.Id), string(bean.CD_WORKFLOW_TYPE_POST), pipeline.Name), AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: triggeredBy, UpdatedOn: triggeredAt, UpdatedBy: triggeredBy}, - RefCdWorkflowRunnerId: refCdWorkflowRunnerId, + RefCdWorkflowRunnerId: request.RefCdWorkflowRunnerId, + ReferenceId: request.TriggerContext.ReferenceId, } var env *repository2.Environment var err error @@ -1881,7 +1964,7 @@ func (impl *WorkflowDagExecutorImpl) buildDefaultArtifactLocation(cdWorkflowConf return ArtifactLocation } -func (impl *WorkflowDagExecutorImpl) HandleDeploymentSuccessEvent(pipelineOverride *chartConfig.PipelineOverride) error { +func (impl *WorkflowDagExecutorImpl) HandleDeploymentSuccessEvent(triggerContext TriggerContext, pipelineOverride *chartConfig.PipelineOverride) error { if pipelineOverride == nil { return fmt.Errorf("invalid request, pipeline override not found") } @@ -1909,7 +1992,15 @@ func (impl *WorkflowDagExecutorImpl) HandleDeploymentSuccessEvent(pipelineOverri pipelineOverride.DeploymentType != models.DEPLOYMENTTYPE_STOP && pipelineOverride.DeploymentType != models.DEPLOYMENTTYPE_START { - err = impl.TriggerPostStage(cdWorkflow, pipelineOverride.Pipeline, triggeredByUser, 0) + triggerRequest := TriggerRequest{ + CdWf: cdWorkflow, + Pipeline: pipelineOverride.Pipeline, + TriggeredBy: triggeredByUser, + TriggerContext: triggerContext, + RefCdWorkflowRunnerId: 0, + } + triggerRequest.TriggerContext.Context = context.Background() + err = impl.TriggerPostStage(triggerRequest) if err != nil { impl.logger.Errorw("error in triggering post stage after successful deployment event", "err", err, "cdWorkflow", cdWorkflow) return err @@ -1918,7 +2009,7 @@ func (impl *WorkflowDagExecutorImpl) HandleDeploymentSuccessEvent(pipelineOverri } else { // to trigger next pre/cd, if any // finding children cd by pipeline id - err = impl.HandlePostStageSuccessEvent(cdWorkflow.Id, pipelineOverride.PipelineId, 1, nil) + err = impl.HandlePostStageSuccessEvent(triggerContext, cdWorkflow.Id, pipelineOverride.PipelineId, 1, nil) if err != nil { impl.logger.Errorw("error in triggering children cd after successful deployment event", "parentCdPipelineId", pipelineOverride.PipelineId) return err @@ -1927,7 +2018,7 @@ func (impl *WorkflowDagExecutorImpl) HandleDeploymentSuccessEvent(pipelineOverri return nil } -func (impl *WorkflowDagExecutorImpl) HandlePostStageSuccessEvent(cdWorkflowId int, cdPipelineId int, triggeredBy int32, pluginRegistryImageDetails map[string][]string) error { +func (impl *WorkflowDagExecutorImpl) HandlePostStageSuccessEvent(triggerContext TriggerContext, cdWorkflowId int, cdPipelineId int, triggeredBy int32, pluginRegistryImageDetails map[string][]string) error { // finding children cd by pipeline id cdPipelinesMapping, err := impl.appWorkflowRepository.FindWFCDMappingByParentCDPipelineId(cdPipelineId) if err != nil { @@ -1959,8 +2050,15 @@ func (impl *WorkflowDagExecutorImpl) HandlePostStageSuccessEvent(cdWorkflowId in //finding ci artifact by ciPipelineID and pipelineId //TODO : confirm values for applyAuth, async & triggeredBy - triggerArtifact := ciArtifact - err = impl.triggerIfAutoStageCdPipeline(nil, pipeline, triggerArtifact, triggeredBy) + triggerRequest := TriggerRequest{ + CdWf: nil, + Pipeline: pipeline, + Artifact: ciArtifact, + TriggeredBy: triggeredBy, + TriggerContext: triggerContext, + } + + err = impl.triggerIfAutoStageCdPipeline(triggerRequest) if err != nil { impl.logger.Errorw("error in triggering cd pipeline after successful post stage", "err", err, "pipelineId", pipeline.Id) return err @@ -1970,11 +2068,15 @@ func (impl *WorkflowDagExecutorImpl) HandlePostStageSuccessEvent(cdWorkflowId in } // Only used for auto trigger -func (impl *WorkflowDagExecutorImpl) TriggerDeployment(cdWf *pipelineConfig.CdWorkflow, artifact *repository.CiArtifact, pipeline *pipelineConfig.Pipeline, triggeredBy int32) error { +func (impl *WorkflowDagExecutorImpl) TriggerDeployment(request TriggerRequest) error { //in case of manual trigger auth is already applied and for auto triggers there is no need for auth check here + triggeredBy := request.TriggeredBy + pipeline := request.Pipeline + artifact := request.Artifact //setting triggeredAt variable to have consistent data for various audit log places in db for deployment time triggeredAt := time.Now() + cdWf := request.CdWf if cdWf == nil || (cdWf != nil && cdWf.CiArtifactId != artifact.Id) { // cdWf != nil && cdWf.CiArtifactId != artifact.Id for auto trigger case when deployment is triggered with image generated by plugin @@ -1993,12 +2095,13 @@ func (impl *WorkflowDagExecutorImpl) TriggerDeployment(cdWf *pipelineConfig.CdWo Name: pipeline.Name, WorkflowType: bean.CD_WORKFLOW_TYPE_DEPLOY, ExecutorType: pipelineConfig.WORKFLOW_EXECUTOR_TYPE_SYSTEM, - Status: pipelineConfig.WorkflowInitiated, //deployment Initiated for auto trigger + Status: pipelineConfig.WorkflowInitiated, // deployment Initiated for auto trigger TriggeredBy: 1, StartedOn: triggeredAt, Namespace: impl.config.GetDefaultNamespace(), CdWorkflowId: cdWf.Id, AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: triggeredBy, UpdatedOn: triggeredAt, UpdatedBy: triggeredBy}, + ReferenceId: request.TriggerContext.ReferenceId, } savedWfr, err := impl.cdWorkflowRepository.SaveWorkFlowRunner(runner) if err != nil { @@ -2202,7 +2305,7 @@ func (impl *WorkflowDagExecutorImpl) RotatePods(ctx context.Context, podRotateRe return response, nil } -func (impl *WorkflowDagExecutorImpl) StopStartApp(stopRequest *StopAppRequest, ctx context.Context) (int, error) { +func (impl *WorkflowDagExecutorImpl) StopStartApp(triggerContext TriggerContext, stopRequest *StopAppRequest) (int, error) { pipelines, err := impl.pipelineRepository.FindActiveByAppIdAndEnvironmentId(stopRequest.AppId, stopRequest.EnvironmentId) if err != nil { impl.logger.Errorw("error in fetching pipeline", "app", stopRequest.AppId, "env", stopRequest.EnvironmentId, "err", err) @@ -2241,7 +2344,7 @@ func (impl *WorkflowDagExecutorImpl) StopStartApp(stopRequest *StopAppRequest, c } else { return 0, fmt.Errorf("unsupported operation %s", stopRequest.RequestType) } - id, err := impl.ManualCdTrigger(overrideRequest, ctx) + id, err := impl.ManualCdTrigger(triggerContext, overrideRequest) if err != nil { impl.logger.Errorw("error in stopping app", "err", err, "appId", stopRequest.AppId, "envId", stopRequest.EnvironmentId) return 0, err @@ -2285,11 +2388,11 @@ func (impl *WorkflowDagExecutorImpl) GetArtifactVulnerabilityStatus(artifact *re return isVulnerable, nil } -func (impl *WorkflowDagExecutorImpl) ManualCdTrigger(overrideRequest *bean.ValuesOverrideRequest, ctx context.Context) (int, error) { +func (impl *WorkflowDagExecutorImpl) ManualCdTrigger(triggerContext TriggerContext, overrideRequest *bean.ValuesOverrideRequest) (int, error) { //setting triggeredAt variable to have consistent data for various audit log places in db for deployment time triggeredAt := time.Now() releaseId := 0 - + ctx := triggerContext.Context var err error _, span := otel.Tracer("orchestrator").Start(ctx, "pipelineRepository.FindById") cdPipeline, err := impl.pipelineRepository.FindById(overrideRequest.PipelineId) @@ -2317,7 +2420,16 @@ func (impl *WorkflowDagExecutorImpl) ManualCdTrigger(overrideRequest *bean.Value } } _, span = otel.Tracer("orchestrator").Start(ctx, "TriggerPreStage") - err = impl.TriggerPreStage(ctx, nil, artifact, cdPipeline, overrideRequest.UserId, 0) + triggerRequest := TriggerRequest{ + CdWf: nil, + Artifact: artifact, + Pipeline: cdPipeline, + TriggeredBy: overrideRequest.UserId, + ApplyAuth: false, + TriggerContext: triggerContext, + RefCdWorkflowRunnerId: 0, + } + err = impl.TriggerPreStage(triggerRequest) span.End() if err != nil { impl.logger.Errorw("error in TriggerPreStage, ManualCdTrigger", "err", err) @@ -2359,6 +2471,7 @@ func (impl *WorkflowDagExecutorImpl) ManualCdTrigger(overrideRequest *bean.Value Namespace: impl.config.GetDefaultNamespace(), CdWorkflowId: cdWorkflowId, AuditLog: sql.AuditLog{CreatedOn: triggeredAt, CreatedBy: overrideRequest.UserId, UpdatedOn: triggeredAt, UpdatedBy: overrideRequest.UserId}, + ReferenceId: triggerContext.ReferenceId, } savedWfr, err := impl.cdWorkflowRepository.SaveWorkFlowRunner(runner) overrideRequest.WfrId = savedWfr.Id @@ -2422,7 +2535,8 @@ func (impl *WorkflowDagExecutorImpl) ManualCdTrigger(overrideRequest *bean.Value } return 0, releaseErr } - //skip updatePreviousDeploymentStatus if Async Install is enabled; handled inside SubscribeDevtronAsyncHelmInstallRequest + + // skip updatePreviousDeploymentStatus if Async Install is enabled; handled inside SubscribeDevtronAsyncHelmInstallRequest if !impl.appService.IsDevtronAsyncInstallModeEnabled(cdPipeline.DeploymentAppType) { // Update previous deployment runner status (in transaction): Failed _, span = otel.Tracer("orchestrator").Start(ctx, "updatePreviousDeploymentStatus") @@ -2483,7 +2597,14 @@ func (impl *WorkflowDagExecutorImpl) ManualCdTrigger(overrideRequest *bean.Value } } _, span = otel.Tracer("orchestrator").Start(ctx, "TriggerPostStage") - err = impl.TriggerPostStage(cdWf, cdPipeline, overrideRequest.UserId, 0) + triggerRequest := TriggerRequest{ + CdWf: cdWf, + Pipeline: cdPipeline, + TriggeredBy: overrideRequest.UserId, + RefCdWorkflowRunnerId: 0, + TriggerContext: triggerContext, + } + err = impl.TriggerPostStage(triggerRequest) span.End() if err != nil { impl.logger.Errorw("error in TriggerPostStage, ManualCdTrigger", "CdWorkflowId", cdWf.Id, "err", err) @@ -2587,15 +2708,12 @@ func (impl *WorkflowDagExecutorImpl) triggerNatsEventForBulkAction(cdWorkflows [ func (impl *WorkflowDagExecutorImpl) subscribeTriggerBulkAction() error { callback := func(msg *model.PubSubMsg) { - impl.logger.Debug("subscribeTriggerBulkAction event received") - //defer msg.Ack() cdWorkflow := new(pipelineConfig.CdWorkflow) err := json.Unmarshal([]byte(string(msg.Data)), cdWorkflow) if err != nil { impl.logger.Error("Error while unmarshalling cdWorkflow json object", "error", err) return } - impl.logger.Debugw("subscribeTriggerBulkAction event:", "cdWorkflow", cdWorkflow) wf := &pipelineConfig.CdWorkflow{ Id: cdWorkflow.Id, CiArtifactId: cdWorkflow.CiArtifactId, @@ -2637,7 +2755,19 @@ func (impl *WorkflowDagExecutorImpl) subscribeTriggerBulkAction() error { impl.logger.Warnw("unable to migrate deprecated DataSource", "artifactId", artifact.Id) } } - err = impl.triggerStageForBulk(wf, pipeline, artifact, false, cdWorkflow.CreatedBy) + triggerContext := TriggerContext{ + ReferenceId: pointer.String(msg.MsgId), + } + + triggerRequest := TriggerRequest{ + CdWf: wf, + Artifact: artifact, + Pipeline: pipeline, + TriggeredBy: cdWorkflow.CreatedBy, + ApplyAuth: false, + TriggerContext: triggerContext, + } + err = impl.triggerStageForBulk(triggerRequest, false) if err != nil { impl.logger.Errorw("error in cd trigger ", "err", err) wf.WorkflowStatus = pipelineConfig.TRIGGER_ERROR @@ -2646,21 +2776,30 @@ func (impl *WorkflowDagExecutorImpl) subscribeTriggerBulkAction() error { } impl.cdWorkflowRepository.UpdateWorkFlow(wf) } - err := impl.pubsubClient.Subscribe(pubsub.BULK_DEPLOY_TOPIC, callback) + + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + cdWorkflow := new(pipelineConfig.CdWorkflow) + err := json.Unmarshal([]byte(string(msg.Data)), cdWorkflow) + if err != nil { + return "error while unmarshalling cdWorkflow json object", []interface{}{"error", err} + } + return "got message for bulk deploy", []interface{}{"cdWorkflowId", cdWorkflow.Id} + } + + validations := impl.GetTriggerValidateFuncs() + err := impl.pubsubClient.Subscribe(pubsub.BULK_DEPLOY_TOPIC, callback, loggerFunc, validations...) return err } func (impl *WorkflowDagExecutorImpl) subscribeHibernateBulkAction() error { callback := func(msg *model.PubSubMsg) { - impl.logger.Debug("subscribeHibernateBulkAction event received") - //defer msg.Ack() deploymentGroupAppWithEnv := new(DeploymentGroupAppWithEnv) err := json.Unmarshal([]byte(string(msg.Data)), deploymentGroupAppWithEnv) if err != nil { impl.logger.Error("Error while unmarshalling deploymentGroupAppWithEnv json object", err) return } - impl.logger.Debugw("subscribeHibernateBulkAction event:", "DeploymentGroupAppWithEnv", deploymentGroupAppWithEnv) stopAppRequest := &StopAppRequest{ AppId: deploymentGroupAppWithEnv.AppId, @@ -2673,13 +2812,28 @@ func (impl *WorkflowDagExecutorImpl) subscribeHibernateBulkAction() error { impl.logger.Errorw("error in creating acd synch context", "err", err) return } - _, err = impl.StopStartApp(stopAppRequest, ctx) + triggerContext := TriggerContext{ + ReferenceId: pointer.String(msg.MsgId), + Context: ctx, + } + _, err = impl.StopStartApp(triggerContext, stopAppRequest) if err != nil { impl.logger.Errorw("error in stop app request", "err", err) return } } - err := impl.pubsubClient.Subscribe(pubsub.BULK_HIBERNATE_TOPIC, callback) + + // add required logging here + var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) { + deploymentGroupAppWithEnv := new(DeploymentGroupAppWithEnv) + err := json.Unmarshal([]byte(string(msg.Data)), deploymentGroupAppWithEnv) + if err != nil { + return "error while unmarshalling deploymentGroupAppWithEnv json object", []interface{}{"err", err} + } + return "got message for bulk hibernate", []interface{}{"deploymentGroupId", deploymentGroupAppWithEnv.DeploymentGroupId, "appId", deploymentGroupAppWithEnv.AppId, "environmentId", deploymentGroupAppWithEnv.EnvironmentId} + } + + err := impl.pubsubClient.Subscribe(pubsub.BULK_HIBERNATE_TOPIC, callback, loggerFunc) return err } @@ -4771,3 +4925,44 @@ func (impl *WorkflowDagExecutorImpl) UpdateCDWorkflowRunnerStatus(ctx context.Co } return nil } + +// canInitiateTrigger checks if the current trigger request with natsMsgId haven't already initiated the trigger. +// throws error if the request is already processed. +func (impl *WorkflowDagExecutorImpl) canInitiateTrigger(natsMsgId string) (bool, error) { + if natsMsgId == "" { + return true, nil + } + exists, err := impl.cdWorkflowRepository.CheckWorkflowRunnerByReferenceId(natsMsgId) + if err != nil { + impl.logger.Errorw("error in fetching cd workflow runner using reference_id", "referenceId", natsMsgId, "err", err) + return false, errors.New("error in fetching cd workflow runner") + } + + if exists { + impl.logger.Errorw("duplicate pre stage trigger request as there is already a workflow runner object created by this message") + return false, errors.New("duplicate pre stage trigger request, this request was already processed") + } + + return true, nil +} + +// GetTriggerValidateFuncs gets all the required validation funcs +func (impl *WorkflowDagExecutorImpl) GetTriggerValidateFuncs() []pubsub.ValidateMsg { + + var duplicateTriggerValidateFunc pubsub.ValidateMsg = func(msg model.PubSubMsg) bool { + if msg.MsgDeliverCount == 1 { + // first time message got delivered, always validate this. + return true + } + + // message is redelivered, check if the message is already processed. + if ok, err := impl.canInitiateTrigger(msg.MsgId); !ok || err != nil { + impl.logger.Warnw("duplicate trigger condition, duplicate message", "msgId", msg.MsgId, "err", err) + return false + } + return true + } + + return []pubsub.ValidateMsg{duplicateTriggerValidateFunc} + +} diff --git a/pkg/sql/connection.go b/pkg/sql/connection.go index fe9dc52aaa6..bf13f36be9a 100644 --- a/pkg/sql/connection.go +++ b/pkg/sql/connection.go @@ -55,7 +55,7 @@ func NewDbConnection(cfg *Config, logger *zap.SugaredLogger) (*pg.DB, error) { ApplicationName: cfg.ApplicationName, } dbConnection := pg.Connect(&options) - //check db connection + // check db connection var test string _, err := dbConnection.QueryOne(&test, `SELECT 1`) @@ -66,7 +66,7 @@ func NewDbConnection(cfg *Config, logger *zap.SugaredLogger) (*pg.DB, error) { logger.Infow("connected with db", "db", obfuscateSecretTags(cfg)) } - //-------------- + // -------------- dbConnection.OnQueryProcessed(func(event *pg.QueryProcessedEvent) { queryDuration := time.Since(event.StartTime) diff --git a/scripts/sql/215_reference_id_deploy.down.sql b/scripts/sql/215_reference_id_deploy.down.sql new file mode 100644 index 00000000000..f682dd2571d --- /dev/null +++ b/scripts/sql/215_reference_id_deploy.down.sql @@ -0,0 +1 @@ +ALTER TABLE cd_workflow_runner DROP COLUMN IF EXISTS "reference_id"; \ No newline at end of file diff --git a/scripts/sql/215_reference_id_deploy.up.sql b/scripts/sql/215_reference_id_deploy.up.sql new file mode 100644 index 00000000000..4be9a7db5ff --- /dev/null +++ b/scripts/sql/215_reference_id_deploy.up.sql @@ -0,0 +1 @@ +ALTER TABLE cd_workflow_runner ADD COLUMN IF NOT EXISTS "reference_id" VARCHAR(50) NULL; \ No newline at end of file diff --git a/vendor/github.com/devtron-labs/common-lib/pubsub-lib/JetStreamUtil.go b/vendor/github.com/devtron-labs/common-lib/pubsub-lib/JetStreamUtil.go index e2988660e55..c02ec09f690 100644 --- a/vendor/github.com/devtron-labs/common-lib/pubsub-lib/JetStreamUtil.go +++ b/vendor/github.com/devtron-labs/common-lib/pubsub-lib/JetStreamUtil.go @@ -101,7 +101,10 @@ type NatsTopic struct { consumerName string } type ConfigJson struct { - StreamConfigJson string `env:"STREAM_CONFIG_JSON"` + // StreamConfigJson is a json string of map[string]NatsStreamConfig + StreamConfigJson string `env:"STREAM_CONFIG_JSON"` + // ConsumerConfigJson is a json string of map[string]NatsConsumerConfig + // eg: "{\"ARGO_PIPELINE_STATUS_UPDATE_DURABLE-1\" : \"{\"natsMsgProcessingBatchSize\" : 3, \"natsMsgBufferSize\" : 3, \"ackWaitInSecs\": 300}\"}" ConsumerConfigJson string `env:"CONSUMER_CONFIG_JSON"` } @@ -139,6 +142,7 @@ var NatsStreamWiseConfigMapping = map[string]NatsStreamConfig{ KUBEWATCH_STREAM: {}, GIT_SENSOR_STREAM: {}, IMAGE_SCANNER_STREAM: {}, + DEVTRON_TEST_STREAM: {}, } var NatsConsumerWiseConfigMapping = map[string]NatsConsumerConfig{ @@ -161,8 +165,11 @@ var NatsConsumerWiseConfigMapping = map[string]NatsConsumerConfig{ HELM_CHART_INSTALL_STATUS_DURABLE: {}, DEVTRON_CHART_INSTALL_DURABLE: {}, PANIC_ON_PROCESSING_DURABLE: {}, + DEVTRON_TEST_CONSUMER: {}, } +// getConsumerConfigMap will fetch the consumer wise config from the json string +// this will only fetch consumerConfigs for given consumers in the jsonString func getConsumerConfigMap(jsonString string) map[string]NatsConsumerConfig { resMap := map[string]NatsConsumerConfig{} if jsonString == "" { @@ -181,6 +188,8 @@ func getConsumerConfigMap(jsonString string) map[string]NatsConsumerConfig { return resMap } +// getStreamConfigMap will fetch the stream wise config from the json string +// this will only fetch streamConfigs for given streams in the jsonString func getStreamConfigMap(jsonString string) map[string]NatsStreamConfig { resMap := map[string]NatsStreamConfig{} if jsonString == "" { @@ -203,58 +212,41 @@ func ParseAndFillStreamWiseAndConsumerWiseConfigMaps() { configJson := ConfigJson{} err := env.Parse(&configJson) if err != nil { - log.Fatal("error while parsing config from environment params", "err", err) + log.Fatal("error while parsing config from environment params", " err", err) } + + // fetch the consumer configs that were given explicitly in the configJson.ConsumerConfigJson consumerConfigMap := getConsumerConfigMap(configJson.ConsumerConfigJson) + // fetch the stream configs that were given explicitly in the configJson.StreamConfigJson streamConfigMap := getStreamConfigMap(configJson.StreamConfigJson) + + // default nats configuration values defaultConfig := NatsClientConfig{} err = env.Parse(&defaultConfig) if err != nil { log.Print("error while parsing config from environment params", "err", err) } - defaultStreamConfigVal := NatsStreamConfig{ - StreamConfig: StreamConfig{MaxAge: DefaultMaxAge}, - } - defaultConsumerConfigVal := NatsConsumerConfig{ - NatsMsgBufferSize: defaultConfig.NatsMsgBufferSize, - NatsMsgProcessingBatchSize: defaultConfig.NatsMsgProcessingBatchSize, - } - - // default NATS Consumer config value for BULK CD TRIGGER topic - defaultConsumerConfigForBulkCdTriggerTopic := NatsConsumerConfig{} - - err = json.Unmarshal([]byte(defaultConfig.NatsConsumerConfig), &defaultConsumerConfigForBulkCdTriggerTopic) - - if err != nil { - log.Print("error in unmarshalling nats consumer config", - "consumer-config", defaultConfig.NatsConsumerConfig, - "err", err) - } + // default stream and consumer config values + defaultStreamConfigVal := defaultConfig.GetDefaultNatsStreamConfig() + defaultConsumerConfigVal := defaultConfig.GetDefaultNatsConsumerConfig() + // initialise all the consumer wise config with default values or user defined values for key, _ := range NatsConsumerWiseConfigMapping { - defaultValue := defaultConsumerConfigVal - - // Setting AckWait config. Only for BULK CD TRIGGER topics. Can be used for other topics - // if required to be made configurable - if key == BULK_DEPLOY_DURABLE || key == CD_BULK_DEPLOY_TRIGGER_DURABLE { - defaultValue.AckWaitInSecs = defaultConsumerConfigForBulkCdTriggerTopic.AckWaitInSecs + consumerConfig := defaultConsumerConfigVal + if _, ok := consumerConfigMap[key]; ok { + consumerConfig = consumerConfigMap[key] } - - // Overriding default config with explicitly provided topic-specific config - if _, ok := consumerConfigMap[key]; ok && (key != BULK_DEPLOY_DURABLE && key != CD_BULK_DEPLOY_TRIGGER_DURABLE) { - defaultValue = consumerConfigMap[key] - } - - NatsConsumerWiseConfigMapping[key] = defaultValue + NatsConsumerWiseConfigMapping[key] = consumerConfig } + // initialise all the consumer wise config with default values or user defined values for key, _ := range NatsStreamWiseConfigMapping { - defaultValue := defaultStreamConfigVal + streamConfig := defaultStreamConfigVal if _, ok := streamConfigMap[key]; ok { - defaultValue = streamConfigMap[key] + streamConfig = streamConfigMap[key] } - NatsStreamWiseConfigMapping[key] = defaultValue + NatsStreamWiseConfigMapping[key] = streamConfig } } @@ -280,7 +272,7 @@ func AddStream(js nats.JetStreamContext, streamConfig *nats.StreamConfig, stream streamInfo, err := js.StreamInfo(streamName) if err == nats.ErrStreamNotFound || streamInfo == nil { log.Print("No stream was created already. Need to create one. ", "Stream name: ", streamName) - //Stream doesn't already exist. Create a new stream from jetStreamContext + // Stream doesn't already exist. Create a new stream from jetStreamContext cfgToSet := getNewConfig(streamName, streamConfig) _, err = js.AddStream(cfgToSet) if err != nil { @@ -330,6 +322,5 @@ func getNewConfig(streamName string, toUpdateConfig *nats.StreamConfig) *nats.St if toUpdateConfig.Retention != nats.RetentionPolicy(0) { cfg.Retention = toUpdateConfig.Retention } - //cfg.Retention = nats.RetentionPolicy(1) return cfg } diff --git a/vendor/github.com/devtron-labs/common-lib/pubsub-lib/NatsClient.go b/vendor/github.com/devtron-labs/common-lib/pubsub-lib/NatsClient.go index 7eb06674dba..27a5b367c22 100644 --- a/vendor/github.com/devtron-labs/common-lib/pubsub-lib/NatsClient.go +++ b/vendor/github.com/devtron-labs/common-lib/pubsub-lib/NatsClient.go @@ -18,7 +18,6 @@ package pubsub_lib import ( - "encoding/json" "github.com/caarlos0/env" "github.com/nats-io/nats.go" "go.uber.org/zap" @@ -26,28 +25,52 @@ import ( ) type NatsClient struct { - logger *zap.SugaredLogger - JetStrCtxt nats.JetStreamContext - streamConfig *nats.StreamConfig - NatsMsgProcessingBatchSize int - NatsMsgBufferSize int - Conn nats.Conn + logger *zap.SugaredLogger + JetStrCtxt nats.JetStreamContext + Conn *nats.Conn } -const DefaultMaxAge time.Duration = 86400000000000 - type NatsClientConfig struct { NatsServerHost string `env:"NATS_SERVER_HOST" envDefault:"nats://devtron-nats.devtroncd:4222"` - //consumer wise + // consumer wise + // NatsMsgProcessingBatchSize is the number of messages that will be processed in one go NatsMsgProcessingBatchSize int `env:"NATS_MSG_PROCESSING_BATCH_SIZE" envDefault:"1"` - NatsMsgBufferSize int `env:"NATS_MSG_BUFFER_SIZE" envDefault:"64"` - //stream wise - NatsStreamConfig string `env:"NATS_STREAM_CONFIG" envDefault:"{\"max_age\":86400000000000}"` + // NatsMsgBufferSize is the number of messages that will be buffered in memory (channel size) + // it is recommended to set this value equal to NatsMsgProcessingBatchSize as we want to process maximum messages in the buffer in one go. + // Note: if NatsMsgBufferSize is less than NatsMsgProcessingBatchSize + // then the wait time for the unprocessed messages in the buffer will be high.(total process time = life-time in buffer + processing time) + // NatsMsgBufferSize can be configured independently of NatsMsgProcessingBatchSize if needed by setting its value to positive value in env. + // if NatsMsgBufferSize set to a non-positive value then it will take the value of NatsMsgProcessingBatchSize. + // Note: always get this value by calling GetNatsMsgBufferSize method + NatsMsgBufferSize int `env:"NATS_MSG_BUFFER_SIZE" envDefault:"-1"` + NatsMsgMaxAge int `env:"NATS_MSG_MAX_AGE" envDefault:"86400"` + NatsMsgAckWaitInSecs int `env:"NATS_MSG_ACK_WAIT_IN_SECS" envDefault:"300"` +} + +func (ncc NatsClientConfig) GetNatsMsgBufferSize() int { + // if NatsMsgBufferSize is set to a non-positive value then it will take the value of NatsMsgProcessingBatchSize. + if ncc.NatsMsgBufferSize <= 0 { + return ncc.NatsMsgProcessingBatchSize + } + return ncc.NatsMsgBufferSize +} + +func (ncc NatsClientConfig) GetDefaultNatsConsumerConfig() NatsConsumerConfig { + return NatsConsumerConfig{ + NatsMsgProcessingBatchSize: ncc.NatsMsgProcessingBatchSize, + NatsMsgBufferSize: ncc.GetNatsMsgBufferSize(), + AckWaitInSecs: ncc.NatsMsgAckWaitInSecs, + } +} - // Consumer config - NatsConsumerConfig string `env:"NATS_CONSUMER_CONFIG" envDefault:"{\"ackWaitInSecs\":3600}"` +func (ncc NatsClientConfig) GetDefaultNatsStreamConfig() NatsStreamConfig { + return NatsStreamConfig{ + StreamConfig: StreamConfig{ + MaxAge: time.Duration(ncc.NatsMsgMaxAge) * time.Second, + }, + } } type StreamConfig struct { @@ -56,13 +79,32 @@ type StreamConfig struct { type NatsStreamConfig struct { StreamConfig StreamConfig `json:"streamConfig"` } + type NatsConsumerConfig struct { + // NatsMsgProcessingBatchSize is the number of messages that will be processed in one go NatsMsgProcessingBatchSize int `json:"natsMsgProcessingBatchSize"` - NatsMsgBufferSize int `json:"natsMsgBufferSize"` - AckWaitInSecs int `json:"ackWaitInSecs"` + // NatsMsgBufferSize is the number of messages that will be buffered in memory (channel size). + // Note: always get this value by calling GetNatsMsgBufferSize method + NatsMsgBufferSize int `json:"natsMsgBufferSize"` + // AckWaitInSecs is the time in seconds for which the message can be in unacknowledged state + AckWaitInSecs int `json:"ackWaitInSecs"` +} + +func (consumerConf NatsConsumerConfig) GetNatsMsgBufferSize() int { + // if NatsMsgBufferSize is set to a non-positive value then it will take the value of NatsMsgProcessingBatchSize. + if consumerConf.NatsMsgBufferSize <= 0 { + return consumerConf.NatsMsgProcessingBatchSize + } + return consumerConf.NatsMsgBufferSize } -/* #nosec */ +// func (consumerConf NatsConsumerConfig) GetNatsMsgProcessingBatchSize() int { +// if nbs := consumerConf.GetNatsMsgBufferSize(); nbs < consumerConf.NatsMsgProcessingBatchSize { +// return nbs +// } +// return consumerConf.NatsMsgProcessingBatchSize +// } + func NewNatsClient(logger *zap.SugaredLogger) (*NatsClient, error) { cfg := &NatsClientConfig{} @@ -72,17 +114,7 @@ func NewNatsClient(logger *zap.SugaredLogger) (*NatsClient, error) { return &NatsClient{}, err } - configJson := cfg.NatsStreamConfig - streamCfg := &nats.StreamConfig{} - if configJson != "" { - err := json.Unmarshal([]byte(configJson), streamCfg) - if err != nil { - logger.Errorw("error occurred while parsing streamConfigJson ", "configJson", configJson, "reason", err) - } - } - logger.Debugw("nats config loaded", "NatsMsgProcessingBatchSize", cfg.NatsMsgProcessingBatchSize, "NatsMsgBufferSize", cfg.NatsMsgBufferSize, "config", streamCfg) - - //Connect to NATS + // Connect to NATS nc, err := nats.Connect(cfg.NatsServerHost, nats.ReconnectWait(10*time.Second), nats.MaxReconnects(100), nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { @@ -99,7 +131,7 @@ func NewNatsClient(logger *zap.SugaredLogger) (*NatsClient, error) { return &NatsClient{}, err } - //Create a jetstream context + // Create a jetstream context js, err := nc.JetStream() if err != nil { @@ -107,11 +139,9 @@ func NewNatsClient(logger *zap.SugaredLogger) (*NatsClient, error) { } natsClient := &NatsClient{ - logger: logger, - JetStrCtxt: js, - streamConfig: streamCfg, - NatsMsgBufferSize: cfg.NatsMsgBufferSize, - NatsMsgProcessingBatchSize: cfg.NatsMsgProcessingBatchSize, + logger: logger, + JetStrCtxt: js, + Conn: nc, } return natsClient, nil } diff --git a/vendor/github.com/devtron-labs/common-lib/pubsub-lib/PubSubClientService.go b/vendor/github.com/devtron-labs/common-lib/pubsub-lib/PubSubClientService.go index dec37692bc0..57a84ef932a 100644 --- a/vendor/github.com/devtron-labs/common-lib/pubsub-lib/PubSubClientService.go +++ b/vendor/github.com/devtron-labs/common-lib/pubsub-lib/PubSubClientService.go @@ -14,9 +14,18 @@ import ( "time" ) +const NATS_MSG_LOG_PREFIX = "NATS_LOG" + +type ValidateMsg func(msg model.PubSubMsg) bool + +// LoggerFunc is used to log the message before passing to callback function. +// it expects logg message and key value pairs to be returned. +// if keysAndValues is empty, it will log whole model.PubSubMsg +type LoggerFunc func(msg model.PubSubMsg) (logMsg string, keysAndValues []interface{}) + type PubSubClientService interface { Publish(topic string, msg string) error - Subscribe(topic string, callback func(msg *model.PubSubMsg)) error + Subscribe(topic string, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) error } type PubSubClientServiceImpl struct { @@ -77,7 +86,12 @@ func (impl PubSubClientServiceImpl) Publish(topic string, msg string) error { return nil } -func (impl PubSubClientServiceImpl) Subscribe(topic string, callback func(msg *model.PubSubMsg)) error { +// Subscribe method is used to subscribe to the given topic(+required), +// this creates blocking process to continuously fetch messages from nats server published on this topic. +// invokes callback(+required) func for each message received. +// loggerFunc(+optional) is invoked before passing the message to the callback function. +// validations(+optional) methods were called before passing the message to the callback func. +func (impl PubSubClientServiceImpl) Subscribe(topic string, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) error { impl.Logger.Infow("Subscribed to pubsub client", "topic", topic) natsTopic := GetNatsTopic(topic) streamName := natsTopic.streamName @@ -91,80 +105,61 @@ func (impl PubSubClientServiceImpl) Subscribe(topic string, callback func(msg *m if streamConfig.Retention == nats.WorkQueuePolicy { deliveryOption = nats.DeliverAll() } - processingBatchSize := NatsConsumerWiseConfigMapping[consumerName].NatsMsgProcessingBatchSize - msgBufferSize := NatsConsumerWiseConfigMapping[consumerName].NatsMsgBufferSize - - // Converting provided ack wait (int) into duration for comparing with nats-server config - ackWait := time.Duration(NatsConsumerWiseConfigMapping[consumerName].AckWaitInSecs) * time.Second - - // Get the current Consumer config from NATS-server - info, err := natsClient.JetStrCtxt.ConsumerInfo(streamName, consumerName) - - if err != nil { - impl.Logger.Errorw("unable to retrieve consumer info from NATS-server", - "stream", streamName, - "consumer", consumerName, - "err", err) - - } else { - // Update NATS Consumer config if new changes detected - // Currently only checking for AckWait, but can be done for other editable properties as well - if ackWait > 0 && info.Config.AckWait != ackWait { + consumerConfig := NatsConsumerWiseConfigMapping[consumerName] + processingBatchSize := consumerConfig.NatsMsgProcessingBatchSize + msgBufferSize := consumerConfig.GetNatsMsgBufferSize() - updatedConfig := info.Config - updatedConfig.AckWait = ackWait - - _, err = natsClient.JetStrCtxt.UpdateConsumer(streamName, &updatedConfig) + // Converting provided ack wait (int) into duration for comparing with nats-server config + ackWait := time.Duration(consumerConfig.AckWaitInSecs) * time.Second - if err != nil { - impl.Logger.Errorw("failed to update Consumer config", - "received consumer config", info.Config, - "err", err) - } - } - } + // Update consumer config if new changes detected + impl.updateConsumer(natsClient, streamName, consumerName, &consumerConfig) channel := make(chan *nats.Msg, msgBufferSize) - _, err = natsClient.JetStrCtxt.ChanQueueSubscribe(topic, queueName, channel, nats.Durable(consumerName), deliveryOption, nats.ManualAck(), + _, err := natsClient.JetStrCtxt.ChanQueueSubscribe(topic, queueName, channel, + nats.Durable(consumerName), + deliveryOption, + nats.ManualAck(), + nats.AckWait(ackWait), // if ackWait is 0 , nats sets this option to 30secs by default nats.BindStream(streamName)) if err != nil { impl.Logger.Fatalw("error while subscribing to nats ", "stream", streamName, "topic", topic, "error", err) return err } - go impl.startListeningForEvents(processingBatchSize, channel, callback, topic) + go impl.startListeningForEvents(processingBatchSize, channel, callback, loggerFunc, validations...) impl.Logger.Infow("Successfully subscribed with Nats", "stream", streamName, "topic", topic, "queue", queueName, "consumer", consumerName) return nil } -func (impl PubSubClientServiceImpl) startListeningForEvents(processingBatchSize int, channel chan *nats.Msg, callback func(msg *model.PubSubMsg), topic string) { +func (impl PubSubClientServiceImpl) startListeningForEvents(processingBatchSize int, channel chan *nats.Msg, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) { wg := new(sync.WaitGroup) for index := 0; index < processingBatchSize; index++ { wg.Add(1) - go impl.processMessages(wg, channel, callback, topic) + go impl.processMessages(wg, channel, callback, loggerFunc, validations...) } wg.Wait() impl.Logger.Warn("msgs received Done from Nats side, going to end listening!!") } -func (impl PubSubClientServiceImpl) processMessages(wg *sync.WaitGroup, channel chan *nats.Msg, callback func(msg *model.PubSubMsg), topic string) { +func (impl PubSubClientServiceImpl) processMessages(wg *sync.WaitGroup, channel chan *nats.Msg, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) { defer wg.Done() for msg := range channel { - impl.processMsg(msg, callback, topic) + impl.processMsg(msg, callback, loggerFunc, validations...) } } // TODO need to extend msg ack depending upon response from callback like error scenario -func (impl PubSubClientServiceImpl) processMsg(msg *nats.Msg, callback func(msg *model.PubSubMsg), topic string) { +func (impl PubSubClientServiceImpl) processMsg(msg *nats.Msg, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) { t1 := time.Now() - metrics.IncConsumingCount(topic) - defer metrics.IncConsumptionCount(topic) + metrics.IncConsumingCount(msg.Subject) + defer metrics.IncConsumptionCount(msg.Subject) defer func() { // wrapping this function in defer as directly calling Observe() will run immediately - metrics.NatsEventConsumptionTime.WithLabelValues(topic).Observe(float64(time.Since(t1).Milliseconds())) + metrics.NatsEventConsumptionTime.WithLabelValues(msg.Subject).Observe(float64(time.Since(t1).Milliseconds())) }() - impl.TryCatchCallBack(msg, callback) + impl.TryCatchCallBack(msg, callback, loggerFunc, validations...) } func (impl PubSubClientServiceImpl) publishPanicError(msg *nats.Msg, panicErr error) (err error) { @@ -190,14 +185,36 @@ func (impl PubSubClientServiceImpl) publishPanicError(msg *nats.Msg, panicErr er } // TryCatchCallBack is a fail-safe method to use callback function -func (impl PubSubClientServiceImpl) TryCatchCallBack(msg *nats.Msg, callback func(msg *model.PubSubMsg)) { - subMsg := &model.PubSubMsg{Data: string(msg.Data)} +func (impl PubSubClientServiceImpl) TryCatchCallBack(msg *nats.Msg, callback func(msg *model.PubSubMsg), loggerFunc LoggerFunc, validations ...ValidateMsg) { + var msgDeliveryCount uint64 = 0 + if metadata, err := msg.Metadata(); err == nil { + msgDeliveryCount = metadata.NumDelivered + } + natsMsgId := msg.Header.Get(model.NatsMsgId) + subMsg := &model.PubSubMsg{Data: string(msg.Data), MsgDeliverCount: msgDeliveryCount, MsgId: natsMsgId} + + // call loggersFunc + impl.Log(loggerFunc, msg.Subject, *subMsg) + + // run validations + for _, validation := range validations { + if !validation(*subMsg) { + impl.Logger.Warnw("nats: message validation failed, not processing the message...", "subject", msg.Subject, "msg", string(msg.Data)) + return + } + } defer func() { // Acknowledge the message delivery err := msg.Ack() if err != nil { impl.Logger.Errorw("nats: unable to acknowledge the message", "subject", msg.Subject, "msg", string(msg.Data)) } + + // publish metrics for msg delivery count if msgDeliveryCount > 1 + if msgDeliveryCount > 1 { + metrics.NatsEventDeliveryCount.WithLabelValues(msg.Subject, natsMsgId).Observe(float64(msgDeliveryCount)) + } + // Panic recovery handling if panicInfo := recover(); panicInfo != nil { impl.Logger.Warnw("nats: found panic error", "subject", msg.Subject, "payload", string(msg.Data), "logs", string(debug.Stack())) @@ -214,12 +231,6 @@ func (impl PubSubClientServiceImpl) TryCatchCallBack(msg *nats.Msg, callback fun callback(subMsg) } -func (impl PubSubClientServiceImpl) printTimeDiff(t0 time.Time, msg *nats.Msg, timeLimitInMillSecs int64) { - t1 := time.Since(t0) - if t1.Milliseconds() > timeLimitInMillSecs { - impl.Logger.Debugw("time took to process msg: ", msg, "time :", t1) - } -} func (impl PubSubClientServiceImpl) getStreamConfig(streamName string) *nats.StreamConfig { configJson := NatsStreamWiseConfigMapping[streamName].StreamConfig streamCfg := &nats.StreamConfig{} @@ -235,3 +246,47 @@ func (impl PubSubClientServiceImpl) getStreamConfig(streamName string) *nats.Str return streamCfg } + +// Updates NATS Consumer config if new changes detected +// if consumer didn't exist, this will just return +func (impl PubSubClientServiceImpl) updateConsumer(natsClient *NatsClient, streamName string, consumerName string, overrideConfig *NatsConsumerConfig) { + + // Get the current Consumer config from NATS-server + info, err := natsClient.JetStrCtxt.ConsumerInfo(streamName, consumerName) + if err != nil { + impl.Logger.Errorw("unable to retrieve consumer info from NATS-server", "stream", streamName, "consumer", consumerName, "err", err) + return + } + + existingConfig := info.Config + updatesDetected := false + + // Currently only checking for AckWait,MaxAckPending but can be done for other editable properties as well + if ackWaitOverride := time.Duration(overrideConfig.AckWaitInSecs) * time.Second; ackWaitOverride > 0 && existingConfig.AckWait != ackWaitOverride { + existingConfig.AckWait = ackWaitOverride + updatesDetected = true + } + + if messageBufferSize := overrideConfig.GetNatsMsgBufferSize(); messageBufferSize > 0 && existingConfig.MaxAckPending != messageBufferSize { + existingConfig.MaxAckPending = messageBufferSize + updatesDetected = true + } + + if updatesDetected { + _, err = natsClient.JetStrCtxt.UpdateConsumer(streamName, &existingConfig) + if err != nil { + impl.Logger.Errorw("failed to update Consumer config", "received consumer config", info.Config, "err", err) + } + } + return +} + +func (impl PubSubClientServiceImpl) Log(loggerFunc LoggerFunc, topic string, subMsg model.PubSubMsg) { + logMsg, metaSlice := loggerFunc(subMsg) + logMsg = fmt.Sprintf("%s:%s", NATS_MSG_LOG_PREFIX, logMsg) + if len(metaSlice) == 0 { + metaSlice = []interface{}{"msgId", subMsg.MsgId, "msg", subMsg.Data} + } + metaSlice = append(metaSlice, "topic", topic) + impl.Logger.Infow(logMsg, metaSlice...) +} diff --git a/vendor/github.com/devtron-labs/common-lib/pubsub-lib/metrics/metrics.go b/vendor/github.com/devtron-labs/common-lib/pubsub-lib/metrics/metrics.go index 9bf5117d6d0..fd7c41e5a7b 100644 --- a/vendor/github.com/devtron-labs/common-lib/pubsub-lib/metrics/metrics.go +++ b/vendor/github.com/devtron-labs/common-lib/pubsub-lib/metrics/metrics.go @@ -28,6 +28,10 @@ var NatsEventPublishTime = promauto.NewHistogramVec(prometheus.HistogramOpts{ Name: "nats_event_publish_time", }, []string{"topic"}) +var NatsEventDeliveryCount = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "nats_event_delivery_count", +}, []string{"topic", "msg_id"}) + func IncPublishCount(topic, status string) { NatsPublishingCount.WithLabelValues(topic, status).Inc() } diff --git a/vendor/github.com/devtron-labs/common-lib/pubsub-lib/model/PubSubClientBean.go b/vendor/github.com/devtron-labs/common-lib/pubsub-lib/model/PubSubClientBean.go index 7ec738fea47..6074fe46c5d 100644 --- a/vendor/github.com/devtron-labs/common-lib/pubsub-lib/model/PubSubClientBean.go +++ b/vendor/github.com/devtron-labs/common-lib/pubsub-lib/model/PubSubClientBean.go @@ -2,9 +2,12 @@ package model const PUBLISH_SUCCESS = "SUCCESS" const PUBLISH_FAILURE = "FAILURE" +const NatsMsgId = "Nats-Msg-Id" type PubSubMsg struct { - Data string + Data string + MsgDeliverCount uint64 + MsgId string } type LogsConfig struct { diff --git a/vendor/modules.txt b/vendor/modules.txt index b4b3bea1216..92df9ed244c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -354,7 +354,7 @@ github.com/devtron-labs/authenticator/jwt github.com/devtron-labs/authenticator/middleware github.com/devtron-labs/authenticator/oidc github.com/devtron-labs/authenticator/password -# github.com/devtron-labs/common-lib v0.0.9-0.20231226070212-c47f7a07ebf5 +# github.com/devtron-labs/common-lib v0.0.9-0.20240104121009-1052d04e42b1 ## explicit; go 1.20 github.com/devtron-labs/common-lib/blob-storage github.com/devtron-labs/common-lib/pubsub-lib