Skip to content

Commit

Permalink
rgw_file: add timed namespace invalidation
Browse files Browse the repository at this point in the history
With this change, librgw/rgw_file consumers can provide an invalidation
callback, which is used by the library to invalidate directories
whose contents should be forgotten.

The existing RGWLib GC mechanism is being used to drive this.  New
configuration params have been added.  The main configurable is
rgw_nfs_namespace_expire_secs, the expire timeout.

Updated post Yehuda review.

Fixes: http://tracker.ceph.com/issues/18651

Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
(cherry picked from commit deb2c1e)

Conflicts:
	src/rgw/rgw_lib_frontend.h - in class RGWLibProcess : public RGWProcess
                       there was no public method stop() in jewel (now there is)
  • Loading branch information
mattbenjamin authored and smithfarm committed Jan 29, 2017
1 parent 6b4bcd3 commit 8b124c8
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 48 deletions.
3 changes: 3 additions & 0 deletions src/common/config_opts.h
Expand Up @@ -1329,6 +1329,9 @@ OPTION(rgw_nfs_lru_lanes, OPT_INT, 5)
OPTION(rgw_nfs_lru_lane_hiwat, OPT_INT, 911)
OPTION(rgw_nfs_fhcache_partitions, OPT_INT, 3)
OPTION(rgw_nfs_fhcache_size, OPT_INT, 2017) /* 3*2017=6051 */
OPTION(rgw_nfs_namespace_expire_secs, OPT_INT, 300) /* namespace invalidate
* timer */
OPTION(rgw_nfs_max_gc, OPT_INT, 300) /* max gc events per cycle */
OPTION(rgw_nfs_write_completion_interval_s, OPT_INT, 10) /* stateless (V3)
* commit
* delay */
Expand Down
12 changes: 11 additions & 1 deletion src/include/rados/rgw_file.h
Expand Up @@ -26,7 +26,7 @@ extern "C" {

#define LIBRGW_FILE_VER_MAJOR 1
#define LIBRGW_FILE_VER_MINOR 1
#define LIBRGW_FILE_VER_EXTRA 0
#define LIBRGW_FILE_VER_EXTRA 1

#define LIBRGW_FILE_VERSION(maj, min, extra) ((maj << 16) + (min << 8) + extra)
#define LIBRGW_FILE_VERSION_CODE LIBRGW_FILE_VERSION(LIBRGW_FILE_VER_MAJOR, LIBRGW_FILE_VER_MINOR, LIBRGW_FILE_VER_EXTRA)
Expand Down Expand Up @@ -115,6 +115,16 @@ int rgw_mount(librgw_t rgw, const char *uid, const char *key,
const char *secret, struct rgw_fs **rgw_fs,
uint32_t flags);

/*
register invalidate callbacks

The library calls the registered callback to tell the consumer that a
directory handle should be invalidated (its cached contents forgotten).
*/
/* no registration flags */
#define RGW_REG_INVALIDATE_FLAG_NONE 0x0000

/* invalidate callback signature: handle is the opaque arg supplied at
 * registration time, fh_hk is the handle key of the object being
 * invalidated */
typedef void (*rgw_fh_callback_t)(void *handle, struct rgw_fh_hk fh_hk);

/* register cb (with opaque arg) as the invalidate callback for the
 * mounted namespace rgw_fs; flags takes RGW_REG_INVALIDATE_FLAG_*
 * values; returns an int status code */
int rgw_register_invalidate(struct rgw_fs *rgw_fs, rgw_fh_callback_t cb,
void *arg, uint32_t flags);

/*
detach rgw namespace
*/
Expand Down
13 changes: 12 additions & 1 deletion src/rgw/librgw.cc
Expand Up @@ -13,6 +13,7 @@
*/
#include <sys/types.h>
#include <string.h>
#include <chrono>

#include "include/types.h"
#include "include/rados/librgw.h"
Expand Down Expand Up @@ -79,6 +80,8 @@ namespace rgw {
m_tp.drain(&req_wq);
}

#define MIN_EXPIRE_S 120

void RGWLibProcess::run()
{
/* write completion interval */
Expand All @@ -91,6 +94,14 @@ namespace rgw {
/* gc loop */
while (! shutdown) {
lsubdout(cct, rgw, 5) << "RGWLibProcess GC" << dendl;

/* dirent invalidate timeout--basically, the upper-bound on
* inconsistency with the S3 namespace */
auto expire_s = cct->_conf->rgw_nfs_namespace_expire_secs;

/* delay between gc cycles */
auto delay_s = std::max(1, std::min(MIN_EXPIRE_S, expire_s/2));

unique_lock uniq(mtx);
restart:
int cur_gen = gen;
Expand All @@ -105,7 +116,7 @@ namespace rgw {
goto restart; /* invalidated */
}
uniq.unlock();
std::this_thread::sleep_for(std::chrono::seconds(120));
std::this_thread::sleep_for(std::chrono::seconds(delay_s));
}
}

Expand Down
86 changes: 62 additions & 24 deletions src/rgw/rgw_file.cc
Expand Up @@ -80,22 +80,6 @@ namespace rgw {
using std::get;

LookupFHResult fhr{nullptr, 0};
#if 0
RGWFileHandle::directory* d = parent->get_directory();
if (! d->name_cache.empty()) {
RGWFileHandle::dirent_string name{path};
const auto& diter = d->name_cache.find(name);
if (diter != d->name_cache.end()) {
fhr = lookup_fh(parent, path,
RGWFileHandle::FLAG_CREATE|
((diter->second == RGW_FS_TYPE_DIRECTORY) ?
RGWFileHandle::FLAG_DIRECTORY :
RGWFileHandle::FLAG_NONE));
if (get<0>(fhr))
return fhr;
}
}
#endif

/* XXX the need for two round-trip operations to identify file or
* directory leaf objects is unnecessary--the current proposed
Expand Down Expand Up @@ -628,13 +612,34 @@ namespace rgw {
rele();
} /* RGWLibFS::close */

/* Render an RGWLibFS event for logging as
 * <event:type=...;fid=bucket:object;ts=<timespec:sec;nsec>> */
std::ostream& operator<<(std::ostream &os, RGWLibFS::event const &ev) {
  /* READDIR is the only named event type; anything else prints UNKNOWN */
  const char* type_tag =
    (ev.t == RGWLibFS::event::type::READDIR)
    ? "type=READDIR;"
    : "type=UNKNOWN;";
  const auto& hk = ev.fhk.fh_hk;

  os << "<event:" << type_tag;
  os << "fid=" << hk.bucket << ":" << hk.object;
  os << ";ts=<timespec:" << ev.ts.tv_sec << ";" << ev.ts.tv_nsec << ">>";
  return os;
}

void RGWLibFS::gc()
{
using std::get;
using directory = RGWFileHandle::directory;

static constexpr uint32_t max_ev = 24;
static constexpr uint16_t expire_s = 300; /* 5m */
/* dirent invalidate timeout--basically, the upper-bound on
* inconsistency with the S3 namespace */
auto expire_s
= get_context()->_conf->rgw_nfs_namespace_expire_secs;

/* max events to gc in one cycle */
uint32_t max_ev =
std::max(1, get_context()->_conf->rgw_nfs_max_gc);

struct timespec now;
event_vector ve;
Expand All @@ -645,11 +650,15 @@ namespace rgw {
do {
{
lock_guard guard(state.mtx); /* LOCKED */
/* just return if no events */
if (events.empty()) {
return;
}
uint32_t _max_ev =
(events.size() < 500) ? max_ev : (events.size() / 4);
for (uint32_t ix = 0; (ix < _max_ev) && (events.size() > 0); ++ix) {
event& ev = events.front();
if (ev.ts.tv_sec < (now.tv_sec + expire_s)) {
if (ev.ts.tv_sec > (now.tv_sec + expire_s)) {
stop = true;
break;
}
Expand All @@ -659,8 +668,12 @@ namespace rgw {
} /* anon */
/* !LOCKED */
for (auto& ev : ve) {
lsubdout(get_context(), rgw, 15)
<< "try-expire ev: " << ev << dendl;
if (likely(ev.t == event::type::READDIR)) {
RGWFileHandle* rgw_fh = lookup_handle(ev.fhk.fh_hk);
lsubdout(get_context(), rgw, 15)
<< "ev rgw_fh: " << rgw_fh << dendl;
if (rgw_fh) {
RGWFileHandle::directory* d;
if (unlikely(! rgw_fh->is_dir())) {
Expand All @@ -677,14 +690,15 @@ namespace rgw {
if (d) {
lock_guard guard(rgw_fh->mtx);
d->clear_state();
rgw_fh->invalidate();
}
rele:
unref(rgw_fh);
} /* rgw_fh */
} /* event::type::READDIR */
} /* ev */
std::this_thread::sleep_for(std::chrono::seconds(120));
} while (! stop);
ve.clear();
} while (! (stop || shutdown));
} /* RGWLibFS::gc */

void RGWFileHandle::encode_attrs(ceph::buffer::list& ux_key1,
Expand Down Expand Up @@ -719,7 +733,6 @@ namespace rgw {
int rc = 0;
struct timespec now;
CephContext* cct = fs->get_context();
directory* d = get_directory(); /* already type-checked */

(void) clock_gettime(CLOCK_MONOTONIC_COARSE, &now); /* !LOCKED */

Expand All @@ -734,8 +747,9 @@ namespace rgw {
offset);
rc = rgwlib.get_fe()->execute_req(&req);
if (! rc) {
set_nlink(2 + d->name_cache.size());
lock_guard guard(mtx);
state.atime = now;
set_nlink(2 + 1);
*eof = req.eof();
event ev(event::type::READDIR, get_key(), state.atime);
fs->state.push_event(ev);
Expand All @@ -745,8 +759,9 @@ namespace rgw {
RGWReaddirRequest req(cct, fs->get_user(), this, rcb, cb_arg, offset);
rc = rgwlib.get_fe()->execute_req(&req);
if (! rc) {
lock_guard guard(mtx);
state.atime = now;
set_nlink(2 + d->name_cache.size());
set_nlink(2 + 1);
*eof = req.eof();
event ev(event::type::READDIR, get_key(), state.atime);
fs->state.push_event(ev);
Expand Down Expand Up @@ -903,6 +918,18 @@ namespace rgw {
delete write_req;
}

/* Forget this directory's cached state: empties marker_cache (the
 * offset -> marker map filled in while entries are added). */
void RGWFileHandle::directory::clear_state()
{
marker_cache.clear();
}

void RGWFileHandle::invalidate() {
RGWLibFS *fs = get_fs();
if (fs->invalidate_cb) {
fs->invalidate_cb(fs->invalidate_arg, get_key().fh_hk);
}
}

int RGWWriteRequest::exec_start() {
struct req_state* s = get_state();

Expand Down Expand Up @@ -1110,6 +1137,17 @@ extern "C" {
return 0;
}

/*
register invalidate callbacks
*/
int rgw_register_invalidate(struct rgw_fs *rgw_fs, rgw_fh_callback_t cb,
			    void *arg, uint32_t flags)
{
  /* fs_private carries the RGWLibFS instance backing this mount */
  auto* const libfs = static_cast<RGWLibFS*>(rgw_fs->fs_private);
  const int rc = libfs->register_invalidate(cb, arg, flags);
  return rc;
}

/*
detach rgw namespace
*/
Expand Down
50 changes: 28 additions & 22 deletions src/rgw/rgw_file.h
Expand Up @@ -169,7 +169,6 @@ namespace rgw {
using dirent_string = basic_sstring<char, uint16_t, 32>;

using marker_cache_t = flat_map<uint64_t, dirent_string>;
using name_cache_t = flat_map<dirent_string, uint8_t>;

struct State {
uint64_t dev;
Expand Down Expand Up @@ -199,19 +198,10 @@ namespace rgw {

uint32_t flags;
marker_cache_t marker_cache;
name_cache_t name_cache;

directory() : flags(FLAG_NONE) {}

void clear_state() {
marker_cache.clear();
name_cache.clear();
}

void set_overflow() {
clear_state();
flags |= FLAG_OVERFLOW;
}
void clear_state();
};

boost::variant<file, directory> variant_type;
Expand Down Expand Up @@ -463,18 +453,9 @@ namespace rgw {
// XXXX check for failure (dup key)
d->marker_cache.insert(
marker_cache_t::value_type(off, marker.data()));
/* 90% of directories hold <= 32 entries (Yifan Wang, CMU),
* but go big */
if (d->name_cache.size() < 128) {
d->name_cache.insert(
name_cache_t::value_type(marker.data(), obj_type));
} else {
d->set_overflow(); // too many
}
}
}

/* XXX */
std::string find_marker(uint64_t off) { // XXX copy
using std::get;
directory* d = get<directory>(&variant_type);
Expand Down Expand Up @@ -601,6 +582,8 @@ namespace rgw {
void decode_attrs(const ceph::buffer::list* ux_key1,
const ceph::buffer::list* ux_attrs1);

void invalidate();

virtual bool reclaim();

typedef cohort::lru::LRU<std::mutex> FhLRU;
Expand Down Expand Up @@ -698,6 +681,9 @@ namespace rgw {
CephContext* cct;
struct rgw_fs fs;
RGWFileHandle root_fh;
rgw_fh_callback_t invalidate_cb;
void *invalidate_arg;
bool shutdown;

mutable std::atomic<uint64_t> refcnt;

Expand Down Expand Up @@ -726,6 +712,9 @@ namespace rgw {
: t(t), fhk(k), ts(ts) {}
};

friend std::ostream& operator<<(std::ostream &os,
RGWLibFS::event const &ev);

using event_vector = /* boost::small_vector<event, 16> */
std::vector<event>;

Expand Down Expand Up @@ -753,7 +742,6 @@ namespace rgw {
State() : flags(0) {}

void push_event(const event& ev) {
lock_guard guard(mtx);
events.push_back(ev);
}
} state;
Expand All @@ -768,7 +756,8 @@ namespace rgw {

RGWLibFS(CephContext* _cct, const char *_uid, const char *_user_id,
const char* _key)
: cct(_cct), root_fh(this, get_inst()), refcnt(1),
: cct(_cct), root_fh(this, get_inst()), invalidate_cb(nullptr),
invalidate_arg(nullptr), shutdown(false), refcnt(1),
fh_cache(cct->_conf->rgw_nfs_fhcache_partitions,
cct->_conf->rgw_nfs_fhcache_size),
fh_lru(cct->_conf->rgw_nfs_lru_lanes,
Expand Down Expand Up @@ -808,6 +797,8 @@ namespace rgw {
intrusive_ptr_release(this);
}

/* Request shutdown of this filesystem instance by raising the shutdown
 * flag, which RGWLibFS::gc() checks in its loop condition.
 * NOTE(review): shutdown is declared as a plain bool; if stop() and
 * gc() can run on different threads this may want to be atomic --
 * confirm. */
void stop() { shutdown = true; }

void release_evict(RGWFileHandle* fh) {
/* remove from cache, releases sentinel ref */
fh_cache.remove(fh->fh.fh_hk.object, fh,
Expand Down Expand Up @@ -851,6 +842,12 @@ namespace rgw {
return ret;
} /* authorize */

/* Record the consumer's invalidate callback together with the opaque
 * argument handed back on each call.  flags is not consulted here.
 * Always returns 0. */
int register_invalidate(rgw_fh_callback_t cb, void *arg, uint32_t flags) {
  (void) flags; /* not examined */
  invalidate_arg = arg;
  invalidate_cb = cb;
  return 0;
}

/* find RGWFileHandle by id */
LookupFHResult lookup_fh(const fh_key& fhk,
const uint32_t flags = RGWFileHandle::FLAG_NONE) {
Expand Down Expand Up @@ -1038,6 +1035,15 @@ namespace rgw {
fh->mtx.unlock(); /* !LOCKED */
out:
lat.lock->unlock(); /* !LATCHED */

/* special case: lookup root_fh */
if (! fh) {
if (unlikely(fh_hk == root_fh.fh.fh_hk)) {
fh = &root_fh;
ref(fh);
}
}

return fh;
}

Expand Down
7 changes: 7 additions & 0 deletions src/rgw/rgw_lib_frontend.h
Expand Up @@ -33,6 +33,13 @@ namespace rgw {
void run();
void checkpoint();

/* Mark this process as shutting down, then forward the stop request
 * to every filesystem currently held in mounted_fs.
 * NOTE(review): mounted_fs is walked here without taking mtx, whereas
 * register_fs() inserts under it -- confirm callers serialize. */
void stop() {
  shutdown = true;
  for (auto fs_iter = mounted_fs.cbegin(); fs_iter != mounted_fs.cend();
       ++fs_iter) {
    fs_iter->second->stop();
  }
}

void register_fs(RGWLibFS* fs) {
lock_guard guard(mtx);
mounted_fs.insert(FSMAP::value_type(fs, fs));
Expand Down

0 comments on commit 8b124c8

Please sign in to comment.