Skip to content

Commit

Permalink
Start emitting CloudEvents for TaskRuns
Browse files Browse the repository at this point in the history
Add a new method 'SendCloudEventWithRetries' to the cloud events
controller. It that allows emitting cloud events asynchronously
(in a go routine), taking benefit of the retries capabilities of
the cloudevents go sdk.

Rework the fake client to allow for unit testing of
'SendCloudEventWithRetries'. The new implementation is similar
to that of the fake recorder from client-go, which allows to
unit test k8s events and cloud events in a similar fashion.

Add a new config option default-cloud-events-sink in the
defaults config map. This options allows setting a default sink
for cloud events. If the default sink is setup, cloud events are
sent, else they're disabled.

Invoke 'SendCloudEventWithRetries' from the events modules so
that we start sending cloud events in the same places we were
send k8s events (except for errors).

Most of the plumbing is in place to start emitting cloud events
for pipeline runs too, however there is a small refactor for
the config maps required before we can add that, so this commit
only enabled cloud events for task runs for now.
  • Loading branch information
afrittoli authored and tekton-robot committed Jul 2, 2020
1 parent 08ac72c commit 3866374
Show file tree
Hide file tree
Showing 16 changed files with 607 additions and 77 deletions.
9 changes: 8 additions & 1 deletion config/config-defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,11 @@ data:
# default-pod-template contains the default pod template to use
# TaskRun and PipelineRun, if none is specified. If a pod template
# is specified, the default pod template is ignored.
# default-pod-template:
# default-pod-template:
# default-cloud-events-sink contains the default CloudEvents sink to be
# used for TaskRun and PipelineRun, when no sink is specified.
# Note that right now it is still not possible to set a PipelineRun or
# TaskRun specific sink, so the default is the only option available.
# If no sink is specified, no CloudEvent is generated
# default-cloud-events-sink:
27 changes: 25 additions & 2 deletions docs/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ weight: 2

Tekton runtime resources, specifically `TaskRuns` and `PipelineRuns`,
emit events when they are executed, so that users can monitor their lifecycle
and react to it. Tekton emits [kubernetes events](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#event-v1-core), that can be retrieve from the resource via
`kubectl describe [resource]`.
and react to it.

Tekton emits [kubernetes events](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.18/#event-v1-core), that can be retrieve from the resource via `kubectl describe [resource]`.
[Optionally](#CloudEvents) Tekton can emit [CloudEvents](https://github.com/cloudevents/spec) too.

No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeline/issues/2461).

Expand Down Expand Up @@ -47,3 +49,24 @@ No events are emitted for `Conditions` today (https://github.com/tektoncd/pipeli
successfully. Causes of failure may be: one the `Tasks` failed or the
`PipelineRun` was cancelled or timed out. `Failed` events are also triggered
in case the `PipelineRun` cannot be executed at all because of validation issues.

# CloudEvents

When a sink is [configured](../install.md#configuring-cloudevents-notifications), the following events
will be generated by appropriate controller when a lifecycle event happens for `TaskRun` or
`PipelineRun`.

The complete table of events:

Reasource |Event |Event Type
:-------------|:-------:|:----------------------------------------------------------
TaskRun | Started | dev.tekton.event.taskrun.started.v1
TaskRun | Running | dev.tekton.event.taskrun.runnning.v1
TaskRun | Condition Change while Running | dev.tekton.event.taskrun.unknown.v1
TaskRun | Succeed | dev.tekton.event.taskrun.successful.v1
TaskRun | Failed | dev.tekton.event.taskrun.failed.v1
PipelineRun | Started | dev.tekton.event.pipelinerun.started.v1
PipelineRun | Running | dev.tekton.event.pipelinerun.runnning.v1
PipelineRun | Condition Change while Running | dev.tekton.event.pipelinerun.unknown.v1
PipelineRun | Succeed | dev.tekton.event.pipelinerun.successful.v1
PipelineRun | Failed | dev.tekton.event.pipelinerun.failed.v1
19 changes: 19 additions & 0 deletions docs/install.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,25 @@ data:
bucket.service.account.field.name: GOOGLE_APPLICATION_CREDENTIALS
```
## Configuring CloudEvents notifications
When configured so, Tekton can generate `CloudEvents` for `TaskRun` and `PipelineRun` lifecycle
events. The only configuration parameter is the URL of the sink. When not set, no notification is
generared.
```
apiVersion: v1
kind: ConfigMap
metadata:
name: config-defaults
namespace: tekton-pipelines
labels:
app.kubernetes.io/instance: default
app.kubernetes.io/part-of: tekton-pipelines
data:
default-cloud-events-sink: https://my-sink-url
```
## Customizing basic execution parameters
You can specify your own values that replace the default service account (`ServiceAccount`), timeout (`Timeout`), and Pod template (`PodTemplate`) values used by Tekton Pipelines in `TaskRun` and `PipelineRun` definitions. To do so, modify the ConfigMap `config-defaults` with your desired values.
Expand Down
11 changes: 10 additions & 1 deletion pkg/apis/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
defaultManagedByLabelValueKey = "default-managed-by-label-value"
DefaultManagedByLabelValue = "tekton-pipelines"
defaultPodTemplateKey = "default-pod-template"
defaultCloudEventsSinkKey = "default-cloud-events-sink"
DefaultCloudEventSinkValue = ""
)

// Defaults holds the default configurations
Expand All @@ -44,6 +46,7 @@ type Defaults struct {
DefaultServiceAccount string
DefaultManagedByLabelValue string
DefaultPodTemplate *pod.Template
DefaultCloudEventsSink string
}

// GetBucketConfigName returns the name of the configmap containing all
Expand All @@ -68,14 +71,16 @@ func (cfg *Defaults) Equals(other *Defaults) bool {
return other.DefaultTimeoutMinutes == cfg.DefaultTimeoutMinutes &&
other.DefaultServiceAccount == cfg.DefaultServiceAccount &&
other.DefaultManagedByLabelValue == cfg.DefaultManagedByLabelValue &&
other.DefaultPodTemplate.Equals(cfg.DefaultPodTemplate)
other.DefaultPodTemplate.Equals(cfg.DefaultPodTemplate) &&
other.DefaultCloudEventsSink == cfg.DefaultCloudEventsSink
}

// NewDefaultsFromMap returns a Config given a map corresponding to a ConfigMap
func NewDefaultsFromMap(cfgMap map[string]string) (*Defaults, error) {
tc := Defaults{
DefaultTimeoutMinutes: DefaultTimeoutMinutes,
DefaultManagedByLabelValue: DefaultManagedByLabelValue,
DefaultCloudEventsSink: DefaultCloudEventSinkValue,
}

if defaultTimeoutMin, ok := cfgMap[defaultTimeoutMinutesKey]; ok {
Expand All @@ -102,6 +107,10 @@ func NewDefaultsFromMap(cfgMap map[string]string) (*Defaults, error) {
tc.DefaultPodTemplate = &podTemplate
}

if defaultCloudEventsSink, ok := cfgMap[defaultCloudEventsSinkKey]; ok {
tc.DefaultCloudEventsSink = defaultCloudEventsSink
}

return &tc, nil
}

Expand Down
42 changes: 42 additions & 0 deletions pkg/reconciler/events/cloudevent/cloud_event_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cloudevent

import (
"context"
"errors"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand All @@ -26,7 +27,11 @@ import (
resource "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1/cloudevent"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
controller "knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)

// InitializeCloudEvents initializes the CloudEvents part of the
Expand Down Expand Up @@ -108,3 +113,40 @@ func SendCloudEvents(tr *v1beta1.TaskRun, ceclient CEClient, logger *zap.Sugared
}
return merr.ErrorOrNil()
}

// SendCloudEventWithRetries sends a cloud event for the specified resource.
// It does not block and it perform retries with backoff using the cloudevents
// sdk-go capabilities.
// It accepts a runtime.Object to avoid making objectWithCondition public since
// it's only used within the events/cloudevents packages.
func SendCloudEventWithRetries(ctx context.Context, object runtime.Object) error {
var (
o objectWithCondition
ok bool
)
if o, ok = object.(objectWithCondition); !ok {
return errors.New("Input object does not satisfy objectWithCondition")
}
logger := logging.FromContext(ctx)
ceClient := Get(ctx)
if ceClient == nil {
return errors.New("No cloud events client found in the context")
}
event, err := EventForObjectWithCondition(o)
if err != nil {
return err
}

go func() {
if result := ceClient.Send(cloudevents.ContextWithRetriesExponentialBackoff(ctx, 10*time.Millisecond, 10), *event); !cloudevents.IsACK(result) {
logger.Warnf("Failed to send cloudevent: %s", result.Error())
recorder := controller.GetEventRecorder(ctx)
if recorder == nil {
logger.Warnf("No recorder in context, cannot emit error event")
}
recorder.Event(object, corev1.EventTypeWarning, "Cloud Event Failure", result.Error())
}
}()

return nil
}
179 changes: 179 additions & 0 deletions pkg/reconciler/events/cloudevent/cloud_event_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@ limitations under the License.
package cloudevent

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
tb "github.com/tektoncd/pipeline/internal/builder/v1beta1"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
resourcev1alpha1 "github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1"
"github.com/tektoncd/pipeline/pkg/logging"
"github.com/tektoncd/pipeline/test/diff"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/record"
"knative.dev/pkg/apis"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
"knative.dev/pkg/controller"
rtesting "knative.dev/pkg/reconciler/testing"
)

func TestCloudEventDeliveryFromTargets(t *testing.T) {
Expand Down Expand Up @@ -283,3 +293,172 @@ func TestInitializeCloudEvents(t *testing.T) {
})
}
}

func TestSendCloudEventWithRetries(t *testing.T) {

objectStatus := duckv1beta1.Status{
Conditions: []apis.Condition{{
Type: apis.ConditionSucceeded,
Status: corev1.ConditionTrue,
}},
}

tests := []struct {
name string
clientBehaviour FakeClientBehaviour
object objectWithCondition
wantCEvent string
wantEvent string
}{{
name: "test-send-cloud-event-taskrun",
clientBehaviour: FakeClientBehaviour{
SendSuccessfully: true,
},
object: &v1beta1.TaskRun{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/taskruns/test1",
},
Status: v1beta1.TaskRunStatus{Status: objectStatus},
},
wantCEvent: "Validation: valid",
wantEvent: "",
}, {
name: "test-send-cloud-event-pipelinerun",
clientBehaviour: FakeClientBehaviour{
SendSuccessfully: true,
},
object: &v1beta1.PipelineRun{
ObjectMeta: metav1.ObjectMeta{
SelfLink: "/pipelineruns/test1",
},
Status: v1beta1.PipelineRunStatus{Status: objectStatus},
},
wantCEvent: "Validation: valid",
wantEvent: "",
}, {
name: "test-send-cloud-event-failed",
clientBehaviour: FakeClientBehaviour{
SendSuccessfully: false,
},
object: &v1beta1.PipelineRun{
Status: v1beta1.PipelineRunStatus{Status: objectStatus},
},
wantCEvent: "",
wantEvent: "Warning Cloud Event Failure",
}}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := setupFakeContext(t, tc.clientBehaviour, true)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
err := SendCloudEventWithRetries(ctx, tc.object)
if err != nil {
t.Fatalf("Unexpected error sending cloud events: %v", err)
}
ceClient := Get(ctx).(FakeClient)
err = checkCloudEvents(t, &ceClient, tc.name, tc.wantCEvent)
if err != nil {
t.Fatalf(err.Error())
}
recorder := controller.GetEventRecorder(ctx).(*record.FakeRecorder)
err = checkEvents(t, recorder, tc.name, tc.wantEvent)
if err != nil {
t.Fatalf(err.Error())
}
})
}
}

func TestSendCloudEventWithRetriesInvalid(t *testing.T) {

tests := []struct {
name string
object objectWithCondition
wantCEvent string
wantEvent string
}{{
name: "test-send-cloud-event-invalid-taskrun",
object: &v1beta1.TaskRun{
Status: v1beta1.TaskRunStatus{},
},
wantCEvent: "Validation: valid",
wantEvent: "",
}, {
name: "test-send-cloud-event-pipelinerun",
object: &v1beta1.PipelineRun{
Status: v1beta1.PipelineRunStatus{},
},
wantCEvent: "Validation: valid",
wantEvent: "",
}}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctx := setupFakeContext(t, FakeClientBehaviour{
SendSuccessfully: true,
}, true)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
err := SendCloudEventWithRetries(ctx, tc.object)
if err == nil {
t.Fatalf("Expected an error sending cloud events for invalid object, got none")
}
})
}
}

func TestSendCloudEventWithRetriesNoClient(t *testing.T) {

ctx := setupFakeContext(t, FakeClientBehaviour{}, false)
err := SendCloudEventWithRetries(ctx, &v1beta1.TaskRun{Status: v1beta1.TaskRunStatus{}})
if err == nil {
t.Fatalf("Expected an error sending cloud events with no client in the context, got none")
}
if d := cmp.Diff("No cloud events client found in the context", err.Error()); d != "" {
t.Fatalf("Unexpected error message %s", diff.PrintWantGot(d))
}
}

func setupFakeContext(t *testing.T, behaviour FakeClientBehaviour, withClient bool) context.Context {
var ctx context.Context
ctx, _ = rtesting.SetupFakeContext(t)
if withClient {
ctx = WithClient(ctx, &behaviour)
}
return ctx
}

func testLogger(t *testing.T) *zap.SugaredLogger {
logger, err := zap.NewDevelopment(zap.AddCaller())
if err != nil {
t.Fatalf("failed to create logger: %s", err)
}
return logger.Sugar().Named(t.Name())
}

func eventFromChannel(c chan string, testName string, wantEvent string) error {
timer := time.NewTimer(1 * time.Second)
select {
case event := <-c:
if wantEvent == "" {
return fmt.Errorf("received event \"%s\" for %s but none expected", event, testName)
}
if !(strings.HasPrefix(event, wantEvent)) {
return fmt.Errorf("expected event \"%s\" but got \"%s\" instead for %s", wantEvent, event, testName)
}
case <-timer.C:
if wantEvent != "" {
return fmt.Errorf("received no events for %s but %s expected", testName, wantEvent)
}
}
return nil
}

func checkEvents(t *testing.T, fr *record.FakeRecorder, testName string, wantEvent string) error {
t.Helper()
return eventFromChannel(fr.Events, testName, wantEvent)
}

func checkCloudEvents(t *testing.T, fce *FakeClient, testName string, wantEvent string) error {
t.Helper()
return eventFromChannel(fce.Events, testName, wantEvent)
}
3 changes: 3 additions & 0 deletions pkg/reconciler/events/cloudevent/cloudevent.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,9 @@ func EventForPipelineRun(pipelineRun *v1beta1.PipelineRun) (*cloudevents.Event,

func getEventType(runObject objectWithCondition) (*TektonEventType, error) {
c := runObject.GetStatusCondition().GetCondition(apis.ConditionSucceeded)
if c == nil {
return nil, fmt.Errorf("no condition for ConditionSucceeded in %T", runObject)
}
var eventType TektonEventType
switch {
case c.IsUnknown():
Expand Down
Loading

0 comments on commit 3866374

Please sign in to comment.