/
toserver.go
154 lines (143 loc) · 4.66 KB
/
toserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package server
import (
"fmt"
"github.com/brokercap/Bifrost/server/filequeue"
"sync"
"log"
pluginStorage "github.com/brokercap/Bifrost/plugin/storage"
"encoding/json"
)
// ToServer is one sync target attached to a table: which plugin it uses, the
// binlog position it has reached, its pending-error state and its file-queue
// bookkeeping. The embedded RWMutex is the lock taken by the methods defined
// on *ToServer before they read or mutate its fields.
type ToServer struct {
	sync.RWMutex
	// ToServerID is assigned by AddTableToServer from the table's
	// LastToServerID counter when the caller passes a non-positive value.
	ToServerID int
	// PluginName is filled in by AddTableToServer from the plugin storage
	// entry for ToServerKey when left empty.
	PluginName string
	MustBeSuccess bool
	FilterQuery bool
	FilterUpdate bool
	FieldList []string
	ToServerKey string
	// BinlogFileNum and BinlogPosition are set together by
	// UpdateBinlogPosition; AddTableToServer seeds them from the db's binlog
	// position when BinlogFileNum is 0.
	BinlogFileNum int
	BinlogPosition uint32
	PluginParam map[string]interface{}
	// Status is compared against "running" and "deling" by DelTableToServer.
	Status string
	ToServerChan *ToServerChan `json:"-"`
	// Error, ErrorWaitDeal and ErrorWaitData are written by AddWaitError,
	// DealWaitError and DelWaitError.
	Error string
	ErrorWaitDeal int
	ErrorWaitData interface{}
	LastBinlogFileNum int // last position submitted by the channel to ToServerChan
	LastBinlogPosition uint32 // if BinlogFileNum == LastBinlogFileNum && BinlogPosition == LastBinlogPosition, this position is known to be fine
	LastBinlogKey []byte `json:"-"` // key under which the data is saved to "level" (presumably LevelDB; confirm)
	QueueMsgCount uint32 // number of messages backlogged in the queue
	fileQueueObj *filequeue.Queue
	FileQueueStatus bool // whether the file queue has been started
	Notes string
	ThreadCount int16 // number of consumer threads
	FileQueueUsableCount uint32 // with the file queue configured: how many times the queue was full after a write to ToServerChan within the FileQueueUsableCountTimeDiff window
	FileQueueUsableCountStartTime int64 // time at which counting of FileQueueUsableCount started
	CosumerPluginParamMap map[uint16]interface{} `json:"-"` // used to tell multiple consumers apart
	CosumerIdInrc uint16 // auto-incrementing consumer id
}
// AddTableToServer attaches toserver to the table identified by schemaName and
// tableName.
//
// It returns (false, 0) when toserver is nil or when the table is not present
// in db.tableMap; otherwise it returns true and the ToServerID the target was
// registered under.
//
// While holding the db lock it fills in defaults on toserver:
//   - ToServerID: the table's next LastToServerID when the given ID is <= 0;
//   - PluginName: taken from pluginStorage.GetToServerInfo(ToServerKey) when empty;
//   - BinlogFileNum/BinlogPosition and their Last* twins: taken from
//     getBinlogPosition for this db when BinlogFileNum is 0 (a lookup failure
//     is logged and the fields are left untouched).
//
// QueueMsgCount is always reset to 0 before the target is appended to the
// table's ToServerList.
func (db *db) AddTableToServer(schemaName string, tableName string, toserver *ToServer) (bool, int) {
	if toserver == nil {
		return false, 0
	}
	key := GetSchemaAndTableJoin(schemaName, tableName)

	// Look the table up under the lock: reading db.tableMap unlocked can race
	// with a concurrent writer of the map.
	db.Lock()
	table, ok := db.tableMap[key]
	if !ok {
		db.Unlock()
		return false, 0
	}

	if toserver.ToServerID <= 0 {
		table.LastToServerID++
		toserver.ToServerID = table.LastToServerID
	}
	if toserver.PluginName == "" {
		if info := pluginStorage.GetToServerInfo(toserver.ToServerKey); info != nil {
			toserver.PluginName = info.PluginName
		}
	}
	if toserver.BinlogFileNum == 0 {
		pos, err := getBinlogPosition(getDBBinlogkey(db))
		if err == nil {
			toserver.BinlogFileNum = pos.BinlogFileNum
			toserver.LastBinlogFileNum = pos.BinlogFileNum
			toserver.BinlogPosition = pos.BinlogPosition
			toserver.LastBinlogPosition = pos.BinlogPosition
		} else {
			log.Println("AddTableToServer GetDBBinlogPostion:", err)
		}
	}
	toserver.QueueMsgCount = 0
	table.ToServerList = append(table.ToServerList, toserver)
	db.Unlock()

	log.Println("AddTableToServer", db.Name, schemaName, tableName, toserver)
	return true, toserver.ToServerID
}
// DelTableToServer detaches the target with the given ToServerID from the
// table identified by schemaName and tableName.
//
// It returns false when the table is not present in db.tableMap. It returns
// true both when the target was removed and when no target with that ID was
// attached; in the latter case nothing else is touched.
//
// When a target is removed:
//   - a target whose Status is "running" is switched to "deling" and its
//     stored binlog position is left in place;
//   - a target already in "deling" is left as is;
//   - for any other Status the stored binlog position is deleted right away;
//   - the file queue path for the target is deleted after the lock is released.
func (db *db) DelTableToServer(schemaName string, tableName string, ToServerID int) bool {
	key := GetSchemaAndTableJoin(schemaName, tableName)

	// Look the table up under the lock: reading db.tableMap unlocked can race
	// with a concurrent writer of the map.
	db.Lock()
	table, ok := db.tableMap[key]
	if !ok {
		db.Unlock()
		return false
	}

	index := -1
	for i, candidate := range table.ToServerList {
		if candidate.ToServerID == ToServerID {
			index = i
			break
		}
	}
	if index == -1 {
		db.Unlock()
		return true
	}

	toServerInfo := table.ToServerList[index]
	toServerPositionBinlogKey := getToServerBinlogkey(db, toServerInfo)
	// NOTE(review): Status is read and written here under the db lock only;
	// the target's own lock is not taken (the original had it commented out).
	table.ToServerList = append(table.ToServerList[:index], table.ToServerList[index+1:]...)
	switch toServerInfo.Status {
	case "running":
		toServerInfo.Status = "deling"
	case "deling":
		// Deletion already in progress; keep the stored position.
	default:
		delBinlogPosition(toServerPositionBinlogKey)
	}
	log.Println("DelTableToServer", db.Name, schemaName, tableName, "toServerInfo:", toServerInfo)
	db.Unlock()

	// Remove the file queue path that belongs to this target as well.
	filequeue.Delete(GetFileQueue(db.Name, schemaName, tableName, fmt.Sprint(ToServerID)))
	return true
}
// UpdateBinlogPosition stores BinlogFileNum and BinlogPosition on the target
// as one update under the target's lock. It always returns true.
func (This *ToServer) UpdateBinlogPosition(BinlogFileNum int, BinlogPosition uint32) bool {
	This.Lock()
	defer This.Unlock()

	This.BinlogFileNum, This.BinlogPosition = BinlogFileNum, BinlogPosition
	return true
}
// AddWaitError records WaitErr's message in Error and the JSON encoding of
// WaitData, as a string, in ErrorWaitData.
//
// It returns false and leaves the target untouched when WaitErr is nil,
// otherwise true. If WaitData cannot be JSON-encoded the failure is logged
// and ErrorWaitData is set to the empty string.
func (This *ToServer) AddWaitError(WaitErr error, WaitData interface{}) bool {
	if WaitErr == nil {
		return false
	}

	// Encode before taking the lock so the critical section stays short.
	b, err := json.Marshal(WaitData)
	if err != nil {
		log.Println("AddWaitError json.Marshal WaitData:", err)
	}

	This.Lock()
	// Deferred so the lock is released even if WaitErr.Error() panics.
	defer This.Unlock()
	This.Error = WaitErr.Error()
	This.ErrorWaitData = string(b)
	return true
}
// DealWaitError sets ErrorWaitDeal to 1 under the target's lock. It always
// returns true.
func (This *ToServer) DealWaitError() bool {
	This.Lock()
	defer This.Unlock()

	This.ErrorWaitDeal = 1
	return true
}
// GetWaitErrorDeal returns the current ErrorWaitDeal value, read under the
// target's lock.
func (This *ToServer) GetWaitErrorDeal() int {
	This.Lock()
	defer This.Unlock()

	return This.ErrorWaitDeal
}
// DelWaitError clears the recorded error state under the target's lock:
// Error becomes "", ErrorWaitData becomes nil and ErrorWaitDeal becomes 0.
// It always returns true.
func (This *ToServer) DelWaitError() bool {
	This.Lock()
	defer This.Unlock()

	This.ErrorWaitDeal = 0
	This.ErrorWaitData = nil
	This.Error = ""
	return true
}