-
Notifications
You must be signed in to change notification settings - Fork 284
/
ChannelInfo.java
192 lines (167 loc) · 5.97 KB
/
ChannelInfo.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
/*
* Copyright (c) 2018 Baidu, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.baidu.brpc;
import com.baidu.brpc.buffer.DynamicCompositeByteBuf;
import com.baidu.brpc.client.BrpcChannelGroup;
import com.baidu.brpc.client.FastFutureStore;
import com.baidu.brpc.client.RpcFuture;
import com.baidu.brpc.exceptions.RpcException;
import com.baidu.brpc.protocol.Protocol;
import com.baidu.brpc.protocol.RpcResponse;
import io.netty.channel.Channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
/**
 * Per-connection state that is attached to a Netty {@link Channel} as a channel attribute.
 *
 * <p>Client-side and server-side instances are stored under two different attribute keys, so a
 * channel can carry one of each. The instance holds the owning channel, the channel group the
 * channel is returned to, the protocol in use, the receive buffer and the store of pending RPC
 * futures.
 *
 * <p>Getters and setters for all instance fields are generated by Lombok.
 */
@Setter
@Getter
@Slf4j
public class ChannelInfo {
    private static final AttributeKey<ChannelInfo> CLIENT_CHANNEL_KEY = AttributeKey.valueOf("client_key");
    private static final AttributeKey<ChannelInfo> SERVER_CHANNEL_KEY = AttributeKey.valueOf("server_key");

    private Channel channel;
    private BrpcChannelGroup channelGroup;
    // Whether this channel was set manually by business code through RpcContext.
    private boolean isFromRpcContext = false;
    private Protocol protocol;
    private long logId;
    private FastFutureStore pendingRpc;
    private DynamicCompositeByteBuf recvBuf = new DynamicCompositeByteBuf(16);

    protected ChannelInfo() {
    }

    /**
     * Returns the client-side {@code ChannelInfo} attached to {@code channel}, creating and
     * attaching one if none exists yet.
     *
     * <p>The new instance is published with {@link Attribute#setIfAbsent(Object)} so that
     * concurrent callers for the same channel always end up with the same instance.
     *
     * @param channel the channel to look up
     * @return the attached client-side info, never {@code null}
     */
    public static ChannelInfo getOrCreateClientChannelInfo(Channel channel) {
        Attribute<ChannelInfo> attribute = channel.attr(ChannelInfo.CLIENT_CHANNEL_KEY);
        ChannelInfo attached = attribute.get();
        if (attached != null) {
            return attached;
        }
        ChannelInfo created = new ChannelInfo();
        // The FastFutureStore singleton has already been initialized when the RpcClient was created.
        created.setPendingRpc(FastFutureStore.getInstance(0));
        created.setChannel(channel);
        // setIfAbsent returns the existing value if another thread attached one first.
        ChannelInfo raced = attribute.setIfAbsent(created);
        return raced != null ? raced : created;
    }

    /**
     * Returns the client-side {@code ChannelInfo} attached to {@code channel}.
     *
     * @param channel the channel to look up
     * @return the attached client-side info, or {@code null} if none has been attached
     */
    public static ChannelInfo getClientChannelInfo(Channel channel) {
        return channel.attr(ChannelInfo.CLIENT_CHANNEL_KEY).get();
    }

    /**
     * Returns the server-side {@code ChannelInfo} attached to {@code channel}, creating and
     * attaching one if none exists yet.
     *
     * <p>The new instance is published with {@link Attribute#setIfAbsent(Object)} so that
     * concurrent callers for the same channel always end up with the same instance.
     *
     * @param channel the channel to look up
     * @return the attached server-side info, never {@code null}
     */
    public static ChannelInfo getOrCreateServerChannelInfo(Channel channel) {
        Attribute<ChannelInfo> attribute = channel.attr(ChannelInfo.SERVER_CHANNEL_KEY);
        ChannelInfo attached = attribute.get();
        if (attached != null) {
            return attached;
        }
        ChannelInfo created = new ChannelInfo();
        created.setChannel(channel);
        // setIfAbsent returns the existing value if another thread attached one first.
        ChannelInfo raced = attribute.setIfAbsent(created);
        return raced != null ? raced : created;
    }

    /**
     * Returns the server-side {@code ChannelInfo} attached to {@code channel}.
     *
     * @param channel the channel to look up
     * @return the attached server-side info, or {@code null} if none has been attached
     */
    public static ChannelInfo getServerChannelInfo(Channel channel) {
        return channel.attr(ChannelInfo.SERVER_CHANNEL_KEY).get();
    }

    /**
     * Registers a pending RPC future in the future store.
     *
     * @param future the future to register
     * @return the id under which the future was stored
     */
    public long addRpcFuture(RpcFuture future) {
        // FastFutureStore guarantees that the returned logId does not occupy a slot already in use.
        return pendingRpc.put(future);
    }

    /**
     * Looks up a pending RPC future without removing it.
     *
     * @param logId the id returned by {@link #addRpcFuture(RpcFuture)}
     * @return whatever the future store returns for {@code logId}
     */
    public RpcFuture getRpcFuture(long logId) {
        return pendingRpc.get(logId);
    }

    /**
     * Looks up a pending RPC future and removes it from the store.
     *
     * @param logId the id returned by {@link #addRpcFuture(RpcFuture)}
     * @return whatever the future store returns for {@code logId}
     */
    public RpcFuture removeRpcFuture(long logId) {
        return pendingRpc.getAndRemove(logId);
    }

    /**
     * Handles a failed request: increments the channel group's failure counter, then returns the
     * channel to the group if the protocol returns channels before the response arrives.
     */
    public void handleRequestFail() {
        // NOTE(review): channelGroup is dereferenced before the isFromRpcContext check, unlike
        // handleChannelException; confirm channelGroup is always set for RpcContext channels.
        channelGroup.incFailedNum();
        returnChannelAfterRequest();
    }

    /**
     * Handles a successful request: returns the channel to the group if the protocol returns
     * channels before the response arrives.
     */
    public void handleRequestSuccess() {
        returnChannelAfterRequest();
    }

    /**
     * Returns the channel to its group right after the request was sent, but only for protocols
     * that return the channel before the response. Channels supplied through RpcContext are
     * never returned.
     */
    private void returnChannelAfterRequest() {
        if (isFromRpcContext()) {
            return;
        }
        if (protocol.returnChannelBeforeResponse()) {
            channelGroup.returnChannel(channel);
        }
    }

    /**
     * Handles a failed response: increments the channel group's failure counter, then returns the
     * channel to the group if the protocol keeps the channel until the response arrives.
     */
    public void handleResponseFail() {
        // NOTE(review): same ordering as handleRequestFail; channelGroup is used before the
        // isFromRpcContext check.
        channelGroup.incFailedNum();
        returnChannelAfterResponse();
    }

    /**
     * Handles a successful response: returns the channel to the group if the protocol keeps the
     * channel until the response arrives.
     */
    public void handleResponseSuccess() {
        returnChannelAfterResponse();
    }

    /**
     * Returns the channel to its group after the response was handled, but only for protocols
     * that do not return the channel before the response. Channels supplied through RpcContext
     * are never returned.
     */
    private void returnChannelAfterResponse() {
        if (isFromRpcContext()) {
            return;
        }
        if (!protocol.returnChannelBeforeResponse()) {
            channelGroup.returnChannel(channel);
        }
    }

    /**
     * Handling logic for when the channel becomes unavailable or a handler throws an exception.
     *
     * <p>Removes the channel from its group and walks the future store with a
     * {@link ChannelErrorStoreWalker} built from this channel and {@code ex}. Does nothing for
     * channels supplied through RpcContext.
     *
     * @param ex the exception handed to the walker
     */
    public void handleChannelException(RpcException ex) {
        if (isFromRpcContext()) {
            return;
        }
        channelGroup.removeChannel(channel);
        // Traverse and remove all RpcFutures that belong to the current channel.
        pendingRpc.traverse(new ChannelErrorStoreWalker(channel, ex));
    }

    /**
     * Walker used to traverse the elements of the future store when a channel fails.
     */
    private static class ChannelErrorStoreWalker implements FastFutureStore.StoreWalker {
        private final Channel currentChannel;
        private final RpcException exception;

        public ChannelErrorStoreWalker(Channel currentChannel, RpcException exception) {
            this.currentChannel = currentChannel;
            this.exception = exception;
        }

        /**
         * @return {@code false} when the future belongs to the failed channel (the element is
         *         to be deleted), {@code true} to keep it
         */
        @Override
        public boolean visitElement(RpcFuture fut) {
            // Identity comparison on purpose: only futures bound to this exact channel match.
            return currentChannel != fut.getChannelInfo().channel;
        }

        /**
         * Hands the deleted future a response that carries the channel exception.
         */
        @Override
        public void actionAfterDelete(RpcFuture fut) {
            RpcResponse rpcResponse = new RpcResponse();
            rpcResponse.setException(exception);
            fut.handleResponse(rpcResponse);
        }
    }
}