-
Notifications
You must be signed in to change notification settings - Fork 2
/
scheduler.go
152 lines (134 loc) · 3.54 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package scheduler
import (
"context"
"fmt"
"time"
"github.com/robfig/cron/v3"
"go.uber.org/zap"
"go.autokitteh.dev/autokitteh/internal/kittehs"
"go.autokitteh.dev/autokitteh/sdk/sdkservices"
"go.autokitteh.dev/autokitteh/sdk/sdktypes"
)
// scope is the secrets scope under which scheduler connections are
// listed and read.
const scope = "scheduler"

// initInterval is the interval at which we check for new connections.
const initInterval time.Duration = time.Second
// connection holds the settings of a single scheduler connection together
// with the ID of the cron entry that was registered for it.
type connection struct {
	// schedule is the cron specification of the connection.
	schedule string
	// timezone is the time zone name the schedule is evaluated in;
	// the value "Local" means no explicit zone is prepended to the spec.
	timezone string
	// memo is free-form text that is copied into every dispatched event.
	memo string

	// cronID identifies the entry created for this connection in the cron table.
	cronID cron.EntryID
}
// event is the payload built for each firing of a scheduled trigger,
// before it is wrapped and dispatched.
type event struct {
	// Trigger settings, copied from the connection that fired.
	Schedule string
	Timezone string
	Memo     string

	// Event instance: the moment the trigger fired, both as a whole
	// and broken down into calendar and clock components.
	Timestamp  time.Time
	SinceEpoch int64  // Unix time, in seconds.
	Location   string // Name of the Timestamp's time zone location.
	Year       int
	Month      int // 1 (January) through 12 (December).
	Day        int
	Weekday    int // 0 (Sunday) through 6 (Saturday).
	Hour       int
	Minute     int
	Second     int
}
var (
	// connections tracks the connections that were already added to
	// cronTable, keyed by connection token.
	connections = make(map[string]connection)

	// cronTable runs the registered schedules. Its parser is configured
	// with the minute, hour, day-of-month, month and day-of-week fields,
	// an optional seconds field, and descriptor support.
	cronTable = cron.New(
		cron.WithParser(
			cron.NewParser(
				cron.SecondOptional | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
			),
		),
	)
)
// detectNewConnections makes one pass over the stored scheduler connections
// and adds every connection that is not tracked yet to the cron table.
//
// It lists all connection tokens under the scheduler scope using s, skips
// tokens that are already present in the package-level connections map, and
// for each remaining token reads its "schedule", "timezone" and "memo"
// settings, registers a cron entry that dispatches events through d, records
// the entry ID in connections, and starts the cron table.
//
// Errors are logged with l and never returned. A failure to list tokens ends
// the pass; a failure on a single connection only skips that connection, and
// since it is not recorded in connections it is attempted again on the next
// call.
//
// NOTE(review): this function does not loop on its own; presumably the caller
// invokes it periodically (see initInterval) - confirm against the caller.
func detectNewConnections(l *zap.Logger, s sdkservices.Secrets, d sdkservices.Dispatcher) {
	ctx := context.Background()

	tokens, err := s.List(ctx, scope, "all")
	if err != nil {
		l.Error("Failed to list connections", zap.Error(err))
		return
	}

	for _, token := range tokens {
		// Ignore existing connections.
		if _, ok := connections[token]; ok {
			continue
		}

		// Add new connections to the cron table.
		// TODO(ENG-301): Support multiple distributed server instances.
		conn, err := s.Get(ctx, scope, token)
		if err != nil {
			// Report the failure instead of silently dropping the connection.
			l.Error("Failed to get connection",
				zap.String("token", token),
				zap.Error(err),
			)
			continue
		}

		c := connection{
			schedule: conn["schedule"],
			timezone: conn["timezone"],
			memo:     conn["memo"],
		}

		// Any timezone other than "Local" is passed to the cron parser as a
		// "CRON_TZ=<zone>" prefix in front of the schedule itself.
		spec := c.schedule
		if c.timezone != "Local" {
			spec = fmt.Sprintf("CRON_TZ=%s %s", c.timezone, c.schedule)
		}

		id, err := cronTable.AddFunc(spec, dispatchEvents(ctx, l, d, token, c))
		if err != nil {
			l.Error("Failed to add cron schedule",
				zap.String("token", token),
				zap.String("spec", spec),
				zap.Any("connection", c),
				zap.Error(err),
			)
			continue
		}

		c.cronID = id
		connections[token] = c
		cronTable.Start()
	}
}
// dispatchEvents returns the function that is registered in the cron table
// for a single connection.
//
// Each time the returned function runs, it builds an event from the
// connection's settings (conn) and the current time as reported by
// time.Now, wraps it, converts it into an event with type "cron_trigger"
// that carries the connection token, and dispatches it through d using ctx.
//
// The returned function has no way to report errors to its caller, so every
// failure (wrapping, conversion, or dispatch) is logged with l and ends that
// run; a successful dispatch is logged at debug level with the event ID.
func dispatchEvents(ctx context.Context, l *zap.Logger, d sdkservices.Dispatcher, token string, conn connection) func() {
	return func() {
		now := time.Now()
		e := event{
			// Trigger settings.
			Schedule: conn.schedule,
			Timezone: conn.timezone,
			Memo:     conn.memo,
			// Event instance.
			Timestamp:  now,
			SinceEpoch: now.Unix(),
			Location:   now.Location().String(),
			Year:       now.Year(),
			Month:      int(now.Month()),
			Day:        now.Day(),
			Weekday:    int(now.Weekday()),
			Hour:       now.Hour(),
			Minute:     now.Minute(),
			Second:     now.Second(),
		}

		wrapped, err := sdktypes.DefaultValueWrapper.Wrap(e)
		if err != nil {
			l.Error("Failed to wrap cron event",
				zap.Any("event", e),
				zap.Error(err),
			)
			return
		}

		data, err := wrapped.ToStringValuesMap()
		if err != nil {
			l.Error("Failed to convert wrapped cron event",
				zap.Any("event", e),
				zap.Error(err),
			)
			return
		}

		proto := &sdktypes.EventPB{
			IntegrationId:    integrationID.String(),
			IntegrationToken: token,
			EventType:        "cron_trigger",
			Data:             kittehs.TransformMapValues(data, sdktypes.ToProto),
		}

		// Handle a conversion failure like every other failure here instead
		// of panicking inside the cron job.
		ev, err := sdktypes.EventFromProto(proto)
		if err != nil {
			l.Error("Failed to create cron event from proto",
				zap.String("connectionToken", token),
				zap.Error(err),
			)
			return
		}

		eventID, err := d.Dispatch(ctx, ev, nil)
		if err != nil {
			l.Error("Dispatch failed",
				zap.String("connectionToken", token),
				zap.Error(err),
			)
			return
		}

		l.Debug("Dispatched",
			zap.String("connectionToken", token),
			zap.String("eventID", eventID.String()),
		)
	}
}