diff --git a/crud/subscription_manager.cpp b/crud/subscription_manager.cpp index 2474b0fe..fd4540dc 100644 --- a/crud/subscription_manager.cpp +++ b/crud/subscription_manager.cpp @@ -140,7 +140,7 @@ subscription_manager::unsubscribe(const bzn::uuid_t& uuid, const bzn::key_t& key void -subscription_manager::notify_sessions(const bzn::uuid_t& uuid, const bzn::key_t& key, const std::string& value) +subscription_manager::notify_sessions(const bzn::uuid_t& uuid, const bool update, const bzn::key_t& key, const std::string& value) { std::lock_guard lock(this->subscribers_lock); @@ -165,6 +165,15 @@ subscription_manager::notify_sessions(const bzn::uuid_t& uuid, const bzn::key_t& resp.mutable_subscription_update()->set_value(value); } + if (update) + { + resp.mutable_subscription_update()->set_operation(database_subscription_update::UPDATE); + } + else + { + resp.mutable_subscription_update()->set_operation(database_subscription_update::DELETE); + } + LOG(debug) << "notifying session [" << session_shared_ptr->get_session_id() << "] : " << uuid << ":" << key << ":" << subscription.first << ":" << value.substr(0, MAX_MESSAGE_SIZE); @@ -183,15 +192,15 @@ subscription_manager::inspect_commit(const database_msg& msg) switch (msg.msg_case()) { case database_msg::kCreate: - this->notify_sessions(msg.header().db_uuid(), msg.create().key(), msg.create().value()); + this->notify_sessions(msg.header().db_uuid(), true, msg.create().key(), msg.create().value()); break; case database_msg::kUpdate: - this->notify_sessions(msg.header().db_uuid(), msg.update().key(), msg.update().value()); + this->notify_sessions(msg.header().db_uuid(), true, msg.update().key(), msg.update().value()); break; case database_msg::kDelete: - this->notify_sessions(msg.header().db_uuid(), msg.delete_().key(), ""); + this->notify_sessions(msg.header().db_uuid(), false, msg.delete_().key(), ""); break; default: diff --git a/crud/subscription_manager.hpp b/crud/subscription_manager.hpp index a42ae451..e382dee1 100644 --- a/crud/subscription_manager.hpp +++ b/crud/subscription_manager.hpp @@ -40,7 +40,7 @@ namespace bzn void purge_closed_sessions(const boost::system::error_code& ec); - void notify_sessions(const bzn::uuid_t& uuid, const bzn::key_t& key, const std::string& value); + void notify_sessions(const bzn::uuid_t& uuid, bool update, const bzn::key_t& key, const std::string& value); std::unordered_map>>>> subscribers; diff --git a/crud/test/subscription_manager_test.cpp b/crud/test/subscription_manager_test.cpp index a426279a..dfb7bccd 100644 --- a/crud/test/subscription_manager_test.cpp +++ b/crud/test/subscription_manager_test.cpp @@ -199,6 +199,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate); ASSERT_EQ(resp.subscription_update().key(), "0"); ASSERT_EQ(resp.subscription_update().value(), "0"); + ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE); })); EXPECT_CALL(*mock_session2, send_datagram(An>())).WillOnce(Invoke( @@ -211,6 +212,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate); ASSERT_EQ(resp.subscription_update().key(), "0"); ASSERT_EQ(resp.subscription_update().value(), "0"); + ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE); })); sm.inspect_commit(msg); @@ -235,6 +237,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate); ASSERT_EQ(resp.subscription_update().key(), "1"); ASSERT_EQ(resp.subscription_update().value(), "1"); + ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE); })); EXPECT_CALL(*mock_session2, send_datagram(An>())).WillOnce(Invoke( @@ -247,6 +250,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate); ASSERT_EQ(resp.subscription_update().key(), "1"); ASSERT_EQ(resp.subscription_update().value(), "1"); + ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE); })); sm.inspect_commit(msg); @@ -270,6 +274,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate); ASSERT_EQ(resp.subscription_update().key(), "1"); ASSERT_EQ(resp.subscription_update().value(), ""); + ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::DELETE); })); EXPECT_CALL(*mock_session2, send_datagram(An>())).WillOnce(Invoke( @@ -282,6 +287,7 @@ TEST(subscription_manager, test_that_subscriber_is_notified_for_create_and_updat ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate); ASSERT_EQ(resp.subscription_update().key(), "1"); ASSERT_EQ(resp.subscription_update().value(), ""); + ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::DELETE); })); sm.inspect_commit(msg); @@ -342,6 +348,7 @@ TEST(subscription_manager, test_that_dead_session_is_removed_from_subscriber_lis ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate); ASSERT_EQ(resp.subscription_update().key(), "0"); ASSERT_EQ(resp.subscription_update().value(), "0"); + ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE); })); EXPECT_CALL(*mock_session2, send_datagram(An>())).WillOnce(Invoke( @@ -354,6 +361,7 @@ TEST(subscription_manager, test_that_dead_session_is_removed_from_subscriber_lis ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate); ASSERT_EQ(resp.subscription_update().key(), "0"); ASSERT_EQ(resp.subscription_update().value(), "0"); + ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE); })); sm->inspect_commit(msg); @@ -375,6 +383,7 @@ TEST(subscription_manager, test_that_dead_session_is_removed_from_subscriber_lis ASSERT_EQ(resp.response_case(), database_response::kSubscriptionUpdate); ASSERT_EQ(resp.subscription_update().key(), "0"); ASSERT_EQ(resp.subscription_update().value(), "0"); + ASSERT_EQ(resp.subscription_update().operation(), database_subscription_update::UPDATE); })); sm->inspect_commit(msg); diff --git a/proto/database.proto b/proto/database.proto index 4a25966f..42a17077 100644 --- a/proto/database.proto +++ b/proto/database.proto @@ -84,8 +84,15 @@ message database_has message database_subscription_update { + enum operation_type + { + UPDATE = 0; + DELETE = 1; + } + string key = 1; bytes value = 2; + operation_type operation = 3; } ///////////////////////////////////////////////////////////////////////////////