From 80a3a46444936ba2d1d0905cb6a70b3970691073 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Fri, 19 Jan 2024 16:51:10 +0800 Subject: [PATCH 01/16] fix -DBUILD_HQPS=OFF --- .github/workflows/flex.yml | 27 +++++++++++++++++-- flex/CMakeLists.txt | 5 ++++ flex/codegen/CMakeLists.txt | 8 +++--- flex/engines/http_server/CMakeLists.txt | 19 +++++++++++-- .../engines/http_server/actor/executor.act.cc | 2 ++ flex/storages/rt_mutable_graph/CMakeLists.txt | 5 ++-- flex/tests/hqps/CMakeLists.txt | 17 ++++++------ 7 files changed, 65 insertions(+), 18 deletions(-) diff --git a/.github/workflows/flex.yml b/.github/workflows/flex.yml index b6871e55e74e..baf0f4810a84 100644 --- a/.github/workflows/flex.yml +++ b/.github/workflows/flex.yml @@ -40,12 +40,35 @@ jobs: make -j$(nproc) make install - - name: Build + - name: Test CMake with different options env: HOME: /home/graphscope/ run: | - cd ${GITHUB_WORKSPACE}/ + cd ${GITHUB_WORKSPACE}/flex git submodule update --init + # try different combination of different options. -DBUILD_HQPS=ON/OFF -DBUILD_TEST=ON/OFF, -DBUILD_ODPS_FRAGMENT_LOADER=ON/OFF + mkdir build && cd build + cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF + make -j$(nproc) + cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON + make -j$(nproc) + cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF + make -j$(nproc) + cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON + make -j$(nproc) + cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF + make -j$(nproc) + cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON + make -j$(nproc) + cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF + make -j$(nproc) + cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON + make -j$(nproc) + + - name: Build + env: + HOME: /home/graphscope/ + run: | cd ${GITHUB_WORKSPACE}/flex mkdir build && cd build cmake .. && sudo make -j$(nproc) diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt index 4381863e449f..19062fcb9167 100644 --- a/flex/CMakeLists.txt +++ b/flex/CMakeLists.txt @@ -31,6 +31,11 @@ set(CMAKE_BUILD_WITH_INSTALL_RPATH FALSE) set(CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib") set(CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE) +if (BUILD_HQPS) + message("Build HighQPS Engine") + add_definitions(-DBUILD_HQPS) +endif () + if (MONITOR_SESSIONS) message("Monitor sessions is enabled") add_definitions(-DMONITOR_SESSIONS) diff --git a/flex/codegen/CMakeLists.txt b/flex/codegen/CMakeLists.txt index 336350b15717..87f6834e656c 100644 --- a/flex/codegen/CMakeLists.txt +++ b/flex/codegen/CMakeLists.txt @@ -3,6 +3,8 @@ find_package(Boost REQUIRED COMPONENTS system filesystem context program_options regex thread) include_directories(SYSTEM ${Boost_INCLUDE_DIRS}) -add_executable(gen_code_from_plan gen_code_from_plan.cc) -target_link_libraries(gen_code_from_plan hqps_plan_proto ${GLOG_LIBRARIES} ${Boost_LIBRARIES}) -install_flex_target(gen_code_from_plan) +if (BUILD_HQPS) + add_executable(gen_code_from_plan gen_code_from_plan.cc) + target_link_libraries(gen_code_from_plan hqps_plan_proto ${GLOG_LIBRARIES} ${Boost_LIBRARIES}) + install_flex_target(gen_code_from_plan) +endif() diff --git a/flex/engines/http_server/CMakeLists.txt b/flex/engines/http_server/CMakeLists.txt index f337cff353d8..31feb6a9265d 100644 --- a/flex/engines/http_server/CMakeLists.txt +++ b/flex/engines/http_server/CMakeLists.txt @@ -7,14 +7,29 @@ if (Hiactor_FOUND) INCLUDE_PATHS ${Hiactor_INCLUDE_DIR},${CMAKE_CURRENT_SOURCE_DIR}/../../../,${CMAKE_CURRENT_SOURCE_DIR}/../../third_party/nlohmann-json/single_include/) file(GLOB_RECURSE SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") + #if BUILD_HQPS, remove admin_http_handler.cc and hqps_*.cc + if (NOT BUILD_HQPS) + list(REMOVE_ITEM SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/handler/admin_http_handler.cc") + list(REMOVE_ITEM SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/handler/hqps_http_handler.cc") + list(REMOVE_ITEM SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/service/hqps_service.cc") + list(REMOVE_ITEM SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/service/codegen_proxy.h") + endif () add_library(flex_server STATIC ${SERVER_FILES} ${server_actor_autogen_files}) add_dependencies(flex_server server_actor_autogen) target_compile_options (flex_server PUBLIC -Wno-attributes) - target_link_libraries(flex_server Hiactor::hiactor hqps_plan_proto flex_graph_db) - target_include_directories(flex_server PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/../hqps/) + target_link_libraries(flex_server Hiactor::hiactor flex_graph_db) + if (BUILD_HQPS) + target_include_directories(flex_server PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/../hqps/) + target_link_libraries(flex_server hqps_plan_proto) + else () + # find protobuf and link, since hqps is not built + find_package(Protobuf REQUIRED) + target_include_directories(flex_server PUBLIC ${PROTOBUF_INCLUDE_DIRS}) + target_link_libraries(flex_server ${PROTOBUF_LIBRARIES}) + endif () install_without_export_flex_target(flex_server) endif () diff --git a/flex/engines/http_server/actor/executor.act.cc b/flex/engines/http_server/actor/executor.act.cc index cfedde1a3634..f6cd55da071a 100644 --- a/flex/engines/http_server/actor/executor.act.cc +++ b/flex/engines/http_server/actor/executor.act.cc @@ -19,7 +19,9 @@ #include "flex/engines/graph_db/database/graph_db_session.h" #include "flex/engines/http_server/codegen_proxy.h" #include "flex/engines/http_server/workdir_manipulator.h" +#ifdef BUILD_HQPS #include "flex/proto_generated_gie/stored_procedure.pb.h" +#endif // BUILD_HQPS #include "nlohmann/json.hpp" #include diff --git a/flex/storages/rt_mutable_graph/CMakeLists.txt b/flex/storages/rt_mutable_graph/CMakeLists.txt index 82150ac98abd..6db4a18e095a 100644 --- a/flex/storages/rt_mutable_graph/CMakeLists.txt +++ b/flex/storages/rt_mutable_graph/CMakeLists.txt @@ -3,9 +3,8 @@ find_package(yaml-cpp REQUIRED) include_directories(SYSTEM ${yaml-cpp_INCLUDE_DIRS}) -if (BUILD_ODPS_FRAGMENT_LOADER) - file(GLOB_RECURSE RT_MUTABLE_GRAPH_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") -else() +file(GLOB_RECURSE RT_MUTABLE_GRAPH_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") +if (NOT BUILD_ODPS_FRAGMENT_LOADER) # exclude odps_fragment_loader.cc message(STATUS "exclude odps_fragment_loader.cc") file(GLOB_RECURSE RT_MUTABLE_GRAPH_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") diff --git a/flex/tests/hqps/CMakeLists.txt b/flex/tests/hqps/CMakeLists.txt index 003e4e2f44f9..57976bfbe043 100644 --- a/flex/tests/hqps/CMakeLists.txt +++ b/flex/tests/hqps/CMakeLists.txt @@ -2,11 +2,12 @@ # file(GLOB_RECURSE GS_TEST_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") file(GLOB GS_TEST_FILES RELATIVE "${CMAKE_CURRENT_SOURCE_DIR}" "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") - -foreach(f ${GS_TEST_FILES}) - string(REGEX MATCH "^(.*)\\.[^.]*$" dummy ${f}) - set(T_NAME ${CMAKE_MATCH_1}) - message(STATUS "Found graphscope test - " ${T_NAME}) - add_executable(${T_NAME} ${CMAKE_CURRENT_SOURCE_DIR}/${T_NAME}.cc) - target_link_libraries(${T_NAME} hqps_plan_proto flex_rt_mutable_graph flex_graph_db flex_utils ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES}) -endforeach() +if (BUILD_HQPS) + foreach(f ${GS_TEST_FILES}) + string(REGEX MATCH "^(.*)\\.[^.]*$" dummy ${f}) + set(T_NAME ${CMAKE_MATCH_1}) + message(STATUS "Found graphscope test - " ${T_NAME}) + add_executable(${T_NAME} ${CMAKE_CURRENT_SOURCE_DIR}/${T_NAME}.cc) + target_link_libraries(${T_NAME} hqps_plan_proto flex_rt_mutable_graph flex_graph_db flex_utils ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES}) + endforeach() +endif() \ No newline at end of file From 0bf59ee2b35cc3bc7c22df7b9c7b9b6ac62be385 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Fri, 19 Jan 2024 17:39:36 +0800 Subject: [PATCH 02/16] sudo --- .github/workflows/flex.yml | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/flex.yml b/.github/workflows/flex.yml index baf0f4810a84..37ec0481aab9 100644 --- a/.github/workflows/flex.yml +++ b/.github/workflows/flex.yml @@ -49,21 +49,21 @@ jobs: # try different combination of different options. -DBUILD_HQPS=ON/OFF -DBUILD_TEST=ON/OFF, -DBUILD_ODPS_FRAGMENT_LOADER=ON/OFF mkdir build && cd build cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF - make -j$(nproc) + sudo make -j$(nproc) cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON - make -j$(nproc) + sudo make -j$(nproc) cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF - make -j$(nproc) + sudo make -j$(nproc) cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON - make -j$(nproc) + sudo make -j$(nproc) cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF - make -j$(nproc) + sudo make -j$(nproc) cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON - make -j$(nproc) + sudo make -j$(nproc) cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF - make -j$(nproc) + sudo make -j$(nproc) cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON - make -j$(nproc) + sudo make -j$(nproc) - name: Build env: From 8e9d08c299ed31f0a6b8bbb5cb8c7a0e31cf7684 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Mon, 22 Jan 2024 11:05:48 +0800 Subject: [PATCH 03/16] debug --- .github/workflows/flex.yml | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/.github/workflows/flex.yml b/.github/workflows/flex.yml index 37ec0481aab9..0e2498f2120b 100644 --- a/.github/workflows/flex.yml +++ b/.github/workflows/flex.yml @@ -48,21 +48,29 @@ jobs: git submodule update --init # try different combination of different options. -DBUILD_HQPS=ON/OFF -DBUILD_TEST=ON/OFF, -DBUILD_ODPS_FRAGMENT_LOADER=ON/OFF mkdir build && cd build - cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF + echo "cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON" + cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF sudo make -j$(nproc) - cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON + echo "cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF" + cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF sudo make -j$(nproc) - cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF + echo "cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON" + cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF sudo make -j$(nproc) - cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON + echo "cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF" + cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF sudo make -j$(nproc) - cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF + echo "cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON" + cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF sudo make -j$(nproc) - cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON + echo "cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF" + cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF sudo make -j$(nproc) - cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF + echo "cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON" + cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF sudo make -j$(nproc) - cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON + echo "cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF" + cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF sudo make -j$(nproc) - name: Build From e2d24ac15cde7a06d51132966ec75bafd63f124e Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Mon, 22 Jan 2024 14:35:13 +0800 Subject: [PATCH 04/16] fix --- flex/CMakeLists.txt | 8 +++ flex/engines/graph_db/CMakeLists.txt | 2 +- .../graph_db/database/graph_db_session.cc | 39 ++------------ .../graph_db/database/graph_db_session.h | 6 +-- flex/engines/http_server/CMakeLists.txt | 21 ++++++-- .../engines/http_server/actor/executor.act.cc | 23 ++------ flex/engines/http_server/actor/executor.act.h | 2 +- .../http_server/handler/hqps_http_handler.cc | 53 ++++++++++++++++++- .../http_server/handler/hqps_http_handler.h | 3 ++ flex/engines/http_server/types.h | 1 + flex/utils/app_utils.h | 2 + 11 files changed, 94 insertions(+), 66 deletions(-) diff --git a/flex/CMakeLists.txt b/flex/CMakeLists.txt index 19062fcb9167..1d058cc7f2c6 100644 --- a/flex/CMakeLists.txt +++ b/flex/CMakeLists.txt @@ -17,6 +17,14 @@ option(BUILD_ODPS_FRAGMENT_LOADER "Whether to build odps fragment loader" ON) option(MONITOR_SESSIONS "Whether monitor sessions" OFF) option(ENABLE_HUGEPAGE "Whether to use hugepages when open mmap array in memory" OFF) +#print options +message(STATUS "Build HighQPS Engine: ${BUILD_HQPS}") +message(STATUS "Build test: ${BUILD_TEST}") +message(STATUS "Build doc: ${BUILD_DOC}") +message(STATUS "Build odps fragment loader: ${BUILD_ODPS_FRAGMENT_LOADER}") +message(STATUS "Monitor sessions: ${MONITOR_SESSIONS}") +message(STATUS "Enable hugepage: ${ENABLE_HUGEPAGE}") + # ------------------------------------------------------------------------------ # cmake configs # ------------------------------------------------------------------------------ diff --git a/flex/engines/graph_db/CMakeLists.txt b/flex/engines/graph_db/CMakeLists.txt index 4f5544fcb23b..8a5873b13e04 100644 --- a/flex/engines/graph_db/CMakeLists.txt +++ b/flex/engines/graph_db/CMakeLists.txt @@ -4,7 +4,7 @@ file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc" add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES}) target_include_directories(flex_graph_db PUBLIC $) -target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils hqps_plan_proto ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) +target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) install_flex_target(flex_graph_db) install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index 20423d0c6cf4..16dfcf894773 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ -24,27 +24,6 @@ namespace gs { -void put_argment(Encoder& encoder, const query::Argument& argment) { - auto& value = argment.value(); - auto item_case = value.item_case(); - switch (item_case) { - case common::Value::kI32: - encoder.put_int(value.i32()); - break; - case common::Value::kI64: - encoder.put_long(value.i64()); - break; - case common::Value::kF64: - encoder.put_double(value.f64()); - break; - case common::Value::kStr: - encoder.put_string(value.str()); - break; - default: - LOG(ERROR) << "Not recognizable param type" << static_cast(item_case); - } -} - ReadTransaction GraphDBSession::GetReadTransaction() { uint32_t ts = db_.version_manager_.acquire_read_timestamp(); return ReadTransaction(db_.graph_, db_.version_manager_, ts); @@ -236,8 +215,7 @@ Result> GraphDBSession::EvalAdhoc( } Result> GraphDBSession::EvalHqpsProcedure( - const query::Query& query_pb) { - auto query_name = query_pb.query_name().name(); + const std::string& query_name, Decoder& input_decoder) { if (query_name.empty()) { LOG(ERROR) << "Query name is empty"; return Result>(StatusCode::InValidArgument, @@ -284,17 +262,8 @@ Result> GraphDBSession::EvalHqpsProcedure( StatusCode::NotExists, "Query type is not registered: " + std::to_string(type), {}); } - - std::vector input_buffer; - gs::Encoder input_encoder(input_buffer); - auto& args = query_pb.arguments(); - for (int32_t i = 0; i < args.size(); ++i) { - auto& arg = args[i]; - put_argment(input_encoder, arg); - } - const char* str_data = input_buffer.data(); - size_t str_len = input_buffer.size(); - gs::Decoder input_decoder(input_buffer.data(), input_buffer.size()); + const char* input_char = input_decoder.data(); + size_t input_size = input_decoder.size(); for (size_t i = 0; i < MAX_RETRY; ++i) { std::vector result_buffer; @@ -305,7 +274,7 @@ Result> GraphDBSession::EvalHqpsProcedure( LOG(INFO) << "[Query-" << query_name << "][Thread-" << thread_id_ << "] retry - " << i << " / " << MAX_RETRY; std::this_thread::sleep_for(std::chrono::milliseconds(1)); - input_decoder.reset(str_data, str_len); + input_decoder.reset(input_char, input_size); } return Result>( StatusCode::QueryFailed, "Query failed for procedure: " + query_name, {}); diff --git a/flex/engines/graph_db/database/graph_db_session.h b/flex/engines/graph_db/database/graph_db_session.h index c12cec2e7c0e..68568c1b1dc0 100644 --- a/flex/engines/graph_db/database/graph_db_session.h +++ b/flex/engines/graph_db/database/graph_db_session.h @@ -24,7 +24,6 @@ #include "flex/engines/graph_db/database/single_vertex_insert_transaction.h" #include "flex/engines/graph_db/database/transaction_utils.h" #include "flex/engines/graph_db/database/update_transaction.h" -#include "flex/proto_generated_gie/stored_procedure.pb.h" #include "flex/storages/rt_mutable_graph/mutable_property_fragment.h" #include "flex/utils/property/column.h" #include "flex/utils/result.h" @@ -34,8 +33,6 @@ namespace gs { class GraphDB; class WalWriter; -void put_argment(gs::Encoder& encoder, const query::Argument& argment); - class GraphDBSession { public: static constexpr int32_t MAX_RETRY = 3; @@ -88,7 +85,8 @@ class GraphDBSession { Result> EvalAdhoc(const std::string& input_lib_path); // Evaluate a stored procedure with input parameters given. - Result> EvalHqpsProcedure(const query::Query& query_pb); + Result> EvalHqpsProcedure(const std::string& query_name, + Decoder& decoder); void GetAppInfo(Encoder& result); diff --git a/flex/engines/http_server/CMakeLists.txt b/flex/engines/http_server/CMakeLists.txt index 31feb6a9265d..5ce856a94290 100644 --- a/flex/engines/http_server/CMakeLists.txt +++ b/flex/engines/http_server/CMakeLists.txt @@ -6,14 +6,25 @@ if (Hiactor_FOUND) SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/ INCLUDE_PATHS ${Hiactor_INCLUDE_DIR},${CMAKE_CURRENT_SOURCE_DIR}/../../../,${CMAKE_CURRENT_SOURCE_DIR}/../../third_party/nlohmann-json/single_include/) + message(STATUS "server_actor_autogen_files: ${server_actor_autogen_files}") + + if (NOT BUILD_HQPS) + list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*admin.*") + list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*codegen.*") + message(STATUS "server_actor_autogen_files: ${server_actor_autogen_files}") + endif () + + # get all .cc files in current directory, except for generated/ file(GLOB_RECURSE SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/*.cc") - #if BUILD_HQPS, remove admin_http_handler.cc and hqps_*.cc + list(FILTER SERVER_FILES EXCLUDE REGEX ".*generated.*") + if (NOT BUILD_HQPS) - list(REMOVE_ITEM SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/handler/admin_http_handler.cc") - list(REMOVE_ITEM SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/handler/hqps_http_handler.cc") - list(REMOVE_ITEM SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/service/hqps_service.cc") - list(REMOVE_ITEM SERVER_FILES "${CMAKE_CURRENT_SOURCE_DIR}/service/codegen_proxy.h") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*admin*") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*hqps*") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*codegen*") + list(FILTER SERVER_FILES EXCLUDE REGEX ".*workdir_manipulator*") endif () + message(STATUS "SERVER_FILES: ${SERVER_FILES}") add_library(flex_server STATIC ${SERVER_FILES} ${server_actor_autogen_files}) add_dependencies(flex_server server_actor_autogen) diff --git a/flex/engines/http_server/actor/executor.act.cc b/flex/engines/http_server/actor/executor.act.cc index f6cd55da071a..0710bb658be6 100644 --- a/flex/engines/http_server/actor/executor.act.cc +++ b/flex/engines/http_server/actor/executor.act.cc @@ -17,11 +17,6 @@ #include "flex/engines/graph_db/database/graph_db.h" #include "flex/engines/graph_db/database/graph_db_session.h" -#include "flex/engines/http_server/codegen_proxy.h" -#include "flex/engines/http_server/workdir_manipulator.h" -#ifdef BUILD_HQPS -#include "flex/proto_generated_gie/stored_procedure.pb.h" -#endif // BUILD_HQPS #include "nlohmann/json.hpp" #include @@ -55,22 +50,14 @@ seastar::future executor::run_graph_db_query( // run_query_for stored_procedure seastar::future executor::run_hqps_procedure_query( - query_param&& param) { - auto& str = param.content; - const char* str_data = str.data(); - size_t str_length = str.size(); - LOG(INFO) << "Receive pay load: " << str_length << " bytes"; - - query::Query cur_query; - if (!cur_query.ParseFromArray(str_data, str_length)) { - LOG(ERROR) << "Fail to parse query from pay load"; - return seastar::make_ready_future( - seastar::sstring("Fail to parse query from pay load")); - } + hqps_proc_param&& param) { + auto query_name = param.content.first; + auto query_args = param.content.second; + gs::Decoder decoder(query_args.data(), query_args.size()); auto ret = gs::GraphDB::get() .GetSession(hiactor::local_shard_id()) - .EvalHqpsProcedure(cur_query); + .EvalHqpsProcedure(query_name, decoder); if (!ret.ok()) { LOG(ERROR) << "Eval failed: " << ret.status().error_message(); return seastar::make_exception_future( diff --git a/flex/engines/http_server/actor/executor.act.h b/flex/engines/http_server/actor/executor.act.h index e3e504415440..50aeecfc0c13 100644 --- a/flex/engines/http_server/actor/executor.act.h +++ b/flex/engines/http_server/actor/executor.act.h @@ -30,7 +30,7 @@ class ANNOTATION(actor:impl) executor : public hiactor::actor { seastar::future ANNOTATION(actor:method) run_graph_db_query(query_param&& param); - seastar::future ANNOTATION(actor:method) run_hqps_procedure_query(query_param&& param); + seastar::future ANNOTATION(actor:method) run_hqps_procedure_query(hqps_proc_param&& param); seastar::future ANNOTATION(actor:method) run_hqps_adhoc_query(adhoc_result&& param); diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index 82102867586f..711c80e52dd0 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -15,11 +15,31 @@ #include "flex/engines/http_server/executor_group.actg.h" #include "flex/engines/http_server/options.h" #include "flex/engines/http_server/service/hqps_service.h" - #include "flex/engines/http_server/types.h" namespace server { +void put_argment(gs::Encoder& encoder, const query::Argument& argment) { + auto& value = argment.value(); + auto item_case = value.item_case(); + switch (item_case) { + case common::Value::kI32: + encoder.put_int(value.i32()); + break; + case common::Value::kI64: + encoder.put_long(value.i64()); + break; + case common::Value::kF64: + encoder.put_double(value.f64()); + break; + case common::Value::kStr: + encoder.put_string(value.str()); + break; + default: + LOG(ERROR) << "Not recognizable param type" << static_cast(item_case); + } +} + hqps_ic_handler::hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id, uint32_t group_inc_step, uint32_t shard_concurrency) @@ -86,9 +106,38 @@ seastar::future> hqps_ic_handler::handle( std::unique_ptr rep) { auto dst_executor = executor_idx_; executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; + // first extract the query name and parameters + auto& str = req->content; + const char* str_data = str.data(); + size_t str_length = str.size(); + LOG(INFO) << "Receive pay load: " << str_length << " bytes"; + + query::Query cur_query; + if (!cur_query.ParseFromArray(str_data, str_length)) { + LOG(ERROR) << "Fail to parse query from pay load"; + rep->set_status(seastar::httpd::reply::status_type::bad_request); + rep->write_body("bin", + seastar::sstring("Fail to parse query from pay load")); + rep->done(); + return seastar::make_ready_future>( + std::move(rep)); + } + auto query_name = cur_query.query_name().name(); + + std::vector input_buffer; + gs::Encoder input_encoder(input_buffer); + auto& args = cur_query.arguments(); + for (int32_t i = 0; i < args.size(); ++i) { + auto& arg = args[i]; + put_argment(input_encoder, arg); + } + VLOG(10) << "Query name: " << query_name << ", args: " << input_buffer.size() + << " bytes"; + seastar::sstring input_str(input_buffer.data(), input_buffer.size()); return executor_refs_[dst_executor] - .run_hqps_procedure_query(query_param{std::move(req->content)}) + .run_hqps_procedure_query(hqps_proc_param{ + std::make_pair(seastar::sstring{query_name}, input_str)}) .then_wrapped( [rep = std::move(rep)](seastar::future&& fut) mutable { if (__builtin_expect(fut.failed(), false)) { diff --git a/flex/engines/http_server/handler/hqps_http_handler.h b/flex/engines/http_server/handler/hqps_http_handler.h index 62237280641f..5c620e575d67 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.h +++ b/flex/engines/http_server/handler/hqps_http_handler.h @@ -21,8 +21,11 @@ #include #include "flex/engines/http_server/generated/actor/codegen_actor_ref.act.autogen.h" #include "flex/engines/http_server/generated/actor/executor_ref.act.autogen.h" +#include "flex/proto_generated_gie/stored_procedure.pb.h" +#include "flex/utils/app_utils.h" namespace server { +void put_argment(gs::Encoder& encoder, const query::Argument& argment); class hqps_ic_handler : public seastar::httpd::handler_base { public: diff --git a/flex/engines/http_server/types.h b/flex/engines/http_server/types.h index 965fbd7137d3..25474720589b 100644 --- a/flex/engines/http_server/types.h +++ b/flex/engines/http_server/types.h @@ -54,6 +54,7 @@ struct payload { }; using query_param = payload; +using hqps_proc_param = payload>; using query_result = payload; using adhoc_result = payload>; using admin_query_result = payload>; diff --git a/flex/utils/app_utils.h b/flex/utils/app_utils.h index cc23eadf0473..a8156376dc95 100644 --- a/flex/utils/app_utils.h +++ b/flex/utils/app_utils.h @@ -79,6 +79,8 @@ class Decoder { const char* data() const; + size_t size() const; + bool empty() const; void reset(const char* ptr, size_t size); From c9a1381b88724568ac7f32ccf40f1ed15aa87f26 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Mon, 22 Jan 2024 14:49:44 +0800 Subject: [PATCH 05/16] fix --- flex/utils/app_utils.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/flex/utils/app_utils.cc b/flex/utils/app_utils.cc index e2acfbb4cdec..d476df68e9be 100644 --- a/flex/utils/app_utils.cc +++ b/flex/utils/app_utils.cc @@ -154,6 +154,8 @@ uint8_t Decoder::get_byte() { return static_cast(*(data_++)); } const char* Decoder::data() const { return data_; } +size_t Decoder::size() const { return end_ - data_; } + bool Decoder::empty() const { return data_ == end_; } void Decoder::reset(const char* p, size_t size) { From 6d5436ea387cbedefd9d00e413441f3bb1171b4c Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Mon, 22 Jan 2024 15:16:17 +0800 Subject: [PATCH 06/16] fix --- .github/workflows/flex.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/flex.yml b/.github/workflows/flex.yml index 0e2498f2120b..fab2b094aac0 100644 --- a/.github/workflows/flex.yml +++ b/.github/workflows/flex.yml @@ -72,6 +72,7 @@ jobs: echo "cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF" cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF sudo make -j$(nproc) + cd .. && rm -rf build - name: Build env: From ae436cb30cc69b8bdfd2dd00789e33bf40e27eae Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 24 Jan 2024 20:06:09 +0800 Subject: [PATCH 07/16] merge executor's method into one --- .github/workflows/hqps-db-ci.yml | 11 ++ flex/bin/load_plan_and_gen.sh | 2 +- flex/codegen/src/hqps/hqps_scan_builder.h | 3 + flex/codegen/src/hqps_generator.h | 13 +- .../http_server/actor/codegen_actor.act.cc | 14 +- .../http_server/actor/codegen_actor.act.h | 2 +- .../engines/http_server/actor/executor.act.cc | 177 +++++++++++------- flex/engines/http_server/actor/executor.act.h | 3 - .../handler/graph_db_http_handler.cc | 6 +- .../http_server/handler/hqps_http_handler.cc | 60 +----- flex/engines/http_server/types.h | 3 +- .../modern_graph/count_vertex_num.cypher | 2 +- 12 files changed, 158 insertions(+), 138 deletions(-) diff --git a/.github/workflows/hqps-db-ci.yml b/.github/workflows/hqps-db-ci.yml index 786973a39c8c..a57c8381a2f1 100644 --- a/.github/workflows/hqps-db-ci.yml +++ b/.github/workflows/hqps-db-ci.yml @@ -170,6 +170,17 @@ jobs: eval ${cmd} done + -name: Test cypher procedure generation + run: | + cd ${GITHUB_WORKSPACE}/flex/bin + ./load_plan_and_gen.sh -e=hqps -i=../interactive/examples/modern_graph/get_person_name.cypher -w=/tmp/codegen \ + --ir_conf=/workspaces/GraphScope/flex/tests/hqps/engine_config_test.yaml \ + --graph_schema_path=../interactive/examples/modern_graph/modern_graph.yaml + + ./load_plan_and_gen.sh -e=hqps -i=../interactive/examples/modern_graph/count_vertex_num.cypher -w=/tmp/codegen \ + --ir_conf=/workspaces/GraphScope/flex/tests/hqps/engine_config_test.yaml \ + --graph_schema_path=../interactive/examples/modern_graph/modern_graph.yaml + - name: Run End-to-End cypher adhoc ldbc query test env: GS_TEST_DIR: ${{ github.workspace }}/gstest diff --git a/flex/bin/load_plan_and_gen.sh b/flex/bin/load_plan_and_gen.sh index 3afc76828d47..a59ef3e6debe 100755 --- a/flex/bin/load_plan_and_gen.sh +++ b/flex/bin/load_plan_and_gen.sh @@ -276,7 +276,7 @@ compile_hqps_so() { if [ ! -z ${CMAKE_C_COMPILER} ]; then cmd="${cmd} -DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}" fi - info "Cmake command = ${cmd}" + info "CMake command = ${cmd}" info "---------------------------" eval ${cmd} diff --git a/flex/codegen/src/hqps/hqps_scan_builder.h b/flex/codegen/src/hqps/hqps_scan_builder.h index e59650e4fb84..fa7ea9eb5c61 100644 --- a/flex/codegen/src/hqps/hqps_scan_builder.h +++ b/flex/codegen/src/hqps/hqps_scan_builder.h @@ -199,6 +199,9 @@ class ScanOpBuilder { VLOG(10) << "receive param const in index predicate: " << dyn_param_pb.DebugString(); ctx_.AddParameterVar(param_const); + // set to oid_ and oid_type_name_ + oid_ = param_const.var_name; + oid_type_name_ = data_type_2_string(param_const.type); } return *this; diff --git a/flex/codegen/src/hqps_generator.h b/flex/codegen/src/hqps_generator.h index 294a9f94dafe..4890a799ad2c 100644 --- a/flex/codegen/src/hqps_generator.h +++ b/flex/codegen/src/hqps_generator.h @@ -227,12 +227,16 @@ class QueryGenerator { CHECK(param_vars[i] == param_vars[i - 1]); continue; } else { - ss << ", " << data_type_2_string(param_vars[i].type) << " " - << param_vars[i].var_name; + ss << data_type_2_string(param_vars[i].type) << " " + << param_vars[i].var_name << ","; } } } - return ss.str(); + auto str = ss.str(); + if (str.size() > 0) { + str.pop_back(); // remove the last comma + } + return str; } // implement the function that overrides the base class. @@ -265,9 +269,6 @@ class QueryGenerator { std::string param_vars_decoding, param_vars_concat_str; { std::stringstream ss; - if (param_names.size() > 0) { - ss << ","; - } for (size_t i = 0; i < param_names.size(); ++i) { ss << param_names[i]; if (i != param_names.size() - 1) { diff --git a/flex/engines/http_server/actor/codegen_actor.act.cc b/flex/engines/http_server/actor/codegen_actor.act.cc index 9fdfc1260c9d..6e1be0151859 100644 --- a/flex/engines/http_server/actor/codegen_actor.act.cc +++ b/flex/engines/http_server/actor/codegen_actor.act.cc @@ -37,13 +37,13 @@ codegen_actor::codegen_actor(hiactor::actor_base* exec_ctx, // ... } -seastar::future codegen_actor::do_codegen(query_param&& param) { +seastar::future codegen_actor::do_codegen(query_param&& param) { LOG(INFO) << "Running codegen for " << param.content.size(); // The received query's pay load shoud be able to deserialze to physical plan auto& str = param.content; if (str.size() <= 0) { LOG(INFO) << "Empty query"; - return seastar::make_exception_future( + return seastar::make_exception_future( std::runtime_error("Empty query string")); } @@ -57,7 +57,7 @@ seastar::future codegen_actor::do_codegen(query_param&& param) { VLOG(10) << "Parse physical plan: " << plan.DebugString(); } else { LOG(ERROR) << "Fail to parse physical plan"; - return seastar::make_exception_future( + return seastar::make_exception_future( std::runtime_error("Fail to parse physical plan")); } @@ -69,7 +69,7 @@ seastar::future codegen_actor::do_codegen(query_param&& param) { return codegen_proxy.DoGen(plan).then( [](std::pair&& job_id_and_lib_path) { if (job_id_and_lib_path.first == -1) { - return seastar::make_exception_future( + return seastar::make_exception_future( std::runtime_error("Fail to parse job id from codegen proxy")); } // 1. load and run. @@ -77,11 +77,11 @@ seastar::future codegen_actor::do_codegen(query_param&& param) { << job_id_and_lib_path.second << ", job id: " << job_id_and_lib_path.first << "local shard id: " << hiactor::local_shard_id(); - return seastar::make_ready_future( - std::move(job_id_and_lib_path)); + return seastar::make_ready_future( + std::move(job_id_and_lib_path.second)); }); } else { - return seastar::make_exception_future( + return seastar::make_exception_future( std::runtime_error("Codegen proxy not initialized")); } } diff --git a/flex/engines/http_server/actor/codegen_actor.act.h b/flex/engines/http_server/actor/codegen_actor.act.h index 6222a81db4fd..597b40167082 100644 --- a/flex/engines/http_server/actor/codegen_actor.act.h +++ b/flex/engines/http_server/actor/codegen_actor.act.h @@ -28,7 +28,7 @@ class ANNOTATION(actor:impl) codegen_actor : public hiactor::actor { codegen_actor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr); ~codegen_actor() override; - seastar::future ANNOTATION(actor:method) do_codegen(query_param&& param); + seastar::future ANNOTATION(actor:method) do_codegen(query_param&& param); // DECLARE_RUN_QUERYS; /// Declare `do_work` func here, no need to implement. diff --git a/flex/engines/http_server/actor/executor.act.cc b/flex/engines/http_server/actor/executor.act.cc index 0710bb658be6..896264fdd638 100644 --- a/flex/engines/http_server/actor/executor.act.cc +++ b/flex/engines/http_server/actor/executor.act.cc @@ -18,10 +18,35 @@ #include "flex/engines/graph_db/database/graph_db.h" #include "flex/engines/graph_db/database/graph_db_session.h" #include "nlohmann/json.hpp" +#ifdef BUILD_HQPS +#include "flex/proto_generated_gie/stored_procedure.pb.h" +#endif #include namespace server { +#ifdef BUILD_HQPS +void put_argment(gs::Encoder& encoder, const query::Argument& argment) { + auto& value = argment.value(); + auto item_case = value.item_case(); + switch (item_case) { + case common::Value::kI32: + encoder.put_int(value.i32()); + break; + case common::Value::kI64: + encoder.put_long(value.i64()); + break; + case common::Value::kF64: + encoder.put_double(value.f64()); + break; + case common::Value::kStr: + encoder.put_string(value.str()); + break; + default: + LOG(ERROR) << "Not recognizable param type" << static_cast(item_case); + } +} +#endif executor::~executor() { // finalization @@ -37,75 +62,99 @@ executor::executor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr) seastar::future executor::run_graph_db_query( query_param&& param) { - auto ret = gs::GraphDB::get() - .GetSession(hiactor::local_shard_id()) - .Eval(param.content); - if (!ret.ok()) { - LOG(ERROR) << "Eval failed: " << ret.status().error_message(); - } - auto result = ret.value(); - seastar::sstring content(result.data(), result.size()); - return seastar::make_ready_future(std::move(content)); -} - -// run_query_for stored_procedure -seastar::future executor::run_hqps_procedure_query( - hqps_proc_param&& param) { - auto query_name = param.content.first; - auto query_args = param.content.second; - gs::Decoder decoder(query_args.data(), query_args.size()); - - auto ret = gs::GraphDB::get() - .GetSession(hiactor::local_shard_id()) - .EvalHqpsProcedure(query_name, decoder); - if (!ret.ok()) { - LOG(ERROR) << "Eval failed: " << ret.status().error_message(); + auto& input_content = param.content; + if (input_content.size() < 1) { return seastar::make_exception_future( - seastar::sstring(ret.status().error_message())); + seastar::sstring("Invalid status")); } - auto result = ret.value(); - if (result.size() < 4) { - return seastar::make_exception_future(seastar::sstring( - "Internal Error, more than 4 bytes should be returned")); - } - seastar::sstring content( - result.data() + 4, - result.size() - 4); // skip 4 bytes, since the first 4 - // bytes is the size of the result - return seastar::make_ready_future(std::move(content)); -} + // get the last byte + char type = input_content[input_content.size() - 1]; + input_content.resize(input_content.size() - 1); + LOG(INFO) << "Run graph db query, type: " << type; + if (type == '\0') { // graph_db query + auto ret = gs::GraphDB::get() + .GetSession(hiactor::local_shard_id()) + .Eval(input_content); + if (!ret.ok()) { + LOG(ERROR) << "Eval failed: " << ret.status().error_message(); + } + auto result = ret.value(); + seastar::sstring content(result.data(), result.size()); + return seastar::make_ready_future(std::move(content)); + } else if (type == '\1') { // hqps procedure query. +#ifndef BUILD_HQPS + return seastar::make_exception_future( + seastar::sstring("HQPS is not disabled, please recompile with " + "BUILD_HQPS=ON to enable HQPS")); +#else + query::Query cur_query; + if (!cur_query.ParseFromArray(input_content.data(), input_content.size())) { + return seastar::make_exception_future( + seastar::sstring("Can not parse the query")); + } + auto query_name = cur_query.query_name().name(); -seastar::future executor::run_hqps_adhoc_query( - adhoc_result&& param) { - LOG(INFO) << "Run adhoc query"; - // The received query's pay load shoud be able to deserialze to physical plan - // 1. load and run. - auto& content = param.content; - LOG(INFO) << "Okay, try to run the query of lib path: " << content.second - << ", job id: " << content.first - << "local shard id: " << hiactor::local_shard_id(); - // seastar::sstring result = server::load_and_run(content.first, - // content.second); - auto ret = gs::GraphDB::get() - .GetSession(hiactor::local_shard_id()) - .EvalAdhoc(content.second); - if (!ret.ok()) { - LOG(ERROR) << "Eval failed: " << ret.status().error_message(); + std::vector input_buffer; + gs::Encoder input_encoder(input_buffer); + auto& args = cur_query.arguments(); + for (int32_t i = 0; i < args.size(); ++i) { + put_argment(input_encoder, args[i]); + } + VLOG(10) << "Query name: " << query_name + << ", args: " << input_buffer.size() << " bytes"; + gs::Decoder decoder(input_buffer.data(), input_buffer.size()); + auto ret = gs::GraphDB::get() + .GetSession(hiactor::local_shard_id()) + .EvalHqpsProcedure(query_name, decoder); + if (!ret.ok()) { + LOG(ERROR) << "Eval failed: " << ret.status().error_message(); + return seastar::make_exception_future( + seastar::sstring(ret.status().error_message())); + } + auto result = ret.value(); + if (result.size() < 4) { + return seastar::make_exception_future(seastar::sstring( + "Internal Error, more than 4 bytes should be returned")); + } + seastar::sstring content( + result.data() + 4, + result.size() - 4); // skip 4 bytes, since the first 4 + // bytes is the size of the result + return seastar::make_ready_future(std::move(content)); +#endif // BUILD_HQPS + } else if (type == '\2') { // hqp adhoc query +#ifndef BUILD_HQPS return seastar::make_exception_future( - seastar::sstring(ret.status().error_message())); + seastar::sstring("HQPS is not disabled, please recompile with " + "BUILD_HQPS=ON to enable HQPS")); +#else + LOG(INFO) << "Okay, try to run adhoc query of lib path: " << input_content; + // seastar::sstring result = server::load_and_run(content.first, + // content.second); + auto ret = gs::GraphDB::get() + .GetSession(hiactor::local_shard_id()) + .EvalAdhoc(input_content); + if (!ret.ok()) { + LOG(ERROR) << "Eval failed: " << ret.status().error_message(); + return seastar::make_exception_future( + seastar::sstring(ret.status().error_message())); + } + auto ret_value = ret.value(); + VLOG(10) << "Adhoc query result size: " << ret_value.size(); + if (ret_value.size() < 4) { + return seastar::make_exception_future(seastar::sstring( + "Internal Error, more than 4 bytes should be returned")); + } + seastar::sstring result( + ret_value.data() + 4, + ret_value.size() - 4); // skip 4 bytes, since the first 4 + // bytes is the size of the result + return seastar::make_ready_future(std::move(result)); +#endif // BUILD_HQPS + } else { + seastar::sstring error_msg = "Invalid query type: " + std::to_string(type); + return seastar::make_exception_future(error_msg); } - auto ret_value = ret.value(); - LOG(INFO) << "Adhoc query result size: " << ret_value.size(); - if (ret_value.size() < 4) { - return seastar::make_exception_future(seastar::sstring( - "Internal Error, more than 4 bytes should be returned")); - } - - seastar::sstring result( - ret_value.data() + 4, - ret_value.size() - 4); // skip 4 bytes, since the first 4 - // bytes is the size of the result - return seastar::make_ready_future(std::move(result)); } } // namespace server diff --git a/flex/engines/http_server/actor/executor.act.h b/flex/engines/http_server/actor/executor.act.h index 50aeecfc0c13..63f03f518f7e 100644 --- a/flex/engines/http_server/actor/executor.act.h +++ b/flex/engines/http_server/actor/executor.act.h @@ -30,9 +30,6 @@ class ANNOTATION(actor:impl) executor : public hiactor::actor { seastar::future ANNOTATION(actor:method) run_graph_db_query(query_param&& param); - seastar::future ANNOTATION(actor:method) run_hqps_procedure_query(hqps_proc_param&& param); - - seastar::future ANNOTATION(actor:method) run_hqps_adhoc_query(adhoc_result&& param); // DECLARE_RUN_QUERYS; /// Declare `do_work` func here, no need to implement. diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index 335eec54378d..c5e2cf3da31f 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -78,8 +78,12 @@ class graph_db_ic_handler : public seastar::httpd::handler_base { std::unique_ptr rep) override { auto dst_executor = dispatcher_.get_executor_idx(); + auto&& content = req->content; + // append int32_t value 0 to content + content.append("\0", 1); + return executor_refs_[dst_executor] - .run_graph_db_query(query_param{std::move(req->content)}) + .run_graph_db_query(query_param{std::move(content)}) .then_wrapped([rep = std::move(rep)]( seastar::future&& fut) mutable { if (__builtin_expect(fut.failed(), false)) { diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index 711c80e52dd0..f04d49e0d82a 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -19,27 +19,6 @@ namespace server { -void put_argment(gs::Encoder& encoder, const query::Argument& argment) { - auto& value = argment.value(); - auto item_case = value.item_case(); - switch (item_case) { - case common::Value::kI32: - encoder.put_int(value.i32()); - break; - case common::Value::kI64: - encoder.put_long(value.i64()); - break; - case common::Value::kF64: - encoder.put_double(value.f64()); - break; - case common::Value::kStr: - encoder.put_string(value.str()); - break; - default: - LOG(ERROR) << "Not recognizable param type" << static_cast(item_case); - } -} - hqps_ic_handler::hqps_ic_handler(uint32_t init_group_id, uint32_t max_group_id, uint32_t group_inc_step, uint32_t shard_concurrency) @@ -107,37 +86,11 @@ seastar::future> hqps_ic_handler::handle( auto dst_executor = executor_idx_; executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; // first extract the query name and parameters - auto& str = req->content; - const char* str_data = str.data(); - size_t str_length = str.size(); - LOG(INFO) << "Receive pay load: " << str_length << " bytes"; - - query::Query cur_query; - if (!cur_query.ParseFromArray(str_data, str_length)) { - LOG(ERROR) << "Fail to parse query from pay load"; - rep->set_status(seastar::httpd::reply::status_type::bad_request); - rep->write_body("bin", - seastar::sstring("Fail to parse query from pay load")); - rep->done(); - return seastar::make_ready_future>( - std::move(rep)); - } - auto query_name = cur_query.query_name().name(); - - std::vector input_buffer; - gs::Encoder input_encoder(input_buffer); - auto& args = cur_query.arguments(); - for (int32_t i = 0; i < args.size(); ++i) { - auto& arg = args[i]; - put_argment(input_encoder, arg); - } - VLOG(10) << "Query name: " << query_name << ", args: " << input_buffer.size() - << " bytes"; - seastar::sstring input_str(input_buffer.data(), input_buffer.size()); + auto& input_content = req->content; + input_content.append("\1", 1); return executor_refs_[dst_executor] - .run_hqps_procedure_query(hqps_proc_param{ - std::make_pair(seastar::sstring{query_name}, input_str)}) + .run_graph_db_query(query_param{std::move(input_content)}) .then_wrapped( [rep = std::move(rep)](seastar::future&& fut) mutable { if (__builtin_expect(fut.failed(), false)) { @@ -272,8 +225,11 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path, return codegen_actor_refs_[0] .do_codegen(query_param{std::move(req->content)}) .then([this, dst_executor](auto&& param) { - return executor_refs_[dst_executor].run_hqps_adhoc_query( - std::move(param)); + auto query_path = param.content; + // Emplace back int32_t 2 to the end of the query_path + query_path.append("\2", 1); + return executor_refs_[dst_executor].run_graph_db_query( + query_param{std::move(query_path)}); }) .then_wrapped( [rep = std::move(rep)](seastar::future&& fut) mutable { diff --git a/flex/engines/http_server/types.h b/flex/engines/http_server/types.h index 25474720589b..7e55fe23adf1 100644 --- a/flex/engines/http_server/types.h +++ b/flex/engines/http_server/types.h @@ -54,9 +54,8 @@ struct payload { }; using query_param = payload; -using hqps_proc_param = payload>; + using query_result = payload; -using adhoc_result = payload>; using admin_query_result = payload>; // url_path, query_param using graph_management_param = diff --git a/flex/interactive/examples/modern_graph/count_vertex_num.cypher b/flex/interactive/examples/modern_graph/count_vertex_num.cypher index cca16c40269d..8cecc7965cb7 100644 --- a/flex/interactive/examples/modern_graph/count_vertex_num.cypher +++ b/flex/interactive/examples/modern_graph/count_vertex_num.cypher @@ -1 +1 @@ -MATCH(v:person { id: $personId}) RETURN COUNT(v); \ No newline at end of file +MATCH(v:person) RETURN COUNT(v); \ No newline at end of file From 2c1698c5553b7184bb4f09c7fc52fa7a8ba6925d Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Wed, 24 Jan 2024 20:16:32 +0800 Subject: [PATCH 08/16] use matrix to vary cmake options --- .github/workflows/flex.yml | 56 +++++++++++++++----------------------- 1 file changed, 22 insertions(+), 34 deletions(-) diff --git a/.github/workflows/flex.yml b/.github/workflows/flex.yml index fd1125eb367f..b439fd752548 100644 --- a/.github/workflows/flex.yml +++ b/.github/workflows/flex.yml @@ -50,6 +50,28 @@ jobs: mkdir build && cd build # only test default build cmake .. -DCMAKE_BUILD_TYPE=DEBUG -DBUILD_DOC=OFF && sudo make -j 4 + # test the different combination of cmake options: -DBUILD_HQPS=ON/OFF -DBUILD_TEST=ON/OFF, -DBUILD_ODPS_FRAGMENT_LOADER=ON/OFF + test-cmake-options: + runs-on: ubuntu-20.04 + container: + image: registry.cn-hongkong.aliyuncs.com/graphscope/hqps-server-base:v0.0.9 + strategy: + matrix: + BUILD_HQPS: [ON, OFF] + BUILD_TEST: [ON, OFF] + BUILD_ODPS_FRAGMENT_LOADER: [ON, OFF] + steps: + - uses: actions/checkout@v3 + + - name: Build + run: | + cd ${GITHUB_WORKSPACE}/flex + git submodule update --init + mkdir build && cd build + cmake .. -DBUILD_HQPS=${{ matrix.BUILD_HQPS }} -DBUILD_TEST=${{ matrix.BUILD_TEST }} \ + -DBUILD_ODPS_FRAGMENT_LOADER=${{ matrix.BUILD_ODPS_FRAGMENT_LOADER }} + sudo make -j4 + test-flex: runs-on: ubuntu-20.04 if: ${{ github.repository == 'alibaba/GraphScope' }} @@ -68,40 +90,6 @@ jobs: make -j$(nproc) make install - - name: Test CMake with different options - env: - HOME: /home/graphscope/ - run: | - cd ${GITHUB_WORKSPACE}/flex - git submodule update --init - # try different combination of different options. -DBUILD_HQPS=ON/OFF -DBUILD_TEST=ON/OFF, -DBUILD_ODPS_FRAGMENT_LOADER=ON/OFF - mkdir build && cd build - echo "cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON" - cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF - sudo make -j$(nproc) - echo "cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF" - cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF - sudo make -j$(nproc) - echo "cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON" - cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF - sudo make -j$(nproc) - echo "cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF" - cmake .. -DBUILD_HQPS=ON -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF - sudo make -j$(nproc) - echo "cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON" - cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=ON -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF - sudo make -j$(nproc) - echo "cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF" - cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=ON -DBUILD_ODPS_FRAGMENT_LOADER=OFF -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF - sudo make -j$(nproc) - echo "cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON" - cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=ON -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF - sudo make -j$(nproc) - echo "cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF" - cmake .. -DBUILD_HQPS=OFF -DBUILD_TEST=OFF -DBUILD_ODPS_FRAGMENT_LOADER=OFF -DCMAKE_BUILD_TYPE=Debug -DBUILD_DOC=OFF - sudo make -j$(nproc) - cd .. && rm -rf build - - name: Build env: HOME: /home/graphscope/ From b422cf4ab115abc76eab3c060f46715f5cb5a36e Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 25 Jan 2024 09:58:24 +0800 Subject: [PATCH 09/16] keep only one function in executor.act.h --- .github/workflows/flex.yml | 1 + flex/engines/http_server/types.h | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/flex.yml b/.github/workflows/flex.yml index b439fd752548..9f7b4f41b4c8 100644 --- a/.github/workflows/flex.yml +++ b/.github/workflows/flex.yml @@ -95,6 +95,7 @@ jobs: HOME: /home/graphscope/ run: | cd ${GITHUB_WORKSPACE}/flex + git submodule update --init mkdir build && cd build cmake .. && sudo make -j$(nproc) diff --git a/flex/engines/http_server/types.h b/flex/engines/http_server/types.h index 7e55fe23adf1..f43cd6d4fe08 100644 --- a/flex/engines/http_server/types.h +++ b/flex/engines/http_server/types.h @@ -54,7 +54,6 @@ struct payload { }; using query_param = payload; - using query_result = payload; using admin_query_result = payload>; // url_path, query_param From 75a1fff0daa87ceb3b73c04f3fe81d632abfce21 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 25 Jan 2024 10:19:32 +0800 Subject: [PATCH 10/16] move logic to GraphDBSession --- .../graph_db/database/graph_db_session.cc | 49 ++++++++++++++- .../graph_db/database/graph_db_session.h | 5 +- .../engines/http_server/actor/executor.act.cc | 61 +++---------------- .../handler/graph_db_http_handler.cc | 3 +- .../http_server/handler/hqps_http_handler.cc | 8 ++- 5 files changed, 64 insertions(+), 62 deletions(-) diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index 6fbe20a6a6f9..e93818389c9c 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ -20,9 +20,34 @@ #include "flex/engines/graph_db/app/app_base.h" #include "flex/engines/graph_db/database/graph_db.h" #include "flex/engines/graph_db/database/graph_db_session.h" +#ifdef BUILD_HQPS +#include "flex/proto_generated_gie/stored_procedure.pb.h" +#endif #include "flex/utils/app_utils.h" namespace gs { +#ifdef BUILD_HQPS +void put_argment(gs::Encoder& encoder, const query::Argument& argment) { + auto& value = argment.value(); + auto item_case = value.item_case(); + switch (item_case) { + case common::Value::kI32: + encoder.put_int(value.i32()); + break; + case common::Value::kI64: + encoder.put_long(value.i64()); + break; + case common::Value::kF64: + encoder.put_double(value.f64()); + break; + case common::Value::kStr: + encoder.put_string(value.str()); + break; + default: + LOG(ERROR) << "Not recognizable param type" << static_cast(item_case); + } +} +#endif ReadTransaction GraphDBSession::GetReadTransaction() { uint32_t ts = db_.version_manager_.acquire_read_timestamp(); @@ -171,7 +196,7 @@ Result> GraphDBSession::Eval(const std::string& input) { "Query failed for procedure id:" + std::to_string((int) type), result_buffer); } - +#ifdef BUILD_HQPS // Evaluating stored procedure for hqps adhoc query, the dynamic lib is closed // immediately after the query Result> GraphDBSession::EvalAdhoc( @@ -216,7 +241,26 @@ Result> GraphDBSession::EvalAdhoc( } Result> GraphDBSession::EvalHqpsProcedure( - const std::string& query_name, Decoder& input_decoder) { + const std::string& input_content) { + query::Query cur_query; + if (!cur_query.ParseFromArray(input_content.data(), input_content.size())) { + LOG(ERROR) << "Fail to parse query from input content"; + return Result>(StatusCode::InValidArgument, + "Fail to parse query from input content", + {}); + } + auto query_name = cur_query.query_name().name(); + + std::vector input_buffer; + gs::Encoder input_encoder(input_buffer); + auto& args = cur_query.arguments(); + for (int32_t i = 0; i < args.size(); ++i) { + put_argment(input_encoder, args[i]); + } + VLOG(10) << "Query name: " << query_name << ", args: " << input_buffer.size() + << " bytes"; + gs::Decoder input_decoder(input_buffer.data(), input_buffer.size()); + if (query_name.empty()) { LOG(ERROR) << "Query name is empty"; return Result>(StatusCode::InValidArgument, @@ -280,6 +324,7 @@ Result> GraphDBSession::EvalHqpsProcedure( return Result>( StatusCode::QueryFailed, "Query failed for procedure: " + query_name, {}); } +#endif // BUILD_HQPS #undef likely diff --git a/flex/engines/graph_db/database/graph_db_session.h b/flex/engines/graph_db/database/graph_db_session.h index aa7d98206e43..c6b7a6cb50f9 100644 --- a/flex/engines/graph_db/database/graph_db_session.h +++ b/flex/engines/graph_db/database/graph_db_session.h @@ -80,13 +80,14 @@ class GraphDBSession { Result> Eval(const std::string& input); +#ifdef BUILD_HQPS // Evaluate a temporary stored procedure. close the handle of the dynamic lib // immediately. Result> EvalAdhoc(const std::string& input_lib_path); // Evaluate a stored procedure with input parameters given. - Result> EvalHqpsProcedure(const std::string& query_name, - Decoder& decoder); + Result> EvalHqpsProcedure(const std::string& input); +#endif void GetAppInfo(Encoder& result); diff --git a/flex/engines/http_server/actor/executor.act.cc b/flex/engines/http_server/actor/executor.act.cc index 896264fdd638..fdd787ef34e5 100644 --- a/flex/engines/http_server/actor/executor.act.cc +++ b/flex/engines/http_server/actor/executor.act.cc @@ -18,35 +18,10 @@ #include "flex/engines/graph_db/database/graph_db.h" #include "flex/engines/graph_db/database/graph_db_session.h" #include "nlohmann/json.hpp" -#ifdef BUILD_HQPS -#include "flex/proto_generated_gie/stored_procedure.pb.h" -#endif #include namespace server { -#ifdef BUILD_HQPS -void put_argment(gs::Encoder& encoder, const query::Argument& argment) { - auto& value = argment.value(); - auto item_case = value.item_case(); - switch (item_case) { - case common::Value::kI32: - encoder.put_int(value.i32()); - break; - case common::Value::kI64: - encoder.put_long(value.i64()); - break; - case common::Value::kF64: - encoder.put_double(value.f64()); - break; - case common::Value::kStr: - encoder.put_string(value.str()); - break; - default: - LOG(ERROR) << "Not recognizable param type" << static_cast(item_case); - } -} -#endif executor::~executor() { // finalization @@ -84,28 +59,12 @@ seastar::future executor::run_graph_db_query( } else if (type == '\1') { // hqps procedure query. #ifndef BUILD_HQPS return seastar::make_exception_future( - seastar::sstring("HQPS is not disabled, please recompile with " + seastar::sstring("HQPS is disabled, please recompile with " "BUILD_HQPS=ON to enable HQPS")); #else - query::Query cur_query; - if (!cur_query.ParseFromArray(input_content.data(), input_content.size())) { - return seastar::make_exception_future( - seastar::sstring("Can not parse the query")); - } - auto query_name = cur_query.query_name().name(); - - std::vector input_buffer; - gs::Encoder input_encoder(input_buffer); - auto& args = cur_query.arguments(); - for (int32_t i = 0; i < args.size(); ++i) { - put_argment(input_encoder, args[i]); - } - VLOG(10) << "Query name: " << query_name - << ", args: " << input_buffer.size() << " bytes"; - gs::Decoder decoder(input_buffer.data(), input_buffer.size()); auto ret = gs::GraphDB::get() .GetSession(hiactor::local_shard_id()) - .EvalHqpsProcedure(query_name, decoder); + .EvalHqpsProcedure(input_content); if (!ret.ok()) { LOG(ERROR) << "Eval failed: " << ret.status().error_message(); return seastar::make_exception_future( @@ -116,21 +75,17 @@ seastar::future executor::run_graph_db_query( return seastar::make_exception_future(seastar::sstring( "Internal Error, more than 4 bytes should be returned")); } - seastar::sstring content( - result.data() + 4, - result.size() - 4); // skip 4 bytes, since the first 4 - // bytes is the size of the result + // skip 4 bytes, since the first 4 bytes is the size of the result + seastar::sstring content(result.data() + 4, result.size() - 4); return seastar::make_ready_future(std::move(content)); #endif // BUILD_HQPS } else if (type == '\2') { // hqp adhoc query #ifndef BUILD_HQPS return seastar::make_exception_future( - seastar::sstring("HQPS is not disabled, please recompile with " + seastar::sstring("HQPS is disabled, please recompile with " "BUILD_HQPS=ON to enable HQPS")); #else LOG(INFO) << "Okay, try to run adhoc query of lib path: " << input_content; - // seastar::sstring result = server::load_and_run(content.first, - // content.second); auto ret = gs::GraphDB::get() .GetSession(hiactor::local_shard_id()) .EvalAdhoc(input_content); @@ -145,10 +100,8 @@ seastar::future executor::run_graph_db_query( return seastar::make_exception_future(seastar::sstring( "Internal Error, more than 4 bytes should be returned")); } - seastar::sstring result( - ret_value.data() + 4, - ret_value.size() - 4); // skip 4 bytes, since the first 4 - // bytes is the size of the result + // skip 4 bytes, since the first 4 bytes is the size of the result + seastar::sstring result(ret_value.data() + 4, ret_value.size() - 4); return seastar::make_ready_future(std::move(result)); #endif // BUILD_HQPS } else { diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index c5e2cf3da31f..1e6308047d3f 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -79,7 +79,8 @@ class graph_db_ic_handler : public seastar::httpd::handler_base { auto dst_executor = dispatcher_.get_executor_idx(); auto&& content = req->content; - // append int32_t value 0 to content + // append int32_t value 0 to content, indicate GraphDBSession.Eval() should + // be called. content.append("\0", 1); return executor_refs_[dst_executor] diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index f04d49e0d82a..6e5a7b877d56 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -85,9 +85,10 @@ seastar::future> hqps_ic_handler::handle( std::unique_ptr rep) { auto dst_executor = executor_idx_; executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; - // first extract the query name and parameters auto& input_content = req->content; - input_content.append("\1", 1); + input_content.append( + "\1", 1); // append int32_t 1 to the end of the query, indicate + // GraphDBSession.EvalHqpsProcedure() should be called. return executor_refs_[dst_executor] .run_graph_db_query(query_param{std::move(input_content)}) @@ -226,7 +227,8 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path, .do_codegen(query_param{std::move(req->content)}) .then([this, dst_executor](auto&& param) { auto query_path = param.content; - // Emplace back int32_t 2 to the end of the query_path + // append int32_t value 2 to content, indicating + // GraphDBSession.EvalAdhoc() should be called. query_path.append("\2", 1); return executor_refs_[dst_executor].run_graph_db_query( query_param{std::move(query_path)}); From 3cff319e663b517fffea598b89400b0f3a39560c Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 25 Jan 2024 10:21:05 +0800 Subject: [PATCH 11/16] minor --- flex/engines/http_server/actor/executor.act.cc | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/flex/engines/http_server/actor/executor.act.cc b/flex/engines/http_server/actor/executor.act.cc index fdd787ef34e5..db59dae10b0e 100644 --- a/flex/engines/http_server/actor/executor.act.cc +++ b/flex/engines/http_server/actor/executor.act.cc @@ -72,8 +72,9 @@ seastar::future executor::run_graph_db_query( } auto result = ret.value(); if (result.size() < 4) { - return seastar::make_exception_future(seastar::sstring( - "Internal Error, more than 4 bytes should be returned")); + return seastar::make_exception_future( + seastar::sstring("Internal Error when calling procedure, more than 4 " + "bytes should be returned")); } // skip 4 bytes, since the first 4 bytes is the size of the result seastar::sstring content(result.data() + 4, result.size() - 4); @@ -85,7 +86,6 @@ seastar::future executor::run_graph_db_query( seastar::sstring("HQPS is disabled, please recompile with " "BUILD_HQPS=ON to enable HQPS")); #else - LOG(INFO) << "Okay, try to run adhoc query of lib path: " << input_content; auto ret = gs::GraphDB::get() .GetSession(hiactor::local_shard_id()) .EvalAdhoc(input_content); @@ -95,10 +95,10 @@ seastar::future executor::run_graph_db_query( seastar::sstring(ret.status().error_message())); } auto ret_value = ret.value(); - VLOG(10) << "Adhoc query result size: " << ret_value.size(); if (ret_value.size() < 4) { - return seastar::make_exception_future(seastar::sstring( - "Internal Error, more than 4 bytes should be returned")); + return seastar::make_exception_future( + seastar::sstring("Internal Error when running Adhoc query, more than " + "4 bytes should be returned")); } // skip 4 bytes, since the first 4 bytes is the size of the result seastar::sstring result(ret_value.data() + 4, ret_value.size() - 4); From a1518529b80fe1a4d5a6bcf6d94c3c0e23498ece Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 25 Jan 2024 15:53:26 +0800 Subject: [PATCH 12/16] add builtin app for hqps adhoc and procedure --- flex/engines/graph_db/CMakeLists.txt | 10 + flex/engines/graph_db/app/hqps_app.cc | 128 +++++++++++ flex/engines/graph_db/app/hqps_app.h | 70 ++++++ flex/engines/graph_db/database/graph_db.cc | 12 +- .../graph_db/database/graph_db_session.cc | 207 +++--------------- .../graph_db/database/graph_db_session.h | 11 +- flex/engines/http_server/CMakeLists.txt | 4 - .../engines/http_server/actor/executor.act.cc | 73 +----- flex/engines/http_server/codegen_proxy.h | 2 +- .../http_server/handler/admin_http_handler.cc | 5 +- .../handler/graph_db_http_handler.cc | 3 - .../http_server/handler/hqps_http_handler.cc | 8 +- .../http_server/handler/hqps_http_handler.h | 1 - .../http_server/workdir_manipulator.cc | 15 ++ flex/storages/rt_mutable_graph/schema.cc | 8 + flex/storages/rt_mutable_graph/schema.h | 9 + 16 files changed, 302 insertions(+), 264 deletions(-) create mode 100644 flex/engines/graph_db/app/hqps_app.cc create mode 100644 flex/engines/graph_db/app/hqps_app.h diff --git a/flex/engines/graph_db/CMakeLists.txt b/flex/engines/graph_db/CMakeLists.txt index 8a5873b13e04..852af87019b8 100644 --- a/flex/engines/graph_db/CMakeLists.txt +++ b/flex/engines/graph_db/CMakeLists.txt @@ -1,10 +1,16 @@ file(GLOB_RECURSE GRAPH_DB_SRC_FILES "${CMAKE_CURRENT_SOURCE_DIR}/app/*.cc" "${CMAKE_CURRENT_SOURCE_DIR}/database/*.cc") +if (NOT BUILD_HQPS) + list(FILTER GRAPH_DB_SRC_FILES EXCLUDE REGEX ".*hqps_app*.") +endif() add_library(flex_graph_db SHARED ${GRAPH_DB_SRC_FILES}) target_include_directories(flex_graph_db PUBLIC $) target_link_libraries(flex_graph_db flex_rt_mutable_graph flex_utils ${GLOG_LIBRARIES} ${LIBGRAPELITE_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT}) +if (BUILD_HQPS) + target_link_libraries(flex_graph_db hqps_plan_proto) +endif() install_flex_target(flex_graph_db) install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h @@ -22,4 +28,8 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/database/graph_db.h install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/app_base.h DESTINATION include/flex/engines/graph_db/app) +if (BUILD_HQPS) + install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/app/hqps_app.h + DESTINATION include/flex/engines/graph_db/app) +endif() diff --git a/flex/engines/graph_db/app/hqps_app.cc b/flex/engines/graph_db/app/hqps_app.cc new file mode 100644 index 000000000000..68ea9ad0f5a3 --- /dev/null +++ b/flex/engines/graph_db/app/hqps_app.cc @@ -0,0 +1,128 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "engines/graph_db/app/hqps_app.h" +// Can not add guard #ifdef BUILD_HQPS here, will cause hiactor_codegen failure. +#include "flex/proto_generated_gie/stored_procedure.pb.h" + +namespace gs { + +void put_argment(gs::Encoder& encoder, const query::Argument& argment) { + auto& value = argment.value(); + auto item_case = value.item_case(); + switch (item_case) { + case common::Value::kI32: + encoder.put_int(value.i32()); + break; + case common::Value::kI64: + encoder.put_long(value.i64()); + break; + case common::Value::kF64: + encoder.put_double(value.f64()); + break; + case common::Value::kStr: + encoder.put_string(value.str()); + break; + default: + LOG(ERROR) << "Not recognizable param type" << static_cast(item_case); + } +} + +HQPSAdhocApp::HQPSAdhocApp(GraphDBSession& graph) : graph_(graph) {} + +bool HQPSAdhocApp::Query(Decoder& input, Encoder& output) { + if (input.size() <= 4) { + LOG(ERROR) << "Invalid input for HQPSAdhocApp, input size: " + << input.size(); + return false; + } + std::string input_lib_path = std::string(input.get_string()); + auto app_factory = std::make_shared(input_lib_path); + AppWrapper app_wrapper; // wrapper should be destroyed before the factory + + if (app_factory) { + app_wrapper = app_factory->CreateApp(graph_); + if (app_wrapper.app() == NULL) { + LOG(ERROR) << "Fail to create app for adhoc query: " << input_lib_path; + return false; + } + } else { + LOG(ERROR) << "Fail to evaluate adhoc query: " << input_lib_path; + return false; + } + + return app_wrapper.app()->Query(input, output); +} + +HQPSProcedureApp::HQPSProcedureApp(GraphDBSession& graph) : graph_(graph) {} + +bool HQPSProcedureApp::Query(Decoder& input, Encoder& output) { + if (input.size() <= 0) { + LOG(ERROR) << "Invalid input for HQPSProcedureApp, input size: " + << input.size(); + return false; + } + query::Query cur_query; + if (!cur_query.ParseFromArray(input.data(), input.size())) { + LOG(ERROR) << "Fail to parse query from input content"; + return false; + } + auto query_name = cur_query.query_name().name(); + + std::vector input_buffer; + gs::Encoder input_encoder(input_buffer); + auto& args = cur_query.arguments(); + for (int32_t i = 0; i < args.size(); ++i) { + put_argment(input_encoder, args[i]); + } + VLOG(10) << "Query name: " << query_name << ", args: " << input_buffer.size() + << " bytes"; + gs::Decoder input_decoder(input_buffer.data(), input_buffer.size()); + + if (query_name.empty()) { + LOG(ERROR) << "Query name is empty"; + return false; + } + auto& app_name_to_path_index = graph_.schema().GetPlugins(); + // get procedure id from name. + if (app_name_to_path_index.count(query_name) <= 0) { + LOG(ERROR) << "Query name is not registered: " << query_name; + return false; + } + + // get app + auto type = app_name_to_path_index.at(query_name).second; + auto app = graph_.GetApp(type); + if (!app) { + LOG(ERROR) << "Fail to get app for query: " << query_name + << ", type: " << type; + return false; + } + return app->Query(input_decoder, output); +} + +HQPSAdhocAppFactory::HQPSAdhocAppFactory() {} +HQPSAdhocAppFactory::~HQPSAdhocAppFactory() {} +AppWrapper HQPSAdhocAppFactory::CreateApp(GraphDBSession& graph) { + return AppWrapper(new HQPSAdhocApp(graph), NULL); +} + +HQPSProcedureAppFactory::HQPSProcedureAppFactory() {} +HQPSProcedureAppFactory::~HQPSProcedureAppFactory() {} +AppWrapper HQPSProcedureAppFactory::CreateApp(GraphDBSession& graph) { + return AppWrapper(new HQPSProcedureApp(graph), NULL); +} + +} // namespace gs diff --git a/flex/engines/graph_db/app/hqps_app.h b/flex/engines/graph_db/app/hqps_app.h new file mode 100644 index 000000000000..d9eec0f31e8c --- /dev/null +++ b/flex/engines/graph_db/app/hqps_app.h @@ -0,0 +1,70 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef ENGINES_GRAPH_DB_APP_HQPS_APP_H_ +#define ENGINES_GRAPH_DB_APP_HQPS_APP_H_ + +#include "flex/engines/graph_db/app/app_base.h" +#include "flex/engines/graph_db/database/graph_db_session.h" + +namespace gs { + +/** + * @brief HQPSAdhocApp is a builtin, proxy app used to evaluate adhoc query. + */ +class HQPSAdhocApp : public AppBase { + public: + HQPSAdhocApp(GraphDBSession& graph); + + bool Query(Decoder& input, Encoder& output) override; + + private: + GraphDBSession& graph_; +}; + +/** + * @brief HQPSProcedureApp is a builtin, proxy app used to evaluate procedure + * query. + */ +class HQPSProcedureApp : public AppBase { + public: + HQPSProcedureApp(GraphDBSession& graph); + + bool Query(Decoder& input, Encoder& output) override; + + private: + GraphDBSession& graph_; +}; + +// Factory +class HQPSAdhocAppFactory : public AppFactoryBase { + public: + HQPSAdhocAppFactory(); + ~HQPSAdhocAppFactory(); + + AppWrapper CreateApp(GraphDBSession& graph) override; +}; + +// Factory +class HQPSProcedureAppFactory : public AppFactoryBase { + public: + HQPSProcedureAppFactory(); + ~HQPSProcedureAppFactory(); + + AppWrapper CreateApp(GraphDBSession& graph) override; +}; +} // namespace gs + +#endif // ENGINES_GRAPH_DB_APP_HQPS_APP_H_ diff --git a/flex/engines/graph_db/database/graph_db.cc b/flex/engines/graph_db/database/graph_db.cc index 24f553d665e6..3136562e0020 100644 --- a/flex/engines/graph_db/database/graph_db.cc +++ b/flex/engines/graph_db/database/graph_db.cc @@ -14,9 +14,9 @@ */ #include "flex/engines/graph_db/database/graph_db.h" -#include "flex/engines/graph_db/database/graph_db_session.h" - +#include "flex/engines/graph_db/app/hqps_app.h" #include "flex/engines/graph_db/app/server_app.h" +#include "flex/engines/graph_db/database/graph_db_session.h" #include "flex/engines/graph_db/database/wal.h" #include "flex/utils/yaml_utils.h" @@ -351,7 +351,15 @@ void GraphDB::initApps( for (size_t i = 0; i < 256; ++i) { app_factories_[i] = nullptr; } + // Builtin apps app_factories_[0] = std::make_shared(); +#ifdef BUILD_HQPS + app_factories_[Schema::HQPS_ADHOC_PLUGIN_ID] = + std::make_shared(); + app_factories_[Schema::HQPS_PROCEDURE_PLUGIN_ID] = + std::make_shared(); +#endif // BUILD_HQPS + size_t valid_plugins = 0; for (auto& path_and_index : plugins) { auto path = path_and_index.second.first; diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index e93818389c9c..9ec55ebe0b41 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ -20,34 +20,9 @@ #include "flex/engines/graph_db/app/app_base.h" #include "flex/engines/graph_db/database/graph_db.h" #include "flex/engines/graph_db/database/graph_db_session.h" -#ifdef BUILD_HQPS -#include "flex/proto_generated_gie/stored_procedure.pb.h" -#endif #include "flex/utils/app_utils.h" namespace gs { -#ifdef BUILD_HQPS -void put_argment(gs::Encoder& encoder, const query::Argument& argment) { - auto& value = argment.value(); - auto item_case = value.item_case(); - switch (item_case) { - case common::Value::kI32: - encoder.put_int(value.i32()); - break; - case common::Value::kI64: - encoder.put_long(value.i64()); - break; - case common::Value::kF64: - encoder.put_double(value.f64()); - break; - case common::Value::kStr: - encoder.put_string(value.str()); - break; - default: - LOG(ERROR) << "Not recognizable param type" << static_cast(item_case); - } -} -#endif ReadTransaction GraphDBSession::GetReadTransaction() { uint32_t ts = db_.version_manager_.acquire_read_timestamp(); @@ -128,8 +103,6 @@ std::shared_ptr GraphDBSession::get_vertex_id_column( } } -#define likely(x) __builtin_expect(!!(x), 1) - Result> GraphDBSession::Eval(const std::string& input) { const auto start = std::chrono::high_resolution_clock::now(); uint8_t type = input.back(); @@ -141,22 +114,11 @@ Result> GraphDBSession::Eval(const std::string& input) { Decoder decoder(str_data, str_len); Encoder encoder(result_buffer); - AppBase* app = nullptr; - if (likely(apps_[type] != nullptr)) { - app = apps_[type]; - } else { - app_wrappers_[type] = db_.CreateApp(type, thread_id_); - if (app_wrappers_[type].app() == NULL) { - LOG(ERROR) << "[Query-" + std::to_string((int) type) - << "] is not registered..."; - return Result>( - StatusCode::NotExists, - "Query:" + std::to_string((int) type) + " is not registere", - result_buffer); - } else { - apps_[type] = app_wrappers_[type].app(); - app = apps_[type]; - } + AppBase* app = GetApp(type); + if (!app) { + return Result>( + StatusCode::NotFound, + "Procedure not found, id:" + std::to_string((int) type), result_buffer); } for (size_t i = 0; i < MAX_RETRY; ++i) { @@ -196,137 +158,6 @@ Result> GraphDBSession::Eval(const std::string& input) { "Query failed for procedure id:" + std::to_string((int) type), result_buffer); } -#ifdef BUILD_HQPS -// Evaluating stored procedure for hqps adhoc query, the dynamic lib is closed -// immediately after the query -Result> GraphDBSession::EvalAdhoc( - const std::string& input_lib_path) { - std::vector result_buffer; - std::vector input_buffer; // empty. Adhoc query receives no input - Decoder decoder(input_buffer.data(), input_buffer.size()); - Encoder encoder(result_buffer); - - // the dynamic library will automatically be closed after the query - auto app_factory = std::make_shared(input_lib_path); - AppWrapper app_wrapper; // wrapper should be destroyed before the factory - - if (app_factory) { - app_wrapper = app_factory->CreateApp(*this); - if (app_wrapper.app() == NULL) { - LOG(ERROR) << "Fail to create app for adhoc query: " << input_lib_path; - return Result>( - StatusCode::InternalError, - "Fail to create app for: " + input_lib_path, result_buffer); - } - } else { - LOG(ERROR) << "Fail to evaluate adhoc query: " << input_lib_path; - return Result>( - StatusCode::NotExists, - "Fail to open dynamic lib for: " + input_lib_path, result_buffer); - } - - for (size_t i = 0; i < MAX_RETRY; ++i) { - if (app_wrapper.app()->Query(decoder, encoder)) { - return result_buffer; - } - - LOG(INFO) << "[Query-" << input_lib_path << "][Thread-" << thread_id_ - << "] retry - " << i << " / " << MAX_RETRY; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - result_buffer.clear(); - } - return Result>( - StatusCode::QueryFailed, - "Query failed for adhoc query: " + input_lib_path, result_buffer); -} - -Result> GraphDBSession::EvalHqpsProcedure( - const std::string& input_content) { - query::Query cur_query; - if (!cur_query.ParseFromArray(input_content.data(), input_content.size())) { - LOG(ERROR) << "Fail to parse query from input content"; - return Result>(StatusCode::InValidArgument, - "Fail to parse query from input content", - {}); - } - auto query_name = cur_query.query_name().name(); - - std::vector input_buffer; - gs::Encoder input_encoder(input_buffer); - auto& args = cur_query.arguments(); - for (int32_t i = 0; i < args.size(); ++i) { - put_argment(input_encoder, args[i]); - } - VLOG(10) << "Query name: " << query_name << ", args: " << input_buffer.size() - << " bytes"; - gs::Decoder input_decoder(input_buffer.data(), input_buffer.size()); - - if (query_name.empty()) { - LOG(ERROR) << "Query name is empty"; - return Result>(StatusCode::InValidArgument, - "Query name is empty", {}); - } - auto& app_name_to_path_index = db_.schema().GetPlugins(); - // get procedure id from name. - if (app_name_to_path_index.count(query_name) <= 0) { - LOG(ERROR) << "Query name is not registered: " << query_name; - return Result>( - StatusCode::NotExists, "Query name is not registered: " + query_name, - {}); - } - - // get app - auto type = app_name_to_path_index.at(query_name).second; - if (type >= apps_.size()) { - LOG(ERROR) << "Query type is not registered: " << type; - return Result>( - StatusCode::NotExists, - "Query type is not registered: " + std::to_string(type), {}); - } - AppBase* app = nullptr; - if (likely(apps_[type] != nullptr)) { - app = apps_[type]; - } else { - app_wrappers_[type] = db_.CreateApp(type, thread_id_); - if (app_wrappers_[type].app() == NULL) { - LOG(ERROR) << "[Query-" + std::to_string((int) type) - << "] is not registered..."; - return Result>( - StatusCode::NotExists, - "Query:" + std::to_string((int) type) + " is not registered", {}); - } else { - apps_[type] = app_wrappers_[type].app(); - app = apps_[type]; - } - } - - if (app == nullptr) { - LOG(ERROR) << "Query type is not registered: " << type - << ", query name: " << query_name; - return Result>( - StatusCode::NotExists, - "Query type is not registered: " + std::to_string(type), {}); - } - const char* input_char = input_decoder.data(); - size_t input_size = input_decoder.size(); - - for (size_t i = 0; i < MAX_RETRY; ++i) { - std::vector result_buffer; - gs::Encoder result_encoder(result_buffer); - if (app->Query(input_decoder, result_encoder)) { - return Result>(std::move(result_buffer)); - } - LOG(INFO) << "[Query-" << query_name << "][Thread-" << thread_id_ - << "] retry - " << i << " / " << MAX_RETRY; - std::this_thread::sleep_for(std::chrono::milliseconds(1)); - input_decoder.reset(input_char, input_size); - } - return Result>( - StatusCode::QueryFailed, "Query failed for procedure: " + query_name, {}); -} -#endif // BUILD_HQPS - -#undef likely void GraphDBSession::GetAppInfo(Encoder& result) { db_.GetAppInfo(result); } @@ -358,6 +189,34 @@ double GraphDBSession::eval_duration() const { int64_t GraphDBSession::query_num() const { return query_num_.load(); } +#define likely(x) __builtin_expect(!!(x), 1) + +AppBase* GraphDBSession::GetApp(int type) { + // create if not exist + if (type > Schema::MAX_PLUGIN_ID) { + LOG(ERROR) << "Query type is out of range: " << type << " > " + << Schema::MAX_PLUGIN_ID; + return nullptr; + } + AppBase* app = nullptr; + if (likely(apps_[type] != nullptr)) { + app = apps_[type]; + } else { + app_wrappers_[type] = db_.CreateApp(type, thread_id_); + if (app_wrappers_[type].app() == NULL) { + LOG(ERROR) << "[Query-" + std::to_string((int) type) + << "] is not registered..."; + return nullptr; + } else { + apps_[type] = app_wrappers_[type].app(); + app = apps_[type]; + } + } + return app; +} + +#undef likely // likely + const AppMetric& GraphDBSession::GetAppMetric(int idx) const { return app_metrics_[idx]; } diff --git a/flex/engines/graph_db/database/graph_db_session.h b/flex/engines/graph_db/database/graph_db_session.h index c6b7a6cb50f9..5d234109a1d7 100644 --- a/flex/engines/graph_db/database/graph_db_session.h +++ b/flex/engines/graph_db/database/graph_db_session.h @@ -80,15 +80,6 @@ class GraphDBSession { Result> Eval(const std::string& input); -#ifdef BUILD_HQPS - // Evaluate a temporary stored procedure. close the handle of the dynamic lib - // immediately. - Result> EvalAdhoc(const std::string& input_lib_path); - - // Evaluate a stored procedure with input parameters given. - Result> EvalHqpsProcedure(const std::string& input); -#endif - void GetAppInfo(Encoder& result); int SessionId() const; @@ -103,6 +94,8 @@ class GraphDBSession { int64_t query_num() const; + AppBase* GetApp(int idx); + private: GraphDB& db_; Allocator& alloc_; diff --git a/flex/engines/http_server/CMakeLists.txt b/flex/engines/http_server/CMakeLists.txt index 5ce856a94290..d5e505c8f9ed 100644 --- a/flex/engines/http_server/CMakeLists.txt +++ b/flex/engines/http_server/CMakeLists.txt @@ -6,12 +6,9 @@ if (Hiactor_FOUND) SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/ INCLUDE_PATHS ${Hiactor_INCLUDE_DIR},${CMAKE_CURRENT_SOURCE_DIR}/../../../,${CMAKE_CURRENT_SOURCE_DIR}/../../third_party/nlohmann-json/single_include/) - message(STATUS "server_actor_autogen_files: ${server_actor_autogen_files}") - if (NOT BUILD_HQPS) list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*admin.*") list(FILTER server_actor_autogen_files EXCLUDE REGEX ".*codegen.*") - message(STATUS "server_actor_autogen_files: ${server_actor_autogen_files}") endif () # get all .cc files in current directory, except for generated/ @@ -24,7 +21,6 @@ if (Hiactor_FOUND) list(FILTER SERVER_FILES EXCLUDE REGEX ".*codegen*") list(FILTER SERVER_FILES EXCLUDE REGEX ".*workdir_manipulator*") endif () - message(STATUS "SERVER_FILES: ${SERVER_FILES}") add_library(flex_server STATIC ${SERVER_FILES} ${server_actor_autogen_files}) add_dependencies(flex_server server_actor_autogen) diff --git a/flex/engines/http_server/actor/executor.act.cc b/flex/engines/http_server/actor/executor.act.cc index db59dae10b0e..9a834ff2357f 100644 --- a/flex/engines/http_server/actor/executor.act.cc +++ b/flex/engines/http_server/actor/executor.act.cc @@ -40,74 +40,23 @@ seastar::future executor::run_graph_db_query( auto& input_content = param.content; if (input_content.size() < 1) { return seastar::make_exception_future( - seastar::sstring("Invalid status")); + seastar::sstring("Invalid input, input size: ") + + std::to_string(input_content.size())); } // get the last byte char type = input_content[input_content.size() - 1]; input_content.resize(input_content.size() - 1); - LOG(INFO) << "Run graph db query, type: " << type; - if (type == '\0') { // graph_db query - auto ret = gs::GraphDB::get() - .GetSession(hiactor::local_shard_id()) - .Eval(input_content); - if (!ret.ok()) { - LOG(ERROR) << "Eval failed: " << ret.status().error_message(); - } - auto result = ret.value(); - seastar::sstring content(result.data(), result.size()); - return seastar::make_ready_future(std::move(content)); - } else if (type == '\1') { // hqps procedure query. -#ifndef BUILD_HQPS + auto ret = gs::GraphDB::get() + .GetSession(hiactor::local_shard_id()) + .Eval(input_content); + if (!ret.ok()) { + LOG(ERROR) << "Eval failed: " << ret.status().error_message(); return seastar::make_exception_future( - seastar::sstring("HQPS is disabled, please recompile with " - "BUILD_HQPS=ON to enable HQPS")); -#else - auto ret = gs::GraphDB::get() - .GetSession(hiactor::local_shard_id()) - .EvalHqpsProcedure(input_content); - if (!ret.ok()) { - LOG(ERROR) << "Eval failed: " << ret.status().error_message(); - return seastar::make_exception_future( - seastar::sstring(ret.status().error_message())); - } - auto result = ret.value(); - if (result.size() < 4) { - return seastar::make_exception_future( - seastar::sstring("Internal Error when calling procedure, more than 4 " - "bytes should be returned")); - } - // skip 4 bytes, since the first 4 bytes is the size of the result - seastar::sstring content(result.data() + 4, result.size() - 4); - return seastar::make_ready_future(std::move(content)); -#endif // BUILD_HQPS - } else if (type == '\2') { // hqp adhoc query -#ifndef BUILD_HQPS - return seastar::make_exception_future( - seastar::sstring("HQPS is disabled, please recompile with " - "BUILD_HQPS=ON to enable HQPS")); -#else - auto ret = gs::GraphDB::get() - .GetSession(hiactor::local_shard_id()) - .EvalAdhoc(input_content); - if (!ret.ok()) { - LOG(ERROR) << "Eval failed: " << ret.status().error_message(); - return seastar::make_exception_future( - seastar::sstring(ret.status().error_message())); - } - auto ret_value = ret.value(); - if (ret_value.size() < 4) { - return seastar::make_exception_future( - seastar::sstring("Internal Error when running Adhoc query, more than " - "4 bytes should be returned")); - } - // skip 4 bytes, since the first 4 bytes is the size of the result - seastar::sstring result(ret_value.data() + 4, ret_value.size() - 4); - return seastar::make_ready_future(std::move(result)); -#endif // BUILD_HQPS - } else { - seastar::sstring error_msg = "Invalid query type: " + std::to_string(type); - return seastar::make_exception_future(error_msg); + seastar::sstring("Eval failed: ") + ret.status().error_message()); } + auto result = ret.value(); + seastar::sstring content(result.data(), result.size()); + return seastar::make_ready_future(std::move(content)); } } // namespace server diff --git a/flex/engines/http_server/codegen_proxy.h b/flex/engines/http_server/codegen_proxy.h index 44a0a6994ecb..c71b11a2d832 100644 --- a/flex/engines/http_server/codegen_proxy.h +++ b/flex/engines/http_server/codegen_proxy.h @@ -15,13 +15,13 @@ #ifndef ENGINES_HQPS_SERVER_CODEGEN_PROXY_H_ #define ENGINES_HQPS_SERVER_CODEGEN_PROXY_H_ +#include #include #include #include #include #include #include -#include #include "glog/logging.h" diff --git a/flex/engines/http_server/handler/admin_http_handler.cc b/flex/engines/http_server/handler/admin_http_handler.cc index 99823bccb4b2..415de6f7110d 100644 --- a/flex/engines/http_server/handler/admin_http_handler.cc +++ b/flex/engines/http_server/handler/admin_http_handler.cc @@ -508,8 +508,9 @@ void admin_http_handler::start() { .then([this] { return set_routes(); }) .then([this] { return server_.listen(http_port_); }) .then([this] { - fmt::print("HQPS admin http handler is listening on port {} ...\n", - http_port_); + fmt::print( + "HQPS admin http handler is listening on port {} ...\n", + http_port_); }); }); fut.wait(); diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index 1e6308047d3f..a16103036885 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -79,9 +79,6 @@ class graph_db_ic_handler : public seastar::httpd::handler_base { auto dst_executor = dispatcher_.get_executor_idx(); auto&& content = req->content; - // append int32_t value 0 to content, indicate GraphDBSession.Eval() should - // be called. - content.append("\0", 1); return executor_refs_[dst_executor] .run_graph_db_query(query_param{std::move(content)}) diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index 6e5a7b877d56..5cefef83b5ae 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -86,9 +86,7 @@ seastar::future> hqps_ic_handler::handle( auto dst_executor = executor_idx_; executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; auto& input_content = req->content; - input_content.append( - "\1", 1); // append int32_t 1 to the end of the query, indicate - // GraphDBSession.EvalHqpsProcedure() should be called. + input_content.append(gs::Schema::HQPS_PROCEDURE_PLUGIN_ID_STR, 1); return executor_refs_[dst_executor] .run_graph_db_query(query_param{std::move(input_content)}) @@ -227,9 +225,7 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path, .do_codegen(query_param{std::move(req->content)}) .then([this, dst_executor](auto&& param) { auto query_path = param.content; - // append int32_t value 2 to content, indicating - // GraphDBSession.EvalAdhoc() should be called. - query_path.append("\2", 1); + query_path.append(gs::Schema::HQPS_ADHOC_PLUGIN_ID_STR, 1); return executor_refs_[dst_executor].run_graph_db_query( query_param{std::move(query_path)}); }) diff --git a/flex/engines/http_server/handler/hqps_http_handler.h b/flex/engines/http_server/handler/hqps_http_handler.h index 5c620e575d67..b3e62ec7af5c 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.h +++ b/flex/engines/http_server/handler/hqps_http_handler.h @@ -25,7 +25,6 @@ #include "flex/utils/app_utils.h" namespace server { -void put_argment(gs::Encoder& encoder, const query::Argument& argment); class hqps_ic_handler : public seastar::httpd::handler_base { public: diff --git a/flex/engines/http_server/workdir_manipulator.cc b/flex/engines/http_server/workdir_manipulator.cc index 0c00581fe179..56106449c784 100644 --- a/flex/engines/http_server/workdir_manipulator.cc +++ b/flex/engines/http_server/workdir_manipulator.cc @@ -1,3 +1,18 @@ +/** Copyright 2020 Alibaba Group Holding Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + #include "flex/engines/http_server/workdir_manipulator.h" #include "flex/engines/http_server/codegen_proxy.h" diff --git a/flex/storages/rt_mutable_graph/schema.cc b/flex/storages/rt_mutable_graph/schema.cc index be5eea319042..dcfcc719ee9d 100644 --- a/flex/storages/rt_mutable_graph/schema.cc +++ b/flex/storages/rt_mutable_graph/schema.cc @@ -927,6 +927,10 @@ bool Schema::EmplacePlugins( uint8_t cur_plugin_id = RESERVED_PLUGIN_NUM; std::unordered_set plugin_names; for (auto& f : plugin_paths_or_names) { + if (cur_plugin_id > MAX_PLUGIN_ID) { + LOG(ERROR) << "Too many plugins, max plugin id is " << MAX_PLUGIN_ID; + return false; + } if (std::filesystem::exists(f)) { plugin_name_to_path_and_id_.emplace(f, std::make_pair(f, cur_plugin_id++)); @@ -949,6 +953,10 @@ bool Schema::EmplacePlugins( // if there exists any plugins specified by name, add them // Iterator over the map, and add the plugin path and name to the vector for (auto cur_yaml : all_procedure_yamls) { + if (cur_plugin_id > MAX_PLUGIN_ID) { + LOG(ERROR) << "Too many plugins, max plugin id is " << MAX_PLUGIN_ID; + return false; + } YAML::Node root; try { root = YAML::LoadFile(cur_yaml); diff --git a/flex/storages/rt_mutable_graph/schema.h b/flex/storages/rt_mutable_graph/schema.h index 7f69cbac14b5..3eefc91a4f9d 100644 --- a/flex/storages/rt_mutable_graph/schema.h +++ b/flex/storages/rt_mutable_graph/schema.h @@ -30,6 +30,15 @@ class Schema { // How many built-in plugins are there. // Currently only one builtin plugin, SERVER_APP is supported. static constexpr uint8_t RESERVED_PLUGIN_NUM = 1; +#ifdef BUILD_HQPS + static constexpr uint8_t MAX_PLUGIN_ID = 253; + static constexpr uint8_t HQPS_ADHOC_PLUGIN_ID = 254; + static constexpr uint8_t HQPS_PROCEDURE_PLUGIN_ID = 255; + static constexpr const char* HQPS_ADHOC_PLUGIN_ID_STR = "\xFE"; + static constexpr const char* HQPS_PROCEDURE_PLUGIN_ID_STR = "\xFF"; +#else + static constexpr uint8_t MAX_PLUGIN_ID = 255; +#endif // BUILD_HQPS static constexpr const char* PRIMITIVE_TYPE_KEY = "primitive_type"; static constexpr const char* VARCHAR_KEY = "varchar"; static constexpr const char* MAX_LENGTH_KEY = "max_length"; From 3ab8a993740bcde7bf75eb19fc6593c4649c4cbf Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 25 Jan 2024 16:09:54 +0800 Subject: [PATCH 13/16] minor fix --- flex/engines/graph_db/app/hqps_app.cc | 3 +-- flex/engines/http_server/actor/executor.act.cc | 13 +------------ .../http_server/handler/graph_db_http_handler.cc | 4 +--- .../http_server/handler/hqps_http_handler.cc | 10 ++++------ .../engines/http_server/handler/hqps_http_handler.h | 2 -- 5 files changed, 7 insertions(+), 25 deletions(-) diff --git a/flex/engines/graph_db/app/hqps_app.cc b/flex/engines/graph_db/app/hqps_app.cc index 68ea9ad0f5a3..b6e1aa3a4020 100644 --- a/flex/engines/graph_db/app/hqps_app.cc +++ b/flex/engines/graph_db/app/hqps_app.cc @@ -13,8 +13,7 @@ * limitations under the License. */ -#include "engines/graph_db/app/hqps_app.h" -// Can not add guard #ifdef BUILD_HQPS here, will cause hiactor_codegen failure. +#include "flex/engines/graph_db/app/hqps_app.h" #include "flex/proto_generated_gie/stored_procedure.pb.h" namespace gs { diff --git a/flex/engines/http_server/actor/executor.act.cc b/flex/engines/http_server/actor/executor.act.cc index 9a834ff2357f..e0bcdcaeb177 100644 --- a/flex/engines/http_server/actor/executor.act.cc +++ b/flex/engines/http_server/actor/executor.act.cc @@ -37,22 +37,11 @@ executor::executor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr) seastar::future executor::run_graph_db_query( query_param&& param) { - auto& input_content = param.content; - if (input_content.size() < 1) { - return seastar::make_exception_future( - seastar::sstring("Invalid input, input size: ") + - std::to_string(input_content.size())); - } - // get the last byte - char type = input_content[input_content.size() - 1]; - input_content.resize(input_content.size() - 1); auto ret = gs::GraphDB::get() .GetSession(hiactor::local_shard_id()) - .Eval(input_content); + .Eval(param.content); if (!ret.ok()) { LOG(ERROR) << "Eval failed: " << ret.status().error_message(); - return seastar::make_exception_future( - seastar::sstring("Eval failed: ") + ret.status().error_message()); } auto result = ret.value(); seastar::sstring content(result.data(), result.size()); diff --git a/flex/engines/http_server/handler/graph_db_http_handler.cc b/flex/engines/http_server/handler/graph_db_http_handler.cc index a16103036885..335eec54378d 100644 --- a/flex/engines/http_server/handler/graph_db_http_handler.cc +++ b/flex/engines/http_server/handler/graph_db_http_handler.cc @@ -78,10 +78,8 @@ class graph_db_ic_handler : public seastar::httpd::handler_base { std::unique_ptr rep) override { auto dst_executor = dispatcher_.get_executor_idx(); - auto&& content = req->content; - return executor_refs_[dst_executor] - .run_graph_db_query(query_param{std::move(content)}) + .run_graph_db_query(query_param{std::move(req->content)}) .then_wrapped([rep = std::move(rep)]( seastar::future&& fut) mutable { if (__builtin_expect(fut.failed(), false)) { diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index 5cefef83b5ae..576548831366 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -85,11 +85,10 @@ seastar::future> hqps_ic_handler::handle( std::unique_ptr rep) { auto dst_executor = executor_idx_; executor_idx_ = (executor_idx_ + 1) % shard_concurrency_; - auto& input_content = req->content; - input_content.append(gs::Schema::HQPS_PROCEDURE_PLUGIN_ID_STR, 1); + req->content.append(gs::Schema::HQPS_PROCEDURE_PLUGIN_ID_STR, 1); return executor_refs_[dst_executor] - .run_graph_db_query(query_param{std::move(input_content)}) + .run_graph_db_query(query_param{std::move(req->content)}) .then_wrapped( [rep = std::move(rep)](seastar::future&& fut) mutable { if (__builtin_expect(fut.failed(), false)) { @@ -224,10 +223,9 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path, return codegen_actor_refs_[0] .do_codegen(query_param{std::move(req->content)}) .then([this, dst_executor](auto&& param) { - auto query_path = param.content; - query_path.append(gs::Schema::HQPS_ADHOC_PLUGIN_ID_STR, 1); + param.content.append(gs::Schema::HQPS_ADHOC_PLUGIN_ID_STR, 1); return executor_refs_[dst_executor].run_graph_db_query( - query_param{std::move(query_path)}); + query_param{std::move(param.content)}); }) .then_wrapped( [rep = std::move(rep)](seastar::future&& fut) mutable { diff --git a/flex/engines/http_server/handler/hqps_http_handler.h b/flex/engines/http_server/handler/hqps_http_handler.h index b3e62ec7af5c..62237280641f 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.h +++ b/flex/engines/http_server/handler/hqps_http_handler.h @@ -21,8 +21,6 @@ #include #include "flex/engines/http_server/generated/actor/codegen_actor_ref.act.autogen.h" #include "flex/engines/http_server/generated/actor/executor_ref.act.autogen.h" -#include "flex/proto_generated_gie/stored_procedure.pb.h" -#include "flex/utils/app_utils.h" namespace server { From a04e6302f6b602273b097de981ae8101e95c6feb Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 25 Jan 2024 16:15:37 +0800 Subject: [PATCH 14/16] minor --- flex/engines/http_server/CMakeLists.txt | 5 ----- 1 file changed, 5 deletions(-) diff --git a/flex/engines/http_server/CMakeLists.txt b/flex/engines/http_server/CMakeLists.txt index d5e505c8f9ed..2123b59c29f6 100644 --- a/flex/engines/http_server/CMakeLists.txt +++ b/flex/engines/http_server/CMakeLists.txt @@ -31,11 +31,6 @@ if (Hiactor_FOUND) if (BUILD_HQPS) target_include_directories(flex_server PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/../hqps/) target_link_libraries(flex_server hqps_plan_proto) - else () - # find protobuf and link, since hqps is not built - find_package(Protobuf REQUIRED) - target_include_directories(flex_server PUBLIC ${PROTOBUF_INCLUDE_DIRS}) - target_link_libraries(flex_server ${PROTOBUF_LIBRARIES}) endif () install_without_export_flex_target(flex_server) From 8ffcdfd1633d1a3e7dc3dc5e6bc4b1170de444be Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 25 Jan 2024 16:49:43 +0800 Subject: [PATCH 15/16] fix --- flex/codegen/src/hqps_generator.h | 4 +++- flex/engines/graph_db/database/graph_db_session.cc | 4 ++-- flex/engines/graph_db/database/graph_db_session.h | 7 ++++--- flex/utils/app_utils.cc | 12 +++++++++--- flex/utils/app_utils.h | 2 ++ 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/flex/codegen/src/hqps_generator.h b/flex/codegen/src/hqps_generator.h index 4890a799ad2c..7ca2686e53e3 100644 --- a/flex/codegen/src/hqps_generator.h +++ b/flex/codegen/src/hqps_generator.h @@ -71,7 +71,9 @@ static constexpr const char* QUERY_TEMPLATE_STR = " // dump results to string\n" " std::string res_str = res.SerializeAsString();\n" " // encode results to encoder\n" - " encoder.put_string(res_str);\n" + " if (!res_str.empty()){\n" + " encoder.put_raw_bytes(res_str.data(), res_str.size());\n" + " }\n" " return true;\n" " }\n" " //private members\n" diff --git a/flex/engines/graph_db/database/graph_db_session.cc b/flex/engines/graph_db/database/graph_db_session.cc index 9ec55ebe0b41..ceaeca3c7b6e 100644 --- a/flex/engines/graph_db/database/graph_db_session.cc +++ b/flex/engines/graph_db/database/graph_db_session.cc @@ -193,9 +193,9 @@ int64_t GraphDBSession::query_num() const { return query_num_.load(); } AppBase* GraphDBSession::GetApp(int type) { // create if not exist - if (type > Schema::MAX_PLUGIN_ID) { + if (type >= GraphDBSession::MAX_PLUGIN_NUM) { LOG(ERROR) << "Query type is out of range: " << type << " > " - << Schema::MAX_PLUGIN_ID; + << GraphDBSession::MAX_PLUGIN_NUM; return nullptr; } AppBase* app = nullptr; diff --git a/flex/engines/graph_db/database/graph_db_session.h b/flex/engines/graph_db/database/graph_db_session.h index 5d234109a1d7..e4adee1d84a3 100644 --- a/flex/engines/graph_db/database/graph_db_session.h +++ b/flex/engines/graph_db/database/graph_db_session.h @@ -36,6 +36,7 @@ class WalWriter; class GraphDBSession { public: static constexpr int32_t MAX_RETRY = 3; + static constexpr int32_t MAX_PLUGIN_NUM = 256; // 2^(sizeof(uint8_t)*8) GraphDBSession(GraphDB& db, Allocator& alloc, WalWriter& logger, const std::string& work_dir, int thread_id) : db_(db), @@ -103,9 +104,9 @@ class GraphDBSession { std::string work_dir_; int thread_id_; - std::array app_wrappers_; - std::array apps_; - std::array app_metrics_; + std::array app_wrappers_; + std::array apps_; + std::array app_metrics_; #ifdef MONITOR_SESSIONS std::atomic eval_duration_; diff --git a/flex/utils/app_utils.cc b/flex/utils/app_utils.cc index d476df68e9be..43c069f198ec 100644 --- a/flex/utils/app_utils.cc +++ b/flex/utils/app_utils.cc @@ -95,12 +95,18 @@ void Encoder::put_small_string_view(const std::string_view& v) { memcpy(&buf_[size + 1], v.data(), len); } -void Encoder::put_double(double v){ +void Encoder::put_double(double v) { size_t size = buf_.size(); buf_.resize(size + sizeof(double)); memcpy(&buf_[size], &v, sizeof(double)); } +void Encoder::put_raw_bytes(const char* ptr, size_t size) { + size_t old_size = buf_.size(); + buf_.resize(old_size + size); + memcpy(&buf_[old_size], ptr, size); +} + void Encoder::clear() { buf_.clear(); } static int64_t char_ptr_to_long(const char* data) { @@ -113,7 +119,7 @@ static int char_ptr_to_int(const char* data) { return *ptr; } -static double char_ptr_to_double(const char* data){ +static double char_ptr_to_double(const char* data) { const double* ptr = reinterpret_cast(data); return *ptr; } @@ -130,7 +136,7 @@ int64_t Decoder::get_long() { return ret; } -double Decoder::get_double(){ +double Decoder::get_double() { double ret = char_ptr_to_double(data_); data_ += 8; return ret; diff --git a/flex/utils/app_utils.h b/flex/utils/app_utils.h index a8156376dc95..99de1ec48a3b 100644 --- a/flex/utils/app_utils.h +++ b/flex/utils/app_utils.h @@ -54,6 +54,8 @@ class Encoder { void put_double(double v); + void put_raw_bytes(const char* ptr, size_t size); + void clear(); private: From d8d0e25a691ff2082f72bbe1599d8b2976f2cb20 Mon Sep 17 00:00:00 2001 From: "xiaolei.zl" Date: Thu, 25 Jan 2024 17:32:57 +0800 Subject: [PATCH 16/16] use remove put_raw_bytes --- flex/codegen/src/hqps_generator.h | 2 +- .../http_server/handler/hqps_http_handler.cc | 16 ++++++++++++++++ flex/utils/app_utils.cc | 6 ------ flex/utils/app_utils.h | 2 -- 4 files changed, 17 insertions(+), 9 deletions(-) diff --git a/flex/codegen/src/hqps_generator.h b/flex/codegen/src/hqps_generator.h index 7ca2686e53e3..347ebf90af95 100644 --- a/flex/codegen/src/hqps_generator.h +++ b/flex/codegen/src/hqps_generator.h @@ -72,7 +72,7 @@ static constexpr const char* QUERY_TEMPLATE_STR = " std::string res_str = res.SerializeAsString();\n" " // encode results to encoder\n" " if (!res_str.empty()){\n" - " encoder.put_raw_bytes(res_str.data(), res_str.size());\n" + " encoder.put_string_view(res_str);\n" " }\n" " return true;\n" " }\n" diff --git a/flex/engines/http_server/handler/hqps_http_handler.cc b/flex/engines/http_server/handler/hqps_http_handler.cc index 576548831366..9c1be8730ffd 100644 --- a/flex/engines/http_server/handler/hqps_http_handler.cc +++ b/flex/engines/http_server/handler/hqps_http_handler.cc @@ -89,6 +89,14 @@ seastar::future> hqps_ic_handler::handle( return executor_refs_[dst_executor] .run_graph_db_query(query_param{std::move(req->content)}) + .then([](auto&& output) { + if (output.content.size() < 4) { + LOG(ERROR) << "Invalid output size: " << output.content.size(); + return seastar::make_ready_future(std::move(output)); + } + return seastar::make_ready_future( + std::move(output.content.substr(4))); + }) .then_wrapped( [rep = std::move(rep)](seastar::future&& fut) mutable { if (__builtin_expect(fut.failed(), false)) { @@ -227,6 +235,14 @@ hqps_adhoc_query_handler::handle(const seastar::sstring& path, return executor_refs_[dst_executor].run_graph_db_query( query_param{std::move(param.content)}); }) + .then([](auto&& output) { + if (output.content.size() < 4) { + LOG(ERROR) << "Invalid output size: " << output.content.size(); + return seastar::make_ready_future(std::move(output)); + } + return seastar::make_ready_future( + std::move(output.content.substr(4))); + }) .then_wrapped( [rep = std::move(rep)](seastar::future&& fut) mutable { if (__builtin_expect(fut.failed(), false)) { diff --git a/flex/utils/app_utils.cc b/flex/utils/app_utils.cc index 43c069f198ec..d138080ef5ea 100644 --- a/flex/utils/app_utils.cc +++ b/flex/utils/app_utils.cc @@ -101,12 +101,6 @@ void Encoder::put_double(double v) { memcpy(&buf_[size], &v, sizeof(double)); } -void Encoder::put_raw_bytes(const char* ptr, size_t size) { - size_t old_size = buf_.size(); - buf_.resize(old_size + size); - memcpy(&buf_[old_size], ptr, size); -} - void Encoder::clear() { buf_.clear(); } static int64_t char_ptr_to_long(const char* data) { diff --git a/flex/utils/app_utils.h b/flex/utils/app_utils.h index 99de1ec48a3b..a8156376dc95 100644 --- a/flex/utils/app_utils.h +++ b/flex/utils/app_utils.h @@ -54,8 +54,6 @@ class Encoder { void put_double(double v); - void put_raw_bytes(const char* ptr, size_t size); - void clear(); private: