Skip to content

Commit

Permalink
dev: draft for specifying a local ip address when sending a request
Browse files Browse the repository at this point in the history
  • Loading branch information
cryptochassis committed Oct 25, 2023
1 parent ab4ed13 commit f6f5670
Show file tree
Hide file tree
Showing 55 changed files with 216 additions and 181 deletions.
2 changes: 2 additions & 0 deletions .clang-format
@@ -1,2 +1,4 @@
BasedOnStyle: Google
ColumnLimit: 160
SeparateDefinitionBlocks: Always
MaxEmptyLinesToKeep: 1
@@ -1,6 +1,7 @@
set(NAME execution_management_simple_request)
project(${NAME})
add_compile_definitions(CCAPI_ENABLE_SERVICE_EXECUTION_MANAGEMENT)
add_compile_definitions(CCAPI_ENABLE_EXCHANGE_BINANCE)
add_compile_definitions(CCAPI_ENABLE_LOG_TRACE)
add_compile_definitions(CCAPI_ENABLE_EXCHANGE_BINANCE_USDS_FUTURES)
add_executable(${NAME} main.cpp)
add_dependencies(${NAME} boost rapidjson)
52 changes: 36 additions & 16 deletions example/src/execution_management_simple_request/main.cpp
@@ -1,6 +1,18 @@
#include "ccapi_cpp/ccapi_session.h"
namespace ccapi {
Logger* Logger::logger = nullptr; // This line is needed.
class MyLogger final : public Logger {
public:
void logMessage(const std::string& severity, const std::string& threadId, const std::string& timeISO, const std::string& fileName,
const std::string& lineNumber, const std::string& message) override {
std::lock_guard<std::mutex> lock(m);
std::cout << threadId << ": [" << timeISO << "] {" << fileName << ":" << lineNumber << "} " << severity << std::string(8, ' ') << message << std::endl;
}

private:
std::mutex m;
};
MyLogger myLogger;
Logger* Logger::logger = &myLogger;
class MyEventHandler : public EventHandler {
public:
bool processEvent(const Event& event, Session* session) override {
Expand All @@ -17,14 +29,14 @@ using ::ccapi::SessionOptions;
using ::ccapi::toString;
using ::ccapi::UtilSystem;
int main(int argc, char** argv) {
if (UtilSystem::getEnvAsString("BINANCE_API_KEY").empty()) {
std::cerr << "Please set environment variable BINANCE_API_KEY" << std::endl;
return EXIT_FAILURE;
}
if (UtilSystem::getEnvAsString("BINANCE_API_SECRET").empty()) {
std::cerr << "Please set environment variable BINANCE_API_SECRET" << std::endl;
return EXIT_FAILURE;
}
// if (UtilSystem::getEnvAsString("BINANCE_API_KEY").empty()) {
// std::cerr << "Please set environment variable BINANCE_API_KEY" << std::endl;
// return EXIT_FAILURE;
// }
// if (UtilSystem::getEnvAsString("BINANCE_API_SECRET").empty()) {
// std::cerr << "Please set environment variable BINANCE_API_SECRET" << std::endl;
// return EXIT_FAILURE;
// }
std::vector<std::string> modeList = {
"create_order", "cancel_order", "get_order", "get_open_orders", "cancel_open_orders", "get_account_balances",
};
Expand All @@ -45,7 +57,7 @@ int main(int argc, char** argv) {
session.stop();
return EXIT_FAILURE;
}
Request request(Request::Operation::CREATE_ORDER, "binance", argv[2]);
Request request(Request::Operation::CREATE_ORDER, "binance-usds-futures", argv[2]);
request.appendParam({
{"SIDE", strcmp(argv[3], "buy") == 0 ? "BUY" : "SELL"},
{"QUANTITY", argv[4]},
Expand All @@ -60,7 +72,7 @@ int main(int argc, char** argv) {
session.stop();
return EXIT_FAILURE;
}
Request request(Request::Operation::CANCEL_ORDER, "binance", argv[2]);
Request request(Request::Operation::CANCEL_ORDER, "binance-usds-futures", argv[2]);
request.appendParam({
{"ORDER_ID", argv[3]},
});
Expand All @@ -73,7 +85,7 @@ int main(int argc, char** argv) {
session.stop();
return EXIT_FAILURE;
}
Request request(Request::Operation::GET_ORDER, "binance", argv[2]);
Request request(Request::Operation::GET_ORDER, "binance-usds-futures", argv[2]);
request.appendParam({
{"ORDER_ID", argv[3]},
});
Expand All @@ -86,8 +98,16 @@ int main(int argc, char** argv) {
session.stop();
return EXIT_FAILURE;
}
Request request(Request::Operation::GET_OPEN_ORDERS, "binance", argv[2]);
session.sendRequest(request);
{
Request request(Request::Operation::GET_OPEN_ORDERS, "binance-usds-futures", argv[2]);
session.sendRequest(request);
}
// std::this_thread::sleep_for(std::chrono::seconds(1));
{
Request request(Request::Operation::GET_OPEN_ORDERS, "binance-usds-futures", argv[2]);
session.sendRequest(request);
}

} else if (mode == "cancel_open_orders") {
if (argc != 3) {
std::cerr << "Usage: " << argv[0] << " cancel_open_orders <symbol>\n"
Expand All @@ -96,10 +116,10 @@ int main(int argc, char** argv) {
session.stop();
return EXIT_FAILURE;
}
Request request(Request::Operation::CANCEL_OPEN_ORDERS, "binance", argv[2]);
Request request(Request::Operation::CANCEL_OPEN_ORDERS, "binance-usds-futures", argv[2]);
session.sendRequest(request);
} else if (mode == "get_account_balances") {
Request request(Request::Operation::GET_ACCOUNT_BALANCES, "binance");
Request request(Request::Operation::GET_ACCOUNT_BALANCES, "binance-usds-futures");
session.sendRequest(request);
}
std::this_thread::sleep_for(std::chrono::seconds(10));
Expand Down
@@ -1,6 +1,7 @@
set(NAME execution_management_simple_subscription)
project(${NAME})
add_compile_definitions(CCAPI_ENABLE_SERVICE_EXECUTION_MANAGEMENT)
add_compile_definitions(CCAPI_ENABLE_EXCHANGE_COINBASE)
add_compile_definitions(CCAPI_ENABLE_LOG_TRACE)
add_compile_definitions(CCAPI_ENABLE_EXCHANGE_BINANCE_USDS_FUTURES)
add_executable(${NAME} main.cpp)
add_dependencies(${NAME} boost rapidjson)
74 changes: 43 additions & 31 deletions example/src/execution_management_simple_subscription/main.cpp
@@ -1,25 +1,37 @@
#include "ccapi_cpp/ccapi_session.h"
namespace ccapi {
Logger* Logger::logger = nullptr; // This line is needed.
class MyLogger final : public Logger {
public:
void logMessage(const std::string& severity, const std::string& threadId, const std::string& timeISO, const std::string& fileName,
const std::string& lineNumber, const std::string& message) override {
std::lock_guard<std::mutex> lock(m);
std::cout << threadId << ": [" << timeISO << "] {" << fileName << ":" << lineNumber << "} " << severity << std::string(8, ' ') << message << std::endl;
}

private:
std::mutex m;
};
MyLogger myLogger;
Logger* Logger::logger = &myLogger;
class MyEventHandler : public EventHandler {
public:
bool processEvent(const Event& event, Session* session) override {
if (event.getType() == Event::Type::SUBSCRIPTION_STATUS) {
std::cout << "Received an event of type SUBSCRIPTION_STATUS:\n" + event.toStringPretty(2, 2) << std::endl;
auto message = event.getMessageList().at(0);
if (message.getType() == Message::Type::SUBSCRIPTION_STARTED) {
Request request(Request::Operation::CREATE_ORDER, "coinbase", "BTC-USD");
request.appendParam({
{"SIDE", "BUY"},
{"LIMIT_PRICE", "20000"},
{"QUANTITY", "0.001"},
{"CLIENT_ORDER_ID", "6d4eb0fb-2229-469f-873e-557dd78ac11e"},
});
session->sendRequest(request);
}
} else if (event.getType() == Event::Type::SUBSCRIPTION_DATA) {
std::cout << "Received an event of type SUBSCRIPTION_DATA:\n" + event.toStringPretty(2, 2) << std::endl;
}
// if (event.getType() == Event::Type::SUBSCRIPTION_STATUS) {
// std::cout << "Received an event of type SUBSCRIPTION_STATUS:\n" + event.toStringPretty(2, 2) << std::endl;
// auto message = event.getMessageList().at(0);
// if (message.getType() == Message::Type::SUBSCRIPTION_STARTED) {
// Request request(Request::Operation::CREATE_ORDER, "coinbase", "BTC-USD");
// request.appendParam({
// {"SIDE", "BUY"},
// {"LIMIT_PRICE", "20000"},
// {"QUANTITY", "0.001"},
// {"CLIENT_ORDER_ID", "6d4eb0fb-2229-469f-873e-557dd78ac11e"},
// });
// session->sendRequest(request);
// }
// } else if (event.getType() == Event::Type::SUBSCRIPTION_DATA) {
// std::cout << "Received an event of type SUBSCRIPTION_DATA:\n" + event.toStringPretty(2, 2) << std::endl;
// }
return true;
}
};
Expand All @@ -32,25 +44,25 @@ using ::ccapi::SessionOptions;
using ::ccapi::Subscription;
using ::ccapi::UtilSystem;
int main(int argc, char** argv) {
if (UtilSystem::getEnvAsString("COINBASE_API_KEY").empty()) {
std::cerr << "Please set environment variable COINBASE_API_KEY" << std::endl;
return EXIT_FAILURE;
}
if (UtilSystem::getEnvAsString("COINBASE_API_SECRET").empty()) {
std::cerr << "Please set environment variable COINBASE_API_SECRET" << std::endl;
return EXIT_FAILURE;
}
if (UtilSystem::getEnvAsString("COINBASE_API_PASSPHRASE").empty()) {
std::cerr << "Please set environment variable COINBASE_API_PASSPHRASE" << std::endl;
return EXIT_FAILURE;
}
// if (UtilSystem::getEnvAsString("COINBASE_API_KEY").empty()) {
// std::cerr << "Please set environment variable COINBASE_API_KEY" << std::endl;
// return EXIT_FAILURE;
// }
// if (UtilSystem::getEnvAsString("COINBASE_API_SECRET").empty()) {
// std::cerr << "Please set environment variable COINBASE_API_SECRET" << std::endl;
// return EXIT_FAILURE;
// }
// if (UtilSystem::getEnvAsString("COINBASE_API_PASSPHRASE").empty()) {
// std::cerr << "Please set environment variable COINBASE_API_PASSPHRASE" << std::endl;
// return EXIT_FAILURE;
// }
SessionOptions sessionOptions;
SessionConfigs sessionConfigs;
MyEventHandler eventHandler;
Session session(sessionOptions, sessionConfigs, &eventHandler);
Subscription subscription("coinbase", "BTC-USD", "ORDER_UPDATE");
Subscription subscription("binance-usds-futures", argv[1], argv[2]);
session.subscribe(subscription);
std::this_thread::sleep_for(std::chrono::seconds(10));
std::this_thread::sleep_for(std::chrono::seconds(1000));
session.stop();
std::cout << "Bye" << std::endl;
return EXIT_SUCCESS;
Expand Down
3 changes: 2 additions & 1 deletion include/ccapi_cpp/ccapi_http_connection.h
Expand Up @@ -15,12 +15,13 @@ class HttpConnection CCAPI_FINAL {
std::string toString() const {
std::ostringstream oss;
oss << streamPtr;
std::string output = "HttpConnection [host = " + host + ", port = " + port + ", streamPtr = " + oss.str() + "]";
std::string output = "HttpConnection [host = " + host + ", port = " + port + ", streamPtr = " + oss.str() +", lastReceiveDataTp = "+UtilTime::getISOTimestamp(lastReceiveDataTp)+ "]";
return output;
}
std::string host;
std::string port;
std::shared_ptr<beast::ssl_stream<beast::tcp_stream> > streamPtr;
TimePoint lastReceiveDataTp{std::chrono::seconds{0}};
};
} /* namespace ccapi */
#endif // INCLUDE_CCAPI_CPP_CCAPI_HTTP_CONNECTION_H_
9 changes: 3 additions & 6 deletions include/ccapi_cpp/ccapi_macro.h
Expand Up @@ -603,6 +603,9 @@
#ifndef CCAPI_CREDENTIAL_DISPLAY_LENGTH
#define CCAPI_CREDENTIAL_DISPLAY_LENGTH 4
#endif
#ifndef CCAPI_LOCAL_IP_ADDRESS_DEFAULT
#define CCAPI_LOCAL_IP_ADDRESS_DEFAULT ""
#endif

// start: exchange REST urls
#ifndef CCAPI_COINBASE_URL_REST_BASE
Expand Down Expand Up @@ -1004,12 +1007,6 @@
#ifndef CCAPI_BINANCE_API_SECRET
#define CCAPI_BINANCE_API_SECRET "BINANCE_API_SECRET"
#endif
// #ifndef CCAPI_BINANCE_MARGIN_API_KEY
// #define CCAPI_BINANCE_MARGIN_API_KEY "BINANCE_MARGIN_API_KEY"
// #endif
// #ifndef CCAPI_BINANCE_MARGIN_API_SECRET
// #define CCAPI_BINANCE_MARGIN_API_SECRET "BINANCE_MARGIN_API_SECRET"
// #endif
#ifndef CCAPI_BINANCE_USDS_FUTURES_API_KEY
#define CCAPI_BINANCE_USDS_FUTURES_API_KEY "BINANCE_USDS_FUTURES_API_KEY"
#endif
Expand Down
5 changes: 4 additions & 1 deletion include/ccapi_cpp/ccapi_request.h
Expand Up @@ -148,7 +148,7 @@ class Request CCAPI_FINAL {
", correlationId = " + correlationId + ", secondaryCorrelationId = " + secondaryCorrelationId +
(this->serviceName == CCAPI_FIX ? ", paramListFix = " + ccapi::toString(paramListFix) : ", paramList = " + ccapi::toString(paramList)) +
", credential = " + ccapi::toString(shortCredential) + ", operation = " + operationToString(operation) +
", timeSent = " + UtilTime::getISOTimestamp(timeSent) + "]";
", timeSent = " + UtilTime::getISOTimestamp(timeSent)+", index = " + ccapi::toString(index)+", localIpAddress = " + localIpAddress + "]";
return output;
}
const std::string& getCorrelationId() const { return correlationId; }
Expand Down Expand Up @@ -181,11 +181,13 @@ class Request CCAPI_FINAL {
std::pair<long long, long long> getTimeSentPair() const { return UtilTime::divide(timeSent); }
void setTimeSent(TimePoint timeSent) { this->timeSent = timeSent; }
int getIndex() const { return index; }
const std::string& getLocalIpAddress() const { return localIpAddress; }
void setIndex(int index) { this->index = index; }
void setCredential(const std::map<std::string, std::string>& credential) { this->credential = credential; }
void setCorrelationId(const std::string& correlationId) { this->correlationId = correlationId; }
void setSecondaryCorrelationId(const std::string& secondaryCorrelationId) { this->secondaryCorrelationId = secondaryCorrelationId; }
void setMarginType(const std::string& marginType) { this->marginType = marginType; }
void bind(const std::string& localIpAddress){this->localIpAddress=localIpAddress;}
#ifndef CCAPI_EXPOSE_INTERNAL

private:
Expand All @@ -202,6 +204,7 @@ class Request CCAPI_FINAL {
std::vector<std::vector<std::pair<int, std::string> > > paramListFix;
TimePoint timeSent{std::chrono::seconds{0}};
int index{};
std::string localIpAddress{CCAPI_LOCAL_IP_ADDRESS_DEFAULT};
};
} /* namespace ccapi */
#endif // INCLUDE_CCAPI_CPP_CCAPI_REQUEST_H_
10 changes: 7 additions & 3 deletions include/ccapi_cpp/ccapi_session.h
Expand Up @@ -259,15 +259,18 @@ class Session {
Session(const Session&) = delete;
Session& operator=(const Session&) = delete;
Session(const SessionOptions& sessionOptions = SessionOptions(), const SessionConfigs& sessionConfigs = SessionConfigs(),
EventHandler* eventHandler = nullptr, EventDispatcher* eventDispatcher = nullptr)
EventHandler* eventHandler = nullptr, EventDispatcher* eventDispatcher = nullptr, ServiceContext* serviceContextPtr=nullptr)
: sessionOptions(sessionOptions),
sessionConfigs(sessionConfigs),
eventHandler(eventHandler),
#ifndef CCAPI_USE_SINGLE_THREAD
eventDispatcher(eventDispatcher),
#endif
eventQueue(sessionOptions.maxEventQueueSize),
serviceContextPtr(new ServiceContext()) {
serviceContextPtr(serviceContextPtr) {
if (!this->serviceContextPtr){
this->serviceContextPtr=new ServiceContext();
}
CCAPI_LOGGER_FUNCTION_ENTER;
#ifndef CCAPI_USE_SINGLE_THREAD
if (this->eventHandler) {
Expand All @@ -291,6 +294,7 @@ class Session {
delete this->eventDispatcher;
}
#endif
delete this->serviceContextPtr;
CCAPI_LOGGER_FUNCTION_EXIT;
}
virtual void start() {
Expand Down Expand Up @@ -968,7 +972,7 @@ class Session {
EventDispatcher* eventDispatcher;
bool useInternalEventDispatcher{};
#endif
std::shared_ptr<ServiceContext> serviceContextPtr;
ServiceContext* serviceContextPtr;
std::map<std::string, std::map<std::string, std::shared_ptr<Service> > > serviceByServiceNameExchangeMap;
std::thread t;
Queue<Event> eventQueue;
Expand Down
5 changes: 2 additions & 3 deletions include/ccapi_cpp/ccapi_session_options.h
Expand Up @@ -28,7 +28,7 @@ class SessionOptions CCAPI_FINAL {
", httpMaxNumRedirect = " + ccapi::toString(httpMaxNumRedirect) +
", httpRequestTimeoutMilliseconds = " + ccapi::toString(httpRequestTimeoutMilliseconds) +
", httpConnectionPoolMaxSize = " + ccapi::toString(httpConnectionPoolMaxSize) +
", httpConnectionPoolIdleTimeoutMilliseconds = " + ccapi::toString(httpConnectionPoolIdleTimeoutMilliseconds) +
", httpConnectionKeepAliveTimeoutSeconds = " + ccapi::toString(httpConnectionKeepAliveTimeoutSeconds) +
", enableOneHttpConnectionPerRequest = " + ccapi::toString(enableOneHttpConnectionPerRequest) + "]";
return output;
}
Expand All @@ -50,8 +50,7 @@ class SessionOptions CCAPI_FINAL {
int httpMaxNumRedirect{1};
long httpRequestTimeoutMilliseconds{10000};
int httpConnectionPoolMaxSize{1}; // used to set the maximal number of http connections to be kept in the pool (connections in the pool are idle)
long httpConnectionPoolIdleTimeoutMilliseconds{0}; // used to purge the http connection pool if all connections in the
// pool have stayed idle for at least this amount of time
long httpConnectionKeepAliveTimeoutSeconds{10}; // used to remove a http connection from the http connection pool if it has stayed idle for at least this amount of time
bool enableOneHttpConnectionPerRequest{}; // create a new http connection for each request
#ifdef CCAPI_LEGACY_USE_WEBSOCKETPP
#else
Expand Down
2 changes: 1 addition & 1 deletion include/ccapi_cpp/service/ccapi_market_data_service.h
Expand Up @@ -17,7 +17,7 @@ namespace ccapi {
class MarketDataService : public Service {
public:
MarketDataService(std::function<void(Event&, Queue<Event>*)> eventHandler, SessionOptions sessionOptions, SessionConfigs sessionConfigs,
std::shared_ptr<ServiceContext> serviceContextPtr)
ServiceContext* serviceContextPtr)
: Service(eventHandler, sessionOptions, sessionConfigs, serviceContextPtr) {
CCAPI_LOGGER_FUNCTION_ENTER;
this->requestOperationToMessageTypeMap = {
Expand Down
Expand Up @@ -7,7 +7,7 @@ namespace ccapi {
class MarketDataServiceAscendex : public MarketDataService {
public:
MarketDataServiceAscendex(std::function<void(Event&, Queue<Event>*)> eventHandler, SessionOptions sessionOptions, SessionConfigs sessionConfigs,
std::shared_ptr<ServiceContext> serviceContextPtr)
ServiceContext* serviceContextPtr)
: MarketDataService(eventHandler, sessionOptions, sessionConfigs, serviceContextPtr) {
this->exchangeName = CCAPI_EXCHANGE_NAME_ASCENDEX;
this->baseUrlWs = sessionConfigs.getUrlWebsocketBase().at(this->exchangeName) + "/api/pro/v1/stream";
Expand Down
Expand Up @@ -7,7 +7,7 @@ namespace ccapi {
class MarketDataServiceBinance : public MarketDataServiceBinanceBase {
public:
MarketDataServiceBinance(std::function<void(Event&, Queue<Event>*)> eventHandler, SessionOptions sessionOptions, SessionConfigs sessionConfigs,
std::shared_ptr<ServiceContext> serviceContextPtr)
ServiceContext* serviceContextPtr)
: MarketDataServiceBinanceBase(eventHandler, sessionOptions, sessionConfigs, serviceContextPtr) {
this->exchangeName = CCAPI_EXCHANGE_NAME_BINANCE;
this->baseUrlWs = sessionConfigs.getUrlWebsocketBase().at(this->exchangeName) + "/stream";
Expand Down
Expand Up @@ -8,7 +8,7 @@ namespace ccapi {
class MarketDataServiceBinanceBase : public MarketDataService {
public:
MarketDataServiceBinanceBase(std::function<void(Event&, Queue<Event>*)> eventHandler, SessionOptions sessionOptions, SessionConfigs sessionConfigs,
std::shared_ptr<ServiceContext> serviceContextPtr)
ServiceContext* serviceContextPtr)
: MarketDataService(eventHandler, sessionOptions, sessionConfigs, serviceContextPtr) {
this->enableCheckPingPongWebsocketApplicationLevel = false;
}
Expand Down
Expand Up @@ -7,7 +7,7 @@ namespace ccapi {
class MarketDataServiceBinanceCoinFutures : public MarketDataServiceBinanceDerivativesBase {
public:
MarketDataServiceBinanceCoinFutures(std::function<void(Event&, Queue<Event>*)> eventHandler, SessionOptions sessionOptions, SessionConfigs sessionConfigs,
std::shared_ptr<ServiceContext> serviceContextPtr)
ServiceContext* serviceContextPtr)
: MarketDataServiceBinanceDerivativesBase(eventHandler, sessionOptions, sessionConfigs, serviceContextPtr) {
this->exchangeName = CCAPI_EXCHANGE_NAME_BINANCE_COIN_FUTURES;
this->baseUrlWs = sessionConfigs.getUrlWebsocketBase().at(this->exchangeName) + "/stream";
Expand Down
Expand Up @@ -7,7 +7,7 @@ namespace ccapi {
class MarketDataServiceBinanceDerivativesBase : public MarketDataServiceBinanceBase {
public:
MarketDataServiceBinanceDerivativesBase(std::function<void(Event&, Queue<Event>*)> eventHandler, SessionOptions sessionOptions, SessionConfigs sessionConfigs,
std::shared_ptr<ServiceContext> serviceContextPtr)
ServiceContext* serviceContextPtr)
: MarketDataServiceBinanceBase(eventHandler, sessionOptions, sessionConfigs, serviceContextPtr) {
this->isDerivatives = true;
}
Expand Down

0 comments on commit f6f5670

Please sign in to comment.