forked from brocaar/chirpstack-network-server
/
db.go
152 lines (135 loc) · 4.09 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package common
import (
"database/sql"
"fmt"
"time"
"github.com/garyburd/redigo/redis"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
// register postgresql driver
_ "github.com/lib/pq"
)
const (
redisMaxIdle = 3
redisIdleTimeoutSec = 240
redisDialReadTimeout = 60 * time.Second
redisDialWriteTimeout = 10 * time.Second
)
// NewRedisPool returns a new Redis connection pool.
func NewRedisPool(redisURL string) *redis.Pool {
return &redis.Pool{
MaxIdle: redisMaxIdle,
IdleTimeout: redisIdleTimeoutSec * time.Second,
Dial: func() (redis.Conn, error) {
c, err := redis.DialURL(redisURL,
redis.DialReadTimeout(redisDialReadTimeout),
redis.DialWriteTimeout(redisDialWriteTimeout),
)
if err != nil {
return nil, fmt.Errorf("redis connection error: %s", err)
}
return c, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("PING")
if err != nil {
return fmt.Errorf("ping redis error: %s", err)
}
return nil
},
}
}
// OpenDatabase opens the database and performs a ping to make sure the
// database is up.
func OpenDatabase(dsn string) (*DBLogger, error) {
db, err := sqlx.Open("postgres", dsn)
if err != nil {
return nil, fmt.Errorf("database connection error: %s", err)
}
for {
if err := db.Ping(); err != nil {
log.Errorf("ping database error, will retry in 2s: %s", err)
time.Sleep(2 * time.Second)
} else {
break
}
}
return &DBLogger{db}, nil
}
// DBLogger is a DB wrapper which logs the executed sql queries and their
// duration.
type DBLogger struct {
*sqlx.DB
}
// Beginx returns a transaction with logging.
func (db *DBLogger) Beginx() (*TxLogger, error) {
tx, err := db.DB.Beginx()
return &TxLogger{tx}, err
}
// Query logs the queries executed by the Query method.
func (db *DBLogger) Query(query string, args ...interface{}) (*sql.Rows, error) {
start := time.Now()
rows, err := db.DB.Query(query, args...)
logQuery(query, time.Since(start), args...)
return rows, err
}
// Queryx logs the queries executed by the Queryx method.
func (db *DBLogger) Queryx(query string, args ...interface{}) (*sqlx.Rows, error) {
start := time.Now()
rows, err := db.DB.Queryx(query, args...)
logQuery(query, time.Since(start), args...)
return rows, err
}
// QueryRowx logs the queries executed by the QueryRowx method.
func (db *DBLogger) QueryRowx(query string, args ...interface{}) *sqlx.Row {
start := time.Now()
row := db.DB.QueryRowx(query, args...)
logQuery(query, time.Since(start), args...)
return row
}
// Exec logs the queries executed by the Exec method.
func (db *DBLogger) Exec(query string, args ...interface{}) (sql.Result, error) {
start := time.Now()
res, err := db.DB.Exec(query, args...)
logQuery(query, time.Since(start), args...)
return res, err
}
// TxLogger logs the executed sql queries and their duration.
type TxLogger struct {
*sqlx.Tx
}
// Query logs the queries executed by the Query method.
func (q *TxLogger) Query(query string, args ...interface{}) (*sql.Rows, error) {
start := time.Now()
rows, err := q.Tx.Query(query, args...)
logQuery(query, time.Since(start), args...)
return rows, err
}
// Queryx logs the queries executed by the Queryx method.
func (q *TxLogger) Queryx(query string, args ...interface{}) (*sqlx.Rows, error) {
start := time.Now()
rows, err := q.Tx.Queryx(query, args...)
logQuery(query, time.Since(start), args...)
return rows, err
}
// QueryRowx logs the queries executed by the QueryRowx method.
func (q *TxLogger) QueryRowx(query string, args ...interface{}) *sqlx.Row {
start := time.Now()
row := q.Tx.QueryRowx(query, args...)
logQuery(query, time.Since(start), args...)
return row
}
// Exec logs the queries executed by the Exec method.
func (q *TxLogger) Exec(query string, args ...interface{}) (sql.Result, error) {
start := time.Now()
res, err := q.Tx.Exec(query, args...)
logQuery(query, time.Since(start), args...)
return res, err
}
func logQuery(query string, duration time.Duration, args ...interface{}) {
log.WithFields(log.Fields{
"query": query,
"args": args,
"duration": duration,
}).Debug("sql query executed")
}