Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ script:
- cd Build
- make test
- ctest -T Memcheck
- cat Testing/Temporary/MemoryChecker.*.log
- cat Testing/Temporary/*.log
- cd ..
- ./CMakeUtils/travis/doCoverage.sh
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@ add_library(CPPWebSocketResponseRequest STATIC
include/io_thread.h
include/stream_client.h
include/IOneShotConnectionConsumer.h
include/OpenConnections.h
src/io_thread.cpp
src/ReqFileList.cpp
src/ReqServer.cpp
src/ReqSvrRequest.cpp
src/WebPPSingleThreadOneShotClient.cpp
src/stream_client.cpp
src/OpenConnections.cpp
)
target_link_libraries(CPPWebSocketResponseRequest PUBLIC
FixedJSON::FixedJSON
Expand All @@ -55,6 +57,7 @@ set_property(TARGET CPPWebSocketResponseRequest PROPERTY PUBLIC_HEADER
${CPPWebSocketResponseRequest_SOURCE_DIR}/include/io_thread.h
${CPPWebSocketResponseRequest_SOURCE_DIR}/include/stream_client.h
${CPPWebSocketResponseRequest_SOURCE_DIR}/include/IOneShotConnectionConsumer.h
${CPPWebSocketResponseRequest_SOURCE_DIR}/include/OpenConnections.h
)

#
Expand All @@ -65,8 +68,12 @@ find_package(GTest REQUIRED)
add_executable(requestReply test/requestReply.cpp)
target_link_libraries(requestReply CPPWebSocketResponseRequest GTest::GTest)

add_executable(sub test/sub.cpp)
target_link_libraries(sub CPPWebSocketResponseRequest GTest::GTest)

enable_testing()
add_test(requestReply requestReply)
add_test(sub sub)

#
# NOTE: Valgrind must be configured *before* testing is imported
Expand Down
14 changes: 14 additions & 0 deletions include/OpenConnections.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#ifndef CPPWEBSOCKETRESPONSEREQUEST_OPENCONNECTIONS_H
#define CPPWEBSOCKETRESPONSEREQUEST_OPENCONNECTIONS_H
#include <ReqServer.h>

class OpenConnectionsList {
public:
void Add(SubscriptionHandler::RequestHandle hdl);

void Publish(const std::string& msg);
private:
std::vector<SubscriptionHandler::RequestHandle> hdls;
};

#endif //CPPWEBSOCKETRESPONSEREQUEST_OPENCONNECTIONS_H
23 changes: 18 additions & 5 deletions include/ReqServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,8 @@ class RequestServer: public IPostable {
const std::string& requestName,
std::unique_ptr<RequestReplyHandler> handler);

void AddHandler(
const std::string& requestName,
std::unique_ptr<SubscriptionHandler> handler);
void AddHandler( const std::string& requestName,
std::shared_ptr<SubscriptionHandler> handler);

/**
* Run the event loop, handle any incoming requests or posted tasks
Expand Down Expand Up @@ -141,6 +140,7 @@ class RequestServer: public IPostable {
* Bail out of the event loop...
*/
void Stop();
void StopNoBlock();

std::string HandleRequestReplyMessage(
const std::string& reqName,
Expand All @@ -162,12 +162,25 @@ class RequestServer: public IPostable {
std::future<bool> stopped;

std::map<std::string,std::unique_ptr<RequestReplyHandler>> req_handlers;
std::map<std::string,std::unique_ptr<SubscriptionHandler>> sub_handlers;
std::map<std::string,std::shared_ptr<SubscriptionHandler>> sub_handlers;

/*
* Maps an active connection to
*/
std::map<void*,SubscriptionHandler::RequestHandle> conn_map;
struct StoredHdl{
StoredHdl(websocketpp::connection_hdl hdl)
: raw(hdl.lock().get())
, hdl(hdl) {}

void * raw;
websocketpp::connection_hdl hdl;

bool operator<(const StoredHdl& rhs) const {
return (raw < rhs.raw);
}
};
std::mutex mapGuard;
std::map<StoredHdl,SubscriptionHandler::RequestHandle> conn_map;

// Error tracking
bool failed;
Expand Down
1 change: 0 additions & 1 deletion include/io_thread.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#ifndef DEV_TOOLS_CPP_LIBS_WEBSOCKETS_IO_THREAD_H__
#define DEV_TOOLS_CPP_LIBS_WEBSOCKETS_IO_THREAD_H__

#include <stream_client.h>
#include <boost/asio.hpp>
#include <memory>
#include <thread>
Expand Down
51 changes: 33 additions & 18 deletions include/stream_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <websocketpp/client.hpp>
#include <atomic>
#include <thread>
#include <future>

/**
* Subscribes to a websocket and triggers a call-back for each message.
Expand All @@ -16,15 +17,32 @@
class StreamClient {
public:
/**
* C'tor - connect to the specified url
* C'tor - variant for connecting to a SubsriptionServer
*
* connect to the specified url, and logon with the specified request and message
*/
StreamClient(const std::string& url);
StreamClient(
std::string url,
std::string subName,
std::string subBody
);

virtual ~StreamClient();

/**
* Indicates if the query is currently live
*/
bool Running();

/**
* Wait until the server is running...
*/
bool WaitUntilRunning();

void Ping(unsigned int timeout);

struct InvalidUrlException {};

protected:
/**
* Run the event loop - (start the query)
Expand All @@ -35,6 +53,7 @@ class StreamClient {
* Stop the event loop
*/
void Stop();
void StopNonBlock();

/**
* Call-back triggered when a new message is received.
Expand All @@ -45,40 +64,36 @@ class StreamClient {

private:
typedef websocketpp::config::asio_tls_client::message_type::ptr message_ptr;
typedef websocketpp::lib::shared_ptr<boost::asio::ssl::context> context_ptr;


/**
* Call-back to intialise the encryption
* Call-back to notify us of a successful subscription connection
*/
context_ptr on_tls_init(websocketpp::connection_hdl);
void on_sub_open(websocketpp::connection_hdl hdl);

/**
* Call-back to notify us of a successful connection
* Call-back when a ping request has timed out
*/
void on_open(websocketpp::connection_hdl hdl);
void on_pong_timeout(websocketpp::connection_hdl hdl, std::string msg);

/**
* Call-back triggered by the arrival of a new message
*/
void on_message(websocketpp::connection_hdl hdl, message_ptr msg);

typedef websocketpp::client<websocketpp::config::asio_tls_client> client;
typedef websocketpp::client<websocketpp::config::asio_client> client;

std::atomic<bool> running;
client m_endpoint;
client::connection_ptr con;
std::string subName;
std::string subBody;
std::string url;
std::promise<bool> startFlag;
std::future<bool> futureStart;
std::promise<bool> stopFlag;
std::future<bool> futureStop;

};

class StreamClientThread: public StreamClient {
public:
StreamClientThread(const std::string& url);

~StreamClientThread();
private:
std::thread io_thread;
};


#endif
19 changes: 19 additions & 0 deletions src/OpenConnections.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#include <OpenConnections.h>
#include <OpenConnections.h>

void OpenConnectionsList::Add(SubscriptionHandler::RequestHandle hdl) {
hdls.emplace_back(hdl);
}

void OpenConnectionsList::Publish(const std::string &msg) {
std::vector<SubscriptionHandler::RequestHandle> newHdls;
newHdls.reserve(hdls.size());
for (auto& hdl: hdls) {
if (hdl->Ok()) {
hdl->SendMessage(msg);
newHdls.emplace_back(std::move(hdl));
}
}

hdls = std::move(newHdls);
}
56 changes: 37 additions & 19 deletions src/ReqServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <SimpleJSON.h>
#include <logger.h>
#include <OSTools.h>
#include <ReqServer.h>


/**************************************************************
* Utilities
Expand Down Expand Up @@ -85,7 +87,7 @@ class Request: public SubscriptionHandler::SubRequest {
Server* s,
websocketpp::connection_hdl c)

: request(req), serv(s), conn(serv->get_con_from_hdl(c)), open(true)
: request(req), serv(s), conn(c), open(true)
{
}

Expand All @@ -99,10 +101,9 @@ class Request: public SubscriptionHandler::SubRequest {
* Send a data update, a JSON message, down the pipe
*/
void SendMessage(const std::string& msg) {
SLOG_FROM(LOG_VERBOSE,"<< Request::SendMessage",
"Sending Message to:" << conn.get() << endl <<
"Message: " << msg << endl);
serv->send(conn,msg,websocketpp::frame::opcode::TEXT);
if (Ok()) {
serv->send(conn,msg,websocketpp::frame::opcode::TEXT);
}
}

bool Ok() const { return open; }
Expand All @@ -114,7 +115,7 @@ class Request: public SubscriptionHandler::SubRequest {
private:
std::string request;
Server* serv;
Server::connection_ptr conn;
websocketpp::connection_hdl conn;
bool open;
};

Expand Down Expand Up @@ -144,9 +145,9 @@ void RequestServer::AddHandler(

void RequestServer::AddHandler(
const std::string& requestName,
std::unique_ptr<SubscriptionHandler> handler)
std::shared_ptr<SubscriptionHandler> handler)
{
sub_handlers[requestName].reset(handler.release());
sub_handlers[requestName] = std::move(handler);
}

std::string RequestServer::HandleMessage(
Expand Down Expand Up @@ -198,15 +199,13 @@ std::string RequestServer::HandleRequestReplyMessage(
}

void RequestServer::HandleClose(websocketpp::connection_hdl hdl) {
void* conn = hdl.lock().get();

auto it = conn_map.find(conn);
std::unique_lock<std::mutex> mapLock(mapGuard);
auto it = conn_map.find(StoredHdl(hdl));

if (it != conn_map.end()) {
it->second->Close();
conn_map.erase(it);
}

}

RequestServer::~RequestServer() {
Expand All @@ -219,21 +218,39 @@ void RequestServer::WaitUntilRunning()
}

void RequestServer::Stop() {
StopNoBlock();
stopped.wait();
}

void RequestServer::StopNoBlock() {
websocketpp::lib::error_code ec;
requestServer_.stop_listening(ec);
std::vector<websocketpp::connection_hdl> toClose;
toClose.reserve(conn_map.size());
{
std::unique_lock<std::mutex> mapLock(mapGuard);

for (auto& pair: conn_map) {
toClose.push_back(pair.first.hdl);
}
for (auto& hdl: toClose) {
requestServer_.close(
hdl,
websocketpp::close::status::internal_endpoint_error,
"Stopping",
ec);
}
conn_map.clear();
}
requestServer_.stop();
stopped.wait();
requestServer_.get_io_service().stop();
requestServer_.get_io_service().reset();

}

void RequestServer::FatalError(int code, std::string message) {
failed = true;
failCode = code;
failMsg = std::move(message);
websocketpp::lib::error_code ec;
requestServer_.stop_listening(ec);
requestServer_.stop();
StopNoBlock();
}

std::string RequestServer::HandleSubscriptionMessage(
Expand All @@ -253,7 +270,7 @@ std::string RequestServer::HandleSubscriptionMessage(
try {
SubscriptionHandler::RequestHandle reqHdl(
new Request(jsonRequest,raw_server,hdl));
conn_map.insert({hdl.lock().get(), reqHdl});
conn_map.insert({hdl, reqHdl});
handler.OnRequest(reqHdl);
} catch (const SubscriptionHandler::InvalidRequestException& e) {
response = ErrorMessage(e.errMsg, e.code);
Expand Down Expand Up @@ -312,3 +329,4 @@ void RequestServer::PostTask(const RequestServer::InteruptHandler& f) {

serv.post(f);
}

Loading