Skip to content

Commit

Permalink
Merge remote-tracking branch 'backports/pull/3407/head' into giant-ba…
Browse files Browse the repository at this point in the history
…ckports

Conflicts:
	src/test/librbd/test_librbd.cc
  • Loading branch information
ldachary committed Feb 2, 2015
2 parents db19475 + 068d688 commit 6b08a72
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 8 deletions.
47 changes: 39 additions & 8 deletions src/osdc/ObjectCacher.cc
Expand Up @@ -798,6 +798,8 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
ldout(cct, 20) << "finishing waiters " << ls << dendl;

finish_contexts(cct, ls, err);
retry_waiting_reads();

--reads_outstanding;
read_cond.Signal();
}
Expand Down Expand Up @@ -1107,18 +1109,35 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
// TODO: make read path not call _readx for every completion
hits.insert(errors.begin(), errors.end());
}

if (!missing.empty() || !rx.empty()) {
// read missing
for (map<loff_t, BufferHead*>::iterator bh_it = missing.begin();
bh_it != missing.end();
++bh_it) {
bh_read(bh_it->second);
if (success && onfinish) {
ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
<< " off " << bh_it->first << dendl;
bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
}
uint64_t rx_bytes = static_cast<uint64_t>(
stat_rx + bh_it->second->length());
if (!waitfor_read.empty() || rx_bytes > max_size) {
// cache is full with concurrent reads -- wait for rx's to complete
// to constrain memory growth (especially during copy-ups)
if (success) {
ldout(cct, 10) << "readx missed, waiting on cache to complete "
<< waitfor_read.size() << " blocked reads, "
<< (MAX(rx_bytes, max_size) - max_size)
<< " read bytes" << dendl;
waitfor_read.push_back(new C_RetryRead(this, rd, oset, onfinish));
}

bh_remove(o, bh_it->second);
delete bh_it->second;
} else {
bh_read(bh_it->second);
if (success && onfinish) {
ldout(cct, 10) << "readx missed, waiting on " << *bh_it->second
<< " off " << bh_it->first << dendl;
bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, oset, onfinish) );
}
}
bytes_not_in_cache += bh_it->second->length();
success = false;
}
Expand Down Expand Up @@ -1232,7 +1251,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
// no misses... success! do the read.
assert(!hit_ls.empty());
ldout(cct, 10) << "readx has all buffers" << dendl;

// ok, assemble into result buffer.
uint64_t pos = 0;
if (rd->bl && !error) {
Expand Down Expand Up @@ -1265,6 +1284,18 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
return ret;
}

void ObjectCacher::retry_waiting_reads()
{
list<Context *> ls;
ls.swap(waitfor_read);

while (!ls.empty() && waitfor_read.empty()) {
Context *ctx = ls.front();
ls.pop_front();
ctx->complete(0);
}
waitfor_read.splice(waitfor_read.end(), ls);
}

int ObjectCacher::writex(OSDWrite *wr, ObjectSet *oset, Mutex& wait_on_lock,
Context *onfreespace)
Expand Down
3 changes: 3 additions & 0 deletions src/osdc/ObjectCacher.h
Expand Up @@ -341,6 +341,8 @@ class ObjectCacher {

vector<ceph::unordered_map<sobject_t, Object*> > objects; // indexed by pool_id

list<Context*> waitfor_read;

ceph_tid_t last_read_tid;

set<BufferHead*> dirty_or_tx_bh;
Expand Down Expand Up @@ -457,6 +459,7 @@ class ObjectCacher {

int _readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
bool external_call);
void retry_waiting_reads();

public:
void bh_read_finish(int64_t poolid, sobject_t oid, ceph_tid_t tid,
Expand Down
51 changes: 51 additions & 0 deletions src/test/librbd/test_librbd.cc
Expand Up @@ -21,6 +21,7 @@
#include "global/global_context.h"
#include "global/global_init.h"
#include "common/ceph_argparse.h"
#include "common/config.h"

#include "gtest/gtest.h"

Expand All @@ -40,6 +41,8 @@
#include "include/interval_set.h"
#include "include/stringify.h"

#include <boost/scope_exit.hpp>

using namespace std;

static int get_features(bool *old_format, uint64_t *features)
Expand Down Expand Up @@ -67,6 +70,8 @@ static int create_image_full(rados_ioctx_t ioctx, const char *name,
{
if (old_format) {
return rbd_create(ioctx, name, size, order);
} else if ((features & RBD_FEATURE_STRIPINGV2) != 0) {
return rbd_create3(ioctx, name, size, features, order, 65536, 16);
} else {
return rbd_create2(ioctx, name, size, features, order);
}
Expand Down Expand Up @@ -1910,6 +1915,52 @@ TEST(LibRBD, TestPendingAio)
ASSERT_EQ(0, destroy_one_pool(pool_name, &cluster));
}

TEST(LibRBD, LargeCacheRead)
{
if (!g_conf->rbd_cache) {
std::cout << "SKIPPING due to disabled cache" << std::endl;
return;
}

rados_t cluster;
rados_ioctx_t ioctx;
string pool_name = get_temp_pool_name();
ASSERT_EQ("", create_one_pool(pool_name, &cluster));
rados_ioctx_create(cluster, pool_name.c_str(), &ioctx);

uint64_t orig_cache_size = g_conf->rbd_cache_size;
g_conf->set_val("rbd_cache_size", "16777216");
BOOST_SCOPE_EXIT( (orig_cache_size) ) {
g_conf->set_val("rbd_cache_size", stringify(orig_cache_size).c_str());
} BOOST_SCOPE_EXIT_END;
ASSERT_EQ(16777216, g_conf->rbd_cache_size);

rbd_image_t image;
int order = 0;
const char *name = "testimg";
uint64_t size = g_conf->rbd_cache_size + 1;

ASSERT_EQ(0, create_image(ioctx, name, size, &order));
ASSERT_EQ(0, rbd_open(ioctx, name, &image, NULL));

std::string buffer(1 << order, '1');
for (size_t offs = 0; offs < size; offs += buffer.size()) {
size_t len = std::min<uint64_t>(buffer.size(), size - offs);
ASSERT_EQ(static_cast<ssize_t>(len),
rbd_write(image, offs, len, buffer.c_str()));
}

ASSERT_EQ(0, rbd_invalidate_cache(image));

buffer.resize(size);
ASSERT_EQ(static_cast<ssize_t>(size-1024), rbd_read(image, 1024, size, &buffer[0]));

ASSERT_EQ(0, rbd_close(image));

rados_ioctx_destroy(ioctx);
ASSERT_EQ(0, destroy_one_pool(pool_name, &cluster));
}

int main(int argc, char **argv)
{
::testing::InitGoogleTest(&argc, argv);
Expand Down

0 comments on commit 6b08a72

Please sign in to comment.