/
app.go
443 lines (391 loc) · 12.3 KB
/
app.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
// app 表示在每个machine上运行的所有module的集合,也就是说每个machine运行一个app
package app
import (
"github.com/dming/lodos/conf"
"fmt"
"github.com/dming/lodos/rpc"
"github.com/dming/lodos/module"
"os/exec"
"os"
"path/filepath"
"flag"
"github.com/dming/lodos/log"
"os/signal"
"strings"
"math"
"hash/crc32"
"github.com/dming/lodos/module/base"
"github.com/opentracing/opentracing-go"
"github.com/dming/lodos/gate"
"github.com/dming/lodos/rpc/base"
"encoding/json"
"github.com/dming/lodos/rpc/pb"
)
type resultsInfo struct {
ErrStr string
Results []interface{}
}
type protocolMarshalImp struct {
data []byte
}
func (this *protocolMarshalImp) GetData() []byte {
return this.data
}
func NewApp(version string) module.AppInterface {
a := new(app)
a.routes = map[string]func(app module.AppInterface, mType string, hash string) (module.ModuleSession, error){}
a.serverList = map[string]module.ModuleSession{}
a.defaultRoute = func(app module.AppInterface, mType string, hash string) (module.ModuleSession, error) {
//默认使用第一个Server
servers := a.GetServersByType(mType)
if len(servers) == 0 {
return nil, fmt.Errorf("has no servers of %s", mType)
}
index := int(math.Abs(float64(crc32.ChecksumIEEE([]byte(hash))))) % len(servers)
return servers[index], nil
}
a.rpcSerializes=map[string]module.RPCSerialize{}
a.version = version
return a
}
type app struct {
//module.AppInterface
name string
version string
processId string
settings conf.Config
manager module.ModuleManager
serverList map[string]module.ModuleSession
routes map[string]func(app module.AppInterface, mType string, hash string) (module.ModuleSession, error)
defaultRoute func(app module.AppInterface, mType string, hash string) (module.ModuleSession, error)
rpcSerializes map[string]module.RPCSerialize
//
getTracer func() opentracing.Tracer
configurationLoaded func(app module.AppInterface)
startUp func(app module.AppInterface)
moduleInited func(app module.AppInterface, module module.Module)
judgeGuest func(session gate.Session) bool
protocolMarshal func(results []interface{}, errStr string) (module.ProtocolMarshal, error)
}
func (a *app) OnInit(settings conf.Config) error {
a.serverList = make(map[string]module.ModuleSession)
for Type, ModuleInfos := range settings.Modules {
for _, mInfo := range ModuleInfos {
if m, ok := a.serverList[mInfo.Id]; ok {
//如果Id已经存在,说明有两个相同Id的模块,这种情况不能被允许,这里就直接抛异常 强制崩溃以免以后调试找不到问题
panic(fmt.Sprintf("ServerId (%s) Type (%s) of the modules already exist Can not be reused ServerId (%s) Type (%s)",
m.GetId(), m.GetType(), mInfo.Id, Type))
}
client, err := baserpc.NewRPCClient(a, mInfo.Id)
if err != nil {
continue
}
if a.GetProcessID() != mInfo.ProcessID {
//同一个ProcessID下的模块直接通过local channel通信就可以了
if mInfo.Rabbitmq != nil {
//如果远程的rpc存在则创建一个对应的客户端
client.NewRabbitmqRpcClient(mInfo.Rabbitmq)
}
if mInfo.Redis != nil && mInfo.Redis.RPCUri != "" {
//如果远程的rpc存在则创建一个对应的客户端
client.NewRedisRpcClient(mInfo.Redis)
}
}
session := basemodule.NewModuleSession(a, mInfo.Id, Type, client)
a.serverList[mInfo.Id] = session
log.Info("rpcClient create success type(%s) id(%s)", Type, mInfo.Id)
}
}
return nil
}
func (a *app) Run(debug bool, mods ...module.Module) error {
wdPath := flag.String("wd", "", "Server work directory")
confPath := flag.String("conf", "", "Server configuration file path")
ProcessID := flag.String("pid", "development", "Server ProcessID?")
Logdir := flag.String("log", "", "Log file directory?")
flag.Parse() //解析输入的参数
a.processId = *ProcessID
ApplicationDir := ""
if *wdPath != "" {
_, err := os.Open(*wdPath)
if err != nil {
panic(err)
}
os.Chdir(*wdPath)
ApplicationDir, err = os.Getwd()
} else {
var err error
ApplicationDir, err = os.Getwd()
if err != nil {
file, _ := exec.LookPath(os.Args[0])
ApplicationPath, _ := filepath.Abs(file)
ApplicationDir, _ = filepath.Split(ApplicationPath)
}
}// get ApplicationDir
defaultConfPath := fmt.Sprintf("%s/conf/server.conf", ApplicationDir)
defaultLogPath := fmt.Sprintf("%s/logs", ApplicationDir)
if *confPath == "" {
*confPath = defaultConfPath
}
if *Logdir == "" {
*Logdir = defaultLogPath
}
f, err := os.Open(*confPath)
if err != nil {
panic(err)
}
_, err = os.Open(*Logdir)
if err != nil {
//文件不存在
err := os.Mkdir(*Logdir, os.ModePerm) //
if err != nil {
fmt.Println(err)
}
}
fmt.Println("Server configuration file path :", *confPath)
conf.LoadConfig(f.Name()) //加载配置文件
a.ConfigSettings(conf.Conf) //配置信息
log.InitBeego(debug, *ProcessID, *Logdir, conf.Conf.Log)
log.Info("Lodos %v starting up", a.version)
if a.configurationLoaded != nil {
a.configurationLoaded(a)
}
manager := NewModuleManager()
//manager.RegisterRunMod(modules.TimerModule()) //注册时间轮模块 每一个进程都默认运行
// module
for i := 0; i < len(mods); i++ {
mods[i].OnAppConfigurationLoaded(a)
manager.Register(mods[i])
}
a.OnInit(a.settings)
manager.Init(a, *ProcessID)
if a.startUp != nil {
a.startUp(a)
}
if a.judgeGuest == nil {
log.Warning("App.judgeGuest is still nil, it would cause all user just be guest ")
}
log.Info("Server started : %s", *confPath)
// close
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, os.Kill)
sig := <-c
manager.Destroy()
a.OnDestroy()
log.Info("Lodos closing down (signal: %v)", sig)
return nil
}
func (a *app) OnDestroy() error {
for id, session := range a.serverList {
err := session.GetClient().Done()
if err != nil {
log.Warning("rpcClient close fail type(%s) id(%s)", session.GetType(), id)
} else {
log.Info("rpcClient close success type(%s) id(%s)", session.GetType(), id)
}
}
return nil
}
func (a *app) ConfigSettings(settings conf.Config) {
a.settings = settings
}
func (a *app) GetSettings() conf.Config {
return a.settings
}
func (a *app) GetProcessID() string {
return a.processId
}
func (a *app) RegisterLocalClient(serverId string, server rpc.RPCServer) error {
if session, ok := a.serverList[serverId]; ok {
err := session.GetClient().NewLocalRpcClient(server)
if err != nil {
return err
}
return nil
} else {
return fmt.Errorf("Server(%s) Not Found", serverId)
}
}
// Route(moduleType string, fn func(app AppInterface, Type string, hash string) ModuleSession) error
func (a *app) SetRoute (moduleType string, fn func(app module.AppInterface, moduleType string, hash string) (module.ModuleSession, error)) {
a.routes[moduleType] = fn
}
func (a *app) GetRoute (moduleType string) (func(app module.AppInterface, moduleType string, hash string) (module.ModuleSession, error)) {
if _, ok := a.routes[moduleType]; ok {
return a.routes[moduleType]
} else {
return a.defaultRoute
}
}
func (a *app) GetServerById(moduleId string) (module.ModuleSession, error) {
if ms, ok := a.serverList[moduleId]; ok {
return ms, nil
} else {
return nil, fmt.Errorf("%s not found.", moduleId)
}
}
/**
filter 调用者服务类型 moduleType|moduleType@moduleID
Type 想要调用的服务类型
*/
func (a *app) GetServersByType(moduleType string) []module.ModuleSession {
sessions := make([]module.ModuleSession, 0)
for _, session := range a.serverList {
if session.GetType() == moduleType {
sessions = append(sessions, session)
}
}
return sessions
}
//GetModuleSession (filter string, hash string) (ModuleSession, error)
func (a *app) GetRouteServer(filter string, hash string) (module.ModuleSession, error) {
sl := strings.Split(filter, "@")
if len(sl) == 2 {
moduleID := sl[1]
if moduleID != "" {
return a.GetServerById(moduleID)
}
}
moduleType := sl[0]
route := a.GetRoute(moduleType) //route is a function
s, _ := route(a, moduleType, hash) // s is module.ServerSession
if s == nil {
return nil, fmt.Errorf("Server(type : %s) Not Found", moduleType)
}
return s, nil
} //获取经过筛选过的服务
func (a *app) AddRPCSerialize(name string, rs module.RPCSerialize) error {
if _, ok := a.rpcSerializes[name]; ok{
return fmt.Errorf("The name(%s) has been occupied",name)
}
a.rpcSerializes[name] = rs
return nil
}
func (a *app) GetRPCSerialize() (map[string]module.RPCSerialize) {
return a.rpcSerializes
}
func (a *app) DefaultTracer(fn func() opentracing.Tracer) {
a.getTracer = fn
}
func (a *app) GetTracer() opentracing.Tracer {
if a.getTracer != nil {
return a.getTracer()
}
return nil
}
func (a *app) SetJudgeGuest(judgeGuest func(session gate.Session) bool) {
a.judgeGuest = judgeGuest
}
func (a *app) GetJudgeGuest() func(session gate.Session) bool {
return a.judgeGuest
}
func (a *app) SetProtocolMarshal(protocolMarshal func(Results []interface{}, errStr string) (module.ProtocolMarshal, error)) {
a.protocolMarshal = protocolMarshal
}
func (a *app) NewProtocolMarshal(data []byte) module.ProtocolMarshal {
return &protocolMarshalImp{
data: data,
}
}
func (a *app) ProtocolMarshal(results []interface{}, errStr string) (module.ProtocolMarshal, error) {
if a.protocolMarshal != nil {
return a.protocolMarshal(results, errStr)
}
r := &resultsInfo{
ErrStr: errStr,
Results: results,
}
body, err := json.Marshal(r)
if err != nil {
return nil, err
} else {
return a.NewProtocolMarshal(body), nil
}
}
func (a *app) GetModuleInited() func(app module.AppInterface, module module.Module) {
return a.moduleInited
}
func (a *app) SetModuleInited(fn func(app module.AppInterface, module module.Module)) {
a.moduleInited = fn
}
func (a *app) SetConfigurationLoaded(fn func(app module.AppInterface)) {
a.configurationLoaded = fn
}
func (a *app) SetStartup(fn func(app module.AppInterface)) {
a.startUp = fn
}
func (a *app) RpcCall(m module.FullModule, moduleType string, _func string, params ...interface{}) ([]interface{}, error) {
server, err := a.GetRouteServer(moduleType, m.GetServerId())
if err != nil {
return nil, err
}
return server.Call(_func, params...)
}
func (a *app) RpcSyncCall(m module.FullModule, moduleType string, _func string, params ...interface{}) (chan rpcpb.ResultInfo, error) {
server, err := a.GetRouteServer(moduleType, m.GetServerId())
if err != nil {
return nil, err
}
return server.SyncCall(_func, params...)
}
func (a *app) RpcCallNR(m module.FullModule, moduleType string, _func string, params ...interface{}) (err error) {
server, err := a.GetRouteServer(moduleType, m.GetServerId())
if err != nil {
return err
}
return server.CallNR(_func, params...)
}
func (a *app) RpcCallArgs(m module.FullModule, moduleType string, _func string, ArgsType []string, Args [][]byte) ([]interface{}, error) {
server, err := a.GetRouteServer(moduleType, m.GetServerId())
if err != nil {
return nil, err
}
return server.CallArgs(_func, ArgsType, Args)
}
func (a *app) RpcSyncCallArgs(m module.FullModule, moduleType string, _func string, ArgsType []string, Args [][]byte) (chan rpcpb.ResultInfo, error) {
server, err := a.GetRouteServer(moduleType, m.GetServerId())
if err != nil {
return nil, err
}
return server.SyncCallArgs(_func, ArgsType, Args)
}
func (a *app) RpcCallArgsNR(m module.FullModule, moduleType string, _func string, ArgsType []string, Args [][]byte) (err error) {
server, err := a.GetRouteServer(moduleType, m.GetServerId())
if err != nil {
return err
}
return server.CallArgsNR(_func, ArgsType, Args)
}
/*
func (a *app) GetModuleSession (filter string, hash string) (module.ModuleSession, error) {
sl := strings.Split(filter, "@")
if len(sl) == 2 {
moduleId := sl[1]
if moduleId != "" {
return a.GetServerById(moduleId)
}
}
moduleType := sl[0]
route, _ := a.GetRoute(moduleType)
ms, err := route(a, moduleType, hash)
if err != nil {
return nil, err
} else {
return ms, nil
}
}
func (a *app) Call(moduleType string, hash string, fnId string, args ...interface{}) (*rpc.RetInfo, error) {
ms, err := a.GetModuleSession(moduleType, hash)
if err != nil {
return nil, err
}
return ms.GetClient().Call(fnId, 1, args...)
}
func (a *app) AsynCall(moduleType string, hash string, fnId string, args ...interface{}) (chan *rpc.RetInfo, error) {
ms, err := a.GetModuleSession(moduleType, hash)
if err != nil {
return nil, err
}
return ms.GetClient().AsynCall(fnId, args...)
}
*/