Skip to content

Commit

Permalink
Merge eae0baa into cb4ac63
Browse files Browse the repository at this point in the history
  • Loading branch information
knight42 committed Dec 29, 2020
2 parents cb4ac63 + eae0baa commit a4e1c05
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 12 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,12 @@ push:
docker push ${REGISTRY}/$${imageName}:$(VERSION); \
done

push-local: container-local
@for image in $(IMAGES); do \
imageName=$(IMAGE_PREFIX)$${image/\//-}$(IMAGE_SUFFIX); \
docker push ${REGISTRY}/$${imageName}:$(VERSION); \
done

gen: clean-generated
@./hack/update-codegen.sh
sed -i 's|v1alpha1.Resource(|v1alpha1.GroupResource(|' ./pkg/k8s/listers/cyclone/v1alpha1/*.go
Expand Down
5 changes: 4 additions & 1 deletion pkg/server/handler/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,15 @@ type JSONPatch struct {
}

// BuildWfrStatusPatch builds patch for updating status of workflowrun
func BuildWfrStatusPatch(statusPhase v1alpha1.StatusPhase) ([]byte, error) {
func BuildWfrStatusPatch(statusPhase v1alpha1.StatusPhase, reason string) ([]byte, error) {
now := metav1.Time{Time: time.Now()}

p := map[string]string{
"/status/overall/phase": string(statusPhase),
"/status/overall/lastTransitionTime": now.UTC().Format(time.RFC3339),
}
if len(reason) > 0 {
p["/status/overall/reason"] = reason
}
return BuildPatch(p)
}
40 changes: 39 additions & 1 deletion pkg/server/handler/v1alpha1/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func HandleWebhook(ctx context.Context, tenant, eventType, integration string) (

func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.EventData) error {
ns := wft.Namespace
cancelPrevious := false
var err error
var project string
if wft.Labels != nil {
Expand Down Expand Up @@ -144,19 +145,22 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev
// Always trigger if Branches are not specified
if len(st.PullRequest.Branches) == 0 {
trigger = true
cancelPrevious = true
break
}

for _, branch := range st.PullRequest.Branches {
if branch == data.Branch {
trigger = true
cancelPrevious = true
break
}
}
case scm.PullRequestCommentEventType:
for _, comment := range st.PullRequestComment.Comments {
if comment == data.Comment {
trigger = true
cancelPrevious = true
break
}
}
Expand Down Expand Up @@ -208,6 +212,8 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev
Spec: wft.Spec.WorkflowRunSpec,
}

cycloneClient := handler.K8sClient.CycloneV1alpha1()

wfr.Annotations, err = setSCMEventData(wfr.Annotations, data)
if err != nil {
return err
Expand Down Expand Up @@ -235,8 +241,9 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev
}
}

ctx := context.TODO()
accelerator.NewAccelerator(tenant, project, wfr).Accelerate()
_, err = handler.K8sClient.CycloneV1alpha1().WorkflowRuns(ns).Create(context.TODO(), wfr, metav1.CreateOptions{})
_, err = cycloneClient.WorkflowRuns(ns).Create(ctx, wfr, metav1.CreateOptions{})
if err != nil {
return cerr.ConvertK8sError(err)
}
Expand All @@ -248,5 +255,36 @@ func createWorkflowRun(tenant string, wft v1alpha1.WorkflowTrigger, data *scm.Ev
if err != nil {
log.Warningf("Init pull request status for %s error: %v", wfr.Name, err)
}

if !cancelPrevious {
return nil
}
wfrs, err := cycloneClient.WorkflowRuns(ns).List(ctx, metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s=%s",
meta.LabelProjectName, project,
meta.LabelWorkflowName, wfName),
})
if err == nil {
log.Infof("Trying to cancel %d previous builds", len(wfrs.Items))
for _, item := range wfrs.Items {
if item.Annotations == nil {
continue
}
evtType := scm.EventType(item.Annotations[meta.AnnotationWorkflowRunTrigger])
if (evtType != scm.PullRequestEventType && evtType != scm.PullRequestCommentEventType) ||
// skip the WorkflowRun created above
item.Name == name {
continue
}
_, err := stopWorkflowRun(ctx, &item, "AutoCancelPreviousBuild")
if err != nil {
log.Warningf("Stop previous WorkflowRun: %v. project=%s, workflow=%s, trigger=%s, name=%s", err,
project, wfName, data.Type, item.Name)
}
}
} else {
log.Warningf("Fail to list previous WorkflowRuns: %v. project=%s, workflow=%s, trigger=%s", err,
project, wfName, data.Type)
}
return nil
}
22 changes: 14 additions & 8 deletions pkg/server/handler/v1alpha1/workflowrun.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,25 +279,31 @@ func StopWorkflowRun(ctx context.Context, project, workflow, workflowrun, tenant
return nil, cerr.ConvertK8sError(err)
}

wfr, err = stopWorkflowRun(ctx, wfr, "ManuallyStop")
if err != nil {
log.Errorf("Stop WorkflowRun %s error %s", workflowrun, err)
return nil, cerr.ConvertK8sError(err)
}
return wfr, nil
}

func stopWorkflowRun(ctx context.Context, wfr *v1alpha1.WorkflowRun, reason string) (*v1alpha1.WorkflowRun, error) {
// If wfr already in terminated state, skip it
if util.IsWorkflowRunTerminated(wfr) {
return wfr, nil
}

data, err := handler.BuildWfrStatusPatch(v1alpha1.StatusCancelled)
data, err := handler.BuildWfrStatusPatch(v1alpha1.StatusCancelled, reason)
if err != nil {
log.Errorf("Stop WorkflowRun %s error %s", workflowrun, err)
return nil, err
}

wfr, err = handler.K8sClient.CycloneV1alpha1().WorkflowRuns(common.TenantNamespace(tenant)).Patch(context.TODO(), workflowrun, k8s_types.JSONPatchType, data, metav1.PatchOptions{})

return wfr, cerr.ConvertK8sError(err)
wfr, err = handler.K8sClient.CycloneV1alpha1().WorkflowRuns(wfr.Namespace).Patch(ctx, wfr.Name, k8s_types.JSONPatchType, data, metav1.PatchOptions{})
return wfr, err
}

// PauseWorkflowRun updates the workflowrun overall status to Waiting.
func PauseWorkflowRun(ctx context.Context, project, workflow, workflowrun, tenant string) (*v1alpha1.WorkflowRun, error) {
data, err := handler.BuildWfrStatusPatch(v1alpha1.StatusWaiting)
data, err := handler.BuildWfrStatusPatch(v1alpha1.StatusWaiting, "ManuallyPause")
if err != nil {
log.Errorf("pause workflowrun %s error %s", workflowrun, err)
return nil, err
Expand All @@ -310,7 +316,7 @@ func PauseWorkflowRun(ctx context.Context, project, workflow, workflowrun, tenan

// ResumeWorkflowRun updates the workflowrun overall status to Running.
func ResumeWorkflowRun(ctx context.Context, project, workflow, workflowrun, tenant string) (*v1alpha1.WorkflowRun, error) {
data, err := handler.BuildWfrStatusPatch(v1alpha1.StatusRunning)
data, err := handler.BuildWfrStatusPatch(v1alpha1.StatusRunning, "ManuallyResume")
if err != nil {
log.Errorf("continue workflowrun %s error %s", workflowrun, err)
return nil, err
Expand Down
11 changes: 11 additions & 0 deletions pkg/util/k8s/client.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,25 @@
package k8s

import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"

"github.com/caicloud/cyclone/pkg/k8s/clientset"
cyscheme "github.com/caicloud/cyclone/pkg/k8s/clientset/scheme"
cyclonev1alpha1 "github.com/caicloud/cyclone/pkg/k8s/clientset/typed/cyclone/v1alpha1"
)

// Scheme consists of kubernetes and cyclone scheme.
var Scheme = runtime.NewScheme()

func init() {
_ = cyscheme.AddToScheme(Scheme)
_ = scheme.AddToScheme(Scheme)
}

// Interface consists of kubernetes and cyclone interfaces
type Interface interface {
kubernetes.Interface
Expand Down
3 changes: 1 addition & 2 deletions pkg/workflow/common/event_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"

Expand All @@ -30,7 +29,7 @@ func GetEventRecorder(client k8s.Interface, component string) record.EventRecord
log.Info("Creating event recorder")
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: client.CoreV1().Events("")})
eventRecorder = broadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: component})
eventRecorder = broadcaster.NewRecorder(k8s.Scheme, corev1.EventSource{Component: component})
})

return eventRecorder
Expand Down

0 comments on commit a4e1c05

Please sign in to comment.