-
Notifications
You must be signed in to change notification settings - Fork 788
/
mux_conn.go
179 lines (155 loc) · 4.55 KB
/
mux_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
/*
* Copyright 2021 CloudWeGo Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package netpollmux
import (
"context"
"errors"
"fmt"
"io"
"net"
"sync"
"github.com/cloudwego/netpoll"
"github.com/cloudwego/netpoll/mux"
"github.com/cloudwego/kitex/pkg/klog"
"github.com/cloudwego/kitex/pkg/remote"
"github.com/cloudwego/kitex/pkg/remote/codec"
np "github.com/cloudwego/kitex/pkg/remote/trans/netpoll"
"github.com/cloudwego/kitex/pkg/remote/transmeta"
"github.com/cloudwego/kitex/pkg/rpcinfo"
)
// ErrConnClosed .
var ErrConnClosed = errors.New("conn closed")
var defaultCodec = codec.NewDefaultCodec()
func newMuxCliConn(connection netpoll.Connection) *muxCliConn {
c := &muxCliConn{
muxConn: newMuxConn(connection),
seqIDMap: newShardMap(mux.ShardSize),
}
connection.SetOnRequest(c.OnRequest)
connection.AddCloseCallback(func(connection netpoll.Connection) error {
return c.forceClose()
})
return c
}
type muxCliConn struct {
muxConn
closing bool // whether the server is going to close this connection
seqIDMap *shardMap // (k,v) is (sequenceID, notify)
}
func (c *muxCliConn) IsActive() bool {
return !c.closing && c.muxConn.IsActive()
}
// OnRequest is called when the connection creates.
func (c *muxCliConn) OnRequest(ctx context.Context, connection netpoll.Connection) (err error) {
// check protocol header
length, seqID, err := parseHeader(connection.Reader())
if err != nil {
err = fmt.Errorf("%w: addr(%s)", err, connection.RemoteAddr())
return c.onError(ctx, err, connection)
}
// reader is nil if return error
reader, err := connection.Reader().Slice(length)
if err != nil {
err = fmt.Errorf("mux read package slice failed: addr(%s), %w", connection.RemoteAddr(), err)
return c.onError(ctx, err, connection)
}
// seqId == 0 means a control frame.
if seqID == 0 {
iv := rpcinfo.NewInvocation("none", "none")
iv.SetSeqID(0)
ri := rpcinfo.NewRPCInfo(nil, nil, iv, nil, nil)
ctl := NewControlFrame()
msg := remote.NewMessage(ctl, nil, ri, remote.Reply, remote.Client)
bufReader := np.NewReaderByteBuffer(reader)
if err = defaultCodec.Decode(ctx, msg, bufReader); err != nil {
return
}
crrst := msg.TransInfo().TransStrInfo()[transmeta.HeaderConnectionReadyToReset]
if len(crrst) > 0 {
// the server is closing this connection
// in this case, let server close the real connection
// mux cli conn will mark itself is closing but will not close connection.
c.closing = true
reader.(io.Closer).Close()
}
return
}
// notify asyncCallback
callback, ok := c.seqIDMap.load(seqID)
if !ok {
reader.(io.Closer).Close()
return
}
bufReader := np.NewReaderByteBuffer(reader)
callback.Recv(bufReader, nil)
return nil
}
// Close does nothing.
func (c *muxCliConn) Close() error {
return nil
}
func (c *muxCliConn) forceClose() error {
c.shardQueue.Close()
c.Connection.Close()
c.seqIDMap.rangeMap(func(seqID int32, msg EventHandler) {
msg.Recv(nil, ErrConnClosed)
})
return nil
}
func (c *muxCliConn) close() error {
if !c.closing {
return c.forceClose()
}
// if closing, let server close the connection
return nil
}
func (c *muxCliConn) onError(ctx context.Context, err error, connection netpoll.Connection) error {
klog.CtxErrorf(ctx, "KITEX: error=%s", err.Error())
connection.Close()
return err
}
func newMuxSvrConn(connection netpoll.Connection, pool *sync.Pool) *muxSvrConn {
c := &muxSvrConn{
muxConn: newMuxConn(connection),
pool: pool,
}
return c
}
type muxSvrConn struct {
muxConn
pool *sync.Pool // pool of rpcInfo
}
func newMuxConn(connection netpoll.Connection) muxConn {
c := muxConn{}
c.Connection = connection
c.shardQueue = mux.NewShardQueue(mux.ShardSize, connection)
return c
}
var (
_ net.Conn = &muxConn{}
_ netpoll.Connection = &muxConn{}
)
type muxConn struct {
netpoll.Connection // raw conn
shardQueue *mux.ShardQueue // use for write
}
// Put puts the buffer getter back to the queue.
func (c *muxConn) Put(gt mux.WriterGetter) {
c.shardQueue.Add(gt)
}
func (c *muxConn) GracefulShutdown() {
c.shardQueue.Close()
}