-
Notifications
You must be signed in to change notification settings - Fork 878
/
connector.go
124 lines (89 loc) · 2.08 KB
/
connector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package mysql
import (
"database/sql"
"github.com/davyxu/cellnet"
"github.com/davyxu/cellnet/peer"
"github.com/go-sql-driver/mysql"
"sync"
"time"
)
// mysqlConnector is a cellnet peer that owns a database/sql connection pool
// for a MySQL server.
//
// NOTE(review): Address() and PoolConnCount, used by the methods of this type,
// are assumed to be promoted from the embedded peer.* types — confirm against
// the peer package.
type mysqlConnector struct {
	peer.CorePeerProperty
	peer.CoreContextSet
	peer.CoreSQLParameter

	// dbGuard protects db.
	dbGuard sync.RWMutex
	// db is the currently published pool; nil until a connection attempt succeeds.
	db *sql.DB

	// reconDur is the pause between connection attempts made by Start.
	// A zero value makes Start try only once.
	reconDur time.Duration
}
// IsReady reports whether a database handle is currently stored on the
// connector.
func (self *mysqlConnector) IsReady() bool {
	if db := self.dbConn(); db == nil {
		return false
	}
	return true
}
// Raw returns the current *sql.DB wrapped in an empty interface.
//
// When no handle is stored the result is an interface holding a nil *sql.DB,
// so comparing the returned value with nil yields false; use IsReady to test
// for connectivity instead.
func (self *mysqlConnector) Raw() interface{} {
	var handle interface{} = self.dbConn()
	return handle
}
// Operate invokes callback with the current *sql.DB (passed as an empty
// interface, possibly holding a nil *sql.DB) and returns whatever the callback
// returns.
func (self *mysqlConnector) Operate(callback func(client interface{}) interface{}) interface{} {
	var client interface{} = self.dbConn()
	result := callback(client)
	return result
}
// dbConn returns the stored database handle, reading it under the read lock.
func (self *mysqlConnector) dbConn() *sql.DB {
	self.dbGuard.RLock()
	current := self.db
	self.dbGuard.RUnlock()

	return current
}
// TypeName returns the identifier of this peer type.
func (self *mysqlConnector) TypeName() string {
	const typeName = "mysql.Connector"
	return typeName
}
// Start attempts to connect and, while the reconnect duration is non-zero and
// no handle has been stored yet, sleeps for that duration and tries again.
// It blocks the caller until the loop ends and then returns the connector
// itself.
func (self *mysqlConnector) Start() cellnet.Peer {
	self.tryConnect()

	// Retry only when a reconnect interval is configured and the last attempt
	// did not publish a handle.
	for self.reconDur != 0 && !self.IsReady() {
		time.Sleep(self.reconDur)
		self.tryConnect()
	}

	return self
}
// ReconnectDuration returns the pause Start uses between connection attempts.
func (self *mysqlConnector) ReconnectDuration() time.Duration {
	interval := self.reconDur
	return interval
}
// SetReconnectDuration sets the pause Start uses between connection attempts.
// A zero duration makes Start perform a single attempt only.
func (self *mysqlConnector) SetReconnectDuration(v time.Duration) {
	self.reconDur = v
}
// tryConnect makes one attempt to open and verify a connection pool for the
// DSN returned by Address.
//
// On success the pool limits are applied and the new *sql.DB is published
// under dbGuard. On any failure the error is logged, the stored handle is left
// untouched, and any pool created during the attempt is closed.
func (self *mysqlConnector) tryConnect() {
	// Read the DSN once so parsing and opening use the same value.
	dsn := self.Address()

	config, err := mysql.ParseDSN(dsn)
	if err != nil {
		log.Errorf("Invalid mysql DSN: %s, %s\n", dsn, err.Error())
		return
	}

	log.Infof("Connecting to mysql %s/%s...", config.Addr, config.DBName)

	db, err := sql.Open("mysql", dsn)
	if err != nil {
		log.Errorf("Open mysql database error: %s\n", err)
		return
	}

	// sql.Open does not necessarily contact the server; Ping verifies that a
	// connection can actually be established.
	if err = db.Ping(); err != nil {
		log.Errorln(err)

		// The pool was already allocated by sql.Open. Close it so repeated
		// retries from Start do not accumulate abandoned pools.
		if closeErr := db.Close(); closeErr != nil {
			log.Errorln(closeErr)
		}
		return
	}

	db.SetMaxOpenConns(int(self.PoolConnCount))
	db.SetMaxIdleConns(int(self.PoolConnCount / 2))

	self.dbGuard.Lock()
	self.db = db
	self.dbGuard.Unlock()

	log.SetColor("green").Infof("Connected to mysql %s/%s", config.Addr, config.DBName)
}
// Stop detaches the stored database handle and closes it.
//
// The handle is cleared under the write lock before closing, so that once Stop
// returns IsReady reports false and Raw/Operate no longer hand out a closed
// pool. Calling Stop when no handle is stored is a no-op.
func (self *mysqlConnector) Stop() {
	self.dbGuard.Lock()
	db := self.db
	self.db = nil
	self.dbGuard.Unlock()

	if db == nil {
		return
	}

	// Stop has no error return; the close error is deliberately discarded,
	// matching the previous best-effort behaviour.
	_ = db.Close()
}
func init() {
peer.RegisterPeerCreator(func() cellnet.Peer {
self := &mysqlConnector{}
self.CoreSQLParameter.Init()
return self
})
}