-
-
Notifications
You must be signed in to change notification settings - Fork 4
/
scheduler.go
139 lines (118 loc) · 3.24 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package sqlite
import (
"context"
"embed"
"time"
"github.com/YuukanOO/seelf/pkg/bus"
"github.com/YuukanOO/seelf/pkg/id"
"github.com/YuukanOO/seelf/pkg/monad"
"github.com/YuukanOO/seelf/pkg/storage"
"github.com/YuukanOO/seelf/pkg/storage/sqlite"
"github.com/YuukanOO/seelf/pkg/storage/sqlite/builder"
)
var (
	// migrations holds the SQL files matched by the embed pattern below.
	//go:embed migrations/*.sql
	migrations embed.FS
	// migrationsModule is the migrations module registered under the "scheduler"
	// name, built from the embedded "migrations" directory. It is handed to the
	// database by Setup.
	migrationsModule = sqlite.NewMigrationsModule("scheduler", "migrations", migrations)
)
// retryDelay is added to the current time to compute the new queued_at value
// of a job being retried.
const retryDelay = 15 * time.Second
// job is the concrete scheduled job built by jobMapper from a scheduled_jobs row.
type job struct {
	id     string           // Identifier of the job row.
	msg    bus.Request      // Request carried by the job.
	policy bus.JobErrPolicy // Error policy stored alongside the job.
}

// scheduler is the sqlite-backed implementation returned by NewSchedulerAdapter.
type scheduler struct {
	db *sqlite.Database
}
// ID returns the identifier of the job.
func (j *job) ID() string {
	return j.id
}
// Message returns the request carried by the job.
func (j *job) Message() bus.Request {
	return j.msg
}
// Policy returns the error policy attached to the job.
func (j *job) Policy() bus.JobErrPolicy {
	return j.policy
}
// NewSchedulerAdapter builds a bus.SchedulerAdapter which persists its jobs
// in the given sqlite database.
func NewSchedulerAdapter(db *sqlite.Database) bus.SchedulerAdapter {
	return &scheduler{db: db}
}
// Setup prepares the scheduler adapter: it applies the scheduler migrations, then
// flags every job currently marked as retrieved as not retrieved again so that
// the next call to GetNextPendingJobs can pick them up.
// You MUST call this method at the application startup.
func (s *scheduler) Setup() error {
	err := s.db.Migrate(migrationsModule)
	if err != nil {
		return err
	}

	// Jobs still flagged as retrieved at this point are put back in the queue.
	reset := builder.
		Update("scheduled_jobs", builder.Values{"retrieved": false}).
		F("WHERE retrieved = true")

	return reset.Exec(s.db, context.Background())
}
// Create inserts a new, not yet retrieved, row in scheduled_jobs for the given
// message, queued at the current UTC time with the given error policy.
// When dedupeName holds no value, the freshly generated job id is used as the
// dedupe name instead.
func (s *scheduler) Create(
	ctx context.Context,
	msg bus.Request,
	dedupeName monad.Maybe[string],
	policy bus.JobErrPolicy,
) error {
	jobID := id.New[string]()

	row := builder.Values{
		"id":           jobID,
		"dedupe_name":  dedupeName.Get(jobID), // Falls back to the job id when no dedupe name is set
		"message_name": msg.Name_(),
		"message_data": msg,
		"queued_at":    time.Now().UTC(),
		"policy":       policy,
		"retrieved":    false,
	}

	return builder.Insert("scheduled_jobs", row).Exec(s.db, ctx)
}
// GetNextPendingJobs flags the next runnable jobs as retrieved and returns them.
//
// Candidates are jobs which are not retrieved yet, whose queued_at is not after
// the current time and whose dedupe_name is not shared with a job already
// retrieved. Candidates are grouped by dedupe_name with min(queued_at).
// NOTE(review): this relies on SQLite picking the id from the row holding the
// minimum queued_at of each group — confirm against the SQLite documentation.
func (s *scheduler) GetNextPendingJobs(ctx context.Context) ([]bus.ScheduledJob, error) {
	// Selecting and flagging are done by one statement, which will lock the
	// database to make sure we can't retrieve the same job twice.
	const query = `
UPDATE scheduled_jobs
SET retrieved = true
WHERE id IN (SELECT id FROM (
SELECT id, min(queued_at) FROM scheduled_jobs
WHERE
retrieved = false
AND queued_at <= DATETIME('now')
AND dedupe_name NOT IN (SELECT DISTINCT dedupe_name FROM scheduled_jobs WHERE retrieved = true)
GROUP BY dedupe_name
)
)
RETURNING id, message_name, message_data, policy`

	return builder.
		Query[bus.ScheduledJob](query).
		All(s.db, ctx, jobMapper)
}
// Retry puts the given job back in the queue: it stores the error message in
// errcode, pushes queued_at to the current time plus retryDelay (in UTC) and
// marks the job as not retrieved.
func (s *scheduler) Retry(ctx context.Context, j bus.ScheduledJob, err error) error {
	changes := builder.Values{
		"errcode":   err.Error(),
		"queued_at": time.Now().Add(retryDelay).UTC(),
		"retrieved": false,
	}

	return builder.
		Update("scheduled_jobs", changes).
		F("WHERE id = ?", j.ID()).
		Exec(s.db, ctx)
}
// Done deletes the row of the given job from scheduled_jobs.
func (s *scheduler) Done(ctx context.Context, j bus.ScheduledJob) error {
	const query = "DELETE FROM scheduled_jobs WHERE id = ?"

	return builder.Command(query, j.ID()).Exec(s.db, ctx)
}
// jobMapper builds a job from a scanned row. Columns are read in this order:
// id, message name, message data, policy. The message itself is rebuilt from
// its name and raw data through bus.Marshallable.From.
// The job pointer is returned alongside any error encountered.
func jobMapper(scanner storage.Scanner) (bus.ScheduledJob, error) {
	var (
		name, data string
		result     = new(job)
	)

	if err := scanner.Scan(&result.id, &name, &data, &result.policy); err != nil {
		return result, err
	}

	msg, err := bus.Marshallable.From(name, data)
	result.msg = msg

	return result, err
}