Skip to content
This repository has been archived by the owner on Mar 3, 2020. It is now read-only.

Commit

Permalink
Merge branch 'devel' into task/iscroggin/KEP-727
Browse files Browse the repository at this point in the history
  • Loading branch information
isabelsavannah committed Oct 23, 2018
2 parents a425584 + 25ce8e3 commit 042bda6
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
17 changes: 13 additions & 4 deletions crud/subscription_manager.cpp
Expand Up @@ -140,7 +140,7 @@ subscription_manager::unsubscribe(const bzn::uuid_t& uuid, const bzn::key_t& key


void
subscription_manager::notify_sessions(const bzn::uuid_t& uuid, const bzn::key_t& key, const std::string& value)
subscription_manager::notify_sessions(const bzn::uuid_t& uuid, const bool update, const bzn::key_t& key, const std::string& value)
{
std::lock_guard<std::mutex> lock(this->subscribers_lock);

Expand All @@ -165,6 +165,15 @@ subscription_manager::notify_sessions(const bzn::uuid_t& uuid, const bzn::key_t&
resp.mutable_subscription_update()->set_value(value);
}

if (update)
{
resp.mutable_subscription_update()->set_operation(database_subscription_update::UPDATE);
}
else
{
resp.mutable_subscription_update()->set_operation(database_subscription_update::DELETE);
}

LOG(debug) << "notifying session [" << session_shared_ptr->get_session_id() << "] : " << uuid
<< ":" << key << ":" << subscription.first << ":" << value.substr(0, MAX_MESSAGE_SIZE);

Expand All @@ -183,15 +192,15 @@ subscription_manager::inspect_commit(const database_msg& msg)
switch (msg.msg_case())
{
case database_msg::kCreate:
this->notify_sessions(msg.header().db_uuid(), msg.create().key(), msg.create().value());
this->notify_sessions(msg.header().db_uuid(), true, msg.create().key(), msg.create().value());
break;

case database_msg::kUpdate:
this->notify_sessions(msg.header().db_uuid(), msg.update().key(), msg.update().value());
this->notify_sessions(msg.header().db_uuid(), true, msg.update().key(), msg.update().value());
break;

case database_msg::kDelete:
this->notify_sessions(msg.header().db_uuid(), msg.delete_().key(), "");
this->notify_sessions(msg.header().db_uuid(), false, msg.delete_().key(), "");
break;

default:
Expand Down
2 changes: 1 addition & 1 deletion crud/subscription_manager.hpp
Expand Up @@ -40,7 +40,7 @@ namespace bzn

void purge_closed_sessions(const boost::system::error_code& ec);

void notify_sessions(const bzn::uuid_t& uuid, const bzn::key_t& key, const std::string& value);
void notify_sessions(const bzn::uuid_t& uuid, bool update, const bzn::key_t& key, const std::string& value);

std::unordered_map<bzn::uuid_t, std::unordered_map<bzn::key_t, std::unordered_map<bzn::session_id, std::unordered_map<uint64_t, std::weak_ptr<bzn::session_base>>>>> subscribers;

Expand Down
9 changes: 9 additions & 0 deletions crud/test/subscription_manager_test.cpp
Expand Up @@ -199,6 +199,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat
ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate);
ASSERT_EQ(resp.subscription_update().key(), "0");
ASSERT_EQ(resp.subscription_update().value(), "0");
ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE);
}));

EXPECT_CALL(*mock_session2, send_datagram(An<std::shared_ptr<std::string>>())).WillOnce(Invoke(
Expand All @@ -211,6 +212,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat
ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate);
ASSERT_EQ(resp.subscription_update().key(), "0");
ASSERT_EQ(resp.subscription_update().value(), "0");
ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE);
}));

sm.inspect_commit(msg);
Expand All @@ -235,6 +237,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat
ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate);
ASSERT_EQ(resp.subscription_update().key(), "1");
ASSERT_EQ(resp.subscription_update().value(), "1");
ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE);
}));

EXPECT_CALL(*mock_session2, send_datagram(An<std::shared_ptr<std::string>>())).WillOnce(Invoke(
Expand All @@ -247,6 +250,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat
ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate);
ASSERT_EQ(resp.subscription_update().key(), "1");
ASSERT_EQ(resp.subscription_update().value(), "1");
ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE);
}));

sm.inspect_commit(msg);
Expand All @@ -270,6 +274,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat
ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate);
ASSERT_EQ(resp.subscription_update().key(), "1");
ASSERT_EQ(resp.subscription_update().value(), "");
ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::DELETE);
}));

EXPECT_CALL(*mock_session2, send_datagram(An<std::shared_ptr<std::string>>())).WillOnce(Invoke(
Expand All @@ -282,6 +287,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat
ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate);
ASSERT_EQ(resp.subscription_update().key(), "1");
ASSERT_EQ(resp.subscription_update().value(), "");
ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::DELETE);
}));

sm.inspect_commit(msg);
Expand Down Expand Up @@ -342,6 +348,7 @@ TEST(subscription_manager, test_that_dead_session_is_removed_from_subscriber_lis
ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate);
ASSERT_EQ(resp.subscription_update().key(), "0");
ASSERT_EQ(resp.subscription_update().value(), "0");
ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE);
}));

EXPECT_CALL(*mock_session2, send_datagram(An<std::shared_ptr<std::string>>())).WillOnce(Invoke(
Expand All @@ -354,6 +361,7 @@ TEST(subscription_manager, test_that_dead_session_is_removed_from_subscriber_lis
ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate);
ASSERT_EQ(resp.subscription_update().key(), "0");
ASSERT_EQ(resp.subscription_update().value(), "0");
ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE);
}));

sm->inspect_commit(msg);
Expand All @@ -375,6 +383,7 @@ TEST(subscription_manager, test_that_dead_session_is_removed_from_subscriber_lis
ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate);
ASSERT_EQ(resp.subscription_update().key(), "0");
ASSERT_EQ(resp.subscription_update().value(), "0");
ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE);
}));

sm->inspect_commit(msg);
Expand Down
7 changes: 7 additions & 0 deletions proto/database.proto
Expand Up @@ -84,8 +84,15 @@ message database_has

message database_subscription_update
{
enum operation_type
{
UPDATE = 0;
DELETE = 1;
}

string key = 1;
bytes value = 2;
operation_type operation = 3;
}

///////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 042bda6

Please sign in to comment.