diff --git a/src/v/storage/disk_log_appender.cc b/src/v/storage/disk_log_appender.cc index 540eaad32b6a7..6b8123d93d623 100644 --- a/src/v/storage/disk_log_appender.cc +++ b/src/v/storage/disk_log_appender.cc @@ -61,6 +61,9 @@ bool disk_log_appender::needs_to_roll_log(model::term_id batch_term) const { * _bytes_left_in_segment is for initial condition * */ + if (!_seg || !_seg->has_appender()) { + return true; + } return _bytes_left_in_segment == 0 || _log.term() != batch_term || _log._segs.empty() /*see above before removing this condition*/; } @@ -73,6 +76,14 @@ void disk_log_appender::release_lock() { ss::future disk_log_appender::operator()(model::record_batch& batch) { + // Fast path: try the lock first, since it should very rarely be contended. + // An open segment may only have one in-flight append at any given time + // and the only other place this lock is held is when enforcing segment.ms + // (which should rarely happen in high throughput scenarios). + auto segment_roll_lock_holder = _log.try_segment_roll_lock(); + if (!segment_roll_lock_holder.has_value()) { + segment_roll_lock_holder = co_await _log.segment_roll_lock(); + } batch.header().base_offset = _idx; batch.header().header_crc = model::internal_header_only_crc(batch.header()); if (_last_term != batch.term()) { @@ -94,7 +105,8 @@ disk_log_appender::operator()(model::record_batch& batch) { break; } release_lock(); - co_await _log.maybe_roll(_last_term, _idx, _config.io_priority); + co_await _log.maybe_roll_unlocked( + _last_term, _idx, _config.io_priority); co_await initialize(); } } diff --git a/src/v/storage/disk_log_impl.cc b/src/v/storage/disk_log_impl.cc index f1f17b7ad4b03..18168f38524f9 100644 --- a/src/v/storage/disk_log_impl.cc +++ b/src/v/storage/disk_log_impl.cc @@ -1222,15 +1222,11 @@ ss::future<> disk_log_impl::force_roll(ss::io_priority_class iopc) { }); } -ss::future<> disk_log_impl::maybe_roll( +ss::future<> disk_log_impl::maybe_roll_unlocked( model::term_id t, model::offset 
next_offset, ss::io_priority_class iopc) { - // This lock will only rarely be contended. If it is held, then - // we must wait for do_housekeeping to complete before proceeding, because - // the log might be in a state mid-roll where it has no appender. - // We need to take this irrespective of whether we're actually rolling - // or not, in order to ensure that writers wait for a background roll - // to complete if one is ongoing. - auto roll_lock_holder = co_await _segments_rolling_lock.get_units(); + vassert( + !_segments_rolling_lock.ready(), + "Must have taken _segments_rolling_lock"); vassert(t >= term(), "Term:{} must be greater than base:{}", t, term()); if (_segs.empty()) { @@ -1253,8 +1249,12 @@ ss::future<> disk_log_impl::maybe_roll( ss::future<> disk_log_impl::apply_segment_ms() { auto gate = _compaction_housekeeping_gate.hold(); - // do_housekeeping races with maybe_roll to use new_segment. - // take a lock to prevent problems + // Holding the lock blocks writes to the last open segment. + // This is required in order to avoid the logic in this function + // racing with an in-flight append. Contention on this lock should + // be very light, since we wouldn't need to enforce segment.ms + // if this partition were high throughput (segment would have rolled + // naturally). auto lock = co_await _segments_rolling_lock.get_units(); if (_segs.empty()) { diff --git a/src/v/storage/disk_log_impl.h b/src/v/storage/disk_log_impl.h index 36cdd6b9831e0..2592fa52584a6 100644 --- a/src/v/storage/disk_log_impl.h +++ b/src/v/storage/disk_log_impl.h @@ -94,7 +94,8 @@ class disk_log_impl final : public log::impl { std::optional index_lower_bound(model::offset o) const final; std::ostream& print(std::ostream&) const final; - ss::future<> maybe_roll( + // Does not take _segments_rolling_lock itself: the caller must hold it. + ss::future<> maybe_roll_unlocked( model::term_id, model::offset next_offset, ss::io_priority_class); // roll immediately with the current term. 
users should prefer the @@ -135,6 +136,14 @@ class disk_log_impl final : public log::impl { ss::future get_reclaimable_offsets(gc_config cfg); + std::optional try_segment_roll_lock() { + return _segments_rolling_lock.try_get_units(); + } + + ss::future segment_roll_lock() { + return _segments_rolling_lock.get_units(); + } + private: friend class disk_log_appender; // for multi-term appends friend class disk_log_builder; // for tests @@ -274,6 +283,13 @@ class disk_log_impl final : public log::impl { // Mutually exclude operations that will cause segment rolling // do_housekeeping and maybe_roll + // + // This lock will only rarely be contended. If it is held, then + // we must wait for do_housekeeping to complete before proceeding, because + // the log might be in a state mid-roll where it has no appender. + // We need to take this irrespective of whether we're actually rolling + // or not, in order to ensure that writers wait for a background roll + // to complete if one is ongoing. mutex _segments_rolling_lock; std::optional _cloud_gc_offset;