/
utils.go
152 lines (135 loc) · 3.29 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
package pillx
import (
"os"
"strconv"
log "github.com/Sirupsen/logrus"
"github.com/robfig/cron"
"stathat.com/c/consistent"
)
var globalConsistent *consistent.Consistent = consistent.New()
var logFormat string
func GetPool(workerPools map[string]Pool) (wp Pool, key string) {
//随机取出一个workerpool
//实现一个稳定的hash算法
for key, wp := range workerPools {
return wp, key
}
return
}
func SetLogFormat(format string) {
logFormat = format
}
func responseSend(clientId uint64, resMap map[string]*Response, msg interface{}) (n int, err error) {
//TODO 发送中发现连接不可用,则剔除
_, res := GetResponse(clientId, resMap)
n, err = res.Send(msg)
return
/**
//循环次数
for i := 0; i < 5; i++ {
MyLog().Info("teee")
_, res := GetResponse(resMap)
MyLog().Info(resMap)
n, err = res.Send(msg)
if err != nil {
MyLog().Error(err)
continue
}
return n, err
}
return 0, err
*/
}
func GetResponse(clientId uint64, resMap map[string]*Response) (ip string, res *Response) {
//取得一个稳定的节点
clientIdStr := strconv.Itoa(int(clientId))
serverName, _ := globalConsistent.Get(clientIdStr)
return serverName, resMap[serverName]
/**
//实现一个稳定的LoadBalance算法
//权重随机
for ip, res := range resMap {
return ip, res
}
return
*/
}
func SendAllGateWay(resMap map[string]*Response, msg interface{}) {
for _, response := range resMap {
response.Send(msg)
}
return
}
func SendAllGateWayPool(gatewayPools map[string]Pool, msg interface{}) {
for _, gp := range gatewayPools {
gateway, err := gp.Get()
if err != nil {
log.WithError(err).Error("gateway池返回错误")
}
gateway.response.Send(msg)
//回收
gateway.Close()
}
return
}
func NewGatewayClient(addr string) *Server {
client := &Server{
Addr: addr,
Handler: NewServeRouter(),
Protocol: &GateWayProtocol{},
}
return client
}
var logInitFlg bool = false
func MyLog() *log.Entry {
if logInitFlg == false {
// Log as JSON instead of the default ASCII formatter.
if logFormat != "" {
log.SetFormatter(&log.JSONFormatter{})
// Only log the warning severity or above.
log.SetLevel(log.DebugLevel)
// Output to stderr instead of stdout, could also be a file.
file := getFile(logFormat)
log.SetOutput(file)
} else {
// The TextFormatter is default, you don't actually have to do this.
log.SetFormatter(&log.TextFormatter{})
// Only log the warning severity or above.
log.SetLevel(log.DebugLevel)
// Output to stderr instead of stdout, could also be a file.
//log.SetOutput(os.Stderr)
}
//每天重置下
if logFormat != "" {
c := cron.New()
c.AddFunc("1 0 0 * * *", func() {
file := getFile(logFormat)
log.SetOutput(file)
})
c.Start()
}
logInitFlg = true
}
return log.WithFields(log.Fields{
"prama": "mylog",
})
}
func getFile(filename string) *os.File {
var f *os.File
if checkFileIsExist(filename) { //如果文件存在
f, _ = os.OpenFile(filename, os.O_RDWR|os.O_APPEND, 0777) //打开文件
} else {
f, _ = os.Create(filename) //创建文件
}
return f
}
/**
* 判断文件是否存在 存在返回 true 不存在返回false
*/
func checkFileIsExist(filename string) bool {
var exist = true
if _, err := os.Stat(filename); os.IsNotExist(err) {
exist = false
}
return exist
}