diff --git a/pkg/meta/annotations.go b/pkg/meta/annotations.go index 9b8cff2e6..d2e677d81 100644 --- a/pkg/meta/annotations.go +++ b/pkg/meta/annotations.go @@ -10,9 +10,6 @@ const ( // AnnotationDescription is the annotation key used to describe resources AnnotationDescription = "cyclone.dev/description" - // AnnotationOwner is the annotation key used to indicate the owner of resources. - AnnotationOwner = "cyclone.dev/owner" - // AnnotationStageName is annotation applied to pod to indicate which stage it related to AnnotationStageName = "stage.cyclone.dev/name" @@ -25,6 +22,9 @@ const ( // AnnotationWorkflowRunSCMEvent is the annotation key used to indicate the SCM event data to trigger workflowruns. AnnotationWorkflowRunSCMEvent = "workflowrun.cyclone.dev/scm-event" + // AnnotationWorkflowRunPRUpdatedAt is the annotation key used to indicate the time that SCM event gets triggered. + AnnotationWorkflowRunPRUpdatedAt = "workflowrun.cyclone.dev/scm-pr-updated-at" + // AnnotationTenantInfo is the annotation key used for namespace to relate tenant information AnnotationTenantInfo = "tenant.cyclone.dev/info" diff --git a/pkg/meta/labels.go b/pkg/meta/labels.go index c4cf67fbe..b873d21e9 100644 --- a/pkg/meta/labels.go +++ b/pkg/meta/labels.go @@ -26,6 +26,9 @@ const ( // LabelWorkflowRunName is the label key used to indicate the workflowrun which the resources belongs to LabelWorkflowRunName = "workflowrun.cyclone.dev/name" + // LabelWorkflowRunPRRef is the label key used to indicate the ref of PR that workflowrun belongs to + LabelWorkflowRunPRRef = "workflowrun.cyclone.dev/scm-pr-ref" + // LabelWorkflowRunAcceleration is the label key used to indicate a workflowrun turned on acceleration LabelWorkflowRunAcceleration = "workflowrun.cyclone.dev/acceleration" diff --git a/pkg/server/biz/scm/bitbucket/server/webhooks.go b/pkg/server/biz/scm/bitbucket/server/webhooks.go index 97d5bd934..bb6a203c1 100644 --- a/pkg/server/biz/scm/bitbucket/server/webhooks.go +++ b/pkg/server/biz/scm/bitbucket/server/webhooks.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "net/http" "strings" + "time" "github.com/caicloud/nirvana/log" @@ -106,6 +107,7 @@ func ParseEvent(request *http.Request) *scm.EventData { Ref: fmt.Sprintf(pullRefTemplate, payload.PullRequest.ID), CommitSHA: payload.PullRequest.FromRef.LatestCommit, Branch: payload.PullRequest.ToRef.DisplayID, + CreatedAt: time.Unix(payload.PullRequest.CreatedDate/1000, 0), } case PrModified: return &scm.EventData{ @@ -113,6 +115,7 @@ func ParseEvent(request *http.Request) *scm.EventData { Repo: fmt.Sprintf("%s/%s", strings.ToLower(payload.PullRequest.ToRef.Repository.Project.Key), payload.PullRequest.ToRef.Repository.Slug), Ref: fmt.Sprintf(pullRefTemplate, payload.PullRequest.ID), CommitSHA: payload.PullRequest.FromRef.LatestCommit, + CreatedAt: time.Unix(payload.PullRequest.UpdatedDate/1000, 0), } case PrCommentAdded: return &scm.EventData{ @@ -121,6 +124,7 @@ func ParseEvent(request *http.Request) *scm.EventData { Ref: fmt.Sprintf(pullRefTemplate, payload.PullRequest.ID), Comment: payload.Comment.Text, CommitSHA: payload.PullRequest.FromRef.LatestCommit, + CreatedAt: time.Unix(payload.PullRequest.CreatedDate/1000, 0), } default: log.Warningln("Skip unsupported Bitbucket Server event") diff --git a/pkg/server/biz/scm/github/github.go b/pkg/server/biz/scm/github/github.go index 28842e0c0..947d152c8 100644 --- a/pkg/server/biz/scm/github/github.go +++ b/pkg/server/biz/scm/github/github.go @@ -623,6 +623,7 @@ func ParseEvent(scmCfg *v1alpha1.SCMSource, request *http.Request) *scm.EventDat Ref: fmt.Sprintf(pullRefTemplate, *event.PullRequest.Number), CommitSHA: commitSHA, Branch: *event.PullRequest.Base.Ref, + CreatedAt: event.GetPullRequest().GetUpdatedAt(), } case *github.IssueCommentEvent: if event.Issue.PullRequestLinks == nil { @@ -654,6 +655,7 @@ func ParseEvent(scmCfg *v1alpha1.SCMSource, request *http.Request) *scm.EventDat Ref: fmt.Sprintf(pullRefTemplate, issueNumber), Comment: *event.Comment.Body, CommitSHA: commitSHA, + CreatedAt: event.GetComment().GetCreatedAt(), } case *github.PushEvent: if event.After != nil && *event.After == "0000000000000000000000000000000000000000" { diff --git a/pkg/server/biz/scm/gitlab/gitlab.go b/pkg/server/biz/scm/gitlab/gitlab.go index 17543a120..202d58357 100644 --- a/pkg/server/biz/scm/gitlab/gitlab.go +++ b/pkg/server/biz/scm/gitlab/gitlab.go @@ -25,6 +25,7 @@ import ( "net/http" "reflect" "strings" + "time" "github.com/caicloud/nirvana/log" "github.com/xanzy/go-gitlab" @@ -459,6 +460,16 @@ func parseWebhook(r *http.Request) (payload interface{}, err error) { return payload, nil } +const timeLayout = "2006-01-02 15:04:05 MST" + +func parseTime(s string) time.Time { + t, err := time.Parse(timeLayout, s) + if err != nil { + log.Warningf("Failed to parse time(%s): %v", s, err) + } + return t +} + // ParseEvent parses data from Gitlab events. func ParseEvent(request *http.Request) *scm.EventData { event, err := parseWebhook(request) @@ -491,6 +502,7 @@ func ParseEvent(request *http.Request) *scm.EventData { Ref: fmt.Sprintf(mergeRefTemplate, objectAttributes.Iid, objectAttributes.TargetBranch), CommitSHA: objectAttributes.LastCommit.ID, Branch: objectAttributes.TargetBranch, + CreatedAt: parseTime(objectAttributes.UpdatedAt), } case *MergeCommentEvent: if event.MergeRequest == nil { @@ -503,6 +515,7 @@ func ParseEvent(request *http.Request) *scm.EventData { Ref: fmt.Sprintf(mergeRefTemplate, event.MergeRequest.IID, event.MergeRequest.TargetBranch), Comment: event.ObjectAttributes.Note, CommitSHA: event.MergeRequest.LastCommit.ID, + CreatedAt: parseTime(event.ObjectAttributes.CreatedAt), } case *gitlab.PushEvent: if event.After == "0000000000000000000000000000000000000000" { diff --git a/pkg/server/biz/scm/scm.go b/pkg/server/biz/scm/scm.go index b45a0b278..8a183d157 100644 --- a/pkg/server/biz/scm/scm.go +++ b/pkg/server/biz/scm/scm.go @@ -19,6 +19,7 @@ package scm import ( "fmt" "strings" + "time" "github.com/caicloud/nirvana/log" @@ -172,12 +173,14 @@ const ( // EventData represents the data parsed from SCM events. type EventData struct { - Type EventType - Repo string - Ref string - Branch string - Comment string - CommitSHA string + Type EventType + Repo string + Ref string + Branch string + Comment string + CommitSHA string + // CreatedAt is the time this event gets triggered at. + CreatedAt time.Time ChangedFiles []string } diff --git a/pkg/server/handler/v1alpha1/webhook.go b/pkg/server/handler/v1alpha1/webhook.go index 69c5ee45b..5066d0c88 100644 --- a/pkg/server/handler/v1alpha1/webhook.go +++ b/pkg/server/handler/v1alpha1/webhook.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/caicloud/nirvana/log" "github.com/caicloud/nirvana/service" @@ -22,6 +23,7 @@ import ( "github.com/caicloud/cyclone/pkg/server/biz/scm/svn" "github.com/caicloud/cyclone/pkg/server/common" "github.com/caicloud/cyclone/pkg/server/handler" + "github.com/caicloud/cyclone/pkg/util" "github.com/caicloud/cyclone/pkg/util/cerr" ) @@ -31,6 +33,27 @@ const ( ignoredMsg = "Is ignored" ) +type task struct { + Tenant string + Trigger v1alpha1.WorkflowTrigger + EventData *scm.EventData +} + +var taskQueue = make(chan task, 10) + +func init() { + go webhookWorker(taskQueue) +} + +func webhookWorker(tasks <-chan task) { + for t := range tasks { + err := createWorkflowRun(t.Tenant, t.Trigger, t.EventData) + if err != nil { + log.Errorf("wft %s create workflow run error:%v", t.Trigger.Name, err) + } + } +} + func newWebhookResponse(msg string) api.WebhookResponse { return api.WebhookResponse{ Message: msg, @@ -74,6 +97,8 @@ func HandleWebhook(ctx context.Context, tenant, eventType, integration string) ( if data == nil { return newWebhookResponse(ignoredMsg), nil } + // convert the time to UTC timezone + data.CreatedAt = data.CreatedAt.UTC() wfts, err := hook.ListSCMWfts(tenant, data.Repo, integration) if err != nil { @@ -83,10 +108,12 @@ func HandleWebhook(ctx context.Context, tenant, eventType, integration string) ( triggeredWfts := make([]string, 0) for _, wft := range wfts.Items { log.Infof("Trigger workflow trigger %s", wft.Name) - triggeredWfts = append(triggeredWfts, wft.Name) - if err = createWorkflowRun(tenant, wft, data); err != nil { - log.Errorf("wft %s create workflow run error:%v", wft.Name, err) + taskQueue <- task{ + Tenant: tenant, + Trigger: wft, + EventData: data, } + triggeredWfts = append(triggeredWfts, wft.Name) } if len(triggeredWfts) > 0 { return newWebhookResponse(fmt.Sprintf("%s: %s", succeededMsg, triggeredWfts)), nil @@ -95,6 +122,21 @@ func HandleWebhook(ctx context.Context, tenant, eventType, integration string) ( return newWebhookResponse(ignoredMsg), nil } +func sanitizeRef(ref string) string { + ret := make([]rune, 0, len(ref)) + for _, ch := range ref { + switch { + case ch >= 'a' && ch <= 'z': + case ch >= 'A' && ch <= 'Z': + case ch >= '0' && ch <= '9': + default: + ch = '_' + } + ret = append(ret, ch) + } + return string(ret) +} + func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.EventData) error { ns := wft.Namespace cancelPrevious := false @@ -191,6 +233,62 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev return nil } + cycloneClient := handler.K8sClient.CycloneV1alpha1() + ctx := context.TODO() + skipCurrent := false + ref := sanitizeRef(data.Ref) + + var currentWfrs []v1alpha1.WorkflowRun + + if cancelPrevious { + wfrs, err := cycloneClient.WorkflowRuns(ns).List(metav1.ListOptions{ + LabelSelector: fmt.Sprintf("%s=%s,%s=%s,%s=%s", + meta.LabelProjectName, project, + meta.LabelWorkflowName, wfName, + meta.LabelWorkflowRunPRRef, ref), + }) + if err != nil { + log.Warningf("Fail to list previous WorkflowRuns: %v. project=%s, workflow=%s, ref=%s, trigger=%s", err, + project, wfName, ref, data.Type) + } else if !data.CreatedAt.IsZero() { + for _, item := range wfrs.Items { + if len(item.Annotations) == 0 { + continue + } + evtType := scm.EventType(item.Annotations[meta.AnnotationWorkflowRunTrigger]) + if util.IsWorkflowRunTerminated(&item) || + (evtType != scm.PullRequestEventType && evtType != scm.PullRequestCommentEventType) { + continue + } + + currentWfrs = append(currentWfrs, item) + + updatedAtStr := item.Annotations[meta.AnnotationWorkflowRunPRUpdatedAt] + if len(updatedAtStr) == 0 { + continue + } + updatedAt, err := time.Parse(time.RFC3339, updatedAtStr) + if err != nil { + log.Warningf("Fail to parse pr-updated-at: %s: %v. ns=%s, wfr=%s", updatedAtStr, err, + item.Namespace, item.Name) + continue + } + if updatedAt.After(data.CreatedAt) { + skipCurrent = true + log.Infof("There is already running workflowRun for PR %s. Ignore this event. Existing wfr is %s/%s", data.Ref, item.Namespace, item.Name) + break + } + } + } else { + log.Infof("The update time of PR %s/%s is unknown. Turn off canceling previous builds.", data.Repo, data.Ref) + cancelPrevious = false + } + } + + if skipCurrent { + return nil + } + log.Infof("Trigger wft %s with event data: %v", wft.Name, data) name := fmt.Sprintf("%s-%s", wfName, rand.String(5)) @@ -200,13 +298,15 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev ObjectMeta: metav1.ObjectMeta{ Name: name, Annotations: map[string]string{ - meta.AnnotationWorkflowRunTrigger: string(data.Type), - meta.AnnotationAlias: name, + meta.AnnotationWorkflowRunPRUpdatedAt: data.CreatedAt.Format(time.RFC3339), + meta.AnnotationWorkflowRunTrigger: string(data.Type), + meta.AnnotationAlias: name, }, Labels: map[string]string{ meta.LabelProjectName: project, meta.LabelWorkflowName: wfName, meta.LabelWorkflowRunAcceleration: wft.Labels[meta.LabelWorkflowRunAcceleration], + meta.LabelWorkflowRunPRRef: ref, }, }, Spec: wft.Spec.WorkflowRunSpec, @@ -240,49 +340,32 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev } accelerator.NewAccelerator(tenant, project, wfr).Accelerate() - cycloneClient := handler.K8sClient.CycloneV1alpha1() _, err = cycloneClient.WorkflowRuns(ns).Create(wfr) if err != nil { return cerr.ConvertK8sError(err) } - // Init pull-request status to pending - wfrCopy := wfr.DeepCopy() - wfrCopy.Status.Overall.Phase = v1alpha1.StatusRunning - err = updatePullRequestStatus(wfrCopy) - if err != nil { - log.Warningf("Init pull request status for %s error: %v", wfr.Name, err) - } + go func(wfrCopy *v1alpha1.WorkflowRun) { + // Init pull-request status to pending + wfrCopy.Status.Overall.Phase = v1alpha1.StatusRunning + err = updatePullRequestStatus(wfrCopy) + if err != nil { + log.Warningf("Init pull request status for %s error: %v", wfr.Name, err) + } + }(wfr.DeepCopy()) if !cancelPrevious { return nil } - wfrs, err := cycloneClient.WorkflowRuns(ns).List(metav1.ListOptions{ - LabelSelector: fmt.Sprintf("%s=%s,%s=%s", - meta.LabelProjectName, project, - meta.LabelWorkflowName, wfName), - }) - if err == nil { - log.Infof("Trying to cancel %d previous builds", len(wfrs.Items)) - for _, item := range wfrs.Items { - if item.Annotations == nil { - continue - } - evtType := scm.EventType(item.Annotations[meta.AnnotationWorkflowRunTrigger]) - if (evtType != scm.PullRequestEventType && evtType != scm.PullRequestCommentEventType) || - // skip the WorkflowRun created above - item.Name == name { - continue - } - _, err := stopWorkflowRun(context.TODO(), &item, "AutoCancelPreviousBuild") - if err != nil { - log.Warningf("Stop previous WorkflowRun: %v. project=%s, workflow=%s, trigger=%s, name=%s", err, - project, wfName, data.Type, item.Name) - } + + log.Infof("Trying to cancel %d previous builds for PR %s. repo=%s", len(currentWfrs), data.Ref, data.Repo) + for _, item := range currentWfrs { + _, err := stopWorkflowRun(ctx, &item, "AutoCancelPreviousBuild") + if err != nil { + log.Warningf("Fail to stop previous WorkflowRun %s/%s: %v. trigger=%s", err, + item.Namespace, item.Name, data.Type) } - } else { - log.Warningf("Fail to list previous WorkflowRuns: %v. project=%s, workflow=%s, trigger=%s", err, - project, wfName, data.Type) } + return nil } diff --git a/pkg/util/util.go b/pkg/util/util.go new file mode 100644 index 000000000..e7d0e8ec0 --- /dev/null +++ b/pkg/util/util.go @@ -0,0 +1,20 @@ +package util + +import "github.com/caicloud/cyclone/pkg/apis/cyclone/v1alpha1" + +// IsWorkflowRunTerminated judges whether the WorkflowRun has be terminated. +// Return true if terminated, otherwise return false. +func IsWorkflowRunTerminated(wfr *v1alpha1.WorkflowRun) bool { + return IsPhaseTerminated(wfr.Status.Overall.Phase) +} + +// IsPhaseTerminated judges whether the phase is terminated +func IsPhaseTerminated(phase v1alpha1.StatusPhase) bool { + if phase == v1alpha1.StatusSucceeded || + phase == v1alpha1.StatusFailed || + phase == v1alpha1.StatusCancelled { + return true + } + + return false +}