diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0f2c8e7d..5a77a3af 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -206,7 +206,7 @@ jobs: execute_process( COMMAND ${{ steps.cmake_and_ninja.outputs.cmake_dir }}/cmake - -DBUILD_TEST_BUILD=ON + -DBUILD_TEST_BUILD=OFF -DBUILD_TEST_UNIT=OFF -S test -B test/build diff --git a/README.md b/README.md index e5100f6a..bc7419ce 100644 --- a/README.md +++ b/README.md @@ -363,7 +363,7 @@ Subscription subscription("coinbase", "BTC-USD", "MARKET_DEPTH", "MARKET_DEPTH_M #### Specify correlation id -Instantiate `Request` with the desired correlationId. +Instantiate `Request` with the desired correlationId. The `correlationId` should be unique. ``` Request request(Request::Operation::GET_RECENT_TRADES, "coinbase", "BTC-USD", "cool correlation id"); ``` @@ -699,7 +699,7 @@ Bye #### Specify correlation id -Instantiate `Request` with the desired correlationId. +Instantiate `Request` with the desired correlationId. The `correlationId` should be unique. ``` Request request(Request::Operation::CREATE_ORDER, "binance-us", "BTCUSD", "cool correlation id"); ``` @@ -969,7 +969,7 @@ Logger* Logger::logger = &myLogger; [C++](example/src/utility_set_timer/main.cpp) -To perform an asynchronous wait, use the utility method `setTimer` in class `Session`. The handlers are invoked in the same threads as the `processEvent` method in the `EventHandler` class. +To perform an asynchronous wait, use the utility method `setTimer` in class `Session`. The handlers are invoked in the same threads as the `processEvent` method in the `EventHandler` class. The `id` of the timer should be unique. `delayMilliseconds` can be 0. ``` session->setTimer( "id", 1000, diff --git a/include/ccapi_cpp/ccapi_session.h b/include/ccapi_cpp/ccapi_session.h index 653c8885..57dca882 100644 --- a/include/ccapi_cpp/ccapi_session.h +++ b/include/ccapi_cpp/ccapi_session.h @@ -886,6 +886,19 @@ class Session { this->onEvent(event, eventQueuePtr); } #ifndef SWIG + virtual void setImmediate(std::function successHandler) { + boost::asio::post(*this->serviceContextPtr->ioContextPtr, [this, successHandler]() { + if (this->eventHandler) { +#ifdef CCAPI_USE_SINGLE_THREAD + successHandler(); +#else + this->eventDispatcher->dispatch([successHandler] { + successHandler(); + }); +#endif + } + }); + } virtual void setTimer(const std::string& id, long delayMilliseconds, std::function errorHandler, std::function successHandler) { boost::asio::post(*this->serviceContextPtr->ioContextPtr, [this, id, delayMilliseconds, errorHandler, successHandler]() { @@ -932,6 +945,17 @@ class Session { } } } + void forceCloseWebsocketConnections(const std::string& serviceName = "", const std::string& exchangeName = "") { + for (const auto& x : this->serviceByServiceNameExchangeMap) { + if (serviceName.empty() || serviceName == x.first) { + for (const auto& y : x.second) { + if (exchangeName.empty() || exchangeName == y.first) { + y.second->forceCloseWebsocketConnections(); + } + } + } + } + } #endif #ifndef CCAPI_EXPOSE_INTERNAL diff --git a/include/ccapi_cpp/service/ccapi_market_data_service.h b/include/ccapi_cpp/service/ccapi_market_data_service.h index 97f852d2..40db7432 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service.h @@ -585,8 +585,7 @@ class MarketDataService : public Service { } virtual void onIncorrectStatesFound(std::shared_ptr wsConnectionPtr, boost::beast::string_view textMessageView, const TimePoint& timeReceived, const std::string& exchangeSubscriptionId, std::string const& reason) { - WsConnection& wsConnection = *wsConnectionPtr; - std::string errorMessage = "incorrect states found: connection = " + toString(wsConnection) + ", textMessage = " + std::string(textMessageView) + + std::string errorMessage = "incorrect states found: connection = " + toString(*wsConnectionPtr) + ", textMessage = " + std::string(textMessageView) + ", timeReceived = " + UtilTime::getISOTimestamp(timeReceived) + ", exchangeSubscriptionId = " + exchangeSubscriptionId + ", reason = " + reason; CCAPI_LOGGER_ERROR(errorMessage); @@ -594,9 +593,17 @@ class MarketDataService : public Service { this->close(wsConnectionPtr, beast::websocket::close_code::normal, beast::websocket::close_reason(beast::websocket::close_code::normal, "incorrect states found: " + reason), ec); if (ec) { - this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, "shutdown"); + std::string& channelId = + this->channelIdSymbolIdByConnectionIdExchangeSubscriptionIdMap.at(wsConnectionPtr->id).at(exchangeSubscriptionId).at(CCAPI_CHANNEL_ID); + std::string& symbolId = + this->channelIdSymbolIdByConnectionIdExchangeSubscriptionIdMap.at(wsConnectionPtr->id).at(exchangeSubscriptionId).at(CCAPI_SYMBOL_ID); + CCAPI_LOGGER_TRACE("channelId = " + toString(channelId)); + CCAPI_LOGGER_TRACE("symbolId = " + toString(symbolId)); + auto& correlationIdList = this->correlationIdListByConnectionIdChannelIdSymbolIdMap.at(wsConnectionPtr->id).at(channelId).at(symbolId); + CCAPI_LOGGER_TRACE("correlationIdList = " + toString(correlationIdList)); + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::INCORRECT_STATE_FOUND, "shutdown", correlationIdList); } - this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnection.id] = false; + this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnectionPtr->id] = false; this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::INCORRECT_STATE_FOUND, errorMessage); } void connect(std::shared_ptr wsConnectionPtr) override { diff --git a/include/ccapi_cpp/service/ccapi_market_data_service_binance_base.h b/include/ccapi_cpp/service/ccapi_market_data_service_binance_base.h index 30edaddc..78b450ab 100644 --- a/include/ccapi_cpp/service/ccapi_market_data_service_binance_base.h +++ b/include/ccapi_cpp/service/ccapi_market_data_service_binance_base.h @@ -448,6 +448,7 @@ class MarketDataServiceBinanceBase : public MarketDataService { case Request::Operation::GET_MARKET_DEPTH: { MarketDataMessage marketDataMessage; marketDataMessage.type = MarketDataMessage::Type::MARKET_DATA_EVENTS_MARKET_DEPTH; + marketDataMessage.tp = UtilTime::makeTimePointFromMilliseconds(std::stoll(document["T"].GetString())); for (const auto& x : document["bids"].GetArray()) { MarketDataMessage::TypeForDataPoint dataPoint; dataPoint.insert({MarketDataMessage::DataFieldType::PRICE, x[0].GetString()}); diff --git a/include/ccapi_cpp/service/ccapi_service.h b/include/ccapi_cpp/service/ccapi_service.h index b3196cf9..dc0bc960 100644 --- a/include/ccapi_cpp/service/ccapi_service.h +++ b/include/ccapi_cpp/service/ccapi_service.h @@ -147,6 +147,16 @@ class Service : public std::enable_shared_from_this { } } void purgeHttpConnectionPool() { this->httpConnectionPool.purge(); } + void forceCloseWebsocketConnections() { + for (const auto& x : this->wsConnectionByIdMap) { + ErrorCode ec; + auto wsConnectionPtr = x.second; + this->close(wsConnectionPtr, beast::websocket::close_code::normal, beast::websocket::close_reason("force close"), ec); + if (ec) { + this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, ec, "shutdown"); + } + } + } void stop() { for (const auto& x : this->sendRequestDelayTimerByCorrelationIdMap) { x.second->cancel(); @@ -155,21 +165,12 @@ class Service : public std::enable_shared_from_this { this->shouldContinue = false; for (const auto& x : this->wsConnectionByIdMap) { ErrorCode ec; -#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP - auto wsConnection = x.second; - this->close(wsConnection, wsConnection.hdl, websocketpp::close::status::normal, "stop", ec); -#else auto wsConnectionPtr = x.second; this->close(wsConnectionPtr, beast::websocket::close_code::normal, beast::websocket::close_reason("stop"), ec); -#endif if (ec) { this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, ec, "shutdown"); } -#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP - this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnection.id] = false; -#else this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnectionPtr->id] = false; -#endif } } virtual void convertRequestForRestCustom(http::request& req, const Request& request, const TimePoint& now, const std::string& symbolId, diff --git a/test/test_unit/CMakeLists.txt.in b/test/test_unit/CMakeLists.txt.in index 821ffe9c..68843afb 100644 --- a/test/test_unit/CMakeLists.txt.in +++ b/test/test_unit/CMakeLists.txt.in @@ -1,5 +1,5 @@ project(googletest-download) - +cmake_minimum_required(VERSION 3.14) include(ExternalProject) ExternalProject_Add(googletest GIT_REPOSITORY https://github.com/google/googletest.git