Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cps-2.10-infra] feat(server): auto cancel previous builds triggered by PR #1553

Merged
merged 1 commit into from
Jan 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/meta/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@ const (
// AnnotationDescription is the annotation key used to describe resources
AnnotationDescription = "cyclone.dev/description"

// AnnotationOwner is the annotation key used to indicate the owner of resources.
AnnotationOwner = "cyclone.dev/owner"

// AnnotationStageName is annotation applied to pod to indicate which stage it related to
AnnotationStageName = "stage.cyclone.dev/name"

Expand All @@ -25,6 +22,9 @@ const (
// AnnotationWorkflowRunSCMEvent is the annotation key used to indicate the SCM event data to trigger workflowruns.
AnnotationWorkflowRunSCMEvent = "workflowrun.cyclone.dev/scm-event"

// AnnotationWorkflowRunPRUpdatedAt is the annotation key used to indicate the time that SCM event gets triggered.
AnnotationWorkflowRunPRUpdatedAt = "workflowrun.cyclone.dev/scm-pr-updated-at"

// AnnotationTenantInfo is the annotation key used for namespace to relate tenant information
AnnotationTenantInfo = "tenant.cyclone.dev/info"

Expand Down
3 changes: 3 additions & 0 deletions pkg/meta/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const (
// LabelWorkflowRunName is the label key used to indicate the workflowrun which the resources belongs to
LabelWorkflowRunName = "workflowrun.cyclone.dev/name"

// LabelWorkflowRunPRRef is the label key used to indicate the ref of PR that workflowrun belongs to
LabelWorkflowRunPRRef = "workflowrun.cyclone.dev/scm-pr-ref"

// LabelWorkflowRunAcceleration is the label key used to indicate a workflowrun turned on acceleration
LabelWorkflowRunAcceleration = "workflowrun.cyclone.dev/acceleration"

Expand Down
4 changes: 4 additions & 0 deletions pkg/server/biz/scm/bitbucket/server/webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io/ioutil"
"net/http"
"strings"
"time"

"github.com/caicloud/nirvana/log"

Expand Down Expand Up @@ -106,13 +107,15 @@ func ParseEvent(request *http.Request) *scm.EventData {
Ref: fmt.Sprintf(pullRefTemplate, payload.PullRequest.ID),
CommitSHA: payload.PullRequest.FromRef.LatestCommit,
Branch: payload.PullRequest.ToRef.DisplayID,
CreatedAt: time.Unix(payload.PullRequest.CreatedDate/1000, 0),
}
case PrModified:
return &scm.EventData{
Type: scm.PullRequestEventType,
Repo: fmt.Sprintf("%s/%s", strings.ToLower(payload.PullRequest.ToRef.Repository.Project.Key), payload.PullRequest.ToRef.Repository.Slug),
Ref: fmt.Sprintf(pullRefTemplate, payload.PullRequest.ID),
CommitSHA: payload.PullRequest.FromRef.LatestCommit,
CreatedAt: time.Unix(payload.PullRequest.UpdatedDate/1000, 0),
}
case PrCommentAdded:
return &scm.EventData{
Expand All @@ -121,6 +124,7 @@ func ParseEvent(request *http.Request) *scm.EventData {
Ref: fmt.Sprintf(pullRefTemplate, payload.PullRequest.ID),
Comment: payload.Comment.Text,
CommitSHA: payload.PullRequest.FromRef.LatestCommit,
CreatedAt: time.Unix(payload.PullRequest.CreatedDate/1000, 0),
}
default:
log.Warningln("Skip unsupported Bitbucket Server event")
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/biz/scm/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,7 @@ func ParseEvent(scmCfg *v1alpha1.SCMSource, request *http.Request) *scm.EventDat
Ref: fmt.Sprintf(pullRefTemplate, *event.PullRequest.Number),
CommitSHA: commitSHA,
Branch: *event.PullRequest.Base.Ref,
CreatedAt: event.GetPullRequest().GetUpdatedAt(),
}
case *github.IssueCommentEvent:
if event.Issue.PullRequestLinks == nil {
Expand Down Expand Up @@ -654,6 +655,7 @@ func ParseEvent(scmCfg *v1alpha1.SCMSource, request *http.Request) *scm.EventDat
Ref: fmt.Sprintf(pullRefTemplate, issueNumber),
Comment: *event.Comment.Body,
CommitSHA: commitSHA,
CreatedAt: event.GetComment().GetCreatedAt(),
}
case *github.PushEvent:
if event.After != nil && *event.After == "0000000000000000000000000000000000000000" {
Expand Down
13 changes: 13 additions & 0 deletions pkg/server/biz/scm/gitlab/gitlab.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"net/http"
"reflect"
"strings"
"time"

"github.com/caicloud/nirvana/log"
"github.com/xanzy/go-gitlab"
Expand Down Expand Up @@ -459,6 +460,16 @@ func parseWebhook(r *http.Request) (payload interface{}, err error) {
return payload, nil
}

const timeLayout = "2006-01-02 15:04:05 MST"

func parseTime(s string) time.Time {
t, err := time.Parse(timeLayout, s)
if err != nil {
log.Warningf("Failed to parse time(%s): %v", s, err)
}
return t
}

// ParseEvent parses data from Gitlab events.
func ParseEvent(request *http.Request) *scm.EventData {
event, err := parseWebhook(request)
Expand Down Expand Up @@ -491,6 +502,7 @@ func ParseEvent(request *http.Request) *scm.EventData {
Ref: fmt.Sprintf(mergeRefTemplate, objectAttributes.Iid, objectAttributes.TargetBranch),
CommitSHA: objectAttributes.LastCommit.ID,
Branch: objectAttributes.TargetBranch,
CreatedAt: parseTime(objectAttributes.UpdatedAt),
}
case *MergeCommentEvent:
if event.MergeRequest == nil {
Expand All @@ -503,6 +515,7 @@ func ParseEvent(request *http.Request) *scm.EventData {
Ref: fmt.Sprintf(mergeRefTemplate, event.MergeRequest.IID, event.MergeRequest.TargetBranch),
Comment: event.ObjectAttributes.Note,
CommitSHA: event.MergeRequest.LastCommit.ID,
CreatedAt: parseTime(event.ObjectAttributes.CreatedAt),
}
case *gitlab.PushEvent:
if event.After == "0000000000000000000000000000000000000000" {
Expand Down
15 changes: 9 additions & 6 deletions pkg/server/biz/scm/scm.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package scm
import (
"fmt"
"strings"
"time"

"github.com/caicloud/nirvana/log"

Expand Down Expand Up @@ -172,12 +173,14 @@ const (

// EventData represents the data parsed from SCM events.
type EventData struct {
Type EventType
Repo string
Ref string
Branch string
Comment string
CommitSHA string
Type EventType
Repo string
Ref string
Branch string
Comment string
CommitSHA string
// CreatedAt is the time this event gets triggered at.
CreatedAt time.Time
ChangedFiles []string
}

Expand Down
159 changes: 121 additions & 38 deletions pkg/server/handler/v1alpha1/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/caicloud/nirvana/log"
"github.com/caicloud/nirvana/service"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/caicloud/cyclone/pkg/server/biz/scm/svn"
"github.com/caicloud/cyclone/pkg/server/common"
"github.com/caicloud/cyclone/pkg/server/handler"
"github.com/caicloud/cyclone/pkg/util"
"github.com/caicloud/cyclone/pkg/util/cerr"
)

Expand All @@ -31,6 +33,27 @@ const (
ignoredMsg = "Is ignored"
)

type task struct {
Tenant string
Trigger v1alpha1.WorkflowTrigger
EventData *scm.EventData
}

var taskQueue = make(chan task, 10)

func init() {
go webhookWorker(taskQueue)
}

func webhookWorker(tasks <-chan task) {
for t := range tasks {
err := createWorkflowRun(t.Tenant, t.Trigger, t.EventData)
if err != nil {
log.Errorf("wft %s create workflow run error:%v", t.Trigger.Name, err)
}
}
}

func newWebhookResponse(msg string) api.WebhookResponse {
return api.WebhookResponse{
Message: msg,
Expand Down Expand Up @@ -74,6 +97,8 @@ func HandleWebhook(ctx context.Context, tenant, eventType, integration string) (
if data == nil {
return newWebhookResponse(ignoredMsg), nil
}
// convert the time to UTC timezone
data.CreatedAt = data.CreatedAt.UTC()

wfts, err := hook.ListSCMWfts(tenant, data.Repo, integration)
if err != nil {
Expand All @@ -83,10 +108,12 @@ func HandleWebhook(ctx context.Context, tenant, eventType, integration string) (
triggeredWfts := make([]string, 0)
for _, wft := range wfts.Items {
log.Infof("Trigger workflow trigger %s", wft.Name)
triggeredWfts = append(triggeredWfts, wft.Name)
if err = createWorkflowRun(tenant, wft, data); err != nil {
log.Errorf("wft %s create workflow run error:%v", wft.Name, err)
taskQueue <- task{
Tenant: tenant,
Trigger: wft,
EventData: data,
}
triggeredWfts = append(triggeredWfts, wft.Name)
}
if len(triggeredWfts) > 0 {
return newWebhookResponse(fmt.Sprintf("%s: %s", succeededMsg, triggeredWfts)), nil
Expand All @@ -95,6 +122,21 @@ func HandleWebhook(ctx context.Context, tenant, eventType, integration string) (
return newWebhookResponse(ignoredMsg), nil
}

func sanitizeRef(ref string) string {
ret := make([]rune, 0, len(ref))
for _, ch := range ref {
switch {
case ch >= 'a' && ch <= 'z':
case ch >= 'A' && ch <= 'Z':
case ch >= '0' && ch <= '9':
default:
ch = '_'
}
ret = append(ret, ch)
}
return string(ret)
}

func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.EventData) error {
ns := wft.Namespace
cancelPrevious := false
Expand Down Expand Up @@ -191,6 +233,62 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev
return nil
}

cycloneClient := handler.K8sClient.CycloneV1alpha1()
ctx := context.TODO()
skipCurrent := false
ref := sanitizeRef(data.Ref)

var currentWfrs []v1alpha1.WorkflowRun

if cancelPrevious {
wfrs, err := cycloneClient.WorkflowRuns(ns).List(metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s=%s,%s=%s",
meta.LabelProjectName, project,
meta.LabelWorkflowName, wfName,
meta.LabelWorkflowRunPRRef, ref),
})
if err != nil {
log.Warningf("Fail to list previous WorkflowRuns: %v. project=%s, workflow=%s, ref=%s, trigger=%s", err,
project, wfName, ref, data.Type)
} else if !data.CreatedAt.IsZero() {
for _, item := range wfrs.Items {
if len(item.Annotations) == 0 {
continue
}
evtType := scm.EventType(item.Annotations[meta.AnnotationWorkflowRunTrigger])
if util.IsWorkflowRunTerminated(&item) ||
(evtType != scm.PullRequestEventType && evtType != scm.PullRequestCommentEventType) {
continue
}

currentWfrs = append(currentWfrs, item)

updatedAtStr := item.Annotations[meta.AnnotationWorkflowRunPRUpdatedAt]
if len(updatedAtStr) == 0 {
continue
}
updatedAt, err := time.Parse(time.RFC3339, updatedAtStr)
if err != nil {
log.Warningf("Fail to parse pr-updated-at: %s: %v. ns=%s, wfr=%s", updatedAtStr, err,
item.Namespace, item.Name)
continue
}
if updatedAt.After(data.CreatedAt) {
skipCurrent = true
log.Infof("There is already running workflowRun for PR %s. Ignore this event. Existing wfr is %s/%s", data.Ref, item.Namespace, item.Name)
break
}
}
} else {
log.Infof("The update time of PR %s/%s is unknown. Turn off canceling previous builds.", data.Repo, data.Ref)
cancelPrevious = false
}
}

if skipCurrent {
return nil
}

log.Infof("Trigger wft %s with event data: %v", wft.Name, data)

name := fmt.Sprintf("%s-%s", wfName, rand.String(5))
Expand All @@ -200,13 +298,15 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev
ObjectMeta: metav1.ObjectMeta{
Name: name,
Annotations: map[string]string{
meta.AnnotationWorkflowRunTrigger: string(data.Type),
meta.AnnotationAlias: name,
meta.AnnotationWorkflowRunPRUpdatedAt: data.CreatedAt.Format(time.RFC3339),
meta.AnnotationWorkflowRunTrigger: string(data.Type),
meta.AnnotationAlias: name,
},
Labels: map[string]string{
meta.LabelProjectName: project,
meta.LabelWorkflowName: wfName,
meta.LabelWorkflowRunAcceleration: wft.Labels[meta.LabelWorkflowRunAcceleration],
meta.LabelWorkflowRunPRRef: ref,
},
},
Spec: wft.Spec.WorkflowRunSpec,
Expand Down Expand Up @@ -240,49 +340,32 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev
}

accelerator.NewAccelerator(tenant, project, wfr).Accelerate()
cycloneClient := handler.K8sClient.CycloneV1alpha1()
_, err = cycloneClient.WorkflowRuns(ns).Create(wfr)
if err != nil {
return cerr.ConvertK8sError(err)
}

// Init pull-request status to pending
wfrCopy := wfr.DeepCopy()
wfrCopy.Status.Overall.Phase = v1alpha1.StatusRunning
err = updatePullRequestStatus(wfrCopy)
if err != nil {
log.Warningf("Init pull request status for %s error: %v", wfr.Name, err)
}
go func(wfrCopy *v1alpha1.WorkflowRun) {
// Init pull-request status to pending
wfrCopy.Status.Overall.Phase = v1alpha1.StatusRunning
err = updatePullRequestStatus(wfrCopy)
if err != nil {
log.Warningf("Init pull request status for %s error: %v", wfr.Name, err)
}
}(wfr.DeepCopy())

if !cancelPrevious {
return nil
}
wfrs, err := cycloneClient.WorkflowRuns(ns).List(metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s=%s",
meta.LabelProjectName, project,
meta.LabelWorkflowName, wfName),
})
if err == nil {
log.Infof("Trying to cancel %d previous builds", len(wfrs.Items))
for _, item := range wfrs.Items {
if item.Annotations == nil {
continue
}
evtType := scm.EventType(item.Annotations[meta.AnnotationWorkflowRunTrigger])
if (evtType != scm.PullRequestEventType && evtType != scm.PullRequestCommentEventType) ||
// skip the WorkflowRun created above
item.Name == name {
continue
}
_, err := stopWorkflowRun(context.TODO(), &item, "AutoCancelPreviousBuild")
if err != nil {
log.Warningf("Stop previous WorkflowRun: %v. project=%s, workflow=%s, trigger=%s, name=%s", err,
project, wfName, data.Type, item.Name)
}

log.Infof("Trying to cancel %d previous builds for PR %s. repo=%s", len(currentWfrs), data.Ref, data.Repo)
for _, item := range currentWfrs {
_, err := stopWorkflowRun(ctx, &item, "AutoCancelPreviousBuild")
if err != nil {
log.Warningf("Fail to stop previous WorkflowRun %s/%s: %v. trigger=%s", err,
item.Namespace, item.Name, data.Type)
}
} else {
log.Warningf("Fail to list previous WorkflowRuns: %v. project=%s, workflow=%s, trigger=%s", err,
project, wfName, data.Type)
}

return nil
}