-
Notifications
You must be signed in to change notification settings - Fork 245
/
connection.go
124 lines (101 loc) · 3.21 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package mysql
import (
"context"
"database/sql/driver"
"fmt"
"strconv"
"time"
log "github.com/authzed/spicedb/internal/logging"
"github.com/prometheus/client_golang/prometheus"
)
var (
// connectHistogram records, in seconds, how long each attempt to open a
// MySQL connection took. instrumentedConnector.Connect observes it on every
// call, for failed attempts as well as successful ones.
connectHistogram = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "spicedb",
Subsystem: "datastore",
Name: "mysql_connect_duration",
Help: "distribution in seconds of time spent opening a new MySQL connection.",
Buckets: []float64{0.01, 0.1, 0.5, 1, 5, 10, 25, 60, 120},
})
// connectCount counts connection attempts made through
// instrumentedConnector.Connect. The "success" label is "true" when the
// attempt returned no error and "false" otherwise, so failed attempts are
// counted too.
connectCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "datastore",
Name: "mysql_connect_count_total",
Help: "number of mysql connections opened.",
}, []string{"success"})
)
// instrumentedConnector wraps the default MySQL driver connector
// to get metrics and tracing when creating a new connection.
type instrumentedConnector struct {
// conn is the wrapped connector that actually opens connections.
conn driver.Connector
// drv is the value returned by Driver; instrumentConnector fills it with
// the wrapped connector's driver.
drv driver.Driver
}
// Connect opens a connection via the wrapped connector inside an
// "openMySQLConnection" span. Every attempt is timed into connectHistogram
// and counted in connectCount under a "success" label of "true" or "false".
// On failure the error is recorded on the span, logged, and returned wrapped.
func (d *instrumentedConnector) Connect(ctx context.Context) (driver.Conn, error) {
	ctx, span := tracer.Start(ctx, "openMySQLConnection")
	defer span.End()

	// Deferred so the duration is observed on both the success and failure paths.
	begin := time.Now()
	defer func() { connectHistogram.Observe(time.Since(begin).Seconds()) }()

	opened, connErr := d.conn.Connect(ctx)
	succeeded := connErr == nil
	connectCount.WithLabelValues(strconv.FormatBool(succeeded)).Inc()
	if succeeded {
		return opened, nil
	}

	span.RecordError(connErr)
	log.Ctx(ctx).Error().Err(connErr).Msg("failed to open mysql connection")
	return nil, fmt.Errorf("failed to open connection to mysql: %w", connErr)
}
// Driver returns the driver stored on the connector when it was built.
func (d *instrumentedConnector) Driver() driver.Driver {
return d.drv
}
// instrumentConnector registers the connection metrics with the default
// Prometheus registry and returns c wrapped in an instrumentedConnector.
//
// Registration is all-or-nothing: if a collector fails to register, any
// collector registered earlier in this call is unregistered again before the
// error is returned, so a failed call does not leave the registry
// half-populated.
func instrumentConnector(c driver.Connector) (driver.Connector, error) {
	collectors := []prometheus.Collector{connectHistogram, connectCount}
	for i, collector := range collectors {
		if err := prometheus.Register(collector); err != nil {
			// Roll back only what this call registered itself.
			for _, registered := range collectors[:i] {
				prometheus.Unregister(registered)
			}
			return nil, fmt.Errorf("unable to register metric: %w", err)
		}
	}

	return &instrumentedConnector{
		conn: c,
		drv:  c.Driver(),
	}, nil
}
// sessionVariableConnector wraps a driver connector and executes a fixed list
// of statements on every connection it opens before returning it.
type sessionVariableConnector struct {
// conn is the wrapped connector that actually opens connections.
conn driver.Connector
// drv is the value returned by Driver; addSessionVariables fills it with
// the wrapped connector's driver.
drv driver.Driver
// statements are executed, in slice order, on each new connection.
statements []string
}
// Connect opens a connection via the wrapped connector inside a
// "setSessionVariables" span and executes every configured statement on it
// before returning it.
//
// An error is returned when the wrapped connector fails, when the connection
// does not implement driver.ExecerContext, or when any statement fails. In the
// latter two cases the newly opened connection is closed first, because the
// caller only receives the error and would otherwise leak it.
func (s *sessionVariableConnector) Connect(ctx context.Context) (driver.Conn, error) {
	ctx, span := tracer.Start(ctx, "setSessionVariables")
	defer span.End()

	conn, err := s.conn.Connect(ctx)
	if err != nil {
		span.RecordError(err)
		log.Ctx(ctx).Error().Err(err).Msg("failed to open db connection")
		return nil, fmt.Errorf("failed to open connection to db: %w", err)
	}

	// The go mysql driver implements the ExecerContext interface; verify it
	// rather than panicking if a different connector is ever wrapped.
	execConn, ok := conn.(driver.ExecerContext)
	if !ok {
		err := fmt.Errorf("connection of type %T does not implement driver.ExecerContext", conn)
		span.RecordError(err)
		_ = conn.Close() // best effort; the error above is the one worth reporting
		return nil, err
	}

	for _, stmt := range s.statements {
		if _, err := execConn.ExecContext(ctx, stmt, nil); err != nil {
			span.RecordError(err)
			_ = conn.Close() // best effort; the statement error is the one worth reporting
			return nil, fmt.Errorf("unable to execute statement `%s`: %w", stmt, err)
		}
	}

	return conn, nil
}
// Driver returns the driver stored on the connector when it was built.
func (s *sessionVariableConnector) Driver() driver.Driver {
return s.drv
}
// addSessionVariables wraps c so that each new connection first runs one
// "SET SESSION <name>=<value>" statement per entry in variables.
//
// Names and values are concatenated into the statement verbatim, with no
// quoting or escaping. Statement order follows map iteration order and is
// therefore unspecified. The returned error is always nil.
func addSessionVariables(c driver.Connector, variables map[string]string) (driver.Connector, error) {
	stmts := make([]string, len(variables))
	next := 0
	for name, val := range variables {
		stmts[next] = "SET SESSION " + name + "=" + val
		next++
	}

	wrapped := &sessionVariableConnector{
		conn:       c,
		drv:        c.Driver(),
		statements: stmts,
	}
	return wrapped, nil
}