Skip to content

Commit

Permalink
Merge pull request #248 from wrieg123/wrr/update_ws_backend
Browse files Browse the repository at this point in the history
Move websocket backend to boost::beast
  • Loading branch information
wrieg123 committed May 24, 2024
2 parents ece8c1a + 89b20bb commit 69514cb
Show file tree
Hide file tree
Showing 17 changed files with 758 additions and 586 deletions.
533 changes: 249 additions & 284 deletions NOTICE

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions conda/dev-environment-unix.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ dependencies:
- httpx>=0.20,<1
- isort>=5,<6
- libarrow=16
- libboost>=1.80.0
- libboost-headers>=1.80.0
- librdkafka
- libboost-headers
- lz4-c
- mamba
- mdformat>=0.7.17,<0.8
Expand Down Expand Up @@ -51,5 +52,4 @@ dependencies:
- twine
- unzip
- wheel
- websocketpp
- zip
4 changes: 2 additions & 2 deletions conda/dev-environment-win.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ dependencies:
- httpx>=0.20,<1
- isort>=5,<6
- libarrow=16
- libboost-headers
- libboost>=1.80.0
- libboost-headers>=1.80.0
- librdkafka
- lz4-c
- make
Expand Down Expand Up @@ -48,4 +49,3 @@ dependencies:
- tornado
- twine
- wheel
- websocketpp
11 changes: 3 additions & 8 deletions cpp/csp/adapters/websocket/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
csp_autogen( csp.adapters.websocket_types websocket_types WEBSOCKET_HEADER WEBSOCKET_SOURCE )

# Need to build websocket adapter under cpp17 standard due to websocketpp incompatibility issues
set(CMAKE_CXX_STANDARD 17)

set(WS_CLIENT_HEADER_FILES
ClientAdapterManager.h
ClientInputAdapter.h
Expand All @@ -25,16 +22,14 @@ set(WS_CLIENT_SOURCE_FILES
add_library(csp_websocket_client_adapter STATIC ${WS_CLIENT_SOURCE_FILES})
set_target_properties(csp_websocket_client_adapter PROPERTIES PUBLIC_HEADER "${WS_CLIENT_SOURCE_FILES}")

find_package(websocketpp REQUIRED)
#set(OPENSSL_USE_STATIC_LIBS TRUE)
find_package(Boost REQUIRED)
find_package(OpenSSL REQUIRED)

target_link_libraries(csp_websocket_client_adapter
PRIVATE
csp_adapter_utils
${OPENSSL_SSL_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
websocketpp::websocketpp
Boost::boost
OpenSSL::SSL
)

install(TARGETS csp_websocket_client_adapter
Expand Down
90 changes: 45 additions & 45 deletions cpp/csp/adapters/websocket/ClientAdapterManager.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
#include <csp/adapters/websocket/ClientAdapterManager.h>

#include <csp/core/Platform.h>
#include <chrono>
#include <iomanip>
#include <iostream>

namespace csp {

INIT_CSP_ENUM( adapters::websocket::ClientStatusType,
Expand All @@ -24,57 +19,62 @@ ClientAdapterManager::ClientAdapterManager( Engine* engine, const Dictionary & p
: AdapterManager( engine ),
m_active( false ),
m_shouldRun( false ),
m_endpoint( nullptr ),
m_endpoint( std::make_unique<WebsocketEndpoint>( properties ) ),
m_inputAdapter( nullptr ),
m_outputAdapter( nullptr ),
m_updateAdapter( nullptr ),
m_thread( nullptr ),
m_properties( properties )
{
if( m_properties.get<bool>( "use_tls" ) )
{
m_endpoint = new WebsocketEndpointTLS( properties );
}
else
{
m_endpoint = new WebsocketEndpointNoTLS( properties );
}


};
{ };

ClientAdapterManager::~ClientAdapterManager()
{ };

void ClientAdapterManager::start( DateTime starttime, DateTime endtime )
{
if(m_inputAdapter != nullptr)
{
m_endpoint -> setOnMessageCb( [ this ]( std::string msg ) {
PushBatch batch( m_engine -> rootEngine() );
m_inputAdapter -> processMessage( msg, &batch );
});
}
m_endpoint -> setOnOpenCb( [ this ]() {
m_active = true;
pushStatus( StatusLevel::INFO, ClientStatusType::ACTIVE, "Connected successfully" );
});
m_endpoint -> setOnFailCb( [ this ]() {
m_active = false;
pushStatus( StatusLevel::ERROR, ClientStatusType::CONNECTION_FAILED, "Connection failed, will try to reconnect" );
});
m_endpoint -> setOnCloseCb( [ this ]() {
m_active = false;
pushStatus( StatusLevel::INFO, ClientStatusType::CLOSED, "Connection closed" );
});
m_endpoint -> setOnSendFailCb( [ this ]( const std::string& s ) {
std::stringstream ss;
ss << "Failed to send: " << s;
pushStatus( StatusLevel::ERROR, ClientStatusType::MESSAGE_SEND_FAIL, ss.str() );
});
AdapterManager::start( starttime, endtime );
// start the bg thread

m_shouldRun = true;
m_endpoint -> setOnOpen(
[ this ]() {
m_active = true;
pushStatus( StatusLevel::INFO, ClientStatusType::ACTIVE, "Connected successfully" );
}
);
m_endpoint -> setOnFail(
[ this ]( const std::string& reason ) {
std::stringstream ss;
ss << "Connection Failure: " << reason;
m_active = false;
pushStatus( StatusLevel::ERROR, ClientStatusType::CONNECTION_FAILED, ss.str() );
}
);
if( m_inputAdapter ) {
m_endpoint -> setOnMessage(
[ this ]( void* c, size_t t ) {
PushBatch batch( m_engine -> rootEngine() );
m_inputAdapter -> processMessage( c, t, &batch );
}
);
} else {
// if a user doesn't call WebsocketAdapterManager.subscribe, no inputadapter will be created
// but we still need something to avoid on_message_cb not being set in the endpoint.
m_endpoint -> setOnMessage( []( void* c, size_t t ){} );
}
m_endpoint -> setOnClose(
[ this ]() {
m_active = false;
pushStatus( StatusLevel::INFO, ClientStatusType::CLOSED, "Connection closed" );
}
);
m_endpoint -> setOnSendFail(
[ this ]( const std::string& s ) {
std::stringstream ss;
ss << "Failed to send: " << s;
pushStatus( StatusLevel::ERROR, ClientStatusType::MESSAGE_SEND_FAIL, ss.str() );
}
);

m_thread = std::make_unique<std::thread>( [ this ]() {
while( m_shouldRun )
{
Expand All @@ -89,7 +89,7 @@ void ClientAdapterManager::stop() {
AdapterManager::stop();

m_shouldRun=false;
if( m_active ) m_endpoint->close();
if( m_active ) m_endpoint->stop();
if( m_thread ) m_thread->join();
};

Expand All @@ -109,7 +109,7 @@ PushInputAdapter* ClientAdapterManager::getInputAdapter(CspTypePtr & type, PushM

OutputAdapter* ClientAdapterManager::getOutputAdapter()
{
if (m_outputAdapter == nullptr) m_outputAdapter = m_engine -> createOwnedObject<ClientOutputAdapter>(m_endpoint);
if (m_outputAdapter == nullptr) m_outputAdapter = m_engine -> createOwnedObject<ClientOutputAdapter>(*m_endpoint);

return m_outputAdapter;
}
Expand Down
14 changes: 9 additions & 5 deletions cpp/csp/adapters/websocket/ClientAdapterManager.h
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_ADAPTERMGR_H

#include <csp/adapters/websocket/WebsocketEndpoint.h>
#include <csp/adapters/websocket/ClientInputAdapter.h>
#include <csp/adapters/websocket/ClientOutputAdapter.h>
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
#include <csp/core/Enum.h>
#include <csp/core/Hash.h>
#include <csp/engine/AdapterManager.h>
#include <csp/engine/Dictionary.h>
#include <csp/engine/PushInputAdapter.h>
#include <csp/core/Platform.h>
#include <thread>
#include <chrono>
#include <iomanip>
#include <iostream>

#include <csp/adapters/websocket/ClientInputAdapter.h>
#include <csp/adapters/websocket/ClientOutputAdapter.h>
#include <csp/adapters/websocket/ClientHeaderUpdateAdapter.h>
#include <csp/adapters/websocket/WebsocketEndpoint.h>

namespace csp::adapters::websocket {

Expand Down Expand Up @@ -64,7 +68,7 @@ class ClientAdapterManager final : public AdapterManager

bool m_active;
bool m_shouldRun;
WebsocketEndpointBase* m_endpoint;
std::unique_ptr<WebsocketEndpoint> m_endpoint;
ClientInputAdapter* m_inputAdapter;
ClientOutputAdapter* m_outputAdapter;
ClientHeaderUpdateOutputAdapter* m_updateAdapter;
Expand Down
2 changes: 0 additions & 2 deletions cpp/csp/adapters/websocket/ClientHeaderUpdateAdapter.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
#ifndef _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H
#define _IN_CSP_ADAPTERS_WEBSOCKETS_CLIENT_HEADERUPDATEADAPTER_H

#include <websocketpp/config/asio_client.hpp>
#include <websocketpp/client.hpp>
#include <csp/engine/Dictionary.h>
#include <csp/engine/OutputAdapter.h>
#include <csp/adapters/utils/MessageWriter.h>
Expand Down
6 changes: 3 additions & 3 deletions cpp/csp/adapters/websocket/ClientInputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@ ClientInputAdapter::ClientInputAdapter(
m_converter = adapters::utils::MessageStructConverterCache::instance().create( type, properties );
};

void ClientInputAdapter::processMessage( std::string payload, PushBatch* batch )
void ClientInputAdapter::processMessage( void* c, size_t t, PushBatch* batch )
{

if( type() -> type() == CspType::Type::STRUCT )
{
auto tick = m_converter -> asStruct( (void*)payload.data(), payload.length() );
auto tick = m_converter -> asStruct( c, t );
pushTick( std::move(tick), batch );
} else if ( type() -> type() == CspType::Type::STRING )
{
pushTick( std::move(payload), batch );
pushTick( std::string((char const*)c, t), batch );
}

}
Expand Down
2 changes: 1 addition & 1 deletion cpp/csp/adapters/websocket/ClientInputAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class ClientInputAdapter final: public PushInputAdapter {
const Dictionary & properties
);

void processMessage( std::string payload, PushBatch* batch );
void processMessage( void* c, size_t t, PushBatch* batch );

private:
adapters::utils::MessageStructConverterPtr m_converter;
Expand Down
4 changes: 2 additions & 2 deletions cpp/csp/adapters/websocket/ClientOutputAdapter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ namespace csp::adapters::websocket {

ClientOutputAdapter::ClientOutputAdapter(
Engine * engine,
WebsocketEndpointBase * endpoint
WebsocketEndpoint& endpoint
) : OutputAdapter( engine ), m_endpoint( endpoint )
{ };

void ClientOutputAdapter::executeImpl()
{
const std::string & value = input() -> lastValueTyped<std::string>();
m_endpoint->send( value );
m_endpoint.send( value );
};

}
4 changes: 2 additions & 2 deletions cpp/csp/adapters/websocket/ClientOutputAdapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ class ClientOutputAdapter final: public OutputAdapter
public:
ClientOutputAdapter(
Engine * engine,
WebsocketEndpointBase* endpoint
WebsocketEndpoint& endpoint
);

void executeImpl() override;

const char * name() const override { return "WebsocketClientOutputAdapter"; }

private:
WebsocketEndpointBase* m_endpoint;
WebsocketEndpoint& m_endpoint;
};

}
Expand Down
Loading

0 comments on commit 69514cb

Please sign in to comment.