-
Notifications
You must be signed in to change notification settings - Fork 6
/
connection_factory.go
132 lines (104 loc) · 2.69 KB
/
connection_factory.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package database
import (
"fmt"
"strings"
"code.cloudfoundry.org/lager"
"database/sql"
"replication-canary/config"
"replication-canary/models"
"errors"
"github.com/go-sql-driver/mysql"
)
// SwitchboardClient reports which backend host a switchboard currently
// treats as active. WriteConn consults a list of these clients to decide
// which cluster connection should be used for writes.
//
//go:generate counterfeiter . SwitchboardClient
type SwitchboardClient interface {
// ActiveBackendHost returns the host of the currently active backend, or
// an error. WriteConn treats an empty host as "no answer" and moves on to
// the next client.
ActiveBackendHost() (string, error)
}
// ConnectionFactory opens one database connection per cluster IP, caches
// them, and selects the connection that matches the active write host
// reported by the switchboard clients.
type ConnectionFactory struct {
// switchboardClients are asked, in order, for the active backend host
// by WriteConn.
switchboardClients []SwitchboardClient
// logger records which host WriteConn selected.
logger lager.Logger
// clusterIPs lists the hosts Conns opens a connection to, one per entry.
clusterIPs []string
// port is combined with each cluster IP to form the "ip:port" address.
port int
// galeraHealthcheckPort is copied from the config by the constructor; it
// is not read by any method visible in this file.
galeraHealthcheckPort int
// canaryDatabase, canaryUsername and canaryPassword are the database name
// and credentials placed into every DSN built by Conns.
canaryDatabase string
canaryUsername string
canaryPassword string
// conns caches the connections from the first successful Conns call.
conns []*models.NamedConnection
// OpenConn opens a database handle for a DSN. The constructor sets it to
// openConnection; it is exported, presumably so tests can substitute a
// fake — confirm against the test suite.
OpenConn func(dsn string) (*sql.DB, error)
}
// NewConnectionFactoryFromConfig builds a ConnectionFactory from the MySQL
// and canary sections of c, together with the given switchboard clients and
// logger. The factory's OpenConn is set to the package default,
// openConnection. No connections are opened here; that happens on the first
// call to Conns.
func NewConnectionFactoryFromConfig(
	c *config.Config,
	switchboardClients []SwitchboardClient,
	logger lager.Logger,
) *ConnectionFactory {
	factory := new(ConnectionFactory)

	// Collaborators supplied by the caller.
	factory.switchboardClients = switchboardClients
	factory.logger = logger

	// Cluster topology.
	factory.clusterIPs = c.MySQL.ClusterIPs
	factory.port = c.MySQL.Port
	factory.galeraHealthcheckPort = c.MySQL.GaleraHealthcheckPort

	// Canary database and credentials.
	factory.canaryDatabase = c.Canary.Database
	factory.canaryUsername = c.Canary.Username
	factory.canaryPassword = c.Canary.Password

	factory.OpenConn = openConnection

	return factory
}
// Conns returns one named connection per configured cluster IP. The
// connections are opened on the first call and cached; later calls return the
// cached slice.
//
// Each connection is named after its "ip:port" address. Every cluster IP is
// attempted even after a failure. If any attempt fails, all connections that
// were opened during this call are closed, nothing is cached, and the first
// error encountered is returned.
func (c *ConnectionFactory) Conns() ([]*models.NamedConnection, error) {
	if len(c.conns) > 0 {
		return c.conns, nil
	}

	var firstErr error
	var conns []*models.NamedConnection

	for _, ip := range c.clusterIPs {
		// NOTE(review): newer go-sql-driver/mysql releases recommend starting
		// from mysql.NewConfig() so driver defaults are applied — confirm
		// against the vendored driver version before changing this.
		cfg := &mysql.Config{
			User:   c.canaryUsername,
			Passwd: c.canaryPassword,
			DBName: c.canaryDatabase,
			Net:    "tcp",
			Addr:   fmt.Sprintf("%s:%d", ip, c.port),
		}

		conn, err := c.OpenConn(cfg.FormatDSN())
		if err != nil {
			if firstErr == nil {
				firstErr = err
			}
			// Do not record a failed connection: its handle is normally nil
			// and calling Close on it during cleanup would panic. If an
			// OpenConn implementation did hand back a handle alongside the
			// error, release it here so it is not leaked.
			if conn != nil {
				conn.Close()
			}
			continue
		}

		conns = append(conns, &models.NamedConnection{
			Name:       cfg.Addr,
			Connection: conn,
		})
	}

	// Close all open connections if any of them errored, so we don't leak
	// connections. Close errors are deliberately ignored: the open failure is
	// the error the caller needs to see.
	if firstErr != nil {
		for _, conn := range conns {
			conn.Connection.Close()
		}
		return nil, firstErr
	}

	c.conns = conns
	return c.conns, nil
}
// WriteConn returns the connection whose name starts with the active backend
// host reported by the switchboard clients.
//
// The clients are consulted in order, stopping at the first one that reports
// a non-empty host. An error is returned if Conns fails, if the last client
// consulted returned an error, or if no connection name begins with
// "<host>:".
func (c *ConnectionFactory) WriteConn() (*models.NamedConnection, error) {
	candidates, err := c.Conns()
	if err != nil {
		return nil, err
	}

	var activeHost string
	for i := 0; i < len(c.switchboardClients); i++ {
		// err is deliberately overwritten on every iteration: only the
		// outcome of the last client consulted decides success or failure.
		activeHost, err = c.switchboardClients[i].ActiveBackendHost()
		if activeHost != "" {
			break
		}
	}
	if err != nil {
		return nil, err
	}

	c.logger.Info("using write connection", lager.Data{"host": activeHost})

	// Connection names have the form "ip:port", so match on "host:" to avoid
	// one host being a textual prefix of another.
	wantPrefix := fmt.Sprintf("%s:", activeHost)
	for _, candidate := range candidates {
		if !strings.HasPrefix(candidate.Name, wantPrefix) {
			continue
		}
		return candidate, nil
	}

	return nil, errors.New("no connection found for active write host")
}
// openConnection is the default OpenConn implementation. It hands dsn to
// sql.Open under the "mysql" driver name and returns the result unchanged.
func openConnection(dsn string) (*sql.DB, error) {
	const driverName = "mysql"

	db, err := sql.Open(driverName, dsn)
	return db, err
}