Skip to content

Commit

Permalink
Merge pull request #393 from crypto-chassis/dev
Browse files Browse the repository at this point in the history
dev: fix websocket connection issues when using boost beast websocket
  • Loading branch information
cryptochassis committed Jun 13, 2023
2 parents 2e54385 + 6afa7d7 commit f46e5a7
Show file tree
Hide file tree
Showing 19 changed files with 90 additions and 61 deletions.
1 change: 1 addition & 0 deletions app/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,5 +37,6 @@ if (WIN32)
set(ADDITIONAL_LINK_LIBRARIES ws2_32)
endif()
link_libraries(OpenSSL::Crypto OpenSSL::SSL ${ADDITIONAL_LINK_LIBRARIES})
add_compile_options(-Wno-deprecated)
add_subdirectory(src/spot_market_making)
add_subdirectory(src/single_order_execution)
1 change: 1 addition & 0 deletions binding/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,4 +105,5 @@ include(UseSWIG)
if(BUILD_TEST)
include(CTest)
endif()
add_compile_options(-Wno-deprecated)
add_subdirectory(python)
1 change: 1 addition & 0 deletions example/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ if (WIN32)
set(ADDITIONAL_LINK_LIBRARIES ws2_32)
endif()
link_libraries(OpenSSL::Crypto OpenSSL::SSL ${ADDITIONAL_LINK_LIBRARIES})
add_compile_options(-Wno-deprecated)
add_subdirectory(src/market_data_simple_request)
add_subdirectory(src/market_data_simple_subscription)
add_subdirectory(src/market_data_advanced_request)
Expand Down
70 changes: 45 additions & 25 deletions include/ccapi_cpp/ccapi_ws_connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,29 +93,7 @@ class WsConnection CCAPI_FINAL {
this->correlationIdList.reserve(subscriptionList.size());
std::transform(subscriptionList.cbegin(), subscriptionList.cend(), this->correlationIdList.begin(),
[](Subscription subscription) { return subscription.getCorrelationId(); });
auto splitted1 = UtilString::split(url, "://");
auto foundSlash = splitted1.at(1).find_first_of('/');
auto foundQuestionMark = splitted1.at(1).find_first_of('?');
if (foundSlash == std::string::npos && foundQuestionMark == std::string::npos) {
this->path = "/";
} else if (foundSlash == std::string::npos && foundQuestionMark != std::string::npos) {
this->path = "/" + splitted1.at(1).substr(foundQuestionMark);
} else if (foundSlash != std::string::npos && foundQuestionMark == std::string::npos) {
this->path = splitted1.at(1).substr(foundSlash);
} else {
this->path = splitted1.at(1).substr(foundSlash);
}
auto splitted2 = UtilString::split(UtilString::split(splitted1.at(1), "/").at(0), ":");
this->host = splitted2.at(0);
if (splitted2.size() == 2) {
this->port = splitted2.at(1);
} else {
if (splitted1.at(0) == "https" || splitted1.at(0) == "wss") {
this->port = CCAPI_HTTPS_PORT_DEFAULT;
} else {
this->port = CCAPI_HTTP_PORT_DEFAULT;
}
}
this->setUrlParts();
}
WsConnection() {}
std::string toString() const {
Expand All @@ -127,7 +105,10 @@ class WsConnection CCAPI_FINAL {
oss << streamPtr;
std::string output = "WsConnection [id = " + id + ", url = " + url + ", group = " + group + ", subscriptionList = " + ccapi::toString(subscriptionList) +
", credential = " + ccapi::toString(shortCredential) + ", status = " + statusToString(status) +
", headers = " + ccapi::toString(headers) + ", streamPtr = " + oss.str() + "]";
", headers = " + ccapi::toString(headers) + ", streamPtr = " + oss.str() + ", remoteCloseCode = " + std::to_string(remoteCloseCode) +
", remoteCloseReason = " + std::string(remoteCloseReason.reason.c_str()) +
", hostHttpHeaderValue = " + ccapi::toString(hostHttpHeaderValue) + ", path = " + ccapi::toString(path) +
", host = " + ccapi::toString(host) + ", port = " + ccapi::toString(port) + "]";
return output;
}
enum class Status {
Expand Down Expand Up @@ -164,8 +145,43 @@ class WsConnection CCAPI_FINAL {
}
return output;
}
std::string getUrl() const { return url; }
void setUrl(const std::string& url) {
this->url = url;
this->setUrlParts();
}
void setUrlParts() {
auto splitted1 = UtilString::split(url, "://");
if (splitted1.size() >= 2) {
auto foundSlash = splitted1.at(1).find_first_of('/');
auto foundQuestionMark = splitted1.at(1).find_first_of('?');
if (foundSlash == std::string::npos && foundQuestionMark == std::string::npos) {
this->path = "/";
} else if (foundSlash == std::string::npos && foundQuestionMark != std::string::npos) {
this->path = "/" + splitted1.at(1).substr(foundQuestionMark);
} else if (foundSlash != std::string::npos && foundQuestionMark == std::string::npos) {
this->path = splitted1.at(1).substr(foundSlash);
} else {
this->path = splitted1.at(1).substr(foundSlash);
}
auto splitted2 = UtilString::split(UtilString::split(splitted1.at(1), "/").at(0), ":");
this->host = splitted2.at(0);
if (splitted2.size() == 2) {
this->port = splitted2.at(1);
} else {
if (splitted1.at(0) == "https" || splitted1.at(0) == "wss") {
this->port = CCAPI_HTTPS_PORT_DEFAULT;
} else {
this->port = CCAPI_HTTP_PORT_DEFAULT;
}
}
}
}
void appendUrlPart(const std::string& urlPart) {
this->url += urlPart;
this->setUrlParts();
}
std::string id;
std::string url;
std::string group;
std::vector<Subscription> subscriptionList;
std::vector<std::string> correlationIdList;
Expand All @@ -179,6 +195,10 @@ class WsConnection CCAPI_FINAL {
std::string path;
std::string host;
std::string port;
#ifndef CCAPI_EXPOSE_INTERNAL
private:
#endif
std::string url;
};
} /* namespace ccapi */
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class ExecutionManagementService : public Service {
#else
std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>> streamPtr(nullptr);
try {
streamPtr = that->createStream<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>>(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr, that->hostWs);
streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr);
} catch (const beast::error_code& ec) {
CCAPI_LOGGER_TRACE("fail");
that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ class ExecutionManagementServiceAscendex : public ExecutionManagementService {
#else
std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>> streamPtr(nullptr);
try {
streamPtr = that->createStream<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>>(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr, that->hostWs);
streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr);
} catch (const beast::error_code& ec) {
CCAPI_LOGGER_TRACE("fail");
that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ class ExecutionManagementServiceBinanceBase : public ExecutionManagementService
document.Parse<rj::kParseNumbersAsStringsFlag>(body.c_str());
std::string listenKey = document["listenKey"].GetString();
std::string url = that->baseUrlWs + "/" + listenKey;
wsConnectionPtr->url = url;
wsConnectionPtr->setUrl(url);
that->connect(wsConnectionPtr);
that->extraPropertyByConnectionIdMap[wsConnectionPtr->id].insert({
{"listenKey", listenKey},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ class ExecutionManagementServiceBitstamp : public ExecutionManagementService {
if (document.HasMember("token") && document.HasMember("user_id")) {
std::string token = document["token"].GetString();
std::string userId = document["user_id"].GetString();
wsConnectionPtr->url = that->baseUrlWs;
wsConnectionPtr->setUrl(that->baseUrlWs);
that->connect(wsConnectionPtr);
that->extraPropertyByConnectionIdMap[wsConnectionPtr->id].insert({
{"token", token},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class ExecutionManagementServiceGateioPerpetualFutures : public ExecutionManagem
#else
std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>> streamPtr(nullptr);
try {
streamPtr = that->createStream<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>>(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr, that->hostWs);
streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr);
} catch (const beast::error_code& ec) {
CCAPI_LOGGER_TRACE("fail");
that->onError(Event::Type::SUBSCRIPTION_STATUS, Message::Type::SUBSCRIPTION_FAILURE, ec, "create stream", {subscription.getCorrelationId()});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,16 @@ class ExecutionManagementServiceGemini : public ExecutionManagementService {
credential = this->credentialDefault;
}
auto apiKey = mapGetWithDefault(credential, this->apiKeyName);
wsConnectionPtr->url += "?heartbeat=true";
wsConnectionPtr->appendUrlPart("?heartbeat=true");
if (fieldSet == std::set<std::string>({CCAPI_EM_PRIVATE_TRADE})) {
wsConnectionPtr->url += "&eventTypeFilter=fill";
wsConnectionPtr->appendUrlPart("&eventTypeFilter=fill");
}
if (!instrumentSet.empty()) {
for (const auto& instrument : instrumentSet) {
wsConnectionPtr->url += "&symbolFilter=" + instrument;
wsConnectionPtr->appendUrlPart("&symbolFilter=" + instrument);
}
}
wsConnectionPtr->url += "&apiSessionFilter=" + apiKey;
wsConnectionPtr->appendUrlPart("&apiSessionFilter=" + apiKey);
wsConnectionPtr->headers.insert({"X-GEMINI-APIKEY", apiKey});
int64_t nonce = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
std::string payload = R"({"request":"/v1/order/events","nonce":)" + std::to_string(nonce) + "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ class ExecutionManagementServiceKraken : public ExecutionManagementService {
document.Parse<rj::kParseNumbersAsStringsFlag>(body.c_str());
if (document.HasMember("result") && document["result"].HasMember("token")) {
std::string token = document["result"]["token"].GetString();
wsConnectionPtr->url = that->baseUrlWs;
wsConnectionPtr->setUrl(that->baseUrlWs);
that->connect(wsConnectionPtr);
that->extraPropertyByConnectionIdMap[wsConnectionPtr->id].insert({
{"token", token},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ class ExecutionManagementServiceKucoinBase : public ExecutionManagementService {
urlWebsocketBase += std::string(instanceServer["endpoint"].GetString());
urlWebsocketBase += "?token=";
urlWebsocketBase += std::string(document["data"]["token"].GetString());
wsConnectionPtr->url = urlWebsocketBase;
wsConnectionPtr->setUrl(urlWebsocketBase);
std::cout << wsConnectionPtr->toString() << std::endl;
that->connect(wsConnectionPtr);
that->extraPropertyByConnectionIdMap[wsConnectionPtr->id].insert({
{"pingInterval", std::string(instanceServer["pingInterval"].GetString())},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ class ExecutionManagementServiceMexc : public ExecutionManagementService {
document.Parse<rj::kParseNumbersAsStringsFlag>(body.c_str());
std::string listenKey = document["listenKey"].GetString();
std::string url = that->baseUrlWs + "?listenKey=" + listenKey;
wsConnectionPtr->url = url;
wsConnectionPtr->setUrl(url);
that->connect(wsConnectionPtr);
that->extraPropertyByConnectionIdMap[wsConnectionPtr->id].insert({
{"listenKey", listenKey},
Expand Down
3 changes: 1 addition & 2 deletions include/ccapi_cpp/service/ccapi_market_data_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ class MarketDataService : public Service {
}
std::shared_ptr<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>> streamPtr(nullptr);
try {
streamPtr = that->createStream<beast::websocket::stream<beast::ssl_stream<beast::tcp_stream>>>(
that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr, that->hostWs);
streamPtr = that->createWsStream(that->serviceContextPtr->ioContextPtr, that->serviceContextPtr->sslContextPtr);
} catch (const beast::error_code& ec) {
CCAPI_LOGGER_TRACE("fail");
std::vector<std::string> correlationIdList;
Expand Down
4 changes: 2 additions & 2 deletions include/ccapi_cpp/service/ccapi_market_data_service_gemini.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class MarketDataServiceGemini : public MarketDataService {
if (marketDepthSubscribedToExchange == 1) {
this->l2UpdateIsReplaceByConnectionIdChannelIdSymbolIdMap[wsConnectionPtr->id][channelId][symbolId] = true;
}
auto exchangeSubscriptionId = wsConnectionPtr->url;
auto exchangeSubscriptionId = wsConnectionPtr->getUrl();
this->channelIdSymbolIdByConnectionIdExchangeSubscriptionIdMap[wsConnectionPtr->id][exchangeSubscriptionId][CCAPI_CHANNEL_ID] = channelId;
this->channelIdSymbolIdByConnectionIdExchangeSubscriptionIdMap[wsConnectionPtr->id][exchangeSubscriptionId][CCAPI_SYMBOL_ID] = symbolId;
std::vector<std::string> correlationIdList_2 =
Expand Down Expand Up @@ -225,7 +225,7 @@ class MarketDataServiceGemini : public MarketDataService {
#ifndef CCAPI_USE_BOOST_BEAST_WEBSOCKET
marketDataMessage.exchangeSubscriptionId = wsConnection.url;
#else
marketDataMessage.exchangeSubscriptionId = wsConnectionPtr->url;
marketDataMessage.exchangeSubscriptionId = wsConnectionPtr.getUrl();
#endif
TimePoint time = timeReceived;
auto it = document.FindMember("timestampms");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class MarketDataServiceKucoinBase : public MarketDataService {
urlWebsocketBase += std::string(instanceServer["endpoint"].GetString());
urlWebsocketBase += "?token=";
urlWebsocketBase += std::string(document["data"]["token"].GetString());
wsConnectionPtr->url = urlWebsocketBase;
wsConnectionPtr->setUrl(urlWebsocketBase);
that->connect(wsConnectionPtr);
for (const auto& subscription : wsConnectionPtr->subscriptionList) {
auto instrument = subscription.getInstrument();
Expand Down
Loading

0 comments on commit f46e5a7

Please sign in to comment.