-
Notifications
You must be signed in to change notification settings - Fork 31
/
cron.go
61 lines (53 loc) · 1.44 KB
/
cron.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package cron
import (
"context"
"fmt"
"time"
dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
"github.com/argoproj-labs/argo-dataflow/runner/sidecar/source"
sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
"github.com/opentracing/opentracing-go"
"github.com/robfig/cron/v3"
"k8s.io/apimachinery/pkg/util/runtime"
)
var logger = sharedutil.NewLogger()
type cronSource struct {
crn *cron.Cron
}
func New(ctx context.Context, sourceName, sourceURN string, x dfv1.Cron, process source.Process) (source.Interface, error) {
crn := cron.New(
cron.WithParser(cron.NewParser(cron.SecondOptional|cron.Minute|cron.Hour|cron.Dom|cron.Month|cron.Dow|cron.Descriptor)),
cron.WithChain(cron.Recover(logger)),
)
go func() {
defer runtime.HandleCrash()
crn.Run()
}()
_, err := crn.AddFunc(x.Schedule, func() {
span, ctx := opentracing.StartSpanFromContext(ctx, fmt.Sprintf("cron-source-%s", sourceName))
defer span.Finish()
now := time.Now()
msg := []byte(now.Format(x.Layout))
if err := process(
dfv1.ContextWithMeta(
ctx,
dfv1.Meta{
Source: sourceURN,
ID: fmt.Sprint(now.Unix()),
Time: now.Unix(),
},
),
msg,
); err != nil {
logger.Error(err, "failed to process message")
}
})
if err != nil {
return nil, fmt.Errorf("failed to schedule cron %q: %w", x.Schedule, err)
}
return cronSource{crn}, nil
}
func (s cronSource) Close() error {
<-s.crn.Stop().Done()
return nil
}