Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release #440

Merged
merged 3 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ jobs:

execute_process(
COMMAND ${{ steps.cmake_and_ninja.outputs.cmake_dir }}/cmake
-DBUILD_TEST_BUILD=ON
-DBUILD_TEST_BUILD=OFF
-DBUILD_TEST_UNIT=OFF
-S test
-B test/build
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ Subscription subscription("coinbase", "BTC-USD", "MARKET_DEPTH", "MARKET_DEPTH_M

#### Specify correlation id

Instantiate `Request` with the desired correlationId.
Instantiate `Request` with the desired correlationId. The `correlationId` should be unique.
```
Request request(Request::Operation::GET_RECENT_TRADES, "coinbase", "BTC-USD", "cool correlation id");
```
Expand Down Expand Up @@ -699,7 +699,7 @@ Bye

#### Specify correlation id

Instantiate `Request` with the desired correlationId.
Instantiate `Request` with the desired correlationId. The `correlationId` should be unique.
```
Request request(Request::Operation::CREATE_ORDER, "binance-us", "BTCUSD", "cool correlation id");
```
Expand Down Expand Up @@ -969,7 +969,7 @@ Logger* Logger::logger = &myLogger;

[C++](example/src/utility_set_timer/main.cpp)

To perform an asynchronous wait, use the utility method `setTimer` in class `Session`. The handlers are invoked in the same threads as the `processEvent` method in the `EventHandler` class.
To perform an asynchronous wait, use the utility method `setTimer` in class `Session`. The handlers are invoked in the same threads as the `processEvent` method in the `EventHandler` class. The `id` of the timer should be unique. `delayMilliseconds` can be 0.
```
session->setTimer(
"id", 1000,
Expand Down
24 changes: 24 additions & 0 deletions include/ccapi_cpp/ccapi_session.h
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,19 @@ class Session {
this->onEvent(event, eventQueuePtr);
}
#ifndef SWIG
virtual void setImmediate(std::function<void()> successHandler) {
boost::asio::post(*this->serviceContextPtr->ioContextPtr, [this, successHandler]() {
if (this->eventHandler) {
#ifdef CCAPI_USE_SINGLE_THREAD
successHandler();
#else
this->eventDispatcher->dispatch([successHandler] {
successHandler();
});
#endif
}
});
}
virtual void setTimer(const std::string& id, long delayMilliseconds, std::function<void(const boost::system::error_code&)> errorHandler,
std::function<void()> successHandler) {
boost::asio::post(*this->serviceContextPtr->ioContextPtr, [this, id, delayMilliseconds, errorHandler, successHandler]() {
Expand Down Expand Up @@ -932,6 +945,17 @@ class Session {
}
}
}
void forceCloseWebsocketConnections(const std::string& serviceName = "", const std::string& exchangeName = "") {
for (const auto& x : this->serviceByServiceNameExchangeMap) {
if (serviceName.empty() || serviceName == x.first) {
for (const auto& y : x.second) {
if (exchangeName.empty() || exchangeName == y.first) {
y.second->forceCloseWebsocketConnections();
}
}
}
}
}
#endif
#ifndef CCAPI_EXPOSE_INTERNAL

Expand Down
15 changes: 11 additions & 4 deletions include/ccapi_cpp/service/ccapi_market_data_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -585,18 +585,25 @@ class MarketDataService : public Service {
}
virtual void onIncorrectStatesFound(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view textMessageView, const TimePoint& timeReceived,
const std::string& exchangeSubscriptionId, std::string const& reason) {
WsConnection& wsConnection = *wsConnectionPtr;
std::string errorMessage = "incorrect states found: connection = " + toString(wsConnection) + ", textMessage = " + std::string(textMessageView) +
std::string errorMessage = "incorrect states found: connection = " + toString(*wsConnectionPtr) + ", textMessage = " + std::string(textMessageView) +
", timeReceived = " + UtilTime::getISOTimestamp(timeReceived) + ", exchangeSubscriptionId = " + exchangeSubscriptionId +
", reason = " + reason;
CCAPI_LOGGER_ERROR(errorMessage);
ErrorCode ec;
this->close(wsConnectionPtr, beast::websocket::close_code::normal,
beast::websocket::close_reason(beast::websocket::close_code::normal, "incorrect states found: " + reason), ec);
if (ec) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, "shutdown");
std::string& channelId =
this->channelIdSymbolIdByConnectionIdExchangeSubscriptionIdMap.at(wsConnectionPtr->id).at(exchangeSubscriptionId).at(CCAPI_CHANNEL_ID);
std::string& symbolId =
this->channelIdSymbolIdByConnectionIdExchangeSubscriptionIdMap.at(wsConnectionPtr->id).at(exchangeSubscriptionId).at(CCAPI_SYMBOL_ID);
CCAPI_LOGGER_TRACE("channelId = " + toString(channelId));
CCAPI_LOGGER_TRACE("symbolId = " + toString(symbolId));
auto& correlationIdList = this->correlationIdListByConnectionIdChannelIdSymbolIdMap.at(wsConnectionPtr->id).at(channelId).at(symbolId);
CCAPI_LOGGER_TRACE("correlationIdList = " + toString(correlationIdList));
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::INCORRECT_STATE_FOUND, "shutdown", correlationIdList);
}
this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnection.id] = false;
this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnectionPtr->id] = false;
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::INCORRECT_STATE_FOUND, errorMessage);
}
void connect(std::shared_ptr<WsConnection> wsConnectionPtr) override {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ class MarketDataServiceBinanceBase : public MarketDataService {
case Request::Operation::GET_MARKET_DEPTH: {
MarketDataMessage marketDataMessage;
marketDataMessage.type = MarketDataMessage::Type::MARKET_DATA_EVENTS_MARKET_DEPTH;
marketDataMessage.tp = UtilTime::makeTimePointFromMilliseconds(std::stoll(document["T"].GetString()));
for (const auto& x : document["bids"].GetArray()) {
MarketDataMessage::TypeForDataPoint dataPoint;
dataPoint.insert({MarketDataMessage::DataFieldType::PRICE, x[0].GetString()});
Expand Down
19 changes: 10 additions & 9 deletions include/ccapi_cpp/service/ccapi_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,16 @@ class Service : public std::enable_shared_from_this<Service> {
}
}
void purgeHttpConnectionPool() { this->httpConnectionPool.purge(); }
void forceCloseWebsocketConnections() {
for (const auto& x : this->wsConnectionByIdMap) {
ErrorCode ec;
auto wsConnectionPtr = x.second;
this->close(wsConnectionPtr, beast::websocket::close_code::normal, beast::websocket::close_reason("force close"), ec);
if (ec) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, ec, "shutdown");
}
}
}
void stop() {
for (const auto& x : this->sendRequestDelayTimerByCorrelationIdMap) {
x.second->cancel();
Expand All @@ -155,21 +165,12 @@ class Service : public std::enable_shared_from_this<Service> {
this->shouldContinue = false;
for (const auto& x : this->wsConnectionByIdMap) {
ErrorCode ec;
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
auto wsConnection = x.second;
this->close(wsConnection, wsConnection.hdl, websocketpp::close::status::normal, "stop", ec);
#else
auto wsConnectionPtr = x.second;
this->close(wsConnectionPtr, beast::websocket::close_code::normal, beast::websocket::close_reason("stop"), ec);
#endif
if (ec) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, ec, "shutdown");
}
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnection.id] = false;
#else
this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnectionPtr->id] = false;
#endif
}
}
virtual void convertRequestForRestCustom(http::request<http::string_body>& req, const Request& request, const TimePoint& now, const std::string& symbolId,
Expand Down
2 changes: 1 addition & 1 deletion test/test_unit/CMakeLists.txt.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
project(googletest-download)

cmake_minimum_required(VERSION 3.14)
include(ExternalProject)
ExternalProject_Add(googletest
GIT_REPOSITORY https://github.com/google/googletest.git
Expand Down