Skip to content

Commit

Permalink
Merge pull request #7832 from trociny/wip-journal-register-async
Browse files Browse the repository at this point in the history
journal: async methods to (un)register and update client

Reviewed-by: Jason Dillaman <dillaman@redhat.com>
  • Loading branch information
Jason Dillaman committed Feb 29, 2016
2 parents 7bc42aa + 9453967 commit 4b0ff1c
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 27 deletions.
14 changes: 10 additions & 4 deletions src/cls/journal/cls_journal_client.cc
Expand Up @@ -291,11 +291,17 @@ void client_update(librados::ObjectWriteOperation *op,

int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
const std::string &id) {
bufferlist inbl;
::encode(id, inbl);
librados::ObjectWriteOperation op;
client_unregister(&op, id);
return ioctx.operate(oid, &op);
}

bufferlist outbl;
return ioctx.exec(oid, "journal", "client_unregister", inbl, outbl);
void client_unregister(librados::ObjectWriteOperation *op,
const std::string &id) {

bufferlist bl;
::encode(id, bl);
op->exec("journal", "client_unregister", bl);
}

void client_commit(librados::ObjectWriteOperation *op, const std::string &id,
Expand Down
2 changes: 2 additions & 0 deletions src/cls/journal/cls_journal_client.h
Expand Up @@ -52,6 +52,8 @@ void client_update(librados::ObjectWriteOperation *op,

int client_unregister(librados::IoCtx &ioctx, const std::string &oid,
const std::string &id);
void client_unregister(librados::ObjectWriteOperation *op,
const std::string &id);

void client_commit(librados::ObjectWriteOperation *op, const std::string &id,
const cls::journal::ObjectSetPosition &commit_position);
Expand Down
57 changes: 39 additions & 18 deletions src/journal/JournalMetadata.cc
Expand Up @@ -357,32 +357,53 @@ void JournalMetadata::get_mutable_metadata(uint64_t *minimum_set,
on_finish);
}

int JournalMetadata::register_client(const bufferlist &data) {
void JournalMetadata::register_client(const bufferlist &data,
Context *on_finish) {
ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
int r = client::client_register(m_ioctx, m_oid, m_client_id, data);
if (r < 0) {
lderr(m_cct) << "failed to register journal client '" << m_client_id
<< "': " << cpp_strerror(r) << dendl;
return r;
}
librados::ObjectWriteOperation op;
client::client_register(&op, m_client_id, data);

C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);

librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
}

void JournalMetadata::update_client(const bufferlist &data,
Context *on_finish) {
ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
librados::ObjectWriteOperation op;
client::client_update(&op, m_client_id, data);

C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);

notify_update();
return 0;
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
}

int JournalMetadata::unregister_client() {
void JournalMetadata::unregister_client(Context *on_finish) {
assert(!m_client_id.empty());

ldout(m_cct, 10) << __func__ << ": " << m_client_id << dendl;
int r = client::client_unregister(m_ioctx, m_oid, m_client_id);
if (r < 0) {
lderr(m_cct) << "failed to unregister journal client '" << m_client_id
<< "': " << cpp_strerror(r) << dendl;
return r;
}
librados::ObjectWriteOperation op;
client::client_unregister(&op, m_client_id);

C_NotifyUpdate *ctx = new C_NotifyUpdate(this, on_finish);

notify_update();
return 0;
librados::AioCompletion *comp =
librados::Rados::aio_create_completion(ctx, NULL,
utils::rados_ctx_callback);
int r = m_ioctx.aio_operate(m_oid, comp, &op);
assert(r == 0);
comp->release();
}

void JournalMetadata::allocate_tag(uint64_t tag_class, const bufferlist &data,
Expand Down
5 changes: 3 additions & 2 deletions src/journal/JournalMetadata.h
Expand Up @@ -60,8 +60,9 @@ class JournalMetadata : public RefCountedObject, boost::noncopyable {
void add_listener(Listener *listener);
void remove_listener(Listener *listener);

int register_client(const bufferlist &data);
int unregister_client();
void register_client(const bufferlist &data, Context *on_finish);
void update_client(const bufferlist &data, Context *on_finish);
void unregister_client(Context *on_finish);

void allocate_tag(uint64_t tag_class, const bufferlist &data,
Tag *tag, Context *on_finish);
Expand Down
20 changes: 18 additions & 2 deletions src/journal/Journaler.cc
Expand Up @@ -176,11 +176,27 @@ void Journaler::flush_commit_position(Context *on_safe) {
}

int Journaler::register_client(const bufferlist &data) {
return m_metadata->register_client(data);
C_SaferCond cond;
register_client(data, &cond);
return cond.wait();
}

int Journaler::unregister_client() {
return m_metadata->unregister_client();
C_SaferCond cond;
unregister_client(&cond);
return cond.wait();
}

void Journaler::register_client(const bufferlist &data, Context *on_finish) {
return m_metadata->register_client(data, on_finish);
}

void Journaler::update_client(const bufferlist &data, Context *on_finish) {
return m_metadata->update_client(data, on_finish);
}

void Journaler::unregister_client(Context *on_finish) {
return m_metadata->unregister_client(on_finish);
}

void Journaler::allocate_tag(const bufferlist &data, cls::journal::Tag *tag,
Expand Down
3 changes: 3 additions & 0 deletions src/journal/Journaler.h
Expand Up @@ -53,6 +53,9 @@ class Journaler {

int register_client(const bufferlist &data);
int unregister_client();
void register_client(const bufferlist &data, Context *on_finish);
void update_client(const bufferlist &data, Context *on_finish);
void unregister_client(Context *on_finish);

void flush_commit_position(Context *on_safe);

Expand Down
46 changes: 45 additions & 1 deletion src/test/journal/test_Journaler.cc
Expand Up @@ -42,7 +42,25 @@ class TestJournaler : public RadosTestFixture {
journal::Journaler journaler(m_ioctx, m_journal_id, client_id, 5);
bufferlist data;
data.append(desc);
return journaler.register_client(data);
C_SaferCond cond;
journaler.register_client(data, &cond);
return cond.wait();
}

int update_client(const std::string &client_id, const std::string &desc) {
journal::Journaler journaler(m_ioctx, m_journal_id, client_id, 5);
bufferlist data;
data.append(desc);
C_SaferCond cond;
journaler.update_client(data, &cond);
return cond.wait();
}

int unregister_client(const std::string &client_id) {
journal::Journaler journaler(m_ioctx, m_journal_id, client_id, 5);
C_SaferCond cond;
journaler.unregister_client(&cond);
return cond.wait();
}

static uint64_t _journal_id;
Expand Down Expand Up @@ -80,10 +98,36 @@ TEST_F(TestJournaler, InitDNE) {
}

TEST_F(TestJournaler, RegisterClientDuplicate) {
ASSERT_EQ(0, create_journal(12, 8));
ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
ASSERT_EQ(-EEXIST, register_client(CLIENT_ID, "foo2"));
}

TEST_F(TestJournaler, UpdateClient) {
ASSERT_EQ(0, create_journal(12, 8));
ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
ASSERT_EQ(0, update_client(CLIENT_ID, "foo2"));
}

TEST_F(TestJournaler, UpdateClientDNE) {
ASSERT_EQ(0, create_journal(12, 8));
ASSERT_EQ(-ENOENT, update_client(CLIENT_ID, "foo"));
}

TEST_F(TestJournaler, UnregisterClient) {
ASSERT_EQ(0, create_journal(12, 8));
ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
ASSERT_EQ(0, unregister_client(CLIENT_ID));
// Test it does not exist and can be registered again
ASSERT_EQ(-ENOENT, update_client(CLIENT_ID, "foo"));
ASSERT_EQ(0, register_client(CLIENT_ID, "foo"));
}

TEST_F(TestJournaler, UnregisterClientDNE) {
ASSERT_EQ(0, create_journal(12, 8));
ASSERT_EQ(-ENOENT, unregister_client(CLIENT_ID));
}

TEST_F(TestJournaler, AllocateTag) {
ASSERT_EQ(0, create_journal(12, 8));

Expand Down

0 comments on commit 4b0ff1c

Please sign in to comment.