
Commit

Merge pull request #5280: librbd: add valgrind memory checks for unit tests

Reviewed-by: Loic Dachary <ldachary@redhat.com>
ldachary committed Jul 19, 2015
2 parents d20f513 + f99f312 commit 391f753
Showing 23 changed files with 233 additions and 92 deletions.
6 changes: 2 additions & 4 deletions qa/workunits/cephtool/test.sh
@@ -315,10 +315,8 @@ function test_tiering()
# make sure we can't clobber snapshot state
ceph osd pool create snap_base 2
ceph osd pool create snap_cache 2
rbd -p snap_cache create foo --size 10
rbd -p snap_cache snap create foo --snap snap1
rbd -p snap_cache snap rm foo --snap snap1
expect_false ceph osd tier add snap_base snap_cache --force-nonempty
ceph osd pool mksnap snap_cache snapname
expect_false ceph osd tier add snap_base snap_cache
ceph osd pool delete snap_base snap_base --yes-i-really-really-mean-it
ceph osd pool delete snap_cache snap_cache --yes-i-really-really-mean-it

6 changes: 6 additions & 0 deletions qa/workunits/mon/crush_ops.sh
@@ -63,6 +63,12 @@ ceph osd tree | grep -c host1 | grep -q 0

expect_false ceph osd crush rm bar # not empty
ceph osd crush unlink host2

# reference foo and bar with a rule
ceph osd crush rule create-simple foo-rule foo host firstn
expect_false ceph osd crush rm foo
ceph osd crush rule rm foo-rule

ceph osd crush rm bar
ceph osd crush rm foo
ceph osd crush rm osd.$o2 host2
10 changes: 10 additions & 0 deletions src/common/config_opts.h
@@ -263,6 +263,10 @@ OPTION(mon_sync_provider_kill_at, OPT_INT, 0) // kill the sync provider at a sp
OPTION(mon_sync_requester_kill_at, OPT_INT, 0) // kill the sync requester at a specific point in the work flow
OPTION(mon_force_quorum_join, OPT_BOOL, false) // force monitor to join quorum even if it has been previously removed from the map
OPTION(mon_keyvaluedb, OPT_STR, "leveldb") // type of keyvaluedb backend

// UNSAFE -- TESTING ONLY! Allows addition of a cache tier with preexisting snaps
OPTION(mon_debug_unsafe_allow_tier_with_nonempty_snaps, OPT_BOOL, false)

OPTION(paxos_stash_full_interval, OPT_INT, 25) // how often (in commits) to stash a full copy of the PaxosService state
OPTION(paxos_max_join_drift, OPT_INT, 10) // max paxos iterations before we must first sync the monitor stores
OPTION(paxos_propose_interval, OPT_DOUBLE, 1.0) // gather updates for this long before proposing a map update
@@ -564,16 +568,22 @@ OPTION(osd_recover_clone_overlap_limit, OPT_INT, 10)
OPTION(osd_backfill_scan_min, OPT_INT, 64)
OPTION(osd_backfill_scan_max, OPT_INT, 512)
OPTION(osd_op_thread_timeout, OPT_INT, 15)
OPTION(osd_op_thread_suicide_timeout, OPT_INT, 150)
OPTION(osd_recovery_thread_timeout, OPT_INT, 30)
OPTION(osd_recovery_thread_suicide_timeout, OPT_INT, 300)
OPTION(osd_snap_trim_thread_timeout, OPT_INT, 60*60*1)
OPTION(osd_snap_trim_thread_suicide_timeout, OPT_INT, 60*60*10)
OPTION(osd_snap_trim_sleep, OPT_FLOAT, 0)
OPTION(osd_scrub_thread_timeout, OPT_INT, 60)
OPTION(osd_scrub_thread_suicide_timeout, OPT_INT, 60)
OPTION(osd_scrub_finalize_thread_timeout, OPT_INT, 60*10)
OPTION(osd_scrub_invalid_stats, OPT_BOOL, true)
OPTION(osd_remove_thread_timeout, OPT_INT, 60*60)
OPTION(osd_remove_thread_suicide_timeout, OPT_INT, 10*60*60)
OPTION(osd_command_thread_timeout, OPT_INT, 10*60)
OPTION(osd_age, OPT_FLOAT, .8)
OPTION(osd_age_time, OPT_INT, 0)
OPTION(osd_command_thread_suicide_timeout, OPT_INT, 15*60)
OPTION(osd_heartbeat_addr, OPT_ADDR, entity_addr_t())
OPTION(osd_heartbeat_interval, OPT_INT, 6) // (seconds) how often we ping peers
OPTION(osd_heartbeat_grace, OPT_INT, 20) // (seconds) how long before we decide a peer has failed
27 changes: 27 additions & 0 deletions src/crush/CrushWrapper.cc
@@ -192,6 +192,9 @@ bool CrushWrapper::_maybe_remove_last_instance(CephContext *cct, int item, bool
if (_search_item_exists(item)) {
return false;
}
if (item < 0 && _bucket_is_in_use(cct, item)) {
return false;
}

if (item < 0 && !unlink_only) {
crush_bucket *t = get_bucket(item);
@@ -219,6 +222,9 @@ int CrushWrapper::remove_item(CephContext *cct, int item, bool unlink_only)
<< " items, not empty" << dendl;
return -ENOTEMPTY;
}
if (_bucket_is_in_use(cct, item)) {
return -EBUSY;
}
}

for (int i = 0; i < crush->max_buckets; i++) {
@@ -258,6 +264,22 @@ bool CrushWrapper::_search_item_exists(int item) const
return false;
}

bool CrushWrapper::_bucket_is_in_use(CephContext *cct, int item)
{
for (unsigned i = 0; i < crush->max_rules; ++i) {
crush_rule *r = crush->rules[i];
if (!r)
continue;
for (unsigned j = 0; j < r->len; ++j) {
if (r->steps[j].op == CRUSH_RULE_TAKE &&
r->steps[j].arg1 == item) {
return true;
}
}
}
return false;
}

int CrushWrapper::_remove_item_under(CephContext *cct, int item, int ancestor, bool unlink_only)
{
ldout(cct, 5) << "_remove_item_under " << item << " under " << ancestor
@@ -293,6 +315,11 @@ int CrushWrapper::remove_item_under(CephContext *cct, int item, int ancestor, bo
{
ldout(cct, 5) << "remove_item_under " << item << " under " << ancestor
<< (unlink_only ? " unlink_only":"") << dendl;

if (!unlink_only && _bucket_is_in_use(cct, item)) {
return -EBUSY;
}

int ret = _remove_item_under(cct, item, ancestor, unlink_only);
if (ret < 0)
return ret;
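
The new _bucket_is_in_use() guard scans every rule for a CRUSH_RULE_TAKE step that names the bucket being removed, which is what lets remove_item()/remove_item_under() return -EBUSY above. A self-contained sketch of that scan, using simplified stand-in types rather than the real crush structs (illustration only, not code from this commit):

#include <vector>

// Simplified stand-ins for crush_rule / crush_rule_step; the value of
// CRUSH_RULE_TAKE (1 in crush.h) is assumed here for illustration.
enum { CRUSH_RULE_TAKE = 1 };
struct Step { int op; int arg1; };
struct Rule { std::vector<Step> steps; };

// Returns true if any rule still "takes" the given bucket id.
static bool bucket_is_in_use(const std::vector<Rule>& rules, int bucket_id)
{
  for (const Rule& r : rules)
    for (const Step& s : r.steps)
      if (s.op == CRUSH_RULE_TAKE && s.arg1 == bucket_id)
        return true;
  return false;
}
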
1 change: 1 addition & 0 deletions src/crush/CrushWrapper.h
@@ -587,6 +587,7 @@ class CrushWrapper {
private:
bool _maybe_remove_last_instance(CephContext *cct, int id, bool unlink_only);
int _remove_item_under(CephContext *cct, int id, int ancestor, bool unlink_only);
bool _bucket_is_in_use(CephContext *cct, int id);
public:
int remove_item_under(CephContext *cct, int id, int ancestor, bool unlink_only);

11 changes: 9 additions & 2 deletions src/crush/mapper.c
@@ -839,8 +839,15 @@ int crush_do_rule(const struct crush_map *map,

switch (curstep->op) {
case CRUSH_RULE_TAKE:
w[0] = curstep->arg1;
wsize = 1;
if ((curstep->arg1 >= 0 &&
curstep->arg1 < map->max_devices) ||
(-1-curstep->arg1 < map->max_buckets &&
map->buckets[-1-curstep->arg1])) {
w[0] = curstep->arg1;
wsize = 1;
} else {
dprintk(" bad take value %d\n", curstep->arg1);
}
break;

case CRUSH_RULE_SET_CHOOSE_TRIES:
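
The added guard accepts a CRUSH_RULE_TAKE argument only if it names an existing device or bucket. Restated as a standalone predicate (a sketch; the real code works directly on struct crush_map):

struct crush_bucket;  // opaque here; only tested for non-NULL

// A take argument is valid if it is a device id in [0, max_devices) or a
// bucket id, where bucket id b (negative) is stored at buckets[-1-b].
static bool take_arg_is_valid(int arg, int max_devices, int max_buckets,
                              crush_bucket* const* buckets)
{
  if (arg >= 0)
    return arg < max_devices;
  int b = -1 - arg;
  return b < max_buckets && buckets[b] != 0;
}
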
7 changes: 4 additions & 3 deletions src/include/krbd.h
@@ -13,14 +13,15 @@
#ifndef CEPH_KRBD_H
#define CEPH_KRBD_H

#include "rados/librados.h"

#ifdef __cplusplus
extern "C" {
#endif

struct krbd_ctx;
struct CephContext;

int krbd_create_from_context(struct CephContext *cct, struct krbd_ctx **pctx);
int krbd_create_from_context(rados_config_t cct, struct krbd_ctx **pctx);
void krbd_destroy(struct krbd_ctx *ctx);

int krbd_map(struct krbd_ctx *ctx, const char *pool, const char *image,
@@ -38,7 +39,7 @@ namespace ceph {
class Formatter;
}

int krbd_showmapped(struct krbd_ctx *ctx, Formatter *f);
int krbd_showmapped(struct krbd_ctx *ctx, ceph::Formatter *f);

#endif /* __cplusplus */

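
With this change, callers obtain the context handle from librados (rados_cct()) rather than passing a CephContext pointer, so the header only needs rados/librados.h. A hedged usage sketch under that assumption (error handling trimmed, include paths assumed):

#include <rados/librados.h>
#include "include/krbd.h"

int krbd_smoke_test(void)
{
  rados_t cluster;
  if (rados_create(&cluster, NULL) < 0)
    return -1;
  rados_conf_read_file(cluster, NULL);   /* default ceph.conf search path */
  if (rados_connect(cluster) < 0) {
    rados_shutdown(cluster);
    return -1;
  }

  struct krbd_ctx *ctx = NULL;
  int r = krbd_create_from_context(rados_cct(cluster), &ctx);
  if (r == 0) {
    /* krbd_map()/krbd_unmap() calls would go here */
    krbd_destroy(ctx);
  }
  rados_shutdown(cluster);
  return r;
}
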
5 changes: 3 additions & 2 deletions src/krbd.cc
@@ -34,6 +34,7 @@
#include "common/TextTable.h"
#include "include/assert.h"
#include "include/stringify.h"
#include "include/krbd.h"
#include "mon/MonMap.h"

#include <blkid/blkid.h>
@@ -582,12 +583,12 @@ int dump_images(struct krbd_ctx *ctx, Formatter *f)
return r;
}

extern "C" int krbd_create_from_context(struct CephContext *cct,
extern "C" int krbd_create_from_context(rados_config_t cct,
struct krbd_ctx **pctx)
{
struct krbd_ctx *ctx = new struct krbd_ctx();

ctx->cct = cct;
ctx->cct = reinterpret_cast<CephContext *>(cct);
ctx->udev = udev_new();
if (!ctx->udev) {
delete ctx;
1 change: 1 addition & 0 deletions src/librbd/TaskFinisher.h
@@ -35,6 +35,7 @@ class TaskFinisher {
delete m_safe_timer;
}

m_finisher->wait_for_empty();
m_finisher->stop();
delete m_finisher;
}
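
The destructor change drains the finisher before stopping it, so contexts still queued at shutdown are run rather than dropped (which the valgrind runs this PR enables would otherwise report). A minimal sketch of that ordering, assuming Ceph's common Finisher API (wait_for_empty(), stop()):

#include "common/Finisher.h"   // path as in the Ceph tree

void shutdown_finisher(Finisher *f)
{
  f->wait_for_empty();  // block until every queued Context has run
  f->stop();            // then stop the finisher thread
  delete f;
}
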
4 changes: 3 additions & 1 deletion src/mon/OSDMonitor.cc
@@ -6121,7 +6121,9 @@ bool OSDMonitor::prepare_command_impl(MMonCommand *m,
err = -ENOTSUP;
goto reply;
}
if (!tp->removed_snaps.empty() || !tp->snaps.empty()) {
if ((!tp->removed_snaps.empty() || !tp->snaps.empty()) &&
((force_nonempty != "--force-nonempty") ||
(!g_conf->mon_debug_unsafe_allow_tier_with_nonempty_snaps))) {
ss << "tier pool '" << tierpoolstr << "' has snapshot state; it cannot be added as a tier without breaking the pool";
err = -ENOTEMPTY;
goto reply;
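
The amended condition reads more easily as a predicate: a tier pool with snapshot state is only accepted when both the --force-nonempty flag and the new mon_debug_unsafe_allow_tier_with_nonempty_snaps option are set. A self-contained restatement (a sketch, not the monitor code):

static bool tier_add_allowed(bool has_snap_state,
                             bool force_nonempty_given,
                             bool unsafe_option_enabled)
{
  if (!has_snap_state)
    return true;                                        // nothing to protect
  return force_nonempty_given && unsafe_option_enabled; // both escape hatches required
}
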
50 changes: 41 additions & 9 deletions src/osd/OSD.cc
@@ -1543,9 +1543,17 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
op_tracker(cct, cct->_conf->osd_enable_op_tracker,
cct->_conf->osd_num_op_tracker_shard),
test_ops_hook(NULL),
op_shardedwq(cct->_conf->osd_op_num_shards, this,
cct->_conf->osd_op_thread_timeout, &osd_op_tp),
peering_wq(this, cct->_conf->osd_op_thread_timeout, &osd_tp),
op_shardedwq(
cct->_conf->osd_op_num_shards,
this,
cct->_conf->osd_op_thread_timeout,
cct->_conf->osd_op_thread_suicide_timeout,
&osd_op_tp),
peering_wq(
this,
cct->_conf->osd_op_thread_timeout,
cct->_conf->osd_op_thread_suicide_timeout,
&osd_tp),
map_lock("OSD::map_lock"),
pg_map_lock("OSD::pg_map_lock"),
debug_drop_pg_create_probability(cct->_conf->osd_debug_drop_pg_create_probability),
@@ -1557,14 +1565,38 @@ OSD::OSD(CephContext *cct_, ObjectStore *store_,
pg_stat_queue_lock("OSD::pg_stat_queue_lock"),
osd_stat_updated(false),
pg_stat_tid(0), pg_stat_tid_flushed(0),
command_wq(this, cct->_conf->osd_command_thread_timeout, &command_tp),
command_wq(
this,
cct->_conf->osd_command_thread_timeout,
cct->_conf->osd_command_thread_suicide_timeout,
&command_tp),
recovery_ops_active(0),
recovery_wq(this, cct->_conf->osd_recovery_thread_timeout, &recovery_tp),
recovery_wq(
this,
cct->_conf->osd_recovery_thread_timeout,
cct->_conf->osd_recovery_thread_suicide_timeout,
&recovery_tp),
replay_queue_lock("OSD::replay_queue_lock"),
snap_trim_wq(this, cct->_conf->osd_snap_trim_thread_timeout, &disk_tp),
scrub_wq(this, cct->_conf->osd_scrub_thread_timeout, &disk_tp),
rep_scrub_wq(this, cct->_conf->osd_scrub_thread_timeout, &disk_tp),
remove_wq(store, cct->_conf->osd_remove_thread_timeout, &disk_tp),
snap_trim_wq(
this,
cct->_conf->osd_snap_trim_thread_timeout,
cct->_conf->osd_snap_trim_thread_suicide_timeout,
&disk_tp),
scrub_wq(
this,
cct->_conf->osd_scrub_thread_timeout,
cct->_conf->osd_scrub_thread_suicide_timeout,
&disk_tp),
rep_scrub_wq(
this,
cct->_conf->osd_scrub_thread_timeout,
cct->_conf->osd_scrub_thread_suicide_timeout,
&disk_tp),
remove_wq(
store,
cct->_conf->osd_remove_thread_timeout,
cct->_conf->osd_remove_thread_suicide_timeout,
&disk_tp),
service(this)
{
monc->set_messenger(client_messenger);
32 changes: 16 additions & 16 deletions src/osd/OSD.h
@@ -1452,8 +1452,8 @@ class OSD : public Dispatcher,
uint32_t num_shards;

public:
ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, ShardedThreadPool* tp):
ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, ti*10, tp),
ShardedOpWQ(uint32_t pnum_shards, OSD *o, time_t ti, time_t si, ShardedThreadPool* tp):
ShardedThreadPool::ShardedWQ < pair <PGRef, OpRequestRef> >(ti, si, tp),
osd(o), num_shards(pnum_shards) {
for(uint32_t i = 0; i < num_shards; i++) {
char lock_name[32] = {0};
@@ -1563,9 +1563,9 @@ class OSD : public Dispatcher,
list<PG*> peering_queue;
OSD *osd;
set<PG*> in_use;
PeeringWQ(OSD *o, time_t ti, ThreadPool *tp)
PeeringWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
: ThreadPool::BatchWorkQueue<PG>(
"OSD::PeeringWQ", ti, ti*10, tp), osd(o) {}
"OSD::PeeringWQ", ti, si, tp), osd(o) {}

void _dequeue(PG *pg) {
for (list<PG*>::iterator i = peering_queue.begin();
@@ -1944,8 +1944,8 @@ class OSD : public Dispatcher,
list<Command*> command_queue;
struct CommandWQ : public ThreadPool::WorkQueue<Command> {
OSD *osd;
CommandWQ(OSD *o, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, 0, tp), osd(o) {}
CommandWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
: ThreadPool::WorkQueue<Command>("OSD::CommandWQ", ti, si, tp), osd(o) {}

bool _empty() {
return osd->command_queue.empty();
@@ -1998,8 +1998,8 @@ class OSD : public Dispatcher,

struct RecoveryWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
RecoveryWQ(OSD *o, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", ti, ti*10, tp), osd(o) {}
RecoveryWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>("OSD::RecoveryWQ", ti, si, tp), osd(o) {}

bool _empty() {
return osd->recovery_queue.empty();
@@ -2056,8 +2056,8 @@ class OSD : public Dispatcher,

struct SnapTrimWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
SnapTrimWQ(OSD *o, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", ti, 0, tp), osd(o) {}
SnapTrimWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>("OSD::SnapTrimWQ", ti, si, tp), osd(o) {}

bool _empty() {
return osd->snap_trim_queue.empty();
@@ -2102,8 +2102,8 @@ class OSD : public Dispatcher,

struct ScrubWQ : public ThreadPool::WorkQueue<PG> {
OSD *osd;
ScrubWQ(OSD *o, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", ti, 0, tp), osd(o) {}
ScrubWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
: ThreadPool::WorkQueue<PG>("OSD::ScrubWQ", ti, si, tp), osd(o) {}

bool _empty() {
return osd->scrub_queue.empty();
@@ -2149,8 +2149,8 @@ class OSD : public Dispatcher,
list<MOSDRepScrub*> rep_scrub_queue;

public:
RepScrubWQ(OSD *o, time_t ti, ThreadPool *tp)
: ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", ti, 0, tp), osd(o) {}
RepScrubWQ(OSD *o, time_t ti, time_t si, ThreadPool *tp)
: ThreadPool::WorkQueue<MOSDRepScrub>("OSD::RepScrubWQ", ti, si, tp), osd(o) {}

bool _empty() {
return rep_scrub_queue.empty();
@@ -2202,9 +2202,9 @@ class OSD : public Dispatcher,
public ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> > {
ObjectStore *&store;
list<pair<PGRef, DeletingStateRef> > remove_queue;
RemoveWQ(ObjectStore *&o, time_t ti, ThreadPool *tp)
RemoveWQ(ObjectStore *&o, time_t ti, time_t si, ThreadPool *tp)
: ThreadPool::WorkQueueVal<pair<PGRef, DeletingStateRef> >(
"OSD::RemoveWQ", ti, 0, tp),
"OSD::RemoveWQ", ti, si, tp),
store(o) {}

bool _empty() {
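
Each work queue constructor now threads a configurable suicide timeout (si) through to the ThreadPool instead of hard-coding 0 or ti*10. A simplified model of what the two values mean (an illustration, not the Ceph heartbeat code; option names from config_opts.h above):

#include <cstdlib>
#include <ctime>

struct WorkerDeadline {
  time_t timeout;          // e.g. osd_op_thread_timeout: item is merely flagged as slow
  time_t suicide_timeout;  // e.g. osd_op_thread_suicide_timeout: process is killed

  // returns true while the current work item is still considered healthy
  bool healthy(time_t started, time_t now) const {
    time_t elapsed = now - started;
    if (suicide_timeout > 0 && elapsed > suicide_timeout)
      std::abort();        // the real heartbeat check asserts and brings the OSD down
    return timeout <= 0 || elapsed <= timeout;
  }
};
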
2 changes: 1 addition & 1 deletion src/osdc/ObjectCacher.cc
@@ -1144,6 +1144,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
++bh_it) {
uint64_t rx_bytes = static_cast<uint64_t>(
stat_rx + bh_it->second->length());
bytes_not_in_cache += bh_it->second->length();
if (!waitfor_read.empty() || rx_bytes > max_size) {
// cache is full with concurrent reads -- wait for rx's to complete
// to constrain memory growth (especially during copy-ups)
@@ -1165,7 +1166,6 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
}
}
bytes_not_in_cache += bh_it->second->length();
success = false;
}

8 changes: 6 additions & 2 deletions src/pybind/rbd.py
@@ -466,8 +466,12 @@ def parent_info(self):
pool = create_string_buffer(size)
name = create_string_buffer(size)
snapname = create_string_buffer(size)
ret = self.librbd.rbd_get_parent_info(self.image, pool, len(pool),
name, len(name), snapname, len(snapname))
ret = self.librbd.rbd_get_parent_info(self.image, byref(pool),
c_size_t(size),
byref(name),
c_size_t(size),
byref(snapname),
c_size_t(size))
if ret == -errno.ERANGE:
size *= 2

