-
Notifications
You must be signed in to change notification settings - Fork 335
/
connect.go
142 lines (117 loc) · 3.03 KB
/
connect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package rdbms
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/cortezaproject/corteza/server/pkg/sentry"
"github.com/jmoiron/sqlx"
"go.uber.org/zap"
)
const (
// TxRetryHardLimit is the absolute maximum retries we'll allow
TxRetryHardLimit = 100
DefaultSliceCapacity = 1000
MinEnsureFetchLimit = 10
MaxRefetches = 100
MaxLimit = 1000
)
// Connect is rdbms' package connector
//
// Function is called from (store) driver's required connection function
// to open connection and bind it with
func Connect(ctx context.Context, log *zap.Logger, cfg *ConnConfig) (db *sqlx.DB, err error) {
var (
connErrCh = make(chan error, 1)
patience = time.Now().Add(cfg.ConnTryPatience)
base *sql.DB
)
log = log.Named("store")
if base, err = sql.Open(cfg.DriverName, cfg.DataSourceName); err != nil {
return
}
db = sqlx.NewDb(base, cfg.DriverName)
log.Debug(
"setting database connection parameters",
zap.Int("MaxOpenConns", cfg.MaxOpenConns),
zap.Duration("MaxLifetime", cfg.ConnMaxLifetime),
zap.Int("MaxIdleConns", cfg.MaxIdleConns),
// log DSN with masked username and password
zap.String("DSN", cfg.MaskedDSN),
)
db.SetMaxOpenConns(cfg.MaxOpenConns)
db.SetConnMaxLifetime(cfg.ConnMaxLifetime)
db.SetMaxIdleConns(cfg.MaxIdleConns)
go func() {
defer sentry.Recover()
var (
err error
try = 0
// Make a small adjustment when
// collecting callers from the callstack for this
log = log.WithOptions(zap.AddCallerSkip(-2))
)
for {
try++
if cfg.ConnTryMax <= try {
connErrCh <- fmt.Errorf("could not connect in %d tries", try)
return
}
if err = db.PingContext(ctx); err != nil {
if time.Now().After(patience) {
// don't make too much fuss
// if we're in patience mode
log.Warn(
"could not connect to the database",
zap.Error(err),
zap.Int("try", try),
zap.Float64("delay", cfg.ConnTryBackoffDelay.Seconds()),
)
}
select {
case <-ctx.Done():
// Forced break
break
case <-time.After(cfg.ConnTryBackoffDelay):
// Wait before next try
continue
}
}
log.Debug("connected to the database")
break
}
connErrCh <- err
}()
to := cfg.ConnTryTimeout * time.Duration(cfg.ConnTryMax*2)
select {
case err = <-connErrCh:
return
case <-time.After(to):
// Wait before next try
return nil, fmt.Errorf("timedout after %.2fs", to.Seconds())
case <-ctx.Done():
return nil, fmt.Errorf("connection cancelled")
}
}
// Connect is called from the adapter's Connect function
//
// It is intentionally not compatible with store.ConnectorFn
// and can not be used to register
//func Connect(ctx context.Context, cfg *ConnConfig) (s *Store, err error) {
// return s, func() error {
// if err = cfg.ParseExtra(); err != nil {
// return err
// }
//
// cfg.SetDefaults()
// s = &Store{
// config: cfg,
// //schema: ddl.SchemaAPI(s.DB(), ddl.NewCommonDialect()),
// }
//
// return s.Connect(ctx)
// }()
//}
func dbHealthcheck(db *sqlx.DB) func(ctx context.Context) error {
return db.PingContext
}