Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(interactive): Reuse compiled native lib for same physical plan #3301

Merged
merged 9 commits into from
Oct 25, 2023
10 changes: 8 additions & 2 deletions flex/bin/sync_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,11 @@ int main(int argc, char** argv) {
"codegen binary path")(
"graph-config,g", bpo::value<std::string>(), "graph schema config file")(
"data-path,a", bpo::value<std::string>(), "data directory path")(
"bulk-load,l", bpo::value<std::string>(), "bulk-load config file");
"bulk-load,l", bpo::value<std::string>(), "bulk-load config file")(
"open-thread-resource-pool", bpo::value<bool>()->default_value(true),
"open thread resource pool")("worker-thread-number",
bpo::value<unsigned>()->default_value(2),
"worker thread number");

setenv("TZ", "Asia/Shanghai", 1);
tzset();
Expand Down Expand Up @@ -253,7 +257,9 @@ int main(int argc, char** argv) {

gs::init_codegen_proxy(vm, graph_schema_path, server_config_path);

server::HQPSService::get().init(shard_num, http_port, false);
server::HQPSService::get().init(shard_num, http_port, false,
vm["open-thread-resource-pool"].as<bool>(),
vm["worker-thread-number"].as<unsigned>());
server::HQPSService::get().run_and_wait_for_exit();

return 0;
Expand Down
8 changes: 8 additions & 0 deletions flex/engines/http_server/actor_system.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,18 @@ void actor_system::launch_worker() {
char gateway[] = "--gw-ipv4-addr=172.24.255.253";
char net_mask[] = "--netmask-ipv4-addr=255.255.240.0";
char enable_dpdk[] = "--dpdk-pmd";
char enable_thread_resource_pool[] = "--open-thread-resource-pool=true";
char external_thread_num[32];
char shards[16];
snprintf(shards, sizeof(shards), "-c%d", num_shards_);
snprintf(external_thread_num, sizeof(external_thread_num),
"--worker-thread-number=%d", external_thread_num_);

std::vector<char*> argv = {prog_name, shards};
if (enable_thread_resource_pool_) {
argv.push_back(enable_thread_resource_pool);
argv.push_back(external_thread_num);
}
if (enable_dpdk_) {
argv.push_back(enable_native_stack);
argv.push_back(close_dhcp);
Expand Down
11 changes: 9 additions & 2 deletions flex/engines/http_server/actor_system.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,13 @@ namespace server {

class actor_system {
public:
actor_system(uint32_t num_shards, bool enable_dpdk)
: num_shards_(num_shards), enable_dpdk_(enable_dpdk) {}
actor_system(uint32_t num_shards, bool enable_dpdk,
bool enable_thread_resource_pool = false,
unsigned external_thread_num = 1)
: num_shards_(num_shards),
enable_dpdk_(enable_dpdk),
enable_thread_resource_pool_(enable_thread_resource_pool),
external_thread_num_(external_thread_num) {}
~actor_system();

void launch();
Expand All @@ -39,6 +44,8 @@ class actor_system {
private:
const uint32_t num_shards_;
const bool enable_dpdk_;
const bool enable_thread_resource_pool_;
const unsigned external_thread_num_;
std::unique_ptr<std::thread> main_thread_;
std::atomic<bool> running_{false};
sem_t ready_;
Expand Down
89 changes: 89 additions & 0 deletions flex/engines/http_server/codegen_actor.act.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "flex/engines/http_server/codegen_actor.act.h"

#include "flex/engines/graph_db/database/graph_db.h"
#include "flex/engines/graph_db/database/graph_db_session.h"
#include "flex/engines/http_server/codegen_proxy.h"
#include "flex/engines/http_server/stored_procedure.h"

#include <seastar/core/print.hh>

namespace server {

codegen_actor::~codegen_actor() {
// finalization
// ...
}

codegen_actor::codegen_actor(hiactor::actor_base* exec_ctx,
const hiactor::byte_t* addr)
: hiactor::actor(exec_ctx, addr) {
set_max_concurrency(1); // set max concurrency for task reentrancy
// (stateful) initialization
// ...
}

seastar::future<adhoc_result> codegen_actor::do_codegen(query_param&& param) {
LOG(INFO) << "Running codegen for " << param.content.size();
// The received query's pay load shoud be able to deserialze to physical plan
auto& str = param.content;
if (str.size() <= 0) {
LOG(INFO) << "Empty query";
return seastar::make_exception_future<adhoc_result>(
std::runtime_error("Empty query string"));
}

const char* str_data = str.data();
size_t str_length = str.size();
LOG(INFO) << "Deserialize physical job request" << str_length;

physical::PhysicalPlan plan;
bool ret = plan.ParseFromArray(str_data, str_length);
if (ret) {
VLOG(10) << "Parse physical plan: " << plan.DebugString();
} else {
LOG(ERROR) << "Fail to parse physical plan";
return seastar::make_exception_future<adhoc_result>(
std::runtime_error("Fail to parse physical plan"));
}

// 0. do codegen gen.
std::string lib_path = "";
int32_t job_id = -1;
auto& codegen_proxy = server::CodegenProxy::get();
if (codegen_proxy.Initialized()) {
return codegen_proxy.DoGen(plan).then(
[](std::pair<int32_t, std::string>&& job_id_and_lib_path) {
if (job_id_and_lib_path.first == -1) {
return seastar::make_exception_future<adhoc_result>(
std::runtime_error("Fail to parse job id from codegen proxy"));
}
// 1. load and run.
LOG(INFO) << "Okay, try to run the query of lib path: "
<< job_id_and_lib_path.second
<< ", job id: " << job_id_and_lib_path.first
<< "local shard id: " << hiactor::local_shard_id();
return seastar::make_ready_future<adhoc_result>(
std::move(job_id_and_lib_path));
});
} else {
return seastar::make_exception_future<adhoc_result>(
std::runtime_error("Codegen proxy not initialized"));
}
}

} // namespace server
43 changes: 43 additions & 0 deletions flex/engines/http_server/codegen_actor.act.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/** Copyright 2020 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ENGINES_HTTP_SERVER_CODEGEN_ACTOR_ACT_H_
#define ENGINES_HTTP_SERVER_CODEGEN_ACTOR_ACT_H_

#include "flex/engines/http_server/types.h"

#include <hiactor/core/actor-template.hh>
#include <hiactor/util/data_type.hh>

namespace server {

class ANNOTATION(actor:impl) codegen_actor : public hiactor::actor {
public:
codegen_actor(hiactor::actor_base* exec_ctx, const hiactor::byte_t* addr);
~codegen_actor() override;

seastar::future<adhoc_result> ANNOTATION(actor:method) do_codegen(query_param&& param);

// DECLARE_RUN_QUERYS;
/// Declare `do_work` func here, no need to implement.
ACTOR_DO_WORK()

private:
int32_t your_private_members_ = 0;
};

} // namespace serverP

#endif // ENGINES_HTTP_SERVER_CODEGEN_ACTOR_ACT_H_
Loading
Loading