Skip to content

Commit

Permalink
mds: add error handling in PurgeQueue
Browse files (browse the repository at this point in the history)
This covers decode errors and Journaler errors.
Both are treated as damage to the MDS rank, as
with other per-rank data structures.

Signed-off-by: John Spray <john.spray@redhat.com>
  • Loading branch information
John Spray committed Feb 11, 2017
1 parent f7cdd75 commit b4ba350
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 4 deletions.
14 changes: 13 additions & 1 deletion src/mds/MDSRank.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,19 @@ MDSRank::MDSRank(
state(MDSMap::STATE_BOOT),
stopping(false),
purge_queue(g_ceph_context, whoami_,
mdsmap_->get_metadata_pool(), objecter),
mdsmap_->get_metadata_pool(), objecter,
new FunctionContext(
[this](int r){
// Purge Queue operates inside mds_lock when we're calling into
// it, and outside when in background, so must handle both cases.
if (mds_lock.is_locked_by_me()) {
damaged();
} else {
damaged_unlocked();
}
}
)
),
progress_thread(this), dispatch_depth(0),
hb(NULL), last_tid(0), osd_epoch_barrier(0), beacon(beacon_),
mds_slow_req_count(0),
Expand Down
16 changes: 14 additions & 2 deletions src/mds/PurgeQueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ PurgeQueue::PurgeQueue(
CephContext *cct_,
mds_rank_t rank_,
const int64_t metadata_pool_,
Objecter *objecter_)
Objecter *objecter_,
Context *on_error_)
:
cct(cct_),
rank(rank_),
Expand All @@ -73,11 +74,16 @@ PurgeQueue::PurgeQueue(
journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0, &timer,
&finisher),
on_error(on_error_),
ops_in_flight(0),
max_purge_ops(0),
drain_initial(0),
draining(false)
{
assert(cct != nullptr);
assert(on_error != nullptr);
assert(objecter != nullptr);
journaler.set_write_error_handler(on_error);
}

PurgeQueue::~PurgeQueue()
Expand Down Expand Up @@ -264,7 +270,13 @@ void PurgeQueue::_consume()
dout(20) << " decoding entry" << dendl;
PurgeItem item;
bufferlist::iterator q = bl.begin();
::decode(item, q);
try {
::decode(item, q);
} catch (const buffer::error &err) {
derr << "Decode error at read_pos=0x" << std::hex
<< journaler.get_read_pos() << dendl;
on_error->complete(0);
}
dout(20) << " executing item (0x" << std::hex << item.ino
<< std::dec << ")" << dendl;
_execute_item(item, journaler.get_read_pos());
Expand Down
6 changes: 5 additions & 1 deletion src/mds/PurgeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class PurgeQueue

Journaler journaler;

Context *on_error;

// Map of Journaler offset to PurgeItem
std::map<uint64_t, PurgeItem> in_flight;

Expand Down Expand Up @@ -118,6 +120,7 @@ class PurgeQueue
void execute_item_complete(
uint64_t expire_to);


public:
void init();
void shutdown();
Expand Down Expand Up @@ -164,7 +167,8 @@ class PurgeQueue
CephContext *cct_,
mds_rank_t rank_,
const int64_t metadata_pool_,
Objecter *objecter_);
Objecter *objecter_,
Context *on_error);
~PurgeQueue();
};

Expand Down

0 comments on commit b4ba350

Please sign in to comment.