Skip to content

Commit

Permalink
feat(server): auto cancel previous builds triggered by PR (#1553)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jian Zeng committed Jan 10, 2021
1 parent 0ceb24f commit df25a0b
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 47 deletions.
6 changes: 3 additions & 3 deletions pkg/meta/annotations.go
Expand Up @@ -10,9 +10,6 @@ const (
// AnnotationDescription is the annotation key used to describe resources
AnnotationDescription = "cyclone.dev/description"

// AnnotationOwner is the annotation key used to indicate the owner of resources.
AnnotationOwner = "cyclone.dev/owner"

// AnnotationStageName is annotation applied to pod to indicate which stage it related to
AnnotationStageName = "stage.cyclone.dev/name"

Expand All @@ -25,6 +22,9 @@ const (
// AnnotationWorkflowRunSCMEvent is the annotation key used to indicate the SCM event data to trigger workflowruns.
AnnotationWorkflowRunSCMEvent = "workflowrun.cyclone.dev/scm-event"

// AnnotationWorkflowRunPRUpdatedAt is the annotation key used to indicate the time that SCM event gets triggered.
AnnotationWorkflowRunPRUpdatedAt = "workflowrun.cyclone.dev/scm-pr-updated-at"

// AnnotationTenantInfo is the annotation key used for namespace to relate tenant information
AnnotationTenantInfo = "tenant.cyclone.dev/info"

Expand Down
3 changes: 3 additions & 0 deletions pkg/meta/labels.go
Expand Up @@ -26,6 +26,9 @@ const (
// LabelWorkflowRunName is the label key used to indicate the workflowrun which the resources belongs to
LabelWorkflowRunName = "workflowrun.cyclone.dev/name"

// LabelWorkflowRunPRRef is the label key used to indicate the ref of PR that workflowrun belongs to
LabelWorkflowRunPRRef = "workflowrun.cyclone.dev/scm-pr-ref"

// LabelWorkflowRunAcceleration is the label key used to indicate a workflowrun turned on acceleration
LabelWorkflowRunAcceleration = "workflowrun.cyclone.dev/acceleration"

Expand Down
4 changes: 4 additions & 0 deletions pkg/server/biz/scm/bitbucket/server/webhooks.go
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/caicloud/nirvana/log"

Expand Down Expand Up @@ -106,13 +107,15 @@ func ParseEvent(request *http.Request) *scm.EventData {
Ref: fmt.Sprintf(pullRefTemplate, payload.PullRequest.ID),
CommitSHA: payload.PullRequest.FromRef.LatestCommit,
Branch: payload.PullRequest.ToRef.DisplayID,
CreatedAt: time.Unix(payload.PullRequest.CreatedDate/1000, 0),
}
case PrModified:
return &scm.EventData{
Type: scm.PullRequestEventType,
Repo: fmt.Sprintf("%s/%s", strings.ToLower(payload.PullRequest.ToRef.Repository.Project.Key), payload.PullRequest.ToRef.Repository.Slug),
Ref: fmt.Sprintf(pullRefTemplate, payload.PullRequest.ID),
CommitSHA: payload.PullRequest.FromRef.LatestCommit,
CreatedAt: time.Unix(payload.PullRequest.UpdatedDate/1000, 0),
}
case PrCommentAdded:
return &scm.EventData{
Expand All @@ -121,6 +124,7 @@ func ParseEvent(request *http.Request) *scm.EventData {
Ref: fmt.Sprintf(pullRefTemplate, payload.PullRequest.ID),
Comment: payload.Comment.Text,
CommitSHA: payload.PullRequest.FromRef.LatestCommit,
CreatedAt: time.Unix(payload.PullRequest.CreatedDate/1000, 0),
}
default:
log.Warningln("Skip unsupported Bitbucket Server event")
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/biz/scm/github/github.go
Expand Up @@ -623,6 +623,7 @@ func ParseEvent(scmCfg *v1alpha1.SCMSource, request *http.Request) *scm.EventDat
Ref: fmt.Sprintf(pullRefTemplate, *event.PullRequest.Number),
CommitSHA: commitSHA,
Branch: *event.PullRequest.Base.Ref,
CreatedAt: event.GetPullRequest().GetUpdatedAt(),
}
case *github.IssueCommentEvent:
if event.Issue.PullRequestLinks == nil {
Expand Down Expand Up @@ -654,6 +655,7 @@ func ParseEvent(scmCfg *v1alpha1.SCMSource, request *http.Request) *scm.EventDat
Ref: fmt.Sprintf(pullRefTemplate, issueNumber),
Comment: *event.Comment.Body,
CommitSHA: commitSHA,
CreatedAt: event.GetComment().GetCreatedAt(),
}
case *github.PushEvent:
if event.After != nil && *event.After == "0000000000000000000000000000000000000000" {
Expand Down
13 changes: 13 additions & 0 deletions pkg/server/biz/scm/gitlab/gitlab.go
Expand Up @@ -25,6 +25,7 @@ import (
"net/http"
"reflect"
"strings"
"time"

"github.com/caicloud/nirvana/log"
"github.com/xanzy/go-gitlab"
Expand Down Expand Up @@ -459,6 +460,16 @@ func parseWebhook(r *http.Request) (payload interface{}, err error) {
return payload, nil
}

const timeLayout = "2006-01-02 15:04:05 MST"

func parseTime(s string) time.Time {
t, err := time.Parse(timeLayout, s)
if err != nil {
log.Warningf("Failed to parse time(%s): %v", s, err)
}
return t
}

// ParseEvent parses data from Gitlab events.
func ParseEvent(request *http.Request) *scm.EventData {
event, err := parseWebhook(request)
Expand Down Expand Up @@ -491,6 +502,7 @@ func ParseEvent(request *http.Request) *scm.EventData {
Ref: fmt.Sprintf(mergeRefTemplate, objectAttributes.Iid, objectAttributes.TargetBranch),
CommitSHA: objectAttributes.LastCommit.ID,
Branch: objectAttributes.TargetBranch,
CreatedAt: parseTime(objectAttributes.UpdatedAt),
}
case *MergeCommentEvent:
if event.MergeRequest == nil {
Expand All @@ -503,6 +515,7 @@ func ParseEvent(request *http.Request) *scm.EventData {
Ref: fmt.Sprintf(mergeRefTemplate, event.MergeRequest.IID, event.MergeRequest.TargetBranch),
Comment: event.ObjectAttributes.Note,
CommitSHA: event.MergeRequest.LastCommit.ID,
CreatedAt: parseTime(event.ObjectAttributes.CreatedAt),
}
case *gitlab.PushEvent:
if event.After == "0000000000000000000000000000000000000000" {
Expand Down
15 changes: 9 additions & 6 deletions pkg/server/biz/scm/scm.go
Expand Up @@ -19,6 +19,7 @@ package scm
import (
"fmt"
"strings"
"time"

"github.com/caicloud/nirvana/log"

Expand Down Expand Up @@ -172,12 +173,14 @@ const (

// EventData represents the data parsed from SCM events.
type EventData struct {
Type EventType
Repo string
Ref string
Branch string
Comment string
CommitSHA string
Type EventType
Repo string
Ref string
Branch string
Comment string
CommitSHA string
// CreatedAt is the time this event gets triggered at.
CreatedAt time.Time
ChangedFiles []string
}

Expand Down
159 changes: 121 additions & 38 deletions pkg/server/handler/v1alpha1/webhook.go
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/caicloud/nirvana/log"
"github.com/caicloud/nirvana/service"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/caicloud/cyclone/pkg/server/biz/scm/svn"
"github.com/caicloud/cyclone/pkg/server/common"
"github.com/caicloud/cyclone/pkg/server/handler"
"github.com/caicloud/cyclone/pkg/util"
"github.com/caicloud/cyclone/pkg/util/cerr"
)

Expand All @@ -31,6 +33,27 @@ const (
ignoredMsg = "Is ignored"
)

type task struct {
Tenant string
Trigger v1alpha1.WorkflowTrigger
EventData *scm.EventData
}

var taskQueue = make(chan task, 10)

func init() {
go webhookWorker(taskQueue)
}

func webhookWorker(tasks <-chan task) {
for t := range tasks {
err := createWorkflowRun(t.Tenant, t.Trigger, t.EventData)
if err != nil {
log.Errorf("wft %s create workflow run error:%v", t.Trigger.Name, err)
}
}
}

func newWebhookResponse(msg string) api.WebhookResponse {
return api.WebhookResponse{
Message: msg,
Expand Down Expand Up @@ -74,6 +97,8 @@ func HandleWebhook(ctx context.Context, tenant, eventType, integration string) (
if data == nil {
return newWebhookResponse(ignoredMsg), nil
}
// convert the time to UTC timezone
data.CreatedAt = data.CreatedAt.UTC()

wfts, err := hook.ListSCMWfts(tenant, data.Repo, integration)
if err != nil {
Expand All @@ -83,10 +108,12 @@ func HandleWebhook(ctx context.Context, tenant, eventType, integration string) (
triggeredWfts := make([]string, 0)
for _, wft := range wfts.Items {
log.Infof("Trigger workflow trigger %s", wft.Name)
triggeredWfts = append(triggeredWfts, wft.Name)
if err = createWorkflowRun(tenant, wft, data); err != nil {
log.Errorf("wft %s create workflow run error:%v", wft.Name, err)
taskQueue <- task{
Tenant: tenant,
Trigger: wft,
EventData: data,
}
triggeredWfts = append(triggeredWfts, wft.Name)
}
if len(triggeredWfts) > 0 {
return newWebhookResponse(fmt.Sprintf("%s: %s", succeededMsg, triggeredWfts)), nil
Expand All @@ -95,6 +122,21 @@ func HandleWebhook(ctx context.Context, tenant, eventType, integration string) (
return newWebhookResponse(ignoredMsg), nil
}

func sanitizeRef(ref string) string {
ret := make([]rune, 0, len(ref))
for _, ch := range ref {
switch {
case ch >= 'a' && ch <= 'z':
case ch >= 'A' && ch <= 'Z':
case ch >= '0' && ch <= '9':
default:
ch = '_'
}
ret = append(ret, ch)
}
return string(ret)
}

func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.EventData) error {
ns := wft.Namespace
cancelPrevious := false
Expand Down Expand Up @@ -191,6 +233,62 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev
return nil
}

cycloneClient := handler.K8sClient.CycloneV1alpha1()
ctx := context.TODO()
skipCurrent := false
ref := sanitizeRef(data.Ref)

var currentWfrs []v1alpha1.WorkflowRun

if cancelPrevious {
wfrs, err := cycloneClient.WorkflowRuns(ns).List(metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s=%s,%s=%s",
meta.LabelProjectName, project,
meta.LabelWorkflowName, wfName,
meta.LabelWorkflowRunPRRef, ref),
})
if err != nil {
log.Warningf("Fail to list previous WorkflowRuns: %v. project=%s, workflow=%s, ref=%s, trigger=%s", err,
project, wfName, ref, data.Type)
} else if !data.CreatedAt.IsZero() {
for _, item := range wfrs.Items {
if len(item.Annotations) == 0 {
continue
}
evtType := scm.EventType(item.Annotations[meta.AnnotationWorkflowRunTrigger])
if util.IsWorkflowRunTerminated(&item) ||
(evtType != scm.PullRequestEventType && evtType != scm.PullRequestCommentEventType) {
continue
}

currentWfrs = append(currentWfrs, item)

updatedAtStr := item.Annotations[meta.AnnotationWorkflowRunPRUpdatedAt]
if len(updatedAtStr) == 0 {
continue
}
updatedAt, err := time.Parse(time.RFC3339, updatedAtStr)
if err != nil {
log.Warningf("Fail to parse pr-updated-at: %s: %v. ns=%s, wfr=%s", updatedAtStr, err,
item.Namespace, item.Name)
continue
}
if updatedAt.After(data.CreatedAt) {
skipCurrent = true
log.Infof("There is already running workflowRun for PR %s. Ignore this event. Existing wfr is %s/%s", data.Ref, item.Namespace, item.Name)
break
}
}
} else {
log.Infof("The update time of PR %s/%s is unknown. Turn off canceling previous builds.", data.Repo, data.Ref)
cancelPrevious = false
}
}

if skipCurrent {
return nil
}

log.Infof("Trigger wft %s with event data: %v", wft.Name, data)

name := fmt.Sprintf("%s-%s", wfName, rand.String(5))
Expand All @@ -200,13 +298,15 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev
ObjectMeta: metav1.ObjectMeta{
Name: name,
Annotations: map[string]string{
meta.AnnotationWorkflowRunTrigger: string(data.Type),
meta.AnnotationAlias: name,
meta.AnnotationWorkflowRunPRUpdatedAt: data.CreatedAt.Format(time.RFC3339),
meta.AnnotationWorkflowRunTrigger: string(data.Type),
meta.AnnotationAlias: name,
},
Labels: map[string]string{
meta.LabelProjectName: project,
meta.LabelWorkflowName: wfName,
meta.LabelWorkflowRunAcceleration: wft.Labels[meta.LabelWorkflowRunAcceleration],
meta.LabelWorkflowRunPRRef: ref,
},
},
Spec: wft.Spec.WorkflowRunSpec,
Expand Down Expand Up @@ -240,49 +340,32 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev
}

accelerator.NewAccelerator(tenant, project, wfr).Accelerate()
cycloneClient := handler.K8sClient.CycloneV1alpha1()
_, err = cycloneClient.WorkflowRuns(ns).Create(wfr)
if err != nil {
return cerr.ConvertK8sError(err)
}

// Init pull-request status to pending
wfrCopy := wfr.DeepCopy()
wfrCopy.Status.Overall.Phase = v1alpha1.StatusRunning
err = updatePullRequestStatus(wfrCopy)
if err != nil {
log.Warningf("Init pull request status for %s error: %v", wfr.Name, err)
}
go func(wfrCopy *v1alpha1.WorkflowRun) {
// Init pull-request status to pending
wfrCopy.Status.Overall.Phase = v1alpha1.StatusRunning
err = updatePullRequestStatus(wfrCopy)
if err != nil {
log.Warningf("Init pull request status for %s error: %v", wfr.Name, err)
}
}(wfr.DeepCopy())

if !cancelPrevious {
return nil
}
wfrs, err := cycloneClient.WorkflowRuns(ns).List(metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s=%s",
meta.LabelProjectName, project,
meta.LabelWorkflowName, wfName),
})
if err == nil {
log.Infof("Trying to cancel %d previous builds", len(wfrs.Items))
for _, item := range wfrs.Items {
if item.Annotations == nil {
continue
}
evtType := scm.EventType(item.Annotations[meta.AnnotationWorkflowRunTrigger])
if (evtType != scm.PullRequestEventType && evtType != scm.PullRequestCommentEventType) ||
// skip the WorkflowRun created above
item.Name == name {
continue
}
_, err := stopWorkflowRun(context.TODO(), &item, "AutoCancelPreviousBuild")
if err != nil {
log.Warningf("Stop previous WorkflowRun: %v. project=%s, workflow=%s, trigger=%s, name=%s", err,
project, wfName, data.Type, item.Name)
}

log.Infof("Trying to cancel %d previous builds for PR %s. repo=%s", len(currentWfrs), data.Ref, data.Repo)
for _, item := range currentWfrs {
_, err := stopWorkflowRun(ctx, &item, "AutoCancelPreviousBuild")
if err != nil {
log.Warningf("Fail to stop previous WorkflowRun %s/%s: %v. trigger=%s", err,
item.Namespace, item.Name, data.Type)
}
} else {
log.Warningf("Fail to list previous WorkflowRuns: %v. project=%s, workflow=%s, trigger=%s", err,
project, wfName, data.Type)
}

return nil
}

0 comments on commit df25a0b

Please sign in to comment.