-
Notifications
You must be signed in to change notification settings - Fork 0
/
workers.gen.go
111 lines (93 loc) · 3.4 KB
/
workers.gen.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// Code generated by github.com/dashotv/golem. DO NOT EDIT.
package app
import (
"context"
"github.com/dashotv/fae"
"github.com/dashotv/minion"
)
func init() {
initializers = append(initializers, setupWorkers)
healthchecks["workers"] = checkWorkers
starters = append(starters, startWorkers)
}
func checkWorkers(app *Application) error {
// TODO: workers health check
return nil
}
func startWorkers(ctx context.Context, app *Application) error {
ctx = context.WithValue(ctx, "app", app)
app.Log.Debugf("starting workers (%d)", app.Config.MinionConcurrency)
go app.Workers.Start(ctx)
return nil
}
func setupWorkers(app *Application) error {
mcfg := &minion.Config{
Logger: app.Log.Named("minion"),
Debug: app.Config.MinionDebug,
Concurrency: app.Config.MinionConcurrency,
BufferSize: app.Config.MinionBufferSize,
DatabaseURI: app.Config.MinionURI,
Database: app.Config.MinionDatabase,
Collection: app.Config.MinionCollection,
}
m, err := minion.New("runic", mcfg)
if err != nil {
return fae.Wrap(err, "creating minion")
}
// add something like the below line in app.Start() (before the workers are
// started) to subscribe to job notifications.
// minion sends notifications as jobs are processed and change status
// m.Subscribe(app.MinionNotification)
// an example of the subscription function and the basic setup instructions
// are included at the end of this file.
if err := minion.Register[*ParseActive](m, &ParseActive{}); err != nil {
return fae.Wrap(err, "registering worker: parse_active (ParseActive)")
}
if _, err := m.Schedule("0 */15 * * * *", &ParseActive{}); err != nil {
return fae.Wrap(err, "scheduling worker: parse_active (ParseActive)")
}
if err := minion.Register[*ParseIndexer](m, &ParseIndexer{}); err != nil {
return fae.Wrap(err, "registering worker: parse_indexer (ParseIndexer)")
}
if err := minion.Register[*ParseRift](m, &ParseRift{}); err != nil {
return fae.Wrap(err, "registering worker: parse_rift (ParseRift)")
}
if _, err := m.Schedule("0 */15 * * * *", &ParseRift{}); err != nil {
return fae.Wrap(err, "scheduling worker: parse_rift (ParseRift)")
}
if err := minion.Register[*ParseRiftAll](m, &ParseRiftAll{}); err != nil {
return fae.Wrap(err, "registering worker: parse_rift_all (ParseRiftAll)")
}
if err := minion.Register[*UpdateIndexes](m, &UpdateIndexes{}); err != nil {
return fae.Wrap(err, "registering worker: update_indexes (UpdateIndexes)")
}
app.Workers = m
return nil
}
// run the following commands to create the events channel and add the necessary models.
//
// > golem add event jobs event id job:*Minion
// > golem add model minion_attempt --struct started_at:time.Time duration:float64 status error 'stacktrace:[]string'
// > golem add model minion queue kind args status 'attempts:[]*MinionAttempt'
//
// then add a Connection configuration that points to the same database connection information
// as the minion database.
// // This allows you to notify other services as jobs change status.
//func (a *Application) MinionNotification(n *minion.Notification) {
// if n.JobID == "-" {
// return
// }
//
// j := &Minion{}
// err := app.DB.Minion.Find(n.JobID, j)
// if err != nil {
// log.Errorf("finding job: %s", err)
// return
// }
//
// if n.Event == "job:created" {
// events.Send("runic.jobs", &EventJob{"created", j.ID.Hex(), j})
// return
// }
// events.Send("runic.jobs", &EventJob{"updated", j.ID.Hex(), j})
//}