-
Notifications
You must be signed in to change notification settings - Fork 0
/
context.go
158 lines (134 loc) · 3.48 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package qim
import (
"sync"
"github.com/joeyscat/qim/logger"
"github.com/joeyscat/qim/wire"
"github.com/joeyscat/qim/wire/pkt"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)
// Session read only
type Session interface {
GetChannelId() string
GetGateId() string
GetAccount() string
GetRemoteIp() string
GetApp() string
GetTags() []string
}
type Context interface {
Dispatcher
SessionStorage
Header() *pkt.Header
ReadBody(value proto.Message) error
Session() Session
RespWithError(status pkt.Status, err error) error
Resp(status pkt.Status, body proto.Message) error
Dispatch(body proto.Message, recvs ...*Location) error
Next()
}
type HandlerFunc func(Context)
type HandlersChain []HandlerFunc
// ContextImpl the most important part
type ContextImpl struct {
sync.Mutex
Dispatcher
SessionStorage
handlers HandlersChain
index int
request *pkt.LogicPkt
session Session
}
func BuildContext() Context {
return &ContextImpl{}
}
// Dispatch implements Context
func (c *ContextImpl) Dispatch(body protoreflect.ProtoMessage, recvs ...*Location) error {
if len(recvs) == 0 {
return nil
}
packet := pkt.NewFrom(&c.request.Header)
packet.Flag = pkt.Flag_Push
packet.WriteBody(body)
logger.L.Debug("<-- Dispatch", zap.Int("to.len", len(recvs)), zap.String("header", c.request.Header.String()))
// the receivers group by the destination of gateway
group := make(map[string][]string)
for _, recv := range recvs {
if recv.ChannelID == c.Session().GetChannelId() {
continue
}
if _, ok := group[recv.GateID]; !ok {
group[recv.GateID] = make([]string, 0)
}
group[recv.GateID] = append(group[recv.GateID], recv.ChannelID)
}
for gateway, ids := range group {
err := c.Push(gateway, ids, packet)
if err != nil {
logger.L.Error("Push error", zap.Error(err))
}
return err
}
return nil
}
// Header implements Context
func (c *ContextImpl) Header() *pkt.Header {
return &c.request.Header
}
// Next implements Context
func (c *ContextImpl) Next() {
if c.index >= len(c.handlers) {
return
}
f := c.handlers[c.index]
c.index++
if f == nil {
logger.L.Warn("arrived unknown HandlerFunc")
return
}
f(c)
}
// ReadBody implements Context
func (c *ContextImpl) ReadBody(value protoreflect.ProtoMessage) error {
return c.request.ReadBody(value)
}
// Resp implements Context
func (c *ContextImpl) Resp(status pkt.Status, body protoreflect.ProtoMessage) error {
packet := pkt.NewFrom(&c.request.Header)
packet.Status = status
packet.WriteBody(body)
packet.Flag = pkt.Flag_Response
logger.L.Debug("<-- Resp", zap.String("toAccount", c.session.GetAccount()),
zap.String("header", c.request.Header.String()),
zap.String("status", status.String()),
zap.Any("body", body),
)
err := c.Push(c.Session().GetGateId(), []string{c.session.GetChannelId()}, packet)
if err != nil {
logger.L.Error("Push error", zap.Error(err))
}
return err
}
// RespWithError implements Context
func (c *ContextImpl) RespWithError(status pkt.Status, err error) error {
return c.Resp(status, &pkt.ErrorResp{Message: err.Error()})
}
// Session implements Context
func (c *ContextImpl) Session() Session {
if c.session == nil {
server, _ := c.request.GetMeta(wire.MetaDestServer)
c.session = &pkt.Session{
ChannelId: c.request.ChannelId,
GateId: server.(string),
Tags: []string{"AutoGenerated"},
}
}
return c.session
}
func (c *ContextImpl) reset() {
c.request = nil
c.index = 0
c.handlers = nil
c.session = nil
}