-
Notifications
You must be signed in to change notification settings - Fork 0
/
context.go
103 lines (88 loc) · 2.24 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package pgx
import (
"context"
"errors"
"time"
a "github.com/dalthon/ana"
pgx "github.com/jackc/pgx/v5"
)
// finishTrackedOperationQuery marks the ana.tracked_operations row matching
// @key and @target as finished: it stores @payload and @result, stamps
// finished_at with NOW() and clears error_message.
var finishTrackedOperationQuery = `
UPDATE ana.tracked_operations
SET
payload = @payload,
result = @result,
finished_at = NOW(),
status = 'finished',
error_message = NULL
WHERE
key = @key AND target = @target;
`
// failTrackedOperationQuery marks the ana.tracked_operations row matching
// @key and @target as failed: it stores @payload, nulls result, stamps both
// finished_at and timeout with NOW(), records @error_message and increments
// error_count.
var failTrackedOperationQuery = `
UPDATE ana.tracked_operations
SET
payload = @payload,
result = NULL,
finished_at = NOW(),
status = 'failed',
timeout = NOW(),
error_message = @error_message,
error_count = error_count + 1
WHERE
key = @key AND target = @target;
`
// PgxContext bundles the two pgx transactions and the context.Context used
// to finalize a tracked operation.
//
// P and R are the type parameters of the a.TrackedOperation[P, R] values
// handled by Success and Fail.
type PgxContext[P, R any] struct {
	// outerTx is the transaction on which the ana.tracked_operations row is
	// updated; it is committed last.
	outerTx pgx.Tx
	// Tx is the transaction that Success commits and Fail rolls back before
	// the tracked operation row is updated.
	Tx pgx.Tx
	// Context is passed to every pgx call made through this value.
	Context context.Context
}
// NewPgxContext returns a PgxContext holding the given outer transaction,
// inner transaction and context. It performs no database work itself.
func NewPgxContext[P any, R any](outerTx pgx.Tx, tx pgx.Tx, context context.Context) *PgxContext[P, R] {
	pgxCtx := new(PgxContext[P, R])
	pgxCtx.outerTx = outerTx
	pgxCtx.Tx = tx
	pgxCtx.Context = context
	return pgxCtx
}
// Success finalizes a tracked operation whose work completed.
//
// If the operation carries a non-zero Expiration that has already passed,
// its Err is set to "Operation expired" and the call is delegated to Fail.
// Otherwise Tx is committed, the tracked operation row is marked finished
// through outerTx (see finishTrackedOperationQuery) with the serialized
// payload and result, and outerTx is committed.
//
// Any commit or Exec error is reported by panicking with that error.
func (ctx *PgxContext[P, R]) Success(operation *a.TrackedOperation[P, R]) {
	// An operation that outlived its expiration is recorded as a failure.
	if deadline := operation.Expiration; !deadline.IsZero() && time.Now().After(deadline) {
		operation.Err = errors.New("Operation expired")
		ctx.Fail(operation)
		return
	}

	// Commit the operation's own transaction before touching the tracking row.
	if err := ctx.Tx.Commit(ctx.Context); err != nil {
		panic(err)
	}

	finishArgs := pgx.NamedArgs{
		"key":     operation.Key,
		"target":  operation.Target,
		"payload": serialize(operation.Payload),
		"result":  serialize(operation.Result),
	}
	if _, err := ctx.outerTx.Exec(ctx.Context, finishTrackedOperationQuery, finishArgs); err != nil {
		// The rollback error is discarded; the Exec error is the one that
		// gets reported through the panic.
		_ = ctx.outerTx.Rollback(ctx.Context)
		panic(err)
	}

	if err := ctx.outerTx.Commit(ctx.Context); err != nil {
		panic(err)
	}
}
// Fail finalizes a tracked operation that did not complete successfully.
//
// It rolls back Tx, then marks the tracked operation row as failed through
// outerTx (see failTrackedOperationQuery), storing the serialized payload and
// the message of operation.Err, and finally commits outerTx.
//
// Any rollback, Exec or commit error is reported by panicking with that
// error. When the Exec fails, outerTx is rolled back before panicking, the
// same way Success handles a failed Exec.
func (ctx *PgxContext[P, R]) Fail(operation *a.TrackedOperation[P, R]) {
	if rollbackErr := ctx.Tx.Rollback(ctx.Context); rollbackErr != nil {
		panic(rollbackErr)
	}

	// Guard against a missing error: calling Error() on a nil Err would panic
	// here with the inner transaction already rolled back and the outer one
	// still open. A nil argument is stored as NULL in error_message.
	var errorMessage any
	if operation.Err != nil {
		errorMessage = operation.Err.Error()
	}

	_, err := ctx.outerTx.Exec(
		ctx.Context,
		failTrackedOperationQuery,
		pgx.NamedArgs{
			"key":           operation.Key,
			"target":        operation.Target,
			"payload":       serialize(operation.Payload),
			"error_message": errorMessage,
		},
	)
	if err != nil {
		// Do not leave the outer transaction open. The rollback error is
		// discarded; the Exec error is the one reported through the panic.
		_ = ctx.outerTx.Rollback(ctx.Context)
		panic(err)
	}

	if commitErr := ctx.outerTx.Commit(ctx.Context); commitErr != nil {
		panic(commitErr)
	}
}