-
Notifications
You must be signed in to change notification settings - Fork 0
/
batch.go
109 lines (99 loc) · 2.31 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Code generated by sqlc. DO NOT EDIT.
// versions:
// sqlc v1.24.0
// source: batch.go
package dbsqlc
import (
"context"
"errors"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgtype"
)
var (
ErrBatchAlreadyClosed = errors.New("batch already closed")
)
const createTask = `-- name: CreateTask :batchone
WITH
group_key_task AS (
INSERT INTO task_groups (
id,
group_key,
block_addr
) VALUES (
COALESCE((SELECT max(id) FROM task_groups), -1) + 1,
$3::text,
(SELECT max_assigned_block_addr FROM task_addr_ptrs)
) ON CONFLICT (group_key)
DO UPDATE SET
group_key = EXCLUDED.group_key,
block_addr = GREATEST(
task_groups.block_addr + 1,
(SELECT max_assigned_block_addr FROM task_addr_ptrs)
)
RETURNING id, group_key, block_addr
)
INSERT INTO tasks (
id,
created_at,
status,
args,
group_key
) VALUES (
(SELECT id FROM group_key_task) + 1024 * 1024 * (SELECT block_addr FROM group_key_task),
COALESCE($1::timestamp, now()),
'QUEUED',
COALESCE($2::jsonb, '{}'::jsonb),
$3::text
)
RETURNING id, created_at, status, args, group_key
`
type CreateTaskBatchResults struct {
br pgx.BatchResults
tot int
closed bool
}
type CreateTaskParams struct {
CreatedAt pgtype.Timestamp `json:"created_at"`
Args []byte `json:"args"`
GroupKey string `json:"group_key"`
}
func (q *Queries) CreateTask(ctx context.Context, db DBTX, arg []CreateTaskParams) *CreateTaskBatchResults {
batch := &pgx.Batch{}
for _, a := range arg {
vals := []interface{}{
a.CreatedAt,
a.Args,
a.GroupKey,
}
batch.Queue(createTask, vals...)
}
br := db.SendBatch(ctx, batch)
return &CreateTaskBatchResults{br, len(arg), false}
}
func (b *CreateTaskBatchResults) QueryRow(f func(int, *Task, error)) {
defer b.br.Close()
for t := 0; t < b.tot; t++ {
var i Task
if b.closed {
if f != nil {
f(t, nil, ErrBatchAlreadyClosed)
}
continue
}
row := b.br.QueryRow()
err := row.Scan(
&i.ID,
&i.CreatedAt,
&i.Status,
&i.Args,
&i.GroupKey,
)
if f != nil {
f(t, &i, err)
}
}
}
func (b *CreateTaskBatchResults) Close() error {
b.closed = true
return b.br.Close()
}