Skip to content

Commit

Permalink
Enable storing of Run Events as Record
Browse files Browse the repository at this point in the history
All Events related to taskrun are stored when we are done with Runs
and in a single List.
This can be controlled by flag passed to watcher "store-event". Put
it to false disable storing of eventlist.
Record Name of EventList is stored as `results.tekton.dev/eventlist`
in TaskRun and PipelineRun.
  • Loading branch information
khrm committed Apr 18, 2024
1 parent 9309cee commit 78e672e
Show file tree
Hide file tree
Showing 12 changed files with 235 additions and 7 deletions.
2 changes: 2 additions & 0 deletions cmd/watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ var (
namespace = flag.String("namespace", corev1.NamespaceAll, "Should the Watcher only watch a single namespace, then this value needs to be set to the namespace name otherwise leave it empty.")
checkOwner = flag.Bool("check_owner", true, "If enabled, owner references will be checked while deleting objects")
updateLogTimeout = flag.Duration("update_log_timeout", 30*time.Second, "How log the Watcher waits for the UpdateLog operation for storing logs to complete before it aborts.")
storeEvent = flag.Bool("store_event", true, "If enabled, events related to runs will also be stored")
)

func main() {
Expand Down Expand Up @@ -101,6 +102,7 @@ func main() {
RequeueInterval: *requeueInterval,
CheckOwner: *checkOwner,
UpdateLogTimeout: updateLogTimeout,
StoreEvent: *storeEvent,
}

if selector := *labelSelector; selector != "" {
Expand Down
2 changes: 1 addition & 1 deletion config/base/100-watcher-serviceaccount.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ rules:
# Watcher currently get config from APISever, so will
# fail to start if it does not have this permission.
- apiGroups: [""]
resources: ["configmaps", "pods"]
resources: ["configmaps", "pods", "events"]
verbs: ["get", "list", "watch"]
# Required to read logs, when logs API is enabled
- apiGroups: [""]
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"k8s.io/apimachinery/pkg/types"
)

// EventListRecordType represents the API resource type for EventSet records.
const EventListRecordType = "results.tekton.dev/v1alpha2.EventList"

// LogRecordType represents the API resource type for Tekton log records.
const LogRecordType = "results.tekton.dev/v1alpha2.Log"

Expand Down
3 changes: 3 additions & 0 deletions pkg/watcher/reconciler/annotation/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ const (
// Log identifier.
Log = "results.tekton.dev/log"

// EventList identifier.
EventList = "results.tekton.dev/eventlist"

// ResultAnnotations is an annotation that integrators should add to objects in order to store
// arbitrary keys/values into the Result.Annotations field.
ResultAnnotations = "results.tekton.dev/resultAnnotations"
Expand Down
3 changes: 3 additions & 0 deletions pkg/watcher/reconciler/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Config struct {

// UpdateLogTimeout is the time we provide for storing logs before aborting
UpdateLogTimeout *time.Duration

// Whether to Store Events related to Taskrun and Pipelineruns
StoreEvent bool
}

// GetDisableAnnotationUpdate returns whether annotation updates should be
Expand Down
89 changes: 88 additions & 1 deletion pkg/watcher/reconciler/dynamic/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package dynamic
import (
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"runtime/pprof"
Expand Down Expand Up @@ -45,6 +46,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"knative.dev/pkg/apis"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -57,6 +59,9 @@ var (
// Reconciler implements common reconciler behavior across different Tekton Run
// Object types.
type Reconciler struct {
// KubeClientSet allows us to talk to the k8s for core APIs
KubeClientSet kubernetes.Interface

resultsClient *results.Client
objectClient ObjectClient
cfg *reconciler.Config
Expand All @@ -82,9 +87,10 @@ type IsReadyForDeletion func(ctx context.Context, object results.Object) (bool,
type AfterDeletion func(ctx context.Context, object results.Object) error

// NewDynamicReconciler creates a new dynamic Reconciler.
func NewDynamicReconciler(rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, cfg *reconciler.Config) *Reconciler {
func NewDynamicReconciler(kubeClientSet kubernetes.Interface, rc pb.ResultsClient, lc pb.LogsClient, oc ObjectClient, cfg *reconciler.Config) *Reconciler {
return &Reconciler{
resultsClient: results.NewClient(rc, lc),
KubeClientSet: kubeClientSet,
objectClient: oc,
cfg: cfg,
// Always true predicate.
Expand Down Expand Up @@ -213,6 +219,26 @@ func (r *Reconciler) Reconcile(ctx context.Context, o results.Object) error {
}
}

// CreateEvents if enabled
if r.cfg.StoreEvent {
if err := r.storeEvents(ctx, o); err != nil {
logger.Errorw("Error storing eventlist",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.Error(err),
)
if ctxCancel != nil {
ctxCancel()
}
return err
}
logger.Debugw("Successfully store eventlist",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
)
}
logger = logger.With(zap.String("results.tekton.dev/result", res.Name),
zap.String("results.tekton.dev/record", rec.Name))
logger.Debugw("Record has been successfully upserted into API server", timeTakenField)
Expand Down Expand Up @@ -572,3 +598,64 @@ func (r *Reconciler) streamLogs(ctx context.Context, o results.Object, logType,

return nil
}

// storeEvents streams logs to the API server
func (r *Reconciler) storeEvents(ctx context.Context, o results.Object) error {
logger := logging.FromContext(ctx)
condition := o.GetStatusCondition().GetCondition(apis.ConditionSucceeded)
GVK := o.GetObjectKind().GroupVersionKind()
if !GVK.Empty() &&
(GVK.Kind == "TaskRun" || GVK.Kind == "PipelineRun") &&
condition != nil &&
condition.Type == "Succeeded" &&
!condition.IsUnknown() {

rec, err := r.resultsClient.GetEventListRecord(ctx, o)
if err != nil {
return err
}

if rec != nil {
// It means we have already stored events
eventListName := rec.GetName()
// Update Events annotation if it doesn't exist
return r.addResultsAnnotations(ctx, o, annotation.Annotation{Name: annotation.EventList, Value: eventListName})
}

events, err := r.KubeClientSet.CoreV1().Events(o.GetNamespace()).List(ctx, metav1.ListOptions{
FieldSelector: "involvedObject.uid=" + string(o.GetUID()),
})
if err != nil {
logger.Errorf("Failed to store events - retrieve",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.String("err", err.Error()),
)
return err
}

eventList, err := json.Marshal(events)
if err != nil {
logger.Errorf("Failed to store events - marshal",
zap.String("namespace", o.GetNamespace()),
zap.String("kind", o.GetObjectKind().GroupVersionKind().Kind),
zap.String("name", o.GetName()),
zap.String("err", err.Error()),
)
return err
}

rec, err = r.resultsClient.PutEventList(ctx, o, eventList)
if err != nil {
return err
}

if err := r.addResultsAnnotations(ctx, o, annotation.Annotation{Name: annotation.EventList, Value: rec.GetName()}); err != nil {
return err
}

}

return nil
}
37 changes: 34 additions & 3 deletions pkg/watcher/reconciler/dynamic/dynamic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sTest "k8s.io/client-go/kubernetes/fake"
"knative.dev/pkg/apis"
duckv1beta1 "knative.dev/pkg/apis/duck/v1beta1"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -138,9 +139,12 @@ func TestReconcile_TaskRun(t *testing.T) {
cfg := &reconciler.Config{
DisableAnnotationUpdate: true,
RequeueInterval: 1 * time.Second,
StoreEvent: true,
}

r := NewDynamicReconciler(resultsClient, logsClient, trclient, cfg)
client := k8sTest.NewSimpleClientset()

r := NewDynamicReconciler(client, resultsClient, logsClient, trclient, cfg)
if err := r.Reconcile(ctx, taskrun); err != nil {
t.Fatal(err)
}
Expand All @@ -164,10 +168,14 @@ func TestReconcile_TaskRun(t *testing.T) {
if err != nil {
t.Fatalf("Error parsing result uid: %v", err)
}
logRecordName := record.FormatName(resultName, uuid.NewMD5(uid, []byte(taskrun.GetUID())).String())
logRecordName := record.FormatName(resultName, uuid.NewMD5(uid, []byte(taskrun.GetUID()+"eventlist")).String())
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: logRecordName}); err != nil {
t.Fatalf("Error getting log record: %v", err)
}
eventListName := watcherresults.FormatEventListName(resultName, uid, taskrun)
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist %s: record: %v", eventListName, err)
}
})

// Enable Annotation Updates, re-reconcile
Expand Down Expand Up @@ -204,6 +212,14 @@ func TestReconcile_TaskRun(t *testing.T) {
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: logRecordName}); err != nil {
t.Fatalf("Error getting log record '%s': %v", logRecordName, err)
}
eventListName := tr.GetAnnotations()[annotation.EventList]
if eventListName == "" {
t.Fatalf("Error parsing eventlist name '%s'", eventListName)
}
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist record '%s': %v", eventListName, err)
}

})

t.Run("delete object once grace period elapses", func(t *testing.T) {
Expand Down Expand Up @@ -428,8 +444,13 @@ func TestReconcile_PipelineRun(t *testing.T) {
if _, err := prclient.Create(ctx, pipelinerun, metav1.CreateOptions{}); err != nil {
t.Fatal(err)
}
cfg := &reconciler.Config{
StoreEvent: true,
}

client := k8sTest.NewSimpleClientset()

r := NewDynamicReconciler(resultsClient, logsClient, prclient, nil)
r := NewDynamicReconciler(client, resultsClient, logsClient, prclient, cfg)
if err := r.Reconcile(ctx, pipelinerun); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -474,6 +495,16 @@ func TestReconcile_PipelineRun(t *testing.T) {
t.Fatalf("Error getting log record: %v", err)
}
})
t.Run("EventList", func(t *testing.T) {

eventListName := pr.GetAnnotations()[annotation.EventList]
if eventListName == "" {
t.Fatalf("Error parsing eventlist name '%s'", eventListName)
}
if _, err := resultsClient.GetRecord(ctx, &pb.GetRecordRequest{Name: eventListName}); err != nil {
t.Fatalf("Error getting eventlist record '%s': %v", eventListName, err)
}
})

// We don't do the same exhaustive feature testing as TaskRuns here -
// since everything is handled as a generic object testing TaskRuns should
Expand Down
2 changes: 2 additions & 0 deletions pkg/watcher/reconciler/pipelinerun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
Expand All @@ -48,6 +49,7 @@ func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient

c := &Reconciler{
LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(pipelineRunLister.List),
kubeClientSet: kubeclient.Get(ctx),
resultsClient: resultsClient,
logsClient: logs.Get(ctx),
pipelineRunLister: pipelineRunLister,
Expand Down
6 changes: 5 additions & 1 deletion pkg/watcher/reconciler/pipelinerun/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -43,6 +44,9 @@ type Reconciler struct {
// Inline LeaderAwareFuncs to support leader election.
knativereconciler.LeaderAwareFuncs

// kubeClientSet allows us to talk to the k8s for core APIs
kubeClientSet kubernetes.Interface

resultsClient pb.ResultsClient
logsClient pb.LogsClient
pipelineRunLister pipelinev1beta1listers.PipelineRunLister
Expand Down Expand Up @@ -86,7 +90,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
PipelineRunInterface: r.pipelineClient.TektonV1beta1().PipelineRuns(namespace),
}

dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, pipelineRunClient, r.cfg)
dyn := dynamic.NewDynamicReconciler(r.kubeClientSet, r.resultsClient, r.logsClient, pipelineRunClient, r.cfg)
// Tell the dynamic reconciler to wait until all underlying TaskRuns are
// ready for deletion before deleting the PipelineRun. This guarantees
// that the TaskRuns will not be deleted before their final state being
Expand Down
2 changes: 2 additions & 0 deletions pkg/watcher/reconciler/taskrun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tektoncd/results/pkg/watcher/reconciler/leaderelection"
pb "github.com/tektoncd/results/proto/v1alpha2/results_go_proto"
"k8s.io/client-go/tools/cache"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
)
Expand All @@ -47,6 +48,7 @@ func NewControllerWithConfig(ctx context.Context, resultsClient pb.ResultsClient

c := &Reconciler{
LeaderAwareFuncs: leaderelection.NewLeaderAwareFuncs(lister.List),
kubeClientSet: kubeclient.Get(ctx),
resultsClient: resultsClient,
logsClient: logs.Get(ctx),
lister: lister,
Expand Down
6 changes: 5 additions & 1 deletion pkg/watcher/reconciler/taskrun/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"knative.dev/pkg/logging"
)
Expand All @@ -29,6 +30,9 @@ type Reconciler struct {
// Inline LeaderAwareFuncs to support leader election.
knativereconciler.LeaderAwareFuncs

// kubeClientSet allows us to talk to the k8s for core APIs
kubeClientSet kubernetes.Interface

resultsClient pb.ResultsClient
logsClient pb.LogsClient
lister v1beta1.TaskRunLister
Expand Down Expand Up @@ -71,7 +75,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
TaskRunInterface: r.pipelineClient.TektonV1beta1().TaskRuns(namespace),
}

dyn := dynamic.NewDynamicReconciler(r.resultsClient, r.logsClient, taskRunClient, r.cfg)
dyn := dynamic.NewDynamicReconciler(r.kubeClientSet, r.resultsClient, r.logsClient, taskRunClient, r.cfg)
dyn.AfterDeletion = func(ctx context.Context, o results.Object) error {
tr := o.(*pipelinev1beta1.TaskRun)
return r.metrics.DurationAndCountDeleted(ctx, r.configStore.Load().Metrics, tr)
Expand Down

0 comments on commit 78e672e

Please sign in to comment.