Skip to content

Commit

Permalink
Use informer for time trigger handling with multiple namespace support (
Browse files Browse the repository at this point in the history
#2593)

* changes to add informer in timer for time trigger
* Refactor timer trigger handlers and remove unwanted code

Signed-off-by: Sanket Sudake <sanketsudake@gmail.com>
Co-authored-by: Sanket Sudake <sanketsudake@gmail.com>
  • Loading branch information
shubham-bansal96 and sanketsudake committed Nov 2, 2022
1 parent c33842c commit b9fa6ca
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 99 deletions.
3 changes: 2 additions & 1 deletion pkg/timer/main.go
Expand Up @@ -38,7 +38,8 @@ func Start(ctx context.Context, logger *zap.Logger, routerUrl string) error {
}

poster := publisher.MakeWebhookPublisher(logger, routerUrl)
MakeTimerSync(ctx, logger, fissionClient, MakeTimer(logger, poster))
timerSync := MakeTimerSync(ctx, logger, fissionClient, MakeTimer(logger, poster))
timerSync.Run(ctx)

return nil
}
90 changes: 9 additions & 81 deletions pkg/timer/timer.go
Expand Up @@ -21,7 +21,6 @@ import (
"go.uber.org/zap"

fv1 "github.com/fission/fission/pkg/apis/core/v1"
"github.com/fission/fission/pkg/crd"
"github.com/fission/fission/pkg/publisher"
"github.com/fission/fission/pkg/utils"
)
Expand All @@ -34,20 +33,11 @@ const (

type (
Timer struct {
logger *zap.Logger
triggers map[string]*timerTriggerWithCron
requestChannel chan *timerRequest
publisher *publisher.Publisher
logger *zap.Logger
triggers map[string]*timerTriggerWithCron
publisher *publisher.Publisher
}

timerRequest struct {
requestType
triggers []fv1.TimeTrigger
responseChannel chan *timerResponse
}
timerResponse struct {
error
}
timerTriggerWithCron struct {
trigger fv1.TimeTrigger
cron *cron.Cron
Expand All @@ -56,88 +46,26 @@ type (

func MakeTimer(logger *zap.Logger, publisher publisher.Publisher) *Timer {
timer := &Timer{
logger: logger.Named("timer"),
triggers: make(map[string]*timerTriggerWithCron),
requestChannel: make(chan *timerRequest),
publisher: &publisher,
logger: logger.Named("timer"),
triggers: make(map[string]*timerTriggerWithCron),
publisher: &publisher,
}
go timer.svc()
return timer
}

func (timer *Timer) Sync(triggers []fv1.TimeTrigger) error {
req := &timerRequest{
requestType: SYNC,
triggers: triggers,
responseChannel: make(chan *timerResponse),
}
timer.requestChannel <- req
resp := <-req.responseChannel
return resp.error
}

func (timer *Timer) svc() {
for {
req := <-timer.requestChannel
switch req.requestType {
case SYNC:
err := timer.syncCron(req.triggers)
req.responseChannel <- &timerResponse{error: err}
}
}
}

func (timer *Timer) syncCron(triggers []fv1.TimeTrigger) error {
// add new triggers or update existing ones
triggerMap := make(map[string]bool)
for _, t := range triggers {
triggerMap[crd.CacheKey(&t.ObjectMeta)] = true
if item, ok := timer.triggers[crd.CacheKey(&t.ObjectMeta)]; ok {
// update cron if the cron spec changed
if item.trigger.Spec.Cron != t.Spec.Cron {
// if there is an cron running, stop it
if item.cron != nil {
item.cron.Stop()
}
item.cron = timer.newCron(t)
}

item.trigger = t
} else {
timer.triggers[crd.CacheKey(&t.ObjectMeta)] = &timerTriggerWithCron{
trigger: t,
cron: timer.newCron(t),
}
}
}

// process removed triggers
for k, v := range timer.triggers {
if _, found := triggerMap[k]; !found {
if v.cron != nil {
v.cron.Stop()
timer.logger.Info("cron for time trigger stopped", zap.String("trigger", v.trigger.ObjectMeta.Name))
}
delete(timer.triggers, k)
}
}

return nil
}

func (timer *Timer) newCron(t fv1.TimeTrigger) *cron.Cron {
c := cron.New()
c.AddFunc(t.Spec.Cron, func() { //nolint: errCheck
headers := map[string]string{
"X-Fission-Timer-Name": t.ObjectMeta.Name,
"X-Fission-Timer-Name": t.Name,
}

// with the addition of multi-tenancy, the users can create functions in any namespace. however,
// the triggers can only be created in the same namespace as the function.
// so essentially, function namespace = trigger namespace.
(*timer.publisher).Publish("", headers, utils.UrlForFunction(t.Spec.FunctionReference.Name, t.ObjectMeta.Namespace))
(*timer.publisher).Publish("", headers, utils.UrlForFunction(t.Spec.FunctionReference.Name, t.Namespace))
})
c.Start()
timer.logger.Info("added new cron for time trigger", zap.String("trigger", t.ObjectMeta.Name))
timer.logger.Info("started cron for time trigger", zap.String("trigger_name", t.Name), zap.String("trigger_namespace", t.Namespace), zap.String("cron", t.Spec.Cron))
return c
}
81 changes: 64 additions & 17 deletions pkg/timer/timerSync.go
Expand Up @@ -21,17 +21,20 @@ import (
"time"

"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sCache "k8s.io/client-go/tools/cache"

fv1 "github.com/fission/fission/pkg/apis/core/v1"
"github.com/fission/fission/pkg/crd"
"github.com/fission/fission/pkg/generated/clientset/versioned"
"github.com/fission/fission/pkg/utils"
)

type (
TimerSync struct {
logger *zap.Logger
fissionClient versioned.Interface
timer *Timer
logger *zap.Logger
fissionClient versioned.Interface
timer *Timer
timeTriggerInformer map[string]k8sCache.SharedIndexInformer
}
)

Expand All @@ -41,24 +44,68 @@ func MakeTimerSync(ctx context.Context, logger *zap.Logger, fissionClient versio
fissionClient: fissionClient,
timer: timer,
}
go ws.syncSvc(ctx)
ws.timeTriggerInformer = utils.GetInformersForNamespaces(fissionClient, time.Minute*30, fv1.TimeTriggerResource)
ws.TimeTriggerEventHandlers(ctx)
return ws
}

func (ws *TimerSync) syncSvc(ctx context.Context) {
for {
triggers, err := ws.fissionClient.CoreV1().TimeTriggers(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
if utils.IsNetworkError(err) {
ws.logger.Info("encountered a network error - will retry", zap.Error(err))
time.Sleep(5 * time.Second)
continue
func (ws *TimerSync) Run(ctx context.Context) {
for _, informer := range ws.timeTriggerInformer {
go informer.Run(ctx.Done())
}
}

func (ws *TimerSync) AddUpdateTimeTrigger(timeTrigger *fv1.TimeTrigger) {
logger := ws.logger.With(zap.String("trigger_name", timeTrigger.Name), zap.String("trigger_namespace", timeTrigger.Namespace))

ws.logger.Debug("cron event")

if item, ok := ws.timer.triggers[crd.CacheKeyUID(&timeTrigger.ObjectMeta)]; ok {
if item.trigger.Spec.Cron != timeTrigger.Spec.Cron {
if item.cron != nil {
item.cron.Stop()
}
ws.logger.Fatal("failed to get time trigger list", zap.Error(err))
item.trigger = *timeTrigger
item.cron = ws.timer.newCron(*timeTrigger)
logger.Debug("cron updated")
}
} else {
ws.timer.triggers[crd.CacheKeyUID(&timeTrigger.ObjectMeta)] = &timerTriggerWithCron{
trigger: *timeTrigger,
cron: ws.timer.newCron(*timeTrigger),
}
ws.timer.Sync(triggers.Items) //nolint: errCheck
logger.Debug("cron added")
}
}

func (ws *TimerSync) DeleteTimeTrigger(timeTrigger *fv1.TimeTrigger) {
logger := ws.logger.With(zap.String("trigger_name", timeTrigger.Name), zap.String("trigger_namespace", timeTrigger.Namespace))

if item, ok := ws.timer.triggers[crd.CacheKeyUID(&timeTrigger.ObjectMeta)]; ok {
if item.cron != nil {
item.cron.Stop()
logger.Info("cron for time trigger stopped")
}
delete(ws.timer.triggers, crd.CacheKeyUID(&timeTrigger.ObjectMeta))
logger.Debug("cron deleted")
}
}

// TODO switch to watches
time.Sleep(3 * time.Second)
func (ws *TimerSync) TimeTriggerEventHandlers(ctx context.Context) {
for _, informer := range ws.timeTriggerInformer {
informer.AddEventHandler(k8sCache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
timeTrigger := obj.(*fv1.TimeTrigger)
ws.AddUpdateTimeTrigger(timeTrigger)
},
UpdateFunc: func(_ interface{}, obj interface{}) {
timeTrigger := obj.(*fv1.TimeTrigger)
ws.AddUpdateTimeTrigger(timeTrigger)
},
DeleteFunc: func(obj interface{}) {
timeTrigger := obj.(*fv1.TimeTrigger)
ws.DeleteTimeTrigger(timeTrigger)
},
})
}
}

0 comments on commit b9fa6ca

Please sign in to comment.