From e74474d1fd29164555e1ffab79bde991f621130d Mon Sep 17 00:00:00 2001
From: Adam Kupczyk <akupczyk@redhat.com>
Date: Tue, 29 Jun 2021 13:03:56 +0200
Subject: [PATCH] os/bluestore/bluefs: Split single bluefs lock into multiple
 smaller locks

Splits the single bluefs lock into log, dirty, dirs, file and writer locks.
This resolves severe lock contention and makes BlueFS more multithreaded.

Signed-off-by: Adam Kupczyk <akupczyk@redhat.com>
---
 src/os/bluestore/BlueFS.cc | 408 +++++++++++++++++++++++++------------
 src/os/bluestore/BlueFS.h  | 113 ++++------
 2 files changed, 321 insertions(+), 200 deletions(-)

diff --git a/src/os/bluestore/BlueFS.cc b/src/os/bluestore/BlueFS.cc
index fbb492b219277..34c1c505b0f66 100644
--- a/src/os/bluestore/BlueFS.cc
+++ b/src/os/bluestore/BlueFS.cc
@@ -148,7 +148,7 @@ class BlueFS::SocketHook : public AdminSocketHook {
       out.append(ss);
     } else if (command == "bluefs files list") {
       const char* devnames[3] = {"wal","db","slow"};
-      std::lock_guard l(bluefs->lock);
+      std::lock_guard l(bluefs->dirs_lock);
       f->open_array_section("files");
       for (auto &d : bluefs->dir_map) {
         std::string dir = d.first;
@@ -384,6 +384,7 @@ void BlueFS::_shutdown_logger()
   delete logger;
 }
 
+//AK - TODO - locking needed but not certain
 void BlueFS::_update_logger_stats()
 {
   // we must be holding the lock
@@ -466,7 +467,6 @@ void BlueFS::handle_discard(unsigned id, interval_set<uint64_t>& to_release)
 
 uint64_t BlueFS::get_used()
 {
-  std::lock_guard l(lock);
   uint64_t used = 0;
   for (unsigned id = 0; id < MAX_BDEV; ++id) {
     used += _get_used(id);
@@ -492,7 +492,6 @@ uint64_t BlueFS::get_used(unsigned id)
 {
   ceph_assert(id < alloc.size());
   ceph_assert(alloc[id]);
-  std::lock_guard l(lock);
   return _get_used(id);
 }
 
@@ -505,13 +504,11 @@ uint64_t BlueFS::_get_total(unsigned id) const
 
 uint64_t BlueFS::get_total(unsigned id)
 {
-  std::lock_guard l(lock);
   return _get_total(id);
 }
 
 uint64_t BlueFS::get_free(unsigned id)
 {
-  std::lock_guard l(lock);
   ceph_assert(id < alloc.size());
   return alloc[id]->get_free();
 }
@@ -541,7 +538,7 @@ void BlueFS::dump_block_extents(ostream& out)
 
 int BlueFS::get_block_extents(unsigned id, interval_set<uint64_t> *extents)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
   dout(10) << __func__ << " bdev " << id << dendl;
   ceph_assert(id < alloc.size());
   for (auto& p : file_map) {
@@ -556,7 +553,6 @@ int BlueFS::get_block_extents(unsigned id, interval_set<uint64_t> *extents)
 
 int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
 {
-  std::unique_lock l(lock);
   dout(1) << __func__
           << " osd_uuid " << osd_uuid
           << dendl;
@@ -593,7 +589,7 @@ int BlueFS::mkfs(uuid_d osd_uuid, const bluefs_layout_t& layout)
 
   // initial txn
   log_t.op_init();
-  _flush_and_sync_log(l);
+  flush_and_sync_log();
 
   // write supers
   super.log_fnode = log_file->fnode;
@@ -985,7 +981,7 @@ int BlueFS::prepare_new_device(int id, const bluefs_layout_t& layout)
       new_log_dev_cur = BDEV_NEWDB;
       new_log_dev_next = BDEV_DB;
     }
-    _rewrite_log_and_layout_sync(false,
+    rewrite_log_and_layout_sync(false,
       BDEV_NEWDB,
       new_log_dev_cur,
      new_log_dev_next,
      layout);
    //}
  } else if(id == BDEV_NEWWAL) {
-    _rewrite_log_and_layout_sync(false,
+    rewrite_log_and_layout_sync(false,
      BDEV_DB,
      BDEV_NEWWAL,
      BDEV_WAL,
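The core of the change is easiest to see side by side. Below is a minimal standalone sketch of the before/after shape, with std::mutex standing in for ceph::mutex; only the lock names come from the patch, everything else is illustrative:

    #include <mutex>

    // Before: one mutex serialized every BlueFS entry point.
    struct BlueFSCoarse {
      std::mutex lock;        // guarded the log, dirs, dirty set and writers
    };

    // After: each shared structure gets its own mutex, so unrelated
    // operations (a directory lookup vs. a log append) no longer contend.
    struct BlueFSFine {
      std::mutex log_lock;    // pending log transaction (log_t) and log writer
      std::mutex dirty_lock;  // dirty_files and pending_release
      std::mutex dirs_lock;   // dir_map / file_map namespace
      // per-File and per-FileWriter locks live inside those objects
    };

    int main() {
      BlueFSFine fs;
      { std::scoped_lock l(fs.dirs_lock); /* look up a file */ }
      { std::scoped_lock l(fs.log_lock);  /* append to the log */ }
    }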
@@ -1024,7 +1020,6 @@ void BlueFS::get_devices(set<string> *ls)
 
 int BlueFS::fsck()
 {
-  std::lock_guard l(lock);
   dout(1) << __func__ << dendl;
   // hrm, i think we check everything on mount...
   return 0;
@@ -1828,7 +1823,7 @@ int BlueFS::device_migrate_to_existing(
       new_log_dev_next;
   }
 
-  _rewrite_log_and_layout_sync(
+  rewrite_log_and_layout_sync(
     false,
     (flags & REMOVE_DB) ? BDEV_SLOW : BDEV_DB,
     new_log_dev_cur,
@@ -1963,7 +1958,7 @@ int BlueFS::device_migrate_to_new(
     BDEV_DB :
     BDEV_SLOW;
 
-  _rewrite_log_and_layout_sync(
+  rewrite_log_and_layout_sync(
     false,
     super_dev,
     new_log_dev_cur,
@@ -1993,12 +1988,16 @@ void BlueFS::_drop_link(FileRef file)
   dout(20) << __func__ << " had refs " << file->refs
            << " on " << file->fnode << dendl;
   ceph_assert(file->refs > 0);
+  ceph_assert(ceph_mutex_is_locked(log_lock));
+
   --file->refs;
   if (file->refs == 0) {
     dout(20) << __func__ << " destroying " << file->fnode << dendl;
     ceph_assert(file->num_reading.load() == 0);
     vselector->sub_usage(file->vselector_hint, file->fnode);
     log_t.op_file_remove(file->fnode.ino);
+
+    std::lock_guard dl(dirty_lock);
     for (auto& r : file->fnode.extents) {
       pending_release[r.bdev].insert(r.offset, r.length);
     }
@@ -2006,6 +2005,7 @@ void BlueFS::_drop_link(FileRef file)
     file->deleted = true;
 
     if (file->dirty_seq) {
+      // retract request to serialize changes
       ceph_assert(file->dirty_seq > log_seq_stable);
       ceph_assert(dirty_files.count(file->dirty_seq));
       auto it = dirty_files[file->dirty_seq].iterator_to(*file);
@@ -2233,8 +2233,9 @@ int64_t BlueFS::_read(
   return ret;
 }
 
-void BlueFS::_invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
+void BlueFS::invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
 {
+  std::lock_guard l(f->lock);
   dout(10) << __func__ << " file " << f->fnode
            << " 0x" << std::hex << offset << "~" << length
            << std::dec << dendl;
@@ -2254,8 +2255,9 @@ void BlueFS::_invalidate_cache(FileRef f, uint64_t offset, uint64_t length)
   }
 }
 
-uint64_t BlueFS::_estimate_log_size()
+uint64_t BlueFS::estimate_log_size()
 {
+  std::lock_guard dirl(dirs_lock);
   int avg_dir_size = 40;  // fixme
   int avg_file_size = 12;
   uint64_t size = 4096 * 2;
@@ -2267,37 +2269,44 @@
 
 void BlueFS::compact_log()
 {
-  std::unique_lock l(lock);
   if (!cct->_conf->bluefs_replay_recovery_disable_compact) {
     if (cct->_conf->bluefs_compact_log_sync) {
-      _compact_log_sync();
+      compact_log_sync();
     } else {
-      _compact_log_async(l);
+      compact_log_async();
     }
   }
 }
 
-bool BlueFS::_should_compact_log()
+bool BlueFS::should_start_compact_log()
 {
-  uint64_t current = log_writer->file->fnode.size;
-  uint64_t expected = _estimate_log_size();
+  if (log_is_compacting.load() == true) {
+    // compaction is already running
+    return false;
+  }
+  uint64_t current;
+  {
+    std::lock_guard dirl(log_lock);
+    current = log_writer->file->fnode.size;
+  }
+  uint64_t expected = estimate_log_size();
   float ratio = (float)current / (float)expected;
   dout(10) << __func__ << " current 0x" << std::hex << current
            << " expected " << expected << std::dec
           << " ratio " << ratio
-          << (new_log ? " (async compaction in progress)" : "")
           << dendl;
-  if (new_log ||
-      current < cct->_conf->bluefs_log_compact_min_size ||
+  if (current < cct->_conf->bluefs_log_compact_min_size ||
       ratio < cct->_conf->bluefs_log_compact_min_ratio) {
     return false;
   }
   return true;
 }
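should_start_compact_log() compares the log's current on-disk size against an estimate of what a freshly compacted log would occupy. A compilable sketch of that heuristic; the per-entry byte counts mirror estimate_log_size() above, while the two thresholds stand in for the bluefs_log_compact_min_size and bluefs_log_compact_min_ratio config options (the concrete values here are assumptions, not the shipped defaults):

    #include <cstdint>
    #include <iostream>

    // Rough model of estimate_log_size(): fixed overhead plus ~40 bytes
    // per directory and ~12 bytes per file.
    static uint64_t estimate_log_size(uint64_t dirs, uint64_t files) {
      uint64_t size = 4096 * 2;
      size += dirs * (1 + 40);   // avg_dir_size
      size += files * (1 + 12);  // avg_file_size
      return size;
    }

    static bool should_compact(uint64_t current, uint64_t dirs, uint64_t files) {
      const uint64_t min_size  = 16ull << 20;  // assumed floor, 16 MiB
      const float    min_ratio = 5.5f;         // assumed ratio threshold
      float ratio = float(current) / float(estimate_log_size(dirs, files));
      return current >= min_size && ratio >= min_ratio;
    }

    int main() {
      // a 64 MiB log holding only 10 dirs / 5000 files compacts well: prints 1
      std::cout << should_compact(64ull << 20, 10, 5000) << "\n";
    }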
" (async compaction in progress)" : "") << dendl; - if (new_log || - current < cct->_conf->bluefs_log_compact_min_size || + if (current < cct->_conf->bluefs_log_compact_min_size || ratio < cct->_conf->bluefs_log_compact_min_ratio) { return false; } return true; } -void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t, +void BlueFS::compact_log_dump_metadata(bluefs_transaction_t *t, int flags) { + std::lock_guard dirl(dirs_lock); + t->seq = 1; t->uuid = super.uuid; dout(20) << __func__ << " op_init" << dendl; @@ -2307,7 +2316,7 @@ void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t, if (ino == 1) continue; ceph_assert(ino > 1); - + //AK - TODO - touching fnode - need to lock for(auto& e : file_ref->fnode.extents) { auto bdev = e.bdev; auto bdev_new = bdev; @@ -2344,12 +2353,12 @@ void BlueFS::_compact_log_dump_metadata(bluefs_transaction_t *t, } } -void BlueFS::_compact_log_sync() +void BlueFS::compact_log_sync() { dout(10) << __func__ << dendl; auto prefer_bdev = vselector->select_prefer_bdev(log_writer->file->vselector_hint); - _rewrite_log_and_layout_sync(true, + rewrite_log_and_layout_sync(true, BDEV_DB, prefer_bdev, prefer_bdev, @@ -2358,13 +2367,16 @@ void BlueFS::_compact_log_sync() logger->inc(l_bluefs_log_compactions); } -void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback, - int super_dev, - int log_dev, - int log_dev_new, - int flags, - std::optional layout) +void BlueFS::rewrite_log_and_layout_sync(bool allocate_with_fallback, + int super_dev, + int log_dev, + int log_dev_new, + int flags, + std::optional layout) { + //ceph_assert(ceph_mutex_is_notlocked(log_lock)); + std::lock_guard ll(log_lock); + File *log_file = log_writer->file.get(); // clear out log (be careful who calls us!!!) @@ -2376,7 +2388,7 @@ void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback, << " flags:" << flags << dendl; bluefs_transaction_t t; - _compact_log_dump_metadata(&t, flags); + compact_log_dump_metadata(&t, flags); dout(20) << __func__ << " op_jump_seq " << log_seq << dendl; t.op_jump_seq(log_seq); @@ -2444,6 +2456,7 @@ void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback, flush_bdev(); dout(10) << __func__ << " release old log extents " << old_fnode.extents << dendl; + std::lock_guard dl(dirty_lock); for (auto& r : old_fnode.extents) { pending_release[r.bdev].insert(r.offset, r.length); } @@ -2471,29 +2484,42 @@ void BlueFS::_rewrite_log_and_layout_sync(bool allocate_with_fallback, * * 8. Release the old log space. Clean up. */ -void BlueFS::_compact_log_async(std::unique_lock& l) + +void BlueFS::compact_log_async() { dout(10) << __func__ << dendl; + // only one compaction allowed at one time + bool old_is_comp = std::atomic_exchange(&log_is_compacting, true); + if (old_is_comp) { + dout(10) << __func__ << " ongoing" <file.get(); - ceph_assert(!new_log); - ceph_assert(!new_log_writer); + FileWriter *new_log_writer = nullptr; + FileRef new_log = nullptr; + uint64_t new_log_jump_to = 0; + uint64_t old_log_jump_to = 0; - // create a new log [writer] so that we know compaction is in progress - // (see _should_compact_log) new_log = ceph::make_ref(); - new_log->fnode.ino = 0; // so that _flush_range won't try to log the fnode + new_log->fnode.ino = 0; // we use _flush_special to avoid log of the fnode - // 0. wait for any racing flushes to complete. (We do not want to block - // in _flush_sync_log with jump_to set or else a racing thread might flush - // our entries and our jump_to update won't be correct.) 
@@ -2471,29 +2484,42 @@ void BlueFS::_compact_log_sync()
  *
  * 8. Release the old log space.  Clean up.
  */
-void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
+
+void BlueFS::compact_log_async()
 {
   dout(10) << __func__ << dendl;
+  // only one compaction allowed at one time
+  bool old_is_comp = std::atomic_exchange(&log_is_compacting, true);
+  if (old_is_comp) {
+    dout(10) << __func__ << " ongoing" << dendl;
+    return;
+  }
+
   File *log_file = log_writer->file.get();
-  ceph_assert(!new_log);
-  ceph_assert(!new_log_writer);
+  FileWriter *new_log_writer = nullptr;
+  FileRef new_log = nullptr;
+  uint64_t new_log_jump_to = 0;
+  uint64_t old_log_jump_to = 0;
 
-  // create a new log [writer] so that we know compaction is in progress
-  // (see _should_compact_log)
   new_log = ceph::make_ref<File>();
-  new_log->fnode.ino = 0;  // so that _flush_range won't try to log the fnode
+  new_log->fnode.ino = 0;  // we use _flush_special to avoid logging the fnode
 
-  // 0. wait for any racing flushes to complete.  (We do not want to block
-  // in _flush_sync_log with jump_to set or else a racing thread might flush
-  // our entries and our jump_to update won't be correct.)
-  while (log_flushing) {
-    dout(10) << __func__ << " log is currently flushing, waiting" << dendl;
-    log_cond.wait(l);
-  }
+  // Part 1.
+  // Prepare the current log for jumping into it.
+  // 1. Allocate extent
+  // 2. Update op to log
+  // 3. Jump op to log
+  // During that, no one else can write to the log, otherwise we risk jumping backwards.
+  // We need to sync the log, because we are injecting a discontinuity, and the writer is not prepared for that.
+
+  // signal _maybe_extend_log that expansion of the log is temporarily unacceptable
+  bool old_forbidden = atomic_exchange(&log_forbidden_to_expand, true);
+  ceph_assert(old_forbidden == false);
 
   vselector->sub_usage(log_file->vselector_hint, log_file->fnode);
 
-  // 1. allocate new log space and jump to it.
+  // 1.1 allocate new log space and jump to it.
   old_log_jump_to = log_file->fnode.get_allocated();
   uint64_t runway = log_file->fnode.get_allocated() -
     log_writer->get_effective_write_pos();
   dout(10) << __func__ << " old_log_jump_to 0x" << std::hex << old_log_jump_to
@@ -2508,21 +2534,24 @@ void BlueFS::_compact_log_async(std::unique_lock<ceph::mutex>& l)
 
   // update the log file change and log a jump to the offset where we want to
   // write the new entries
-  log_t.op_file_update(log_file->fnode);
-  log_t.op_jump(log_seq, old_log_jump_to);
+  log_t.op_file_update(log_file->fnode); // 1.2
+  log_t.op_jump(log_seq, old_log_jump_to); // 1.3
 
   // we need to flush all bdev because we will be streaming all dirty files to log
   // TODO - think - if _flush_and_sync_log_jump will not add dirty files nor release pending allocations
   // then flush_bdev() will not be necessary
   flush_bdev();
   _flush_and_sync_log_jump(old_log_jump_to, runway);
 
+  log_lock.unlock();
+  // out of the jump section - from here on the log can be written to again
+
   // 2. prepare compacted log
   bluefs_transaction_t t;
-  //avoid record two times in log_t and _compact_log_dump_metadata.
-  log_t.clear();
-  _compact_log_dump_metadata(&t, 0);
+  //this needs files lock
+  //what will happen if a file is modified *twice* before we stream it to log?
+  //the later state that we capture will be seen earlier and replay will see a temporary retraction (!)
+  compact_log_dump_metadata(&t, 0);
 
   uint64_t max_alloc_size = std::max(alloc_size[BDEV_WAL],
                                      std::max(alloc_size[BDEV_DB],
@@ -2540,7 +2569,7 @@
   ceph_assert(r == 0);
 
   // we might have some more ops in log_t due to _allocate call
-  t.claim_ops(log_t);
+  // t.claim_ops(log_t); no, we no longer track allocations in the log
 
   bufferlist bl;
   encode(t, bl);
@@ -2550,15 +2579,16 @@
            << std::dec << dendl;
 
   new_log_writer = _create_writer(new_log);
-  new_log_writer->append(bl);
 
+  new_log_writer->append(bl);
+  new_log_writer->lock.lock();
   // 3. flush
   r = _flush_special(new_log_writer);
   ceph_assert(r == 0);
 
   // 4. wait
-  _flush_bdev_safely(new_log_writer);
-
+  _flush_bdev(new_log_writer);
+  new_log_writer->lock.unlock();
   // 5. update our log fnode
   // discard first old_log_jump_to extents
@@ -2612,29 +2642,42 @@
   ++super.version;
   _write_super(BDEV_DB);
-
-  lock.unlock();
   flush_bdev();
-  lock.lock();
+
+  old_forbidden = atomic_exchange(&log_forbidden_to_expand, false);
+  ceph_assert(old_forbidden == true);
+  //to wake up anyone who was in need of expanding the log
+  log_cond.notify_all();
 
   // 7. release old space
   dout(10) << __func__ << " release old log extents " << old_extents << dendl;
-  for (auto& r : old_extents) {
-    pending_release[r.bdev].insert(r.offset, r.length);
+  {
+    std::lock_guard dl(dirty_lock);
+    for (auto& r : old_extents) {
+      pending_release[r.bdev].insert(r.offset, r.length);
+    }
   }
 
   // delete the new log, remove from the dirty files list
   _close_writer(new_log_writer);
+  // _flush_special does not dirty files
+  /*
   if (new_log->dirty_seq) {
+    std::lock_guard dl(dirty_lock);
     ceph_assert(dirty_files.count(new_log->dirty_seq));
     auto it = dirty_files[new_log->dirty_seq].iterator_to(*new_log);
     dirty_files[new_log->dirty_seq].erase(it);
   }
+  */
   new_log_writer = nullptr;
   new_log = nullptr;
   log_cond.notify_all();
 
   dout(10) << __func__ << " log extents " << log_file->fnode.extents << dendl;
   logger->inc(l_bluefs_log_compactions);
+
+  old_is_comp = atomic_exchange(&log_is_compacting, false);
+  ceph_assert(old_is_comp);
 }
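The async path replaces the old new_log/new_log_writer sentinels with two atomics: log_is_compacting makes compaction single-flight, and log_forbidden_to_expand makes _maybe_extend_log() back off with -EWOULDBLOCK while the jump records are being written, with log_cond waking the blocked writers once expansion is allowed again. A runnable sketch of just that signaling (function names are illustrative, not the patch's):

    #include <atomic>
    #include <condition_variable>
    #include <mutex>

    std::atomic<bool> log_is_compacting{false};
    std::atomic<bool> log_forbidden_to_expand{false};
    std::mutex log_lock;
    std::condition_variable log_cond;

    bool compact_log_async_guard() {
      if (log_is_compacting.exchange(true))
        return false;                  // another thread is already compacting
      bool old = log_forbidden_to_expand.exchange(true);
      (void)old;                       // the patch asserts old == false here
      return true;
    }

    void compact_log_async_finish() {
      log_forbidden_to_expand.store(false);
      log_cond.notify_all();           // wake writers stuck extending the log
      log_is_compacting.store(false);
    }

    void wait_until_expansion_allowed() {
      std::unique_lock l(log_lock);
      log_cond.wait(l, [] { return !log_forbidden_to_expand.load(); });
    }

    int main() {
      if (compact_log_async_guard())
        compact_log_async_finish();
    }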
 
 void BlueFS::_pad_bl(bufferlist& bl)
@@ -2647,10 +2690,26 @@ void BlueFS::_pad_bl(bufferlist& bl)
   }
 }
 
+/**
+ * Adds file modifications from `dirty_files` to the bluefs transaction
+ * already stored in `log_t`. Writes them to disk and waits until they are stable.
+ * Guarantees that file modifications with `want_seq` are already stable on disk.
+ * In addition, it may insert a jump-forward transaction to log write position `jump_to`.
+ *
+ * It must lock the ability to:
+ * 1) add to log_t
+ * 2) modify dirty_files
+ * 3) add to pending_release
+ *
+ * pending_release and log_t go under the same lock
+ */
+
 // Adds to log_t file modifications mentioned in `dirty_files`.
 // Note: some bluefs ops may have already been stored in log_t transaction.
 uint64_t BlueFS::_consume_dirty()
 {
+  ceph_assert(ceph_mutex_is_locked(dirty_lock));
+  ceph_assert(ceph_mutex_is_locked(log_lock));
   //acquire new seq
   // this will become log_seq_stable once we write
   uint64_t seq = log_t.seq = ++log_seq;
@@ -2673,6 +2732,7 @@ uint64_t BlueFS::_consume_dirty()
 
 // Returns space available *BEFORE* adding new space. Signed for additional <0 detection.
 int64_t BlueFS::_maybe_extend_log()
 {
+  ceph_assert(ceph_mutex_is_locked(log_lock));
   // allocate some more space (before we run out)?
   // BTW: this triggers `flush()` in the `page_aligned_appender` of `log_writer`.
   int64_t runway = log_writer->file->fnode.get_allocated() -
@@ -2695,7 +2755,7 @@ int64_t BlueFS::_maybe_extend_log()
    * - re-run compaction with more runway for old log
    * - add OP_FILE_ADDEXT that adds extent; will be compatible with both logs
    */
-  if (new_log_writer) {
+  if (log_forbidden_to_expand.load() == true) {
     return -EWOULDBLOCK;
   }
   vselector->sub_usage(log_writer->file->vselector_hint, log_writer->file->fnode);
@@ -2712,6 +2772,7 @@
 
 void BlueFS::_flush_and_sync_log_core(int64_t runway)
 {
+  ceph_assert(ceph_mutex_is_locked(log_lock));
   dout(10) << __func__ << " " << log_t << dendl;
   bufferlist bl;
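_consume_dirty() and _clear_dirty_set_stable() implement a small sequence-number protocol: dirty files are queued under the log seq that will carry their fnode update, and once that seq is durable, everything at or below it becomes stable and is dropped. A simplified model, using a map of plain ints instead of the intrusive dirty_files lists:

    #include <cstdint>
    #include <map>
    #include <vector>

    struct Model {
      uint64_t log_seq = 0, log_seq_stable = 0;
      std::map<uint64_t, std::vector<int>> dirty_files;  // seq -> file ids

      uint64_t consume_dirty() {           // called under log_lock + dirty_lock
        uint64_t seq = ++log_seq;
        // ops for the queued files are appended to log_t at this point
        return seq;
      }
      void clear_dirty_set_stable(uint64_t seq) {  // called under dirty_lock
        if (seq > log_seq_stable) {
          log_seq_stable = seq;
          // drop every entry whose seq is <= the newly stable one
          dirty_files.erase(dirty_files.begin(), dirty_files.upper_bound(seq));
        }
      }
    };

    int main() {
      Model m;
      m.dirty_files[1].push_back(42);      // file 42 dirtied for the next seq
      uint64_t s = m.consume_dirty();
      m.clear_dirty_set_stable(s);         // file 42 is now stable on disk
    }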
@@ -2741,6 +2802,8 @@ void BlueFS::_flush_and_sync_log_core(int64_t runway)
 
 // Clears dirty_files up to (including) seq_stable.
 void BlueFS::_clear_dirty_set_stable(uint64_t seq)
 {
+  std::lock_guard lg(dirty_lock);
+
   // clean dirty files
   if (seq > log_seq_stable) {
     log_seq_stable = seq;
@@ -2797,33 +2860,46 @@ void BlueFS::_release_pending_allocations(vector<interval_set<uint64_t>>& to_release)
   }
 }
 
-int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
-                                uint64_t want_seq)
+int BlueFS::flush_and_sync_log(uint64_t want_seq)
 {
-  if (want_seq && want_seq <= log_seq_stable) {
-    dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
-             << log_seq_stable << ", done" << dendl;
-    return 0;
-  }
+  // writing to the log is serialized by log_lock
   int64_t available_runway;
   do {
+    log_lock.lock();
+    dirty_lock.lock();
+    if (want_seq && want_seq <= log_seq_stable) {
+      dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
+               << log_seq_stable << ", done" << dendl;
+      dirty_lock.unlock();
+      log_lock.unlock();
+      return 0;
+    }
+
     available_runway = _maybe_extend_log();
     if (available_runway == -EWOULDBLOCK) {
-      while (new_log_writer) {
-        dout(10) << __func__ << " waiting for async compaction" << dendl;
-        log_cond.wait(l);
+      // we need to add runway, but we are in the middle of a log switch from compaction
+      dirty_lock.unlock();
+      // instead of log_lock.unlock(), move ownership to a unique_lock
+      std::unique_lock ll(log_lock, std::adopt_lock);
+      while (log_forbidden_to_expand.load()) {
+        log_cond.wait(ll);
       }
+    } else {
+      ceph_assert(available_runway >= 0);
     }
-  } while (available_runway == -EWOULDBLOCK);
+  } while (available_runway < 0);
 
   ceph_assert(want_seq == 0 || want_seq <= log_seq + 1); // illegal to request seq that was not created yet
   uint64_t seq = _consume_dirty();
   vector<interval_set<uint64_t>> to_release(pending_release.size());
   to_release.swap(pending_release);
+  dirty_lock.unlock();
   _flush_and_sync_log_core(available_runway);
-  _flush_bdev_safely(log_writer);
+  _flush_bdev(log_writer);
+  //now log_lock is no longer needed
+  log_lock.unlock();
 
   _clear_dirty_set_stable(seq);
   _release_pending_allocations(to_release);
@@ -2836,11 +2912,16 @@ int BlueFS::_flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
 
 int BlueFS::_flush_and_sync_log_jump(uint64_t jump_to,
                                      int64_t available_runway)
 {
+  ceph_assert(ceph_mutex_is_locked(log_lock));
+
+  ceph_assert(jump_to);
+  // writing to the log is serialized by log_lock
+
+  dirty_lock.lock();
   uint64_t seq = _consume_dirty();
   vector<interval_set<uint64_t>> to_release(pending_release.size());
   to_release.swap(pending_release);
-
+  dirty_lock.unlock();
   _flush_and_sync_log_core(available_runway);
 
   dout(10) << __func__ << " jumping log offset from 0x" << std::hex
@@ -2850,7 +2931,7 @@ int BlueFS::_flush_and_sync_log_jump(uint64_t jump_to,
   log_writer->file->fnode.size = jump_to;
   vselector->add_usage(log_writer->file->vselector_hint, log_writer->file->fnode.size);
 
-  _flush_bdev_safely(log_writer);
+  _flush_bdev(log_writer);
 
   _clear_dirty_set_stable(seq);
   _release_pending_allocations(to_release);
@@ -2865,6 +2946,7 @@ ceph::bufferlist BlueFS::FileWriter::flush_buffer(
   const unsigned length,
   const bluefs_super_t& super)
 {
+  ceph_assert(ceph_mutex_is_locked(this->lock) || file->fnode.ino == 1);
   ceph::bufferlist bl;
   if (partial) {
     tail_block.splice(0, tail_block.length(), &bl);
@@ -2903,6 +2985,8 @@ ceph::bufferlist BlueFS::FileWriter::flush_buffer(
 
 int BlueFS::_signal_dirty_to_log(FileWriter *h)
 {
+  ceph_assert(ceph_mutex_is_locked(h->lock));
+  std::lock_guard dl(dirty_lock);
   h->file->fnode.mtime = ceph_clock_now();
   ceph_assert(h->file->fnode.ino >= 1);
   if (h->file->dirty_seq == 0) {
@@ -2928,8 +3012,14 @@ int BlueFS::_signal_dirty_to_log(FileWriter *h)
   return 0;
 }
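The retry loop in flush_and_sync_log() uses a detail worth spelling out: log_lock is acquired manually, and only on the -EWOULDBLOCK path is its ownership handed to a std::unique_lock via std::adopt_lock, so that log_cond can release and reacquire it while waiting, after which the whole attempt restarts. A standalone sketch of that locking shape:

    #include <atomic>
    #include <condition_variable>
    #include <mutex>

    std::mutex log_lock;
    std::condition_variable log_cond;
    std::atomic<bool> log_forbidden_to_expand{false};

    void flush_and_sync_log_locking_shape() {
      for (;;) {
        log_lock.lock();
        // stands in for _maybe_extend_log() returning -EWOULDBLOCK
        bool would_block = log_forbidden_to_expand.load();
        if (!would_block)
          break;                       // proceed, still holding log_lock
        // take over the already-held mutex so the condvar can drop it
        std::unique_lock ll(log_lock, std::adopt_lock);
        log_cond.wait(ll, [] { return !log_forbidden_to_expand.load(); });
        // ll unlocks at end of scope; the loop retries from scratch
      }
      /* ... write and sync the log ... */
      log_lock.unlock();
    }

    int main() { flush_and_sync_log_locking_shape(); }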
+void BlueFS::flush_range(FileWriter *h, uint64_t offset, uint64_t length) {
+  std::unique_lock hl(h->lock);
+  _flush_range(h, offset, length);
+}
+
 int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
 {
+  ceph_assert(ceph_mutex_is_locked(h->lock));
   dout(10) << __func__ << " " << h << " pos 0x" << std::hex << h->pos
            << " 0x" << offset << "~" << length << std::dec
            << " to " << h->file->fnode << dendl;
@@ -2952,13 +3042,13 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
             << std::hex << offset << "~" << length << std::dec
             << dendl;
   }
+  std::lock_guard file_lock(h->file->lock);
   ceph_assert(offset <= h->file->fnode.size);
 
   uint64_t allocated = h->file->fnode.get_allocated();
   vselector->sub_usage(h->file->vselector_hint, h->file->fnode);
   // do not bother to dirty the file if we are overwriting
   // previously allocated extents.
-
   if (allocated < offset + length) {
     // we should never run out of log space here; see the min runway check
     // in _flush_and_sync_log.
@@ -2979,12 +3069,15 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
     h->file->fnode.size = offset + length;
     h->file->is_dirty = true;
   }
+
   dout(20) << __func__ << " file now, unflushed " << h->file->fnode << dendl;
   return _flush_data(h, offset, length, buffered);
 }
 
 int BlueFS::_flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered)
 {
+  //ceph_assert(ceph_mutex_is_locked(h->lock));
+  //ceph_assert(ceph_mutex_is_locked(h->file->lock));
   uint64_t x_off = 0;
   auto p = h->file->fnode.seek(offset, &x_off);
   ceph_assert(p != h->file->fnode.extents.end());
@@ -3091,18 +3184,49 @@ void BlueFS::wait_for_aio(FileWriter *h)
 }
 #endif
 
-int BlueFS::_flush(FileWriter *h, bool force, std::unique_lock<ceph::mutex>& l)
+
+void BlueFS::append_try_flush(FileWriter *h, const char* buf, size_t len)
+{
+  std::unique_lock hl(h->lock);
+  size_t max_size = 1ull << 30; // cap to 1GB
+  while (len > 0) {
+    bool need_flush = true;
+    auto l0 = h->get_buffer_length();
+    if (l0 < max_size) {
+      size_t l = std::min(len, max_size - l0);
+      h->append(buf, l);
+      buf += l;
+      len -= l;
+      need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size;
+    }
+    if (need_flush) {
+      bool flushed = false;
+      int r = _flush(h, true, &flushed);
+      ceph_assert(r == 0);
+      if (r == 0 && flushed) {
+        maybe_compact_log();
+      }
+      // make sure we've made any progress with flush hence the
+      // loop doesn't iterate forever
+      ceph_assert(h->get_buffer_length() < max_size);
+    }
+  }
+}
+
+void BlueFS::flush(FileWriter *h, bool force)
 {
+  std::unique_lock hl(h->lock);
   bool flushed = false;
   int r = _flush(h, force, &flushed);
+  ceph_assert(r == 0);
   if (r == 0 && flushed) {
-    _maybe_compact_log(l);
+    maybe_compact_log();
   }
-  return r;
 }
 
 int BlueFS::_flush(FileWriter *h, bool force, bool *flushed)
 {
+  ceph_assert(ceph_mutex_is_locked(h->lock));
   uint64_t length = h->get_buffer_length();
   uint64_t offset = h->pos;
   if (flushed) {
@@ -3139,6 +3263,8 @@ int BlueFS::_flush(FileWriter *h, bool force, bool *flushed)
   // smart enough to discover it on its own.
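append_try_flush() bounds both memory use and flush latency: a single append is chunked at a 1 GiB cap, and a flush is issued whenever the buffer crosses bluefs_min_flush_size. A self-contained model with a std::string standing in for the writer's buffer; the 512 KiB threshold is an assumed value, not necessarily the shipped default:

    #include <algorithm>
    #include <cstddef>
    #include <string>

    struct Writer {
      std::string buffer;
      size_t flushed_bytes = 0;
      void flush() { flushed_bytes += buffer.size(); buffer.clear(); }
    };

    void append_try_flush(Writer& w, const char* buf, size_t len,
                          size_t max_size = 1ull << 30,        // 1 GiB cap
                          size_t min_flush_size = 512 * 1024)  // assumed
    {
      while (len > 0) {
        bool need_flush = true;
        size_t l0 = w.buffer.size();
        if (l0 < max_size) {
          size_t l = std::min(len, max_size - l0);
          w.buffer.append(buf, l);
          buf += l;
          len -= l;
          need_flush = w.buffer.size() >= min_flush_size;
        }
        if (need_flush)
          w.flush();   // guarantees forward progress for the outer loop
      }
    }

    int main() {
      Writer w;
      std::string data(2 * 1024 * 1024, 'x');   // 2 MiB -> flushed in one pass
      append_try_flush(w, data.data(), data.size());
    }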
 int BlueFS::_flush_special(FileWriter *h)
 {
+  //ceph_assert(ceph_mutex_is_locked(h->lock));
+  //ceph_assert(ceph_mutex_is_locked(h->file->lock));
   uint64_t length = h->get_buffer_length();
   uint64_t offset = h->pos;
   ceph_assert(length + offset <= h->file->fnode.get_allocated());
@@ -3148,8 +3274,9 @@ int BlueFS::_flush_special(FileWriter *h)
   return _flush_data(h, offset, length, false);
 }
 
-int BlueFS::_truncate(FileWriter *h, uint64_t offset)
+int BlueFS::truncate(FileWriter *h, uint64_t offset)
 {
+  std::lock_guard hl(h->lock);
   dout(10) << __func__ << " 0x" << std::hex << offset << std::dec
            << " file " << h->file->fnode << dendl;
   if (h->file->deleted) {
@@ -3179,17 +3306,19 @@
     ceph_abort_msg("truncate up not supported");
   }
   ceph_assert(h->file->fnode.size >= offset);
-  _flush_bdev_safely(h);
+  _flush_bdev(h);
 
   vselector->sub_usage(h->file->vselector_hint, h->file->fnode.size);
   h->file->fnode.size = offset;
   vselector->add_usage(h->file->vselector_hint, h->file->fnode.size);
+  std::lock_guard ll(log_lock);
   log_t.op_file_update_inc(h->file->fnode);
   return 0;
 }
 
-int BlueFS::_fsync(FileWriter *h, std::unique_lock<ceph::mutex>& l)
+int BlueFS::fsync(FileWriter *h)
 {
+  std::unique_lock hl(h->lock);
   dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl;
   int r = _flush(h, true);
   if (r < 0)
     return r;
@@ -3200,39 +3329,40 @@
   }
 
   uint64_t old_dirty_seq = h->file->dirty_seq;
-  _flush_bdev_safely(h);
+  _flush_bdev(h);
 
   if (old_dirty_seq) {
     uint64_t s = log_seq;
     dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq
              << ") on " << h->file->fnode << ", flushing log" << dendl;
-    _flush_and_sync_log(l, old_dirty_seq);
+    flush_and_sync_log(old_dirty_seq);
+    // AK - TODO - think - how can dirty_seq change if we are under h lock?
     ceph_assert(h->file->dirty_seq == 0 ||  // cleaned
-                h->file->dirty_seq > s);    // or redirtied by someone else
+                h->file->dirty_seq > s);    // or redirtied by someone else
   }
+  maybe_compact_log();
   return 0;
 }
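truncate(), preallocate() and fsync() all follow the same ordering: the per-writer or per-file lock is taken first, and log_lock last and innermost (only around the log_t append). Keeping that order consistent across every path is what keeps the new fine-grained scheme deadlock-free. Reduced to its skeleton, with hypothetical stand-in mutexes:

    #include <mutex>

    std::mutex writer_lock;   // stands in for h->lock (or f->lock)
    std::mutex log_lock;      // stands in for BlueFS::log_lock

    void truncate_shape() {
      std::lock_guard hl(writer_lock);   // 1st: per-writer state
      /* ... flush the device, update fnode size ... */
      std::lock_guard ll(log_lock);      // 2nd: append op_file_update_inc
      /* ... journal the change ... */
    }

    int main() { truncate_shape(); }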
-void BlueFS::_flush_bdev_safely(FileWriter *h)
+// be careful - either h->lock or log_lock must be taken
+void BlueFS::_flush_bdev(FileWriter *h)
 {
+  if (h->file->fnode.ino != 1) {
+    ceph_assert(ceph_mutex_is_locked(h->lock));
+  } else {
+    ceph_assert(ceph_mutex_is_locked(log_lock));
+  }
   std::array<bool, MAX_BDEV> flush_devs = h->dirty_devs;
   h->dirty_devs.fill(false);
 #ifdef HAVE_LIBAIO
   if (!cct->_conf->bluefs_sync_write) {
     list<aio_t*> completed_ios;
     _claim_completed_aios(h, &completed_ios);
-    lock.unlock();
     wait_for_aio(h);
     completed_ios.clear();
-    flush_bdev(flush_devs);
-    lock.lock();
-  } else
-#endif
-  {
-    lock.unlock();
-    flush_bdev(flush_devs);
-    lock.lock();
   }
+#endif
+  flush_bdev(flush_devs);
 }
 
 void BlueFS::flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs)
@@ -3366,8 +3496,9 @@ int BlueFS::_allocate(uint8_t id, uint64_t len,
   return 0;
 }
 
-int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)
+int BlueFS::preallocate(FileRef f, uint64_t off, uint64_t len)
 {
+  std::lock_guard fl(f->lock);
   dout(10) << __func__ << " file " << f->fnode
            << " 0x" << std::hex << off << "~" << len << std::dec << dendl;
   if (f->deleted) {
@@ -3386,6 +3517,7 @@ int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)
     vselector->add_usage(f->vselector_hint, f->fnode);
     if (r < 0)
       return r;
+    std::lock_guard ll(log_lock);
     log_t.op_file_update_inc(f->fnode);
   }
   return 0;
 }
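_flush_bdev() asserts a different lock depending on which file it is flushing: the log file (ino 1) must be flushed under log_lock, every other file under its writer's lock. Debug builds of ceph::mutex can answer ceph_mutex_is_locked(); plain std::mutex cannot, so this sketch mimics the idea with an owner field (illustrative only, the owner bookkeeping is not race-free):

    #include <cassert>
    #include <cstdint>
    #include <mutex>
    #include <thread>

    struct DebugMutex {
      std::mutex m;
      std::thread::id owner;
      void lock()   { m.lock(); owner = std::this_thread::get_id(); }
      void unlock() { owner = {}; m.unlock(); }
      bool is_locked_by_me() const { return owner == std::this_thread::get_id(); }
    };

    DebugMutex log_lock, writer_lock;

    void flush_bdev_shape(uint64_t ino) {
      if (ino != 1)
        assert(writer_lock.is_locked_by_me());  // ordinary file
      else
        assert(log_lock.is_locked_by_me());     // the bluefs log itself
      /* ... collect dirty_devs, wait for aio, flush devices ... */
    }

    int main() {
      log_lock.lock();
      flush_bdev_shape(1);
      log_lock.unlock();
    }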
@@ -3393,8 +3525,13 @@ int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)
 
 void BlueFS::sync_metadata(bool avoid_compact)
 {
-  std::unique_lock l(lock);
-  if (log_t.empty() && dirty_files.empty()) {
+  bool can_skip_flush;
+  {
+    std::lock_guard ll(log_lock);
+    std::lock_guard dl(dirty_lock);
+    can_skip_flush = log_t.empty() && dirty_files.empty();
+  }
+  if (can_skip_flush) {
     dout(10) << __func__ << " - no pending log events" << dendl;
   } else {
     utime_t start;
     dout(10) << __func__ << dendl;
     start = ceph_clock_now();
     *_dout << dendl;
     flush_bdev(); // FIXME?
-    _flush_and_sync_log(l);
+    flush_and_sync_log();
     dout(10) << __func__ << " done in " << (ceph_clock_now() - start) << dendl;
   }
 
   if (!avoid_compact) {
-    _maybe_compact_log(l);
+    maybe_compact_log();
   }
 }
 
-void BlueFS::_maybe_compact_log(std::unique_lock<ceph::mutex>& l)
+void BlueFS::maybe_compact_log()
 {
   if (!cct->_conf->bluefs_replay_recovery_disable_compact &&
-      _should_compact_log()) {
+      should_start_compact_log()) {
     if (cct->_conf->bluefs_compact_log_sync) {
-      _compact_log_sync();
+      compact_log_sync();
    } else {
-      _compact_log_async(l);
+      compact_log_async();
    }
  }
 }
@@ -3429,7 +3566,7 @@ int BlueFS::open_for_write(
   FileWriter **h,
   bool overwrite)
 {
-  std::lock_guard l(lock);
+  std::lock_guard dirl(dirs_lock);
   dout(10) << __func__ << " " << dirname << "/" << filename << dendl;
   map<string,DirRef>::iterator p = dir_map.find(dirname);
   DirRef dir;
@@ -3491,11 +3628,12 @@ int BlueFS::open_for_write(
   dout(20) << __func__ << " mapping " << dirname << "/" << filename
            << " vsel_hint " << file->vselector_hint
            << dendl;
-
-  log_t.op_file_update(file->fnode);
-  if (create)
-    log_t.op_dir_link(dirname, filename, file->fnode.ino);
-
+  {
+    std::lock_guard ll(log_lock);
+    log_t.op_file_update(file->fnode);
+    if (create)
+      log_t.op_dir_link(dirname, filename, file->fnode.ino);
+  }
   *h = _create_writer(file);
 
   if (boost::algorithm::ends_with(filename, ".log")) {
@@ -3525,7 +3663,7 @@ BlueFS::FileWriter *BlueFS::_create_writer(FileRef f)
   return w;
 }
 
-void BlueFS::_close_writer(FileWriter *h)
+void BlueFS::_drain_writer(FileWriter *h)
 {
   dout(10) << __func__ << " " << h << " type " << h->writer_type << dendl;
   //h->buffer.reassign_to_mempool(mempool::mempool_bluefs_file_writer);
@@ -3541,18 +3679,31 @@
   if (h->file->fnode.size >= (1ull << 30)) {
     dout(10) << __func__ << " file is unexpectedly large:" << h->file->fnode << dendl;
   }
+}
+
+void BlueFS::_close_writer(FileWriter *h)
+{
+  _drain_writer(h);
+  delete h;
+}
+void BlueFS::close_writer(FileWriter *h)
+{
+  {
+    std::lock_guard l(h->lock);
+    _drain_writer(h);
+  }
   delete h;
 }
 
 uint64_t BlueFS::debug_get_dirty_seq(FileWriter *h)
 {
-  std::lock_guard l(lock);
+  std::lock_guard l(h->lock);
   return h->file->dirty_seq;
 }
 
 bool BlueFS::debug_get_is_dev_dirty(FileWriter *h, uint8_t dev)
 {
-  std::lock_guard l(lock);
+  std::lock_guard l(h->lock);
   return h->dirty_devs[dev];
 }
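sync_metadata() now holds both locks only long enough to compute can_skip_flush, then releases them before the potentially slow flush and compaction work. The same pattern in standalone form; std::scoped_lock acquires the pair deadlock-free, which is equivalent to the fixed acquisition order the patch uses:

    #include <mutex>
    #include <vector>

    std::mutex log_lock, dirty_lock;
    std::vector<int> log_t, dirty_files;   // hypothetical stand-ins for state

    void sync_metadata_shape() {
      bool can_skip_flush;
      {
        std::scoped_lock l(log_lock, dirty_lock);  // consistent snapshot
        can_skip_flush = log_t.empty() && dirty_files.empty();
      }
      if (!can_skip_flush) {
        /* flush_bdev(); flush_and_sync_log(); -- done without the locks */
      }
    }

    int main() { sync_metadata_shape(); }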
" (random)":" (sequential)") << dendl; map::iterator p = dir_map.find(dirname); @@ -3591,7 +3742,8 @@ int BlueFS::rename( std::string_view old_dirname, std::string_view old_filename, std::string_view new_dirname, std::string_view new_filename) { - std::lock_guard l(lock); + std::lock_guard dirl(dirs_lock); + std::lock_guard ll(log_lock); dout(10) << __func__ << " " << old_dirname << "/" << old_filename << " -> " << new_dirname << "/" << new_filename << dendl; map::iterator p = dir_map.find(old_dirname); @@ -3638,7 +3790,8 @@ int BlueFS::rename( int BlueFS::mkdir(std::string_view dirname) { - std::lock_guard l(lock); + std::lock_guard dirl(dirs_lock); + std::lock_guard ll(log_lock); dout(10) << __func__ << " " << dirname << dendl; map::iterator p = dir_map.find(dirname); if (p != dir_map.end()) { @@ -3652,7 +3805,8 @@ int BlueFS::mkdir(std::string_view dirname) int BlueFS::rmdir(std::string_view dirname) { - std::lock_guard l(lock); + std::lock_guard dirl(dirs_lock); + std::lock_guard ll(log_lock); dout(10) << __func__ << " " << dirname << dendl; auto p = dir_map.find(dirname); if (p == dir_map.end()) { @@ -3671,7 +3825,7 @@ int BlueFS::rmdir(std::string_view dirname) bool BlueFS::dir_exists(std::string_view dirname) { - std::lock_guard l(lock); + std::lock_guard dirl(dirs_lock); map::iterator p = dir_map.find(dirname); bool exists = p != dir_map.end(); dout(10) << __func__ << " " << dirname << " = " << (int)exists << dendl; @@ -3681,7 +3835,7 @@ bool BlueFS::dir_exists(std::string_view dirname) int BlueFS::stat(std::string_view dirname, std::string_view filename, uint64_t *size, utime_t *mtime) { - std::lock_guard l(lock); + std::lock_guard dirl(dirs_lock); dout(10) << __func__ << " " << dirname << "/" << filename << dendl; map::iterator p = dir_map.find(dirname); if (p == dir_map.end()) { @@ -3709,7 +3863,7 @@ int BlueFS::stat(std::string_view dirname, std::string_view filename, int BlueFS::lock_file(std::string_view dirname, std::string_view filename, FileLock **plock) { - std::lock_guard l(lock); + std::lock_guard dirl(dirs_lock); dout(10) << __func__ << " " << dirname << "/" << filename << dendl; map::iterator p = dir_map.find(dirname); if (p == dir_map.end()) { @@ -3729,6 +3883,7 @@ int BlueFS::lock_file(std::string_view dirname, std::string_view filename, file_map[ino_last] = file; dir->file_map[string{filename}] = file; ++file->refs; + std::lock_guard ll(log_lock); log_t.op_file_update(file->fnode); log_t.op_dir_link(dirname, filename, file->fnode.ino); } else { @@ -3747,7 +3902,7 @@ int BlueFS::lock_file(std::string_view dirname, std::string_view filename, int BlueFS::unlock_file(FileLock *fl) { - std::lock_guard l(lock); + std::lock_guard dirl(dirs_lock); dout(10) << __func__ << " " << fl << " on " << fl->file->fnode << dendl; ceph_assert(fl->file->locked); fl->file->locked = false; @@ -3761,7 +3916,7 @@ int BlueFS::readdir(std::string_view dirname, vector *ls) if (!dirname.empty() && dirname.back() == '/') { dirname.remove_suffix(1); } - std::lock_guard l(lock); + std::lock_guard dirl(dirs_lock); dout(10) << __func__ << " " << dirname << dendl; if (dirname.empty()) { // list dirs @@ -3789,7 +3944,8 @@ int BlueFS::readdir(std::string_view dirname, vector *ls) int BlueFS::unlink(std::string_view dirname, std::string_view filename) { - std::lock_guard l(lock); + std::lock_guard dirl(dirs_lock); + std::lock_guard ll(log_lock); dout(10) << __func__ << " " << dirname << "/" << filename << dendl; map::iterator p = dir_map.find(dirname); if (p == dir_map.end()) { diff --git 
diff --git a/src/os/bluestore/BlueFS.h b/src/os/bluestore/BlueFS.h
index 9b3e34e4190b2..151b632ccf8f9 100644
--- a/src/os/bluestore/BlueFS.h
+++ b/src/os/bluestore/BlueFS.h
@@ -132,6 +132,11 @@ class BlueFS {
     std::atomic_int num_reading;
 
     void* vselector_hint = nullptr;
+    /* lock protects fnode and the other parts that can be modified during
+       read & write operations. Does not protect values that are fixed.
+       Does not need to be taken when doing one-time operations:
+       _replay, device_migrate_to_existing, device_migrate_to_new */
+    ceph::mutex lock = ceph::make_mutex("BlueFS::File::lock");
 
   private:
     FRIEND_MAKE_REF(File);
@@ -315,8 +320,9 @@ class BlueFS {
   };
 
 private:
-  ceph::mutex lock = ceph::make_mutex("BlueFS::lock");
-
+  ceph::mutex log_lock = ceph::make_mutex("BlueFS::log_lock");
+  ceph::mutex dirs_lock = ceph::make_mutex("BlueFS::dirs_lock");
+  ceph::mutex dirty_lock = ceph::make_mutex("BlueFS::dirty_lock");
   PerfCounters *logger = nullptr;
 
   uint64_t max_bytes[MAX_BDEV] = {0};
@@ -340,13 +346,12 @@ class BlueFS {
   FileWriter *log_writer = 0;        ///< writer for the log
   bluefs_transaction_t log_t;        ///< pending, unwritten log transaction
   bool log_flushing = false;         ///< true while flushing the log
-  ceph::condition_variable log_cond;
-
-  uint64_t new_log_jump_to = 0;
-  uint64_t old_log_jump_to = 0;
-  FileRef new_log = nullptr;
-  FileWriter *new_log_writer = nullptr;
+  ceph::condition_variable log_cond; ///< used for state control between log flush / log compaction
+  ceph::mutex cond_lock = ceph::make_mutex("BlueFS::cond_lock"); ///< ....
+  std::atomic<bool> log_is_compacting{false};       ///< signals that bluefs log compaction is already ongoing
+  std::atomic<bool> log_forbidden_to_expand{false}; ///< used to signal that async compaction is in a state
+                                                    ///  that prohibits expansion of the bluefs log
 
   /*
    * There are up to 3 block devices:
   *
@@ -360,6 +365,11 @@ class BlueFS {
   std::vector<Allocator*> alloc;          ///< allocators for bdevs
   std::vector<uint64_t> alloc_size;       ///< alloc size for each device
   std::vector<interval_set<uint64_t>> pending_release; ///< extents to release
+  // TODO: it should be examined what makes pending_release immune to
+  // eras in a way similar to dirty_files. Hints:
+  // 1) we have actually only 2 eras: log_seq and log_seq+1
+  // 2) we usually do not remove extents from files. And when we do, we force log-syncing.
+  //std::vector<interval_set<uint64_t>> block_unused_too_granular;
 
   BlockDevice::aio_callback_t discard_cb[3]; //discard callbacks for each dev
 
@@ -406,10 +416,9 @@ class BlueFS {
   int _signal_dirty_to_log(FileWriter *h);
   int _flush_range(FileWriter *h, uint64_t offset, uint64_t length);
   int _flush_data(FileWriter *h, uint64_t offset, uint64_t length, bool buffered);
-  int _flush(FileWriter *h, bool force, std::unique_lock<ceph::mutex>& l);
   int _flush(FileWriter *h, bool force, bool *flushed = nullptr);
   int _flush_special(FileWriter *h);
-  int _fsync(FileWriter *h, std::unique_lock<ceph::mutex>& l);
+  int _fsync(FileWriter *h);
 
 #ifdef HAVE_LIBAIO
   void _claim_completed_aios(FileWriter *h, std::list<aio_t*> *ls);
@@ -425,11 +434,10 @@ class BlueFS {
   void _flush_and_sync_log_core(int64_t available_runway);
   int _flush_and_sync_log_jump(uint64_t jump_to,
                                int64_t available_runway);
-  int _flush_and_sync_log(std::unique_lock<ceph::mutex>& l,
-                          uint64_t want_seq = 0);
+  int flush_and_sync_log(uint64_t want_seq = 0);
 
-  uint64_t _estimate_log_size();
-  bool _should_compact_log();
+  uint64_t estimate_log_size();
+  bool should_start_compact_log();
 
   enum {
     REMOVE_DB = 1,
    REMOVE_WAL = 2,
    RENAME_SLOW2DB = 4,
    RENAME_DB2SLOW = 8,
  };
-  void _compact_log_dump_metadata(bluefs_transaction_t *t,
-                                  int flags);
-  void _compact_log_sync();
-  void _compact_log_async(std::unique_lock<ceph::mutex>& l);
+  void compact_log_dump_metadata(bluefs_transaction_t *t,
+                                 int flags);
+  void compact_log_sync();
+  void compact_log_async();
 
-  void _rewrite_log_and_layout_sync(bool allocate_with_fallback,
+  void rewrite_log_and_layout_sync(bool allocate_with_fallback,
                                    int super_dev,
                                    int log_dev,
                                    int new_log_dev,
@@ -451,7 +459,7 @@ class BlueFS {
 
   //void _aio_finish(void *priv);
 
-  void _flush_bdev_safely(FileWriter *h);
+  void _flush_bdev(FileWriter *h);
   void flush_bdev();  // this is safe to call without a lock
   void flush_bdev(std::array<bool, MAX_BDEV>& dirty_bdevs);  // this is safe to call without a lock
@@ -470,8 +478,6 @@ class BlueFS {
     uint64_t len,      ///< [in] this many bytes
     char *out);        ///< [out] optional: or copy it here
 
-  void _invalidate_cache(FileRef f, uint64_t offset, uint64_t length);
-
   int _open_super();
   int _write_super(int dev);
   int _check_allocations(const bluefs_fnode_t& fnode,
@@ -484,6 +490,7 @@ class BlueFS {
   int _replay(bool noop, bool to_stdout = false); ///< replay journal
 
   FileWriter *_create_writer(FileRef f);
+  void _drain_writer(FileWriter *h);
   void _close_writer(FileWriter *h);
 
   // always put the super in the second 4k block.  FIXME should this be
@@ -549,10 +556,8 @@ class BlueFS {
     FileReader **h,
     bool random = false);
 
-  void close_writer(FileWriter *h) {
-    std::lock_guard l(lock);
-    _close_writer(h);
-  }
+  // data added after the last fsync() is lost
+  void close_writer(FileWriter *h);
 
   int rename(std::string_view old_dir, std::string_view old_file,
              std::string_view new_dir, std::string_view new_file);
@@ -576,7 +581,7 @@ class BlueFS {
   /// sync any uncommitted state to disk
   void sync_metadata(bool avoid_compact);
   /// test and compact log, if necessary
-  void _maybe_compact_log(std::unique_lock<ceph::mutex>& l);
+  void maybe_compact_log();
 
   void set_volume_selector(BlueFSVolumeSelector* s) {
     vselector.reset(s);
@@ -598,42 +603,11 @@ class BlueFS {
   // handler for discard event
   void handle_discard(unsigned dev, interval_set<uint64_t>& to_release);
 
-  void flush(FileWriter *h, bool force = false) {
-    std::unique_lock l(lock);
-    int r = _flush(h, force, l);
-    ceph_assert(r == 0);
-  }
+  void flush(FileWriter *h, bool force = false);
 
-  void append_try_flush(FileWriter *h, const char* buf, size_t len) {
-    size_t max_size = 1ull << 30; // cap to 1GB
-    while (len > 0) {
-      bool need_flush = true;
-      auto l0 = h->get_buffer_length();
-      if (l0 < max_size) {
-        size_t l = std::min(len, max_size - l0);
-        h->append(buf, l);
-        buf += l;
-        len -= l;
-        need_flush = h->get_buffer_length() >= cct->_conf->bluefs_min_flush_size;
-      }
-      if (need_flush) {
-        flush(h, true);
-        // make sure we've made any progress with flush hence the
-        // loop doesn't iterate forever
-        ceph_assert(h->get_buffer_length() < max_size);
-      }
-    }
-  }
-  void flush_range(FileWriter *h, uint64_t offset, uint64_t length) {
-    std::lock_guard l(lock);
-    _flush_range(h, offset, length);
-  }
-  int fsync(FileWriter *h) {
-    std::unique_lock l(lock);
-    int r = _fsync(h, l);
-    _maybe_compact_log(l);
-    return r;
-  }
+  void append_try_flush(FileWriter *h, const char* buf, size_t len);
+  void flush_range(FileWriter *h, uint64_t offset, uint64_t length);
+  int fsync(FileWriter *h);
 
   int64_t read(FileReader *h, uint64_t offset, size_t len,
               ceph::buffer::list *outbl, char *out) {
     // no need to hold the global lock here; we only touch h and
     // h->file, and read vs write or delete is already protected (via
     // atomics and asserts).
     return _read(h, offset, len, outbl, out);
   }
   int64_t read_random(FileReader *h, uint64_t offset, size_t len,
                      char *out) {
     // no need to hold the global lock here; we only touch h and
     // h->file, and read vs write or delete is already protected (via
     // atomics and asserts).
     return _read_random(h, offset, len, out);
   }
-  void invalidate_cache(FileRef f, uint64_t offset, uint64_t len) {
-    std::lock_guard l(lock);
-    _invalidate_cache(f, offset, len);
-  }
-  int preallocate(FileRef f, uint64_t offset, uint64_t len) {
-    std::lock_guard l(lock);
-    return _preallocate(f, offset, len);
-  }
-  int truncate(FileWriter *h, uint64_t offset) {
-    std::lock_guard l(lock);
-    return _truncate(h, offset);
-  }
+  void invalidate_cache(FileRef f, uint64_t offset, uint64_t len);
+  int preallocate(FileRef f, uint64_t offset, uint64_t len);
+  int truncate(FileWriter *h, uint64_t offset);
 
   int do_replay_recovery_read(FileReader *log,
                               size_t log_pos,
                              size_t read_offset,
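The header diff shows the convention the patch settles on: the public API (flush, fsync, truncate, preallocate, invalidate_cache, close_writer) moves out-of-line into BlueFS.cc, where each wrapper takes the relevant lock and delegates to an underscore-prefixed internal that asserts it is held. The pattern in miniature, with hypothetical Writer/FS types:

    #include <mutex>

    struct Writer { std::mutex lock; };

    struct FS {
      void flush(Writer* h) {            // public: acquires the lock
        std::unique_lock hl(h->lock);
        _flush(h);
      }
    private:
      void _flush(Writer* h) {           // internal: requires the lock
        // the real code asserts ceph_mutex_is_locked(h->lock) here
        /* ... move buffered data toward the device ... */
      }
    };

    int main() {
      FS fs;
      Writer w;
      fs.flush(&w);
    }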