-
Notifications
You must be signed in to change notification settings - Fork 4
/
game_server.go
404 lines (374 loc) · 14.4 KB
/
game_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
package gameserver
import (
"context"
"encoding/json"
"github.com/fish-tennis/gentity"
"github.com/fish-tennis/gserver/game"
"github.com/fish-tennis/gserver/social"
"google.golang.org/protobuf/proto"
"os"
"sync"
"time"
. "github.com/fish-tennis/gnet"
"github.com/fish-tennis/gserver/cache"
"github.com/fish-tennis/gserver/cfg"
"github.com/fish-tennis/gserver/db"
. "github.com/fish-tennis/gserver/internal"
"github.com/fish-tennis/gserver/logger"
"github.com/fish-tennis/gserver/misc"
"github.com/fish-tennis/gserver/pb"
)
var (
_ gentity.Application = (*GameServer)(nil)
)
// 游戏服
type GameServer struct {
BaseServer
config *GameServerConfig
// 网关服务器listener
gateListener Listener
// 服务器listener
serverListener Listener
// 在线玩家
playerMap sync.Map // playerId-*player
}
// 游戏服配置
type GameServerConfig struct {
BaseServerConfig
}
// 初始化
func (this *GameServer) Init(ctx context.Context, configFile string) bool {
game.SetPlayerMgr(this)
if !this.BaseServer.Init(ctx, configFile) {
return false
}
this.readConfig()
this.loadCfgs()
game.InitPlayerComponentMap()
this.initDb()
this.initCache()
this.GetServerList().SetCache(cache.Get())
netMgr := GetNetMgr()
// NOTE: 实际项目中,监听客户端和监听网关,二选一即可
// 这里为了演示,同时提供客户端直连和网关两种模式
// 客户端的codec和handler
clientCodec := NewProtoCodec(nil)
clientHandler := game.NewClientConnectionHandler(clientCodec)
this.registerClientPacket(clientHandler)
clientListenerConfig := &ListenerConfig{
AcceptConfig: this.config.ClientConnConfig,
}
clientListenerConfig.AcceptConfig.Codec = clientCodec
clientListenerConfig.AcceptConfig.Handler = clientHandler
clientListenerConfig.ListenerHandler = &ClientListerHandler{}
if netMgr.NewListener(ctx, this.config.ClientListenAddr, clientListenerConfig) == nil {
panic("listen client failed")
return false
}
// NOTE: 实际项目中,监听客户端和监听网关,二选一即可
// 这里为了演示,同时提供客户端直连和网关两种模式
// 服务器的codec和handler
gateCodec := NewGateCodec(nil)
gateHandler := NewDefaultConnectionHandler(gateCodec)
this.registerGatePacket(gateHandler)
gateListenerConfig := &ListenerConfig{
AcceptConfig: this.config.ServerConnConfig,
}
gateListenerConfig.AcceptConfig.Codec = gateCodec
gateListenerConfig.AcceptConfig.Handler = gateHandler
this.gateListener = netMgr.NewListener(ctx, this.config.GateListenAddr, gateListenerConfig)
if this.gateListener == nil {
panic("listen gate failed")
return false
}
// 服务器的codec和handler
serverCodec := NewProtoCodec(nil)
serverHandler := NewDefaultConnectionHandler(serverCodec)
this.registerServerPacket(serverHandler)
serverListenerConfig := &ListenerConfig{
AcceptConfig: this.config.ServerConnConfig,
}
serverListenerConfig.AcceptConfig.Codec = serverCodec
serverListenerConfig.AcceptConfig.Handler = serverHandler
this.serverListener = netMgr.NewListener(ctx, this.config.ServerListenAddr, serverListenerConfig)
if this.serverListener == nil {
panic("listen server failed")
return false
}
// 连接其他服务器
this.BaseServer.SetDefaultServerConnectorConfig(this.config.ServerConnConfig, NewProtoCodec(nil))
this.BaseServer.GetServerList().SetFetchAndConnectServerTypes(ServerType_Game)
// 其他模块初始化
this.AddServerHook(&game.Hook{}, &social.Hook{})
serverInitArg := &misc.GameServerInitArg{
ClientHandler: clientHandler,
ServerHandler: serverHandler,
PlayerMgr: game.GetPlayerMgr(),
}
for _, hook := range this.BaseServer.GetServerHooks() {
hook.OnApplicationInit(serverInitArg)
}
logger.Info("GameServer.Init")
return true
}
// 运行
func (this *GameServer) Run(ctx context.Context) {
this.BaseServer.Run(ctx)
logger.Info("GameServer.Run")
}
// 退出
func (this *GameServer) Exit() {
this.playerMap.Range(func(key, value interface{}) bool {
player := value.(*game.Player)
player.Stop()
return true
})
game.GetGlobalEntity().PushMessage(NewProtoPacket(PacketCommand(pb.CmdGlobalEntity_Cmd_ShutdownReq), &pb.ShutdownReq{
Timestamp: game.GetGlobalEntity().GetTimerEntries().Now().Unix(),
}))
this.BaseServer.Exit()
logger.Info("GameServer.Exit")
dbMgr := db.GetDbMgr()
if dbMgr != nil {
dbMgr.(*gentity.MongoDb).Disconnect()
}
}
// 读取配置文件
func (this *GameServer) readConfig() {
fileData, err := os.ReadFile(this.GetConfigFile())
if err != nil {
panic("read config file err")
}
this.config = new(GameServerConfig)
err = json.Unmarshal(fileData, this.config)
if err != nil {
panic("decode config file err")
}
logger.Debug("%v", this.config)
this.BaseServer.GetServerInfo().ServerId = this.config.ServerId
this.BaseServer.GetServerInfo().ServerType = ServerType_Game
this.BaseServer.GetServerInfo().ClientListenAddr = this.config.ClientListenAddr
this.BaseServer.GetServerInfo().GateListenAddr = this.config.GateListenAddr
this.BaseServer.GetServerInfo().ServerListenAddr = this.config.ServerListenAddr
}
// 加载配置数据
func (this *GameServer) loadCfgs() {
progressMgr := game.RegisterProgressCheckers()
conditionMgr := game.RegisterConditionCheckers()
cfg.LoadAllCfgs("cfgdata", progressMgr, conditionMgr)
}
// 初始化数据库
func (this *GameServer) initDb() {
// 使用mongodb来演示
mongoDb := gentity.NewMongoDb(this.config.MongoUri, this.config.MongoDbName)
// 玩家数据库
playerDB := mongoDb.RegisterPlayerDb("player", "_id", "accountid", "regionid")
// 公会数据库
mongoDb.RegisterEntityDb("guild", "_id")
// 全局对象数据库(如GlobalEntity)
mongoDb.RegisterEntityDb(game.GlobalEntityCollectionName, game.GlobalEntityCollectionKeyName)
if !mongoDb.Connect() {
panic("connect db error")
}
// 玩家数据库设置分片
mongoDb.ShardDatabase(this.config.MongoDbName)
playerDB.(*gentity.MongoCollectionPlayer).ShardCollection(true)
db.SetDbMgr(mongoDb)
}
// 初始化redis缓存
func (this *GameServer) initCache() {
cache.NewRedis(this.config.RedisUri, this.config.RedisUsername, this.config.RedisPassword, this.config.RedisCluster)
pong, err := cache.GetRedis().Ping(context.Background()).Result()
if err != nil || pong == "" {
panic("redis connect error")
}
this.repairCache()
}
// 修复缓存,游戏服异常宕机重启后进行修复操作
func (this *GameServer) repairCache() {
cache.ResetOnlinePlayer(this.GetId(), this.repairPlayerCache)
}
// 缓存中的玩家数据保存到数据库
// 服务器crash时,缓存数据没来得及保存到数据库,服务器重启后进行自动修复,防止玩家数据回档
func (this *GameServer) repairPlayerCache(playerId, accountId int64) error {
defer func() {
if err := recover(); err != nil {
logger.Error("repairPlayerCache %v err:%v", playerId, err.(error).Error())
LogStack()
}
}()
tmpPlayer := game.CreateTempPlayer(playerId, accountId)
gentity.FixEntityDataFromCache(tmpPlayer, db.GetPlayerDb(), cache.Get(), game.PlayerCachePrefix, playerId)
return nil
}
// 注册客户端消息回调
func (this *GameServer) registerClientPacket(clientHandler PacketHandlerRegister) {
// 手动注册消息回调
clientHandler.Register(PacketCommand(pb.CmdInner_Cmd_HeartBeatReq), onHeartBeatReq, new(pb.HeartBeatReq))
clientHandler.Register(PacketCommand(pb.CmdLogin_Cmd_PlayerEntryGameReq), onPlayerEntryGameReq, new(pb.PlayerEntryGameReq))
clientHandler.Register(PacketCommand(pb.CmdLogin_Cmd_CreatePlayerReq), onCreatePlayerReq, new(pb.CreatePlayerReq))
this.registerGatePlayerPacket(clientHandler, PacketCommand(pb.CmdInner_Cmd_TestCmd), onTestCmd, new(pb.TestCmd))
// 通过反射自动注册消息回调
game.AutoRegisterPlayerComponentProto(clientHandler)
// 自动注册消息回调的另一种方案: proto_code_gen工具生成的回调函数
// 因为已经用了反射自动注册,所以这里注释了
// player_component_handler_gen(clientHandler)
}
// 心跳回复
func onHeartBeatReq(connection Connection, packet Packet) {
req := packet.Message().(*pb.HeartBeatReq)
SendPacketAdapt(connection, packet, PacketCommand(pb.CmdInner_Cmd_HeartBeatRes), &pb.HeartBeatRes{
RequestTimestamp: req.GetTimestamp(),
ResponseTimestamp: uint64(time.Now().UnixNano() / int64(time.Microsecond)),
})
}
// 注册网关消息回调
func (this *GameServer) registerGatePacket(gateHandler PacketHandlerRegister) {
// 手动注册消息回调
gateHandler.Register(PacketCommand(pb.CmdInner_Cmd_HeartBeatReq), onHeartBeatReq, new(pb.HeartBeatReq))
gateHandler.Register(PacketCommand(pb.CmdLogin_Cmd_PlayerEntryGameReq), onPlayerEntryGameReq, new(pb.PlayerEntryGameReq))
gateHandler.Register(PacketCommand(pb.CmdLogin_Cmd_CreatePlayerReq), onCreatePlayerReq, new(pb.CreatePlayerReq))
gateHandler.Register(PacketCommand(pb.CmdInner_Cmd_ClientDisconnect), onClientDisconnect, new(pb.TestCmd))
this.registerGatePlayerPacket(gateHandler, PacketCommand(pb.CmdInner_Cmd_TestCmd), onTestCmd, new(pb.TestCmd))
gateHandler.(*DefaultConnectionHandler).SetUnRegisterHandler(func(connection Connection, packet Packet) {
if gatePacket, ok := packet.(*GatePacket); ok {
playerId := gatePacket.PlayerId()
player := this.GetPlayer(playerId)
if player != nil {
if gamePlayer, ok := player.(*game.Player); ok {
if gamePlayer.GetConnection() == connection {
// 在线玩家的消息,转到玩家消息处理协程去处理
gamePlayer.OnRecvPacket(gatePacket.ToProtoPacket())
logger.Debug("gate->player playerId:%v cmd:%v", playerId, packet.Command())
}
}
}
}
})
// 网关服务器掉线,该网关上的所有玩家都掉线
gateHandler.(*DefaultConnectionHandler).SetOnDisconnectedFunc(func(connection Connection) {
this.playerMap.Range(func(key, value interface{}) bool {
if player, ok := value.(*game.Player); ok {
if player.GetConnection() == connection {
player.OnDisconnect(connection)
}
}
return true
})
})
// 通过反射自动注册消息和proto.Message的映射
game.AutoRegisterPlayerComponentProto(gateHandler)
// 自动注册消息回调的另一种方案: proto_code_gen工具生成的回调函数
// 因为已经用了反射自动注册,所以这里注释了
// player_component_handler_gen(clientHandler)
}
// 注册func(player *Player, packet Packet)格式的消息回调函数,支持网关模式和客户端直连模式
func (this *GameServer) registerGatePlayerPacket(gateHandler PacketHandlerRegister, packetCommand PacketCommand, playerHandler func(player *game.Player, packet Packet), protoMessage proto.Message) {
gateHandler.Register(packetCommand, func(connection Connection, packet Packet) {
var playerId int64
if gatePacket, ok := packet.(*GatePacket); ok {
// 网关转发的消息,包含playerId
playerId = gatePacket.PlayerId()
} else {
// 客户端直连的模式
if connection.GetTag() == nil {
return
}
playerId, ok = connection.GetTag().(int64)
if !ok {
return
}
}
player := game.GetPlayer(playerId)
if player == nil {
return
}
// 网关模式,使用的GatePacket
if gatePacket, ok := packet.(*GatePacket); ok {
// 转换成ProtoPacket,业务层统一接口
player.OnRecvPacket(gatePacket.ToProtoPacket())
} else {
// 客户端直连模式,使用的ProtoPacket
player.OnRecvPacket(packet.(*ProtoPacket))
}
}, protoMessage)
game.RegisterPlayerHandler(packetCommand, playerHandler)
}
// 注册服务器消息回调
func (this *GameServer) registerServerPacket(serverHandler *DefaultConnectionHandler) {
serverHandler.Register(PacketCommand(pb.CmdInner_Cmd_HeartBeatReq), onHeartBeatReq, new(pb.HeartBeatReq))
serverHandler.Register(PacketCommand(pb.CmdInner_Cmd_KickPlayer), this.onKickPlayer, new(pb.KickPlayer))
serverHandler.Register(PacketCommand(pb.CmdRoute_Cmd_RoutePlayerMessage), this.onRoutePlayerMessage, new(pb.RoutePlayerMessage))
//serverHandler.autoRegisterPlayerComponentProto()
}
// 添加一个在线玩家
func (this *GameServer) AddPlayer(player gentity.Player) {
this.playerMap.Store(player.GetId(), player)
cache.AddOnlinePlayer(player.GetId(), player.GetAccountId(), this.GetId())
}
// 删除一个在线玩家
func (this *GameServer) RemovePlayer(player gentity.Player) {
// 先保存数据库 再移除cache
player.(*game.Player).SaveDb(true)
this.playerMap.Delete(player.GetId())
cache.RemoveOnlineAccount(player.GetAccountId())
cache.RemoveOnlinePlayer(player.GetId(), this.GetId())
}
// 获取一个在线玩家
func (this *GameServer) GetPlayer(playerId int64) gentity.Player {
if v, ok := this.playerMap.Load(playerId); ok {
return v.(gentity.Player)
}
return nil
}
// 踢玩家下线
func (this *GameServer) onKickPlayer(connection Connection, packet Packet) {
req := packet.Message().(*pb.KickPlayer)
player := game.GetPlayer(req.GetPlayerId())
if player != nil {
player.ResetConnection()
player.Stop()
logger.Debug("kick player account:%v playerId:%v gameServerId:%v",
req.GetAccountId(), req.GetPlayerId(), this.GetId())
} else {
playerId, gameServerId := cache.GetOnlineAccount(req.AccountId)
if playerId == req.PlayerId && gameServerId == this.GetId() {
cache.RemoveOnlineAccount(req.AccountId)
logger.Info("kick player2 account:%v playerId:%v gameServerId:%v",
req.GetAccountId(), req.GetPlayerId(), this.GetId())
} else {
logger.Error("kick player failed account:%v playerId:%v gameServerId:%v",
req.GetAccountId(), req.GetPlayerId(), this.GetId())
}
}
}
// 转发玩家消息
// otherServer -> thisServer -> player
func (this *GameServer) onRoutePlayerMessage(connection Connection, packet Packet) {
req := packet.Message().(*pb.RoutePlayerMessage)
logger.Debug("onRoutePlayerMessage %v", req)
player := game.GetPlayer(req.ToPlayerId)
if player == nil {
// NOTE: 由于是异步消息,这里的player有很低的概率可能不在线了,如果是重要的不能丢弃的消息,需要保存该消息,留待后续处理
// 演示程序暂不处理,这里就直接丢弃了
logger.Error("player nil %v", req.ToPlayerId)
return
}
message, err := req.PacketData.UnmarshalNew()
if err != nil {
logger.Error("UnmarshalNew %v err:%v", req.ToPlayerId, err)
return
}
err = req.PacketData.UnmarshalTo(message)
if err != nil {
logger.Error("UnmarshalTo %v err:%v", req.ToPlayerId, err)
return
}
if req.DirectSendClient {
// 不需要player处理的消息,直接转发给客户端
player.Send(PacketCommand(uint16(req.PacketCommand)), message)
} else {
// 需要player处理的消息,放进player的消息队列,在玩家的逻辑协程中处理
player.OnRecvPacket(NewProtoPacket(PacketCommand(req.PacketCommand), message))
}
}