Skip to content

Commit

Permalink
Merge pull request #434 from crypto-chassis/develop
Browse files Browse the repository at this point in the history
Release
  • Loading branch information
cryptochassis committed Sep 25, 2023
2 parents e978bd7 + 65ee040 commit 98e90fc
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 17 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -55,8 +55,8 @@
* Code closely follows Bloomberg's API: https://www.bloomberg.com/professional/support/api-library/.
* It is ultra fast thanks to very careful optimizations: move semantics, regex optimization, locality of reference, lock contention minimization, etc.
* Supported exchanges:
* Market Data: coinbase, gemini, kraken, kraken-futures, bitstamp, bitfinex, bitmex, binance-us, binance, binance-usds-futures, binance-coin-futures, huobi, huobi-usdt-swap, huobi-coin-swap, okx, erisx (Cboe Digital), kucoin, kucoin-futures, deribit, gateio, gateio-perpetual-futures, cryptocom, bybit, bybit-derivatives, ascendex, bitget, bitget-futures, bitmart, mexc, mexc-futures, whitebit.
* Execution Management: coinbase, gemini, kraken, kraken-futures, bitstamp, bitfinex, bitmex, binance-us, binance, binance-usds-futures, binance-coin-futures, huobi, huobi-usdt-swap, huobi-coin-swap, okx, erisx (Cboe Digital), kucoin, kucoin-futures, deribit, gateio, gateio-perpetual-futures, cryptocom, bybit, bybit-derivatives, ascendex, bitget, bitget-futures, bitmart, mexc.
* Market Data: ascendex, binance, binance-usds-futures, binance-coin-futures, binance-us, bitfinex, bitget, bitget-futures, bitmart, bitmex, bitstamp, bybit, bybit-derivatives, coinbase, cryptocom, deribit, erisx (Cboe Digital), gateio, gateio-perpetual-futures, gemini, huobi, huobi-usdt-swap, huobi-coin-swap, kraken, kraken-futures, kucoin, kucoin-futures, mexc, mexc-futures, okx, whitebit.
* Execution Management: ascendex, binance, binance-usds-futures, binance-coin-futures, binance-us, bitfinex, bitget, bitget-futures, bitmart, bitmex, bitstamp, bybit, bybit-derivatives, coinbase, cryptocom, deribit, erisx (Cboe Digital), gateio, gateio-perpetual-futures, gemini, huobi, huobi-usdt-swap, huobi-coin-swap, kraken, kraken-futures, kucoin, kucoin-futures, mexc, okx.
* FIX: coinbase, gemini.
* A spot market making application is provided as an end-to-end solution for liquidity providers.
* A single order execution application is provided as an end-to-end solution for executing large orders.
Expand Down
4 changes: 2 additions & 2 deletions include/ccapi_cpp/ccapi_ws_connection.h
Expand Up @@ -189,8 +189,8 @@ class WsConnection CCAPI_FINAL {
std::map<std::string, std::string> headers;
std::map<std::string, std::string> credential;
std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream> > > streamPtr;
beast::websocket::close_code remoteCloseCode;
beast::websocket::close_reason remoteCloseReason;
beast::websocket::close_code remoteCloseCode{};
beast::websocket::close_reason remoteCloseReason{};
std::string hostHttpHeaderValue;
std::string path;
std::string host;
Expand Down
Expand Up @@ -209,7 +209,7 @@ class ExecutionManagementService : public Service {
WsConnection& wsConnection = this->getWsConnectionFromConnectionPtr(this->serviceContextPtr->tlsClientPtr->get_con_from_hdl(hdl));
auto subscription = wsConnection.subscriptionList.at(0);
this->onTextMessage(wsConnection, subscription, textMessage, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, hdl, textMessage, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, hdl, textMessage, timeReceived, false);
}
void onOpen(wspp::connection_hdl hdl) override {
CCAPI_LOGGER_FUNCTION_ENTER;
Expand Down Expand Up @@ -253,7 +253,7 @@ class ExecutionManagementService : public Service {
void onTextMessage(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view textMessageView, const TimePoint& timeReceived) override {
auto subscription = wsConnectionPtr->subscriptionList.at(0);
this->onTextMessage(wsConnectionPtr, subscription, textMessageView, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, wsConnectionPtr, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, wsConnectionPtr, timeReceived, false);
}
void onOpen(std::shared_ptr<WsConnection> wsConnectionPtr) override {
CCAPI_LOGGER_FUNCTION_ENTER;
Expand Down
4 changes: 2 additions & 2 deletions include/ccapi_cpp/service/ccapi_market_data_service.h
Expand Up @@ -307,7 +307,7 @@ class MarketDataService : public Service {
event.setMessageList({message});
this->eventHandler(event, nullptr);
}
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, hdl, textMessage, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, hdl, textMessage, timeReceived, false);
CCAPI_LOGGER_FUNCTION_EXIT;
}
virtual void onIncorrectStatesFound(WsConnection& wsConnection, wspp::connection_hdl hdl, const std::string& textMessage, const TimePoint& timeReceived,
Expand Down Expand Up @@ -569,7 +569,7 @@ class MarketDataService : public Service {
event.setMessageList({message});
this->eventHandler(event, nullptr);
}
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, wsConnectionPtr, timeReceived);
this->onPongByMethod(PingPongMethod::WEBSOCKET_APPLICATION_LEVEL, wsConnectionPtr, timeReceived, false);
CCAPI_LOGGER_FUNCTION_EXIT;
}
virtual void onIncorrectStatesFound(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view textMessageView, const TimePoint& timeReceived,
Expand Down
24 changes: 15 additions & 9 deletions include/ccapi_cpp/service/ccapi_service.h
Expand Up @@ -1301,7 +1301,9 @@ class Service : public std::enable_shared_from_this<Service> {
CCAPI_LOGGER_TRACE("about to start read");
this->startReadWs(wsConnectionPtr);
auto& stream = *wsConnectionPtr->streamPtr;
stream.control_callback(beast::bind_front_handler(&Service::onControlCallback, shared_from_this(), wsConnectionPtr));
stream.control_callback([wsConnectionPtr, that = shared_from_this()](boost::beast::websocket::frame_type kind, boost::beast::string_view payload) {
that->onControlCallback(wsConnectionPtr, kind, payload);
});
}
void startReadWs(std::shared_ptr<WsConnection> wsConnectionPtr) {
auto& stream = *wsConnectionPtr->streamPtr;
Expand Down Expand Up @@ -1345,7 +1347,7 @@ class Service : public std::enable_shared_from_this<Service> {
this->onMessage(wsConnectionPtr, (const char*)readMessageBuffer.data().data(), readMessageBuffer.size());
readMessageBuffer.consume(readMessageBuffer.size());
this->startReadWs(wsConnectionPtr);
this->onPongByMethod(PingPongMethod::WEBSOCKET_PROTOCOL_LEVEL, wsConnectionPtr, now);
this->onPongByMethod(PingPongMethod::WEBSOCKET_PROTOCOL_LEVEL, wsConnectionPtr, now, false);
CCAPI_LOGGER_FUNCTION_EXIT;
}
virtual void onOpen(std::shared_ptr<WsConnection> wsConnectionPtr) {
Expand Down Expand Up @@ -1608,27 +1610,31 @@ class Service : public std::enable_shared_from_this<Service> {
}
void onPong(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view payload) {
auto now = UtilTime::now();
this->onPongByMethod(PingPongMethod::WEBSOCKET_PROTOCOL_LEVEL, wsConnectionPtr, now);
this->onPongByMethod(PingPongMethod::WEBSOCKET_PROTOCOL_LEVEL, wsConnectionPtr, now, true);
}
void onPongByMethod(PingPongMethod method, std::shared_ptr<WsConnection> wsConnectionPtr, const TimePoint& timeReceived) {
void onPongByMethod(PingPongMethod method, std::shared_ptr<WsConnection> wsConnectionPtr, const TimePoint& timeReceived, bool truePong) {
CCAPI_LOGGER_FUNCTION_ENTER;
CCAPI_LOGGER_TRACE(pingPongMethodToString(method) + ": received a pong from " + toString(*wsConnectionPtr));
CCAPI_LOGGER_TRACE(pingPongMethodToString(method) + ": received a " + (truePong ? "websocket protocol pong" : "data message") + " from " +
toString(*wsConnectionPtr));
this->lastPongTpByMethodByConnectionIdMap[wsConnectionPtr->id][method] = timeReceived;
CCAPI_LOGGER_FUNCTION_EXIT;
}
void onPing(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view payload) {
CCAPI_LOGGER_FUNCTION_ENTER;
auto now = UtilTime::now();
CCAPI_LOGGER_TRACE("received a ping from " + toString(*wsConnectionPtr));
this->lastPongTpByMethodByConnectionIdMap[wsConnectionPtr->id][PingPongMethod::WEBSOCKET_PROTOCOL_LEVEL] = now;
CCAPI_LOGGER_FUNCTION_EXIT;
}
void send(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view payload, ErrorCode& ec) {
this->writeMessage(wsConnectionPtr, payload.data(), payload.length());
}
void ping(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view payload, ErrorCode& ec) {
if (!this->wsConnectionPendingPingingByIdMap[wsConnectionPtr->id]) {
if (!this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id]) {
auto& stream = *wsConnectionPtr->streamPtr;
stream.async_ping("", [that = this, wsConnectionPtr](ErrorCode const& ec) { that->wsConnectionPendingPingingByIdMap[wsConnectionPtr->id] = false; });
this->wsConnectionPendingPingingByIdMap[wsConnectionPtr->id] = true;
stream.async_ping(
"", [that = this, wsConnectionPtr](ErrorCode const& ec) { that->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = false; });
this->wsConnectionPendingPingingByConnectionIdMap[wsConnectionPtr->id] = true;
}
}
virtual void pingOnApplicationLevel(std::shared_ptr<WsConnection> wsConnectionPtr, ErrorCode& ec) {}
Expand Down Expand Up @@ -1743,7 +1749,7 @@ class Service : public std::enable_shared_from_this<Service> {
std::map<std::string, size_t> writeMessageBufferWrittenLengthByConnectionIdMap;
std::map<std::string, std::vector<size_t>> writeMessageBufferBoundaryByConnectionIdMap;
#endif
std::map<std::string, bool> wsConnectionPendingPingingByIdMap;
std::map<std::string, bool> wsConnectionPendingPingingByConnectionIdMap;
std::map<std::string, bool> shouldProcessRemainingMessageOnClosingByConnectionIdMap;
std::map<std::string, int> connectNumRetryOnFailByConnectionUrlMap;
std::map<std::string, TimerPtr> connectRetryOnFailTimerByConnectionIdMap;
Expand Down

0 comments on commit 98e90fc

Please sign in to comment.