-
Notifications
You must be signed in to change notification settings - Fork 4
/
create_task.go
72 lines (54 loc) · 1.51 KB
/
create_task.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package jobrunner
import (
"context"
"fmt"
"github.com/anti-raid/splashtail/jobs/tasks"
"github.com/anti-raid/splashtail/jobs/tasks/taskdef"
"github.com/jackc/pgx/v5/pgxpool"
)
// Sets up a task
func CreateTask(ctx context.Context, pool *pgxpool.Pool, task taskdef.TaskDefinition) (*string, error) {
taskName := task.Name()
taskFor := task.TaskFor()
_, ok := tasks.TaskDefinitionRegistry[task.Name()]
if !ok {
return nil, fmt.Errorf("task %s does not exist on registry", task.Name())
}
var taskId string
tx, err := pool.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("failed to start transaction: %w", err)
}
defer tx.Rollback(ctx)
taskForStr, err := tasks.FormatTaskFor(taskFor)
if err != nil {
return nil, fmt.Errorf("failed to format task_for: %w", err)
}
err = tx.QueryRow(ctx, "INSERT INTO tasks (task_name, task_for, expiry, output, task_fields, resumable) VALUES ($1, $2, $3, $4, $5, $6) RETURNING task_id",
taskName,
taskForStr,
task.Expiry(),
nil,
task.TaskFields(),
task.Resumable(),
).Scan(&taskId)
if err != nil {
return nil, fmt.Errorf("failed to create task: %w", err)
}
// Add to ongoing_tasks
_, err = tx.Exec(
ctx,
"INSERT INTO ongoing_tasks (task_id, data, initial_opts) VALUES ($1, $2, $3)",
taskId,
map[string]any{},
task,
)
if err != nil {
return nil, fmt.Errorf("failed to add task to ongoing_tasks: %w", err)
}
err = tx.Commit(ctx)
if err != nil {
return nil, fmt.Errorf("failed to commit transaction: %w", err)
}
return &taskId, nil
}