/
generator.go
115 lines (96 loc) · 3.03 KB
/
generator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
/*
Generator
Generators create migration operations and are the first step
in an anser Migration. They are supersets of the amboy.Job interface.
The current limitation is that the generated jobs must be stored
within the implementation of the generator job, which means they must
either all fit in memory *or* be serializable independently (e.g. fit
within the 16MB document limit when using a MongoDB-backed queue).
*/
package anser
import (
"context"
"github.com/cdr/amboy"
"github.com/cdr/amboy/dependency"
"github.com/cdr/grip"
"github.com/deciduosity/anser/model"
"github.com/pkg/errors"
)
// Generator is a superset of amboy.Job, implemented by jobs whose
// purpose is to generate other jobs. Internally, implementations
// construct and store their member jobs.
//
// This interface may also prove useful at some point for any other
// kind of job-generating operation.
type Generator interface {
	// Every Generator is itself an amboy.Job.
	amboy.Job

	// Jobs returns a receive-only channel of the job objects
	// that the generator produced.
	Jobs() <-chan amboy.Job
}
// generatorDependency builds a dependency.Manager from the given
// Generator options: the environment creates the manager for the
// options' JobID, and every entry of DependsOn is then added to it
// as an edge.
//
// The result of each AddEdge call is handed to grip.Warning rather
// than being returned to the caller.
func generatorDependency(env Environment, o model.GeneratorOptions) dependency.Manager {
	// a different dependency.Manager implementation might be
	// worth considering here.
	manager := env.NewDependencyManager(o.JobID)

	for _, target := range o.DependsOn {
		addErr := manager.AddEdge(target)
		grip.Warning(addErr)
	}

	return manager
}
// addMigrationJobs iterates over the jobs yielded by q.Jobs and, for
// every job that implements Generator, puts the jobs it generated
// back onto the same queue.
//
// When dryRun is set, each generated job is only logged; nothing is
// put on the queue and the counter is never advanced. When limit is
// positive, the function returns as soon as the counter has reached
// limit (without emitting the final summary log line).
//
// The returned count is the number of Put calls made, including
// calls that returned an error. The returned error aggregates every
// error produced by those Put calls.
func addMigrationJobs(ctx context.Context, q amboy.Queue, dryRun bool, limit int) (int, error) {
	var added int
	errs := grip.NewCatcher()

	for candidate := range q.Jobs(ctx) {
		gen, isGenerator := candidate.(Generator)
		if !isGenerator {
			// only generators carry migration operations.
			continue
		}

		grip.Infof("adding operations for %s", gen.ID())

		for op := range gen.Jobs() {
			switch {
			case dryRun:
				grip.Infof("dry-run: would have added %s", op.ID())
			case limit > 0 && added >= limit:
				return added, errs.Resolve()
			default:
				// the counter advances whether or not Put succeeded.
				errs.Add(q.Put(ctx, op))
				added++
			}
		}
	}

	grip.Infof("added %d migration operations", added)
	return added, errs.Resolve()
}
// generator provides the high level implementation of the Jobs()
// method that is part of the Generator interface.
//
// It reads jobs from the input channel and, for each one, adds an
// edge to the job's dependency for every member of every group that
// the environment's dependency network resolves for groupID. The
// dependency is then set back on the job and the job is sent on the
// returned channel. That channel is unbuffered and is closed once
// input has been drained.
//
// If the dependency network cannot be obtained, the error is logged
// as a warning and returned (with a stack trace attached) together
// with an already-closed channel.
func generator(env Environment, groupID string, input <-chan amboy.Job) (<-chan amboy.Job, error) {
	results := make(chan amboy.Job)

	network, err := env.GetDependencyNetwork()
	if err != nil {
		grip.Warning(err)
		close(results)
		return results, errors.WithStack(err)
	}

	go func() {
		defer close(results)

		for job := range input {
			deps := job.Dependency()

			for _, groupName := range network.Resolve(groupID) {
				for _, member := range network.GetGroup(groupName) {
					// failures to add an edge are only logged.
					grip.Notice(deps.AddEdge(member))
				}
			}

			job.SetDependency(deps)
			results <- job
		}
	}()

	return results, nil
}