/
sql.go
156 lines (134 loc) · 3.99 KB
/
sql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package golembic
import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"
)
// Compile-time assertion that `*TimeColumnPointer` satisfies the
// `TimestampColumn` interface.
var _ TimestampColumn = (*TimeColumnPointer)(nil)
// readAllInt performs a SQL query and reads all rows into an `int` slice,
// under the assumption that a single (INTEGER) column is being returned for
// the query.
//
// The query is executed within `tx` using `ctx`; `args` are passed through as
// the query parameters. An error is returned if the query fails, if a row
// cannot be scanned into an `int`, if iteration stops due to an error, or if
// closing the rows fails (see `rowsClose`).
func readAllInt(ctx context.Context, tx *sql.Tx, query string, args ...interface{}) (result []int, err error) {
	var rows *sql.Rows
	// The deferred close runs on every return path and folds any close
	// failure into the named `err` result.
	defer func() {
		err = rowsClose(rows, err)
	}()

	rows, err = tx.QueryContext(ctx, query, args...)
	if err != nil {
		return
	}

	for rows.Next() {
		var value int
		err = rows.Scan(&value)
		if err != nil {
			return
		}
		result = append(result, value)
	}

	// `rows.Next()` returns false both when the rows are exhausted and when
	// iteration fails; `rows.Err()` distinguishes the two so that a failure
	// mid-iteration is not reported as a (truncated) success.
	err = rows.Err()
	return
}
// migrationFromQuery builds a `Migration` metadata row from the three values
// read off of a `sql.Rows`: the (nullable) previous revision, the revision
// and the creation timestamp. When `previous` is NULL the `Previous` field is
// left at its zero value.
func migrationFromQuery(previous sql.NullString, revision string, createdAt time.Time) Migration {
	m := Migration{Revision: revision, createdAt: createdAt}
	if previous.Valid {
		m.Previous = previous.String
	}
	return m
}
// readAllMigration performs a SQL query and reads all rows into a
// `Migration` slice, under the assumption that three columns -- revision,
// previous and created_at -- are being returned for the query (in that order).
// For example, the query
//
//	SELECT revision, previous, created_at FROM golembic_migrations
//
// would satisfy this. A more "focused" query would return the latest migration
// applied
//
//	SELECT
//	  revision,
//	  previous,
//	  created_at
//	FROM
//	  golembic_migrations
//	ORDER BY
//	  serial_id DESC
//	LIMIT 1
//
// The `createdAt` column is used as the scan destination for the third column
// (via `Pointer()`) and its value is read back (via `Timestamp()`) after each
// row is scanned. An error is returned if the query fails, if a row cannot be
// scanned, if iteration stops due to an error, or if closing the rows fails
// (see `rowsClose`).
func readAllMigration(ctx context.Context, tx *sql.Tx, query string, createdAt TimestampColumn, args ...interface{}) (result []Migration, err error) {
	var rows *sql.Rows
	// The deferred close runs on every return path and folds any close
	// failure into the named `err` result.
	defer func() {
		err = rowsClose(rows, err)
	}()

	rows, err = tx.QueryContext(ctx, query, args...)
	if err != nil {
		return
	}

	var revision string
	var previous sql.NullString
	for rows.Next() {
		err = rows.Scan(&revision, &previous, createdAt.Pointer())
		if err != nil {
			return
		}
		result = append(result, migrationFromQuery(previous, revision, createdAt.Timestamp()))
	}

	// `rows.Next()` returns false both when the rows are exhausted and when
	// iteration fails; `rows.Err()` distinguishes the two so that a failure
	// mid-iteration is not reported as a (truncated) success.
	err = rows.Err()
	return
}
// rowsClose is a helper for `defer` blocks that guarantees a SQL query `Rows`
// iterator gets closed once it has been consumed (or abandoned partway
// through iteration). A `nil` iterator is a no-op and `err` is returned
// unchanged; otherwise any failure from closing is combined with `err`.
func rowsClose(rows *sql.Rows, err error) error {
	if rows != nil {
		return maybeWrap(err, rows.Close(), "failed to close rows")
	}
	return err
}
// txFinalize is intended to be used in `defer` blocks to ensure that a SQL
// transaction is always rolled back after being started. In cases when the
// transaction was successfully committed, the rollback will fail with
// `sql.ErrTxDone` which will be ignored here.
func txFinalize(tx *sql.Tx, err error) error {
if tx == nil {
return err
}
rollbackErr := ignoreTxDone(tx.Rollback())
return maybeWrap(err, rollbackErr, "failed to rollback transaction")
}
// ignoreTxDone converts a `sql.ErrTxDone` error to `nil` (and leaves alone
// all other errors). The check uses `errors.Is`, so an error that wraps
// `sql.ErrTxDone` is treated the same as the sentinel itself.
func ignoreTxDone(err error) error {
	if errors.Is(err, sql.ErrTxDone) {
		return nil
	}
	return err
}
// maybeWrap combines a primary error with a secondary one. When either error
// is `nil` the other is returned as-is (so two `nil` inputs yield `nil`).
// When both are set, the result wraps `primary` and appends `message` and the
// text of `secondary`.
func maybeWrap(primary, secondary error, message string) error {
	switch {
	case primary == nil:
		return secondary
	case secondary == nil:
		return primary
	default:
		return fmt.Errorf("%w; %s: %v", primary, message, secondary)
	}
}
// TimeColumnPointer is the default implementation of `TimestampColumn`; it
// wraps a single `time.Time` value.
type TimeColumnPointer struct {
	// Stored is the timestamp value exposed by `Pointer()` and `Timestamp()`.
	Stored time.Time
}

// Pointer returns the address of the stored timestamp, so that writes through
// the returned `*time.Time` update `Stored`.
func (c *TimeColumnPointer) Pointer() interface{} {
	stored := &c.Stored
	return stored
}

// Timestamp returns the current value of the stored timestamp.
func (c TimeColumnPointer) Timestamp() time.Time {
	return c.Stored
}