mds: #11950: Persistent purge queue #12786

Merged
33 commits merged on Mar 8, 2017
Commits (33); the diff shown below is from a single commit, 8ebf7d9.
6f6ef70  compact_set: add #includes for dependencies (Dec 1, 2016)
e8c6e74  osdc/Journaler: assign a name for logging (Dec 1, 2016)
58ec1c6  osdc/Journaler: remove incorrect assertion (Dec 1, 2016)
50f4783  osdc/Filer: const fix for passed layouts (Dec 1, 2016)
059a877  osdc/Journaler: add have_waiter() (Dec 1, 2016)
050dc5c  common/lockdep: clearer log messages (Dec 8, 2016)
1b36be9  osdc/Journaler: wrap recover() completion in finisher (Dec 1, 2016)
6cc0130  mds: const methods on SnapRealm (Dec 19, 2016)
459c745  mds: const snaprealm getters on CInode (Dec 19, 2016)
8ebf7d9  mds: use a persistent queue for purging deleted files (Dec 1, 2016)
7189b53  mds: move PurgeQueue up to MDSRank (Dec 1, 2016)
8a1b3e1  mds: move throttling code out of StrayManager (Dec 5, 2016)
ed4c7cb  mds: move dir purge and truncate into purgequeue (Dec 19, 2016)
d96d0b0  mds: add stats to PurgeQueue (Dec 21, 2016)
f2fb187  mds: implement PurgeQueue throttling (Dec 23, 2016)
3970502  qa: update test_strays for purgequeue (Dec 23, 2016)
0c9a69a  mds: wait for purgequeue on rank shutdown (Dec 23, 2016)
f826c7e  qa/cephfs: add TestStrays.test_purge_on_shutdown (Dec 24, 2016)
3e66de2  mds: create purge queue if it's not found (Dec 25, 2016)
4427aed  mds: update PurgeQueue for single-ack OSD change (Feb 2, 2017)
0952ce9  mds: expose progress during PurgeQueue drain (Feb 8, 2017)
6d59f15  mds: add error handling in PurgeQueue (Feb 11, 2017)
199e5b4  osdc: remove Journaler "journaler_batch_*" settings (Feb 12, 2017)
8d4f6b9  osdc: less aggressive prefetch in read/write Journaler (Feb 13, 2017)
335bdc1  mds: update for removing Timer from Journaler (Feb 13, 2017)
0933f61  mds: remove unnecessary flush() from PurgeQueue (Feb 13, 2017)
92bf85e  osdc: expose Journaler::write_head_needed (Feb 13, 2017)
207846f  mds: write_head when reading in PurgeQueue (Feb 13, 2017)
6cf9c29  qa: add TestStrays.test_purge_queue_op_rate (Feb 13, 2017)
5df0022  mds: flush PQ even when not consuming (Mar 1, 2017)
1a19510  qa: update TestFlush for changed stray perf counters (Mar 3, 2017)
41f8ded  qa: update TestDamage for PurgeQueue (Mar 3, 2017)
1777333  mds: handle Journaler::recover errors in PurgeQueue (Mar 3, 2017)
+427 −33

mds: use a persistent queue for purging deleted files

To avoid creating stray directories of unbounded size
and all the associated pain, use a more appropriate
data structure to store a FIFO of inodes that need
purging.

Fixes: http://tracker.ceph.com/issues/11950
Signed-off-by: John Spray <john.spray@redhat.com>
John Spray committed Dec 1, 2016
commit 8ebf7d95a9071de24bb1e56a6423c505169cb4de
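
Before the diff itself, a rough usage sketch may help orient review: the commit adds a PurgeItem record and a PurgeQueue with a push() entry point. The snippet below shows how a stray-handling caller might hand a deleted inode to the queue; the helper function, its parameters, and the completion body are illustrative assumptions, not code from this commit.

```cpp
// Illustrative only: building a PurgeItem and pushing it onto the queue.
// PurgeItem's fields and PurgeQueue::push() are taken from the diff below;
// this free function and its parameters are hypothetical.
void enqueue_purge_sketch(PurgeQueue &purge_queue,
                          inodeno_t ino, uint64_t size,
                          const file_layout_t &layout,
                          const SnapContext &snapc)
{
  PurgeItem item;
  item.ino = ino;          // inode number of the deleted file
  item.size = size;        // size, used to work out how many objects to purge
  item.layout = layout;    // layout, used to locate the file's RADOS objects
  item.snapc = snapc;      // snap context to purge under

  // push() journals the item durably and calls the completion once the
  // journal entry is flushed; consumption happens asynchronously.
  purge_queue.push(item, new FunctionContext([](int r) {
    // at this point the item is persisted; the stray can be released
  }));
}
```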
@@ -11,6 +11,7 @@ set(mds_srcs
MDCache.cc
RecoveryQueue.cc
StrayManager.cc
PurgeQueue.cc
Locker.cc
Migrator.cc
MDBalancer.cc
@@ -461,6 +461,8 @@ void MDCache::create_mydir_hierarchy(MDSGather *gather)
mydir->commit(0, gather->new_sub());

myin->store(gather->new_sub());

stray_manager.purge_queue.create(new C_IO_Wrapper(mds, gather->new_sub()));
}

struct C_MDC_CreateSystemFile : public MDCacheLogContext {
@@ -584,8 +586,15 @@ void MDCache::open_root_inode(MDSInternalContextBase *c)

void MDCache::open_mydir_inode(MDSInternalContextBase *c)
{
MDSGatherBuilder gather(g_ceph_context);

CInode *in = create_system_inode(MDS_INO_MDSDIR(mds->get_nodeid()), S_IFDIR|0755); // initially inaccurate!
- in->fetch(c);
+ in->fetch(gather.new_sub());

stray_manager.purge_queue.open(new C_IO_Wrapper(mds, gather.new_sub()));

gather.set_finisher(c);
gather.activate();
}

void MDCache::open_root()
@@ -73,7 +73,7 @@ enum {
l_mdc_first = 3000,
// How many inodes currently in stray dentries
l_mdc_num_strays,
- // How many stray dentries are currently being purged
+ // How many stray dentries are currently enqueued for purge
l_mdc_num_strays_purging,
// How many stray dentries are currently delayed for purge due to refs
l_mdc_num_strays_delayed,
@@ -159,6 +159,8 @@ void MDSRankDispatcher::init()

progress_thread.create("mds_rank_progr");

mdcache->stray_manager.purge_queue.init();

finisher->start();
}

@@ -239,6 +241,8 @@ void MDSRankDispatcher::shutdown()
// shut down cache
mdcache->shutdown();

mdcache->stray_manager.purge_queue.shutdown();

if (objecter->initialized.read())
objecter->shutdown();

@@ -0,0 +1,284 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Red Hat
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/

#include "common/debug.h"
#include "mds/mdstypes.h"
#include "mds/CInode.h"

#include "PurgeQueue.h"


#define dout_context cct
#define dout_subsys ceph_subsys_mds
#undef dout_prefix
#define dout_prefix _prefix(_dout, rank) << __func__ << ": "
static ostream& _prefix(std::ostream *_dout, mds_rank_t rank) {
return *_dout << "mds." << rank << ".purge_queue ";
}

void PurgeItem::encode(bufferlist &bl) const
{
ENCODE_START(1, 1, bl);
::encode(ino, bl);
::encode(size, bl);
::encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2);
::encode(old_pools, bl);
::encode(snapc, bl);
ENCODE_FINISH(bl);
}

void PurgeItem::decode(bufferlist::iterator &p)
{
DECODE_START(1, p);
::decode(ino, p);
::decode(size, p);
::decode(layout, p);
::decode(old_pools, p);
::decode(snapc, p);
DECODE_FINISH(p);
}
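
A quick note on the ENCODE_START(1, 1, bl) / DECODE_START(1, p) pair above: it records a struct version and a minimum compatible version so the on-disk entry format can evolve. Purely as an illustration of the pattern (the stamp field is invented and not part of this commit), a hypothetical version-2 encoding would look like:

```cpp
// Hypothetical future revision of PurgeItem::encode/decode, shown only to
// illustrate the versioning macros used above; the 'stamp' member is invented.
void PurgeItem::encode(bufferlist &bl) const
{
  ENCODE_START(2, 1, bl);        // version 2, still decodable by compat-1 readers
  ::encode(ino, bl);
  ::encode(size, bl);
  ::encode(layout, bl, CEPH_FEATURE_FS_FILE_LAYOUT_V2);
  ::encode(old_pools, bl);
  ::encode(snapc, bl);
  ::encode(stamp, bl);           // new field appended at the end
  ENCODE_FINISH(bl);
}

void PurgeItem::decode(bufferlist::iterator &p)
{
  DECODE_START(2, p);
  ::decode(ino, p);
  ::decode(size, p);
  ::decode(layout, p);
  ::decode(old_pools, p);
  ::decode(snapc, p);
  if (struct_v >= 2) {
    ::decode(stamp, p);          // only present in v2+ encodings
  }
  DECODE_FINISH(p);
}
```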

// TODO: implement purge queue creation on startup
// if we are on a filesystem created before purge queues existed
// TODO: ensure that a deactivating MDS rank blocks
// on complete drain of this queue before finishing
// TODO: when we're deactivating, lift all limits on
// how many OSD ops we're allowed to emit at a time to
// race through the queue as fast as we can.
// TODO: populate logger here to gather latency stat?
// ...and a stat for the size of the queue, if we can
// somehow track that? Could do an initial pass through
// the whole queue to count the items at startup?
// TODO: there is absolutely no reason to consume an inode number
// for this. Should just give objects a string name with a rank
// suffix, like we do for MDSTables. Requires a little refactor
// of Journaler.
PurgeQueue::PurgeQueue(
CephContext *cct_,
mds_rank_t rank_,
const int64_t metadata_pool_,
Objecter *objecter_)
:
cct(cct_),
rank(rank_),
lock("PurgeQueue"),
metadata_pool(metadata_pool_),
finisher(cct, "PurgeQueue", "PQ_Finisher"),
timer(cct, lock),
filer(objecter_, &finisher),
objecter(objecter_),
journaler("pq", MDS_INO_PURGE_QUEUE + rank, metadata_pool,
CEPH_FS_ONDISK_MAGIC, objecter_, nullptr, 0, &timer,
&finisher)
{
}

void PurgeQueue::init()
{
Mutex::Locker l(lock);

finisher.start();
timer.init();
}

void PurgeQueue::shutdown()
{
Mutex::Locker l(lock);

journaler.shutdown();
timer.shutdown();
finisher.stop();
}

void PurgeQueue::open(Context *completion)
{
dout(4) << "opening" << dendl;

Mutex::Locker l(lock);

journaler.recover(new FunctionContext([this, completion](int r){
Mutex::Locker l(lock);
dout(4) << "open complete" << dendl;
if (r == 0) {
journaler.set_writeable();
}
completion->complete(r);
}));
}

void PurgeQueue::create(Context *fin)
{
dout(4) << "creating" << dendl;
Mutex::Locker l(lock);

file_layout_t layout = file_layout_t::get_default();
layout.pool_id = metadata_pool;
journaler.set_writeable();
journaler.create(&layout, JOURNAL_FORMAT_RESILIENT);
journaler.write_head(fin);
}

void PurgeQueue::push(const PurgeItem &pi, Context *completion)
{
dout(4) << "pushing inode 0x" << std::hex << pi.ino << std::dec << dendl;
Mutex::Locker l(lock);

// Callers should have waited for open() before using us
assert(!journaler.is_readonly());

bufferlist bl;

::encode(pi, bl);
journaler.append_entry(bl);

// Note that flush calls are not 1:1 with IOs: Journaler
// does its own batching, so we just call flush on every push.
// FIXME: *actually* as soon as we call _consume it will
// do a flush via _issue_read, so we really are doing one
// write per event. Avoid this by skipping the journaler
// read (see "if we could consume this PurgeItem immediately...")
journaler.flush(completion);

// Maybe go ahead and do something with it right away
_consume();

// TODO: if we could consume this PurgeItem immediately, and
// Journaler does not have any outstanding prefetches, then
// short circuit its read by advancing read_pos to write_pos
// and passing the PurgeItem straight into _execute_item
}

bool PurgeQueue::can_consume()
{
// TODO: enforce limits (currently just allowing one in flight)
if (in_flight.size() > 0) {
return false;
} else {
return true;
}
}
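
can_consume() is deliberately minimal here; the TODO is picked up by a later commit in this series ("mds: implement PurgeQueue throttling"). As a sketch only of the general shape such a limit check might take (the member names below are hypothetical stand-ins, not the names that commit uses):

```cpp
// Sketch only -- not part of this commit. max_purge_files, ops_in_flight and
// max_purge_ops are hypothetical members standing in for whatever limits the
// later throttling commit introduces.
bool PurgeQueue::can_consume()
{
  if (in_flight.size() >= max_purge_files) {
    return false;  // too many whole files being purged at once
  }
  if (ops_in_flight >= max_purge_ops) {
    return false;  // too many outstanding OSD operations
  }
  return true;
}
```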

void PurgeQueue::_consume()
{
assert(lock.is_locked_by_me());

// Because we are the writer and the reader of the journal
// via the same Journaler instance, we never need to reread_head

if (!can_consume()) {
dout(10) << " cannot consume right now" << dendl;

return;
}

if (!journaler.is_readable()) {
dout(10) << " not readable right now" << dendl;
if (!journaler.have_waiter()) {
journaler.wait_for_readable(new FunctionContext([this](int r) {
Mutex::Locker l(lock);
if (r == 0) {
_consume();
}
}));
}

return;
}

// The journaler is readable: consume an entry
bufferlist bl;
bool readable = journaler.try_read_entry(bl);
assert(readable); // we checked earlier

dout(20) << " decoding entry" << dendl;
PurgeItem item;
bufferlist::iterator q = bl.begin();
::decode(item, q);
dout(20) << " executing item (0x" << std::hex << item.ino
<< std::dec << ")" << dendl;
_execute_item(item, journaler.get_read_pos());
}

void PurgeQueue::_execute_item(
const PurgeItem &item,
uint64_t expire_to)
{
assert(lock.is_locked_by_me());

in_flight[expire_to] = item;

// TODO: handle things other than normal file purges
// (directories, snapshot truncates)
C_GatherBuilder gather(cct);
if (item.size > 0) {
uint64_t num = Striper::get_num_objects(item.layout, item.size);
dout(10) << " 0~" << item.size << " objects 0~" << num
<< " snapc " << item.snapc << " on " << item.ino << dendl;
filer.purge_range(item.ino, &item.layout, item.snapc,
0, num, ceph::real_clock::now(), 0,
gather.new_sub());
}

// remove the backtrace object if it was not purged
object_t oid = CInode::get_object_name(item.ino, frag_t(), "");
if (!gather.has_subs() || !item.layout.pool_ns.empty()) {
object_locator_t oloc(item.layout.pool_id);
dout(10) << " remove backtrace object " << oid
<< " pool " << oloc.pool << " snapc " << item.snapc << dendl;
objecter->remove(oid, oloc, item.snapc,
ceph::real_clock::now(), 0,
NULL, gather.new_sub());
}

// remove old backtrace objects
for (const auto &p : item.old_pools) {
object_locator_t oloc(p);
dout(10) << " remove backtrace object " << oid
<< " old pool " << p << " snapc " << item.snapc << dendl;
objecter->remove(oid, oloc, item.snapc,
ceph::real_clock::now(), 0,
NULL, gather.new_sub());
}
assert(gather.has_subs());

gather.set_finisher(new FunctionContext([this, expire_to](int r){
execute_item_complete(expire_to);
}));
gather.activate();
}
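
For reviewers unfamiliar with Striper: get_num_objects() maps the item's size and layout to the count of RADOS objects covered by purge_range(). Below is a simplified, self-contained illustration of that calculation, assuming an unstriped layout with the default 4 MiB object size; real striped layouts (stripe_count > 1, stripe_unit < object_size) are more involved.

```cpp
// Simplified illustration of the size -> object count calculation that
// Striper::get_num_objects() performs for the purge_range() call above.
#include <cassert>
#include <cstdint>

static uint64_t simple_num_objects(uint64_t file_size, uint64_t object_size)
{
  if (file_size == 0)
    return 0;
  return (file_size + object_size - 1) / object_size;   // ceil(size / object_size)
}

int main()
{
  const uint64_t object_size = 4 * 1024 * 1024;                   // default 4 MiB objects
  assert(simple_num_objects(1, object_size) == 1);                // tiny file: 1 object
  assert(simple_num_objects(10 * 1024 * 1024, object_size) == 3); // 10 MiB file: 3 objects
  // purge_range() is then asked to delete objects 0..num-1 for the inode
  return 0;
}
```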

void PurgeQueue::execute_item_complete(
uint64_t expire_to)
{
dout(10) << "complete at 0x" << std::hex << expire_to << std::dec << dendl;
Mutex::Locker l(lock);
assert(in_flight.count(expire_to) == 1);

auto iter = in_flight.find(expire_to);
assert(iter != in_flight.end());
if (iter == in_flight.begin()) {
// This was the lowest journal position in flight, so we can now
// safely expire the journal up to here.
journaler.set_expire_pos(expire_to);

Review comment from gregsfortytwo (Member), Jan 12, 2017:

We probably want to expire to the next item in the list, so that when we have out-of-order completions we do them all ASAP.

Reply from jcsp (Author), Feb 11, 2017:

The expire_to value passed into _execute_item from _consume is the journal read pos after the entry has been read, i.e. the next entry -- so I think we're already doing this.
journaler.trim();
}

dout(10) << "completed item for ino 0x" << std::hex << iter->second.ino
<< std::dec << dendl;

in_flight.erase(iter);

_consume();
}
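
To make the expire-position rule discussed in the review thread concrete: in_flight is keyed by the journal position after each entry, and only completion of the lowest outstanding key advances the expire position, so later completions wait until earlier ones finish. The toy model below mirrors that rule from the code above; the Tracker type and the example positions are invented for illustration.

```cpp
// Toy model of PurgeQueue's in_flight / expire_pos interaction, illustrating
// the out-of-order completion point raised in the review discussion above.
#include <cassert>
#include <cstdint>
#include <map>

struct Tracker {
  std::map<uint64_t, int> in_flight;   // key: journal pos *after* the entry
  uint64_t expire_pos = 0;

  void start(uint64_t expire_to) { in_flight[expire_to] = 0; }

  void complete(uint64_t expire_to) {
    auto it = in_flight.find(expire_to);
    assert(it != in_flight.end());
    // Only the lowest outstanding position may move expire_pos forward;
    // later completions simply wait in the map until earlier ones finish.
    if (it == in_flight.begin())
      expire_pos = expire_to;
    in_flight.erase(it);
  }
};

int main() {
  Tracker t;
  t.start(100);                  // entry A, read pos advanced to 100
  t.start(200);                  // entry B, read pos advanced to 200
  t.complete(200);               // B finishes first: nothing can be expired yet
  assert(t.expire_pos == 0);
  t.complete(100);               // A finishes: expire up to 100
  assert(t.expire_pos == 100);
  // B's range is only covered later, when a subsequent lowest in-flight
  // completion carries expire_pos past 200.
  return 0;
}
```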
