forked from smartcontractkit/chainlink
/
transaction.go
129 lines (114 loc) · 4.01 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package postgres
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/smartcontractkit/sqlx"
"gorm.io/gorm"
)
// Transaction-level timeouts applied by the helpers in this file.
//
// NOTE: Ideally these would be configured to sensible values by the operator
// in the postgres configuration. Because that cannot be relied upon, they are
// overridden here.
//
// They cannot easily be applied at the session level because of the way Go's
// connection pooling works.
const (
	// LockTimeout is the longest we are willing to wait for any kind of
	// database lock. Some bound is needed: waiting on a lock forever is
	// really bad.
	LockTimeout = 15 * time.Second

	// IdleInTxSessionTimeout is the longest a transaction may be left open
	// and idle. Some bound is needed: leaving transactions open forever is
	// really bad.
	IdleInTxSessionTimeout = time.Hour
)
// ErrNoDeadlineSet is returned by GormTransaction when the context it is
// given carries no deadline.
var ErrNoDeadlineSet = errors.New("no deadline set")
// GormTransactionWithoutContext runs fc inside a gorm transaction on db
// without attaching any context of its own.
//
// The first element of txOptss, when present, is used as the transaction
// options; otherwise DefaultSqlTxOptions is used. Before fc is invoked,
// lock_timeout and idle_in_transaction_session_timeout are set with SET LOCAL
// from LockTimeout and IdleInTxSessionTimeout (both in milliseconds). If that
// statement fails, fc is not called and the wrapped error is returned to gorm.
//
// WARNING: Only use for nested txes inside ORM methods where you expect db to
// already have a ctx with a deadline.
func GormTransactionWithoutContext(db *gorm.DB, fc func(tx *gorm.DB) error, txOptss ...sql.TxOptions) (err error) {
	opts := DefaultSqlTxOptions
	if len(txOptss) > 0 {
		opts = txOptss[0]
	}

	timeoutsSQL := fmt.Sprintf(`SET LOCAL lock_timeout = %v; SET LOCAL idle_in_transaction_session_timeout = %v;`, LockTimeout.Milliseconds(), IdleInTxSessionTimeout.Milliseconds())

	body := func(tx *gorm.DB) error {
		if execErr := tx.Exec(timeoutsSQL).Error; execErr != nil {
			return errors.Wrap(execErr, "error setting transaction timeouts")
		}
		return fc(tx)
	}
	return db.Transaction(body, &opts)
}
// GormTransaction runs fc inside a gorm transaction on db bound to ctx.
//
// ctx must carry a deadline; if it does not, ErrNoDeadlineSet is returned and
// no transaction is attempted. The first element of txOptss, when present, is
// used as the transaction options; otherwise DefaultSqlTxOptions is used.
// Before fc is invoked, lock_timeout and idle_in_transaction_session_timeout
// are set with SET LOCAL from LockTimeout and IdleInTxSessionTimeout (both in
// milliseconds). If that statement fails, fc is not called and the wrapped
// error is returned to gorm.
//
// Deprecated: Use the transaction manager instead.
func GormTransaction(ctx context.Context, db *gorm.DB, fc func(tx *gorm.DB) error, txOptss ...sql.TxOptions) (err error) {
	// Refuse to run without a deadline.
	if _, hasDeadline := ctx.Deadline(); !hasDeadline {
		return ErrNoDeadlineSet
	}

	opts := DefaultSqlTxOptions
	if len(txOptss) > 0 {
		opts = txOptss[0]
	}

	timeoutsSQL := fmt.Sprintf(`SET LOCAL lock_timeout = %v; SET LOCAL idle_in_transaction_session_timeout = %v;`, LockTimeout.Milliseconds(), IdleInTxSessionTimeout.Milliseconds())

	body := func(tx *gorm.DB) error {
		if execErr := tx.Exec(timeoutsSQL).Error; execErr != nil {
			return errors.Wrap(execErr, "error setting transaction timeouts")
		}
		return fc(tx)
	}
	return db.WithContext(ctx).Transaction(body, &opts)
}
// GormTransactionWithDefaultContext runs fc inside a gorm transaction on db,
// bound to the context obtained from DefaultQueryCtx. That context is
// cancelled when this function returns.
//
// The first element of txOptss, when present, is used as the transaction
// options; otherwise DefaultSqlTxOptions is used. Before fc is invoked,
// lock_timeout and idle_in_transaction_session_timeout are set with SET LOCAL
// from LockTimeout and IdleInTxSessionTimeout (both in milliseconds). If that
// statement fails, fc is not called and the wrapped error is returned to gorm.
//
// Deprecated: Use the transaction manager instead.
func GormTransactionWithDefaultContext(db *gorm.DB, fc func(tx *gorm.DB) error, txOptss ...sql.TxOptions) error {
	opts := DefaultSqlTxOptions
	if len(txOptss) > 0 {
		opts = txOptss[0]
	}

	ctx, cancel := DefaultQueryCtx()
	defer cancel()

	timeoutsSQL := fmt.Sprintf(`SET LOCAL lock_timeout = %v; SET LOCAL idle_in_transaction_session_timeout = %v;`, LockTimeout.Milliseconds(), IdleInTxSessionTimeout.Milliseconds())

	return db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
		if execErr := tx.Exec(timeoutsSQL).Error; execErr != nil {
			return errors.Wrap(execErr, "error setting transaction timeouts")
		}
		return fc(tx)
	}, &opts)
}
// DBWithDefaultContext calls fc with db bound to the context obtained from
// DefaultQueryCtx and returns whatever fc returns. The context is cancelled
// once fc has returned.
func DBWithDefaultContext(db *gorm.DB, fc func(db *gorm.DB) error) error {
	ctx, cancel := DefaultQueryCtx()
	defer cancel()

	scoped := db.WithContext(ctx)
	return fc(scoped)
}
// SqlTransaction runs fc inside a transaction started on rdb with ctx.
//
// The first element of txOpts, when present, is used as the transaction
// options; otherwise DefaultSqlTxOptions is used. Before fc is invoked,
// lock_timeout and idle_in_transaction_session_timeout are set with SET LOCAL
// from LockTimeout and IdleInTxSessionTimeout (both in milliseconds).
//
// The transaction is committed when fc returns nil. A rollback is attempted
// when setting the timeouts fails, when fc returns an error or panics, or when
// the commit itself fails. A panic raised by fc is not recovered: it keeps
// propagating after the rollback attempt.
//
// The returned error is the (wrapped) error from beginning the transaction,
// from setting the timeouts, from fc, or from the commit. If the rollback
// fails with anything other than sql.ErrTxDone, SqlTransaction panics with
// that rollback error.
func SqlTransaction(ctx context.Context, rdb *sql.DB, fc func(tx *sqlx.Tx) error, txOpts ...sql.TxOptions) (err error) {
	opts := &DefaultSqlTxOptions
	if len(txOpts) > 0 {
		opts = &txOpts[0]
	}

	db := WrapDbWithSqlx(rdb)
	tx, err := db.BeginTxx(ctx, opts)
	if err != nil {
		// No transaction was started, so there is nothing to roll back and
		// tx must not be used.
		return errors.Wrap(err, "error beginning transaction")
	}

	panicked := false
	defer func() {
		// Roll back on panic, on an error from fc or from setting the
		// timeouts, and on a failed commit.
		if !panicked && err == nil {
			return
		}
		// sql.ErrTxDone only means the transaction is already finished (for
		// example after a failed Commit, or after database/sql rolled it back
		// because ctx ended). That is not a rollback failure, so it must not
		// turn an ordinary returned error into a panic.
		if perr := tx.Rollback(); perr != nil && perr != sql.ErrTxDone {
			panic(perr)
		}
	}()

	_, err = tx.Exec(fmt.Sprintf(`SET LOCAL lock_timeout = %v; SET LOCAL idle_in_transaction_session_timeout = %v;`, LockTimeout.Milliseconds(), IdleInTxSessionTimeout.Milliseconds()))
	if err != nil {
		return errors.Wrap(err, "error setting transaction timeouts")
	}

	// panicked stays true only if fc does not return normally; the deferred
	// function uses it to detect a panic without recovering it.
	panicked = true
	err = fc(tx)
	panicked = false

	if err == nil {
		err = errors.WithStack(tx.Commit())
	}
	return err
}