diff --git a/.github/workflows/flex.yml b/.github/workflows/flex.yml index 6a9c6747a896..9f7b4f41b4c8 100644 --- a/.github/workflows/flex.yml +++ b/.github/workflows/flex.yml @@ -50,6 +50,28 @@ jobs: mkdir build && cd build # only test default build cmake .. -DCMAKE_BUILD_TYPE=DEBUG -DBUILD_DOC=OFF && sudo make -j 4 + # test the different combination of cmake options: -DBUILD_HQPS=ON/OFF -DBUILD_TEST=ON/OFF, -DBUILD_ODPS_FRAGMENT_LOADER=ON/OFF + test-cmake-options: + runs-on: ubuntu-20.04 + container: + image: registry.cn-hongkong.aliyuncs.com/graphscope/hqps-server-base:v0.0.9 + strategy: + matrix: + BUILD_HQPS: [ON, OFF] + BUILD_TEST: [ON, OFF] + BUILD_ODPS_FRAGMENT_LOADER: [ON, OFF] + steps: + - uses: actions/checkout@v3 + + - name: Build + run: | + cd ${GITHUB_WORKSPACE}/flex + git submodule update --init + mkdir build && cd build + cmake .. -DBUILD_HQPS=${{ matrix.BUILD_HQPS }} -DBUILD_TEST=${{ matrix.BUILD_TEST }} \ + -DBUILD_ODPS_FRAGMENT_LOADER=${{ matrix.BUILD_ODPS_FRAGMENT_LOADER }} + sudo make -j4 + test-flex: runs-on: ubuntu-20.04 if: ${{ github.repository == 'alibaba/GraphScope' }} @@ -72,9 +94,8 @@ jobs: env: HOME: /home/graphscope/ run: | - cd ${GITHUB_WORKSPACE}/ - git submodule update --init cd ${GITHUB_WORKSPACE}/flex + git submodule update --init mkdir build && cd build cmake .. && sudo make -j$(nproc) diff --git a/.github/workflows/hqps-db-ci.yml b/.github/workflows/hqps-db-ci.yml index 17b5ccfa02f3..f5dd83c1c467 100644 --- a/.github/workflows/hqps-db-ci.yml +++ b/.github/workflows/hqps-db-ci.yml @@ -170,6 +170,17 @@ jobs: eval ${cmd} done + -name: Test cypher procedure generation + run: | + cd ${GITHUB_WORKSPACE}/flex/bin + ./load_plan_and_gen.sh -e=hqps -i=../interactive/examples/modern_graph/get_person_name.cypher -w=/tmp/codegen \ + --ir_conf=/workspaces/GraphScope/flex/tests/hqps/engine_config_test.yaml \ + --graph_schema_path=../interactive/examples/modern_graph/modern_graph.yaml + + ./load_plan_and_gen.sh -e=hqps -i=../interactive/examples/modern_graph/count_vertex_num.cypher -w=/tmp/codegen \ + --ir_conf=/workspaces/GraphScope/flex/tests/hqps/engine_config_test.yaml \ + --graph_schema_path=../interactive/examples/modern_graph/modern_graph.yaml + - name: Run End-to-End cypher adhoc ldbc query test env: GS_TEST_DIR: ${{ github.workspace }}/gstest diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt index b14a632ccf4f..94ff4e5ed9b3 100644 --- a/flex/CMakeLists.txt +++ b/flex/CMakeLists.txt @@ -17,6 +17,14 @@ option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" ON) option(MONITOR_SESSIONS "Whether monitor sessions" OFF) option(ENABLE_HUGEPAGE "Whether to use hugepages when open mmap array in memory" OFF) +#print options +message(STATUS "Build HighQPS Engine: ${BUILD_HQPS}") +message(STATUS "Build test: ${BUILD_TEST}") +message(STATUS "Build doc: ${BUILD_DOC}") +message(STATUS "Build odps fragment loader: ${BUILD_ODPS_FRAGMENT_LOADER}") +message(STATUS "Monitor sessions: ${MONITOR_SESSIONS}") +message(STATUS "Enable hugepage: ${ENABLE_HUGEPAGE}") + # ------------------------------------------------------------------------------ # cmake configs # ------------------------------------------------------------------------------ @@ -31,6 +39,11 @@ set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE) set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib") set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE) +if (BUILD_HQPS) + message("Build HighQPS Engine") + add_definitions(-DBUILD_HQPS) +endif () + if (MONITOR_SESSIONS) message("Monitor sessions is enabled") add_definitions(-DMONITOR_SESSIONS) diff --git a/flex/bin/load_plan_and_gen.sh b/flex/bin/load_plan_and_gen.sh index 3afc76828d47..a59ef3e6debe 100755 --- a/flex/bin/load_plan_and_gen.sh +++ b/flex/bin/load_plan_and_gen.sh @@ -276,7 +276,7 @@ compile_hqps_so() { if [ ! -z ${CMAKE_C_COMPILER} ]; then cmd="${cmd} -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}" fi - info "Cmake command = ${cmd}" + info "CMake command = ${cmd}" info "---------------------------" eval ${cmd} diff --git a/flex/codegen/CMakeLists.txt b/flex/codegen/CMakeLists.txt index 336350b15717..87f6834e656c 100644 --- a/flex/codegen/CMakeLists.txt +++ b/flex/codegen/CMakeLists.txt @@ -3,6 +3,8 @@ find_package(Boost REQUIRED COMPONENTS system filesystem context program_options regex thread) include_directories(SYSTEM ${Boost_INCLUDE_DIRS}) -add_executable(gen_code_from_plan gen_code_from_plan.cc) -target_link_libraries(gen_code_from_plan hqps_plan_proto ${GLOG_LIBRARIES} ${Boost_LIBRARIES}) -install_flex_target(gen_code_from_plan) +if (BUILD_HQPS) + add_executable(gen_code_from_plan gen_code_from_plan.cc) + target_link_libraries(gen_code_from_plan hqps_plan_proto ${GLOG_LIBRARIES} ${Boost_LIBRARIES}) + install_flex_target(gen_code_from_plan) +endif() diff --git a/flex/codegen/src/hqps/hqps_scan_builder.h b/flex/codegen/src/hqps/hqps_scan_builder.h index e59650e4fb84..fa7ea9eb5c61 100644 --- a/flex/codegen/src/hqps/hqps_scan_builder.h +++ b/flex/codegen/src/hqps/hqps_scan_builder.h @@ -199,6 +199,9 @@ class ScanOpBuilder { VLOG(10) << "receive param const in index predicate: " << dyn_param_pb.DebugString(); ctx_.AddParameterVar(param_const); + // set to oid_ and oid_type_name_ + oid_ = param_const.var_name; + oid_type_name_ = data_type_2_string(param_const.type); } return *this; diff --git a/flex/codegen/src/hqps_generator.h b/flex/codegen/src/hqps_generator.h index 294a9f94dafe..347ebf90af95 100644 --- a/flex/codegen/src/hqps_generator.h +++ b/flex/codegen/src/hqps_generator.h @@ -71,7 +71,9 @@ static constexpr const char* QUERY_TEMPLATE_STR = " // dump results to string\n" " std::string res_str = res.SerializeAsString();\n" " // encode results to encoder\n" - " encoder.put_string(res_str);\n" + " if (!res_str.empty()){\n" + " encoder.put_string_view(res_str);\n" + " }\n" " return true;\n" " }\n" " //private members\n" @@ -227,12 +229,16 @@ class QueryGenerator { CHECK(param_vars[i] == param_vars[i - 1]); continue; } else { - ss << ", " << data_type_2_string(param_vars[i].type) << " " - << param_vars[i].var_name; + ss << data_type_2_string(param_vars[i].type) << " " + << param_vars[i].var_name << ","; } } } - return ss.str(); + auto str = ss.str(); + if (str.size() > 0) { + str.pop_back(); // remove the last comma + } + return str; } // implement the function that overrides the base class. @@ -265,9 +271,6 @@ class QueryGenerator { std::string param_vars_decoding, param_vars_concat_str; { std::stringstream ss; - if (param_names.size() > 0) { - ss << ","; - } for (size_t i = 0; i < param_names.size(); ++i) { ss << param_names[i]; if (i != param_names.size() - 1) { diff --git a/flex/engines/graph_db/CMakeLists.txt b/flex/engines/graph_db/CMakeLists.txt index 4f5544fcb23b..852af87019b8 100644 --- a/flex/engines/graph_db/CMakeLists.txt +++ b/flex/engines/graph_db/CMakeLists.txt @@ -1,10 +1,16 @@ file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc" "${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc") +if (NOT BUILD_HQPS) + list(FILTER GRAPH_DB_SRC_FILES EXCLUDE REGEX ".*hqps_app*.") +endif() add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES}) target_include_directories(flex_graph_db PUBLIC $) -target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils hqps_plan_proto ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) +target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) +if (BUILD_HQPS) + target_link_libraries(flex_graph_db hqps_plan_proto) +endif() install_flex_target(flex_graph_db) install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h @@ -22,4 +28,8 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/app_base.h DESTINATION include/flex/engines/graph_db/app) +if (BUILD_HQPS) + install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/hqps_app.h + DESTINATION include/flex/engines/graph_db/app) +endif() diff --git a/flex/engines/graph_db/app/hqps_app.cc b/flex/engines/graph_db/app/hqps_app.cc new file mode 100644 index 000000000000..b6e1aa3a4020 --- /dev/null +++ b/flex/engines/graph_db/app/hqps_app.cc @@ -0,0 +1,127 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "flex/engines/graph_db/app/hqps_app.h" +#include "flex/proto_generated_gie/stored_procedure.pb.h" + +namespace gs { + +void put_argment(gs::Encoder& encoder, const query::Argument& argment) { + auto& value = argment.value(); + auto item_case = value.item_case(); + switch (item_case) { + case common::Value::kI32: + encoder.put_int(value.i32()); + break; + case common::Value::kI64: + encoder.put_long(value.i64()); + break; + case common::Value::kF64: + encoder.put_double(value.f64()); + break; + case common::Value::kStr: + encoder.put_string(value.str()); + break; + default: + LOG(ERROR) << "Not recognizable param type" << static_cast(item_case); + } +} + +HQPSAdhocApp::HQPSAdhocApp(GraphDBSession& graph) : graph_(graph) {} + +bool HQPSAdhocApp::Query(Decoder& input, Encoder& output) { + if (input.size() <= 4) { + LOG(ERROR) << "Invalid input for HQPSAdhocApp, input size: " + << input.size(); + return false; + } + std::string input_lib_path = std::string(input.get_string()); + auto app_factory = std::make_shared(input_lib_path); + AppWrapper app_wrapper; // wrapper should be destroyed before the factory + + if (app_factory) { + app_wrapper = app_factory->CreateApp(graph_); + if (app_wrapper.app() == NULL) { + LOG(ERROR) << "Fail to create app for adhoc query: " << input_lib_path; + return false; + } + } else { + LOG(ERROR) << "Fail to evaluate adhoc query: " << input_lib_path; + return false; + } + + return app_wrapper.app()->Query(input, output); +} + +HQPSProcedureApp::HQPSProcedureApp(GraphDBSession& graph) : graph_(graph) {} + +bool HQPSProcedureApp::Query(Decoder& input, Encoder& output) { + if (input.size() <= 0) { + LOG(ERROR) << "Invalid input for HQPSProcedureApp, input size: " + << input.size(); + return false; + } + query::Query cur_query; + if (!cur_query.ParseFromArray(input.data(), input.size())) { + LOG(ERROR) << "Fail to parse query from input content"; + return false; + } + auto query_name = cur_query.query_name().name(); + + std::vector input_buffer; + gs::Encoder input_encoder(input_buffer); + auto& args = cur_query.arguments(); + for (int32_t i = 0; i < args.size(); ++i) { + put_argment(input_encoder, args[i]); + } + VLOG(10) << "Query name: " << query_name << ", args: " << input_buffer.size() + << " bytes"; + gs::Decoder input_decoder(input_buffer.data(), input_buffer.size()); + + if (query_name.empty()) { + LOG(ERROR) << "Query name is empty"; + return false; + } + auto& app_name_to_path_index = graph_.schema().GetPlugins(); + // get procedure id from name. + if (app_name_to_path_index.count(query_name) <= 0) { + LOG(ERROR) << "Query name is not registered: " << query_name; + return false; + } + + // get app + auto type = app_name_to_path_index.at(query_name).second; + auto app = graph_.GetApp(type); + if (!app) { + LOG(ERROR) << "Fail to get app for query: " << query_name + << ", type: " << type; + return false; + } + return app->Query(input_decoder, output); +} + +HQPSAdhocAppFactory::HQPSAdhocAppFactory() {} +HQPSAdhocAppFactory::~HQPSAdhocAppFactory() {} +AppWrapper HQPSAdhocAppFactory::CreateApp(GraphDBSession& graph) { + return AppWrapper(new HQPSAdhocApp(graph), NULL); +} + +HQPSProcedureAppFactory::HQPSProcedureAppFactory() {} +HQPSProcedureAppFactory::~HQPSProcedureAppFactory() {} +AppWrapper HQPSProcedureAppFactory::CreateApp(GraphDBSession& graph) { + return AppWrapper(new HQPSProcedureApp(graph), NULL); +} + +} // namespace gs diff --git a/flex/engines/graph_db/app/hqps_app.h b/flex/engines/graph_db/app/hqps_app.h new file mode 100644 index 000000000000..d9eec0f31e8c --- /dev/null +++ b/flex/engines/graph_db/app/hqps_app.h @@ -0,0 +1,70 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ENGINES_GRAPH_DB_APP_HQPS_APP_H_ +#define ENGINES_GRAPH_DB_APP_HQPS_APP_H_ + +#include "flex/engines/graph_db/app/app_base.h" +#include "flex/engines/graph_db/database/graph_db_session.h" + +namespace gs { + +/** + * @brief HQPSAdhocApp is a builtin, proxy app used to evaluate adhoc query. + */ +class HQPSAdhocApp : public AppBase { + public: + HQPSAdhocApp(GraphDBSession& graph); + + bool Query(Decoder& input, Encoder& output) override; + + private: + GraphDBSession& graph_; +}; + +/** + * @brief HQPSProcedureApp is a builtin, proxy app used to evaluate procedure + * query. + */ +class HQPSProcedureApp : public AppBase { + public: + HQPSProcedureApp(GraphDBSession& graph); + + bool Query(Decoder& input, Encoder& output) override; + + private: + GraphDBSession& graph_; +}; + +// Factory +class HQPSAdhocAppFactory : public AppFactoryBase { + public: + HQPSAdhocAppFactory(); + ~HQPSAdhocAppFactory(); + + AppWrapper CreateApp(GraphDBSession& graph) override; +}; + +// Factory +class HQPSProcedureAppFactory : public AppFactoryBase { + public: + HQPSProcedureAppFactory(); + ~HQPSProcedureAppFactory(); + + AppWrapper CreateApp(GraphDBSession& graph) override; +}; +} // namespace gs + +#endif // ENGINES_GRAPH_DB_APP_HQPS_APP_H_ diff --git a/flex/engines/graph_db/database/graph_db.cc b/flex/engines/graph_db/database/graph_db.cc index 24f553d665e6..3136562e0020 100644 --- a/flex/engines/graph_db/database/graph_db.cc +++ b/flex/engines/graph_db/database/graph_db.cc @@ -14,9 +14,9 @@ */ #include "flex/engines/graph_db/database/graph_db.h" -#include "flex/engines/graph_db/database/graph_db_session.h" - +#include "flex/engines/graph_db/app/hqps_app.h" #include "flex/engines/graph_db/app/server_app.h" +#include "flex/engines/graph_db/database/graph_db_session.h" #include "flex/engines/graph_db/database/wal.h" #include "flex/utils/yaml_utils.h" @@ -351,7 +351,15 @@ void GraphDB::initApps( for (size_t i = 0; i < 256; ++i) { app_factories_[i] = nullptr; } + // Builtin apps app_factories_[0] = std::make_shared(); +#ifdef BUILD_HQPS + app_factories_[Schema::HQPS_ADHOC_PLUGIN_ID] = + std::make_shared(); + app_factories_[Schema::HQPS_PROCEDURE_PLUGIN_ID] = + std::make_shared(); +#endif // BUILD_HQPS + size_t valid_plugins = 0; for (auto& path_and_index : plugins) { auto path = path_and_index.second.first; diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index 83ec87e369f1..ceaeca3c7b6e 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ -24,27 +24,6 @@ namespace gs { -void put_argment(Encoder& encoder, const query::Argument& argment) { - auto& value = argment.value(); - auto item_case = value.item_case(); - switch (item_case) { - case common::Value::kI32: - encoder.put_int(value.i32()); - break; - case common::Value::kI64: - encoder.put_long(value.i64()); - break; - case common::Value::kF64: - encoder.put_double(value.f64()); - break; - case common::Value::kStr: - encoder.put_string(value.str()); - break; - default: - LOG(ERROR) << "Not recognizable param type" << static_cast(item_case); - } -} - ReadTransaction GraphDBSession::GetReadTransaction() { uint32_t ts = db_.version_manager_.acquire_read_timestamp(); return ReadTransaction(db_.graph_, db_.version_manager_, ts); @@ -124,8 +103,6 @@ std::shared_ptr GraphDBSession::get_vertex_id_column( } } -#define likely(x) __builtin_expect(!!(x), 1) - Result> GraphDBSession::Eval(const std::string& input) { const auto start = std::chrono::high_resolution_clock::now(); uint8_t type = input.back(); @@ -137,22 +114,11 @@ Result> GraphDBSession::Eval(const std::string& input) { Decoder decoder(str_data, str_len); Encoder encoder(result_buffer); - AppBase* app = nullptr; - if (likely(apps_[type] != nullptr)) { - app = apps_[type]; - } else { - app_wrappers_[type] = db_.CreateApp(type, thread_id_); - if (app_wrappers_[type].app() == NULL) { - LOG(ERROR) << "[Query-" + std::to_string((int) type) - << "] is not registered..."; - return Result>( - StatusCode::NotExists, - "Query:" + std::to_string((int) type) + " is not registere", - result_buffer); - } else { - apps_[type] = app_wrappers_[type].app(); - app = apps_[type]; - } + AppBase* app = GetApp(type); + if (!app) { + return Result>( + StatusCode::NotFound, + "Procedure not found, id:" + std::to_string((int) type), result_buffer); } for (size_t i = 0; i < MAX_RETRY; ++i) { @@ -193,127 +159,6 @@ Result> GraphDBSession::Eval(const std::string& input) { result_buffer); } -// Evaluating stored procedure for hqps adhoc query, the dynamic lib is closed -// immediately after the query -Result> GraphDBSession::EvalAdhoc( - const std::string& input_lib_path) { - std::vector result_buffer; - std::vector input_buffer; // empty. Adhoc query receives no input - Decoder decoder(input_buffer.data(), input_buffer.size()); - Encoder encoder(result_buffer); - - // the dynamic library will automatically be closed after the query - auto app_factory = std::make_shared(input_lib_path); - AppWrapper app_wrapper; // wrapper should be destroyed before the factory - - if (app_factory) { - app_wrapper = app_factory->CreateApp(*this); - if (app_wrapper.app() == NULL) { - LOG(ERROR) << "Fail to create app for adhoc query: " << input_lib_path; - return Result>( - StatusCode::InternalError, - "Fail to create app for: " + input_lib_path, result_buffer); - } - } else { - LOG(ERROR) << "Fail to evaluate adhoc query: " << input_lib_path; - return Result>( - StatusCode::NotExists, - "Fail to open dynamic lib for: " + input_lib_path, result_buffer); - } - - for (size_t i = 0; i < MAX_RETRY; ++i) { - if (app_wrapper.app()->Query(decoder, encoder)) { - return result_buffer; - } - - LOG(INFO) << "[Query-" << input_lib_path << "][Thread-" << thread_id_ - << "] retry - " << i << " / " << MAX_RETRY; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - result_buffer.clear(); - } - return Result>( - StatusCode::QueryFailed, - "Query failed for adhoc query: " + input_lib_path, result_buffer); -} - -Result> GraphDBSession::EvalHqpsProcedure( - const query::Query& query_pb) { - auto query_name = query_pb.query_name().name(); - if (query_name.empty()) { - LOG(ERROR) << "Query name is empty"; - return Result>(StatusCode::InValidArgument, - "Query name is empty", {}); - } - auto& app_name_to_path_index = db_.schema().GetPlugins(); - // get procedure id from name. - if (app_name_to_path_index.count(query_name) <= 0) { - LOG(ERROR) << "Query name is not registered: " << query_name; - return Result>( - StatusCode::NotExists, "Query name is not registered: " + query_name, - {}); - } - - // get app - auto type = app_name_to_path_index.at(query_name).second; - if (type >= apps_.size()) { - LOG(ERROR) << "Query type is not registered: " << type; - return Result>( - StatusCode::NotExists, - "Query type is not registered: " + std::to_string(type), {}); - } - AppBase* app = nullptr; - if (likely(apps_[type] != nullptr)) { - app = apps_[type]; - } else { - app_wrappers_[type] = db_.CreateApp(type, thread_id_); - if (app_wrappers_[type].app() == NULL) { - LOG(ERROR) << "[Query-" + std::to_string((int) type) - << "] is not registered..."; - return Result>( - StatusCode::NotExists, - "Query:" + std::to_string((int) type) + " is not registered", {}); - } else { - apps_[type] = app_wrappers_[type].app(); - app = apps_[type]; - } - } - - if (app == nullptr) { - LOG(ERROR) << "Query type is not registered: " << type - << ", query name: " << query_name; - return Result>( - StatusCode::NotExists, - "Query type is not registered: " + std::to_string(type), {}); - } - - std::vector input_buffer; - gs::Encoder input_encoder(input_buffer); - auto& args = query_pb.arguments(); - for (int32_t i = 0; i < args.size(); ++i) { - auto& arg = args[i]; - put_argment(input_encoder, arg); - } - const char* str_data = input_buffer.data(); - size_t str_len = input_buffer.size(); - gs::Decoder input_decoder(input_buffer.data(), input_buffer.size()); - - for (size_t i = 0; i < MAX_RETRY; ++i) { - std::vector result_buffer; - gs::Encoder result_encoder(result_buffer); - if (app->Query(input_decoder, result_encoder)) { - return Result>(std::move(result_buffer)); - } - LOG(INFO) << "[Query-" << query_name << "][Thread-" << thread_id_ - << "] retry - " << i << " / " << MAX_RETRY; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - input_decoder.reset(str_data, str_len); - } - return Result>( - StatusCode::QueryFailed, "Query failed for procedure: " + query_name, {}); -} - -#undef likely - void GraphDBSession::GetAppInfo(Encoder& result) { db_.GetAppInfo(result); } int GraphDBSession::SessionId() const { return thread_id_; } @@ -344,6 +189,34 @@ double GraphDBSession::eval_duration() const { int64_t GraphDBSession::query_num() const { return query_num_.load(); } +#define likely(x) __builtin_expect(!!(x), 1) + +AppBase* GraphDBSession::GetApp(int type) { + // create if not exist + if (type >= GraphDBSession::MAX_PLUGIN_NUM) { + LOG(ERROR) << "Query type is out of range: " << type << " > " + << GraphDBSession::MAX_PLUGIN_NUM; + return nullptr; + } + AppBase* app = nullptr; + if (likely(apps_[type] != nullptr)) { + app = apps_[type]; + } else { + app_wrappers_[type] = db_.CreateApp(type, thread_id_); + if (app_wrappers_[type].app() == NULL) { + LOG(ERROR) << "[Query-" + std::to_string((int) type) + << "] is not registered..."; + return nullptr; + } else { + apps_[type] = app_wrappers_[type].app(); + app = apps_[type]; + } + } + return app; +} + +#undef likely // likely + const AppMetric& GraphDBSession::GetAppMetric(int idx) const { return app_metrics_[idx]; } diff --git a/flex/engines/graph_db/database/graph_db_session.h b/flex/engines/graph_db/database/graph_db_session.h index cd6927cc3aef..e4adee1d84a3 100644 --- a/flex/engines/graph_db/database/graph_db_session.h +++ b/flex/engines/graph_db/database/graph_db_session.h @@ -24,7 +24,6 @@ #include "flex/engines/graph_db/database/single_vertex_insert_transaction.h" #include "flex/engines/graph_db/database/transaction_utils.h" #include "flex/engines/graph_db/database/update_transaction.h" -#include "flex/proto_generated_gie/stored_procedure.pb.h" #include "flex/storages/rt_mutable_graph/mutable_property_fragment.h" #include "flex/utils/property/column.h" #include "flex/utils/result.h" @@ -34,11 +33,10 @@ namespace gs { class GraphDB; class WalWriter; -void put_argment(gs::Encoder& encoder, const query::Argument& argment); - class GraphDBSession { public: static constexpr int32_t MAX_RETRY = 3; + static constexpr int32_t MAX_PLUGIN_NUM = 256; // 2^(sizeof(uint8_t)*8) GraphDBSession(GraphDB& db, Allocator& alloc, WalWriter& logger, const std::string& work_dir, int thread_id) : db_(db), @@ -83,13 +81,6 @@ class GraphDBSession { Result> Eval(const std::string& input); - // Evaluate a temporary stored procedure. close the handle of the dynamic lib - // immediately. - Result> EvalAdhoc(const std::string& input_lib_path); - - // Evaluate a stored procedure with input parameters given. - Result> EvalHqpsProcedure(const query::Query& query_pb); - void GetAppInfo(Encoder& result); int SessionId() const; @@ -104,6 +95,8 @@ class GraphDBSession { int64_t query_num() const; + AppBase* GetApp(int idx); + private: GraphDB& db_; Allocator& alloc_; @@ -111,9 +104,9 @@ class GraphDBSession { std::string work_dir_; int thread_id_; - std::array app_wrappers_; - std::array apps_; - std::array app_metrics_; + std::array app_wrappers_; + std::array apps_; + std::array app_metrics_; #ifdef MONITOR_SESSIONS std::atomic eval_duration_; diff --git a/flex/engines/http_server/CMakeLists.txt b/flex/engines/http_server/CMakeLists.txt index f337cff353d8..2123b59c29f6 100644 --- a/flex/engines/http_server/CMakeLists.txt +++ b/flex/engines/http_server/CMakeLists.txt @@ -6,15 +6,32 @@ if (Hiactor_FOUND) SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/ INCLUDE_PATHS ${Hiactor_INCLUDE_DIR},${CMAKE_CURRENT_SOURCE_DIR}/../../../,${CMAKE_CURRENT_SOURCE_DIR}/../../third_party/nlohmann-json/single_include/) + if (NOT BUILD_HQPS) + list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*admin.*") + list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*codegen.*") + endif () + + # get all .cc files in current directory, except for generated/ file(GLOB_RECURSE SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*generated.*") + + if (NOT BUILD_HQPS) + list(FILTER SERVER_FILES EXCLUDE REGEX ".*admin*") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*hqps*") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*codegen*") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*workdir_manipulator*") + endif () add_library(flex_server STATIC ${SERVER_FILES} ${server_actor_autogen_files}) add_dependencies(flex_server server_actor_autogen) target_compile_options (flex_server PUBLIC -Wno-attributes) - target_link_libraries(flex_server Hiactor::hiactor hqps_plan_proto flex_graph_db) - target_include_directories(flex_server PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/../hqps/) + target_link_libraries(flex_server Hiactor::hiactor flex_graph_db) + if (BUILD_HQPS) + target_include_directories(flex_server PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/../hqps/) + target_link_libraries(flex_server hqps_plan_proto) + endif () install_without_export_flex_target(flex_server) endif () diff --git a/flex/engines/http_server/actor/codegen_actor.act.cc b/flex/engines/http_server/actor/codegen_actor.act.cc index 9fdfc1260c9d..6e1be0151859 100644 --- a/flex/engines/http_server/actor/codegen_actor.act.cc +++ b/flex/engines/http_server/actor/codegen_actor.act.cc @@ -37,13 +37,13 @@ codegen_actor::codegen_actor(hiactor::actor_base* exec_ctx, // ... } -seastar::future codegen_actor::do_codegen(query_param&& param) { +seastar::future codegen_actor::do_codegen(query_param&& param) { LOG(INFO) << "Running codegen for " << param.content.size(); // The received query's pay load shoud be able to deserialze to physical plan auto& str = param.content; if (str.size() <= 0) { LOG(INFO) << "Empty query"; - return seastar::make_exception_future( + return seastar::make_exception_future( std::runtime_error("Empty query string")); } @@ -57,7 +57,7 @@ seastar::future codegen_actor::do_codegen(query_param&& param) { VLOG(10) << "Parse physical plan: " << plan.DebugString(); } else { LOG(ERROR) << "Fail to parse physical plan"; - return seastar::make_exception_future( + return seastar::make_exception_future( std::runtime_error("Fail to parse physical plan")); } @@ -69,7 +69,7 @@ seastar::future codegen_actor::do_codegen(query_param&& param) { return codegen_proxy.DoGen(plan).then( [](std::pair&& job_id_and_lib_path) { if (job_id_and_lib_path.first == -1) { - return seastar::make_exception_future( + return seastar::make_exception_future( std::runtime_error("Fail to parse job id from codegen proxy")); } // 1. load and run. @@ -77,11 +77,11 @@ seastar::future codegen_actor::do_codegen(query_param&& param) { << job_id_and_lib_path.second << ", job id: " << job_id_and_lib_path.first << "local shard id: " << hiactor::local_shard_id(); - return seastar::make_ready_future( - std::move(job_id_and_lib_path)); + return seastar::make_ready_future( + std::move(job_id_and_lib_path.second)); }); } else { - return seastar::make_exception_future( + return seastar::make_exception_future( std::runtime_error("Codegen proxy not initialized")); } } diff --git a/flex/engines/http_server/actor/codegen_actor.act.h b/flex/engines/http_server/actor/codegen_actor.act.h index 6222a81db4fd..597b40167082 100644 --- a/flex/engines/http_server/actor/codegen_actor.act.h +++ b/flex/engines/http_server/actor/codegen_actor.act.h @@ -28,7 +28,7 @@ class ANNOTATION(actor:impl) codegen_actor : public hiactor::actor { codegen_actor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr); ~codegen_actor() override; - seastar::future ANNOTATION(actor:method) do_codegen(query_param&& param); + seastar::future ANNOTATION(actor:method) do_codegen(query_param&& param); // DECLARE_RUN_QUERYS; /// Declare `do_work` func here, no need to implement. diff --git a/flex/engines/http_server/actor/executor.act.cc b/flex/engines/http_server/actor/executor.act.cc index cfedde1a3634..e0bcdcaeb177 100644 --- a/flex/engines/http_server/actor/executor.act.cc +++ b/flex/engines/http_server/actor/executor.act.cc @@ -17,9 +17,6 @@ #include "flex/engines/graph_db/database/graph_db.h" #include "flex/engines/graph_db/database/graph_db_session.h" -#include "flex/engines/http_server/codegen_proxy.h" -#include "flex/engines/http_server/workdir_manipulator.h" -#include "flex/proto_generated_gie/stored_procedure.pb.h" #include "nlohmann/json.hpp" #include @@ -51,72 +48,4 @@ seastar::future executor::run_graph_db_query( return seastar::make_ready_future(std::move(content)); } -// run_query_for stored_procedure -seastar::future executor::run_hqps_procedure_query( - query_param&& param) { - auto& str = param.content; - const char* str_data = str.data(); - size_t str_length = str.size(); - LOG(INFO) << "Receive pay load: " << str_length << " bytes"; - - query::Query cur_query; - if (!cur_query.ParseFromArray(str_data, str_length)) { - LOG(ERROR) << "Fail to parse query from pay load"; - return seastar::make_ready_future( - seastar::sstring("Fail to parse query from pay load")); - } - - auto ret = gs::GraphDB::get() - .GetSession(hiactor::local_shard_id()) - .EvalHqpsProcedure(cur_query); - if (!ret.ok()) { - LOG(ERROR) << "Eval failed: " << ret.status().error_message(); - return seastar::make_exception_future( - seastar::sstring(ret.status().error_message())); - } - auto result = ret.value(); - if (result.size() < 4) { - return seastar::make_exception_future(seastar::sstring( - "Internal Error, more than 4 bytes should be returned")); - } - seastar::sstring content( - result.data() + 4, - result.size() - 4); // skip 4 bytes, since the first 4 - // bytes is the size of the result - return seastar::make_ready_future(std::move(content)); -} - -seastar::future executor::run_hqps_adhoc_query( - adhoc_result&& param) { - LOG(INFO) << "Run adhoc query"; - // The received query's pay load shoud be able to deserialze to physical plan - // 1. load and run. - auto& content = param.content; - LOG(INFO) << "Okay, try to run the query of lib path: " << content.second - << ", job id: " << content.first - << "local shard id: " << hiactor::local_shard_id(); - // seastar::sstring result = server::load_and_run(content.first, - // content.second); - auto ret = gs::GraphDB::get() - .GetSession(hiactor::local_shard_id()) - .EvalAdhoc(content.second); - if (!ret.ok()) { - LOG(ERROR) << "Eval failed: " << ret.status().error_message(); - return seastar::make_exception_future( - seastar::sstring(ret.status().error_message())); - } - auto ret_value = ret.value(); - LOG(INFO) << "Adhoc query result size: " << ret_value.size(); - if (ret_value.size() < 4) { - return seastar::make_exception_future(seastar::sstring( - "Internal Error, more than 4 bytes should be returned")); - } - - seastar::sstring result( - ret_value.data() + 4, - ret_value.size() - 4); // skip 4 bytes, since the first 4 - // bytes is the size of the result - return seastar::make_ready_future(std::move(result)); -} - } // namespace server diff --git a/flex/engines/http_server/actor/executor.act.h b/flex/engines/http_server/actor/executor.act.h index e3e504415440..63f03f518f7e 100644 --- a/flex/engines/http_server/actor/executor.act.h +++ b/flex/engines/http_server/actor/executor.act.h @@ -30,9 +30,6 @@ class ANNOTATION(actor:impl) executor : public hiactor::actor { seastar::future ANNOTATION(actor:method) run_graph_db_query(query_param&& param); - seastar::future ANNOTATION(actor:method) run_hqps_procedure_query(query_param&& param); - - seastar::future ANNOTATION(actor:method) run_hqps_adhoc_query(adhoc_result&& param); // DECLARE_RUN_QUERYS; /// Declare `do_work` func here, no need to implement. diff --git a/flex/engines/http_server/codegen_proxy.h b/flex/engines/http_server/codegen_proxy.h index 44a0a6994ecb..c71b11a2d832 100644 --- a/flex/engines/http_server/codegen_proxy.h +++ b/flex/engines/http_server/codegen_proxy.h @@ -15,13 +15,13 @@ #ifndef ENGINES_HQPS_SERVER_CODEGEN_PROXY_H_ #define ENGINES_HQPS_SERVER_CODEGEN_PROXY_H_ +#include #include #include #include #include #include #include -#include #include "glog/logging.h" diff --git a/flex/engines/http_server/handler/admin_http_handler.cc b/flex/engines/http_server/handler/admin_http_handler.cc index 99823bccb4b2..415de6f7110d 100644 --- a/flex/engines/http_server/handler/admin_http_handler.cc +++ b/flex/engines/http_server/handler/admin_http_handler.cc @@ -508,8 +508,9 @@ void admin_http_handler::start() { .then([this] { return set_routes(); }) .then([this] { return server_.listen(http_port_); }) .then([this] { - fmt::print("HQPS admin http handler is listening on port {} ...\n", - http_port_); + fmt::print( + "HQPS admin http handler is listening on port {} ...\n", + http_port_); }); }); fut.wait(); diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index 82102867586f..9c1be8730ffd 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -15,7 +15,6 @@ #include "flex/engines/http_server/executor_group.actg.h" #include "flex/engines/http_server/options.h" #include "flex/engines/http_server/service/hqps_service.h" - #include "flex/engines/http_server/types.h" namespace server { @@ -86,9 +85,18 @@ seastar::future> hqps_ic_handler::handle( std::unique_ptr rep) { auto dst_executor = executor_idx_; executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; + req->content.append(gs::Schema::HQPS_PROCEDURE_PLUGIN_ID_STR, 1); return executor_refs_[dst_executor] - .run_hqps_procedure_query(query_param{std::move(req->content)}) + .run_graph_db_query(query_param{std::move(req->content)}) + .then([](auto&& output) { + if (output.content.size() < 4) { + LOG(ERROR) << "Invalid output size: " << output.content.size(); + return seastar::make_ready_future(std::move(output)); + } + return seastar::make_ready_future( + std::move(output.content.substr(4))); + }) .then_wrapped( [rep = std::move(rep)](seastar::future&& fut) mutable { if (__builtin_expect(fut.failed(), false)) { @@ -223,8 +231,17 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path, return codegen_actor_refs_[0] .do_codegen(query_param{std::move(req->content)}) .then([this, dst_executor](auto&& param) { - return executor_refs_[dst_executor].run_hqps_adhoc_query( - std::move(param)); + param.content.append(gs::Schema::HQPS_ADHOC_PLUGIN_ID_STR, 1); + return executor_refs_[dst_executor].run_graph_db_query( + query_param{std::move(param.content)}); + }) + .then([](auto&& output) { + if (output.content.size() < 4) { + LOG(ERROR) << "Invalid output size: " << output.content.size(); + return seastar::make_ready_future(std::move(output)); + } + return seastar::make_ready_future( + std::move(output.content.substr(4))); }) .then_wrapped( [rep = std::move(rep)](seastar::future&& fut) mutable { diff --git a/flex/engines/http_server/types.h b/flex/engines/http_server/types.h index 965fbd7137d3..f43cd6d4fe08 100644 --- a/flex/engines/http_server/types.h +++ b/flex/engines/http_server/types.h @@ -55,7 +55,6 @@ struct payload { using query_param = payload; using query_result = payload; -using adhoc_result = payload>; using admin_query_result = payload>; // url_path, query_param using graph_management_param = diff --git a/flex/engines/http_server/workdir_manipulator.cc b/flex/engines/http_server/workdir_manipulator.cc index 0c00581fe179..56106449c784 100644 --- a/flex/engines/http_server/workdir_manipulator.cc +++ b/flex/engines/http_server/workdir_manipulator.cc @@ -1,3 +1,18 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #include "flex/engines/http_server/workdir_manipulator.h" #include "flex/engines/http_server/codegen_proxy.h" diff --git a/flex/interactive/examples/modern_graph/count_vertex_num.cypher b/flex/interactive/examples/modern_graph/count_vertex_num.cypher index cca16c40269d..8cecc7965cb7 100644 --- a/flex/interactive/examples/modern_graph/count_vertex_num.cypher +++ b/flex/interactive/examples/modern_graph/count_vertex_num.cypher @@ -1 +1 @@ -MATCH(v:person { id: $personId}) RETURN COUNT(v); \ No newline at end of file +MATCH(v:person) RETURN COUNT(v); \ No newline at end of file diff --git a/flex/storages/rt_mutable_graph/CMakeLists.txt b/flex/storages/rt_mutable_graph/CMakeLists.txt index 82150ac98abd..6db4a18e095a 100644 --- a/flex/storages/rt_mutable_graph/CMakeLists.txt +++ b/flex/storages/rt_mutable_graph/CMakeLists.txt @@ -3,9 +3,8 @@ find_package(yaml-cpp REQUIRED) include_directories(SYSTEM ${yaml-cpp_INCLUDE_DIRS}) -if (BUILD_ODPS_FRAGMENT_LOADER) - file(GLOB_RECURSE RT_MUTABLE_GRAPH_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") -else() +file(GLOB_RECURSE RT_MUTABLE_GRAPH_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") +if (NOT BUILD_ODPS_FRAGMENT_LOADER) # exclude odps_fragment_loader.cc message(STATUS "exclude odps_fragment_loader.cc") file(GLOB_RECURSE RT_MUTABLE_GRAPH_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") diff --git a/flex/storages/rt_mutable_graph/schema.cc b/flex/storages/rt_mutable_graph/schema.cc index 362d6f39e7b3..66281a7188a2 100644 --- a/flex/storages/rt_mutable_graph/schema.cc +++ b/flex/storages/rt_mutable_graph/schema.cc @@ -930,6 +930,10 @@ bool Schema::EmplacePlugins( uint8_t cur_plugin_id = RESERVED_PLUGIN_NUM; std::unordered_set plugin_names; for (auto& f : plugin_paths_or_names) { + if (cur_plugin_id > MAX_PLUGIN_ID) { + LOG(ERROR) << "Too many plugins, max plugin id is " << MAX_PLUGIN_ID; + return false; + } if (std::filesystem::exists(f)) { plugin_name_to_path_and_id_.emplace(f, std::make_pair(f, cur_plugin_id++)); @@ -952,6 +956,10 @@ bool Schema::EmplacePlugins( // if there exists any plugins specified by name, add them // Iterator over the map, and add the plugin path and name to the vector for (auto cur_yaml : all_procedure_yamls) { + if (cur_plugin_id > MAX_PLUGIN_ID) { + LOG(ERROR) << "Too many plugins, max plugin id is " << MAX_PLUGIN_ID; + return false; + } YAML::Node root; try { root = YAML::LoadFile(cur_yaml); diff --git a/flex/storages/rt_mutable_graph/schema.h b/flex/storages/rt_mutable_graph/schema.h index 7f69cbac14b5..3eefc91a4f9d 100644 --- a/flex/storages/rt_mutable_graph/schema.h +++ b/flex/storages/rt_mutable_graph/schema.h @@ -30,6 +30,15 @@ class Schema { // How many built-in plugins are there. // Currently only one builtin plugin, SERVER_APP is supported. static constexpr uint8_t RESERVED_PLUGIN_NUM = 1; +#ifdef BUILD_HQPS + static constexpr uint8_t MAX_PLUGIN_ID = 253; + static constexpr uint8_t HQPS_ADHOC_PLUGIN_ID = 254; + static constexpr uint8_t HQPS_PROCEDURE_PLUGIN_ID = 255; + static constexpr const char* HQPS_ADHOC_PLUGIN_ID_STR = "\xFE"; + static constexpr const char* HQPS_PROCEDURE_PLUGIN_ID_STR = "\xFF"; +#else + static constexpr uint8_t MAX_PLUGIN_ID = 255; +#endif // BUILD_HQPS static constexpr const char* PRIMITIVE_TYPE_KEY = "primitive_type"; static constexpr const char* VARCHAR_KEY = "varchar"; static constexpr const char* MAX_LENGTH_KEY = "max_length"; diff --git a/flex/tests/hqps/CMakeLists.txt b/flex/tests/hqps/CMakeLists.txt index 003e4e2f44f9..57976bfbe043 100644 --- a/flex/tests/hqps/CMakeLists.txt +++ b/flex/tests/hqps/CMakeLists.txt @@ -2,11 +2,12 @@ # file(GLOB_RECURSE GS_TEST_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") file(GLOB GS_TEST_FILES RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") - -foreach(f ${GS_TEST_FILES}) - string(REGEX MATCH "^(.*)\\.[^.]*$" dummy ${f}) - set(T_NAME ${CMAKE_MATCH_1}) - message(STATUS "Found graphscope test - " ${T_NAME}) - add_executable(${T_NAME} ${CMAKE_CURRENT_SOURCE_DIR}/${T_NAME}.cc) - target_link_libraries(${T_NAME} hqps_plan_proto flex_rt_mutable_graph flex_graph_db flex_utils ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES}) -endforeach() +if (BUILD_HQPS) + foreach(f ${GS_TEST_FILES}) + string(REGEX MATCH "^(.*)\\.[^.]*$" dummy ${f}) + set(T_NAME ${CMAKE_MATCH_1}) + message(STATUS "Found graphscope test - " ${T_NAME}) + add_executable(${T_NAME} ${CMAKE_CURRENT_SOURCE_DIR}/${T_NAME}.cc) + target_link_libraries(${T_NAME} hqps_plan_proto flex_rt_mutable_graph flex_graph_db flex_utils ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES}) + endforeach() +endif() \ No newline at end of file diff --git a/flex/utils/app_utils.cc b/flex/utils/app_utils.cc index e2acfbb4cdec..d138080ef5ea 100644 --- a/flex/utils/app_utils.cc +++ b/flex/utils/app_utils.cc @@ -95,7 +95,7 @@ void Encoder::put_small_string_view(const std::string_view& v) { memcpy(&buf_[size + 1], v.data(), len); } -void Encoder::put_double(double v){ +void Encoder::put_double(double v) { size_t size = buf_.size(); buf_.resize(size + sizeof(double)); memcpy(&buf_[size], &v, sizeof(double)); @@ -113,7 +113,7 @@ static int char_ptr_to_int(const char* data) { return *ptr; } -static double char_ptr_to_double(const char* data){ +static double char_ptr_to_double(const char* data) { const double* ptr = reinterpret_cast(data); return *ptr; } @@ -130,7 +130,7 @@ int64_t Decoder::get_long() { return ret; } -double Decoder::get_double(){ +double Decoder::get_double() { double ret = char_ptr_to_double(data_); data_ += 8; return ret; @@ -154,6 +154,8 @@ uint8_t Decoder::get_byte() { return static_cast(*(data_++)); } const char* Decoder::data() const { return data_; } +size_t Decoder::size() const { return end_ - data_; } + bool Decoder::empty() const { return data_ == end_; } void Decoder::reset(const char* p, size_t size) { diff --git a/flex/utils/app_utils.h b/flex/utils/app_utils.h index cc23eadf0473..a8156376dc95 100644 --- a/flex/utils/app_utils.h +++ b/flex/utils/app_utils.h @@ -79,6 +79,8 @@ class Decoder { const char* data() const; + size_t size() const; + bool empty() const; void reset(const char* ptr, size_t size);