-
Notifications
You must be signed in to change notification settings - Fork 5
/
db.go
456 lines (408 loc) · 15.1 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
/*
Copyright 2014 Outbrain Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package db
import (
"database/sql"
"errors"
"fmt"
"regexp"
"strings"
"sync"
"time"
"github.com/openark/golib/log"
"github.com/openark/golib/sqlutils"
"github.com/openark/orchestrator/go/config"
)
const dsnRegexp = `^([^:]+)(:.*)?(@(socket|tcp)\(.+\)/.*)$`
var (
EmptyArgs []interface{}
ErrNoMatch = errors.New("no match")
)
var mysqlURI string
var dbMutex sync.Mutex
type DummySqlResult struct {
}
func (this DummySqlResult) LastInsertId() (int64, error) {
return 0, nil
}
func (this DummySqlResult) RowsAffected() (int64, error) {
return 1, nil
}
func getMySQLURI() string {
dbMutex.Lock()
defer dbMutex.Unlock()
if mysqlURI != "" {
return mysqlURI
}
mysqlURI := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?timeout=%ds&readTimeout=%ds&rejectReadOnly=%t&interpolateParams=true",
config.Config.MySQLOrchestratorUser,
config.Config.MySQLOrchestratorPassword,
config.Config.MySQLOrchestratorHost,
config.Config.MySQLOrchestratorPort,
config.Config.MySQLOrchestratorDatabase,
config.Config.MySQLConnectTimeoutSeconds,
config.Config.MySQLOrchestratorReadTimeoutSeconds,
config.Config.MySQLOrchestratorRejectReadOnly,
)
if config.Config.MySQLOrchestratorUseMutualTLS {
mysqlURI, _ = SetupMySQLOrchestratorTLS(mysqlURI)
}
return mysqlURI
}
// OpenDiscovery returns a DB instance to access a topology instance.
// It has lower read timeout than OpenTopology and is intended to
// be used with low-latency discovery queries.
func OpenDiscovery(host string, port int) (*sql.DB, error) {
return openTopology(host, port, config.Config.MySQLDiscoveryReadTimeoutSeconds)
}
// OpenTopology returns a DB instance to access a topology instance.
func OpenTopology(host string, port int) (*sql.DB, error) {
return openTopology(host, port, config.Config.MySQLTopologyReadTimeoutSeconds)
}
func openTopology(host string, port int, readTimeout int) (db *sql.DB, err error) {
mysql_uri := fmt.Sprintf("%s:%s@tcp(%s:%d)/?timeout=%ds&readTimeout=%ds&interpolateParams=true",
config.Config.MySQLTopologyUser,
config.Config.MySQLTopologyPassword,
host, port,
config.Config.MySQLConnectTimeoutSeconds,
readTimeout,
)
if config.Config.MySQLTopologyUseMutualTLS ||
(config.Config.MySQLTopologyUseMixedTLS && requiresTLS(host, port, mysql_uri)) {
if mysql_uri, err = SetupMySQLTopologyTLS(mysql_uri); err != nil {
return nil, err
}
}
if db, _, err = sqlutils.GetDB(mysql_uri); err != nil {
return nil, err
}
if config.Config.MySQLConnectionLifetimeSeconds > 0 {
db.SetConnMaxLifetime(time.Duration(config.Config.MySQLConnectionLifetimeSeconds) * time.Second)
}
db.SetMaxOpenConns(config.MySQLTopologyMaxPoolConnections)
db.SetMaxIdleConns(config.MySQLTopologyMaxPoolConnections)
return db, err
}
func openOrchestratorMySQLGeneric() (db *sql.DB, fromCache bool, err error) {
uri := fmt.Sprintf("%s:%s@tcp(%s:%d)/?timeout=%ds&readTimeout=%ds&interpolateParams=true",
config.Config.MySQLOrchestratorUser,
config.Config.MySQLOrchestratorPassword,
config.Config.MySQLOrchestratorHost,
config.Config.MySQLOrchestratorPort,
config.Config.MySQLConnectTimeoutSeconds,
config.Config.MySQLOrchestratorReadTimeoutSeconds,
)
if config.Config.MySQLOrchestratorUseMutualTLS {
uri, _ = SetupMySQLOrchestratorTLS(uri)
}
return sqlutils.GetDB(uri)
}
func IsSQLite() bool {
return config.Config.IsSQLite()
}
func isInMemorySQLite() bool {
return config.Config.IsSQLite() && strings.Contains(config.Config.SQLite3DataFile, ":memory:")
}
// matchDSN tries to match the DSN or returns an error
func matchDSN(dsn string) (string, error) {
re := regexp.MustCompile(dsnRegexp)
matches := re.FindStringSubmatch(dsn)
if matches == nil {
return "", ErrNoMatch
}
// matching dsn so printout excluding the password
// - if no password is provided the lack of a password will be visible!
maskedPass := ""
if len(matches[2]) > 0 {
maskedPass = `:?`
}
return matches[1] + maskedPass + matches[3], nil
}
// safeMySQLURI returns a version of the dsn without showing the password so it can be logged safely.
// - if we are unable to correctly match the provided dsn we build an incomplete one based on some of the settings.
func safeMySQLURI(dsn string) string {
if safeDSN, err := matchDSN(dsn); err == nil {
return safeDSN
}
// Fallback to use an incomplete dsn which may be good enough.
// Do not show the password but do show what we connect to.
return fmt.Sprintf("%s:?@tcp(%s:%d)/%s?timeout=%ds (incomplete dsn!)",
config.Config.MySQLOrchestratorUser,
config.Config.MySQLOrchestratorHost,
config.Config.MySQLOrchestratorPort,
config.Config.MySQLOrchestratorDatabase,
config.Config.MySQLConnectTimeoutSeconds)
}
// OpenTopology returns the DB instance for the orchestrator backed database
func OpenOrchestrator() (db *sql.DB, err error) {
var fromCache bool
if IsSQLite() {
db, fromCache, err = sqlutils.GetSQLiteDB(config.Config.SQLite3DataFile)
if err == nil && !fromCache {
log.Debugf("Connected to orchestrator backend: sqlite on %v", config.Config.SQLite3DataFile)
}
db.SetMaxOpenConns(1)
db.SetMaxIdleConns(1)
} else {
if db, fromCache, err := openOrchestratorMySQLGeneric(); err != nil {
return db, log.Errore(err)
} else if !fromCache {
// first time ever we talk to MySQL
query := fmt.Sprintf("create database if not exists %s", config.Config.MySQLOrchestratorDatabase)
if _, err := db.Exec(query); err != nil {
return db, log.Errore(err)
}
}
dsn := getMySQLURI()
db, fromCache, err = sqlutils.GetDB(dsn)
if err == nil && !fromCache {
log.Debugf("Connected to orchestrator backend: %v", safeMySQLURI(dsn))
if config.Config.MySQLOrchestratorMaxPoolConnections > 0 {
log.Debugf("Orchestrator pool SetMaxOpenConns: %d", config.Config.MySQLOrchestratorMaxPoolConnections)
db.SetMaxOpenConns(config.Config.MySQLOrchestratorMaxPoolConnections)
}
if config.Config.MySQLConnectionLifetimeSeconds > 0 {
db.SetConnMaxLifetime(time.Duration(config.Config.MySQLConnectionLifetimeSeconds) * time.Second)
}
}
}
if err == nil && !fromCache {
if !config.Config.SkipOrchestratorDatabaseUpdate {
initOrchestratorDB(db)
}
// A low value here will trigger reconnects which could
// make the number of backend connections hit the tcp
// limit. That's bad. I could make this setting dynamic
// but then people need to know which value to use. For now
// allow up to 25% of MySQLOrchestratorMaxPoolConnections
// to be idle. That should provide a good number which
// does not keep the maximum number of connections open but
// at the same time does not trigger disconnections and
// reconnections too frequently.
maxIdleConns := int(config.Config.MySQLOrchestratorMaxPoolConnections * 25 / 100)
if maxIdleConns < 10 {
maxIdleConns = 10
}
log.Infof("Connecting to backend %s:%d: maxConnections: %d, maxIdleConns: %d",
config.Config.MySQLOrchestratorHost,
config.Config.MySQLOrchestratorPort,
config.Config.MySQLOrchestratorMaxPoolConnections,
maxIdleConns)
db.SetMaxIdleConns(maxIdleConns)
}
return db, err
}
func translateStatement(statement string) (string, error) {
if IsSQLite() {
statement = sqlutils.ToSqlite3Dialect(statement)
}
return statement, nil
}
// versionIsDeployed checks if given version has already been deployed
func versionIsDeployed(db *sql.DB) (result bool, err error) {
query := `
select
count(*) as is_deployed
from
orchestrator_db_deployments
where
deployed_version = ?
`
err = db.QueryRow(query, config.RuntimeCLIFlags.ConfiguredVersion).Scan(&result)
// err means the table 'orchestrator_db_deployments' does not even exist, in which case we proceed
// to deploy.
// If there's another error to this, like DB gone bad, then we're about to find out anyway.
return result, err
}
// registerOrchestratorDeployment updates the orchestrator_metadata table upon successful deployment
func registerOrchestratorDeployment(db *sql.DB) error {
query := `
replace into orchestrator_db_deployments (
deployed_version, deployed_timestamp
) values (
?, NOW()
)
`
if _, err := execInternal(db, query, config.RuntimeCLIFlags.ConfiguredVersion); err != nil {
log.Fatalf("Unable to write to orchestrator_metadata: %+v", err)
}
log.Debugf("Migrated database schema to version [%+v]", config.RuntimeCLIFlags.ConfiguredVersion)
return nil
}
// deployStatements will issue given sql queries that are not already known to be deployed.
// This iterates both lists (to-run and already-deployed) and also verifies no contraditions.
func deployStatements(db *sql.DB, queries []string) error {
tx, err := db.Begin()
if err != nil {
log.Fatale(err)
}
// Ugly workaround ahead.
// Origin of this workaround is the existence of some "timestamp NOT NULL," column definitions,
// where in NO_ZERO_IN_DATE,NO_ZERO_DATE sql_mode are invalid (since default is implicitly "0")
// This means installation of orchestrator fails on such configured servers, and in particular on 5.7
// where this setting is the dfault.
// For purpose of backwards compatability, what we do is force sql_mode to be more relaxed, create the schemas
// along with the "invalid" definition, and then go ahead and fix those definitions via following ALTER statements.
// My bad.
originalSqlMode := ""
if config.Config.IsMySQL() {
err = tx.QueryRow(`select @@session.sql_mode`).Scan(&originalSqlMode)
if _, err := tx.Exec(`set @@session.sql_mode=REPLACE(@@session.sql_mode, 'NO_ZERO_DATE', '')`); err != nil {
log.Fatale(err)
}
if _, err := tx.Exec(`set @@session.sql_mode=REPLACE(@@session.sql_mode, 'NO_ZERO_IN_DATE', '')`); err != nil {
log.Fatale(err)
}
}
for i, query := range queries {
if i == 0 {
//log.Debugf("sql_mode is: %+v", originalSqlMode)
}
query, err := translateStatement(query)
if err != nil {
return log.Fatalf("Cannot initiate orchestrator: %+v; query=%+v", err, query)
}
if _, err := tx.Exec(query); err != nil {
if strings.Contains(err.Error(), "syntax error") {
return log.Fatalf("Cannot initiate orchestrator: %+v; query=%+v", err, query)
}
if !sqlutils.IsAlterTable(query) && !sqlutils.IsCreateIndex(query) && !sqlutils.IsDropIndex(query) {
return log.Fatalf("Cannot initiate orchestrator: %+v; query=%+v", err, query)
}
if !strings.Contains(err.Error(), "duplicate column name") &&
!strings.Contains(err.Error(), "Duplicate column name") &&
!strings.Contains(err.Error(), "check that column/key exists") &&
!strings.Contains(err.Error(), "already exists") &&
!strings.Contains(err.Error(), "Duplicate key name") {
log.Errorf("Error initiating orchestrator: %+v; query=%+v", err, query)
}
}
}
if config.Config.IsMySQL() {
if _, err := tx.Exec(`set session sql_mode=?`, originalSqlMode); err != nil {
log.Fatale(err)
}
}
if err := tx.Commit(); err != nil {
log.Fatale(err)
}
return nil
}
// initOrchestratorDB attempts to create/upgrade the orchestrator backend database. It is created once in the
// application's lifetime.
func initOrchestratorDB(db *sql.DB) error {
log.Debug("Initializing orchestrator")
versionAlreadyDeployed, err := versionIsDeployed(db)
if versionAlreadyDeployed && config.RuntimeCLIFlags.ConfiguredVersion != "" && err == nil {
// Already deployed with this version
return nil
}
if config.Config.PanicIfDifferentDatabaseDeploy && config.RuntimeCLIFlags.ConfiguredVersion != "" && !versionAlreadyDeployed {
log.Fatalf("PanicIfDifferentDatabaseDeploy is set. Configured version %s is not the version found in the database", config.RuntimeCLIFlags.ConfiguredVersion)
}
log.Debugf("Migrating database schema")
deployStatements(db, generateSQLBase)
deployStatements(db, generateSQLPatches)
registerOrchestratorDeployment(db)
if IsSQLite() {
ExecOrchestrator(`PRAGMA journal_mode = WAL`)
ExecOrchestrator(`PRAGMA synchronous = NORMAL`)
}
return nil
}
// execInternal
func execInternal(db *sql.DB, query string, args ...interface{}) (sql.Result, error) {
var err error
query, err = translateStatement(query)
if err != nil {
return nil, err
}
res, err := sqlutils.ExecNoPrepare(db, query, args...)
return res, err
}
// ExecOrchestrator will execute given query on the orchestrator backend database.
func ExecOrchestrator(query string, args ...interface{}) (sql.Result, error) {
var err error
query, err = translateStatement(query)
if err != nil {
return nil, err
}
db, err := OpenOrchestrator()
if err != nil {
return nil, err
}
res, err := sqlutils.ExecNoPrepare(db, query, args...)
return res, err
}
// QueryRowsMapOrchestrator
func QueryOrchestratorRowsMap(query string, on_row func(sqlutils.RowMap) error) error {
query, err := translateStatement(query)
if err != nil {
return log.Fatalf("Cannot query orchestrator: %+v; query=%+v", err, query)
}
db, err := OpenOrchestrator()
if err != nil {
return err
}
return sqlutils.QueryRowsMap(db, query, on_row)
}
// QueryOrchestrator
func QueryOrchestrator(query string, argsArray []interface{}, on_row func(sqlutils.RowMap) error) error {
query, err := translateStatement(query)
if err != nil {
return log.Fatalf("Cannot query orchestrator: %+v; query=%+v", err, query)
}
db, err := OpenOrchestrator()
if err != nil {
return err
}
return log.Criticale(sqlutils.QueryRowsMap(db, query, on_row, argsArray...))
}
// QueryOrchestratorRowsMapBuffered
func QueryOrchestratorRowsMapBuffered(query string, on_row func(sqlutils.RowMap) error) error {
query, err := translateStatement(query)
if err != nil {
return log.Fatalf("Cannot query orchestrator: %+v; query=%+v", err, query)
}
db, err := OpenOrchestrator()
if err != nil {
return err
}
return sqlutils.QueryRowsMapBuffered(db, query, on_row)
}
// QueryOrchestratorBuffered
func QueryOrchestratorBuffered(query string, argsArray []interface{}, on_row func(sqlutils.RowMap) error) error {
query, err := translateStatement(query)
if err != nil {
return log.Fatalf("Cannot query orchestrator: %+v; query=%+v", err, query)
}
db, err := OpenOrchestrator()
if err != nil {
return err
}
if argsArray == nil {
argsArray = EmptyArgs
}
return log.Criticale(sqlutils.QueryRowsMapBuffered(db, query, on_row, argsArray...))
}
// ReadTimeNow reads and returns the current timestamp as string. This is an unfortunate workaround
// to support both MySQL and SQLite in all possible timezones. SQLite only speaks UTC where MySQL has
// timezone support. By reading the time as string we get the database's de-facto notion of the time,
// which we can then feed back to it.
func ReadTimeNow() (timeNow string, err error) {
err = QueryOrchestrator(`select now() as time_now`, nil, func(m sqlutils.RowMap) error {
timeNow = m.GetString("time_now")
return nil
})
return timeNow, err
}