Commit

Merge pull request #5767 from dachary/wip-12597-hammer
Crash during shutdown after writeback blocked by IO errors

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
Loic Dachary committed Sep 13, 2015
2 parents c1849ec + 6c4ccc8 commit 23fb811
Showing 6 changed files with 105 additions and 96 deletions.
4 changes: 1 addition & 3 deletions src/client/Client.cc
@@ -7370,9 +7370,7 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
 
   // async, caching, non-blocking.
   r = objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
-                               offset, size, bl, ceph_clock_now(cct), 0,
-                               client_lock);
-
+                               offset, size, bl, ceph_clock_now(cct), 0);
   put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
 
   if (r < 0)
35 changes: 16 additions & 19 deletions src/librbd/ImageCtx.cc
@@ -632,25 +632,10 @@ class ThreadPoolSingleton : public ThreadPool {
     wr->extents.push_back(extent);
     {
       Mutex::Locker l(cache_lock);
-      object_cacher->writex(wr, object_set, cache_lock, onfinish);
+      object_cacher->writex(wr, object_set, onfinish);
     }
   }
 
-  int ImageCtx::read_from_cache(object_t o, uint64_t object_no, bufferlist *bl,
-                                size_t len, uint64_t off) {
-    int r;
-    Mutex mylock("librbd::ImageCtx::read_from_cache");
-    Cond cond;
-    bool done;
-    Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
-    aio_read_from_cache(o, object_no, bl, len, off, onfinish, 0);
-    mylock.Lock();
-    while (!done)
-      cond.Wait(mylock);
-    mylock.Unlock();
-    return r;
-  }
-
   void ImageCtx::user_flushed() {
     if (object_cacher && cct->_conf->rbd_cache_writethrough_until_flush) {
       md_lock.get_read();
@@ -698,14 +683,26 @@ class ThreadPoolSingleton : public ThreadPool {
     flush_async_operations();
 
     RWLock::RLocker owner_locker(owner_lock);
-    invalidate_cache();
+    invalidate_cache(true);
     object_cacher->stop();
   }
 
-  int ImageCtx::invalidate_cache() {
+  int ImageCtx::invalidate_cache(bool purge_on_error) {
+    int result;
     C_SaferCond ctx;
     invalidate_cache(&ctx);
-    return ctx.wait();
+    result = ctx.wait();
+
+    if (result && purge_on_error) {
+      cache_lock.Lock();
+      if (object_cacher != NULL) {
+        lderr(cct) << "invalidate cache met error " << cpp_strerror(result) << " !Purging cache..." << dendl;
+        object_cacher->purge_set(object_set);
+      }
+      cache_lock.Unlock();
+    }
+
+    return result;
   }
 
   void ImageCtx::invalidate_cache(Context *on_finish) {
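
The new invalidate_cache(bool purge_on_error) follows a common Ceph pattern: kick off the asynchronous invalidation, block on a C_SaferCond, and fall back to purging the cache outright if the flush failed, so shutdown no longer trips over dirty buffers that can never be written back. Below is a minimal self-contained analogue of that wait-then-purge flow; it uses std::promise/std::future in place of C_SaferCond, and Cache, flush_all and purge are invented stand-ins, not librbd API.

#include <cerrno>
#include <cstring>
#include <future>
#include <iostream>
#include <utility>

// Stand-ins for the cache operations; names are illustrative only.
struct Cache {
  bool fail_flush = false;
  // Asynchronous flush in the real code: the promise is fulfilled once
  // writeback finishes (here it completes immediately).
  void flush_all(std::promise<int> done) {
    done.set_value(fail_flush ? -EIO : 0);
  }
  void purge() { std::cout << "purging dirty/error buffers\n"; }
};

// Analogue of ImageCtx::invalidate_cache(bool purge_on_error): wait for the
// async flush, and if it failed, discard the cache contents so teardown can
// still make progress instead of crashing on un-writable dirty buffers.
int invalidate_cache(Cache &c, bool purge_on_error) {
  std::promise<int> p;
  std::future<int> f = p.get_future();
  c.flush_all(std::move(p));
  int result = f.get();          // corresponds to C_SaferCond::wait()
  if (result < 0 && purge_on_error) {
    std::cerr << "invalidate failed: " << std::strerror(-result)
              << ", purging cache\n";
    c.purge();
  }
  return result;
}

int main() {
  Cache c;
  c.fail_flush = true;           // simulate writeback blocked by IO errors
  invalidate_cache(c, true);
  return 0;
}
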
4 changes: 1 addition & 3 deletions src/librbd/ImageCtx.h
@@ -191,13 +191,11 @@ namespace librbd {
                         int fadvise_flags);
     void write_to_cache(object_t o, const bufferlist& bl, size_t len,
                         uint64_t off, Context *onfinish, int fadvise_flags);
-    int read_from_cache(object_t o, uint64_t object_no, bufferlist *bl,
-                        size_t len, uint64_t off);
     void user_flushed();
     void flush_cache_aio(Context *onfinish);
     int flush_cache();
     void shutdown_cache();
-    int invalidate_cache();
+    int invalidate_cache(bool purge_on_error=false);
     void invalidate_cache(Context *on_finish);
     void invalidate_cache_completion(int r, Context *on_finish);
     void clear_nonexistence_cache();
145 changes: 82 additions & 63 deletions src/osdc/ObjectCacher.cc
@@ -379,7 +379,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)
 
     if (p->first < cur) {
       assert(final == 0);
-      if (cur + max >= p->first + p->second->length()) {
+      if (cur + max >= bh->end()) {
        // we want right bit (one splice)
        final = split(bh, cur);   // just split it, take right half.
        ++p;
@@ -393,7 +393,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)
       }
     } else {
       assert(p->first == cur);
-      if (p->second->length() <= max) {
+      if (bh->length() <= max) {
        // whole bufferhead, piece of cake.
       } else {
        // we want left bit (one splice)
@@ -886,6 +886,7 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
     }
   }
 
+  list <BufferHead*> hit;
   // apply to bh's!
   for (map<loff_t, BufferHead*>::iterator p = ob->data_lower_bound(start);
        p != ob->data.end();
@@ -917,6 +918,7 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
       if (r >= 0) {
        // ok! mark bh clean and error-free
        mark_clean(bh);
+       hit.push_back(bh);
        ldout(cct, 10) << "bh_write_commit clean " << *bh << dendl;
       } else {
        mark_dirty(bh);
@@ -926,6 +928,13 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
     }
   }
 
+  for (list<BufferHead*>::iterator bh = hit.begin();
+       bh != hit.end();
+       ++bh) {
+    assert(*bh);
+    ob->try_merge_bh(*bh);
+  }
+
   // update last_commit.
   assert(ob->last_commit_tid < tid);
   ob->last_commit_tid = tid;
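
bh_write_commit now remembers every BufferHead it marked clean and asks the Object to try_merge_bh them afterwards, coalescing runs of adjacent clean buffers back into one. A rough self-contained sketch of that coalescing step over an offset-keyed map; Extent and merge_adjacent are illustrative stand-ins, not the ObjectCacher types.

#include <cassert>
#include <cstdint>
#include <iterator>
#include <map>

// Illustrative stand-in for a clean BufferHead covering [start, start+len).
struct Extent { uint64_t len; };

// Merge each entry with its successor while they touch, mimicking what
// Object::try_merge_bh does for freshly cleaned buffer heads.
void merge_adjacent(std::map<uint64_t, Extent> &m) {
  for (auto it = m.begin(); it != m.end(); ) {
    auto next = std::next(it);
    if (next != m.end() && it->first + it->second.len == next->first) {
      it->second.len += next->second.len;  // absorb the neighbour
      m.erase(next);                       // and keep trying from 'it'
    } else {
      ++it;
    }
  }
}

int main() {
  std::map<uint64_t, Extent> m{{0, {4096}}, {4096, {4096}}, {16384, {4096}}};
  merge_adjacent(m);
  // 0 and 4096 touch and collapse into one 8 KiB extent; 16384 stays alone.
  assert(m.size() == 2 && m.at(0).len == 8192);
  return 0;
}
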
@@ -1060,6 +1069,13 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
   map<uint64_t, bufferlist> stripe_map;  // final buffer offset -> substring
   bool dontneed = rd->fadvise_flags & LIBRADOS_OP_FLAG_FADVISE_DONTNEED;
 
+  /*
+   * WARNING: we can only meaningfully return ENOENT if the read request
+   * passed in a single ObjectExtent.  Any caller who wants ENOENT instead of
+   * zeroed buffers needs to feed single extents into readx().
+   */
+  assert(!oset->return_enoent || rd->extents.size() == 1);
+
   for (vector<ObjectExtent>::iterator ex_it = rd->extents.begin();
        ex_it != rd->extents.end();
        ++ex_it) {
@@ -1075,10 +1091,6 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
 
     // does not exist and no hits?
     if (oset->return_enoent && !o->exists) {
-      // WARNING: we can only meaningfully return ENOENT if the read request
-      // passed in a single ObjectExtent. Any caller who wants ENOENT instead of
-      // zeroed buffers needs to feed single extents into readx().
-      assert(rd->extents.size() == 1);
       ldout(cct, 10) << "readx object !exists, 1 extent..." << dendl;
 
       // should we worry about COW underneath us?
@@ -1139,6 +1151,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
 
     if (!missing.empty() || !rx.empty()) {
       // read missing
+      map<loff_t, BufferHead*>::iterator last = missing.end();
       for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
            bh_it != missing.end();
            ++bh_it) {
@@ -1160,15 +1173,20 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
          delete bh_it->second;
        } else {
          bh_read(bh_it->second, rd->fadvise_flags);
-          if (success && onfinish) {
-            ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
-                           << " off " << bh_it->first << dendl;
-            bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
-          }
+          if ((success && onfinish) || last != missing.end())
+            last = bh_it;
        }
        success = false;
      }
 
+      // add the wait to the last missing bh only, to avoid waking up early: reads complete in order
+      if (last != missing.end()) {
+        ldout(cct, 10) << "readx missed, waiting on " << *last->second
+                       << " off " << last->first << dendl;
+        last->second->waitfor_read[last->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
+
+      }
+
       // bump rx
       for (map<loff_t, BufferHead*>::iterator bh_it = rx.begin();
            bh_it != rx.end();
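
The readx change above registers the retry context on the last missed BufferHead only, rather than on every one: the missing ranges are read back in issue order, so the final completion implies the earlier ones are already in cache, and a single waiter means readx is re-driven once instead of once per miss. A hedged sketch of that idea with plain callbacks; PendingRead, issue_reads and on_complete are made-up names, and completions are simulated inline.

#include <functional>
#include <iostream>
#include <utility>
#include <vector>

// Pretend each entry is a missing range we must fetch; completions arrive
// in issue order, like bh_read completions for a single object.
struct PendingRead {
  int id;
  std::function<void()> on_complete;  // only set on the last miss
};

void issue_reads(std::vector<PendingRead> &reads,
                 std::function<void()> retry) {
  // Attach the single retry waiter to the last missed range.
  if (!reads.empty())
    reads.back().on_complete = std::move(retry);
  for (auto &r : reads) {
    std::cout << "reading missing range " << r.id << "\n";
    if (r.on_complete)
      r.on_complete();  // fires once, after the final range is read
  }
}

int main() {
  std::vector<PendingRead> reads{{0, {}}, {1, {}}, {2, {}}};
  issue_reads(reads, [] { std::cout << "retrying readx once\n"; });
  return 0;
}
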
@@ -1210,56 +1228,58 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
       }
     }
 
-    // create reverse map of buffer offset -> object for the eventual result.
-    // this is over a single ObjectExtent, so we know that
-    // - the bh's are contiguous
-    // - the buffer frags need not be (and almost certainly aren't)
-    loff_t opos = ex_it->offset;
-    map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
-    assert(bh_it->second->start() <= opos);
-    uint64_t bhoff = opos - bh_it->second->start();
-    vector<pair<uint64_t,uint64_t> >::iterator f_it = ex_it->buffer_extents.begin();
-    uint64_t foff = 0;
-    while (1) {
-      BufferHead *bh = bh_it->second;
-      assert(opos == (loff_t)(bh->start() + bhoff));
-
-      uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
-      ldout(cct, 10) << "readx rmap opos " << opos
-                     << ": " << *bh << " +" << bhoff
-                     << " frag " << f_it->first << "~" << f_it->second << " +" << foff << "~" << len
-                     << dendl;
-
-      bufferlist bit;  // put substr here first, since substr_of clobbers, and
-                       // we may get multiple bh's at this stripe_map position
-      if (bh->is_zero()) {
-        bufferptr bp(len);
-        bp.zero();
-        stripe_map[f_it->first].push_back(bp);
-      } else {
-        bit.substr_of(bh->bl,
-                      opos - bh->start(),
-                      len);
-        stripe_map[f_it->first].claim_append(bit);
-      }
-
-      opos += len;
-      bhoff += len;
-      foff += len;
-      if (opos == bh->end()) {
-        ++bh_it;
-        bhoff = 0;
-      }
-      if (foff == f_it->second) {
-        ++f_it;
-        foff = 0;
-      }
-      if (bh_it == hits.end()) break;
-      if (f_it == ex_it->buffer_extents.end())
-        break;
-    }
-    assert(f_it == ex_it->buffer_extents.end());
-    assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
+    if (!error) {
+      // create reverse map of buffer offset -> object for the eventual result.
+      // this is over a single ObjectExtent, so we know that
+      // - the bh's are contiguous
+      // - the buffer frags need not be (and almost certainly aren't)
+      loff_t opos = ex_it->offset;
+      map<loff_t, BufferHead*>::iterator bh_it = hits.begin();
+      assert(bh_it->second->start() <= opos);
+      uint64_t bhoff = opos - bh_it->second->start();
+      vector<pair<uint64_t,uint64_t> >::iterator f_it = ex_it->buffer_extents.begin();
+      uint64_t foff = 0;
+      while (1) {
+        BufferHead *bh = bh_it->second;
+        assert(opos == (loff_t)(bh->start() + bhoff));
+
+        uint64_t len = MIN(f_it->second - foff, bh->length() - bhoff);
+        ldout(cct, 10) << "readx rmap opos " << opos
+                       << ": " << *bh << " +" << bhoff
+                       << " frag " << f_it->first << "~" << f_it->second << " +" << foff << "~" << len
+                       << dendl;
+
+        bufferlist bit;  // put substr here first, since substr_of clobbers, and
+                         // we may get multiple bh's at this stripe_map position
+        if (bh->is_zero()) {
+          bufferptr bp(len);
+          bp.zero();
+          stripe_map[f_it->first].push_back(bp);
+        } else {
+          bit.substr_of(bh->bl,
+                        opos - bh->start(),
+                        len);
+          stripe_map[f_it->first].claim_append(bit);
+        }
+
+        opos += len;
+        bhoff += len;
+        foff += len;
+        if (opos == bh->end()) {
+          ++bh_it;
+          bhoff = 0;
+        }
+        if (foff == f_it->second) {
+          ++f_it;
+          foff = 0;
+        }
+        if (bh_it == hits.end()) break;
+        if (f_it == ex_it->buffer_extents.end())
+          break;
+      }
+      assert(f_it == ex_it->buffer_extents.end());
+      assert(opos == (loff_t)ex_it->offset + (loff_t)ex_it->length);
+    }
 
     if (dontneed && o->include_all_cached_data(ex_it->offset, ex_it->length))
       bottouch_ob(o);
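
Re-indentation aside, the substance of this hunk is the new if (!error) guard: once any extent has hit an error buffer head, _readx skips building the offset-to-buffer reverse map and lets the error propagate instead of asserting on a partially assembled result. A small stand-alone sketch of that control flow; read_extent, read_all and assemble-by-append are invented placeholders, not ObjectCacher code.

#include <cerrno>
#include <iostream>
#include <string>
#include <vector>

// Placeholder for reading one extent; a negative value stands in for an
// error bh left behind by failed writeback.
int read_extent(int idx) { return idx == 1 ? -EIO : 4096; }

// Analogue of the guarded section of _readx: only assemble the result
// buffer when every extent came back clean; otherwise return the error.
int read_all(int num_extents, std::string &out) {
  int error = 0;
  std::vector<int> lens;
  for (int i = 0; i < num_extents; ++i) {
    int r = read_extent(i);
    if (r < 0) { error = r; continue; }  // remember the error, keep scanning
    lens.push_back(r);
  }
  if (!error) {
    for (int len : lens)
      out.append(len, '\0');             // the stripe_map / reverse-map step
  }
  return error ? error : static_cast<int>(out.size());
}

int main() {
  std::string buf;
  int r = read_all(3, buf);
  std::cout << "read_all returned " << r << "\n";  // -EIO, and no buffer built
  return 0;
}
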
@@ -1303,7 +1323,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
       assert(rd->bl->length() == pos);
     }
     ldout(cct, 10) << "readx result is " << rd->bl->length() << dendl;
-  } else {
+  } else if (!error) {
     ldout(cct, 10) << "readx no bufferlist ptr (readahead?), done." << dendl;
     map<uint64_t,bufferlist>::reverse_iterator i = stripe_map.rbegin();
     pos = i->first + i->second.length();
@@ -1334,8 +1354,7 @@ void ObjectCacher::retry_waiting_reads()
     waitfor_read.splice(waitfor_read.end(), ls);
 }
 
-int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
-                         Context *onfreespace)
+int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace)
 {
   assert(lock.is_locked());
   utime_t now = ceph_clock_now(cct);
@@ -1408,7 +1427,7 @@ int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
     }
   }
 
-  int r = _wait_for_write(wr, bytes_written, oset, wait_on_lock, onfreespace);
+  int r = _wait_for_write(wr, bytes_written, oset, onfreespace);
   delete wr;
 
   //verify_stats();
@@ -1456,7 +1475,7 @@ void ObjectCacher::maybe_wait_for_writeback(uint64_t len)
 }
 
 // blocking wait for write.
-int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock, Context *onfreespace)
+int ObjectCacher::_wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Context *onfreespace)
 {
   assert(lock.is_locked());
   int ret = 0;
11 changes: 4 additions & 7 deletions src/osdc/ObjectCacher.h
@@ -602,14 +602,12 @@ class ObjectCacher {
    * the return value is total bytes read
    */
   int readx(OSDRead *rd, ObjectSet *oset, Context *onfinish);
-  int writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
-             Context *onfreespace);
+  int writex(OSDWrite *wr, ObjectSet *oset, Context *onfreespace);
   bool is_cached(ObjectSet *oset, vector<ObjectExtent>& extents, snapid_t snapid);
 
 private:
   // write blocking
-  int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Mutex& lock,
-                      Context *onfreespace);
+  int _wait_for_write(OSDWrite *wr, uint64_t len, ObjectSet *oset, Context *onfreespace);
   void maybe_wait_for_writeback(uint64_t len);
   bool _flush_set_finish(C_GatherBuilder *gather, Context *onfinish);
 
@@ -678,11 +676,10 @@ class ObjectCacher {
 
   int file_write(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,
                  loff_t offset, uint64_t len,
-                 bufferlist& bl, utime_t mtime, int flags,
-                 Mutex& wait_on_lock) {
+                 bufferlist& bl, utime_t mtime, int flags) {
     OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
     Striper::file_to_extents(cct, oset->ino, layout, offset, len, oset->truncate_size, wr->extents);
-    return writex(wr, oset, wait_on_lock, NULL);
+    return writex(wr, oset, NULL);
   }
 
   bool file_flush(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,
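
Every signature change in this header drops the external Mutex& wait_on_lock parameter: after this series, blocking for dirty-buffer space in _wait_for_write happens under the ObjectCacher's own lock, which callers already hold (as the stress-test hunk below shows), so threading a second mutex through writex/file_write was dead weight. A toy sketch of that "wait on your own lock" shape, using std::mutex and std::condition_variable rather than Ceph's Mutex/Cond; TinyCache and its limits are invented for illustration.

#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Toy cache that blocks writers on its *own* lock when too much data is
// dirty, instead of taking a caller-supplied mutex as the old writex did.
class TinyCache {
  std::mutex lock;                 // analogous to ObjectCacher::lock
  std::condition_variable cond;
  uint64_t dirty = 0;
  const uint64_t max_dirty = 4096;

public:
  void writex(uint64_t len) {
    std::unique_lock<std::mutex> l(lock);
    // analogue of _wait_for_write: block until writeback frees space
    cond.wait(l, [&] { return dirty + len <= max_dirty; });
    dirty += len;
  }
  void flush_some(uint64_t len) {
    {
      std::lock_guard<std::mutex> l(lock);
      dirty = (len > dirty) ? 0 : dirty - len;
    }
    cond.notify_all();  // wake blocked writers, like a completed writeback
  }
};

int main() {
  TinyCache c;
  c.writex(4096);                                    // fills the dirty limit
  std::thread flusher([&] { c.flush_some(4096); });  // writeback completes...
  c.writex(4096);                                    // ...unblocking this write
  flusher.join();
  return 0;
}
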
2 changes: 1 addition & 1 deletion src/test/osdc/object_cacher_stress.cc
@@ -112,7 +112,7 @@ int stress_test(uint64_t num_ops, uint64_t num_objs,
     ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0);
     wr->extents.push_back(op->extent);
     lock.Lock();
-    obc.writex(wr, &object_set, lock, NULL);
+    obc.writex(wr, &object_set, NULL);
     lock.Unlock();
   }
 }
