Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

memstore: clone zero-fills holes from source range #11157

Merged
merged 3 commits into from
Sep 20, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 54 additions & 11 deletions src/os/memstore/MemStore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1652,8 +1652,7 @@ int MemStore::PageSetObject::write(uint64_t offset, const bufferlist &src)

auto page = tls_pages.begin();

// XXX: cast away the const because bufferlist doesn't have a const_iterator
auto p = const_cast<bufferlist&>(src).begin();
auto p = src.begin();
while (len > 0) {
unsigned page_offset = offset - (*page)->offset;
unsigned pageoff = data.get_page_size() - page_offset;
Expand Down Expand Up @@ -1685,34 +1684,78 @@ int MemStore::PageSetObject::clone(Object *src, uint64_t srcoff,
PageSet::page_vector dst_pages;

while (len) {
const auto count = std::min(len, (uint64_t)src_page_size * 16);
// limit to 16 pages at a time so tls_pages doesn't balloon in size
auto count = std::min(len, (uint64_t)src_page_size * 16);
src_data.get_range(srcoff, count, tls_pages);

// allocate the destination range
// TODO: avoid allocating pages for holes in the source range
dst_data.alloc_range(srcoff + delta, count, dst_pages);
auto dst_iter = dst_pages.begin();

for (auto &src_page : tls_pages) {
auto sbegin = std::max(srcoff, src_page->offset);
auto send = std::min(srcoff + count, src_page->offset + src_page_size);
dst_data.alloc_range(sbegin + delta, send - sbegin, dst_pages);

// zero-fill holes before src_page
if (srcoff < sbegin) {
while (dst_iter != dst_pages.end()) {
auto &dst_page = *dst_iter;
auto dbegin = std::max(srcoff + delta, dst_page->offset);
auto dend = std::min(sbegin + delta, dst_page->offset + dst_page_size);
std::fill(dst_page->data + dbegin - dst_page->offset,
dst_page->data + dend - dst_page->offset, 0);
if (dend < dst_page->offset + dst_page_size)
break;
++dst_iter;
}
const auto c = sbegin - srcoff;
count -= c;
len -= c;
}

// copy data from src page to dst pages
for (auto &dst_page : dst_pages) {
while (dst_iter != dst_pages.end()) {
auto &dst_page = *dst_iter;
auto dbegin = std::max(sbegin + delta, dst_page->offset);
auto dend = std::min(send + delta, dst_page->offset + dst_page_size);

std::copy(src_page->data + (dbegin - delta) - src_page->offset,
src_page->data + (dend - delta) - src_page->offset,
dst_page->data + dbegin - dst_page->offset);
if (dend < dst_page->offset + dst_page_size)
break;
++dst_iter;
}
dst_pages.clear(); // drop page refs

const auto c = send - sbegin;
count -= c;
len -= c;
srcoff = send;
dstoff = send + delta;
}
srcoff += count;
dstoff += count;
len -= count;
tls_pages.clear(); // drop page refs

// zero-fill holes after the last src_page
if (count > 0) {
while (dst_iter != dst_pages.end()) {
auto &dst_page = *dst_iter;
auto dbegin = std::max(dstoff, dst_page->offset);
auto dend = std::min(dstoff + count, dst_page->offset + dst_page_size);
std::fill(dst_page->data + dbegin - dst_page->offset,
dst_page->data + dend - dst_page->offset, 0);
++dst_iter;
}
srcoff += count;
dstoff += count;
len -= count;
}
dst_pages.clear(); // drop page refs
}

// update object size
if (data_len < dstoff + len)
data_len = dstoff + len;
if (data_len < dstoff)
data_len = dstoff;
return 0;
}

Expand Down
4 changes: 4 additions & 0 deletions src/test/objectstore/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,7 @@ add_executable(unittest_transaction
add_ceph_unittest(unittest_transaction ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_transaction)
target_link_libraries(unittest_transaction os common)

# unittest_memstore_clone
add_executable(unittest_memstore_clone test_memstore_clone.cc)
add_ceph_unittest(unittest_memstore_clone ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/unittest_memstore_clone)
target_link_libraries(unittest_memstore_clone os global)
224 changes: 224 additions & 0 deletions src/test/objectstore/test_memstore_clone.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab
/*
* Ceph - scalable distributed file system
*
* Copyright (C) 2015 Red Hat
*
* This is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License version 2.1, as published by the Free Software
* Foundation. See file COPYING.
*
*/
#include <boost/intrusive_ptr.hpp>
#include "global/global_init.h"
#include "common/ceph_argparse.h"
#include "os/ObjectStore.h"
#include <gtest/gtest.h>
#include "include/assert.h"
#include "common/errno.h"

namespace {

ObjectStore *g_store{nullptr};
const coll_t cid;

ghobject_t make_ghobject(const char *oid)
{
return ghobject_t{hobject_t{oid, "", CEPH_NOSNAP, 0, 0, ""}};
}

} // anonymous namespace

// src 11[11 11 11 11]11
// dst 22 22 22 22 22 22
// res 22 11 11 11 11 22
TEST(MemStore, CloneRangeAllocated)
{
ASSERT_TRUE(g_store);

const auto src = make_ghobject("src1");
const auto dst = make_ghobject("dst1");

bufferlist srcbl, dstbl, result, expected;
srcbl.append("111111111111");
dstbl.append("222222222222");
expected.append("221111111122");

ObjectStore::Transaction t;
t.write(cid, src, 0, 12, srcbl);
t.write(cid, dst, 0, 12, dstbl);
t.clone_range(cid, src, dst, 2, 8, 2);
ASSERT_EQ(0u, g_store->apply_transaction(nullptr, std::move(t)));
ASSERT_EQ(12, g_store->read(cid, dst, 0, 12, result));
ASSERT_EQ(expected, result);
}

// src __[__ __ __ __]__ 11 11
// dst 22 22 22 22 22 22
// res 22 00 00 00 00 22
TEST(MemStore, CloneRangeHole)
{
ASSERT_TRUE(g_store);

const auto src = make_ghobject("src2");
const auto dst = make_ghobject("dst2");

bufferlist srcbl, dstbl, result, expected;
srcbl.append("1111");
dstbl.append("222222222222");
expected.append("22\000\000\000\000\000\000\000\00022", 12);

ObjectStore::Transaction t;
t.write(cid, src, 12, 4, srcbl);
t.write(cid, dst, 0, 12, dstbl);
t.clone_range(cid, src, dst, 2, 8, 2);
ASSERT_EQ(0u, g_store->apply_transaction(nullptr, std::move(t)));
ASSERT_EQ(12, g_store->read(cid, dst, 0, 12, result));
ASSERT_EQ(expected, result);
}

// src __[__ __ __ 11]11
// dst 22 22 22 22 22 22
// res 22 00 00 00 11 22
TEST(MemStore, CloneRangeHoleStart)
{
ASSERT_TRUE(g_store);

const auto src = make_ghobject("src3");
const auto dst = make_ghobject("dst3");

bufferlist srcbl, dstbl, result, expected;
srcbl.append("1111");
dstbl.append("222222222222");
expected.append("22\000\000\000\000\000\0001122", 12);

ObjectStore::Transaction t;
t.write(cid, src, 8, 4, srcbl);
t.write(cid, dst, 0, 12, dstbl);
t.clone_range(cid, src, dst, 2, 8, 2);
ASSERT_EQ(0u, g_store->apply_transaction(nullptr, std::move(t)));
ASSERT_EQ(12, g_store->read(cid, dst, 0, 12, result));
ASSERT_EQ(expected, result);
}

// src 11[11 __ __ 11]11
// dst 22 22 22 22 22 22
// res 22 11 00 00 11 22
TEST(MemStore, CloneRangeHoleMiddle)
{
ASSERT_TRUE(g_store);

const auto src = make_ghobject("src4");
const auto dst = make_ghobject("dst4");

bufferlist srcbl, dstbl, result, expected;
srcbl.append("1111");
dstbl.append("222222222222");
expected.append("2211\000\000\000\0001122", 12);

ObjectStore::Transaction t;
t.write(cid, src, 0, 4, srcbl);
t.write(cid, src, 8, 4, srcbl);
t.write(cid, dst, 0, 12, dstbl);
t.clone_range(cid, src, dst, 2, 8, 2);
ASSERT_EQ(0u, g_store->apply_transaction(nullptr, std::move(t)));
ASSERT_EQ(12, g_store->read(cid, dst, 0, 12, result));
ASSERT_EQ(expected, result);
}

// src 11[11 __ __ __]__ 11 11
// dst 22 22 22 22 22 22
// res 22 11 00 00 00 22
TEST(MemStore, CloneRangeHoleEnd)
{
ASSERT_TRUE(g_store);

const auto src = make_ghobject("src5");
const auto dst = make_ghobject("dst5");

bufferlist srcbl, dstbl, result, expected;
srcbl.append("1111");
dstbl.append("222222222222");
expected.append("2211\000\000\000\000\000\00022", 12);

ObjectStore::Transaction t;
t.write(cid, src, 0, 4, srcbl);
t.write(cid, src, 12, 4, srcbl);
t.write(cid, dst, 0, 12, dstbl);
t.clone_range(cid, src, dst, 2, 8, 2);
ASSERT_EQ(0u, g_store->apply_transaction(nullptr, std::move(t)));
ASSERT_EQ(12, g_store->read(cid, dst, 0, 12, result));
ASSERT_EQ(expected, result);
}

// enable boost::intrusive_ptr<CephContext>
void intrusive_ptr_add_ref(CephContext *cct) { cct->get(); }
void intrusive_ptr_release(CephContext *cct) { cct->put(); }

int main(int argc, char** argv)
{
// default to memstore
vector<const char*> defaults{
"--osd_objectstore", "memstore",
"--osd_data", "memstore_clone_temp_dir",
"--memstore_page_size", "4",
};

vector<const char*> args;
argv_to_vec(argc, (const char **)argv, args);

global_init(&defaults, args, CEPH_ENTITY_TYPE_CLIENT,
CODE_ENVIRONMENT_UTILITY, 0);
common_init_finish(g_ceph_context);

// release g_ceph_context on exit
boost::intrusive_ptr<CephContext> cct{g_ceph_context, false};

// create and mount the objectstore
std::unique_ptr<ObjectStore> store{ObjectStore::create(
g_ceph_context,
g_conf->osd_objectstore,
g_conf->osd_data,
g_conf->osd_journal,
g_conf->osd_os_flags)};
if (!store) {
derr << "failed to create osd_objectstore=" << g_conf->osd_objectstore << dendl;
return EXIT_FAILURE;
}

int r = store->mkfs();
if (r < 0) {
derr << "failed to mkfs with " << cpp_strerror(r) << dendl;
return EXIT_FAILURE;
}

r = store->mount();
if (r < 0) {
derr << "failed to mount with " << cpp_strerror(r) << dendl;
return EXIT_FAILURE;
}
g_store = store.get();

ObjectStore::Transaction t;
t.create_collection(cid, 4);
r = store->apply_transaction(nullptr, std::move(t));
if (r < 0) {
derr << "failed to create collection with " << cpp_strerror(r) << dendl;
return EXIT_FAILURE;
}

// unmount the store on exit
auto umount = [] (ObjectStore *store) {
int r = store->umount();
if (r < 0) {
derr << "failed to unmount with " << cpp_strerror(r) << dendl;
}
g_store = nullptr;
};
std::unique_ptr<ObjectStore, decltype(umount)> umounter{store.get(), umount};

::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}