Skip to content

Commit

Permalink
Merge pull request #9898 from liewegas/wip-bluefs-async-flush
Browse files Browse the repository at this point in the history
os/bluestore/BlueFS: do not hold internal lock while waiting for IO
  • Loading branch information
liewegas committed Jun 30, 2016
2 parents dd51efb + 6a696a6 commit 58cc8ab
Show file tree
Hide file tree
Showing 5 changed files with 354 additions and 55 deletions.
1 change: 1 addition & 0 deletions src/common/buffer.cc
Expand Up @@ -2273,6 +2273,7 @@ int buffer::list::write_fd(int fd, uint64_t offset) const

void buffer::list::prepare_iov(std::vector<iovec> *piov) const
{
assert(_buffers.size() <= IOV_MAX);
piov->resize(_buffers.size());
unsigned n = 0;
for (std::list<buffer::ptr>::const_iterator p = _buffers.begin();
Expand Down
146 changes: 105 additions & 41 deletions src/os/bluestore/BlueFS.cc
Expand Up @@ -17,6 +17,7 @@ BlueFS::BlueFS()
: logger(NULL),
ino_last(0),
log_seq(0),
log_seq_stable(0),
log_writer(NULL),
bdev(MAX_BDEV),
ioc(MAX_BDEV),
Expand Down Expand Up @@ -133,7 +134,7 @@ uint64_t BlueFS::get_block_device_size(unsigned id)

void BlueFS::add_block_extent(unsigned id, uint64_t offset, uint64_t length)
{
std::lock_guard<std::mutex> l(lock);
std::unique_lock<std::mutex> l(lock);
dout(1) << __func__ << " bdev " << id
<< " 0x" << std::hex << offset << "~" << length << std::dec
<< dendl;
Expand All @@ -145,7 +146,7 @@ void BlueFS::add_block_extent(unsigned id, uint64_t offset, uint64_t length)

if (alloc.size()) {
log_t.op_alloc_add(id, offset, length);
int r = _flush_log();
int r = _flush_and_sync_log(l);
assert(r == 0);
alloc[id]->init_add_free(offset, length);
}
Expand All @@ -158,7 +159,7 @@ void BlueFS::add_block_extent(unsigned id, uint64_t offset, uint64_t length)
int BlueFS::reclaim_blocks(unsigned id, uint64_t want,
uint64_t *offset, uint32_t *length)
{
std::lock_guard<std::mutex> l(lock);
std::unique_lock<std::mutex> l(lock);
dout(1) << __func__ << " bdev " << id
<< " want 0x" << std::hex << want << std:: dec << dendl;
assert(id < alloc.size());
Expand All @@ -175,7 +176,7 @@ int BlueFS::reclaim_blocks(unsigned id, uint64_t want,
block_all[id].erase(*offset, *length);
block_total[id] -= *length;
log_t.op_alloc_rm(id, *offset, *length);
r = _flush_log();
r = _flush_and_sync_log(l);
assert(r == 0);

if (logger)
Expand Down Expand Up @@ -234,6 +235,7 @@ int BlueFS::get_block_extents(unsigned id, interval_set<uint64_t> *extents)

int BlueFS::mkfs(uuid_d osd_uuid)
{
std::unique_lock<std::mutex> l(lock);
dout(1) << __func__
<< " osd_uuid " << osd_uuid
<< dendl;
Expand Down Expand Up @@ -271,12 +273,12 @@ int BlueFS::mkfs(uuid_d osd_uuid)
log_t.op_alloc_add(bdev, q.get_start(), q.get_len());
}
}
_flush_log();
_flush_and_sync_log(l);

// write supers
super.log_fnode = log_file->fnode;
_write_super();
_flush_bdev();
flush_bdev();

// clean up
super = bluefs_super_t();
Expand Down Expand Up @@ -401,13 +403,15 @@ int BlueFS::_write_super()
::encode(super, bl);
uint32_t crc = bl.crc32c(-1);
::encode(crc, bl);
dout(10) << __func__ << " super block length(encoded): " << bl.length() << dendl;
dout(10) << __func__ << " superblock " << super.version << dendl;
dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl;
assert(bl.length() <= get_super_length());
bl.append_zero(get_super_length() - bl.length());

IOContext ioc(NULL);
bdev[BDEV_DB]->aio_write(get_super_offset(), bl, &ioc, false);
bdev[BDEV_DB]->aio_submit(&ioc);
ioc.aio_wait();
bdev[BDEV_DB]->aio_write(get_super_offset(), bl, ioc[BDEV_DB], false);
bdev[BDEV_DB]->aio_submit(ioc[BDEV_DB]);
ioc[BDEV_DB]->aio_wait();
dout(20) << __func__ << " v " << super.version
<< " crc 0x" << std::hex << crc
<< " offset 0x" << get_super_offset() << std::dec
Expand Down Expand Up @@ -456,6 +460,7 @@ int BlueFS::_replay()

FileRef log_file = _get_file(1);
log_file->fnode = super.log_fnode;
dout(10) << __func__ << " log_fnode " << super.log_fnode << dendl;

FileReader *log_reader = new FileReader(
log_file, g_conf->bluefs_alloc_size,
Expand Down Expand Up @@ -737,8 +742,9 @@ void BlueFS::_drop_link(FileRef file)
}
file_map.erase(file->fnode.ino);
file->deleted = true;
if (file->dirty) {
file->dirty = false;
if (file->dirty_seq) {
assert(file->dirty_seq > log_seq_stable);
file->dirty_seq = 0;
dirty_files.erase(dirty_files.iterator_to(*file));
}
}
Expand Down Expand Up @@ -995,13 +1001,13 @@ void BlueFS::_compact_log()
log_writer->append(bl);
int r = _flush(log_writer, true);
assert(r == 0);
_flush_wait(log_writer);
wait_for_aio(log_writer);

dout(10) << __func__ << " writing super" << dendl;
super.log_fnode = log_file->fnode;
++super.version;
_write_super();
_flush_bdev();
flush_bdev();

dout(10) << __func__ << " release old log extents " << old_extents << dendl;
for (auto& r : old_extents) {
Expand All @@ -1021,9 +1027,26 @@ void BlueFS::_pad_bl(bufferlist& bl)
}
}

int BlueFS::_flush_log()
int BlueFS::_flush_and_sync_log(std::unique_lock<std::mutex>& l,
uint64_t want_seq)
{
log_t.seq = ++log_seq;
while (log_flushing) {
dout(10) << __func__ << " want_seq " << want_seq
<< " log is currently flushing, waiting" << dendl;
log_cond.wait(l);
}
if (want_seq && want_seq <= log_seq_stable) {
dout(10) << __func__ << " want_seq " << want_seq << " <= log_seq_stable "
<< log_seq_stable << ", done" << dendl;
return 0;
}
if (log_t.empty()) {
dout(10) << __func__ << " want_seq " << want_seq
<< " " << log_t << " not dirty, no-op" << dendl;
return 0;
}
uint64_t seq = log_t.seq = ++log_seq;
assert(want_seq == 0 || want_seq <= seq);
log_t.uuid = super.uuid;
dout(10) << __func__ << " " << log_t << dendl;
assert(!log_t.empty());
Expand Down Expand Up @@ -1051,21 +1074,41 @@ int BlueFS::_flush_log()

log_t.clear();
log_t.seq = 0; // just so debug output is less confusing
log_flushing = true;

_flush_bdev();
flush_bdev();
int r = _flush(log_writer, true);
assert(r == 0);
_flush_wait(log_writer);
_flush_bdev();

// drop lock while we wait for io
l.unlock();
wait_for_aio(log_writer);
flush_bdev();
l.lock();

log_flushing = false;
log_cond.notify_all();

// clean dirty files
dirty_file_list_t::iterator p = dirty_files.begin();
while (p != dirty_files.end()) {
File *file = &(*p);
assert(file->dirty);
dout(20) << __func__ << " cleaned file " << file->fnode << dendl;
file->dirty = false;
dirty_files.erase(p++);
if (seq > log_seq_stable) {
log_seq_stable = seq;
dout(20) << __func__ << " log_seq_stable " << log_seq_stable << dendl;
dirty_file_list_t::iterator p = dirty_files.begin();
while (p != dirty_files.end()) {
File *file = &(*p);
assert(file->dirty_seq > 0);
if (file->dirty_seq <= log_seq_stable) {
dout(20) << __func__ << " cleaned file " << file->fnode << dendl;
file->dirty_seq = 0;
dirty_files.erase(p++);
} else {
++p;
}
}
} else {
dout(20) << __func__ << " log_seq_stable " << log_seq_stable
<< " already > out seq " << seq
<< ", we lost a race against another log flush, done" << dendl;
}

_update_logger_stats();
Expand Down Expand Up @@ -1107,15 +1150,25 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
}
if (h->file->fnode.size < offset + length) {
h->file->fnode.size = offset + length;
must_dirty = true;
if (h->file->fnode.ino != 1) {
// we do not need to dirty the log file when the file size
// changes because replay is smart enough to discover it on its
// own.
must_dirty = true;
}
}
if (must_dirty) {
h->file->fnode.mtime = ceph_clock_now(NULL);
log_t.op_file_update(h->file->fnode);
if (!h->file->dirty) {
h->file->dirty = true;
if (h->file->dirty_seq == 0) {
dirty_files.push_back(*h->file);
dout(20) << __func__ << " dirty_seq = " << log_seq + 1
<< " (was clean)" << dendl;
} else {
dout(20) << __func__ << " dirty_seq = " << log_seq + 1
<< " (was " << h->file->dirty_seq << ")" << dendl;
}
h->file->dirty_seq = log_seq + 1;
}
dout(20) << __func__ << " file now " << h->file->fnode << dendl;

Expand Down Expand Up @@ -1202,8 +1255,10 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
return 0;
}

void BlueFS::_flush_wait(FileWriter *h)
void BlueFS::wait_for_aio(FileWriter *h)
{
// NOTE: this is safe to call without a lock, as long as our reference is
// stable.
dout(10) << __func__ << " " << h << dendl;
utime_t start = ceph_clock_now(NULL);
for (auto p : h->iocv) {
Expand Down Expand Up @@ -1271,21 +1326,30 @@ int BlueFS::_truncate(FileWriter *h, uint64_t offset)
return 0;
}

void BlueFS::_fsync(FileWriter *h)
int BlueFS::_fsync(FileWriter *h, std::unique_lock<std::mutex>& l)
{
dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl;
_flush(h, true);
_flush_wait(h);
if (h->file->dirty) {
dout(20) << __func__ << " file metadata is dirty, flushing log on "
<< h->file->fnode << dendl;
_flush_log();
assert(!h->file->dirty);
int r = _flush(h, true);
if (r < 0)
return r;
uint64_t old_dirty_seq = h->file->dirty_seq;
lock.unlock();
wait_for_aio(h);
lock.lock();
if (old_dirty_seq) {
uint64_t s = log_seq;
dout(20) << __func__ << " file metadata was dirty (" << old_dirty_seq
<< ") on " << h->file->fnode << ", flushing log" << dendl;
_flush_and_sync_log(l, old_dirty_seq);
assert(h->file->dirty_seq == 0 || // cleaned
h->file->dirty_seq > s); // or redirtied by someone else
}
return 0;
}

void BlueFS::_flush_bdev()
void BlueFS::flush_bdev()
{
// NOTE: this is safe to call without a lock.
dout(20) << __func__ << dendl;
for (auto p : bdev) {
if (p)
Expand Down Expand Up @@ -1363,7 +1427,7 @@ int BlueFS::_preallocate(FileRef f, uint64_t off, uint64_t len)

void BlueFS::sync_metadata()
{
std::lock_guard<std::mutex> l(lock);
std::unique_lock<std::mutex> l(lock);
if (log_t.empty()) {
dout(10) << __func__ << " - no pending log events" << dendl;
return;
Expand All @@ -1375,7 +1439,7 @@ void BlueFS::sync_metadata()
p->commit_start();
}
}
_flush_log();
_flush_and_sync_log(l);
for (auto p : alloc) {
if (p) {
p->commit_finish();
Expand Down
22 changes: 13 additions & 9 deletions src/os/bluestore/BlueFS.h
Expand Up @@ -54,7 +54,7 @@ class BlueFS {
struct File : public RefCountedObject {
bluefs_fnode_t fnode;
int refs;
bool dirty;
uint64_t dirty_seq;
bool locked;
bool deleted;
boost::intrusive::list_member_hook<> dirty_item;
Expand All @@ -65,7 +65,7 @@ class BlueFS {
File()
: RefCountedObject(NULL, 0),
refs(0),
dirty(false),
dirty_seq(0),
locked(false),
deleted(false),
num_readers(0),
Expand Down Expand Up @@ -201,8 +201,11 @@ class BlueFS {
bluefs_super_t super; ///< latest superblock (as last written)
uint64_t ino_last; ///< last assigned ino (this one is in use)
uint64_t log_seq; ///< last used log seq (by current pending log_t)
uint64_t log_seq_stable; ///< last stable/synced log seq
FileWriter *log_writer; ///< writer for the log
bluefs_transaction_t log_t; ///< pending, unwritten log transaction
bool log_flushing = false; ///< true while flushing the log
std::condition_variable log_cond;

/*
* There are up to 3 block devices:
Expand Down Expand Up @@ -232,17 +235,18 @@ class BlueFS {
int _allocate(unsigned bdev, uint64_t len, vector<bluefs_extent_t> *ev);
int _flush_range(FileWriter *h, uint64_t offset, uint64_t length);
int _flush(FileWriter *h, bool force);
void _flush_wait(FileWriter *h);
void _fsync(FileWriter *h);
void wait_for_aio(FileWriter *h); // safe to call without a lock
int _fsync(FileWriter *h, std::unique_lock<std::mutex>& l);

int _flush_log();
int _flush_and_sync_log(std::unique_lock<std::mutex>& l,
uint64_t want_seq = 0);
uint64_t _estimate_log_size();
void _maybe_compact_log();
void _compact_log();

//void _aio_finish(void *priv);

void _flush_bdev();
void flush_bdev(); // this is safe to call without a lock

int _preallocate(FileRef f, uint64_t off, uint64_t len);
int _truncate(FileWriter *h, uint64_t off);
Expand Down Expand Up @@ -355,9 +359,9 @@ class BlueFS {
std::lock_guard<std::mutex> l(lock);
_flush_range(h, offset, length);
}
void fsync(FileWriter *h) {
std::lock_guard<std::mutex> l(lock);
_fsync(h);
int fsync(FileWriter *h) {
std::unique_lock<std::mutex> l(lock);
return _fsync(h, l);
}
int read(FileReader *h, FileReaderBuffer *buf, uint64_t offset, size_t len,
bufferlist *outbl, char *out) {
Expand Down
4 changes: 2 additions & 2 deletions src/os/bluestore/KernelDevice.cc
Expand Up @@ -385,10 +385,10 @@ int KernelDevice::aio_write(
assert(off < size);
assert(off + len <= size);

if (!buffered && bl.rebuild_aligned_size_and_memory(block_size, block_size)) {
if ((!buffered || bl.get_num_buffers() >= IOV_MAX) &&
bl.rebuild_aligned_size_and_memory(block_size, block_size)) {
dout(20) << __func__ << " rebuilding buffer to be aligned" << dendl;
}

dout(40) << "data: ";
bl.hexdump(*_dout);
*_dout << dendl;
Expand Down

0 comments on commit 58cc8ab

Please sign in to comment.