Skip to content

Commit

Permalink
Fix query hang when two same queries are submitted
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanglei1949 committed Oct 21, 2023
1 parent 85aa561 commit c543ad5
Show file tree
Hide file tree
Showing 14 changed files with 333 additions and 154 deletions.
10 changes: 8 additions & 2 deletions flex/bin/sync_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ int main(int argc, char** argv) {
"codegen binary path")(
"graph-config,g", bpo::value<std::string>(), "graph schema config file")(
"data-path,a", bpo::value<std::string>(), "data directory path")(
"bulk-load,l", bpo::value<std::string>(), "bulk-load config file");
"bulk-load,l", bpo::value<std::string>(), "bulk-load config file")(
"open-thread-resource-pool", bpo::value<bool>()->default_value(true),
"open thread resource pool")("worker-thread-number",
bpo::value<unsigned>()->default_value(2),
"worker thread number");

setenv("TZ", "Asia/Shanghai", 1);
tzset();
Expand Down Expand Up @@ -253,7 +257,9 @@ int main(int argc, char** argv) {

gs::init_codegen_proxy(vm, graph_schema_path, server_config_path);

server::HQPSService::get().init(shard_num, http_port, false);
server::HQPSService::get().init(shard_num, http_port, false,
vm["open-thread-resource-pool"].as<bool>(),
vm["worker-thread-number"].as<unsigned>());
server::HQPSService::get().run_and_wait_for_exit();

return 0;
Expand Down
8 changes: 8 additions & 0 deletions flex/engines/http_server/actor_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,18 @@ void actor_system::launch_worker() {
char gateway[] = "--gw-ipv4-addr=172.24.255.253";
char net_mask[] = "--netmask-ipv4-addr=255.255.240.0";
char enable_dpdk[] = "--dpdk-pmd";
char enable_thread_resource_pool[] = "--open-thread-resource-pool=true";
char external_thread_num[32];
char shards[16];
snprintf(shards, sizeof(shards), "-c%d", num_shards_);
snprintf(external_thread_num, sizeof(external_thread_num),
"--worker-thread-number=%d", external_thread_num_);

std::vector<char*> argv = {prog_name, shards};
if (enable_thread_resource_pool_) {
argv.push_back(enable_thread_resource_pool);
argv.push_back(external_thread_num);
}
if (enable_dpdk_) {
argv.push_back(enable_native_stack);
argv.push_back(close_dhcp);
Expand Down
11 changes: 9 additions & 2 deletions flex/engines/http_server/actor_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@ namespace server {

class actor_system {
public:
actor_system(uint32_t num_shards, bool enable_dpdk)
: num_shards_(num_shards), enable_dpdk_(enable_dpdk) {}
actor_system(uint32_t num_shards, bool enable_dpdk,
bool enable_thread_resource_pool = false,
unsigned external_thread_num = 1)
: num_shards_(num_shards),
enable_dpdk_(enable_dpdk),
enable_thread_resource_pool_(enable_thread_resource_pool),
external_thread_num_(external_thread_num) {}
~actor_system();

void launch();
Expand All @@ -39,6 +44,8 @@ class actor_system {
private:
const uint32_t num_shards_;
const bool enable_dpdk_;
const bool enable_thread_resource_pool_;
const unsigned external_thread_num_;
std::unique_ptr<std::thread> main_thread_;
std::atomic<bool> running_{false};
sem_t ready_;
Expand Down
89 changes: 89 additions & 0 deletions flex/engines/http_server/codegen_actor.act.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "flex/engines/http_server/codegen_actor.act.h"

#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/engines/http_server/codegen_proxy.h"
#include "flex/engines/http_server/stored_procedure.h"

#include <seastar/core/print.hh>

namespace server {

codegen_actor::~codegen_actor() {
// finalization
// ...
}

codegen_actor::codegen_actor(hiactor::actor_base* exec_ctx,
const hiactor::byte_t* addr)
: hiactor::actor(exec_ctx, addr) {
set_max_concurrency(1); // set max concurrency for task reentrancy
// (stateful) initialization
// ...
}

seastar::future<adhoc_result> codegen_actor::do_codegen(query_param&& param) {
LOG(INFO) << "Running codegen for " << param.content.size();
// The received query's pay load shoud be able to deserialze to physical plan
auto& str = param.content;
if (str.size() <= 0) {
LOG(INFO) << "Empty query";
return seastar::make_exception_future<adhoc_result>(
std::runtime_error("Empty query string"));
}

const char* str_data = str.data();
size_t str_length = str.size();
LOG(INFO) << "Deserialize physical job request" << str_length;

physical::PhysicalPlan plan;
bool ret = plan.ParseFromArray(str_data, str_length);
if (ret) {
VLOG(10) << "Parse physical plan: " << plan.DebugString();
} else {
LOG(ERROR) << "Fail to parse physical plan";
return seastar::make_exception_future<adhoc_result>(
std::runtime_error("Fail to parse physical plan"));
}

// 0. do codegen gen.
std::string lib_path = "";
int32_t job_id = -1;
auto& codegen_proxy = server::CodegenProxy::get();
if (codegen_proxy.Initialized()) {
return codegen_proxy.DoGen(plan).then(
[](std::pair<int32_t, std::string>&& job_id_and_lib_path) {
if (job_id_and_lib_path.first == -1) {
return seastar::make_exception_future<adhoc_result>(
std::runtime_error("Fail to parse job id from codegen proxy"));
}
// 1. load and run.
LOG(INFO) << "Okay, try to run the query of lib path: "
<< job_id_and_lib_path.second
<< ", job id: " << job_id_and_lib_path.first
<< "local shard id: " << hiactor::local_shard_id();
return seastar::make_ready_future<adhoc_result>(
std::move(job_id_and_lib_path));
});
} else {
return seastar::make_exception_future<adhoc_result>(
std::runtime_error("Codegen proxy not initialized"));
}
}

} // namespace server
43 changes: 43 additions & 0 deletions flex/engines/http_server/codegen_actor.act.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ENGINES_HTTP_SERVER_CODEGEN_ACTOR_ACT_H_
#define ENGINES_HTTP_SERVER_CODEGEN_ACTOR_ACT_H_

#include "flex/engines/http_server/types.h"

#include <hiactor/core/actor-template.hh>
#include <hiactor/util/data_type.hh>

namespace server {

class ANNOTATION(actor:impl) codegen_actor : public hiactor::actor {
public:
codegen_actor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr);
~codegen_actor() override;

seastar::future<adhoc_result> ANNOTATION(actor:method) do_codegen(query_param&& param);

// DECLARE_RUN_QUERYS;
/// Declare `do_work` func here, no need to implement.
ACTOR_DO_WORK()

private:
int32_t your_private_members_ = 0;
};

} // namespace serverP

#endif // ENGINES_HTTP_SERVER_CODEGEN_ACTOR_ACT_H_
Loading

0 comments on commit c543ad5

Please sign in to comment.