-
Notifications
You must be signed in to change notification settings - Fork 0
/
AsyncConnection.h
409 lines (381 loc) · 13.7 KB
/
AsyncConnection.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2014 UnitedStack <haomai@unitedstack.com>
*
* Author: Haomai Wang <haomaiwang@gmail.com>
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#ifndef CEPH_MSG_ASYNCCONNECTION_H
#define CEPH_MSG_ASYNCCONNECTION_H
#include <atomic>
#include <pthread.h>
#include <signal.h>
#include <climits>
#include <list>
#include <mutex>
#include <map>
using namespace std;
#include "auth/AuthSessionHandler.h"
#include "common/ceph_time.h"
#include "common/perf_counters.h"
#include "include/buffer.h"
#include "msg/Connection.h"
#include "msg/Messenger.h"
#include "Event.h"
#include "Stack.h"
class AsyncMessenger;
class Worker;
static const int ASYNC_IOV_MAX = (IOV_MAX >= 1024 ? IOV_MAX / 4 : IOV_MAX);
/*
* AsyncConnection maintains a logic session between two endpoints. In other
* word, a pair of addresses can find the only AsyncConnection. AsyncConnection
* will handle with network fault or read/write transactions. If one file
* descriptor broken, AsyncConnection will maintain the message queue and
* sequence, try to reconnect peer endpoint.
*/
class AsyncConnection : public Connection {
ssize_t read_bulk(char *buf, unsigned len);
ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
ssize_t try_send(bufferlist &bl, bool more=false) {
std::lock_guard<std::mutex> l(write_lock);
outcoming_bl.claim_append(bl);
return _try_send(more);
}
// if "send" is false, it will only append bl to send buffer
// the main usage is avoid error happen outside messenger threads
ssize_t _try_send(bool more=false);
ssize_t _send(Message *m);
void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
ssize_t read_until(unsigned needed, char *p);
ssize_t _process_connection();
void _connect();
void _stop();
int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
void was_session_reset();
void fault();
void discard_out_queue();
void discard_requeued_up_to(uint64_t seq);
void requeue_sent();
int randomize_out_seq();
void handle_ack(uint64_t seq);
void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
ssize_t write_message(Message *m, bufferlist& bl, bool more);
void inject_delay();
ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
bufferlist &authorizer_reply) {
bufferlist reply_bl;
reply.tag = tag;
reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required;
reply.authorizer_len = authorizer_reply.length();
reply_bl.append((char*)&reply, sizeof(reply));
if (reply.authorizer_len) {
reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
}
ssize_t r = try_send(reply_bl);
if (r < 0) {
inject_delay();
return -1;
}
state = STATE_ACCEPTING_WAIT_CONNECT_MSG;
return 0;
}
bool is_queued() const {
return !out_q.empty() || outcoming_bl.length();
}
void shutdown_socket() {
for (auto &&t : register_time_events)
center->delete_time_event(t);
register_time_events.clear();
if (last_tick_id) {
center->delete_time_event(last_tick_id);
last_tick_id = 0;
}
if (cs) {
center->delete_file_event(cs.fd(), EVENT_READABLE|EVENT_WRITABLE);
cs.shutdown();
cs.close();
}
}
Message *_get_next_outgoing(bufferlist *bl) {
Message *m = 0;
while (!m && !out_q.empty()) {
map<int, list<pair<bufferlist, Message*> > >::reverse_iterator it = out_q.rbegin();
if (!it->second.empty()) {
list<pair<bufferlist, Message*> >::iterator p = it->second.begin();
m = p->second;
if (bl)
bl->swap(p->first);
it->second.erase(p);
}
if (it->second.empty())
out_q.erase(it->first);
}
return m;
}
bool _has_next_outgoing() const {
return !out_q.empty();
}
void reset_recv_state();
/**
* The DelayedDelivery is for injecting delays into Message delivery off
* the socket. It is only enabled if delays are requested, and if they
* are then it pulls Messages off the DelayQueue and puts them into the
* AsyncMessenger event queue.
*/
class DelayedDelivery : public EventCallback {
std::set<uint64_t> register_time_events; // need to delete it if stop
std::deque<std::pair<utime_t, Message*> > delay_queue;
std::mutex delay_lock;
AsyncMessenger *msgr;
EventCenter *center;
DispatchQueue *dispatch_queue;
uint64_t conn_id;
std::atomic_bool stop_dispatch;
public:
explicit DelayedDelivery(AsyncMessenger *omsgr, EventCenter *c,
DispatchQueue *q, uint64_t cid)
: msgr(omsgr), center(c), dispatch_queue(q), conn_id(cid),
stop_dispatch(false) { }
~DelayedDelivery() {
assert(register_time_events.empty());
assert(delay_queue.empty());
}
void set_center(EventCenter *c) { center = c; }
void do_request(int id) override;
void queue(double delay_period, utime_t release, Message *m) {
std::lock_guard<std::mutex> l(delay_lock);
delay_queue.push_back(std::make_pair(release, m));
register_time_events.insert(center->create_time_event(delay_period*1000000, this));
}
void discard() {
stop_dispatch = true;
center->submit_to(center->get_id(), [this] () mutable {
std::lock_guard<std::mutex> l(delay_lock);
while (!delay_queue.empty()) {
Message *m = delay_queue.front().second;
dispatch_queue->dispatch_throttle_release(m->get_dispatch_throttle_size());
m->put();
delay_queue.pop_front();
}
for (auto i : register_time_events)
center->delete_time_event(i);
register_time_events.clear();
stop_dispatch = false;
}, true);
}
bool ready() const { return !stop_dispatch && delay_queue.empty() && register_time_events.empty(); }
void flush();
} *delay_state;
public:
AsyncConnection(CephContext *cct, AsyncMessenger *m, DispatchQueue *q, Worker *w);
~AsyncConnection();
void maybe_start_delay_thread();
ostream& _conn_prefix(std::ostream *_dout);
bool is_connected() override {
return can_write.load() == WriteStatus::CANWRITE;
}
// Only call when AsyncConnection first construct
void connect(const entity_addr_t& addr, int type) {
set_peer_type(type);
set_peer_addr(addr);
policy = msgr->get_policy(type);
_connect();
}
// Only call when AsyncConnection first construct
void accept(ConnectedSocket socket, entity_addr_t &addr);
int send_message(Message *m) override;
void send_keepalive() override;
void mark_down() override;
void mark_disposable() override {
std::lock_guard<std::mutex> l(lock);
policy.lossy = true;
}
private:
enum {
STATE_NONE,
STATE_OPEN,
STATE_OPEN_KEEPALIVE2,
STATE_OPEN_KEEPALIVE2_ACK,
STATE_OPEN_TAG_ACK,
STATE_OPEN_MESSAGE_HEADER,
STATE_OPEN_MESSAGE_THROTTLE_MESSAGE,
STATE_OPEN_MESSAGE_THROTTLE_BYTES,
STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE,
STATE_OPEN_MESSAGE_READ_FRONT,
STATE_OPEN_MESSAGE_READ_MIDDLE,
STATE_OPEN_MESSAGE_READ_DATA_PREPARE,
STATE_OPEN_MESSAGE_READ_DATA,
STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH,
STATE_OPEN_TAG_CLOSE,
STATE_WAIT_SEND,
STATE_CONNECTING,
STATE_CONNECTING_RE,
STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY,
STATE_CONNECTING_SEND_CONNECT_MSG,
STATE_CONNECTING_WAIT_CONNECT_REPLY,
STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH,
STATE_CONNECTING_WAIT_ACK_SEQ,
STATE_CONNECTING_READY,
STATE_ACCEPTING,
STATE_ACCEPTING_WAIT_BANNER_ADDR,
STATE_ACCEPTING_WAIT_CONNECT_MSG,
STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH,
STATE_ACCEPTING_WAIT_SEQ,
STATE_ACCEPTING_READY,
STATE_STANDBY,
STATE_CLOSED,
STATE_WAIT, // just wait for racing connection
};
static const int TCP_PREFETCH_MIN_SIZE;
static const char *get_state_name(int state) {
const char* const statenames[] = {"STATE_NONE",
"STATE_OPEN",
"STATE_OPEN_KEEPALIVE2",
"STATE_OPEN_KEEPALIVE2_ACK",
"STATE_OPEN_TAG_ACK",
"STATE_OPEN_MESSAGE_HEADER",
"STATE_OPEN_MESSAGE_THROTTLE_MESSAGE",
"STATE_OPEN_MESSAGE_THROTTLE_BYTES",
"STATE_OPEN_MESSAGE_THROTTLE_DISPATCH_QUEUE",
"STATE_OPEN_MESSAGE_READ_FRONT",
"STATE_OPEN_MESSAGE_READ_MIDDLE",
"STATE_OPEN_MESSAGE_READ_DATA_PREPARE",
"STATE_OPEN_MESSAGE_READ_DATA",
"STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH",
"STATE_OPEN_TAG_CLOSE",
"STATE_WAIT_SEND",
"STATE_CONNECTING",
"STATE_CONNECTING_RE",
"STATE_CONNECTING_WAIT_BANNER_AND_IDENTIFY",
"STATE_CONNECTING_SEND_CONNECT_MSG",
"STATE_CONNECTING_WAIT_CONNECT_REPLY",
"STATE_CONNECTING_WAIT_CONNECT_REPLY_AUTH",
"STATE_CONNECTING_WAIT_ACK_SEQ",
"STATE_CONNECTING_READY",
"STATE_ACCEPTING",
"STATE_ACCEPTING_WAIT_BANNER_ADDR",
"STATE_ACCEPTING_WAIT_CONNECT_MSG",
"STATE_ACCEPTING_WAIT_CONNECT_MSG_AUTH",
"STATE_ACCEPTING_WAIT_SEQ",
"STATE_ACCEPTING_READY",
"STATE_STANDBY",
"STATE_CLOSED",
"STATE_WAIT"};
return statenames[state];
}
AsyncMessenger *async_msgr;
uint64_t conn_id;
PerfCounters *logger;
int global_seq;
__u32 connect_seq, peer_global_seq;
atomic64_t out_seq;
atomic64_t ack_left, in_seq;
int state;
int state_after_send;
ConnectedSocket cs;
int port;
Messenger::Policy policy;
DispatchQueue *dispatch_queue;
std::mutex write_lock;
enum class WriteStatus {
NOWRITE,
REPLACING,
CANWRITE,
CLOSED
};
std::atomic<WriteStatus> can_write;
bool open_write;
map<int, list<pair<bufferlist, Message*> > > out_q; // priority queue for outbound msgs
list<Message*> sent; // the first bufferlist need to inject seq
bufferlist outcoming_bl;
bool keepalive;
std::mutex lock;
utime_t backoff; // backoff time
EventCallbackRef read_handler;
EventCallbackRef write_handler;
EventCallbackRef wakeup_handler;
EventCallbackRef tick_handler;
struct iovec msgvec[ASYNC_IOV_MAX];
char *recv_buf;
uint32_t recv_max_prefetch;
uint32_t recv_start;
uint32_t recv_end;
set<uint64_t> register_time_events; // need to delete it if stop
ceph::coarse_mono_clock::time_point last_active;
uint64_t last_tick_id = 0;
const uint64_t inactive_timeout_us;
// Tis section are temp variables used by state transition
// Open state
utime_t recv_stamp;
utime_t throttle_stamp;
unsigned msg_left;
uint64_t cur_msg_size;
ceph_msg_header current_header;
bufferlist data_buf;
bufferlist::iterator data_blp;
bufferlist front, middle, data;
ceph_msg_connect connect_msg;
// Connecting state
bool got_bad_auth;
AuthAuthorizer *authorizer;
bufferlist authorizer_buf;
ceph_msg_connect_reply connect_reply;
// Accepting state
entity_addr_t socket_addr;
CryptoKey session_key;
bool replacing; // when replacing process happened, we will reply connect
// side with RETRY tag and accept side will clear replaced
// connection. So when connect side reissue connect_msg,
// there won't exists conflicting connection so we use
// "replacing" to skip RESETSESSION to avoid detect wrong
// presentation
bool is_reset_from_peer;
bool once_ready;
// used only for local state, it will be overwrite when state transition
char *state_buffer;
// used only by "read_until"
uint64_t state_offset;
Worker *worker;
EventCenter *center;
ceph::shared_ptr<AuthSessionHandler> session_security;
public:
// used by eventcallback
void handle_write();
void process();
void wakeup_from(uint64_t id);
void tick(uint64_t id);
void local_deliver();
void stop(bool queue_reset) {
lock.lock();
bool need_queue_reset = (state != STATE_CLOSED) && queue_reset;
_stop();
lock.unlock();
if (need_queue_reset)
dispatch_queue->queue_reset(this);
}
void cleanup() {
shutdown_socket();
delete read_handler;
delete write_handler;
delete wakeup_handler;
delete tick_handler;
if (delay_state) {
delete delay_state;
delay_state = NULL;
}
}
PerfCounters *get_perf_counter() {
return logger;
}
}; /* AsyncConnection */
typedef boost::intrusive_ptr<AsyncConnection> AsyncConnectionRef;
#endif