Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: making pre,post,deploy triggers flows idempotent #4486

Merged
merged 20 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 8 additions & 2 deletions api/restHandler/PipelineTriggerRestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ func (handler PipelineTriggerRestHandlerImpl) OverrideConfig(w http.ResponseWrit
}
ctx := context.WithValue(r.Context(), "token", acdToken)
_, span := otel.Tracer("orchestrator").Start(ctx, "workflowDagExecutor.ManualCdTrigger")
mergeResp, err := handler.workflowDagExecutor.ManualCdTrigger(&overrideRequest, ctx)
triggerContext := pipeline.TriggerContext{
Context: ctx,
}
mergeResp, err := handler.workflowDagExecutor.ManualCdTrigger(triggerContext, &overrideRequest)
span.End()
if err != nil {
handler.logger.Errorw("request err, OverrideConfig", "err", err, "payload", overrideRequest)
Expand Down Expand Up @@ -224,7 +227,10 @@ func (handler PipelineTriggerRestHandlerImpl) StartStopApp(w http.ResponseWriter
return
}
ctx := context.WithValue(r.Context(), "token", acdToken)
mergeResp, err := handler.workflowDagExecutor.StopStartApp(&overrideRequest, ctx)
triggerContext := pipeline.TriggerContext{
Context: ctx,
}
mergeResp, err := handler.workflowDagExecutor.StopStartApp(triggerContext, &overrideRequest)
if err != nil {
handler.logger.Errorw("service err, StartStopApp", "err", err, "payload", overrideRequest)
common.WriteJsonResp(w, err, nil, http.StatusInternalServerError)
Expand Down
45 changes: 32 additions & 13 deletions api/router/pubsub/ApplicationStatusHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"github.com/devtron-labs/common-lib/pubsub-lib/model"
"github.com/devtron-labs/devtron/pkg/app"
"k8s.io/utils/pointer"
"time"

"github.com/devtron-labs/devtron/internal/sql/repository/pipelineConfig"
Expand Down Expand Up @@ -74,7 +75,7 @@ func NewApplicationStatusHandlerImpl(logger *zap.SugaredLogger, pubsubClient *pu
}
err := appStatusUpdateHandlerImpl.Subscribe()
if err != nil {
//logger.Error("err", err)
// logger.Error("err", err)
return nil
}
err = appStatusUpdateHandlerImpl.SubscribeDeleteStatus()
Expand All @@ -91,7 +92,6 @@ type ApplicationDetail struct {

func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
callback := func(msg *model.PubSubMsg) {
impl.logger.Debugw("APP_STATUS_UPDATE_REQ", "stage", "raw", "data", msg.Data)
applicationDetail := ApplicationDetail{}
err := json.Unmarshal([]byte(msg.Data), &applicationDetail)
if err != nil {
Expand All @@ -109,10 +109,10 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
_, err = impl.pipelineRepository.GetArgoPipelineByArgoAppName(app.ObjectMeta.Name)
if err != nil && err == pg.ErrNoRows {
impl.logger.Infow("this app not found in pipeline table looking in installed_apps table", "appName", app.ObjectMeta.Name)
//if not found in pipeline table then search in installed_apps table
// if not found in pipeline table then search in installed_apps table
gitOpsDeployedAppNames, err := impl.installedAppRepository.GetAllGitOpsDeploymentAppName()
if err != nil && err == pg.ErrNoRows {
//no installed_apps found
// no installed_apps found
impl.logger.Errorw("no installed apps found", "err", err)
return
} else if err != nil {
Expand All @@ -127,17 +127,17 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
devtronGitOpsAppName = app.ObjectMeta.Name
}
if slices.Contains(gitOpsDeployedAppNames, devtronGitOpsAppName) {
//app found in installed_apps table hence setting flag to true
// app found in installed_apps table hence setting flag to true
isAppStoreApplication = true
} else {
//app neither found in installed_apps nor in pipeline table hence returning
// app neither found in installed_apps nor in pipeline table hence returning
return
}
}
isSucceeded, pipelineOverride, err := impl.appService.UpdateDeploymentStatusAndCheckIsSucceeded(app, applicationDetail.StatusTime, isAppStoreApplication)
if err != nil {
impl.logger.Errorw("error on application status update", "err", err, "msg", string(msg.Data))
//TODO - check update for charts - fix this call
// TODO - check update for charts - fix this call
if err == pg.ErrNoRows {
// if not found in charts (which is for devtron apps) try to find in installed app (which is for devtron charts)
_, err := impl.installedAppService.UpdateInstalledAppVersionStatus(app)
Expand All @@ -153,7 +153,10 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
// invoke DagExecutor, for cd success which will trigger post stage if exist.
if isSucceeded {
impl.logger.Debugw("git hash history", "list", app.Status.History)
err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(pipelineOverride)
triggerContext := pipeline.TriggerContext{
ReferenceId: pointer.String(msg.MsgId),
}
err = impl.workflowDagExecutor.HandleDeploymentSuccessEvent(triggerContext, pipelineOverride)
if err != nil {
impl.logger.Errorw("deployment success event error", "pipelineOverride", pipelineOverride, "err", err)
return
Expand All @@ -162,7 +165,13 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
impl.logger.Debugw("application status update completed", "app", app.Name)
}

err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_UPDATE_TOPIC, callback)
// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
return "", nil
}

validations := impl.workflowDagExecutor.GetTriggerValidateFuncs()
err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_UPDATE_TOPIC, callback, loggerFunc, validations...)
if err != nil {
impl.logger.Error(err)
return err
Expand All @@ -173,7 +182,6 @@ func (impl *ApplicationStatusHandlerImpl) Subscribe() error {
func (impl *ApplicationStatusHandlerImpl) SubscribeDeleteStatus() error {
callback := func(msg *model.PubSubMsg) {

impl.logger.Debugw("APP_STATUS_DELETE_REQ", "stage", "raw", "data", msg.Data)
applicationDetail := ApplicationDetail{}
err := json.Unmarshal([]byte(msg.Data), &applicationDetail)
if err != nil {
Expand All @@ -191,7 +199,18 @@ func (impl *ApplicationStatusHandlerImpl) SubscribeDeleteStatus() error {
impl.logger.Errorw("error in updating pipeline delete status", "err", err, "appName", app.Name)
}
}
err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_DELETE_TOPIC, callback)

// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
applicationDetail := ApplicationDetail{}
err := json.Unmarshal([]byte(msg.Data), &applicationDetail)
if err != nil {
return "unmarshal error on app delete status", []interface{}{"err", err}
}
return "got message for application status delete", []interface{}{"appName", applicationDetail.Application.Name, "namespace", applicationDetail.Application.Namespace, "deleteTimestamp", applicationDetail.Application.DeletionTimestamp}
}

err := impl.pubsubClient.Subscribe(pubsub.APPLICATION_STATUS_DELETE_TOPIC, callback, loggerFunc)
if err != nil {
impl.logger.Errorw("error in subscribing to argo application status delete topic", "err", err)
return err
Expand All @@ -210,7 +229,7 @@ func (impl *ApplicationStatusHandlerImpl) updateArgoAppDeleteStatus(app *v1alpha
return errors.New("invalid nats message, pipeline already deleted")
}
if err == pg.ErrNoRows {
//Helm app deployed using argocd
// Helm app deployed using argocd
var gitHash string
if app.Operation != nil && app.Operation.Sync != nil {
gitHash = app.Operation.Sync.Revision
Expand All @@ -229,7 +248,7 @@ func (impl *ApplicationStatusHandlerImpl) updateArgoAppDeleteStatus(app *v1alpha
impl.logger.Errorw("App not found in database", "installedAppId", model.InstalledAppId, "err", err)
return fmt.Errorf("app not found in database %s", err)
} else if installedApp.DeploymentAppDeleteRequest == false {
//TODO 4465 remove app from log after final RCA
// TODO 4465 remove app from log after final RCA
impl.logger.Infow("Deployment delete not requested for app, not deleting app from DB", "appName", app.Name, "app", app)
return nil
}
Expand Down
29 changes: 22 additions & 7 deletions api/router/pubsub/CiEventHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/devtron-labs/devtron/util"
"go.uber.org/zap"
"gopkg.in/go-playground/validator.v9"
"k8s.io/utils/pointer"
"time"
)

Expand Down Expand Up @@ -100,8 +101,6 @@ func NewCiEventHandlerImpl(logger *zap.SugaredLogger, pubsubClient *pubsub.PubSu

func (impl *CiEventHandlerImpl) Subscribe() error {
callback := func(msg *model.PubSubMsg) {
impl.logger.Debugw("ci complete event received")
//defer msg.Ack()
ciCompleteEvent := CiCompleteEvent{}
err := json.Unmarshal([]byte(string(msg.Data)), &ciCompleteEvent)
if err != nil {
Expand All @@ -114,6 +113,10 @@ func (impl *CiEventHandlerImpl) Subscribe() error {
return
}

triggerContext := pipeline.TriggerContext{
ReferenceId: pointer.String(msg.MsgId),
}

if ciCompleteEvent.FailureReason != "" {
req.FailureReason = ciCompleteEvent.FailureReason
err := impl.webhookService.HandleCiStepFailedEvent(ciCompleteEvent.PipelineId, req)
Expand All @@ -136,7 +139,7 @@ func (impl *CiEventHandlerImpl) Subscribe() error {
impl.logger.Error("Error while creating request for pipelineID", "pipelineId", ciCompleteEvent.PipelineId, "err", err)
return
}
resp, err := impl.ValidateAndHandleCiSuccessEvent(ciCompleteEvent.PipelineId, request, detail.ImagePushedAt)
resp, err := impl.ValidateAndHandleCiSuccessEvent(triggerContext, ciCompleteEvent.PipelineId, request, detail.ImagePushedAt)
if err != nil {
return
}
Expand All @@ -146,28 +149,40 @@ func (impl *CiEventHandlerImpl) Subscribe() error {

} else {
util.TriggerCIMetrics(ciCompleteEvent.Metrics, impl.ciEventConfig.ExposeCiMetrics, ciCompleteEvent.PipelineName, ciCompleteEvent.AppName)
resp, err := impl.ValidateAndHandleCiSuccessEvent(ciCompleteEvent.PipelineId, req, &time.Time{})
resp, err := impl.ValidateAndHandleCiSuccessEvent(triggerContext, ciCompleteEvent.PipelineId, req, &time.Time{})
if err != nil {
return
}
impl.logger.Debug(resp)
}
}
err := impl.pubsubClient.Subscribe(pubsub.CI_COMPLETE_TOPIC, callback)

// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
ciCompleteEvent := CiCompleteEvent{}
err := json.Unmarshal([]byte(string(msg.Data)), &ciCompleteEvent)
if err != nil {
return "error while unmarshalling json data", []interface{}{"error", err}
}
return "got message for ci-completion", []interface{}{"ciPipelineId", ciCompleteEvent.PipelineId, "workflowId", ciCompleteEvent.WorkflowId}
}

validations := impl.webhookService.GetTriggerValidateFuncs()
err := impl.pubsubClient.Subscribe(pubsub.CI_COMPLETE_TOPIC, callback, loggerFunc, validations...)
if err != nil {
impl.logger.Error(err)
return err
}
return nil
}

func (impl *CiEventHandlerImpl) ValidateAndHandleCiSuccessEvent(ciPipelineId int, request *pipeline.CiArtifactWebhookRequest, imagePushedAt *time.Time) (int, error) {
func (impl *CiEventHandlerImpl) ValidateAndHandleCiSuccessEvent(triggerContext pipeline.TriggerContext, ciPipelineId int, request *pipeline.CiArtifactWebhookRequest, imagePushedAt *time.Time) (int, error) {
validationErr := impl.validator.Struct(request)
if validationErr != nil {
impl.logger.Errorw("validation err, HandleCiSuccessEvent", "err", validationErr, "payload", request)
return 0, validationErr
}
buildArtifactId, err := impl.webhookService.HandleCiSuccessEvent(ciPipelineId, request, imagePushedAt)
buildArtifactId, err := impl.webhookService.HandleCiSuccessEvent(triggerContext, ciPipelineId, request, imagePushedAt)
if err != nil {
impl.logger.Error("Error while sending event for CI success for pipelineID",
ciPipelineId, "request", request, "error", err)
Expand Down
13 changes: 12 additions & 1 deletion api/router/pubsub/GitWebhookHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,18 @@ func (impl *GitWebhookHandlerImpl) Subscribe() error {
return
}
}
err := impl.pubsubClient.Subscribe(pubsub.NEW_CI_MATERIAL_TOPIC, callback)

// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
ciPipelineMaterial := gitSensor.CiPipelineMaterial{}
err := json.Unmarshal([]byte(string(msg.Data)), &ciPipelineMaterial)
if err != nil {
return "error while unmarshalling json response", []interface{}{"error", err}
}
return "got message for about new ci material", []interface{}{"ciPipelineMaterialId", ciPipelineMaterial.Id, "gitMaterialId", ciPipelineMaterial.GitMaterialId, "type", ciPipelineMaterial.Type}
}

err := impl.pubsubClient.Subscribe(pubsub.NEW_CI_MATERIAL_TOPIC, callback, loggerFunc)
if err != nil {
impl.logger.Error("err", err)
return err
Expand Down
34 changes: 27 additions & 7 deletions api/router/pubsub/WorkflowStatusUpdateHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ func NewWorkflowStatusUpdateHandlerImpl(logger *zap.SugaredLogger, pubsubClient

func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error {
callback := func(msg *model.PubSubMsg) {
impl.logger.Debug("received wf update request")
//defer msg.Ack()
wfStatus := v1alpha1.WorkflowStatus{}
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
if err != nil {
Expand All @@ -93,7 +91,20 @@ func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error {
}

}
err := impl.pubsubClient.Subscribe(pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, callback)

// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
wfStatus := v1alpha1.WorkflowStatus{}
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
if err != nil {
return "error while unmarshalling wf status update", []interface{}{"err", err, "msg", string(msg.Data)}
}

workflowName, status, _, message, _, _ := pipeline.ExtractWorkflowStatus(wfStatus)
return "got message for ci workflow status update ", []interface{}{"workflowName", workflowName, "status", status, "message", message}
}

err := impl.pubsubClient.Subscribe(pubsub.WORKFLOW_STATUS_UPDATE_TOPIC, callback, loggerFunc)

if err != nil {
impl.logger.Error("err", err)
Expand All @@ -104,16 +115,13 @@ func (impl *WorkflowStatusUpdateHandlerImpl) Subscribe() error {

func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error {
callback := func(msg *model.PubSubMsg) {
impl.logger.Debug("received cd wf update request")
//defer msg.Ack()
wfStatus := v1alpha1.WorkflowStatus{}
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
if err != nil {
impl.logger.Error("Error while unmarshalling wfStatus json object", "error", err)
return
}

impl.logger.Debugw("received cd wf update request body", "body", wfStatus)
wfrId, wfrStatus, err := impl.cdHandler.UpdateWorkflow(wfStatus)
impl.logger.Debugw("UpdateWorkflow", "wfrId", wfrId, "wfrStatus", wfrStatus)
if err != nil {
Expand Down Expand Up @@ -170,7 +178,19 @@ func (impl *WorkflowStatusUpdateHandlerImpl) SubscribeCD() error {
}
}
}
err := impl.pubsubClient.Subscribe(pubsub.CD_WORKFLOW_STATUS_UPDATE, callback)

// add required logging here
var loggerFunc pubsub.LoggerFunc = func(msg model.PubSubMsg) (string, []interface{}) {
wfStatus := v1alpha1.WorkflowStatus{}
err := json.Unmarshal([]byte(string(msg.Data)), &wfStatus)
if err != nil {
return "error while unmarshalling wfStatus json object", []interface{}{"error", err}
}
workflowName, status, _, message, _, _ := pipeline.ExtractWorkflowStatus(wfStatus)
return "got message for cd workflow status", []interface{}{"workflowName", workflowName, "status", status, "message", message}
}

err := impl.pubsubClient.Subscribe(pubsub.CD_WORKFLOW_STATUS_UPDATE, callback, loggerFunc)
if err != nil {
impl.logger.Error("err", err)
return err
Expand Down