Skip to content

Commit

Permalink
fix: #432
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptochassis committed Sep 25, 2023
1 parent ad8adea commit 45dc8fe
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 15 deletions.
4 changes: 2 additions & 2 deletions include/ccapi_cpp/ccapi_ws_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ class WsConnection CCAPI_FINAL {
std::map<std::string, std::string> headers;
std::map<std::string, std::string> credential;
std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream> > > streamPtr;
beast::websocket::close_code remoteCloseCode;
beast::websocket::close_reason remoteCloseReason;
beast::websocket::close_code remoteCloseCode{};
beast::websocket::close_reason remoteCloseReason{};
std::string hostHttpHeaderValue;
std::string path;
std::string host;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ class ExecutionManagementService : public Service {
WsConnection& wsConnection = this->getWsConnectionFromConnectionPtr(this->serviceContextPtr->tlsClientPtr->get_con_from_hdl(hdl));
auto subscription = wsConnection.subscriptionList.at(0);
this->onTextMessage(wsConnection, subscription, textMessage, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, hdl, textMessage, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, hdl, textMessage, timeReceived, false);
}
void onOpen(wspp::connection_hdl hdl) override {
CCAPI_LOGGER_FUNCTION_ENTER;
Expand Down Expand Up @@ -253,7 +253,7 @@ class ExecutionManagementService : public Service {
void onTextMessage(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view textMessageView, const TimePoint& timeReceived) override {
auto subscription = wsConnectionPtr->subscriptionList.at(0);
this->onTextMessage(wsConnectionPtr, subscription, textMessageView, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, wsConnectionPtr, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, wsConnectionPtr, timeReceived, false);
}
void onOpen(std::shared_ptr<WsConnection> wsConnectionPtr) override {
CCAPI_LOGGER_FUNCTION_ENTER;
Expand Down
4 changes: 2 additions & 2 deletions include/ccapi_cpp/service/ccapi_market_data_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ class MarketDataService : public Service {
event.setMessageList({message});
this->eventHandler(event, nullptr);
}
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, hdl, textMessage, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, hdl, textMessage, timeReceived, false);
CCAPI_LOGGER_FUNCTION_EXIT;
}
virtual void onIncorrectStatesFound(WsConnection& wsConnection, wspp::connection_hdl hdl, const std::string& textMessage, const TimePoint& timeReceived,
Expand Down Expand Up @@ -569,7 +569,7 @@ class MarketDataService : public Service {
event.setMessageList({message});
this->eventHandler(event, nullptr);
}
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, wsConnectionPtr, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, wsConnectionPtr, timeReceived, false);
CCAPI_LOGGER_FUNCTION_EXIT;
}
virtual void onIncorrectStatesFound(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view textMessageView, const TimePoint& timeReceived,
Expand Down
24 changes: 15 additions & 9 deletions include/ccapi_cpp/service/ccapi_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,9 @@ class Service : public std::enable_shared_from_this<Service> {
CCAPI_LOGGER_TRACE("about to start read");
this->startReadWs(wsConnectionPtr);
auto& stream = *wsConnectionPtr->streamPtr;
stream.control_callback(beast::bind_front_handler(&Service::onControlCallback, shared_from_this(), wsConnectionPtr));
stream.control_callback([wsConnectionPtr, that = shared_from_this()](boost::beast::websocket::frame_type kind, boost::beast::string_view payload) {
that->onControlCallback(wsConnectionPtr, kind, payload);
});
}
void startReadWs(std::shared_ptr<WsConnection> wsConnectionPtr) {
auto& stream = *wsConnectionPtr->streamPtr;
Expand Down Expand Up @@ -1345,7 +1347,7 @@ class Service : public std::enable_shared_from_this<Service> {
this->onMessage(wsConnectionPtr, (const char*)readMessageBuffer.data().data(), readMessageBuffer.size());
readMessageBuffer.consume(readMessageBuffer.size());
this->startReadWs(wsConnectionPtr);
this->onPongByMethod(PingPongMethod::WEBSOCKET_PROTOCOL_LEVEL, wsConnectionPtr, now);
this->onPongByMethod(PingPongMethod::WEBSOCKET_PROTOCOL_LEVEL, wsConnectionPtr, now, false);
CCAPI_LOGGER_FUNCTION_EXIT;
}
virtual void onOpen(std::shared_ptr<WsConnection> wsConnectionPtr) {
Expand Down Expand Up @@ -1608,27 +1610,31 @@ class Service : public std::enable_shared_from_this<Service> {
}
void onPong(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view payload) {
auto now = UtilTime::now();
this->onPongByMethod(PingPongMethod::WEBSOCKET_PROTOCOL_LEVEL, wsConnectionPtr, now);
this->onPongByMethod(PingPongMethod::WEBSOCKET_PROTOCOL_LEVEL, wsConnectionPtr, now, true);
}
void onPongByMethod(PingPongMethod method, std::shared_ptr<WsConnection> wsConnectionPtr, const TimePoint& timeReceived) {
void onPongByMethod(PingPongMethod method, std::shared_ptr<WsConnection> wsConnectionPtr, const TimePoint& timeReceived, bool truePong) {
CCAPI_LOGGER_FUNCTION_ENTER;
CCAPI_LOGGER_TRACE(pingPongMethodToString(method) + ": received a pong from " + toString(*wsConnectionPtr));
CCAPI_LOGGER_TRACE(pingPongMethodToString(method) + ": received a " + (truePong ? "websocket protocol pong" : "data message") + " from " +
toString(*wsConnectionPtr));
this->lastPongTpByMethodByConnectionIdMap[wsConnectionPtr->id][method] = timeReceived;
CCAPI_LOGGER_FUNCTION_EXIT;
}
void onPing(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view payload) {
CCAPI_LOGGER_FUNCTION_ENTER;
auto now = UtilTime::now();
CCAPI_LOGGER_TRACE("received a ping from " + toString(*wsConnectionPtr));
this->lastPongTpByMethodByConnectionIdMap[wsConnectionPtr->id][PingPongMethod::WEBSOCKET_PROTOCOL_LEVEL] = now;
CCAPI_LOGGER_FUNCTION_EXIT;
}
void send(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view payload, ErrorCode& ec) {
this->writeMessage(wsConnectionPtr, payload.data(), payload.length());
}
void ping(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view payload, ErrorCode& ec) {
if (!this->wsConnectionPendingPingingByIdMap[wsConnectionPtr->id]) {
if (!this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id]) {
auto& stream = *wsConnectionPtr->streamPtr;
stream.async_ping("", [that = this, wsConnectionPtr](ErrorCode const& ec) { that->wsConnectionPendingPingingByIdMap[wsConnectionPtr->id] = false; });
this->wsConnectionPendingPingingByIdMap[wsConnectionPtr->id] = true;
stream.async_ping(
"", [that = this, wsConnectionPtr](ErrorCode const& ec) { that->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = false; });
this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = true;
}
}
virtual void pingOnApplicationLevel(std::shared_ptr<WsConnection> wsConnectionPtr, ErrorCode& ec) {}
Expand Down Expand Up @@ -1743,7 +1749,7 @@ class Service : public std::enable_shared_from_this<Service> {
std::map<std::string, size_t> writeMessageBufferWrittenLengthByConnectionIdMap;
std::map<std::string, std::vector<size_t>> writeMessageBufferBoundaryByConnectionIdMap;
#endif
std::map<std::string, bool> wsConnectionPendingPingingByIdMap;
std::map<std::string, bool> wsConnectionPendingPingingByConnectionIdMap;
std::map<std::string, bool> shouldProcessRemainingMessageOnClosingByConnectionIdMap;
std::map<std::string, int> connectNumRetryOnFailByConnectionUrlMap;
std::map<std::string, TimerPtr> connectRetryOnFailTimerByConnectionIdMap;
Expand Down

0 comments on commit 45dc8fe

Please sign in to comment.