Skip to content

Commit

Permalink
os/bluestore/Bluefs: Unit test cases for bluefs async flush
Browse files Browse the repository at this point in the history
Signed-off-by: Varada Kari <varada.kari@sandisk.com>
  • Loading branch information
Varada Kari authored and liewegas committed Jun 30, 2016
1 parent a579e06 commit 005efba
Showing 1 changed file with 227 additions and 0 deletions.
227 changes: 227 additions & 0 deletions src/test/objectstore/test_bluefs.cc
Expand Up @@ -7,6 +7,7 @@
#include <time.h>
#include <fcntl.h>
#include <unistd.h>
#include <thread>
#include "global/global_init.h"
#include "common/ceph_argparse.h"
#include "include/stringify.h"
Expand All @@ -28,6 +29,15 @@ string get_temp_bdev(uint64_t size)
return fn;
}

char* gen_buffer(uint64_t size)
{
char *buffer = new char[size];
boost::random::random_device rand;
rand.generate(buffer, buffer + size);
return buffer;
}


void rm_temp_bdev(string f)
{
::unlink(f.c_str());
Expand Down Expand Up @@ -131,6 +141,223 @@ TEST(BlueFS, small_appends) {
rm_temp_bdev(fn);
}

#define ALLOC_SIZE 4096

void write_data(BlueFS &fs, uint64_t rationed_bytes)
{
BlueFS::FileWriter *h;
int j=0, r=0;
uint64_t written_bytes = 0;
rationed_bytes -= ALLOC_SIZE;
stringstream ss;
string dir = "dir.";
ss << std::this_thread::get_id();
dir.append(ss.str());
dir.append(".");
dir.append(to_string(j));
ASSERT_EQ(0, fs.mkdir(dir));
while (1) {
string file = "file.";
file.append(to_string(j));
ASSERT_EQ(0, fs.open_for_write(dir, file, &h, false));
bufferlist bl;
char *buf = gen_buffer(ALLOC_SIZE);
bufferptr bp = buffer::claim_char(ALLOC_SIZE, buf);
bl.push_back(bp);
h->append(bl);
r = fs.fsync(h);
if (r < 0) {
std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
fs.close_writer(h);
break;
}
written_bytes += g_conf->bluefs_alloc_size;
fs.close_writer(h);
j++;
if ((rationed_bytes - written_bytes) <= g_conf->bluefs_alloc_size) {
std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
break;
}
}
}

void create_single_file(BlueFS &fs)
{
BlueFS::FileWriter *h;
stringstream ss;
string dir = "dir.test";
ASSERT_EQ(0, fs.mkdir(dir));
string file = "testfile";
ASSERT_EQ(0, fs.open_for_write(dir, file, &h, false));
bufferlist bl;
char *buf = gen_buffer(ALLOC_SIZE);
bufferptr bp = buffer::claim_char(ALLOC_SIZE, buf);
bl.push_back(bp);
h->append(bl);
fs.fsync(h);
fs.close_writer(h);
}

void write_single_file(BlueFS &fs, uint64_t rationed_bytes)
{
BlueFS::FileWriter *h;
stringstream ss;
string dir = "dir.test";
string file = "testfile";
int r=0, j=0;
uint64_t written_bytes = 0;
rationed_bytes -= ALLOC_SIZE;
while (1) {
ASSERT_EQ(0, fs.open_for_write(dir, file, &h, false));
bufferlist bl;
char *buf = gen_buffer(ALLOC_SIZE);
bufferptr bp = buffer::claim_char(ALLOC_SIZE, buf);
bl.push_back(bp);
h->append(bl);
r = fs.fsync(h);
if (r < 0) {
std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
fs.close_writer(h);
break;
}
written_bytes += g_conf->bluefs_alloc_size;
fs.close_writer(h);
j++;
if ((rationed_bytes - written_bytes) <= g_conf->bluefs_alloc_size) {
std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
break;
}
}
}

bool writes_done = false;

void sync_fs(BlueFS &fs)
{
while (1) {
if (writes_done == true)
break;
fs.sync_metadata();
sleep(1);
}
}


void do_join(std::thread& t)
{
t.join();
}

void join_all(std::vector<std::thread>& v)
{
std::for_each(v.begin(),v.end(),do_join);
}

#define NUM_WRITERS 3
#define NUM_SYNC_THREADS 1

#define NUM_SINGLE_FILE_WRITERS 1
#define NUM_MULTIPLE_FILE_WRITERS 2

TEST(BlueFS, test_flush_1) {
uint64_t size = 1048476 * 128;
string fn = get_temp_bdev(size);
g_ceph_context->_conf->set_val(
"bluefs_alloc_size",
"65536");
g_ceph_context->_conf->apply_changes(NULL);

BlueFS fs;
ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
uuid_d fsid;
ASSERT_EQ(0, fs.mkfs(fsid));
ASSERT_EQ(0, fs.mount());
{
std::vector<std::thread> write_thread_multiple;
uint64_t effective_size = size - (32 * 1048576); // leaving the last 32 MB for log compaction
uint64_t per_thread_bytes = (effective_size/(NUM_MULTIPLE_FILE_WRITERS + NUM_SINGLE_FILE_WRITERS));
for (int i=0; i<NUM_MULTIPLE_FILE_WRITERS ; i++) {
write_thread_multiple.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
}

create_single_file(fs);
std::vector<std::thread> write_thread_single;
for (int i=0; i<NUM_SINGLE_FILE_WRITERS; i++) {
write_thread_single.push_back(std::thread(write_single_file, std::ref(fs), per_thread_bytes));
}

join_all(write_thread_single);
join_all(write_thread_multiple);
}
fs.umount();
rm_temp_bdev(fn);
}

TEST(BlueFS, test_flush_2) {
uint64_t size = 1048476 * 128;
string fn = get_temp_bdev(size);
g_ceph_context->_conf->set_val(
"bluefs_alloc_size",
"65536");
g_ceph_context->_conf->apply_changes(NULL);

BlueFS fs;
ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
uuid_d fsid;
ASSERT_EQ(0, fs.mkfs(fsid));
ASSERT_EQ(0, fs.mount());
{
uint64_t effective_size = size - (32 * 1048576); // leaving the last 32 MB for log compaction
uint64_t per_thread_bytes = (effective_size/(NUM_WRITERS));
std::vector<std::thread> write_thread_multiple;
for (int i=0; i<NUM_WRITERS; i++) {
write_thread_multiple.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
}

join_all(write_thread_multiple);
}
fs.umount();
rm_temp_bdev(fn);
}

TEST(BlueFS, test_flush_3) {
uint64_t size = 1048476 * 128;
string fn = get_temp_bdev(size);
g_ceph_context->_conf->set_val(
"bluefs_alloc_size",
"65536");
g_ceph_context->_conf->apply_changes(NULL);

BlueFS fs;
ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
uuid_d fsid;
ASSERT_EQ(0, fs.mkfs(fsid));
ASSERT_EQ(0, fs.mount());
{
std::vector<std::thread> write_threads;
uint64_t effective_size = size - (11 * 1048576); // leaving the last 11 MB for log compaction
uint64_t per_thread_bytes = (effective_size/(NUM_WRITERS));
for (int i=0; i<NUM_WRITERS; i++) {
write_threads.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
}

std::vector<std::thread> sync_threads;
for (int i=0; i<NUM_SYNC_THREADS; i++) {
sync_threads.push_back(std::thread(sync_fs, std::ref(fs)));
}

join_all(write_threads);
writes_done = true;
join_all(sync_threads);
}
fs.umount();
rm_temp_bdev(fn);
}


int main(int argc, char **argv) {
vector<const char*> args;
argv_to_vec(argc, (const char **)argv, args);
Expand Down

0 comments on commit 005efba

Please sign in to comment.