Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable storing of Run Events as Record #748

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
namespace = flag.String("namespace", corev1.NamespaceAll, "Should the Watcher only watch a single namespace, then this value needs to be set to the namespace name otherwise leave it empty.")
checkOwner = flag.Bool("check_owner", true, "If enabled, owner references will be checked while deleting objects")
updateLogTimeout = flag.Duration("update_log_timeout", 30*time.Second, "How log the Watcher waits for the UpdateLog operation for storing logs to complete before it aborts.")
storeEvent = flag.Bool("store_event", true, "If enabled, events related to runs will also be stored")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be off by default

events are even more numerous than secrets and config maps on a k8s cluster - we should not impact performance by default

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be false. It was enabled for testing. Will change it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bump @khrm - you still need to change this default

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 should be off by default

)

func main() {
Expand Down Expand Up @@ -101,6 +102,7 @@ func main() {
RequeueInterval: *requeueInterval,
CheckOwner: *checkOwner,
UpdateLogTimeout: updateLogTimeout,
StoreEvent: *storeEvent,
}

if selector := *labelSelector; selector != "" {
Expand Down
2 changes: 1 addition & 1 deletion config/base/100-watcher-serviceaccount.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ rules:
# Watcher currently get config from APISever, so will
# fail to start if it does not have this permission.
- apiGroups: [""]
resources: ["configmaps", "pods"]
resources: ["configmaps", "pods", "events"]
verbs: ["get", "list", "watch"]
# Required to read logs, when logs API is enabled
- apiGroups: [""]
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"k8s.io/apimachinery/pkg/types"
)

// EventListRecordType represents the API resource type for EventSet records.
const EventListRecordType = "results.tekton.dev/v1alpha2.EventList"

// LogRecordType represents the API resource type for Tekton log records.
const LogRecordType = "results.tekton.dev/v1alpha2.Log"

Expand Down
3 changes: 3 additions & 0 deletions pkg/watcher/reconciler/annotation/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
// Log identifier.
Log = "results.tekton.dev/log"

// EventList identifier.
EventList = "results.tekton.dev/eventlist"

// ResultAnnotations is an annotation that integrators should add to objects in order to store
// arbitrary keys/values into the Result.Annotations field.
ResultAnnotations = "results.tekton.dev/resultAnnotations"
Expand Down
3 changes: 3 additions & 0 deletions pkg/watcher/reconciler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Config struct {

// UpdateLogTimeout is the time we provide for storing logs before aborting
UpdateLogTimeout *time.Duration

// Whether to Store Events related to Taskrun and Pipelineruns
StoreEvent bool
}

// GetDisableAnnotationUpdate returns whether annotation updates should be
Expand Down
89 changes: 88 additions & 1 deletion pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package dynamic
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"runtime/pprof"
Expand Down Expand Up @@ -45,6 +46,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -57,6 +59,9 @@ var (
// Reconciler implements common reconciler behavior across different Tekton Run
// Object types.
type Reconciler struct {
// KubeClientSet allows us to talk to the k8s for core APIs
KubeClientSet kubernetes.Interface

resultsClient *results.Client
objectClient ObjectClient
cfg *reconciler.Config
Expand All @@ -82,9 +87,10 @@ type IsReadyForDeletion func(ctx context.Context, object results.Object) (bool,
type AfterDeletion func(ctx context.Context, object results.Object) error

// NewDynamicReconciler creates a new dynamic Reconciler.
func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, cfg *reconciler.Config) *Reconciler {
func NewDynamicReconciler(kubeClientSet kubernetes.Interface, rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, cfg *reconciler.Config) *Reconciler {
return &Reconciler{
resultsClient: results.NewClient(rc, lc),
KubeClientSet: kubeClientSet,
objectClient: oc,
cfg: cfg,
// Always true predicate.
Expand Down Expand Up @@ -213,6 +219,26 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {
}
}

// CreateEvents if enabled
if r.cfg.StoreEvent {
if err := r.storeEvents(ctx, o); err != nil {
logger.Errorw("Error storing eventlist",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.Error(err),
)
if ctxCancel != nil {
ctxCancel()
}
return err
}
logger.Debugw("Successfully store eventlist",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
)
}
logger = logger.With(zap.String("results.tekton.dev/result", res.Name),
zap.String("results.tekton.dev/record", rec.Name))
logger.Debugw("Record has been successfully upserted into API server", timeTakenField)
Expand Down Expand Up @@ -572,3 +598,64 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,

return nil
}

// storeEvents streams logs to the API server
func (r *Reconciler) storeEvents(ctx context.Context, o results.Object) error {
logger := logging.FromContext(ctx)
condition := o.GetStatusCondition().GetCondition(apis.ConditionSucceeded)
GVK := o.GetObjectKind().GroupVersionKind()
if !GVK.Empty() &&
(GVK.Kind == "TaskRun" || GVK.Kind == "PipelineRun") &&
condition != nil &&
condition.Type == "Succeeded" &&
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I understand correctly, we are only storing events in case of Success. Should this include more conditions?

!condition.IsUnknown() {

rec, err := r.resultsClient.GetEventListRecord(ctx, o)
if err != nil {
return err
}

if rec != nil {
// It means we have already stored events
eventListName := rec.GetName()
// Update Events annotation if it doesn't exist
return r.addResultsAnnotations(ctx, o, annotation.Annotation{Name: annotation.EventList, Value: eventListName})
}

events, err := r.KubeClientSet.CoreV1().Events(o.GetNamespace()).List(ctx, metav1.ListOptions{
FieldSelector: "involvedObject.uid=" + string(o.GetUID()),
})
if err != nil {
logger.Errorf("Failed to store events - retrieve",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.String("err", err.Error()),
)
return err
}

eventList, err := json.Marshal(events)
if err != nil {
logger.Errorf("Failed to store events - marshal",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.String("err", err.Error()),
)
return err
}

rec, err = r.resultsClient.PutEventList(ctx, o, eventList)
if err != nil {
return err
}

if err := r.addResultsAnnotations(ctx, o, annotation.Annotation{Name: annotation.EventList, Value: rec.GetName()}); err != nil {
return err
}

}

return nil
}
37 changes: 34 additions & 3 deletions pkg/watcher/reconciler/dynamic/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sTest "k8s.io/client-go/kubernetes/fake"
"knative.dev/pkg/apis"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -138,9 +139,12 @@ func TestReconcile_TaskRun(t *testing.T) {
cfg := &reconciler.Config{
DisableAnnotationUpdate: true,
RequeueInterval: 1 * time.Second,
StoreEvent: true,
}

r := NewDynamicReconciler(resultsClient, logsClient, trclient, cfg)
client := k8sTest.NewSimpleClientset()

r := NewDynamicReconciler(client, resultsClient, logsClient, trclient, cfg)
if err := r.Reconcile(ctx, taskrun); err != nil {
t.Fatal(err)
}
Expand All @@ -164,10 +168,14 @@ func TestReconcile_TaskRun(t *testing.T) {
if err != nil {
t.Fatalf("Error parsing result uid: %v", err)
}
logRecordName := record.FormatName(resultName, uuid.NewMD5(uid, []byte(taskrun.GetUID())).String())
logRecordName := record.FormatName(resultName, uuid.NewMD5(uid, []byte(taskrun.GetUID()+"eventlist")).String())
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: logRecordName}); err != nil {
t.Fatalf("Error getting log record: %v", err)
}
eventListName := watcherresults.FormatEventListName(resultName, uid, taskrun)
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist %s: record: %v", eventListName, err)
}
})

// Enable Annotation Updates, re-reconcile
Expand Down Expand Up @@ -204,6 +212,14 @@ func TestReconcile_TaskRun(t *testing.T) {
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: logRecordName}); err != nil {
t.Fatalf("Error getting log record '%s': %v", logRecordName, err)
}
eventListName := tr.GetAnnotations()[annotation.EventList]
if eventListName == "" {
t.Fatalf("Error parsing eventlist name '%s'", eventListName)
}
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist record '%s': %v", eventListName, err)
}

})

t.Run("delete object once grace period elapses", func(t *testing.T) {
Expand Down Expand Up @@ -428,8 +444,13 @@ func TestReconcile_PipelineRun(t *testing.T) {
if _, err := prclient.Create(ctx, pipelinerun, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
cfg := &reconciler.Config{
StoreEvent: true,
}

client := k8sTest.NewSimpleClientset()

r := NewDynamicReconciler(resultsClient, logsClient, prclient, nil)
r := NewDynamicReconciler(client, resultsClient, logsClient, prclient, cfg)
if err := r.Reconcile(ctx, pipelinerun); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -474,6 +495,16 @@ func TestReconcile_PipelineRun(t *testing.T) {
t.Fatalf("Error getting log record: %v", err)
}
})
t.Run("EventList", func(t *testing.T) {

eventListName := pr.GetAnnotations()[annotation.EventList]
if eventListName == "" {
t.Fatalf("Error parsing eventlist name '%s'", eventListName)
}
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist record '%s': %v", eventListName, err)
}
})

// We don't do the same exhaustive feature testing as TaskRuns here -
// since everything is handled as a generic object testing TaskRuns should
Expand Down
2 changes: 2 additions & 0 deletions pkg/watcher/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
Expand All @@ -48,6 +49,7 @@ func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient

c := &Reconciler{
LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(pipelineRunLister.List),
kubeClientSet: kubeclient.Get(ctx),
resultsClient: resultsClient,
logsClient: logs.Get(ctx),
pipelineRunLister: pipelineRunLister,
Expand Down
6 changes: 5 additions & 1 deletion pkg/watcher/reconciler/pipelinerun/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -43,6 +44,9 @@ type Reconciler struct {
// Inline LeaderAwareFuncs to support leader election.
knativereconciler.LeaderAwareFuncs

// kubeClientSet allows us to talk to the k8s for core APIs
kubeClientSet kubernetes.Interface

resultsClient pb.ResultsClient
logsClient pb.LogsClient
pipelineRunLister pipelinev1beta1listers.PipelineRunLister
Expand Down Expand Up @@ -86,7 +90,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
PipelineRunInterface: r.pipelineClient.TektonV1beta1().PipelineRuns(namespace),
}

dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, pipelineRunClient, r.cfg)
dyn := dynamic.NewDynamicReconciler(r.kubeClientSet, r.resultsClient, r.logsClient, pipelineRunClient, r.cfg)
// Tell the dynamic reconciler to wait until all underlying TaskRuns are
// ready for deletion before deleting the PipelineRun. This guarantees
// that the TaskRuns will not be deleted before their final state being
Expand Down
2 changes: 2 additions & 0 deletions pkg/watcher/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
Expand All @@ -47,6 +48,7 @@ func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient

c := &Reconciler{
LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(lister.List),
kubeClientSet: kubeclient.Get(ctx),
resultsClient: resultsClient,
logsClient: logs.Get(ctx),
lister: lister,
Expand Down
6 changes: 5 additions & 1 deletion pkg/watcher/reconciler/taskrun/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/logging"
)
Expand All @@ -29,6 +30,9 @@ type Reconciler struct {
// Inline LeaderAwareFuncs to support leader election.
knativereconciler.LeaderAwareFuncs

// kubeClientSet allows us to talk to the k8s for core APIs
kubeClientSet kubernetes.Interface

resultsClient pb.ResultsClient
logsClient pb.LogsClient
lister v1beta1.TaskRunLister
Expand Down Expand Up @@ -71,7 +75,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
TaskRunInterface: r.pipelineClient.TektonV1beta1().TaskRuns(namespace),
}

dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, taskRunClient, r.cfg)
dyn := dynamic.NewDynamicReconciler(r.kubeClientSet, r.resultsClient, r.logsClient, taskRunClient, r.cfg)
dyn.AfterDeletion = func(ctx context.Context, o results.Object) error {
tr := o.(*pipelinev1beta1.TaskRun)
return r.metrics.DurationAndCountDeleted(ctx, r.configStore.Load().Metrics, tr)
Expand Down