diff --git a/pkg/timer/main.go b/pkg/timer/main.go index 75a6132f9d..4383dfbb46 100644 --- a/pkg/timer/main.go +++ b/pkg/timer/main.go @@ -38,7 +38,8 @@ func Start(ctx context.Context, logger *zap.Logger, routerUrl string) error { } poster := publisher.MakeWebhookPublisher(logger, routerUrl) - MakeTimerSync(ctx, logger, fissionClient, MakeTimer(logger, poster)) + timerSync := MakeTimerSync(ctx, logger, fissionClient, MakeTimer(logger, poster)) + timerSync.Run(ctx) return nil } diff --git a/pkg/timer/timer.go b/pkg/timer/timer.go index e6121c4481..ec089ba105 100644 --- a/pkg/timer/timer.go +++ b/pkg/timer/timer.go @@ -21,7 +21,6 @@ import ( "go.uber.org/zap" fv1 "github.com/fission/fission/pkg/apis/core/v1" - "github.com/fission/fission/pkg/crd" "github.com/fission/fission/pkg/publisher" "github.com/fission/fission/pkg/utils" ) @@ -34,20 +33,11 @@ const ( type ( Timer struct { - logger *zap.Logger - triggers map[string]*timerTriggerWithCron - requestChannel chan *timerRequest - publisher *publisher.Publisher + logger *zap.Logger + triggers map[string]*timerTriggerWithCron + publisher *publisher.Publisher } - timerRequest struct { - requestType - triggers []fv1.TimeTrigger - responseChannel chan *timerResponse - } - timerResponse struct { - error - } timerTriggerWithCron struct { trigger fv1.TimeTrigger cron *cron.Cron @@ -56,88 +46,26 @@ type ( func MakeTimer(logger *zap.Logger, publisher publisher.Publisher) *Timer { timer := &Timer{ - logger: logger.Named("timer"), - triggers: make(map[string]*timerTriggerWithCron), - requestChannel: make(chan *timerRequest), - publisher: &publisher, + logger: logger.Named("timer"), + triggers: make(map[string]*timerTriggerWithCron), + publisher: &publisher, } - go timer.svc() return timer } -func (timer *Timer) Sync(triggers []fv1.TimeTrigger) error { - req := &timerRequest{ - requestType: SYNC, - triggers: triggers, - responseChannel: make(chan *timerResponse), - } - timer.requestChannel <- req - resp := <-req.responseChannel - return resp.error -} - -func (timer *Timer) svc() { - for { - req := <-timer.requestChannel - switch req.requestType { - case SYNC: - err := timer.syncCron(req.triggers) - req.responseChannel <- &timerResponse{error: err} - } - } -} - -func (timer *Timer) syncCron(triggers []fv1.TimeTrigger) error { - // add new triggers or update existing ones - triggerMap := make(map[string]bool) - for _, t := range triggers { - triggerMap[crd.CacheKey(&t.ObjectMeta)] = true - if item, ok := timer.triggers[crd.CacheKey(&t.ObjectMeta)]; ok { - // update cron if the cron spec changed - if item.trigger.Spec.Cron != t.Spec.Cron { - // if there is an cron running, stop it - if item.cron != nil { - item.cron.Stop() - } - item.cron = timer.newCron(t) - } - - item.trigger = t - } else { - timer.triggers[crd.CacheKey(&t.ObjectMeta)] = &timerTriggerWithCron{ - trigger: t, - cron: timer.newCron(t), - } - } - } - - // process removed triggers - for k, v := range timer.triggers { - if _, found := triggerMap[k]; !found { - if v.cron != nil { - v.cron.Stop() - timer.logger.Info("cron for time trigger stopped", zap.String("trigger", v.trigger.ObjectMeta.Name)) - } - delete(timer.triggers, k) - } - } - - return nil -} - func (timer *Timer) newCron(t fv1.TimeTrigger) *cron.Cron { c := cron.New() c.AddFunc(t.Spec.Cron, func() { //nolint: errCheck headers := map[string]string{ - "X-Fission-Timer-Name": t.ObjectMeta.Name, + "X-Fission-Timer-Name": t.Name, } // with the addition of multi-tenancy, the users can create functions in any namespace. however, // the triggers can only be created in the same namespace as the function. // so essentially, function namespace = trigger namespace. - (*timer.publisher).Publish("", headers, utils.UrlForFunction(t.Spec.FunctionReference.Name, t.ObjectMeta.Namespace)) + (*timer.publisher).Publish("", headers, utils.UrlForFunction(t.Spec.FunctionReference.Name, t.Namespace)) }) c.Start() - timer.logger.Info("added new cron for time trigger", zap.String("trigger", t.ObjectMeta.Name)) + timer.logger.Info("started cron for time trigger", zap.String("trigger_name", t.Name), zap.String("trigger_namespace", t.Namespace), zap.String("cron", t.Spec.Cron)) return c } diff --git a/pkg/timer/timerSync.go b/pkg/timer/timerSync.go index f08466641d..3165c9dff5 100644 --- a/pkg/timer/timerSync.go +++ b/pkg/timer/timerSync.go @@ -21,17 +21,20 @@ import ( "time" "go.uber.org/zap" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sCache "k8s.io/client-go/tools/cache" + fv1 "github.com/fission/fission/pkg/apis/core/v1" + "github.com/fission/fission/pkg/crd" "github.com/fission/fission/pkg/generated/clientset/versioned" "github.com/fission/fission/pkg/utils" ) type ( TimerSync struct { - logger *zap.Logger - fissionClient versioned.Interface - timer *Timer + logger *zap.Logger + fissionClient versioned.Interface + timer *Timer + timeTriggerInformer map[string]k8sCache.SharedIndexInformer } ) @@ -41,24 +44,68 @@ func MakeTimerSync(ctx context.Context, logger *zap.Logger, fissionClient versio fissionClient: fissionClient, timer: timer, } - go ws.syncSvc(ctx) + ws.timeTriggerInformer = utils.GetInformersForNamespaces(fissionClient, time.Minute*30, fv1.TimeTriggerResource) + ws.TimeTriggerEventHandlers(ctx) return ws } -func (ws *TimerSync) syncSvc(ctx context.Context) { - for { - triggers, err := ws.fissionClient.CoreV1().TimeTriggers(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}) - if err != nil { - if utils.IsNetworkError(err) { - ws.logger.Info("encountered a network error - will retry", zap.Error(err)) - time.Sleep(5 * time.Second) - continue +func (ws *TimerSync) Run(ctx context.Context) { + for _, informer := range ws.timeTriggerInformer { + go informer.Run(ctx.Done()) + } +} + +func (ws *TimerSync) AddUpdateTimeTrigger(timeTrigger *fv1.TimeTrigger) { + logger := ws.logger.With(zap.String("trigger_name", timeTrigger.Name), zap.String("trigger_namespace", timeTrigger.Namespace)) + + ws.logger.Debug("cron event") + + if item, ok := ws.timer.triggers[crd.CacheKeyUID(&timeTrigger.ObjectMeta)]; ok { + if item.trigger.Spec.Cron != timeTrigger.Spec.Cron { + if item.cron != nil { + item.cron.Stop() } - ws.logger.Fatal("failed to get time trigger list", zap.Error(err)) + item.trigger = *timeTrigger + item.cron = ws.timer.newCron(*timeTrigger) + logger.Debug("cron updated") + } + } else { + ws.timer.triggers[crd.CacheKeyUID(&timeTrigger.ObjectMeta)] = &timerTriggerWithCron{ + trigger: *timeTrigger, + cron: ws.timer.newCron(*timeTrigger), } - ws.timer.Sync(triggers.Items) //nolint: errCheck + logger.Debug("cron added") + } +} + +func (ws *TimerSync) DeleteTimeTrigger(timeTrigger *fv1.TimeTrigger) { + logger := ws.logger.With(zap.String("trigger_name", timeTrigger.Name), zap.String("trigger_namespace", timeTrigger.Namespace)) + + if item, ok := ws.timer.triggers[crd.CacheKeyUID(&timeTrigger.ObjectMeta)]; ok { + if item.cron != nil { + item.cron.Stop() + logger.Info("cron for time trigger stopped") + } + delete(ws.timer.triggers, crd.CacheKeyUID(&timeTrigger.ObjectMeta)) + logger.Debug("cron deleted") + } +} - // TODO switch to watches - time.Sleep(3 * time.Second) +func (ws *TimerSync) TimeTriggerEventHandlers(ctx context.Context) { + for _, informer := range ws.timeTriggerInformer { + informer.AddEventHandler(k8sCache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + timeTrigger := obj.(*fv1.TimeTrigger) + ws.AddUpdateTimeTrigger(timeTrigger) + }, + UpdateFunc: func(_ interface{}, obj interface{}) { + timeTrigger := obj.(*fv1.TimeTrigger) + ws.AddUpdateTimeTrigger(timeTrigger) + }, + DeleteFunc: func(obj interface{}) { + timeTrigger := obj.(*fv1.TimeTrigger) + ws.DeleteTimeTrigger(timeTrigger) + }, + }) } }