-
Notifications
You must be signed in to change notification settings - Fork 4.7k
/
buffer_impl.h
610 lines (529 loc) · 20.6 KB
/
buffer_impl.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
#pragma once
#include <algorithm>
#include <cstdint>
#include <deque>
#include <string>
#include "envoy/buffer/buffer.h"
#include "envoy/network/io_handle.h"
#include "common/common/assert.h"
#include "common/common/non_copyable.h"
#include "common/common/utility.h"
#include "common/event/libevent.h"
namespace Envoy {
namespace Buffer {
/**
* A Slice manages a contiguous block of bytes.
* The block is arranged like this:
* |<- dataSize() ->|<- reservableSize() ->|
* +-----------------+----------------+----------------------+
* | Drained | Data | Reservable |
* | Unused space | Usable content | New content can be |
* | that formerly | | added here with |
* | was in the Data | | reserve()/commit() |
* | section | | |
* +-----------------+----------------+----------------------+
* ^
* |
* data()
*/
class Slice {
public:
  using Reservation = RawSlice;

  virtual ~Slice() = default;

  /**
   * @return a pointer to the start of the usable content (the Data section).
   */
  const uint8_t* data() const { return base_ + data_; }

  /**
   * @return a pointer to the start of the usable content (the Data section).
   */
  uint8_t* data() { return base_ + data_; }

  /**
   * @return the size in bytes of the usable content.
   */
  uint64_t dataSize() const { return reservable_ - data_; }

  /**
   * Remove the first `size` bytes of usable content. Runs in O(1) time: only the
   * data_ offset moves; no bytes are copied.
   * @param size number of bytes to remove. If greater than data_size(), the result is undefined.
   */
  void drain(uint64_t size) {
    ASSERT(data_ + size <= reservable_);
    data_ += size;
    if (data_ == reservable_) {
      // All the data in the slice has been drained. Reset the offsets so all
      // the data can be reused.
      data_ = 0;
      reservable_ = 0;
    }
  }

  /**
   * @return the number of bytes available to be reserve()d.
   * @note Read-only implementations of Slice should return zero from this method.
   */
  uint64_t reservableSize() const {
    ASSERT(capacity_ >= reservable_);
    return capacity_ - reservable_;
  }

  /**
   * Reserve `size` bytes that the caller can populate with content. The caller SHOULD then
   * call commit() to add the newly populated content from the Reserved section to the Data
   * section.
   * @note If there is already an outstanding reservation (i.e., a reservation obtained
   *       from reserve() that has not been released by calling commit()), this method will
   *       return a new reservation that replaces it.
   * @param size the number of bytes to reserve. The Slice implementation MAY reserve
   *        fewer bytes than requested (for example, if it doesn't have enough room in the
   *        Reservable section to fulfill the whole request).
   * @return a tuple containing the address of the start of resulting reservation and the
   *         reservation size in bytes. If the address is null, the reservation failed.
   * @note Read-only implementations of Slice should return {nullptr, 0} from this method.
   */
  Reservation reserve(uint64_t size) {
    if (size == 0) {
      return {nullptr, 0};
    }
    // Verify the semantics that drain() enforces: if the slice is empty, either because
    // no data has been added or because all the added data has been drained, the data
    // section is at the very start of the slice.
    ASSERT(!(dataSize() == 0 && data_ > 0));
    uint64_t available_size = capacity_ - reservable_;
    if (available_size == 0) {
      return {nullptr, 0};
    }
    uint64_t reservation_size = std::min(size, available_size);
    // The reservation always starts exactly at reservable_; commit() relies on this
    // to validate that a Reservation came from this slice.
    void* reservation = &(base_[reservable_]);
    return {reservation, static_cast<size_t>(reservation_size)};
  }

  /**
   * Commit a Reservation that was previously obtained from a call to reserve().
   * The Reservation's size is added to the Data section.
   * @param reservation a reservation obtained from a previous call to reserve().
   *        If the reservation is not from this Slice, commit() will return false.
   *        If the caller is committing fewer bytes than provided by reserve(), it
   *        should change the mem_ field of the reservation before calling commit().
   *        For example, if a caller reserve()s 4KB to do a nonblocking socket read,
   *        and the read only returns two bytes, the caller should set
   *        reservation.mem_ = 2 and then call `commit(reservation)`.
   * @return whether the Reservation was successfully committed to the Slice.
   */
  bool commit(const Reservation& reservation) {
    // Reject the reservation unless (a) it starts exactly at this slice's current
    // reservable_ offset, (b) it fits within capacity, and (c) there is reservable
    // space left at all.
    if (static_cast<const uint8_t*>(reservation.mem_) != base_ + reservable_ ||
        reservable_ + reservation.len_ > capacity_ || reservable_ >= capacity_) {
      // The reservation is not from this OwnedSlice.
      return false;
    }
    reservable_ += reservation.len_;
    return true;
  }

  /**
   * Copy as much of the supplied data as possible to the end of the slice.
   * @param data start of the data to copy.
   * @param size number of bytes to copy.
   * @return number of bytes copied (may be a smaller than size, may even be zero).
   */
  uint64_t append(const void* data, uint64_t size) {
    uint64_t copy_size = std::min(size, reservableSize());
    uint8_t* dest = base_ + reservable_;
    reservable_ += copy_size;
    // NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
    memcpy(dest, data, copy_size);
    return copy_size;
  }

  /**
   * Copy as much of the supplied data as possible to the front of the slice.
   * If only part of the data will fit in the slice, the bytes from the _end_ are
   * copied (so that the slice holds the suffix of the input adjacent to any
   * existing content).
   * @param data start of the data to copy.
   * @param size number of bytes to copy.
   * @return number of bytes copied (may be a smaller than size, may even be zero).
   */
  uint64_t prepend(const void* data, uint64_t size) {
    const uint8_t* src = static_cast<const uint8_t*>(data);
    uint64_t copy_size;
    if (dataSize() == 0) {
      // There is nothing in the slice, so put the data at the very end in case the caller
      // later tries to prepend anything else in front of it.
      copy_size = std::min(size, reservableSize());
      reservable_ = capacity_;
      data_ = capacity_ - copy_size;
    } else {
      if (data_ == 0) {
        // There is content in the slice, and no space in front of it to write anything.
        return 0;
      }
      // Write into the space in front of the slice's current content.
      copy_size = std::min(size, data_);
      data_ -= copy_size;
    }
    // Copy the last copy_size bytes of the input into the space chosen above.
    memcpy(base_ + data_, src + size - copy_size, copy_size);
    return copy_size;
  }

protected:
  Slice(uint64_t data, uint64_t reservable, uint64_t capacity)
      : data_(data), reservable_(reservable), capacity_(capacity) {}

  /** Start of the slice - subclasses must set this */
  uint8_t* base_{nullptr};

  /** Offset in bytes from the start of the slice to the start of the Data section */
  uint64_t data_;

  /** Offset in bytes from the start of the slice to the start of the Reservable section */
  uint64_t reservable_;

  /** Total number of bytes in the slice */
  uint64_t capacity_;
};
using SlicePtr = std::unique_ptr<Slice>;
// OwnedSlice can not be derived from as it has variable sized array as member.
class OwnedSlice final : public Slice, public InlineStorage {
public:
  /**
   * Create an empty OwnedSlice.
   * @param capacity number of bytes of space the slice should have.
   * @return an OwnedSlice with at least the specified capacity.
   */
  static SlicePtr create(uint64_t capacity) {
    uint64_t slice_capacity = sliceSize(capacity);
    // Placement-new with the storage size so the object header and its trailing
    // storage_[] payload are allocated as a single block (via InlineStorage).
    return SlicePtr(new (slice_capacity) OwnedSlice(slice_capacity));
  }

  /**
   * Create an OwnedSlice and initialize it with a copy of the supplied copy.
   * @param data the content to copy into the slice.
   * @param size length of the content.
   * @return an OwnedSlice containing a copy of the content, which may (dependent on
   *         the internal implementation) have a nonzero amount of reservable space at the end.
   */
  static SlicePtr create(const void* data, uint64_t size) {
    // sliceSize() guarantees slice_capacity >= size, so the memcpy below fits.
    uint64_t slice_capacity = sliceSize(size);
    std::unique_ptr<OwnedSlice> slice(new (slice_capacity) OwnedSlice(slice_capacity));
    memcpy(slice->base_, data, size);
    // Mark the copied bytes as the Data section (data_ stays 0).
    slice->reservable_ = size;
    return slice;
  }

private:
  // Starts fully drained/empty: data_ == reservable_ == 0, capacity_ == size.
  OwnedSlice(uint64_t size) : Slice(0, 0, size) { base_ = storage_; }

  /**
   * Compute a slice size big enough to hold a specified amount of data.
   * The returned size is chosen so that sizeof(OwnedSlice) + size is a whole
   * number of 4KB pages, i.e. object header plus payload exactly fill the
   * underlying allocation.
   * @param data_size the minimum amount of data the slice must be able to store, in bytes.
   * @return a recommended slice size, in bytes (always >= data_size).
   */
  static uint64_t sliceSize(uint64_t data_size) {
    static constexpr uint64_t PageSize = 4096;
    const uint64_t num_pages = (sizeof(OwnedSlice) + data_size + PageSize - 1) / PageSize;
    return num_pages * PageSize - sizeof(OwnedSlice);
  }

  // Flexible array member: payload storage allocated immediately after the object.
  uint8_t storage_[];
};
/**
* Queue of SlicePtr that supports efficient read and write access to both
* the front and the back of the queue.
* @note This class has similar properties to std::deque<T>. The reason for using
* a custom deque implementation is that benchmark testing during development
* revealed that std::deque was too slow to reach performance parity with the
* prior evbuffer-based buffer implementation.
*/
class SliceDeque {
public:
SliceDeque() : ring_(inline_ring_), capacity_(InlineRingCapacity) {}
SliceDeque(SliceDeque&& rhs) noexcept {
// This custom move constructor is needed so that ring_ will be updated properly.
std::move(rhs.inline_ring_, rhs.inline_ring_ + InlineRingCapacity, inline_ring_);
external_ring_ = std::move(rhs.external_ring_);
ring_ = (external_ring_ != nullptr) ? external_ring_.get() : inline_ring_;
start_ = rhs.start_;
size_ = rhs.size_;
capacity_ = rhs.capacity_;
}
SliceDeque& operator=(SliceDeque&& rhs) noexcept {
// This custom assignment move operator is needed so that ring_ will be updated properly.
std::move(rhs.inline_ring_, rhs.inline_ring_ + InlineRingCapacity, inline_ring_);
external_ring_ = std::move(rhs.external_ring_);
ring_ = (external_ring_ != nullptr) ? external_ring_.get() : inline_ring_;
start_ = rhs.start_;
size_ = rhs.size_;
capacity_ = rhs.capacity_;
return *this;
}
void emplace_back(SlicePtr&& slice) {
growRing();
size_t index = internalIndex(size_);
ring_[index] = std::move(slice);
size_++;
}
void emplace_front(SlicePtr&& slice) {
growRing();
start_ = (start_ == 0) ? capacity_ - 1 : start_ - 1;
ring_[start_] = std::move(slice);
size_++;
}
bool empty() const { return size() == 0; }
size_t size() const { return size_; }
SlicePtr& front() { return ring_[start_]; }
const SlicePtr& front() const { return ring_[start_]; }
SlicePtr& back() { return ring_[internalIndex(size_ - 1)]; }
const SlicePtr& back() const { return ring_[internalIndex(size_ - 1)]; }
SlicePtr& operator[](size_t i) { return ring_[internalIndex(i)]; }
const SlicePtr& operator[](size_t i) const { return ring_[internalIndex(i)]; }
void pop_front() {
if (size() == 0) {
return;
}
front() = SlicePtr();
size_--;
start_++;
if (start_ == capacity_) {
start_ = 0;
}
}
void pop_back() {
if (size() == 0) {
return;
}
back() = SlicePtr();
size_--;
}
/**
* Forward const iterator for SliceDeque.
* @note this implementation currently supports the minimum functionality needed to support
* the `for (const auto& slice : slice_deque)` idiom.
*/
class ConstIterator {
public:
const SlicePtr& operator*() { return deque_[index_]; }
ConstIterator operator++() {
index_++;
return *this;
}
bool operator!=(const ConstIterator& rhs) const {
return &deque_ != &rhs.deque_ || index_ != rhs.index_;
}
friend class SliceDeque;
private:
ConstIterator(const SliceDeque& deque, size_t index) : deque_(deque), index_(index) {}
const SliceDeque& deque_;
size_t index_;
};
ConstIterator begin() const noexcept { return {*this, 0}; }
ConstIterator end() const noexcept { return {*this, size_}; }
private:
constexpr static size_t InlineRingCapacity = 8;
size_t internalIndex(size_t index) const {
size_t internal_index = start_ + index;
if (internal_index >= capacity_) {
internal_index -= capacity_;
ASSERT(internal_index < capacity_);
}
return internal_index;
}
void growRing() {
if (size_ < capacity_) {
return;
}
const size_t new_capacity = capacity_ * 2;
auto new_ring = std::make_unique<SlicePtr[]>(new_capacity);
for (size_t i = 0; i < new_capacity; i++) {
ASSERT(new_ring[i] == nullptr);
}
size_t src = start_;
size_t dst = 0;
for (size_t i = 0; i < size_; i++) {
new_ring[dst++] = std::move(ring_[src++]);
if (src == capacity_) {
src = 0;
}
}
for (size_t i = 0; i < capacity_; i++) {
ASSERT(ring_[i].get() == nullptr);
}
external_ring_.swap(new_ring);
ring_ = external_ring_.get();
start_ = 0;
capacity_ = new_capacity;
}
SlicePtr inline_ring_[InlineRingCapacity];
std::unique_ptr<SlicePtr[]> external_ring_;
SlicePtr* ring_; // points to start of either inline or external ring.
size_t start_{0};
size_t size_{0};
size_t capacity_;
};
/**
 * A Slice that exposes externally owned data described by a BufferFragment.
 * The fragment's done() hook fires when the slice is destroyed, i.e. when the
 * buffer no longer needs the bytes.
 */
class UnownedSlice : public Slice {
public:
  UnownedSlice(BufferFragment& fragment)
      : Slice(0, fragment.size(), fragment.size()), fragment_(fragment) {
    // The entire fragment is presented as the Data section (data_ == 0 and
    // reservable_ == capacity_), so reservableSize() is zero and nothing can
    // be appended to this slice.
    void* fragment_data = const_cast<void*>(fragment.data());
    base_ = static_cast<uint8_t*>(fragment_data);
  }

  ~UnownedSlice() override { fragment_.done(); }

private:
  BufferFragment& fragment_;
};
/**
* An implementation of BufferFragment where a releasor callback is called when the data is
* no longer needed.
*/
/**
 * An implementation of BufferFragment where a releasor callback is called when the data is
 * no longer needed.
 */
class BufferFragmentImpl : NonCopyable, public BufferFragment {
public:
  /**
   * Creates a new wrapper around the externally owned <data> of size <size>.
   * The caller must ensure <data> is valid until releasor() is called, or for the lifetime of the
   * fragment. releasor() is called with <data>, <size> and <this> to allow caller to delete
   * the fragment object.
   * @param data external data to reference
   * @param size size of data
   * @param releasor a callback function to be called when data is no longer needed. May be
   *        empty, in which case done() is a no-op.
   */
  BufferFragmentImpl(
      const void* data, size_t size,
      const std::function<void(const void*, size_t, const BufferFragmentImpl*)>& releasor)
      : data_(data), size_(size), releasor_(releasor) {}

  // Buffer::BufferFragment
  const void* data() const override { return data_; }
  size_t size() const override { return size_; }
  void done() override {
    // An empty releasor means the caller manages the data's lifetime itself.
    if (releasor_ != nullptr) {
      releasor_(data_, size_, this);
    }
  }

private:
  const void* const data_;
  const size_t size_;
  const std::function<void(const void*, size_t, const BufferFragmentImpl*)> releasor_;
};
/**
 * Buffer::Instance extension used by the evbuffer-backed buffer implementation.
 * Exposes the underlying libevent buffer so callers can perform move()
 * optimizations directly on it.
 */
class LibEventInstance : public Instance {
public:
  // Allows access into the underlying buffer for move() optimizations.
  virtual Event::Libevent::BufferPtr& buffer() PURE;

  // Called after accessing the memory in buffer() directly to allow any post-processing.
  virtual void postProcess() PURE;
};
/**
* Wrapper for uint64_t that asserts upon integer overflow and underflow.
*/
/**
 * Wrapper for uint64_t that asserts upon integer overflow and underflow.
 * Starts at zero; reads implicitly convert to uint64_t.
 */
class OverflowDetectingUInt64 {
public:
  // Implicit conversion so the wrapper is readable anywhere a uint64_t is expected.
  operator uint64_t() const { return value_; }

  OverflowDetectingUInt64& operator+=(uint64_t size) {
    // Adding `size` wraps around iff it exceeds the remaining headroom below 2^64.
    RELEASE_ASSERT(size <= UINT64_MAX - value_, "64-bit unsigned integer overflowed");
    value_ += size;
    return *this;
  }

  OverflowDetectingUInt64& operator-=(uint64_t size) {
    // Subtraction wraps around iff `size` exceeds the current value.
    RELEASE_ASSERT(size <= value_, "unsigned integer underflowed");
    value_ -= size;
    return *this;
  }

private:
  uint64_t value_{0};
};
/**
* Wraps an allocated and owned buffer.
*
* Note that due to the internals of move(), OwnedImpl is not
* compatible with non-OwnedImpl buffers.
*/
/**
 * Wraps an allocated and owned buffer.
 *
 * Note that due to the internals of move(), OwnedImpl is not
 * compatible with non-OwnedImpl buffers.
 *
 * Method definitions live in the corresponding .cc file; this header only
 * declares the interface and data members.
 */
class OwnedImpl : public LibEventInstance {
public:
  OwnedImpl();
  OwnedImpl(absl::string_view data);
  OwnedImpl(const Instance& data);
  OwnedImpl(const void* data, uint64_t size);

  // Buffer::Instance
  void add(const void* data, uint64_t size) override;
  void addBufferFragment(BufferFragment& fragment) override;
  void add(absl::string_view data) override;
  void add(const Instance& data) override;
  void prepend(absl::string_view data) override;
  void prepend(Instance& data) override;
  void commit(RawSlice* iovecs, uint64_t num_iovecs) override;
  void copyOut(size_t start, uint64_t size, void* data) const override;
  void drain(uint64_t size) override;
  uint64_t getRawSlices(RawSlice* out, uint64_t out_size) const override;
  uint64_t length() const override;
  void* linearize(uint32_t size) override;
  void move(Instance& rhs) override;
  void move(Instance& rhs, uint64_t length) override;
  Api::IoCallUint64Result read(Network::IoHandle& io_handle, uint64_t max_length) override;
  uint64_t reserve(uint64_t length, RawSlice* iovecs, uint64_t num_iovecs) override;
  ssize_t search(const void* data, uint64_t size, size_t start) const override;
  bool startsWith(absl::string_view data) const override;
  Api::IoCallUint64Result write(Network::IoHandle& io_handle) override;
  std::string toString() const override;

  // LibEventInstance
  Event::Libevent::BufferPtr& buffer() override { return buffer_; }
  void postProcess() override;

  /**
   * Create a new slice at the end of the buffer, and copy the supplied content into it.
   * @param data start of the content to copy.
   * @param size number of bytes of content to copy.
   */
  void appendSliceForTest(const void* data, uint64_t size);

  /**
   * Create a new slice at the end of the buffer, and copy the supplied string into it.
   * @param data the string to append to the buffer.
   */
  void appendSliceForTest(absl::string_view data);

  // Support for choosing the buffer implementation at runtime.
  // TODO(brian-pane) remove this once the new implementation has been
  // running in production for a while.

  /** @return whether this buffer uses the old evbuffer-based implementation. */
  bool usesOldImpl() const { return old_impl_; }

  /**
   * @param use_old_impl whether to use the evbuffer-based implementation for new buffers
   * @warning Except for testing code, this method should be called at most once per process,
   *          before any OwnedImpl objects are created. The reason is that it is unsafe to
   *          mix and match buffers with different implementations. The move() method,
   *          in particular, only works if the source and destination objects are using
   *          the same implementation.
   */
  static void useOldImpl(bool use_old_impl);

private:
  /**
   * @param rhs another buffer
   * @return whether the rhs buffer is also an instance of OwnedImpl (or a subclass) that
   *         uses the same internal implementation as this buffer.
   */
  bool isSameBufferImpl(const Instance& rhs) const;

  /** Whether to use the old evbuffer implementation when constructing new OwnedImpl objects. */
  static bool use_old_impl_;
  /** Whether this buffer uses the old evbuffer implementation. */
  bool old_impl_;
  /** Ring buffer of slices. */
  SliceDeque slices_;
  /** Sum of the dataSize of all slices. */
  OverflowDetectingUInt64 length_;
  /** Used when old_impl_==true */
  Event::Libevent::BufferPtr buffer_;
};
using BufferFragmentPtr = std::unique_ptr<BufferFragment>;
/**
* An implementation of BufferFragment where a releasor callback is called when the data is
* no longer needed. Copies data into internal buffer.
*/
/**
 * An implementation of BufferFragment where a releasor callback is called when the data is
 * no longer needed. Copies data into internal buffer.
 */
class OwnedBufferFragmentImpl final : public BufferFragment, public InlineStorage {
public:
  using Releasor = std::function<void(const OwnedBufferFragmentImpl*)>;

  /**
   * Copies the data into internal buffer. The releasor is called when the data has been
   * fully drained or the buffer that contains this fragment is destroyed.
   * @param data external data to reference
   * @param releasor a callback function to be called when data is no longer needed; must
   *        not be empty (enforced by an ASSERT in the constructor).
   */
  static BufferFragmentPtr create(absl::string_view data, const Releasor& releasor) {
    // Allocate the object header and its trailing payload storage as one block;
    // InlineStorage's placement operator new receives the total size.
    const size_t total_size = sizeof(OwnedBufferFragmentImpl) + data.size();
    return BufferFragmentPtr(new (total_size) OwnedBufferFragmentImpl(data, releasor));
  }

  // Buffer::BufferFragment
  const void* data() const override { return data_; }
  size_t size() const override { return size_; }
  void done() override { releasor_(this); }

private:
  OwnedBufferFragmentImpl(absl::string_view data, const Releasor& releasor)
      : releasor_(releasor), size_(data.size()) {
    ASSERT(releasor != nullptr);
    // Copy the payload into the flexible array member that trails this object.
    memcpy(data_, data.data(), data.size());
  }

  const Releasor releasor_;
  const size_t size_;
  // Flexible array member: payload bytes stored immediately after the object.
  uint8_t data_[];
};
using OwnedBufferFragmentImplPtr = std::unique_ptr<OwnedBufferFragmentImpl>;
} // namespace Buffer
} // namespace Envoy