Merge pull request #41490 from idryomov/wip-rbd-pwl-ssd-tailp
librbd/cache/pwl/ssd: fix first_free_entry and m_first_free_entry corruption

Reviewed-by: Mahati Chamarthy <mahati.chamarthy@intel.com>
Reviewed-by: Mykola Golub <mgolub@suse.com>
idryomov committed Jul 14, 2021
2 parents 9ac25e2 + f8fb760 commit 89caa62
Showing 2 changed files with 74 additions and 53 deletions.
122 changes: 72 additions & 50 deletions src/librbd/cache/pwl/ssd/WriteLog.cc
@@ -35,6 +35,16 @@ using namespace librbd::cache::pwl;
// SSD: this number can be updated later
const unsigned long int ops_appended_together = MAX_WRITES_PER_SYNC_POINT;

static bool is_valid_pool_root(const WriteLogPoolRoot& root) {
  return root.pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
         root.first_valid_entry >= DATA_RING_BUFFER_OFFSET &&
         root.first_valid_entry < root.pool_size &&
         root.first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
         root.first_free_entry >= DATA_RING_BUFFER_OFFSET &&
         root.first_free_entry < root.pool_size &&
         root.first_free_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0;
}
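
For reference, the sketch below is not part of the change: it exercises the same invariant in isolation. PoolRoot, kAllocUnit and kDataOffset are illustrative stand-ins for the Ceph struct and constants, chosen only to show which ring-pointer values the new assertions accept and reject.

```cpp
// Standalone illustration of the is_valid_pool_root() invariant.
// kAllocUnit and kDataOffset stand in for MIN_WRITE_ALLOC_SSD_SIZE and
// DATA_RING_BUFFER_OFFSET; the real values differ.
#include <cassert>
#include <cstdint>

constexpr uint64_t kAllocUnit  = 4096;
constexpr uint64_t kDataOffset = 8192;

struct PoolRoot {
  uint64_t pool_size;
  uint64_t first_valid_entry;
  uint64_t first_free_entry;
};

static bool is_valid(const PoolRoot &r) {
  auto in_ring = [&](uint64_t p) {
    return p >= kDataOffset && p < r.pool_size && p % kAllocUnit == 0;
  };
  return r.pool_size % kAllocUnit == 0 &&
         in_ring(r.first_valid_entry) && in_ring(r.first_free_entry);
}

int main() {
  const uint64_t pool = 1 << 30;
  assert(is_valid({pool, kDataOffset, pool - kAllocUnit}));  // in range, aligned
  assert(!is_valid({pool, kDataOffset, pool}));        // pointer ran off the end
  assert(!is_valid({pool, kDataOffset, kDataOffset + 1}));   // not aligned
  return 0;
}
```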

template <typename I>
Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
m_builderobj = new Builder<This>();
@@ -208,9 +218,15 @@ void WriteLog<I>::load_existing_entries(pwl::DeferredContexts &later) {

auto p = bl.cbegin();
decode(superblock, p);
ldout(cct,5) << "Decoded superblock" << dendl;

pool_root = superblock.root;
ldout(cct, 1) << "Decoded root: pool_size=" << pool_root.pool_size
<< " first_valid_entry=" << pool_root.first_valid_entry
<< " first_free_entry=" << pool_root.first_free_entry
<< " flushed_sync_gen=" << pool_root.flushed_sync_gen
<< dendl;
ceph_assert(is_valid_pool_root(pool_root));

this->m_log_pool_size = pool_root.pool_size;
this->m_flushed_sync_gen = pool_root.flushed_sync_gen;
this->m_first_valid_entry = pool_root.first_valid_entry;
@@ -441,14 +457,7 @@ void WriteLog<I>::append_op_log_entries(GenericLogOperations &ops) {
}
});
// Append logs and update first_free_update
uint64_t bytes_allocated_updated;
append_ops(ops, append_ctx, new_first_free_entry, bytes_allocated_updated);

{
std::lock_guard locker1(m_lock);
m_first_free_entry = *new_first_free_entry;
m_bytes_allocated -= bytes_allocated_updated;
}
append_ops(ops, append_ctx, new_first_free_entry);

if (ops.size()) {
this->dispatch_deferred_writes();
@@ -667,8 +676,8 @@ bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
pool_root.first_valid_entry = first_valid_entry;

Context *ctx = new LambdaContext(
[this, flushed_sync_gen, first_valid_entry,
initial_first_valid_entry, retiring_entries](int r) {
[this, first_valid_entry, initial_first_valid_entry,
retiring_entries](int r) {
uint64_t allocated_bytes = 0;
uint64_t cached_bytes = 0;
uint64_t former_log_pos = 0;
@@ -713,8 +722,8 @@ bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
this->process_writeback_dirty_entries();
});

std::lock_guard locker(m_lock);
schedule_update_root(new_root, ctx);
std::lock_guard locker(m_lock);
schedule_update_root(new_root, ctx);
} else {
ldout(cct, 20) << "Nothing to retire" << dendl;
return false;
@@ -724,14 +733,14 @@

template <typename I>
void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
uint64_t* new_first_free_entry,
uint64_t &bytes_allocated) {
uint64_t* new_first_free_entry) {
GenericLogEntriesVector log_entries;
CephContext *cct = m_image_ctx.cct;
uint64_t span_payload_len = 0;
bytes_allocated = 0;
uint64_t bytes_to_free = 0;
ldout(cct, 20) << "Appending " << ops.size() << " log entries." << dendl;

*new_first_free_entry = pool_root.first_free_entry;
AioTransContext* aio = new AioTransContext(cct, ctx);

utime_t now = ceph_clock_now();
@@ -742,9 +751,9 @@ void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
if (log_entries.size() == CONTROL_BLOCK_MAX_LOG_ENTRIES ||
span_payload_len >= SPAN_MAX_DATA_LEN) {
if (log_entries.size() > 1) {
bytes_allocated += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
}
write_log_entries(log_entries, aio);
write_log_entries(log_entries, aio, new_first_free_entry);
log_entries.clear();
span_payload_len = 0;
}
@@ -753,30 +762,40 @@ void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
}
if (!span_payload_len || !log_entries.empty()) {
if (log_entries.size() > 1) {
bytes_allocated += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
}
write_log_entries(log_entries, aio);
write_log_entries(log_entries, aio, new_first_free_entry);
}

{
std::lock_guard locker1(m_lock);
m_first_free_entry = *new_first_free_entry;
m_bytes_allocated -= bytes_to_free;
}

bdev->aio_submit(&aio->ioc);
*new_first_free_entry = pool_root.first_free_entry;
}
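
The reworked append_ops() computes the next free slot and the bytes to release into locals while the batch is built, and only publishes m_first_free_entry and m_bytes_allocated under m_lock, just before the aio is submitted. The sketch below mirrors that shape only; FakeLog and append_batch are invented names, the byte accounting is simplified, and the append path is assumed to be the sole writer of the cursor, so it should not be read as the actual implementation.

```cpp
// Shape of the new append path: advance a local cursor, then publish once.
#include <cstdint>
#include <mutex>
#include <vector>

class FakeLog {
  std::mutex m_lock;
  uint64_t m_first_free_entry = 8192;   // stand-in for DATA_RING_BUFFER_OFFSET
  uint64_t m_bytes_allocated = 1 << 20;

  // Plays the role of write_log_entries(): it moves the caller's cursor and
  // never touches the shared members directly.
  static void append_batch(const std::vector<uint64_t> &sizes, uint64_t *pos,
                           uint64_t *bytes_to_free) {
    for (uint64_t sz : sizes) {
      *pos += sz;
      *bytes_to_free += sz;  // real accounting is per control block; simplified
    }
  }

 public:
  void append(const std::vector<uint64_t> &sizes) {
    uint64_t new_first_free = m_first_free_entry;  // snapshot of the cursor
    uint64_t bytes_to_free = 0;
    append_batch(sizes, &new_first_free, &bytes_to_free);
    {
      std::lock_guard<std::mutex> locker(m_lock);  // publish atomically
      m_first_free_entry = new_first_free;
      m_bytes_allocated -= bytes_to_free;
    }
    // bdev->aio_submit(...) would follow here in the real code.
  }
};

int main() {
  FakeLog log;
  log.append({4096, 8192});
  return 0;
}
```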

template <typename I>
void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
AioTransContext *aio) {
AioTransContext *aio, uint64_t *pos) {
CephContext *cct = m_image_ctx.cct;
ldout(m_image_ctx.cct, 20) << dendl;
bufferlist data_bl;
ldout(m_image_ctx.cct, 20) << "pos=" << *pos << dendl;
ceph_assert(*pos >= DATA_RING_BUFFER_OFFSET &&
*pos < this->m_log_pool_size &&
*pos % MIN_WRITE_ALLOC_SSD_SIZE == 0);

// The first block is for log entries
uint64_t data_pos = pool_root.first_free_entry + MIN_WRITE_ALLOC_SSD_SIZE;
ldout(m_image_ctx.cct, 20) << "data_pos: " << data_pos << dendl;
if (data_pos == pool_root.pool_size) {
data_pos = data_pos % pool_root.pool_size + DATA_RING_BUFFER_OFFSET;
uint64_t control_block_pos = *pos;
*pos += MIN_WRITE_ALLOC_SSD_SIZE;
if (*pos == this->m_log_pool_size) {
*pos = DATA_RING_BUFFER_OFFSET;
}

std::vector<WriteLogCacheEntry> persist_log_entries;
bufferlist data_bl;
for (auto &log_entry : log_entries) {
log_entry->log_entry_index = pool_root.first_free_entry;
log_entry->log_entry_index = control_block_pos;
// Append data buffer for write operations
if (log_entry->is_write_entry()) {
auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
@@ -785,10 +804,10 @@ void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
data_bl.append(cache_bl);
data_bl.append_zero(align_size - cache_bl.length());

write_entry->ram_entry.write_data_pos = data_pos;
data_pos += align_size;
if (data_pos >= pool_root.pool_size) {
data_pos = data_pos % pool_root.pool_size + DATA_RING_BUFFER_OFFSET;
write_entry->ram_entry.write_data_pos = *pos;
*pos += align_size;
if (*pos >= this->m_log_pool_size) {
*pos = *pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
}
}
// push_back _after_ setting write_data_pos
@@ -802,38 +821,41 @@ void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
bl.append(data_bl);
ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
if (pool_root.first_free_entry + bl.length() > pool_root.pool_size) {
if (control_block_pos + bl.length() > this->m_log_pool_size) {
//exceeds border, need to split
uint64_t size = bl.length();
auto end = pool_root.pool_size - pool_root.first_free_entry;
bufferlist bl1;
bl.splice(0, end, &bl1);
bl.splice(0, this->m_log_pool_size - control_block_pos, &bl1);
ceph_assert(bl.length() == (size - bl1.length()));
ldout(cct, 20) << "The write on " << pool_root.first_free_entry
<< " with length " << size << " is split into two: "
<< "pos=" << pool_root.first_free_entry << ", "
<< "length=" << bl1.length() << "; "
<< "pos=" << DATA_RING_BUFFER_OFFSET << ", "
<< "length=" << bl.length() << dendl;

bdev->aio_write(pool_root.first_free_entry, bl1, &aio->ioc, false,

ldout(cct, 20) << "write " << control_block_pos << "~"
<< size << " spans boundary, split into "
<< control_block_pos << "~" << bl1.length()
<< " and " << DATA_RING_BUFFER_OFFSET << "~"
<< bl.length() << dendl;
bdev->aio_write(control_block_pos, bl1, &aio->ioc, false,
WRITE_LIFE_NOT_SET);
bdev->aio_write(DATA_RING_BUFFER_OFFSET, bl, &aio->ioc, false,
WRITE_LIFE_NOT_SET);
} else {
ldout(cct, 20) << "first_free_entry: " << pool_root.first_free_entry
<< " bl length: " << bl.length() << dendl;
bdev->aio_write(pool_root.first_free_entry, bl, &aio->ioc, false,
ldout(cct, 20) << "write " << control_block_pos << "~"
<< bl.length() << dendl;
bdev->aio_write(control_block_pos, bl, &aio->ioc, false,
WRITE_LIFE_NOT_SET);
ldout(cct, 20) << "finished aio_write log entries" << dendl;
}
// New first free entry
pool_root.first_free_entry = data_pos;
}
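
With the cursor handed in as *pos, the position arithmetic above folds back into the data ring whenever it reaches the end of the pool, which is why a well-formed first_free_entry never equals pool_size. A small worked example of that wrap rule, using made-up constants (4 KiB allocation unit, 8 KiB ring offset, 1 MiB pool) rather than the real ones:

```cpp
// Worked example of the ring-buffer advance; constants are illustrative only.
#include <cassert>
#include <cstdint>

constexpr uint64_t kAllocUnit  = 4096;     // stand-in for MIN_WRITE_ALLOC_SSD_SIZE
constexpr uint64_t kDataOffset = 8192;     // stand-in for DATA_RING_BUFFER_OFFSET
constexpr uint64_t kPoolSize   = 1 << 20;  // stand-in for m_log_pool_size

// Same rule as the data-position update: on reaching or passing the end of
// the pool, fold back into the ring after the reserved header area.
uint64_t advance(uint64_t pos, uint64_t len) {
  pos += len;
  if (pos >= kPoolSize) {
    pos = pos % kPoolSize + kDataOffset;
  }
  return pos;
}

int main() {
  // 12 KiB appended 8 KiB before the end of the pool wraps to offset + 4 KiB.
  assert(advance(kPoolSize - 2 * kAllocUnit, 3 * kAllocUnit) ==
         kDataOffset + kAllocUnit);
  // Landing exactly on the end maps to the start of the data ring; a stored
  // first_free_entry equal to pool_size is rejected by is_valid_pool_root().
  assert(advance(kPoolSize - kAllocUnit, kAllocUnit) == kDataOffset);
  return 0;
}
```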

template <typename I>
void WriteLog<I>::schedule_update_root(
std::shared_ptr<WriteLogPoolRoot> root, Context *ctx) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 15) << "New root: pool_size=" << root->pool_size
<< " first_valid_entry=" << root->first_valid_entry
<< " first_free_entry=" << root->first_free_entry
<< " flushed_sync_gen=" << root->flushed_sync_gen
<< dendl;
ceph_assert(is_valid_pool_root(*root));

bool need_finisher;
{
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
5 changes: 2 additions & 3 deletions src/librbd/cache/pwl/ssd/WriteLog.h
@@ -122,10 +122,9 @@ class WriteLog : public AbstractWriteLog<ImageCtxT> {
Context* construct_flush_entry_ctx(
std::shared_ptr<GenericLogEntry> log_entry);
void append_ops(GenericLogOperations &ops, Context *ctx,
uint64_t* new_first_free_entry,
uint64_t &bytes_allocated);
uint64_t* new_first_free_entry);
void write_log_entries(GenericLogEntriesVector log_entries,
AioTransContext *aio);
AioTransContext *aio, uint64_t *pos);
void schedule_update_root(std::shared_ptr<WriteLogPoolRoot> root,
Context *ctx);
void enlist_op_update_root();
