test/fio: add single pool mode to share collections among multiple jobs
Signed-off-by: Igor Fedotov <ifedotov@suse.com>
ifed01 committed Nov 30, 2017
1 parent 120ef36 commit 9d7291f
1 changed file: src/test/fio/fio_ceph_objectstore.cc (113 additions, 54 deletions)
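
The new option is used from an ordinary fio job file: with single_pool_mode=1 every job in the run shares the Engine-owned pool and its collections instead of creating a private per-job pool. A minimal job-file sketch is below; the external-engine library name and the conf option value are assumptions for illustration, not part of this commit:

[global]
; assumed way to load this engine and point it at a ceph configuration
ioengine=external:libfio_ceph_objectstore.so
conf=ceph-bluestore.conf
; added by this commit: run all jobs against one shared pool
single_pool_mode=1
; nr_files also caps how many collections are created (up to osd_pool_default_pg_num)
nr_files=8
size=256m
bs=4k
rw=randwrite

[job0]
[job1]

With the option left at its default of 0, each job keeps creating its own pool and collections as before.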
@@ -47,6 +47,7 @@ struct Options {
_fastinfo_omap_len_low,
_fastinfo_omap_len_high;
bool simulate_pglog;
bool single_pool_mode;
};

template <class Func> // void Func(fio_option&)
@@ -126,18 +127,98 @@ static std::vector<fio_option> ceph_options{
o.def = 0;
o.minval = 0;
}),
make_option([] (fio_option& o) {
o.name = "single_pool_mode";
o.lname = "single(shared among jobs) pool mode";
o.type = FIO_OPT_BOOL;
o.help = "Enables the mode when all jobs run against the same pool";
o.off1 = offsetof(Options, single_pool_mode);
o.def = "0";
}),
{} // fio expects a 'null'-terminated list
};


struct Collection {
spg_t pg;
coll_t cid;
ObjectStore::Sequencer sequencer;

// Can't use mutex directly in vectors hence dynamic allocation
ceph::unique_ptr<std::mutex> lock;
uint64_t pglog_ver_head = 1;
uint64_t pglog_ver_tail = 1;
uint64_t pglog_dup_ver_tail = 1;

// use big pool ids to avoid clashing with existing collections
static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;

Collection(const spg_t& pg)
: pg(pg), cid(pg), sequencer(stringify(pg)),
lock(new std::mutex) {
sequencer.shard_hint = pg;
}
};

int init_collections(std::unique_ptr<ObjectStore>& os,
uint64_t pool,
std::vector<Collection>& collections,
uint64_t count)
{
assert(count > 0);
collections.reserve(count);

const int split_bits = cbits(count - 1);

ObjectStore::Transaction t;
for (uint32_t i = 0; i < count; i++) {
auto pg = spg_t{pg_t{i, pool}};
collections.emplace_back(pg);

auto& coll = collections.back();
if (!os->collection_exists(coll.cid)) {
t.create_collection(coll.cid, split_bits);
ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
t.touch(coll.cid, pgmeta_oid);
}
}
ObjectStore::Sequencer sequencer("Engine init");
int r = os->apply_transaction(&sequencer, std::move(t));
if (r)
derr << "Engine init failed with " << cpp_strerror(-r) << dendl;
return r;
}

int destroy_collections(
std::unique_ptr<ObjectStore>& os,
std::vector<Collection>& collections)
{
ObjectStore::Transaction t;
// remove our collections
for (auto& coll : collections) {
ghobject_t pgmeta_oid(coll.pg.make_pgmeta_oid());
t.remove(coll.cid, pgmeta_oid);
t.remove_collection(coll.cid);
}
ObjectStore::Sequencer sequencer("Engine cleanup");
int r = os->apply_transaction(&sequencer, std::move(t));
if (r)
derr << "Engine cleanup failed with " << cpp_strerror(-r) << dendl;
return r;
}

/// global engine state shared between all jobs within the process. this
/// includes g_ceph_context and the ObjectStore instance
struct Engine {
/// the initial g_ceph_context reference to be dropped on destruction
boost::intrusive_ptr<CephContext> cct;
std::unique_ptr<ObjectStore> os;

std::vector<Collection> collections; //< shared collections to spread objects over

std::mutex lock;
int ref_count;
const bool unlink; //< unlink objects on destruction

Engine(thread_data* td);
~Engine();
@@ -172,14 +253,18 @@ struct Engine {
f->flush(ostr);
delete f;

if (unlink) {
destroy_collections(os, collections);
}
os->umount();
dout(0) << ostr.str() << dendl;
}
}
};

Engine::Engine(thread_data* td)
: ref_count(0)
: ref_count(0),
unlink(td->o.unlink)
{
// add the ceph command line arguments
auto o = static_cast<Options*>(td->eo);
@@ -236,35 +321,20 @@ Engine::Engine(thread_data* td)
if (r < 0)
throw std::system_error(-r, std::system_category(), "mount failed");

// create shared collections up to osd_pool_default_pg_num
if (o->single_pool_mode) {
uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
if (count > td->o.nr_files)
count = td->o.nr_files;
init_collections(os, Collection::MIN_POOL_ID, collections, count);
}
}

Engine::~Engine()
{
assert(!ref_count);
}


struct Collection {
spg_t pg;
coll_t cid;
ObjectStore::Sequencer sequencer;

// Can't use mutex directly in vectors hence dynamic allocation
ceph::unique_ptr<std::mutex> lock;
uint64_t pglog_ver_head = 1;
uint64_t pglog_ver_tail = 1;
uint64_t pglog_dup_ver_tail = 1;

// use big pool ids to avoid clashing with existing collections
static constexpr int64_t MIN_POOL_ID = 0x0000ffffffffffff;

Collection(const spg_t& pg)
: pg(pg), cid(pg), sequencer(stringify(pg)),
lock(new std::mutex) {
sequencer.shard_hint = pg;
}
};

struct Object {
ghobject_t oid;
Collection& coll;
@@ -274,10 +344,11 @@ struct Object {
coll(coll) {}
};

/// treat each fio job like a separate pool with its own collections and objects
/// treat each fio job either like a separate pool with its own collections and objects
/// or just a client using its own objects from the shared pool
struct Job {
Engine* engine; //< shared ptr to the global Engine
std::vector<Collection> collections; //< spread objects over collections
std::vector<Collection> collections; //< job's private collections to spread objects over
std::vector<Object> objects; //< associate an object with each fio_file
std::vector<io_u*> events; //< completions for fio_ceph_os_event()
const bool unlink; //< unlink objects on destruction
@@ -295,29 +366,6 @@ Job::Job(Engine* engine, const thread_data* td)
unlink(td->o.unlink)
{
engine->ref();
// use the fio thread_number for our unique pool id
const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number;

// create a collection for each object, up to osd_pool_default_pg_num
uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
if (count > td->o.nr_files)
count = td->o.nr_files;

assert(count > 0);
collections.reserve(count);

const int split_bits = cbits(count - 1);

ObjectStore::Transaction t;
for (uint32_t i = 0; i < count; i++) {
auto pg = spg_t{pg_t{i, pool}};
collections.emplace_back(pg);

auto& cid = collections.back().cid;
if (!engine->os->collection_exists(cid))
t.create_collection(cid, split_bits);
}

auto o = static_cast<Options*>(td->eo);
unsigned long long max_data = max(o->oi_attr_len_high,
o->snapset_attr_len_high);
@@ -326,16 +374,30 @@ Job::Job(Engine* engine, const thread_data* td)
max_data = max(max_data, o->_fastinfo_omap_len_high);
one_for_all_data = buffer::create(max_data);

std::vector<Collection>* colls;
// create private collections up to osd_pool_default_pg_num
if (!o->single_pool_mode) {
uint64_t count = g_conf->get_val<uint64_t>("osd_pool_default_pg_num");
if (count > td->o.nr_files)
count = td->o.nr_files;
// use the fio thread_number for our unique pool id
const uint64_t pool = Collection::MIN_POOL_ID + td->thread_number + 1;
init_collections(engine->os, pool, collections, count);
colls = &collections;
} else {
colls = &engine->collections;
}
const uint64_t file_size = td->o.size / max(1u, td->o.nr_files);
ObjectStore::Transaction t;

// create an object for each file in the job
for (uint32_t i = 0; i < td->o.nr_files; i++) {
auto f = td->files[i];
f->real_file_size = file_size;
f->engine_pos = i;

// associate each object with a collection in a round-robin fashion
auto& coll = collections[i % collections.size()];
// associate each object with a collection in a round-robin fashion.
auto& coll = (*colls)[i % colls->size()];

objects.emplace_back(f->file_name, coll);
auto& oid = objects.back().oid;
@@ -356,15 +418,12 @@ Job::Job(Engine* engine, const thread_data* td)
Job::~Job()
{
if (unlink) {
destroy_collections(engine->os, collections);
ObjectStore::Transaction t;
// remove our objects
for (auto& obj : objects) {
t.remove(obj.coll.cid, obj.oid);
}
// remove our collections
for (auto& coll : collections) {
t.remove_collection(coll.cid);
}
ObjectStore::Sequencer sequencer("job cleanup");
int r = engine->os->apply_transaction(&sequencer, std::move(t));
if (r)
