librbd/cache/pwl/ssd: fix first_free_entry and m_first_free_entry corruption #41490

Merged
merged 4 commits on Jul 14, 2021
Changes from all commits
122 changes: 72 additions & 50 deletions src/librbd/cache/pwl/ssd/WriteLog.cc
@@ -35,6 +35,16 @@ using namespace librbd::cache::pwl;
// SSD: this number can be updated later
const unsigned long int ops_appended_together = MAX_WRITES_PER_SYNC_POINT;

static bool is_valid_pool_root(const WriteLogPoolRoot& root) {
return root.pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
root.first_valid_entry >= DATA_RING_BUFFER_OFFSET &&
root.first_valid_entry < root.pool_size &&
root.first_valid_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
root.first_free_entry >= DATA_RING_BUFFER_OFFSET &&
root.first_free_entry < root.pool_size &&
root.first_free_entry % MIN_WRITE_ALLOC_SSD_SIZE == 0;
}
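
The new helper treats the log pool as a ring buffer: pool_size must be a multiple of MIN_WRITE_ALLOC_SSD_SIZE, and both first_valid_entry and first_free_entry must be aligned offsets inside [DATA_RING_BUFFER_OFFSET, pool_size). A minimal standalone sketch of the same predicate, with made-up constants standing in for the real ones from librbd/cache/pwl/Types.h:

#include <cstdint>

// Illustrative stand-ins; the real constants are defined in Types.h.
constexpr uint64_t MIN_WRITE_ALLOC_SSD_SIZE = 4096;
constexpr uint64_t DATA_RING_BUFFER_OFFSET  = 8192;

struct Root {
  uint64_t pool_size, first_valid_entry, first_free_entry;
};

bool valid(const Root &r) {
  auto in_ring = [&](uint64_t off) {
    return off >= DATA_RING_BUFFER_OFFSET && off < r.pool_size &&
           off % MIN_WRITE_ALLOC_SSD_SIZE == 0;
  };
  return r.pool_size % MIN_WRITE_ALLOC_SSD_SIZE == 0 &&
         in_ring(r.first_valid_entry) && in_ring(r.first_free_entry);
}

// A root whose first_free_entry ran past the end of the pool -- the kind of
// corruption this assertion is meant to catch -- is rejected:
//   valid({1 << 20, 8192, 1 << 20}) == false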

template <typename I>
Builder<AbstractWriteLog<I>>* WriteLog<I>::create_builder() {
m_builderobj = new Builder<This>();
@@ -208,9 +218,15 @@ void WriteLog<I>::load_existing_entries(pwl::DeferredContexts &later) {

auto p = bl.cbegin();
decode(superblock, p);
ldout(cct,5) << "Decoded superblock" << dendl;

pool_root = superblock.root;
ldout(cct, 1) << "Decoded root: pool_size=" << pool_root.pool_size
<< " first_valid_entry=" << pool_root.first_valid_entry
<< " first_free_entry=" << pool_root.first_free_entry
<< " flushed_sync_gen=" << pool_root.flushed_sync_gen
<< dendl;
ceph_assert(is_valid_pool_root(pool_root));

this->m_log_pool_size = pool_root.pool_size;
this->m_flushed_sync_gen = pool_root.flushed_sync_gen;
this->m_first_valid_entry = pool_root.first_valid_entry;
@@ -434,14 +450,7 @@ void WriteLog<I>::append_op_log_entries(GenericLogOperations &ops) {
}
});
// Append logs and update first_free_entry
uint64_t bytes_allocated_updated;
append_ops(ops, append_ctx, new_first_free_entry, bytes_allocated_updated);

{
std::lock_guard locker1(m_lock);
m_first_free_entry = *new_first_free_entry;
m_bytes_allocated -= bytes_allocated_updated;
}
append_ops(ops, append_ctx, new_first_free_entry);

if (ops.size()) {
this->dispatch_deferred_writes();
@@ -660,8 +669,8 @@ bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
pool_root.first_valid_entry = first_valid_entry;

Context *ctx = new LambdaContext(
[this, flushed_sync_gen, first_valid_entry,
initial_first_valid_entry, retiring_entries](int r) {
[this, first_valid_entry, initial_first_valid_entry,
retiring_entries](int r) {
uint64_t allocated_bytes = 0;
uint64_t cached_bytes = 0;
uint64_t former_log_pos = 0;
@@ -706,8 +715,8 @@ bool WriteLog<I>::retire_entries(const unsigned long int frees_per_tx) {
this->process_writeback_dirty_entries();
});

std::lock_guard locker(m_lock);
schedule_update_root(new_root, ctx);
std::lock_guard locker(m_lock);
schedule_update_root(new_root, ctx);
} else {
ldout(cct, 20) << "Nothing to retire" << dendl;
return false;
@@ -717,14 +726,14 @@

template <typename I>
void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
uint64_t* new_first_free_entry,
uint64_t &bytes_allocated) {
uint64_t* new_first_free_entry) {
GenericLogEntriesVector log_entries;
CephContext *cct = m_image_ctx.cct;
uint64_t span_payload_len = 0;
bytes_allocated = 0;
uint64_t bytes_to_free = 0;
ldout(cct, 20) << "Appending " << ops.size() << " log entries." << dendl;

*new_first_free_entry = pool_root.first_free_entry;
AioTransContext* aio = new AioTransContext(cct, ctx);

utime_t now = ceph_clock_now();
@@ -735,9 +744,9 @@ void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
if (log_entries.size() == CONTROL_BLOCK_MAX_LOG_ENTRIES ||
span_payload_len >= SPAN_MAX_DATA_LEN) {
if (log_entries.size() > 1) {
bytes_allocated += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
}
write_log_entries(log_entries, aio);
write_log_entries(log_entries, aio, new_first_free_entry);
log_entries.clear();
span_payload_len = 0;
}
@@ -746,30 +755,40 @@ void WriteLog<I>::append_ops(GenericLogOperations &ops, Context *ctx,
}
if (!span_payload_len || !log_entries.empty()) {
if (log_entries.size() > 1) {
bytes_allocated += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
bytes_to_free += (log_entries.size() - 1) * MIN_WRITE_ALLOC_SSD_SIZE;
}
write_log_entries(log_entries, aio);
write_log_entries(log_entries, aio, new_first_free_entry);
}

{
std::lock_guard locker1(m_lock);
m_first_free_entry = *new_first_free_entry;
m_bytes_allocated -= bytes_to_free;
}

bdev->aio_submit(&aio->ioc);
*new_first_free_entry = pool_root.first_free_entry;
}
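
After this change all of the cursor bookkeeping lives in append_ops(): write_log_entries() advances *new_first_free_entry batch by batch, the result is copied into m_first_free_entry and m_bytes_allocated is reduced under m_lock, and only then is bdev->aio_submit() called. In the removed flow the cursor was updated in two places (m_first_free_entry in append_op_log_entries(), pool_root.first_free_entry inside write_log_entries()), the two fields named in the PR title. A toy sketch of the publish-under-lock-then-submit ordering, with illustrative names and numbers rather than the real class members:

#include <cstdint>
#include <mutex>
#include <vector>

struct ToyLog {
  std::mutex lock;
  uint64_t first_free_entry = 8192;     // stands in for m_first_free_entry
  uint64_t bytes_allocated  = 1 << 20;  // stands in for m_bytes_allocated

  void append(const std::vector<uint64_t> &batch_blocks) {
    uint64_t cursor = first_free_entry; // local cursor, like *new_first_free_entry
    uint64_t bytes_to_free = 0;
    for (uint64_t blocks : batch_blocks) {
      cursor += blocks * 4096;          // each batch advances the cursor
      if (blocks > 1) {
        bytes_to_free += (blocks - 1) * 4096;
      }
    }
    {
      std::lock_guard<std::mutex> locker(lock);
      first_free_entry = cursor;        // publish the new cursor ...
      bytes_allocated -= bytes_to_free; // ... and the accounting ...
    }
    submit();                           // ... before the I/O is submitted
  }

  void submit() {}                      // stands in for bdev->aio_submit(&aio->ioc)
};

int main() {
  ToyLog log;
  log.append({3, 1});
  return 0;
}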

template <typename I>
void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
AioTransContext *aio) {
AioTransContext *aio, uint64_t *pos) {
CephContext *cct = m_image_ctx.cct;
ldout(m_image_ctx.cct, 20) << dendl;
bufferlist data_bl;
ldout(m_image_ctx.cct, 20) << "pos=" << *pos << dendl;
ceph_assert(*pos >= DATA_RING_BUFFER_OFFSET &&
*pos < this->m_log_pool_size &&
*pos % MIN_WRITE_ALLOC_SSD_SIZE == 0);

// The first block is for log entries
uint64_t data_pos = pool_root.first_free_entry + MIN_WRITE_ALLOC_SSD_SIZE;
ldout(m_image_ctx.cct, 20) << "data_pos: " << data_pos << dendl;
if (data_pos == pool_root.pool_size) {
data_pos = data_pos % pool_root.pool_size + DATA_RING_BUFFER_OFFSET;
uint64_t control_block_pos = *pos;
*pos += MIN_WRITE_ALLOC_SSD_SIZE;
if (*pos == this->m_log_pool_size) {
*pos = DATA_RING_BUFFER_OFFSET;
}

std::vector<WriteLogCacheEntry> persist_log_entries;
bufferlist data_bl;
for (auto &log_entry : log_entries) {
log_entry->log_entry_index = pool_root.first_free_entry;
log_entry->log_entry_index = control_block_pos;
// Append data buffer for write operations
if (log_entry->is_write_entry()) {
auto write_entry = static_pointer_cast<WriteLogEntry>(log_entry);
@@ -778,10 +797,10 @@ void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
data_bl.append(cache_bl);
data_bl.append_zero(align_size - cache_bl.length());

write_entry->ram_entry.write_data_pos = data_pos;
data_pos += align_size;
if (data_pos >= pool_root.pool_size) {
data_pos = data_pos % pool_root.pool_size + DATA_RING_BUFFER_OFFSET;
write_entry->ram_entry.write_data_pos = *pos;
*pos += align_size;
if (*pos >= this->m_log_pool_size) {
*pos = *pos % this->m_log_pool_size + DATA_RING_BUFFER_OFFSET;
}
}
// push_back _after_ setting write_data_pos
@@ -795,38 +814,41 @@ void WriteLog<I>::write_log_entries(GenericLogEntriesVector log_entries,
bl.append_zero(MIN_WRITE_ALLOC_SSD_SIZE - bl.length());
bl.append(data_bl);
ceph_assert(bl.length() % MIN_WRITE_ALLOC_SSD_SIZE == 0);
if (pool_root.first_free_entry + bl.length() > pool_root.pool_size) {
if (control_block_pos + bl.length() > this->m_log_pool_size) {
//exceeds border, need to split
uint64_t size = bl.length();
auto end = pool_root.pool_size - pool_root.first_free_entry;
bufferlist bl1;
bl.splice(0, end, &bl1);
bl.splice(0, this->m_log_pool_size - control_block_pos, &bl1);
ceph_assert(bl.length() == (size - bl1.length()));
ldout(cct, 20) << "The write on " << pool_root.first_free_entry
<< " with length " << size << " is split into two: "
<< "pos=" << pool_root.first_free_entry << ", "
<< "length=" << bl1.length() << "; "
<< "pos=" << DATA_RING_BUFFER_OFFSET << ", "
<< "length=" << bl.length() << dendl;

bdev->aio_write(pool_root.first_free_entry, bl1, &aio->ioc, false,

ldout(cct, 20) << "write " << control_block_pos << "~"
<< size << " spans boundary, split into "
<< control_block_pos << "~" << bl1.length()
<< " and " << DATA_RING_BUFFER_OFFSET << "~"
<< bl.length() << dendl;
bdev->aio_write(control_block_pos, bl1, &aio->ioc, false,
WRITE_LIFE_NOT_SET);
bdev->aio_write(DATA_RING_BUFFER_OFFSET, bl, &aio->ioc, false,
WRITE_LIFE_NOT_SET);
} else {
ldout(cct, 20) << "first_free_entry: " << pool_root.first_free_entry
<< " bl length: " << bl.length() << dendl;
bdev->aio_write(pool_root.first_free_entry, bl, &aio->ioc, false,
ldout(cct, 20) << "write " << control_block_pos << "~"
<< bl.length() << dendl;
bdev->aio_write(control_block_pos, bl, &aio->ioc, false,
WRITE_LIFE_NOT_SET);
ldout(cct, 20) << "finished aio_write log entries" << dendl;
}
// New first free entry
pool_root.first_free_entry = data_pos;
}
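
write_log_entries() now threads the ring position through the caller-supplied *pos instead of reading and rewriting pool_root.first_free_entry: the control block takes one MIN_WRITE_ALLOC_SSD_SIZE block at *pos, each write payload follows at aligned offsets, and the cursor wraps back to DATA_RING_BUFFER_OFFSET once it reaches the end of the pool (a write that would run past the end is split into two aio_writes, as above). A self-contained sketch of the wrap arithmetic, again with made-up constants:

#include <cstdint>
#include <cstdio>

// Illustrative constants; the real values come from librbd/cache/pwl/Types.h
// and the decoded pool root.
constexpr uint64_t BLOCK = 4096;        // stands in for MIN_WRITE_ALLOC_SSD_SIZE
constexpr uint64_t RING0 = 8192;        // stands in for DATA_RING_BUFFER_OFFSET
constexpr uint64_t POOL  = 1 << 20;     // stands in for m_log_pool_size

// Advance the ring cursor by an aligned length, wrapping past the end of the
// pool back to the start of the data ring (mirrors the *pos updates above).
uint64_t advance(uint64_t pos, uint64_t len) {
  pos += len;
  if (pos >= POOL) {
    pos = pos % POOL + RING0;
  }
  return pos;
}

int main() {
  uint64_t control_block_pos = POOL - BLOCK;              // last block of the pool
  uint64_t data_pos = advance(control_block_pos, BLOCK);  // wraps to RING0
  std::printf("control=%llu data=%llu\n",
              (unsigned long long)control_block_pos,
              (unsigned long long)data_pos);
  return 0;
}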

template <typename I>
void WriteLog<I>::schedule_update_root(
std::shared_ptr<WriteLogPoolRoot> root, Context *ctx) {
CephContext *cct = m_image_ctx.cct;
ldout(cct, 15) << "New root: pool_size=" << root->pool_size
<< " first_valid_entry=" << root->first_valid_entry
<< " first_free_entry=" << root->first_free_entry
<< " flushed_sync_gen=" << root->flushed_sync_gen
<< dendl;
ceph_assert(is_valid_pool_root(*root));

bool need_finisher;
{
ceph_assert(ceph_mutex_is_locked_by_me(m_lock));
5 changes: 2 additions & 3 deletions src/librbd/cache/pwl/ssd/WriteLog.h
@@ -122,10 +122,9 @@ class WriteLog : public AbstractWriteLog<ImageCtxT> {
Context* construct_flush_entry_ctx(
std::shared_ptr<GenericLogEntry> log_entry);
void append_ops(GenericLogOperations &ops, Context *ctx,
uint64_t* new_first_free_entry,
uint64_t &bytes_allocated);
uint64_t* new_first_free_entry);
void write_log_entries(GenericLogEntriesVector log_entries,
AioTransContext *aio);
AioTransContext *aio, uint64_t *pos);
void schedule_update_root(std::shared_ptr<WriteLogPoolRoot> root,
Context *ctx);
void enlist_op_update_root();