-
Notifications
You must be signed in to change notification settings - Fork 25
/
multi-job.go
164 lines (138 loc) · 3.82 KB
/
multi-job.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package jobs
import (
"context"
"sync"
"go.uber.org/fx"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/wrapperspb"
"github.com/fluxninja/aperture/v2/pkg/config"
jobsconfig "github.com/fluxninja/aperture/v2/pkg/jobs/config"
"github.com/fluxninja/aperture/v2/pkg/log"
panichandler "github.com/fluxninja/aperture/v2/pkg/panic-handler"
"github.com/fluxninja/aperture/v2/pkg/status"
)
// MultiJobConfig holds configuration for MultiJob.
// It embeds jobsconfig.JobConfig, so that type's fields are promoted onto
// MultiJobConfig; no additional settings are declared here.
// swagger:model
type MultiJobConfig struct {
jobsconfig.JobConfig
}
// MultiJobConstructor holds fields to create annotated instance of MultiJob.
// Call Annotate to obtain the fx.Option that provides the MultiJob.
type MultiJobConstructor struct {
// DefaultConfig is the starting configuration; provideMultiJob unmarshals
// the values found under the Name key on top of it.
DefaultConfig MultiJobConfig
// Name is used as the configuration key, as the fx group tag for the injected
// watchers, as the status registry child value, as the suffix of the fx
// result name, and as the name passed to JobGroup.DeregisterJob on stop.
Name string
// JobGroupName is the fx name tag of the JobGroup dependency and the prefix
// of the fx result name (JobGroupName + "." + Name).
JobGroupName string
// JWS are job watchers placed ahead of the fx-injected job watchers.
JWS JobWatchers
// GWS are group watchers placed ahead of the fx-injected group watchers.
GWS GroupWatchers
}
// Annotate returns an fx.Option that provides a *MultiJob built by
// provideMultiJob.
//
// The constructor's first two parameters (group watchers and job watchers) are
// tagged with config.GroupTag(mjc.Name), the third (the job group) with
// config.NameTag(mjc.JobGroupName), and the result is tagged with
// config.NameTag(mjc.JobGroupName + "." + mjc.Name). The remaining parameters
// of provideMultiJob carry no tags.
func (mjc MultiJobConstructor) Annotate() fx.Option {
	resultTag := config.NameTag(mjc.JobGroupName + "." + mjc.Name)
	watchersTag := config.GroupTag(mjc.Name)
	jobGroupTag := config.NameTag(mjc.JobGroupName)

	provider := fx.Annotate(
		mjc.provideMultiJob,
		fx.ParamTags(watchersTag, watchersTag, jobGroupTag),
		fx.ResultTags(resultTag),
	)
	return fx.Provide(provider)
}
// provideMultiJob is the fx constructor wired up by Annotate.
//
// It loads the MultiJob configuration (mjc.DefaultConfig overlaid with whatever
// is unmarshalled from the mjc.Name key), merges the constructor-supplied
// watchers with the injected ones, creates the MultiJob, and appends lifecycle
// hooks that register it with jg on start and tear it down on stop.
//
// It returns the new MultiJob, or a nil MultiJob and the unmarshalling error if
// the configuration cannot be read.
func (mjc MultiJobConstructor) provideMultiJob(
	gws GroupWatchers,
	jws JobWatchers,
	jg *JobGroup,
	unmarshaller config.Unmarshaller,
	lifecycle fx.Lifecycle,
) (*MultiJob, error) {
	// Start from the defaults and let the configured values override them.
	jobCfg := mjc.DefaultConfig
	unmarshalErr := unmarshaller.UnmarshalKey(mjc.Name, &jobCfg)
	if unmarshalErr != nil {
		log.Error().Err(unmarshalErr).Msg("Unable to deserialize")
		return nil, unmarshalErr
	}

	// Constructor-supplied watchers go first, followed by the injected ones.
	// Appending an empty slice is a no-op, so no length check is required; the
	// result is always a non-nil slice.
	groupWatchers := append(GroupWatchers{}, mjc.GWS...)
	groupWatchers = append(groupWatchers, gws...)

	jobWatchers := append(JobWatchers{}, mjc.JWS...)
	jobWatchers = append(jobWatchers, jws...)

	// The MultiJob gets its own child of the job group's status registry.
	registry := jg.GetStatusRegistry().Child("multi-job", mjc.Name)
	mj := NewMultiJob(registry, jobWatchers, groupWatchers)

	// On start, register the MultiJob with its job group using the loaded config.
	onStart := func(_ context.Context) error {
		if err := jg.RegisterJob(mj, jobCfg.JobConfig); err != nil {
			return err
		}
		return nil
	}

	// On stop, drop every job tracked by the MultiJob and then remove the
	// MultiJob from the job group. The deregistration error is ignored, so
	// stopping never fails because of it.
	// NOTE(review): the job is registered via mj, whose name comes from
	// registry.Key(), but is deregistered by mjc.Name - confirm these match.
	onStop := func(_ context.Context) error {
		mj.gt.reset()
		_ = jg.DeregisterJob(mjc.Name)
		return nil
	}

	lifecycle.Append(fx.Hook{
		OnStart: onStart,
		OnStop:  onStop,
	})

	return mj, nil
}
// MultiJob runs multiple jobs in asynchronous manner.
// Jobs are added and removed with RegisterJob, DeregisterJob and DeregisterAll;
// Execute runs all currently tracked jobs.
type MultiJob struct {
// gt tracks the jobs registered with this MultiJob; Execute, RegisterJob,
// DeregisterJob and DeregisterAll all delegate to it.
gt *groupTracker
// JobBase is embedded; NewMultiJob fills in its JobName and JWS fields.
JobBase
}
// Compile-time check that *MultiJob satisfies the Job interface; the build
// fails here if a required method is missing.
var _ Job = (*MultiJob)(nil)
// NewMultiJob builds a MultiJob.
//
// The job name is taken from registry.Key(), jws becomes the job's own
// watchers, and gws together with registry are handed to a new group tracker
// that will hold the jobs registered with this MultiJob.
func NewMultiJob(registry status.Registry, jws JobWatchers, gws GroupWatchers) *MultiJob {
	mj := &MultiJob{}
	mj.JobBase.JobName = registry.Key()
	mj.JobBase.JWS = jws
	mj.gt = newGroupTracker(gws, registry)
	return mj
}
// Name returns the name of the job, as reported by the embedded JobBase.
func (mj *MultiJob) Name() string {
return mj.JobBase.Name()
}
// JobWatchers returns the list of job watchers, as reported by the embedded
// JobBase.
func (mj *MultiJob) JobWatchers() JobWatchers {
return mj.JobBase.JobWatchers()
}
// Execute runs every job currently tracked by the MultiJob and waits for all of
// them to finish.
//
// Each job is handed to panichandler.Go and executed through the group tracker
// with the supplied ctx. The per-job results and errors are discarded here;
// Execute itself always returns the string value "MultiJob" and a nil error.
func (mj *MultiJob) Execute(ctx context.Context) (proto.Message, error) {
	var pending sync.WaitGroup

	// runner builds the function executed for a single job. Receiving the job as
	// an argument gives every function its own copy instead of the loop variable.
	runner := func(j Job) func() {
		return func() {
			defer pending.Done()
			_, _ = mj.gt.execute(ctx, j)
		}
	}

	for _, j := range mj.gt.getJobs() {
		pending.Add(1)
		panichandler.Go(runner(j))
	}

	// Block until every job has reported completion.
	pending.Wait()

	// There is nothing to report at the MultiJob level.
	return wrapperspb.String("MultiJob"), nil
}
// RegisterJob registers a job with the MultiJob by handing it to the group
// tracker. The tracker's error, if any, is returned unchanged.
func (mj *MultiJob) RegisterJob(job Job) error {
return mj.gt.registerJob(job)
}
// DeregisterJob deregisters the job with the given name from the MultiJob.
// The first value returned by the group tracker is discarded; only the error
// is passed back to the caller.
func (mj *MultiJob) DeregisterJob(name string) error {
_, err := mj.gt.deregisterJob(name)
return err
}
// DeregisterAll removes all jobs from the MultiJob by resetting its group
// tracker.
func (mj *MultiJob) DeregisterAll() {
mj.gt.reset()
}