Skip to content

Commit

Permalink
Merge pull request #439 from crypto-chassis/small_fixes
Browse files Browse the repository at this point in the history
dev: small fixes and small additions
  • Loading branch information
cryptochassis committed Oct 15, 2023
2 parents 368c54d + 6c5c4f2 commit 7c45df3
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Expand Up @@ -206,7 +206,7 @@ jobs:
execute_process(
COMMAND ${{ steps.cmake_and_ninja.outputs.cmake_dir }}/cmake
-DBUILD_TEST_BUILD=ON
-DBUILD_TEST_BUILD=OFF
-DBUILD_TEST_UNIT=OFF
-S test
-B test/build
Expand Down
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -363,7 +363,7 @@ Subscription subscription("coinbase", "BTC-USD", "MARKET_DEPTH", "MARKET_DEPTH_M

#### Specify correlation id

Instantiate `Request` with the desired correlationId.
Instantiate `Request` with the desired correlationId. The `correlationId` should be unique.
```
Request request(Request::Operation::GET_RECENT_TRADES, "coinbase", "BTC-USD", "cool correlation id");
```
Expand Down Expand Up @@ -699,7 +699,7 @@ Bye

#### Specify correlation id

Instantiate `Request` with the desired correlationId.
Instantiate `Request` with the desired correlationId. The `correlationId` should be unique.
```
Request request(Request::Operation::CREATE_ORDER, "binance-us", "BTCUSD", "cool correlation id");
```
Expand Down Expand Up @@ -969,7 +969,7 @@ Logger* Logger::logger = &myLogger;

[C++](example/src/utility_set_timer/main.cpp)

To perform an asynchronous wait, use the utility method `setTimer` in class `Session`. The handlers are invoked in the same threads as the `processEvent` method in the `EventHandler` class.
To perform an asynchronous wait, use the utility method `setTimer` in class `Session`. The handlers are invoked in the same threads as the `processEvent` method in the `EventHandler` class. The `id` of the timer should be unique. `delayMilliseconds` can be 0.
```
session->setTimer(
"id", 1000,
Expand Down
24 changes: 24 additions & 0 deletions include/ccapi_cpp/ccapi_session.h
Expand Up @@ -886,6 +886,19 @@ class Session {
this->onEvent(event, eventQueuePtr);
}
#ifndef SWIG
virtual void setImmediate(std::function<void()> successHandler) {
boost::asio::post(*this->serviceContextPtr->ioContextPtr, [this, successHandler]() {
if (this->eventHandler) {
#ifdef CCAPI_USE_SINGLE_THREAD
successHandler();
#else
this->eventDispatcher->dispatch([successHandler] {
successHandler();
});
#endif
}
});
}
virtual void setTimer(const std::string& id, long delayMilliseconds, std::function<void(const boost::system::error_code&)> errorHandler,
std::function<void()> successHandler) {
boost::asio::post(*this->serviceContextPtr->ioContextPtr, [this, id, delayMilliseconds, errorHandler, successHandler]() {
Expand Down Expand Up @@ -932,6 +945,17 @@ class Session {
}
}
}
void forceCloseWebsocketConnections(const std::string& serviceName = "", const std::string& exchangeName = "") {
for (const auto& x : this->serviceByServiceNameExchangeMap) {
if (serviceName.empty() || serviceName == x.first) {
for (const auto& y : x.second) {
if (exchangeName.empty() || exchangeName == y.first) {
y.second->forceCloseWebsocketConnections();
}
}
}
}
}
#endif
#ifndef CCAPI_EXPOSE_INTERNAL

Expand Down
15 changes: 11 additions & 4 deletions include/ccapi_cpp/service/ccapi_market_data_service.h
Expand Up @@ -585,18 +585,25 @@ class MarketDataService : public Service {
}
virtual void onIncorrectStatesFound(std::shared_ptr<WsConnection> wsConnectionPtr, boost::beast::string_view textMessageView, const TimePoint& timeReceived,
const std::string& exchangeSubscriptionId, std::string const& reason) {
WsConnection& wsConnection = *wsConnectionPtr;
std::string errorMessage = "incorrect states found: connection = " + toString(wsConnection) + ", textMessage = " + std::string(textMessageView) +
std::string errorMessage = "incorrect states found: connection = " + toString(*wsConnectionPtr) + ", textMessage = " + std::string(textMessageView) +
", timeReceived = " + UtilTime::getISOTimestamp(timeReceived) + ", exchangeSubscriptionId = " + exchangeSubscriptionId +
", reason = " + reason;
CCAPI_LOGGER_ERROR(errorMessage);
ErrorCode ec;
this->close(wsConnectionPtr, beast::websocket::close_code::normal,
beast::websocket::close_reason(beast::websocket::close_code::normal, "incorrect states found: " + reason), ec);
if (ec) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, "shutdown");
std::string& channelId =
this->channelIdSymbolIdByConnectionIdExchangeSubscriptionIdMap.at(wsConnectionPtr->id).at(exchangeSubscriptionId).at(CCAPI_CHANNEL_ID);
std::string& symbolId =
this->channelIdSymbolIdByConnectionIdExchangeSubscriptionIdMap.at(wsConnectionPtr->id).at(exchangeSubscriptionId).at(CCAPI_SYMBOL_ID);
CCAPI_LOGGER_TRACE("channelId = " + toString(channelId));
CCAPI_LOGGER_TRACE("symbolId = " + toString(symbolId));
auto& correlationIdList = this->correlationIdListByConnectionIdChannelIdSymbolIdMap.at(wsConnectionPtr->id).at(channelId).at(symbolId);
CCAPI_LOGGER_TRACE("correlationIdList = " + toString(correlationIdList));
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::INCORRECT_STATE_FOUND, "shutdown", correlationIdList);
}
this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnection.id] = false;
this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnectionPtr->id] = false;
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::INCORRECT_STATE_FOUND, errorMessage);
}
void connect(std::shared_ptr<WsConnection> wsConnectionPtr) override {
Expand Down
Expand Up @@ -448,6 +448,7 @@ class MarketDataServiceBinanceBase : public MarketDataService {
case Request::Operation::GET_MARKET_DEPTH: {
MarketDataMessage marketDataMessage;
marketDataMessage.type = MarketDataMessage::Type::MARKET_DATA_EVENTS_MARKET_DEPTH;
marketDataMessage.tp = UtilTime::makeTimePointFromMilliseconds(std::stoll(document["T"].GetString()));
for (const auto& x : document["bids"].GetArray()) {
MarketDataMessage::TypeForDataPoint dataPoint;
dataPoint.insert({MarketDataMessage::DataFieldType::PRICE, x[0].GetString()});
Expand Down
19 changes: 10 additions & 9 deletions include/ccapi_cpp/service/ccapi_service.h
Expand Up @@ -147,6 +147,16 @@ class Service : public std::enable_shared_from_this<Service> {
}
}
void purgeHttpConnectionPool() { this->httpConnectionPool.purge(); }
void forceCloseWebsocketConnections() {
for (const auto& x : this->wsConnectionByIdMap) {
ErrorCode ec;
auto wsConnectionPtr = x.second;
this->close(wsConnectionPtr, beast::websocket::close_code::normal, beast::websocket::close_reason("force close"), ec);
if (ec) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, ec, "shutdown");
}
}
}
void stop() {
for (const auto& x : this->sendRequestDelayTimerByCorrelationIdMap) {
x.second->cancel();
Expand All @@ -155,21 +165,12 @@ class Service : public std::enable_shared_from_this<Service> {
this->shouldContinue = false;
for (const auto& x : this->wsConnectionByIdMap) {
ErrorCode ec;
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
auto wsConnection = x.second;
this->close(wsConnection, wsConnection.hdl, websocketpp::close::status::normal, "stop", ec);
#else
auto wsConnectionPtr = x.second;
this->close(wsConnectionPtr, beast::websocket::close_code::normal, beast::websocket::close_reason("stop"), ec);
#endif
if (ec) {
this->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::GENERIC_ERROR, ec, "shutdown");
}
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnection.id] = false;
#else
this->shouldProcessRemainingMessageOnClosingByConnectionIdMap[wsConnectionPtr->id] = false;
#endif
}
}
virtual void convertRequestForRestCustom(http::request<http::string_body>& req, const Request& request, const TimePoint& now, const std::string& symbolId,
Expand Down
2 changes: 1 addition & 1 deletion test/test_unit/CMakeLists.txt.in
@@ -1,5 +1,5 @@
project(googletest-download)

cmake_minimum_required(VERSION 3.14)
include(ExternalProject)
ExternalProject_Add(googletest
GIT_REPOSITORY https://github.com/google/googletest.git
Expand Down

0 comments on commit 7c45df3

Please sign in to comment.