Skip to content

Commit

Permalink
Merge pull request #5810: Heavy memory shuffling in rados bench
Browse files Browse the repository at this point in the history
Reviewed-by: Loic Dachary <ldachary@redhat.com>
  • Loading branch information
ldachary committed Oct 3, 2015
2 parents 7a64830 + dba8b5b commit 5cb6c5c
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 14 deletions.
9 changes: 9 additions & 0 deletions src/common/buffer.cc
Expand Up @@ -1867,6 +1867,15 @@ __u32 buffer::list::crc32c(__u32 crc) const
return crc;
}

void buffer::list::invalidate_crc()
{
for (std::list<ptr>::const_iterator p = _buffers.begin(); p != _buffers.end(); ++p) {
raw *r = p->get_raw();
if (r) {
r->invalidate_crc();
}
}
}

/**
* Binary write all contents to a C++ stream
Expand Down
34 changes: 20 additions & 14 deletions src/common/obj_bencher.cc
Expand Up @@ -379,10 +379,12 @@ int ObjBencher::write_bench(int secondsToRun, int maxObjectsToCreate,
}
lock.Unlock();
//create new contents and name on the heap, and fill them
newContents = new bufferlist();
newName = generate_object_name(data.started);
snprintf(data.object_contents, data.object_size, "I'm the %16dth object!", data.started);
newContents->append(data.object_contents, data.object_size);
newContents = contents[slot];
snprintf(newContents->c_str(), data.object_size, "I'm the %16dth object!", data.started);
// we wrote to buffer, going around internal crc cache, so invalidate it now.
newContents->invalidate_crc();

completion_wait(slot);
lock.Lock();
r = completion_ret(slot);
Expand All @@ -402,8 +404,7 @@ int ObjBencher::write_bench(int secondsToRun, int maxObjectsToCreate,
release_completion(slot);
timePassed = ceph_clock_now(cct) - data.start_time;

//write new stuff to backend, then delete old stuff
//and save locations of new stuff for later deletion
//write new stuff to backend
start_times[slot] = ceph_clock_now(cct);
r = create_completion(slot, _aio_cb, &lc);
if (r < 0)
Expand All @@ -412,10 +413,7 @@ int ObjBencher::write_bench(int secondsToRun, int maxObjectsToCreate,
if (r < 0) {//naughty; doesn't clean up heap space.
goto ERR;
}
delete contents[slot];
name[slot] = newName;
contents[slot] = newContents;
newContents = 0;
lock.Lock();
++data.started;
++data.in_flight;
Expand All @@ -442,6 +440,7 @@ int ObjBencher::write_bench(int secondsToRun, int maxObjectsToCreate,
lock.Unlock();
release_completion(slot);
delete contents[slot];
contents[slot] = 0;
}

timePassed = ceph_clock_now(cct) - data.start_time;
Expand Down Expand Up @@ -478,6 +477,9 @@ int ObjBencher::write_bench(int secondsToRun, int maxObjectsToCreate,
sync_write(run_name_meta, b_write, sizeof(int)*3);

completions_done();
for (int i = 0; i < concurrentios; i++)
if (contents[i])
delete contents[i];

return 0;

Expand All @@ -486,8 +488,10 @@ int ObjBencher::write_bench(int secondsToRun, int maxObjectsToCreate,
data.done = 1;
lock.Unlock();
pthread_join(print_thread, NULL);
delete newContents;
return -5;
for (int i = 0; i < concurrentios; i++)
if (contents[i])
delete contents[i];
return r;
}

int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurrentios, int pid) {
Expand Down Expand Up @@ -595,9 +599,11 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
release_completion(slot);
cur_contents = contents[slot];

// invalidate internal crc cache
cur_contents->invalidate_crc();

//start new read and check data if requested
start_times[slot] = ceph_clock_now(cct);
contents[slot] = new bufferlist();
create_completion(slot, _aio_cb, (void *)&lc);
r = aio_read(newName, slot, contents[slot], data.object_size);
if (r < 0) {
Expand All @@ -613,7 +619,6 @@ int ObjBencher::seq_read_bench(int seconds_to_run, int num_objects, int concurre
++errors;
}
name[slot] = newName;
delete cur_contents;
}

//wait for final reads to complete
Expand Down Expand Up @@ -786,9 +791,11 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
release_completion(slot);
cur_contents = contents[slot];

// invalidate internal crc cache
cur_contents->invalidate_crc();

//start new read and check data if requested
start_times[slot] = ceph_clock_now(g_ceph_context);
contents[slot] = new bufferlist();
create_completion(slot, _aio_cb, (void *)&lc);
r = aio_read(newName, slot, contents[slot], data.object_size);
if (r < 0) {
Expand All @@ -804,7 +811,6 @@ int ObjBencher::rand_read_bench(int seconds_to_run, int num_objects, int concurr
++errors;
}
name[slot] = newName;
delete cur_contents;
}

//wait for final reads to complete
Expand Down
1 change: 1 addition & 0 deletions src/include/buffer.h
Expand Up @@ -501,6 +501,7 @@ class CEPH_BUFFER_API buffer {
int write_fd(int fd) const;
int write_fd_zero_copy(int fd) const;
uint32_t crc32c(uint32_t crc) const;
void invalidate_crc();
};

/*
Expand Down
40 changes: 40 additions & 0 deletions src/test/bufferlist.cc
Expand Up @@ -2356,6 +2356,46 @@ TEST(BufferList, TestCopyAll) {
ASSERT_EQ(memcmp(big.get(), big2.get(), BIG_SZ), 0);
}

TEST(BufferList, InvalidateCrc) {
const static size_t buffer_size = 262144;
ceph::shared_ptr <unsigned char> big(
(unsigned char*)malloc(buffer_size), free);
unsigned char c = 0;
char* ptr = (char*) big.get();
char* inptr;
for (size_t i = 0; i < buffer_size; ++i) {
ptr[i] = c++;
}
bufferlist bl;

// test for crashes (shouldn't crash)
bl.invalidate_crc();

// put data into bufferlist
bl.append((const char*)big.get(), buffer_size);

// get its crc
__u32 crc = bl.crc32c(0);

// modify data in bl without its knowledge
inptr = (char*) bl.c_str();
c = 0;
for (size_t i = 0; i < buffer_size; ++i) {
inptr[i] = c--;
}

// make sure data in bl are now different than in big
EXPECT_NE(memcmp((void*) ptr, (void*) inptr, buffer_size), 0);

// crc should remain the same
__u32 new_crc = bl.crc32c(0);
EXPECT_EQ(crc, new_crc);

// force crc invalidate, check if it is updated
bl.invalidate_crc();
EXPECT_NE(crc, bl.crc32c(0));
}

TEST(BufferHash, all) {
{
bufferlist bl;
Expand Down

0 comments on commit 5cb6c5c

Please sign in to comment.