-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
protocol.h
253 lines (219 loc) · 9.39 KB
/
protocol.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef BRPC_PROTOCOL_H
#define BRPC_PROTOCOL_H
// To brpc developers: This is a header included by user, don't depend
// on internal structures, use opaque pointers instead.
#include <vector> // std::vector
#include <stdint.h> // uint64_t
#include <gflags/gflags_declare.h> // DECLARE_xxx
#include "butil/endpoint.h" // butil::EndPoint
#include "butil/iobuf.h"
#include "butil/logging.h"
#include "brpc/options.pb.h" // ProtocolType
#include "brpc/socket_id.h" // SocketId
#include "brpc/parse_result.h" // ParseResult
#include "brpc/adaptive_connection_type.h"
#include "brpc/adaptive_protocol_type.h"
namespace google {
namespace protobuf {
class Message;
class MethodDescriptor;
} // namespace protobuf
} // namespace google
namespace butil {
class IOBuf;
}
namespace brpc {
class Socket;
class SocketMessage;
class Controller;
class Authenticator;
class InputMessageBase;
DECLARE_uint64(max_body_size);
DECLARE_bool(log_error_text);
// Get the serialized byte size of the protobuf message,
// different versions of protobuf have different methods
// use template to avoid include `google/protobuf/message.h`
template<typename T>
inline uint32_t GetProtobufByteSize(const T& message) {
#if GOOGLE_PROTOBUF_VERSION >= 3010000
return message.ByteSizeLong();
#else
return static_cast<uint32_t>(message.ByteSize());
#endif
}
// 3 steps to add a new Protocol:
// Step1: Add a new ProtocolType in src/brpc/options.proto
// as identifier of the Protocol.
// Step2: Implement callbacks of struct `Protocol' in policy/ directory.
// Step3: Register the protocol in global.cpp using `RegisterProtocol'
struct Protocol {
// [Required by both client and server]
// The callback to cut a message from `source'.
// Returned message will be passed to process_request and process_response
// later and Destroy()-ed by InputMessenger.
// Returns:
// MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA):
// `source' does not form a complete message yet.
// MakeParseError(PARSE_ERROR_TRY_OTHERS).
// `source' does not fit the protocol, the data should be tried by
// other protocols. If the data is definitely corrupted (e.g. magic
// header matches but other fields are wrong), pop corrupted part
// from `source' before returning.
// MakeMessage(InputMessageBase*):
// The message is parsed successfully and cut from `source'.
typedef ParseResult (*Parse)(butil::IOBuf* source, Socket *socket,
bool read_eof, const void *arg);
Parse parse;
// [Required by client]
// The callback to serialize `request' into `request_buf' which will be
// packed into message by pack_request later. Called once for each RPC.
// `cntl' provides additional data needed by some protocol (say HTTP).
// Call cntl->SetFailed() on error.
typedef void (*SerializeRequest)(
butil::IOBuf* request_buf,
Controller* cntl,
const google::protobuf::Message* request);
SerializeRequest serialize_request;
// [Required by client]
// The callback to pack `request_buf' into `iobuf_out' or `user_message_out'
// Called before sending each request (including retries).
// Remember to pack authentication information when `auth' is not NULL.
// Call cntl->SetFailed() on error.
typedef void (*PackRequest)(
butil::IOBuf* iobuf_out,
SocketMessage** user_message_out,
uint64_t correlation_id,
const google::protobuf::MethodDescriptor* method,
Controller* controller,
const butil::IOBuf& request_buf,
const Authenticator* auth);
PackRequest pack_request;
// [Required by server]
// The callback to handle request `msg' created by a successful parse().
// `msg' must be Destroy()-ed when the processing is done. To make sure
// Destroy() is always called, consider using DestroyingPtr<> defined in
// destroyable.h
// May be called in a different thread from parse().
typedef void (*ProcessRequest)(InputMessageBase* msg);
ProcessRequest process_request;
// [Required by client]
// The callback to handle response `msg' created by a successful parse().
// `msg' must be Destroy()-ed when the processing is done. To make sure
// Destroy() is always called, consider using DestroyingPtr<> defined in
// destroyable.h
// May be called in a different thread from parse().
typedef void (*ProcessResponse)(InputMessageBase* msg);
ProcessResponse process_response;
// [Required by authenticating server]
// The callback to verify authentication of this socket. Only called
// on the first message that a socket receives. Can be NULL when
// authentication is not needed or this is the client side.
// Returns true on successful authentication.
typedef bool (*Verify)(const InputMessageBase* msg);
Verify verify;
// [Optional]
// Convert `server_addr_and_port'(a parameter to Channel) to butil::EndPoint.
typedef bool (*ParseServerAddress)(butil::EndPoint* out,
const char* server_addr_and_port);
ParseServerAddress parse_server_address;
// [Optional] Customize method name.
typedef const std::string& (*GetMethodName)(
const google::protobuf::MethodDescriptor* method,
const Controller*);
GetMethodName get_method_name;
// Bitwise-or of supported ConnectionType
ConnectionType supported_connection_type;
// Name of this protocol, must be string constant.
const char* name;
// True if this protocol is supported at client-side.
bool support_client() const {
return serialize_request && pack_request && process_response;
}
// True if this protocol is supported at server-side.
bool support_server() const { return process_request; }
};
const ConnectionType CONNECTION_TYPE_POOLED_AND_SHORT =
(ConnectionType)((int)CONNECTION_TYPE_POOLED |
(int)CONNECTION_TYPE_SHORT);
const ConnectionType CONNECTION_TYPE_ALL =
(ConnectionType)((int)CONNECTION_TYPE_SINGLE |
(int)CONNECTION_TYPE_POOLED |
(int)CONNECTION_TYPE_SHORT);
// [thread-safe]
// Register `protocol' using key=`type'.
// Returns 0 on success, -1 otherwise
int RegisterProtocol(ProtocolType type, const Protocol& protocol);
// [thread-safe]
// Find the protocol registered with key=`type'.
// Returns NULL on not found.
const Protocol* FindProtocol(ProtocolType type);
// [thread-safe]
// List all registered protocols into `vec'.
void ListProtocols(std::vector<Protocol>* vec);
void ListProtocols(std::vector<std::pair<ProtocolType, Protocol> >* vec);
// The common serialize_request implementation used by many protocols.
void SerializeRequestDefault(butil::IOBuf* buf,
Controller* cntl,
const google::protobuf::Message* request);
// Replacements for msg->ParseFromXXX() to make the bytes limit in pb
// consistent with -max_body_size
bool ParsePbFromZeroCopyStream(google::protobuf::Message* msg,
google::protobuf::io::ZeroCopyInputStream* input);
bool ParsePbFromIOBuf(google::protobuf::Message* msg, const butil::IOBuf& buf);
bool ParsePbTextFromIOBuf(google::protobuf::Message* msg, const butil::IOBuf& buf);
bool ParsePbFromArray(google::protobuf::Message* msg, const void* data, size_t size);
bool ParsePbFromString(google::protobuf::Message* msg, const std::string& str);
// Deleter for unique_ptr to print error_text of the controller when
// -log_error_text is on, then delete the controller if `delete_cntl' is true
class LogErrorTextAndDelete {
public:
explicit LogErrorTextAndDelete(bool delete_cntl = true)
: _delete_cntl(delete_cntl) {}
void operator()(Controller* c) const;
private:
bool _delete_cntl;
};
// Utility to build a temporary array.
// Example:
// TemporaryArrayBuilder<Foo, 5> b;
// b.push() = Foo1;
// b.push() = Foo2;
// UseArray(b.raw_array(), b.size());
template <typename T, size_t N>
class TemporaryArrayBuilder {
public:
TemporaryArrayBuilder() : _size(0) {}
T& push() {
if (_size < N) {
return _arr[_size++];
} else {
CHECK(false) << "push to a full array, cap=" << N;
static T dummy;
return dummy;
}
}
T& operator[](size_t i) { return _arr[i]; }
size_t size() const { return _size; }
T* raw_array() { return _arr; }
private:
size_t _size;
T _arr[N];
};
} // namespace brpc
#endif // BRPC_PROTOCOL_H