Skip to content

Commit

Permalink
Create a class template for publish subscribe pattern (#443)
Browse files Browse the repository at this point in the history
  • Loading branch information
an-tao committed May 24, 2020
1 parent 84e503a commit 49472a3
Show file tree
Hide file tree
Showing 5 changed files with 304 additions and 4 deletions.
3 changes: 2 additions & 1 deletion CMakeLists.txt
Expand Up @@ -382,7 +382,8 @@ set(DROGON_HEADERS
lib/inc/drogon/WebSocketController.h
lib/inc/drogon/drogon.h
lib/inc/drogon/version.h
lib/inc/drogon/drogon_callbacks.h)
lib/inc/drogon/drogon_callbacks.h
lib/inc/drogon/PubSubService.h)
install(FILES ${DROGON_HEADERS} DESTINATION ${INSTALL_INCLUDE_DIR}/drogon)

set(ORM_HEADERS
Expand Down
272 changes: 272 additions & 0 deletions lib/inc/drogon/PubSubService.h
@@ -0,0 +1,272 @@
/**
*
* PubSubService.h
* An Tao
*
* Copyright 2018, An Tao. All rights reserved.
* https://github.com/an-tao/drogon
* Use of this source code is governed by a MIT license
* that can be found in the License file.
*
* Drogon
*
*/

#pragma once

#include <trantor/utils/NonCopyable.h>
#include <string>
#include <unordered_map>
#include <functional>
#include <shared_mutex>

namespace drogon
{
using SubscriberID = uint64_t;

/**
* @brief This class template presents an unnamed topic.
*
* @tparam MessageType
*/
template <typename MessageType>
class Topic : public trantor::NonCopyable
{
public:
using MessageHandler = std::function<void(const MessageType &)>;
#if __cplusplus >= 201703L | defined _WIN32
using SharedMutex = std::shared_mutex;
#else
using SharedMutex = std::shared_timed_mutex;
#endif
/**
* @brief Publish a message, every subscriber in the topic will receive the
* message.
*
* @param message
*/
void publish(const MessageType &message) const
{
std::shared_lock<SharedMutex> lock(mutex_);
for (auto &pair : handlersMap_)
{
pair.second(message);
}
}

/**
* @brief Subcribe to the topic.
*
* @param handler is invoked when a message arrives.
* @return SubscriberID
*/
SubscriberID subscribe(const MessageHandler &handler)
{
std::unique_lock<SharedMutex> lock(mutex_);
handlersMap_[++id_] = handler;
return id_;
}

/**
* @brief Subcribe to the topic.
*
* @param handler is invoked when a message arrives.
* @return SubscriberID
*/
SubscriberID subscribe(MessageHandler &&handler)
{
std::unique_lock<SharedMutex> lock(mutex_);
handlersMap_[++id_] = std::move(handler);
return id_;
}

/**
* @brief Unsubscribe from the topic.
*/
void unsubscribe(SubscriberID id)
{
std::unique_lock<SharedMutex> lock(mutex_);
handlersMap_.erase(id);
}

/**
* @brief Check if the topic is empty.
*
* @return true means there are no subscribers.
* @return false means there are subscribers in the topic.
*/
bool empty() const
{
std::shared_lock<SharedMutex> lock(mutex_);
return handlersMap_.empty();
}
/**
* @brief Remove all subscribers from the topic.
*
*/
void clear()
{
std::unique_lock<SharedMutex> lock(mutex_);
handlersMap_.clear();
}

private:
std::unordered_map<SubscriberID, MessageHandler> handlersMap_;
mutable SharedMutex mutex_;
SubscriberID id_;
};

/**
* @brief This class template implements a publish-subscribe pattern with
* multiple named topics.
*
* @tparam MessageType The message type.
*/
template <typename MessageType>
class PubSubService : public trantor::NonCopyable
{
public:
using MessageHandler =
std::function<void(const std::string &, const MessageType &)>;
#if __cplusplus >= 201703L | defined _WIN32
using SharedMutex = std::shared_mutex;
#else
using SharedMutex = std::shared_timed_mutex;
#endif

/**
* @brief Publish a message to a topic. The message will be broadcasted to
* every subscriber.
*/
void publish(const std::string &topicName, const MessageType &message) const
{
std::shared_ptr<Topic<MessageType>> topicPtr;
{
std::shared_lock<SharedMutex> lock(mutex_);
auto iter = topicMap_.find(topicName);
if (iter != topicMap_.end())
{
topicPtr = iter->second;
}
else
{
return;
}
}
topicPtr->publish(message);
}

/**
* @brief Subscribe to a topic. When a message is published to the topic,
* the handler is invoked by passing the topic and message as parameters.
*/
SubscriberID subscribe(const std::string &topicName,
const MessageHandler &handler)
{
auto topicHandler = [topicName, handler](const MessageType &message) {
handler(topicName, message);
};
return subscribeToTopic(topicName, std::move(topicHandler));
}

/**
* @brief Subscribe to a topic. When a message is published to the topic,
* the handler is invoked by passing the topic and message as parameters.
*/
SubscriberID subscribe(const std::string &topicName,
MessageHandler &&handler)
{
auto topicHandler = [topicName, handler = std::move(handler)](
const MessageType &message) {
handler(topicName, message);
};
return subscribeToTopic(topicName, std::move(topicHandler));
}

/**
* @brief Unsubscribe from a topic.
*
* @param topic
* @param id The subscriber ID returned from the subscribe method.
*/
void unsubscribe(const std::string &topicName, SubscriberID id)
{
{
std::shared_lock<SharedMutex> lock(mutex_);
auto iter = topicMap_.find(topicName);
if (iter == topicMap_.end())
{
return;
}
iter->second->unsubscribe(id);
if (!iter->second->empty())
return;
}
std::unique_lock<SharedMutex> lock(mutex_);
auto iter = topicMap_.find(topicName);
if (iter == topicMap_.end())
{
return;
}
if (iter->second->empty())
topicMap_.erase(iter);
}

/**
* @brief return the number of topics.
*/
size_t size() const
{
std::shared_lock<SharedMutex> lock(mutex_);
return topicMap_.size();
}

/**
* @brief remove all topics.
*/
void clear()
{
std::unique_lock<SharedMutex> lock(mutex_);
topicMap_.clear();
}

/**
* @brief Remove a topic
*
*/
void removeTopic(const std::string &topicName)
{
std::unique_lock<SharedMutex> lock(mutex_);
topicMap_.erase(topicName);
}

private:
std::unordered_map<std::string, std::shared_ptr<Topic<MessageType>>>
topicMap_;
mutable SharedMutex mutex_;
SubscriberID subID_ = 0;
SubscriberID subscribeToTopic(
const std::string &topicName,
typename Topic<MessageType>::MessageHandler &&handler)
{
{
std::shared_lock<SharedMutex> lock(mutex_);
auto iter = topicMap_.find(topicName);
if (iter != topicMap_.end())
{
return iter->second->subscribe(std::move(handler));
}
}
std::unique_lock<SharedMutex> lock(mutex_);
auto iter = topicMap_.find(topicName);
if (iter != topicMap_.end())
{
return iter->second->subscribe(std::move(handler));
}
auto topicPtr = std::make_shared<Topic<MessageType>>();
auto id = topicPtr->subscribe(std::move(handler));
topicMap_[topicName] = std::move(topicPtr);
return id;
}
};
} // namespace drogon
4 changes: 2 additions & 2 deletions lib/src/Utilities.cc
Expand Up @@ -337,11 +337,11 @@ std::string getUuid()
return ret;
#elif defined __FreeBSD__
uuid_t *uuid = new uuid_t;
char* binstr = (char *) malloc(16);
char *binstr = (char *)malloc(16);
uuidgen(uuid, 1);
#if _BYTE_ORDER == _LITTLE_ENDIAN
uuid_enc_le(binstr, uuid);
#else /* _BYTE_ORDER != _LITTLE_ENDIAN */
#else /* _BYTE_ORDER != _LITTLE_ENDIAN */
uuid_enc_be(binstr, uuid);
#endif /* _BYTE_ORDER == _LITTLE_ENDIAN */
delete uuid;
Expand Down
4 changes: 3 additions & 1 deletion unittest/CMakeLists.txt
Expand Up @@ -5,6 +5,7 @@ add_executable(md5_unittest MD5Unittest.cpp ../lib/src/ssl_funcs/Md5.cc)
add_executable(sha1_unittest SHA1Unittest.cpp ../lib/src/ssl_funcs/Sha1.cc)
add_executable(ostringstream_unittest OStringStreamUnitttest.cpp)
add_executable(base64_unittest Base64Unittest.cpp)
add_executable(pubsubservice_unittest PubSubServiceUnittest.cpp)
if(Brotli_FOUND)
add_executable(brotli_unittest BrotliUnittest.cpp)
endif()
Expand All @@ -16,7 +17,8 @@ set(UNITTEST_TARGETS
md5_unittest
sha1_unittest
ostringstream_unittest
base64_unittest)
base64_unittest
pubsubservice_unittest)
if(Brotli_FOUND)
set(UNITTEST_TARGETS ${UNITTEST_TARGETS} brotli_unittest)
endif()
Expand Down
25 changes: 25 additions & 0 deletions unittest/PubSubServiceUnittest.cpp
@@ -0,0 +1,25 @@
#include <gtest/gtest.h>
#include <drogon/PubSubService.h>
#include <string>
#include <iostream>

TEST(PubSubServiceTest, normal)
{
drogon::PubSubService<std::string> service;
auto id=service.subscribe("topic1",
[](const std::string &topic, const std::string &message) {
EXPECT_STREQ(topic.c_str(), "topic1");
EXPECT_STREQ(message.c_str(), "hello world");
});
service.publish("topic1", "hello world");
service.publish("topic2", "hello world");
EXPECT_EQ(service.size(), 1);
service.unsubscribe("topic1", id);
EXPECT_EQ(service.size(), 0);
}

int main(int argc, char **argv)
{
testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

0 comments on commit 49472a3

Please sign in to comment.