-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
input_messenger.h
164 lines (134 loc) · 5.86 KB
/
input_messenger.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#ifndef BRPC_INPUT_MESSENGER_H
#define BRPC_INPUT_MESSENGER_H
#include "butil/iobuf.h" // butil::IOBuf
#include "brpc/socket.h" // SocketId, SocketUser
#include "brpc/parse_result.h" // ParseResult
#include "brpc/input_message_base.h" // InputMessageBase
namespace brpc {
// Forward declaration only: InputMessenger below names rdma::RdmaEndpoint
// as a friend, so the full class definition is not needed in this header.
namespace rdma {
class RdmaEndpoint;
}
struct InputMessageHandler {
// The callback to cut a message from `source'.
// Returned message will be passed to process_request or process_response
// later and Destroy()-ed by them.
// Returns:
// MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA):
// `source' does not form a complete message yet.
// MakeParseError(PARSE_ERROR_TRY_OTHERS).
// `source' does not fit the protocol, the data should be tried by
// other protocols. If the data is definitely corrupted (e.g. magic
// header matches but other fields are wrong), pop corrupted part
// from `source' before returning.
// MakeMessage(InputMessageBase*):
// The message is parsed successfully and cut from `source'.
typedef ParseResult (*Parse)(butil::IOBuf* source, Socket *socket,
bool read_eof, const void *arg);
Parse parse;
// The callback to handle `msg' created by a successful parse().
// `msg' must be Destroy()-ed when the processing is done. To make sure
// Destroy() is always called, consider using DestroyingPtr<> defined in
// destroyable.h
// May be called in a different thread from parse().
typedef void (*Process)(InputMessageBase* msg);
Process process;
// The callback to verify authentication of this socket. Only called
// on the first message that a socket receives. Can be NULL when
// authentication is not needed or this is the client side.
// Returns true on successful authentication.
typedef bool (*Verify)(const InputMessageBase* msg);
Verify verify;
// An argument associated with the handler.
const void* arg;
// Name of this handler, must be string constant.
const char* name;
};
// Process messages from connections.
// `Message' corresponds to a client's request or a server's response.
class InputMessenger : public SocketUser {
friend class rdma::RdmaEndpoint;
public:
    // NOTE(review): `capacity' presumably bounds how many handlers can be
    // registered (see `_capacity') -- confirm against the definition.
    explicit InputMessenger(size_t capacity = 128);
    ~InputMessenger();

    // [thread-safe] Must be called at least once before Start().
    // `handler' contains user-supplied callbacks to cut off and
    // process messages from connections.
    // Returns 0 on success, -1 otherwise.
    int AddHandler(const InputMessageHandler& handler);

    // [thread-safe] Create a socket to process input messages.
    // On success the id of the new socket is written to `id'.
    int Create(const butil::EndPoint& remote_side,
               time_t health_check_interval_s,
               SocketId* id);
    // Overwrite necessary fields in `base_options' and create a socket with
    // the modified options. `base_options' is taken by value, so the
    // caller's copy is left untouched.
    int Create(SocketOptions base_options, SocketId* id);

    // Returns the internal index of `InputMessageHandler' whose name=`name'
    // Returns -1 when not found
    int FindProtocolIndex(const char* name) const;
    int FindProtocolIndex(ProtocolType type) const;

    // Get name of the n-th handler
    const char* NameOfProtocol(int n) const;

    // Add a handler which doesn't belong to any registered protocol.
    // Note: Invoking this method indicates that you are using Socket without
    // Channel nor Server.
    int AddNonProtocolHandler(const InputMessageHandler& handler);

protected:
    // Load data from m->fd() into m->read_buf, cut off new messages and
    // call callbacks.
    static void OnNewMessages(Socket* m);

private:
    // Holder of a single raw InputMessageBase pointer.
    // What the out-of-line destructor and reset() do with the held message
    // is defined outside this header.
    class InputMessageClosure {
    public:
        InputMessageClosure() : _msg(NULL) { }
        // Explicitly noexcept(false): unlike an implicitly-noexcept
        // destructor, this one is permitted to let an exception escape.
        ~InputMessageClosure() noexcept(false);

        // Non-copyable. A copy would hold the very same raw `_msg' pointer
        // as its source, so the message could be handed off through
        // release() on one object while the other still refers to it, and
        // the user-defined destructor would run once per copy on the same
        // message. Instances are meant to be passed by reference (see
        // ProcessNewMessage).
        InputMessageClosure(const InputMessageClosure&) = delete;
        InputMessageClosure& operator=(const InputMessageClosure&) = delete;

        // Hand the held message (possibly NULL) to the caller and forget
        // it, leaving this closure empty.
        InputMessageBase* release() {
            InputMessageBase* m = _msg;
            _msg = NULL;
            return m;
        }

        // Replace the held message with `m'.
        // NOTE(review): handling of a previously held message is decided by
        // the out-of-line definition -- confirm there.
        void reset(InputMessageBase* m);

    private:
        InputMessageBase* _msg;
    };

    // Find a valid scissor from `handlers' to cut off `header' and `payload'
    // from m->read_buf, save index of the scissor into `index'.
    ParseResult CutInputMessage(Socket* m, size_t* index, bool read_eof);

    // Process a new message just received in OnNewMessages
    // Return value >= 0 means success
    int ProcessNewMessage(
        Socket* m, ssize_t bytes, bool read_eof,
        const uint64_t received_us, const uint64_t base_realtime,
        InputMessageClosure& last_msg);

    // User-supplied scissors and handlers.
    // the index of handler is exactly the same as the protocol
    InputMessageHandler* _handlers;
    // Max added protocol type
    butil::atomic<int> _max_index;
    // NOTE(review): presumably set once AddNonProtocolHandler() has been
    // used -- confirm against the definition.
    bool _non_protocol;
    size_t _capacity;

    // NOTE(review): judging by its name, serializes handler registration --
    // confirm against the definition.
    butil::Mutex _add_handler_mutex;
};
// Returns the global client-side InputMessenger pointer as it currently
// stands; this accessor only reads the pointer.
BUTIL_FORCE_INLINE InputMessenger* get_client_side_messenger() {
    // Block-scope declaration: the variable itself is not defined in this
    // header.
    extern InputMessenger* g_messenger;
    InputMessenger* const current = g_messenger;
    return current;
}
// NOTE(review): defined outside this header. Judging by the name, it returns
// the client-side messenger and creates it first when it does not exist
// yet -- confirm against the definition.
InputMessenger* get_or_new_client_side_messenger();
} // namespace brpc
#endif // BRPC_INPUT_MESSENGER_H