-
Notifications
You must be signed in to change notification settings - Fork 24
/
goflow.go
159 lines (128 loc) · 3.33 KB
/
goflow.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Package goflow implements a simple but powerful DAG scheduler and dashboard.
package goflow
import (
"errors"
"fmt"
"net/http"
"github.com/google/uuid"
"github.com/philippgille/gokv"
"github.com/philippgille/gokv/gomap"
"github.com/robfig/cron/v3"
)
// Goflow contains job data and a router.
type Goflow struct {
Store gokv.Store
Options Options
Jobs map[string](func() *Job)
Router *http.ServeMux
cron *cron.Cron
jobs []string
}
// Options to control various Goflow behavior.
type Options struct {
Store gokv.Store
UIPath string
Streaming bool
ShowExamples bool
WithSeconds bool
}
// New returns a Goflow engine.
func New(opts Options) *Goflow {
// Add a default store if necessary
if opts.Store == nil {
opts.Store = gomap.NewStore(gomap.DefaultOptions)
}
// Add the cron schedule
var c *cron.Cron
if opts.WithSeconds {
c = cron.New(cron.WithSeconds())
} else {
c = cron.New()
}
g := &Goflow{
Store: opts.Store,
Options: opts,
Jobs: make(map[string](func() *Job)),
Router: http.NewServeMux(),
cron: c,
}
if opts.ShowExamples {
g.AddJob(complexAnalyticsJob)
g.AddJob(randomFailureJob)
g.AddJob(summationJob)
}
return g
}
// scheduledExecution implements cron.Job
type scheduledExecution struct {
store gokv.Store
jobFunc func() *Job
}
func (schedExec *scheduledExecution) Run() {
// create job
job := schedExec.jobFunc()
// create and persist a new execution
e := job.newExecution()
persistNewExecution(schedExec.store, e)
indexExecutions(schedExec.store, e)
// start running the job
job.run(schedExec.store, e)
}
// AddJob takes a job-emitting function and registers it
// with the engine.
func (g *Goflow) AddJob(jobFunc func() *Job) error {
j := jobFunc()
// "" is not a valid key in the storage layer
if j.Name == "" {
return errors.New("\"\" is not a valid job name")
}
// Register the job
g.Jobs[j.Name] = jobFunc
g.jobs = append(g.jobs, j.Name)
// If the job is active by default, add it to the cron schedule
if j.Active {
e := &scheduledExecution{g.Store, jobFunc}
_, err := g.cron.AddJob(j.Schedule, e)
return err
}
return nil
}
// toggle flips a job's cron schedule status from active to inactive
// and vice versa. It returns true if the new status is active and false
// if it is inactive.
func (g *Goflow) toggle(jobName string) (bool, error) {
// if the job is found in the list of entries, remove it
for _, entry := range g.cron.Entries() {
if name := entry.Job.(*scheduledExecution).jobFunc().Name; name == jobName {
g.cron.Remove(entry.ID)
return false, nil
}
}
// else add a new entry
jobFunc := g.Jobs[jobName]
e := &scheduledExecution{g.Store, jobFunc}
g.cron.AddJob(jobFunc().Schedule, e)
return true, nil
}
// Execute tells the engine to run a given job in a new goroutine.
func (g *Goflow) Execute(job string) (*uuid.UUID, error) {
jobFunc, ok := g.Jobs[job]
if !ok {
return nil, fmt.Errorf("job %s does not exist", job)
}
j := jobFunc()
// create and persist a new execution
e := j.newExecution()
persistNewExecution(g.Store, e)
indexExecutions(g.Store, e)
// start running the job
go j.run(g.Store, e)
return &e.ID, nil
}
// Run runs the webserver.
func (g *Goflow) Run(port string) error {
// g.addStreamRoute(true)
g.addTestRoute()
g.cron.Start()
return http.ListenAndServe(port, g.Router)
}