Skip to content

Commit

Permalink
Merge pull request #7130 from liewegas/wip-bluestore
Browse files Browse the repository at this point in the history
osd: bluestore: more fixes
  • Loading branch information
liewegas committed Jan 11, 2016
2 parents 1899e7e + a87470e commit 9e9603e
Show file tree
Hide file tree
Showing 16 changed files with 303 additions and 74 deletions.
6 changes: 4 additions & 2 deletions src/common/config_opts.h
Expand Up @@ -139,7 +139,7 @@ SUBSYS(compressor, 1, 5)
SUBSYS(newstore, 1, 5)
SUBSYS(bluestore, 1, 5)
SUBSYS(bluefs, 1, 5)
SUBSYS(bdev, 1, 5)
SUBSYS(bdev, 1, 3)
SUBSYS(kstore, 1, 5)
SUBSYS(rocksdb, 4, 5)
SUBSYS(leveldb, 4, 5)
Expand Down Expand Up @@ -837,7 +837,7 @@ OPTION(bdev_debug_inflight_ios, OPT_BOOL, false)
OPTION(bdev_inject_crash, OPT_INT, 0) // if N>0, then ~ 1/N IOs will complete before we crash on flush.
OPTION(bdev_aio, OPT_BOOL, true)
OPTION(bdev_aio_poll_ms, OPT_INT, 250) // milliseconds
OPTION(bdev_aio_max_queue_depth, OPT_INT, 4096)
OPTION(bdev_aio_max_queue_depth, OPT_INT, 32)

OPTION(bluefs_alloc_size, OPT_U64, 1048576)
OPTION(bluefs_max_prefetch, OPT_U64, 1048576)
Expand Down Expand Up @@ -891,6 +891,8 @@ OPTION(bluestore_debug_misc, OPT_BOOL, false)
OPTION(bluestore_debug_no_reuse_blocks, OPT_BOOL, false)
OPTION(bluestore_debug_small_allocations, OPT_INT, 0)
OPTION(bluestore_debug_freelist, OPT_BOOL, false)
OPTION(bluestore_debug_prefill, OPT_FLOAT, 0)
OPTION(bluestore_debug_prefragment_max, OPT_INT, 1048576)

OPTION(kstore_max_ops, OPT_U64, 512)
OPTION(kstore_max_bytes, OPT_U64, 64*1024*1024)
Expand Down
66 changes: 64 additions & 2 deletions src/os/bluestore/BlockDevice.cc
Expand Up @@ -45,6 +45,8 @@ BlockDevice::BlockDevice(aio_callback_t cb, void *cbpriv)
size(0), block_size(0),
fs(NULL), aio(false), dio(false),
debug_lock("BlockDevice::debug_lock"),
ioc_reap_lock("BlockDevice::ioc_reap_lock"),
flush_lock("BlockDevice::flush_lock"),
aio_queue(g_conf->bdev_aio_max_queue_depth),
aio_callback(cb),
aio_callback_priv(cbpriv),
Expand Down Expand Up @@ -164,7 +166,15 @@ void BlockDevice::close()

int BlockDevice::flush()
{
// serialize flushers, so that we can avoid weird io_since_flush
// races (w/ multiple flushers).
Mutex::Locker l(flush_lock);
if (io_since_flush.read() == 0) {
dout(10) << __func__ << " no-op (no ios since last flush)" << dendl;
return 0;
}
dout(10) << __func__ << " start" << dendl;
io_since_flush.set(0);
if (g_conf->bdev_inject_crash) {
// sleep for a moment to give other threads a chance to submit or
// wait on io that races with a flush.
Expand Down Expand Up @@ -247,6 +257,15 @@ void BlockDevice::_aio_thread()
}
}
}
if (ioc_reap_count.read()) {
Mutex::Locker l(ioc_reap_lock);
for (auto p : ioc_reap_queue) {
dout(20) << __func__ << " reap ioc " << p << dendl;
delete p;
}
ioc_reap_queue.clear();
ioc_reap_count.dec();
}
}
dout(10) << __func__ << " end" << dendl;
}
Expand Down Expand Up @@ -394,7 +413,13 @@ int BlockDevice::aio_write(
derr << __func__ << " pwritev error: " << cpp_strerror(r) << dendl;
return r;
}
if (buffered) {
// initiate IO (but do not wait)
::sync_file_range(fd_buffered, off, len, SYNC_FILE_RANGE_WRITE);
}
}

io_since_flush.set(1);
return 0;
}

Expand All @@ -410,8 +435,6 @@ int BlockDevice::aio_zero(
assert(off < size);
assert(off + len <= size);

#warning fix discard (aio?)
//return fs->zero(fd, off, len);
bufferlist bl;
while (len > 0) {
bufferlist t;
Expand Down Expand Up @@ -464,6 +487,37 @@ int BlockDevice::read(uint64_t off, uint64_t len, bufferlist *pbl,
return r < 0 ? r : 0;
}

int BlockDevice::read_buffered(uint64_t off, uint64_t len, char *buf)
{
dout(5) << __func__ << " " << off << "~" << len << dendl;
assert(len > 0);
assert(off < size);
assert(off + len <= size);

int r = 0;
char *t = buf;
uint64_t left = len;
while (left > 0) {
r = ::pread(fd_buffered, t, left, off);
if (r < 0) {
r = -errno;
goto out;
}
off += r;
t += r;
left -= r;
}

dout(40) << __func__ << " data: ";
bufferlist bl;
bl.append(buf, len);
bl.hexdump(*_dout);
*_dout << dendl;

out:
return r < 0 ? r : 0;
}

int BlockDevice::invalidate_cache(uint64_t off, uint64_t len)
{
dout(5) << __func__ << " " << off << "~" << len << dendl;
Expand All @@ -477,3 +531,11 @@ int BlockDevice::invalidate_cache(uint64_t off, uint64_t len)
}
return r;
}

// Hand an IOContext over for deferred deletion.  The context is appended
// to ioc_reap_queue under ioc_reap_lock; ioc_reap_count is raised from 0
// to 1 (and never higher) so the aio thread's reap check in _aio_thread
// sees that there is something to delete.
void BlockDevice::queue_reap_ioc(IOContext *ioc)
{
  Mutex::Locker reap_guard(ioc_reap_lock);
  const bool already_flagged = (ioc_reap_count.read() != 0);
  if (!already_flagged) {
    ioc_reap_count.inc();
  }
  ioc_reap_queue.push_back(ioc);
}
10 changes: 10 additions & 0 deletions src/os/bluestore/BlockDevice.h
Expand Up @@ -55,6 +55,13 @@ class BlockDevice {
Mutex debug_lock;
interval_set<uint64_t> debug_inflight;

Mutex ioc_reap_lock;
vector<IOContext*> ioc_reap_queue;
atomic_t ioc_reap_count;

Mutex flush_lock;
atomic_t io_since_flush;

FS::aio_queue_t aio_queue;
aio_callback_t aio_callback;
void *aio_callback_priv;
Expand Down Expand Up @@ -93,6 +100,7 @@ class BlockDevice {
int read(uint64_t off, uint64_t len, bufferlist *pbl,
IOContext *ioc,
bool buffered);
int read_buffered(uint64_t off, uint64_t len, char *buf);

int aio_write(uint64_t off, bufferlist& bl,
IOContext *ioc,
Expand All @@ -101,6 +109,8 @@ class BlockDevice {
IOContext *ioc);
int flush();

void queue_reap_ioc(IOContext *ioc);

// for managing buffered readers/writers
int invalidate_cache(uint64_t off, uint64_t len);
int open(string path);
Expand Down
75 changes: 54 additions & 21 deletions src/os/bluestore/BlueFS.cc
Expand Up @@ -256,11 +256,6 @@ void BlueFS::umount()

_close_writer(log_writer);
log_writer = NULL;
// manually clean up its iocs
for (auto p : ioc_reap_queue) {
delete p;
}
ioc_reap_queue.clear();

block_all.clear();
_stop_alloc();
Expand Down Expand Up @@ -345,6 +340,7 @@ int BlueFS::_replay()

FileReader *log_reader = new FileReader(
log_file, g_conf->bluefs_alloc_size,
false, // !random
true); // ignore eof
while (true) {
assert((log_reader->buf.pos & ~super.block_mask()) == 0);
Expand Down Expand Up @@ -425,7 +421,7 @@ int BlueFS::_replay()
dout(20) << __func__ << " " << pos << ": op_jump_seq "
<< next_seq << dendl;
assert(next_seq >= log_seq);
log_seq = next_seq;
log_seq = next_seq - 1; // we will increment it below
}
break;

Expand Down Expand Up @@ -612,6 +608,51 @@ void BlueFS::_drop_link(FileRef file)
}
}

// Read a byte range of a file straight into a caller-supplied buffer,
// one extent at a time, using the block device's buffered read path.
// Unless the reader ignores eof, the request is clipped to the file size.
// Returns the number of bytes copied into out.
int BlueFS::_read_random(
  FileReader *h,         ///< [in] read from here
  uint64_t off,          ///< [in] offset
  size_t len,            ///< [in] this many bytes
  char *out)             ///< [out] copy the data here
{
  dout(10) << __func__ << " h " << h << " " << off << "~" << len
           << " from " << h->file->fnode << dendl;

  auto& fnode = h->file->fnode;

  // mark the file as being read for the duration of the call
  h->file->num_reading.inc();

  // clip the request so it does not run past the end of the file
  if (!h->ignore_eof && off + len > fnode.size) {
    len = (off > fnode.size) ? 0 : fnode.size - off;
    dout(20) << __func__ << " reaching (or past) eof, len clipped to "
             << len << dendl;
  }

  int ret = 0;
  while (len > 0) {
    // find the extent holding 'off' and our position inside it
    uint64_t ext_off = 0;
    auto ext = fnode.seek(off, &ext_off);

    // read no further than the end of this extent (or the end of the file)
    uint64_t chunk = MIN(ext->length - ext_off, len);
    if (!h->ignore_eof && off + chunk > fnode.size) {
      chunk = fnode.size - off;
    }
    dout(20) << __func__ << " read buffered " << ext_off << "~" << chunk << " of "
             << *ext << dendl;

    int r = bdev[ext->bdev]->read_buffered(ext->offset + ext_off, chunk, out);
    assert(r == 0);

    out += chunk;
    ret += chunk;
    len -= chunk;
    off += chunk;
  }

  dout(20) << __func__ << " got " << ret << dendl;
  h->file->num_reading.dec();
  return ret;
}

int BlueFS::_read(
FileReader *h, ///< [in] read from here
FileReaderBuffer *buf, ///< [in] reader state
Expand Down Expand Up @@ -744,7 +785,6 @@ void BlueFS::_maybe_compact_log()

void BlueFS::_compact_log()
{
#warning smarter _compact_log
// FIXME: we currently hold the lock while writing out the compacted log,
// which may mean a latency spike. we could drop the lock while writing out
// the big compacted log, while continuing to log at the end of the old log
Expand Down Expand Up @@ -867,6 +907,7 @@ int BlueFS::_flush_log()
_flush_bdev();
int r = _flush(log_writer, true);
assert(r == 0);
_flush_wait(log_writer);
_flush_bdev();

// clean dirty files
Expand Down Expand Up @@ -978,7 +1019,7 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
z.zero();
t.append(z);
}
bdev[p->bdev]->aio_write(p->offset + x_off, t, h->iocv[p->bdev], false);
bdev[p->bdev]->aio_write(p->offset + x_off, t, h->iocv[p->bdev], true);
bloff += x_len;
length -= x_len;
++p;
Expand All @@ -990,7 +1031,6 @@ int BlueFS::_flush_range(FileWriter *h, uint64_t offset, uint64_t length)
}
}
dout(20) << __func__ << " h " << h << " pos now " << h->pos << dendl;
_flush_wait(h);
return 0;
}

Expand Down Expand Up @@ -1063,6 +1103,7 @@ void BlueFS::_fsync(FileWriter *h)
{
dout(10) << __func__ << " " << h << " " << h->file->fnode << dendl;
_flush(h, true);
_flush_wait(h);
if (h->file->dirty) {
dout(20) << __func__ << " file metadata is dirty, flushing log on "
<< h->file->fnode << dendl;
Expand Down Expand Up @@ -1147,18 +1188,13 @@ void BlueFS::sync_metadata()
}
dout(10) << __func__ << dendl;
utime_t start = ceph_clock_now(NULL);
vector<IOContext*> iocv;
iocv.swap(ioc_reap_queue);
for (auto p : alloc) {
p->commit_start();
}
_flush_log();
for (auto p : alloc) {
p->commit_finish();
}
for (auto p : iocv) {
delete p;
}
utime_t end = ceph_clock_now(NULL);
utime_t dur = end - start;
dout(10) << __func__ << " done in " << dur << dendl;
Expand Down Expand Up @@ -1244,12 +1280,8 @@ int BlueFS::open_for_write(
void BlueFS::_close_writer(FileWriter *h)
{
dout(10) << __func__ << " " << h << dendl;
for (auto i : h->iocv) {
if (i->has_aios()) {
ioc_reap_queue.push_back(i);
} else {
delete i;
}
for (unsigned i=0; i<bdev.size(); ++i) {
bdev[i]->queue_reap_ioc(h->iocv[i]);
}
h->iocv.clear();
delete h;
Expand Down Expand Up @@ -1280,7 +1312,8 @@ int BlueFS::open_for_read(
}
File *file = q->second.get();

*h = new FileReader(file, random ? 4096 : g_conf->bluefs_max_prefetch);
*h = new FileReader(file, random ? 4096 : g_conf->bluefs_max_prefetch,
random, false);
dout(10) << __func__ << " h " << *h << " on " << file->fnode << dendl;
return 0;
}
Expand Down
19 changes: 15 additions & 4 deletions src/os/bluestore/BlueFS.h
Expand Up @@ -127,11 +127,13 @@ class BlueFS {
struct FileReader {
FileRef file;
FileReaderBuffer buf;
bool random;
bool ignore_eof; ///< used when reading our log file

FileReader(FileRef f, uint64_t mpf, bool ie = false)
FileReader(FileRef f, uint64_t mpf, bool rand, bool ie)
: file(f),
buf(mpf),
random(rand),
ignore_eof(ie) {
file->num_readers.inc();
}
Expand Down Expand Up @@ -180,8 +182,6 @@ class BlueFS {
vector<interval_set<uint64_t> > block_all; ///< extents in bdev we own
vector<Allocator*> alloc; ///< allocators for bdevs

vector<IOContext*> ioc_reap_queue; ///< iocs from closed writers

void _init_alloc();
void _stop_alloc();

Expand Down Expand Up @@ -215,6 +215,11 @@ class BlueFS {
size_t len, ///< [in] this many bytes
bufferlist *outbl, ///< [out] optional: reference the result here
char *out); ///< [out] optional: or copy it here
// Read len bytes at offset from h's file directly into out, without going
// through a FileReaderBuffer; returns the number of bytes read.
int _read_random(
FileReader *h, ///< [in] read from here
uint64_t offset, ///< [in] offset
size_t len, ///< [in] this many bytes
char *out); ///< [out] copy it here (always written; not optional)

void _invalidate_cache(FileRef f, uint64_t offset, uint64_t length);

Expand Down Expand Up @@ -315,9 +320,15 @@ class BlueFS {
// no need to hold the global lock here; we only touch h and
// h->file, and read vs write or delete is already protected (via
// atomics and asserts).
Mutex::Locker l(lock);
return _read(h, buf, offset, len, outbl, out);
}
// Public entry point for random reads: forwards to _read_random and
// returns its result unchanged.
int read_random(FileReader *h, uint64_t offset, size_t len,
                char *out) {
  // The global lock is deliberately not taken: only h and h->file are
  // touched, and read vs write or delete is already protected (via
  // atomics and asserts).
  const int nread = _read_random(h, offset, len, out);
  return nread;
}
void invalidate_cache(FileRef f, uint64_t offset, uint64_t len) {
Mutex::Locker l(lock);
_invalidate_cache(f, offset, len);
Expand Down

0 comments on commit 9e9603e

Please sign in to comment.