Skip to content

Commit

Permalink
Merge pull request #6791: Race condition in rados bench
Browse files Browse the repository at this point in the history
Reviewed-by: Loic Dachary <ldachary@redhat.com>
  • Loading branch information
Loic Dachary committed Jan 11, 2016
2 parents 23b9747 + c2c6d02 commit 904d88f
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 42 deletions.
108 changes: 71 additions & 37 deletions src/common/obj_bencher.cc
Expand Up @@ -154,7 +154,7 @@ void *ObjBencher::status_printer(void *_bencher) {
int ObjBencher::aio_bench(
int operation, int secondsToRun,
int maxObjectsToCreate,
int concurrentios, int op_size, bool cleanup, const char* run_name) {
int concurrentios, int op_size, bool cleanup, const char* run_name, bool no_verify) {

if (concurrentios <= 0)
return -EINVAL;
Expand Down Expand Up @@ -203,11 +203,11 @@ int ObjBencher::aio_bench(
if (r != 0) goto out;
}
else if (OP_SEQ_READ == operation) {
r = seq_read_bench(secondsToRun, num_objects, concurrentios, prevPid);
r = seq_read_bench(secondsToRun, num_objects, concurrentios, prevPid, no_verify);
if (r != 0) goto out;
}
else if (OP_RAND_READ == operation) {
r = rand_read_bench(secondsToRun, num_objects, concurrentios, prevPid);
r = rand_read_bench(secondsToRun, num_objects, concurrentios, prevPid, no_verify);
if (r != 0) goto out;
}

Expand Down Expand Up @@ -494,7 +494,7 @@ int ObjBencher::write_bench(int secondsToRun, int maxObjectsToCreate,
return r;
}

int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) {
int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify) {
lock_cond lc(&lock);

if (concurrentios <= 0)
Expand Down Expand Up @@ -576,31 +576,43 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
}
lc.cond.Wait(lock);
}
lock.Unlock();
newName = generate_object_name(data.started, pid);

// calculate latency here, so memcmp doesn't inflate it
data.cur_latency = ceph_clock_now(cct) - start_times[slot];

cur_contents = contents[slot];
int current_index = index[slot];

// invalidate internal crc cache
cur_contents->invalidate_crc();

if (!no_verify) {
snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", current_index);
if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
}
}

newName = generate_object_name(data.started, pid);
index[slot] = data.started;
lock.Unlock();
completion_wait(slot);
lock.Lock();
r = completion_ret(slot);
if (r < 0) {
cerr << "read got " << r << std::endl;
lock.Unlock();
goto ERR;
}
data.cur_latency = ceph_clock_now(cct) - start_times[slot];
lock.Lock();
total_latency += data.cur_latency;
if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
++data.finished;
data.avg_latency = total_latency / data.finished;
--data.in_flight;
lock.Unlock();
release_completion(slot);
cur_contents = contents[slot];

// invalidate internal crc cache
cur_contents->invalidate_crc();

//start new read and check data if requested
start_times[slot] = ceph_clock_now(cct);
Expand All @@ -612,12 +624,14 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
lock.Lock();
++data.started;
++data.in_flight;
snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", current_index);
lock.Unlock();
lock.Unlock();
if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
} else {
lock.Unlock();
}

name[slot] = newName;
}

Expand All @@ -640,11 +654,15 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
data.avg_latency = total_latency / data.finished;
--data.in_flight;
release_completion(slot);
snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", index[slot]);
lock.Unlock();
if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
if (!no_verify) {
snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", index[slot]);
lock.Unlock();
if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
}
} else {
lock.Unlock();
}
delete contents[slot];
}
Expand Down Expand Up @@ -682,7 +700,7 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
return -5;
}

int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid)
int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid, bool no_verify)
{
lock_cond lc(&lock);

Expand Down Expand Up @@ -767,11 +785,14 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
}
lc.cond.Wait(lock);
}

// calculate latency here, so memcmp doesn't inflate it
data.cur_latency = ceph_clock_now(cct) - start_times[slot];

lock.Unlock();
rand_id = rand() % num_objects;
newName = generate_object_name(rand_id, pid);

int current_index = index[slot];
index[slot] = rand_id;
cur_contents = contents[slot];
completion_wait(slot);
lock.Lock();
r = completion_ret(slot);
Expand All @@ -780,16 +801,27 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
lock.Unlock();
goto ERR;
}
data.cur_latency = ceph_clock_now(g_ceph_context) - start_times[slot];

total_latency += data.cur_latency;
if( data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
if (data.cur_latency > data.max_latency) data.max_latency = data.cur_latency;
if (data.cur_latency < data.min_latency) data.min_latency = data.cur_latency;
++data.finished;
data.avg_latency = total_latency / data.finished;
--data.in_flight;
lock.Unlock();

if (!no_verify) {
snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", current_index);
if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
}
}

rand_id = rand() % num_objects;
newName = generate_object_name(rand_id, pid);
index[slot] = rand_id;
release_completion(slot);
cur_contents = contents[slot];

// invalidate internal crc cache
cur_contents->invalidate_crc();
Expand All @@ -804,15 +836,11 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
lock.Lock();
++data.started;
++data.in_flight;
snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", current_index);
lock.Unlock();
if (memcmp(data.object_contents, cur_contents->c_str(), data.object_size) != 0) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
}
name[slot] = newName;
}


//wait for final reads to complete
while (data.finished < data.started) {
slot = data.finished % concurrentios;
Expand All @@ -832,12 +860,18 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
data.avg_latency = total_latency / data.finished;
--data.in_flight;
release_completion(slot);
snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", index[slot]);
lock.Unlock();
if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;

if (!no_verify) {
snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", index[slot]);
lock.Unlock();
if (memcmp(data.object_contents, contents[slot]->c_str(), data.object_size) != 0) {
cerr << name[slot] << " is not correct!" << std::endl;
++errors;
}
} else {
lock.Unlock();
}

delete contents[slot];
}

Expand Down
6 changes: 3 additions & 3 deletions src/common/obj_bencher.h
Expand Up @@ -65,8 +65,8 @@ class ObjBencher {
int fetch_bench_metadata(const std::string& metadata_file, int* object_size, int* num_objects, int* prevPid);

int write_bench(int secondsToRun, int maxObjects, int concurrentios, const string& run_name_meta);
int seq_read_bench(int secondsToRun, int num_objects, int concurrentios, int writePid);
int rand_read_bench(int secondsToRun, int num_objects, int concurrentios, int writePid);
int seq_read_bench(int secondsToRun, int num_objects, int concurrentios, int writePid, bool no_verify);
int rand_read_bench(int secondsToRun, int num_objects, int concurrentios, int writePid, bool no_verify);

int clean_up(int num_objects, int prevPid, int concurrentios);
int clean_up_slow(const std::string& prefix, int concurrentios);
Expand Down Expand Up @@ -98,7 +98,7 @@ class ObjBencher {
virtual ~ObjBencher() {}
int aio_bench(
int operation, int secondsToRun, int maxObjectsToCreate,
int concurrentios, int op_size, bool cleanup, const char* run_name);
int concurrentios, int op_size, bool cleanup, const char* run_name, bool no_verify=false);
int clean_up(const char* prefix, int concurrentios, const char* run_name);

void set_show_time(bool dt) {
Expand Down
12 changes: 10 additions & 2 deletions src/tools/rados/rados.cc
Expand Up @@ -185,6 +185,8 @@ void usage(ostream& out)
" Set number of concurrent I/O operations\n"
" --show-time\n"
" prefix output with date/time\n"
" --no-verify\n"
" do not verify contents of read objects\n"
"\n"
"LOAD GEN OPTIONS:\n"
" --num-objects total number of objects\n"
Expand Down Expand Up @@ -1142,6 +1144,7 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
int concurrent_ios = 16;
unsigned op_size = default_op_size;
bool cleanup = true;
bool no_verify = false;
const char *snapname = NULL;
snap_t snapid = CEPH_NOSNAP;
std::map<std::string, std::string>::const_iterator i;
Expand Down Expand Up @@ -1311,7 +1314,10 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
if (i != opts.end()) {
nspace = i->second;
}

i = opts.find("no-verify");
if (i != opts.end()) {
no_verify = true;
}

// open rados
ret = rados.init_with_context(g_ceph_context);
Expand Down Expand Up @@ -2262,7 +2268,7 @@ static int rados_tool_common(const std::map < std::string, std::string > &opts,
RadosBencher bencher(g_ceph_context, rados, io_ctx);
bencher.set_show_time(show_time);
ret = bencher.aio_bench(operation, seconds, num_objs,
concurrent_ios, op_size, cleanup, run_name);
concurrent_ios, op_size, cleanup, run_name, no_verify);
if (ret != 0)
cerr << "error during benchmark: " << ret << std::endl;
}
Expand Down Expand Up @@ -2641,6 +2647,8 @@ int main(int argc, const char **argv)
opts["show-time"] = "true";
} else if (ceph_argparse_flag(args, i, "--no-cleanup", (char*)NULL)) {
opts["no-cleanup"] = "true";
} else if (ceph_argparse_flag(args, i, "--no-verify", (char*)NULL)) {
opts["no-verify"] = "true";
} else if (ceph_argparse_witharg(args, i, &val, "--run-name", (char*)NULL)) {
opts["run-name"] = val;
} else if (ceph_argparse_witharg(args, i, &val, "--prefix", (char*)NULL)) {
Expand Down

0 comments on commit 904d88f

Please sign in to comment.