Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rbd: crash during shutdown after writeback blocked by IO errors #5767

Merged
8 commits merged into from Sep 13, 2015
4 changes: 1 addition & 3 deletions src/client/Client.cc
Expand Up @@ -7365,9 +7365,7 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)

// async, caching, non-blocking.
r = objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
offset, size, bl, ceph_clock_now(cct), 0,
client_lock);

offset, size, bl, ceph_clock_now(cct), 0);
put_cap_ref(in, CEPH_CAP_FILE_BUFFER);

if (r < 0)
Expand Down
35 changes: 16 additions & 19 deletions src/librbd/ImageCtx.cc
Expand Up @@ -632,25 +632,10 @@ class ThreadPoolSingleton : public ThreadPool {
wr->extents.push_back(extent);
{
Mutex::Locker l(cache_lock);
object_cacher->writex(wr, object_set, cache_lock, onfinish);
object_cacher->writex(wr, object_set, onfinish);
}
}

// Synchronous read of one object extent through the object cacher.
// Kicks off an async cache read, then blocks the calling thread on a
// local mutex/cond pair until the completion fires.
//
// o         - object to read
// object_no - object number within the image
// bl        - out: buffer the data is read into
// len, off  - extent (length/offset) within the object
// Returns the result code delivered to the completion (r).
int ImageCtx::read_from_cache(object_t o, uint64_t object_no, bufferlist *bl,
size_t len, uint64_t off) {
int r;
// local rendezvous state for the blocking wait below
Mutex mylock("librbd::ImageCtx::read_from_cache");
Cond cond;
bool done;
// C_SafeCond sets done, stores the result into r, and signals cond
Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
aio_read_from_cache(o, object_no, bl, len, off, onfinish, 0);
mylock.Lock();
while (!done)
cond.Wait(mylock);
mylock.Unlock();
return r;
}

void ImageCtx::user_flushed() {
if (object_cacher && cct->_conf->rbd_cache_writethrough_until_flush) {
md_lock.get_read();
Expand Down Expand Up @@ -698,14 +683,26 @@ class ThreadPoolSingleton : public ThreadPool {
flush_async_operations();

RWLock::RLocker owner_locker(owner_lock);
invalidate_cache();
invalidate_cache(true);
object_cacher->stop();
}

int ImageCtx::invalidate_cache() {
int ImageCtx::invalidate_cache(bool purge_on_error) {
int result;
C_SaferCond ctx;
invalidate_cache(&ctx);
return ctx.wait();
result = ctx.wait();

if (result && purge_on_error) {
cache_lock.Lock();
if (object_cacher != NULL) {
lderr(cct) << "invalidate cache met error " << cpp_strerror(result) << " !Purging cache..." << dendl;
object_cacher->purge_set(object_set);
}
cache_lock.Unlock();
}

return result;
}

void ImageCtx::invalidate_cache(Context *on_finish) {
Expand Down
4 changes: 1 addition & 3 deletions src/librbd/ImageCtx.h
Expand Up @@ -191,13 +191,11 @@ namespace librbd {
int fadvise_flags);
void write_to_cache(object_t o, const bufferlist& bl, size_t len,
uint64_t off, Context *onfinish, int fadvise_flags);
int read_from_cache(object_t o, uint64_t object_no, bufferlist *bl,
size_t len, uint64_t off);
void user_flushed();
void flush_cache_aio(Context *onfinish);
int flush_cache();
void shutdown_cache();
int invalidate_cache();
int invalidate_cache(bool purge_on_error=false);
void invalidate_cache(Context *on_finish);
void invalidate_cache_completion(int r, Context *on_finish);
void clear_nonexistence_cache();
Expand Down
145 changes: 82 additions & 63 deletions src/osdc/ObjectCacher.cc
Expand Up @@ -379,7 +379,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)

if (p->first < cur) {
assert(final == 0);
if (cur + max >= p->first + p->second->length()) {
if (cur + max >= bh->end()) {
// we want right bit (one splice)
final = split(bh, cur); // just split it, take right half.
++p;
Expand All @@ -393,7 +393,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)
}
} else {
assert(p->first == cur);
if (p->second->length() <= max) {
if (bh->length() <= max) {
// whole bufferhead, piece of cake.
} else {
// we want left bit (one splice)
Expand Down Expand Up @@ -886,6 +886,7 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
}
}

list <BufferHead*> hit;
// apply to bh's!
for (map<loff_t, BufferHead*>::iterator p = ob->data_lower_bound(start);
p != ob->data.end();
Expand Down Expand Up @@ -917,6 +918,7 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
if (r >= 0) {
// ok! mark bh clean and error-free
mark_clean(bh);
hit.push_back(bh);
ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
} else {
mark_dirty(bh);
Expand All @@ -926,6 +928,13 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
}
}

for (list<BufferHead*>::iterator bh = hit.begin();
bh != hit.end();
++bh) {
assert(*bh);
ob->try_merge_bh(*bh);
}

// update last_commit.
assert(ob->last_commit_tid < tid);
ob->last_commit_tid = tid;
Expand Down Expand Up @@ -1060,6 +1069,13 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
map<uint64_t, bufferlist> stripe_map; // final buffer offset -> substring
bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;

/*
* WARNING: we can only meaningfully return ENOENT if the read request
* passed in a single ObjectExtent. Any caller who wants ENOENT instead of
* zeroed buffers needs to feed single extents into readx().
*/
assert(!oset->return_enoent || rd->extents.size() == 1);

for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
ex_it != rd->extents.end();
++ex_it) {
Expand All @@ -1075,10 +1091,6 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,

// does not exist and no hits?
if (oset->return_enoent && !o->exists) {
// WARNING: we can only meaningfully return ENOENT if the read request
// passed in a single ObjectExtent. Any caller who wants ENOENT instead of
// zeroed buffers needs to feed single extents into readx().
assert(rd->extents.size() == 1);
ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;

// should we worry about COW underneath us?
Expand Down Expand Up @@ -1139,6 +1151,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,

if (!missing.empty() || !rx.empty()) {
// read missing
map<loff_t, BufferHead*>::iterator last = missing.end();
for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
bh_it != missing.end();
++bh_it) {
Expand All @@ -1160,15 +1173,20 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
delete bh_it->second;
} else {
bh_read(bh_it->second, rd->fadvise_flags);
if (success && onfinish) {
ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
<< " off " << bh_it->first << dendl;
bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
}
if ((success && onfinish) || last != missing.end())
last = bh_it;
}
success = false;
}

// add the waiter to the last missing bh to avoid waking up early, since reads complete in order
if (last != missing.end()) {
ldout(cct, 10) << "readx missed, waiting on " << *last->second
<< " off " << last->first << dendl;
last->second->waitfor_read[last->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );

}

// bump rx
for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
bh_it != rx.end();
Expand Down Expand Up @@ -1210,56 +1228,58 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
}
}

// create reverse map of buffer offset -> object for the eventual result.
// this is over a single ObjectExtent, so we know that
// - the bh's are contiguous
// - the buffer frags need not be (and almost certainly aren't)
loff_t opos = ex_it->offset;
map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
assert(bh_it->second->start() <= opos);
uint64_t bhoff = opos - bh_it->second->start();
vector<pair<uint64_t,uint64_t> >::iterator f_it = ex_it->buffer_extents.begin();
uint64_t foff = 0;
while (1) {
BufferHead *bh = bh_it->second;
assert(opos == (loff_t)(bh->start() + bhoff));

uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
ldout(cct, 10) << "readx rmap opos " << opos
<< ": " << *bh << " +" << bhoff
<< " frag " << f_it->first << "~" << f_it->second << " +" << foff << "~" << len
<< dendl;
if (!error) {
// create reverse map of buffer offset -> object for the eventual result.
// this is over a single ObjectExtent, so we know that
// - the bh's are contiguous
// - the buffer frags need not be (and almost certainly aren't)
loff_t opos = ex_it->offset;
map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
assert(bh_it->second->start() <= opos);
uint64_t bhoff = opos - bh_it->second->start();
vector<pair<uint64_t,uint64_t> >::iterator f_it = ex_it->buffer_extents.begin();
uint64_t foff = 0;
while (1) {
BufferHead *bh = bh_it->second;
assert(opos == (loff_t)(bh->start() + bhoff));

uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
ldout(cct, 10) << "readx rmap opos " << opos
<< ": " << *bh << " +" << bhoff
<< " frag " << f_it->first << "~" << f_it->second << " +" << foff << "~" << len
<< dendl;

bufferlist bit; // put substr here first, since substr_of clobbers, and
// we may get multiple bh's at this stripe_map position
if (bh->is_zero()) {
bufferptr bp(len);
bp.zero();
stripe_map[f_it->first].push_back(bp);
} else {
bit.substr_of(bh->bl,
opos - bh->start(),
len);
stripe_map[f_it->first].claim_append(bit);
}

bufferlist bit; // put substr here first, since substr_of clobbers, and
// we may get multiple bh's at this stripe_map position
if (bh->is_zero()) {
bufferptr bp(len);
bp.zero();
stripe_map[f_it->first].push_back(bp);
} else {
bit.substr_of(bh->bl,
opos - bh->start(),
len);
stripe_map[f_it->first].claim_append(bit);
opos += len;
bhoff += len;
foff += len;
if (opos == bh->end()) {
++bh_it;
bhoff = 0;
}
if (foff == f_it->second) {
++f_it;
foff = 0;
}
if (bh_it == hits.end()) break;
if (f_it == ex_it->buffer_extents.end())
break;
}

opos += len;
bhoff += len;
foff += len;
if (opos == bh->end()) {
++bh_it;
bhoff = 0;
}
if (foff == f_it->second) {
++f_it;
foff = 0;
}
if (bh_it == hits.end()) break;
if (f_it == ex_it->buffer_extents.end())
break;
assert(f_it == ex_it->buffer_extents.end());
assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
}
assert(f_it == ex_it->buffer_extents.end());
assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);

if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
bottouch_ob(o);
Expand Down Expand Up @@ -1303,7 +1323,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
assert(rd->bl->length() == pos);
}
ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
} else {
} else if (!error) {
ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
pos = i->first + i->second.length();
Expand Down Expand Up @@ -1334,8 +1354,7 @@ void ObjectCacher::retry_waiting_reads()
waitfor_read.splice(waitfor_read.end(), ls);
}

int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
Context *onfreespace)
int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace)
{
assert(lock.is_locked());
utime_t now = ceph_clock_now(cct);
Expand Down Expand Up @@ -1408,7 +1427,7 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
}
}

int r = _wait_for_write(wr, bytes_written, oset, wait_on_lock, onfreespace);
int r = _wait_for_write(wr, bytes_written, oset, onfreespace);
delete wr;

//verify_stats();
Expand Down Expand Up @@ -1456,7 +1475,7 @@ void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
}

// blocking wait for write.
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock, Context *onfreespace)
int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Context *onfreespace)
{
assert(lock.is_locked());
int ret = 0;
Expand Down
11 changes: 4 additions & 7 deletions src/osdc/ObjectCacher.h
Expand Up @@ -602,14 +602,12 @@ class ObjectCacher {
* the return value is total bytes read
*/
int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish);
int writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
Context *onfreespace);
int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace);
bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, snapid_t snapid);

private:
// write blocking
int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock,
Context *onfreespace);
int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Context *onfreespace);
void maybe_wait_for_writeback(uint64_t len);
bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish);

Expand Down Expand Up @@ -678,11 +676,10 @@ class ObjectCacher {

int file_write(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,
loff_t offset, uint64_t len,
bufferlist& bl, utime_t mtime, int flags,
Mutex& wait_on_lock) {
bufferlist& bl, utime_t mtime, int flags) {
OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
Striper::file_to_extents(cct, oset->ino, layout, offset, len, oset->truncate_size, wr->extents);
return writex(wr, oset, wait_on_lock, NULL);
return writex(wr, oset, NULL);
}

bool file_flush(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,
Expand Down
2 changes: 1 addition & 1 deletion src/test/osdc/object_cacher_stress.cc
Expand Up @@ -112,7 +112,7 @@ int stress_test(uint64_t num_ops, uint64_t num_objs,
ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0);
wr->extents.push_back(op->extent);
lock.Lock();
obc.writex(wr, &object_set, lock, NULL);
obc.writex(wr, &object_set, NULL);
lock.Unlock();
}
}
Expand Down