-
Notifications
You must be signed in to change notification settings - Fork 0
/
utils.go
81 lines (67 loc) · 2.55 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package database
import (
"context"
"database/sql/driver"
"github.com/go-sql-driver/mysql"
"github.com/icinga/icinga-go-library/com"
"github.com/icinga/icinga-go-library/strcase"
"github.com/icinga/icinga-go-library/types"
"github.com/pkg/errors"
)
// CantPerformQuery annotates err with a message naming the query q that could not be performed.
// The query is rendered quoted (%q) inside the message. The annotation is delegated to errors.Wrapf.
func CantPerformQuery(err error, q string) error {
	const format = "can't perform %q"

	return errors.Wrapf(err, format, q)
}
// TableName returns the table name for t.
//
// If t implements TableNamer, the result of its TableName method is returned as is.
// Otherwise, the name is derived by passing types.Name(t) through strcase.Snake.
func TableName(t interface{}) string {
	namer, implementsNamer := t.(TableNamer)
	if !implementsNamer {
		// No explicit name provided, fall back to the derived one.
		return strcase.Snake(types.Name(t))
	}

	return namer.TableName()
}
// SplitOnDupId returns a stateful split policy that remembers the IDs (in their string form)
// of the inputs it has been called with.
//
// The returned function reports true as soon as it is given an input whose ID it has already seen.
// In that case the set of remembered IDs is reset so that it contains only that ID.
// For a not yet seen ID, the ID is recorded and false is reported.
func SplitOnDupId[T IDer]() com.BulkChunkSplitPolicy[T] {
	seen := make(map[string]struct{})

	return func(entity T) bool {
		key := entity.ID().String()

		if _, duplicate := seen[key]; duplicate {
			// Forget everything recorded so far and start over with just the current ID.
			seen = map[string]struct{}{key: {}}

			return true
		}

		seen[key] = struct{}{}

		return false
	}
}
// setGaleraOpts prepares and executes "SET SESSION wsrep_sync_wait=?" with wsrepSyncWait on conn.
//
// Setting the "wsrep_sync_wait" variable for each session ensures that causality checks are performed before
// execution, so that each statement is executed on a fully synchronized node. Doing so prevents foreign key
// violations when inserting into dependent tables on different MariaDB/MySQL nodes. When using MySQL single
// nodes, the "SET SESSION" command fails with "Unknown system variable (1193)"; if preparing the statement
// yields that error, it is silently dropped and nil is returned.
//
// Any other failure to prepare, execute or close the statement is returned wrapped with a message that
// contains the statement text.
//
// https://mariadb.com/kb/en/galera-cluster-system-variables/#wsrep_sync_wait
func setGaleraOpts(ctx context.Context, conn driver.Conn, wsrepSyncWait int64) error {
	const galeraOpts = "SET SESSION wsrep_sync_wait=?"

	// Unchecked assertion: conn is required to implement driver.ConnPrepareContext.
	preparer := conn.(driver.ConnPrepareContext)

	stmt, prepErr := preparer.PrepareContext(ctx, galeraOpts)
	switch {
	case prepErr == nil:
		// Prepared successfully, continue with the execution below.
	case errors.Is(prepErr, &mysql.MySQLError{Number: 1193}): // Unknown system variable
		return nil
	default:
		return errors.Wrap(prepErr, "cannot prepare "+galeraOpts)
	}

	// Safety net for unexpected exits only, hence its error is deliberately ignored. On the regular
	// path the statement is closed explicitly at the end, where the error is checked and returned.
	defer func() { _ = stmt.Close() }()

	// Unchecked assertion: the prepared statement is required to implement driver.StmtExecContext.
	executor := stmt.(driver.StmtExecContext)
	args := []driver.NamedValue{{Value: wsrepSyncWait}}

	if _, execErr := executor.ExecContext(ctx, args); execErr != nil {
		return errors.Wrap(execErr, "cannot execute "+galeraOpts)
	}

	if closeErr := stmt.Close(); closeErr != nil {
		return errors.Wrap(closeErr, "cannot close prepared statement "+galeraOpts)
	}

	return nil
}
// Compile-time check that SplitOnDupId, instantiated for Entity,
// is assignable to com.BulkChunkSplitPolicyFactory[Entity].
var _ com.BulkChunkSplitPolicyFactory[Entity] = SplitOnDupId[Entity]