forked from hashicorp/vault
/
sql.go
164 lines (133 loc) · 4.48 KB
/
sql.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package connutil
import (
"context"
"database/sql"
"fmt"
"strings"
"sync"
"time"
"github.com/hashicorp/errwrap"
"github.com/hashicorp/vault/helper/parseutil"
"github.com/hashicorp/vault/plugins/helper/database/dbutil"
"github.com/mitchellh/mapstructure"
)
var _ ConnectionProducer = &SQLConnectionProducer{}
// SQLConnectionProducer implements ConnectionProducer and provides a generic producer for most sql databases
type SQLConnectionProducer struct {
ConnectionURL string `json:"connection_url" mapstructure:"connection_url" structs:"connection_url"`
MaxOpenConnections int `json:"max_open_connections" mapstructure:"max_open_connections" structs:"max_open_connections"`
MaxIdleConnections int `json:"max_idle_connections" mapstructure:"max_idle_connections" structs:"max_idle_connections"`
MaxConnectionLifetimeRaw interface{} `json:"max_connection_lifetime" mapstructure:"max_connection_lifetime" structs:"max_connection_lifetime"`
Username string `json:"username" mapstructure:"username" structs:"username"`
Password string `json:"password" mapstructure:"password" structs:"password"`
Type string
RawConfig map[string]interface{}
maxConnectionLifetime time.Duration
Initialized bool
db *sql.DB
sync.Mutex
}
func (c *SQLConnectionProducer) Initialize(ctx context.Context, conf map[string]interface{}, verifyConnection bool) error {
_, err := c.Init(ctx, conf, verifyConnection)
return err
}
func (c *SQLConnectionProducer) Init(ctx context.Context, conf map[string]interface{}, verifyConnection bool) (map[string]interface{}, error) {
c.Lock()
defer c.Unlock()
c.RawConfig = conf
err := mapstructure.WeakDecode(conf, &c)
if err != nil {
return nil, err
}
if len(c.ConnectionURL) == 0 {
return nil, fmt.Errorf("connection_url cannot be empty")
}
c.ConnectionURL = dbutil.QueryHelper(c.ConnectionURL, map[string]string{
"username": c.Username,
"password": c.Password,
})
if c.MaxOpenConnections == 0 {
c.MaxOpenConnections = 2
}
if c.MaxIdleConnections == 0 {
c.MaxIdleConnections = c.MaxOpenConnections
}
if c.MaxIdleConnections > c.MaxOpenConnections {
c.MaxIdleConnections = c.MaxOpenConnections
}
if c.MaxConnectionLifetimeRaw == nil {
c.MaxConnectionLifetimeRaw = "0s"
}
c.maxConnectionLifetime, err = parseutil.ParseDurationSecond(c.MaxConnectionLifetimeRaw)
if err != nil {
return nil, errwrap.Wrapf("invalid max_connection_lifetime: {{err}}", err)
}
// Set initialized to true at this point since all fields are set,
// and the connection can be established at a later time.
c.Initialized = true
if verifyConnection {
if _, err := c.Connection(ctx); err != nil {
return nil, errwrap.Wrapf("error verifying connection: {{err}}", err)
}
if err := c.db.PingContext(ctx); err != nil {
return nil, errwrap.Wrapf("error verifying connection: {{err}}", err)
}
}
return c.RawConfig, nil
}
func (c *SQLConnectionProducer) Connection(ctx context.Context) (interface{}, error) {
if !c.Initialized {
return nil, ErrNotInitialized
}
// If we already have a DB, test it and return
if c.db != nil {
if err := c.db.PingContext(ctx); err == nil {
return c.db, nil
}
// If the ping was unsuccessful, close it and ignore errors as we'll be
// reestablishing anyways
c.db.Close()
}
// For mssql backend, switch to sqlserver instead
dbType := c.Type
if c.Type == "mssql" {
dbType = "sqlserver"
}
// Otherwise, attempt to make connection
conn := c.ConnectionURL
// Ensure timezone is set to UTC for all the connections
if strings.HasPrefix(conn, "postgres://") || strings.HasPrefix(conn, "postgresql://") {
if strings.Contains(conn, "?") {
conn += "&timezone=utc"
} else {
conn += "?timezone=utc"
}
}
var err error
c.db, err = sql.Open(dbType, conn)
if err != nil {
return nil, err
}
// Set some connection pool settings. We don't need much of this,
// since the request rate shouldn't be high.
c.db.SetMaxOpenConns(c.MaxOpenConnections)
c.db.SetMaxIdleConns(c.MaxIdleConnections)
c.db.SetConnMaxLifetime(c.maxConnectionLifetime)
return c.db, nil
}
func (c *SQLConnectionProducer) SecretValues() map[string]interface{} {
return map[string]interface{}{
c.Password: "[password]",
}
}
// Close attempts to close the connection
func (c *SQLConnectionProducer) Close() error {
// Grab the write lock
c.Lock()
defer c.Unlock()
if c.db != nil {
c.db.Close()
}
c.db = nil
return nil
}