storage: fix race between segment.ms and appends
H/T to VladLazar for a similar change that inspired this one:
VladLazar@682aea5

The problem statement:
```
We have seen a couple of races between the application of `segment.ms`
and the normal append path. They had the following pattern in common:
1. application of `segment.ms` begins
2. a call to `segment::append` is interleaved
3. the append finishes first and advances the dirty offsets, which
the rolling logic in `segment.ms` does not expect
-- or --
4. `segment.ms` releases the current appender while the append is
ongoing, which the append logic does not expect
```
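
To make the first interleaving concrete, here is a minimal model of race (3), with std::thread standing in for seastar fibers; `Segment`, `roll_for_segment_ms`, and `append` are illustrative stand-ins, not redpanda's real types:
```
#include <atomic>
#include <cstdio>
#include <thread>

struct Segment {
    std::atomic<long> dirty_offset{100};
};

// Housekeeping fiber: the segment.ms logic snapshots the offsets (step 1)
// and assumes they hold still while it decides how to roll.
void roll_for_segment_ms(Segment& s) {
    long snapshot = s.dirty_offset.load();
    // ... an interleaved append (step 2) may advance dirty_offset here ...
    if (snapshot != s.dirty_offset.load()) {
        std::puts("roll is operating on stale offsets (step 3)");
    }
}

// Appending fiber: advances the dirty offset concurrently (step 2).
void append(Segment& s) {
    s.dirty_offset.fetch_add(1);
}

int main() {
    Segment s;
    std::thread housekeeping(roll_for_segment_ms, std::ref(s));
    std::thread appender(append, std::ref(s));
    housekeeping.join();
    appender.join();
}
```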

The proposed fix was to introduce a new appender lock to the segment and
ensure that it is held both while appending and while rolling via
segment.ms. This addressed problem 3, but wasn't sufficient to address
problem 4.

The issue with introducing another lock at the segment level is that the
unexpected behavior occurs in the context of a segment that is already
referenced: the appending fiber may take a reference to the appender, only
for the housekeeping fiber to destruct that appender before
segment::append() is called, resulting in a segfault.
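
A sketch of that failure mode, again with simplified stand-in types rather than redpanda's interfaces; the point is the ordering, not the API. The appending fiber obtains its reference to the appender before it could possibly take a segment-level lock, so the lock cannot protect the dereference:
```
#include <memory>
#include <mutex>
#include <thread>

struct appender {
    void append() { /* write the batch */ }
};

struct segment {
    std::mutex appender_lock; // the proposed, insufficient per-segment lock
    std::unique_ptr<appender> app = std::make_unique<appender>();
};

void appending_fiber(segment& s) {
    appender* a = s.app.get();          // reference taken before locking...
    std::lock_guard guard(s.appender_lock);
    a->append();                        // ...so this can be a dangling pointer
}

void housekeeping_fiber(segment& s) {
    std::lock_guard guard(s.appender_lock);
    s.app.reset();                      // segment.ms releases the appender
}

int main() {
    segment s;
    // Running these concurrently can segfault: that crash is exactly the
    // race described above, which the per-segment lock cannot prevent.
    std::thread a(appending_fiber, std::ref(s));
    std::thread h(housekeeping_fiber, std::ref(s));
    a.join();
    h.join();
}
```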

This patch instead extends usage of the existing
disk_log_impl::_segments_rolling_lock to cover the entire duration of an
append (i.e. not just the underlying segment::append() call), ensuring that
segment.ms rolls and appends are mutually exclusive.
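
The resulting locking discipline in miniature, with std::mutex standing in for _segments_rolling_lock and elided bodies; the whole append and the whole segment.ms pass each run under the same log-level lock:
```
#include <mutex>

struct log_impl {
    std::mutex segments_rolling_lock;

    void append_batch(/* model::record_batch& batch */) {
        std::lock_guard guard(segments_rolling_lock);
        // Stamp offsets, roll if needed, call segment::append(): all of it
        // happens under the lock, so segment.ms cannot release the appender
        // mid-append.
    }

    void apply_segment_ms() {
        std::lock_guard guard(segments_rolling_lock);
        // Safe to release or replace the appender: no append is in flight.
    }
};

int main() {
    log_impl log;
    log.append_batch();
    log.apply_segment_ms();
}
```
The actual patch also adds a fast path (try the lock, then co_await it) since contention is rare; that appears in the first diff hunk below.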
andrwng committed Aug 2, 2023
1 parent 5035f9a commit 4c05904
Showing 3 changed files with 40 additions and 12 deletions.
14 changes: 13 additions & 1 deletion src/v/storage/disk_log_appender.cc
@@ -61,6 +61,9 @@ bool disk_log_appender::needs_to_roll_log(model::term_id batch_term) const {
* _bytes_left_in_segment is for initial condition
*
*/
if (!_seg || !_seg->has_appender()) {
return true;
}
return _bytes_left_in_segment == 0 || _log.term() != batch_term
|| _log._segs.empty() /*see above before removing this condition*/;
}
@@ -73,6 +76,14 @@ void disk_log_appender::release_lock() {

ss::future<ss::stop_iteration>
disk_log_appender::operator()(model::record_batch& batch) {
// We use a fast path here since this lock should very rarely be contested.
// An open segment may only have one in-flight append at any given time
// and the only other place this lock is held is when enforcing segment.ms
// (which should rarely happen in high throughput scenarios).
auto segment_roll_lock_holder = _log.try_segment_roll_lock();
if (!segment_roll_lock_holder.has_value()) {
segment_roll_lock_holder = co_await _log.segment_roll_lock();
}
batch.header().base_offset = _idx;
batch.header().header_crc = model::internal_header_only_crc(batch.header());
if (_last_term != batch.term()) {
@@ -94,7 +105,8 @@ disk_log_appender::operator()(model::record_batch& batch) {
break;
}
release_lock();
co_await _log.maybe_roll(_last_term, _idx, _config.io_priority);
co_await _log.maybe_roll_unlocked(
_last_term, _idx, _config.io_priority);
co_await initialize();
}
}
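For reference, the try-then-wait pattern used in the hunk above, rewritten against plain seastar primitives: ss::semaphore here models redpanda's mutex wrapper, and append_under_roll_lock is a made-up name for illustration.
```
#include <seastar/core/coroutine.hh>
#include <seastar/core/semaphore.hh>

namespace ss = seastar;

ss::future<> append_under_roll_lock(ss::semaphore& roll_sem) {
    // Fast path: the lock is rarely contested, so try it without suspending.
    auto units = ss::try_get_units(roll_sem, 1);
    if (!units) {
        // Slow path: an in-flight segment.ms roll holds the lock; wait.
        units = co_await ss::get_units(roll_sem, 1);
    }
    // ... the entire append runs while `units` stays alive ...
    co_return;
}
```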
20 changes: 10 additions & 10 deletions src/v/storage/disk_log_impl.cc
@@ -1222,15 +1222,11 @@ ss::future<> disk_log_impl::force_roll(ss::io_priority_class iopc) {
});
}

ss::future<> disk_log_impl::maybe_roll(
ss::future<> disk_log_impl::maybe_roll_unlocked(
model::term_id t, model::offset next_offset, ss::io_priority_class iopc) {
// This lock will only rarely be contended. If it is held, then
// we must wait for do_housekeeping to complete before proceeding, because
// the log might be in a state mid-roll where it has no appender.
// We need to take this irrespective of whether we're actually rolling
// or not, in order to ensure that writers wait for a background roll
// to complete if one is ongoing.
auto roll_lock_holder = co_await _segments_rolling_lock.get_units();
vassert(
!_segments_rolling_lock.ready(),
"Must have taken _segments_rolling_lock");

vassert(t >= term(), "Term:{} must be greater than base:{}", t, term());
if (_segs.empty()) {
@@ -1253,8 +1249,12 @@

ss::future<> disk_log_impl::apply_segment_ms() {
auto gate = _compaction_housekeeping_gate.hold();
// do_housekeeping races with maybe_roll to use new_segment.
// take a lock to prevent problems
// Holding the lock blocks writes to the last open segment.
// This is required in order to avoid the logic in this function
// racing with an inflight append. Contention on this lock should
// be very light, since we wouldn't need to enforce segment.ms
// if this partition was high throughput (segment would have rolled
// naturally).
auto lock = co_await _segments_rolling_lock.get_units();

if (_segs.empty()) {
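The rename to maybe_roll_unlocked plus the vassert above follow the usual "_unlocked" convention: the caller must already hold the lock, and the callee merely verifies the invariant instead of taking units itself. A sketch of the shape of that check, assuming a binary ss::semaphore models the lock (zero available units while held; the function name is illustrative):
```
#include <seastar/core/coroutine.hh>
#include <seastar/core/semaphore.hh>
#include <cassert>

namespace ss = seastar;

ss::future<> maybe_roll_unlocked_sketch(ss::semaphore& roll_sem) {
    assert(roll_sem.available_units() == 0 && "caller must hold the roll lock");
    // ... rolling logic runs knowing no append can interleave ...
    co_return;
}
```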
18 changes: 17 additions & 1 deletion src/v/storage/disk_log_impl.h
@@ -94,7 +94,8 @@ class disk_log_impl final : public log::impl {
std::optional<model::offset> index_lower_bound(model::offset o) const final;
std::ostream& print(std::ostream&) const final;

ss::future<> maybe_roll(
// Must be called while _segments_rolling_lock is held.
ss::future<> maybe_roll_unlocked(
model::term_id, model::offset next_offset, ss::io_priority_class);

// roll immediately with the current term. users should prefer the
Expand Down Expand Up @@ -135,6 +136,14 @@ class disk_log_impl final : public log::impl {

ss::future<reclaimable_offsets> get_reclaimable_offsets(gc_config cfg);

std::optional<ssx::semaphore_units> try_segment_roll_lock() {
return _segments_rolling_lock.try_get_units();
}

ss::future<ssx::semaphore_units> segment_roll_lock() {
return _segments_rolling_lock.get_units();
}

private:
friend class disk_log_appender; // for multi-term appends
friend class disk_log_builder; // for tests
@@ -274,6 +283,13 @@ class disk_log_impl final : public log::impl {

// Mutually exclude operations that will cause segment rolling
// do_housekeeping and maybe_roll
//
// This lock will only rarely be contended. If it is held, then
// we must wait for do_housekeeping to complete before proceeding, because
// the log might be in a state mid-roll where it has no appender.
// We need to take this irrespective of whether we're actually rolling
// or not, in order to ensure that writers wait for a background roll
// to complete if one is ongoing.
mutex _segments_rolling_lock;

std::optional<model::offset> _cloud_gc_offset;
