Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: start apiserver in standalone mode #609

Merged
merged 8 commits into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion release/bin/start-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# limitations under the License.

set -e
export COMPONENTS="tablet nameserver"
export COMPONENTS="tablet nameserver apiserver"
BASEDIR="$(dirname "$( cd "$( dirname "$0" )" && pwd )")"
OS="$(uname -a | awk '{print $1}' | tr '[:upper:]' '[:lower:]')"
cd "$BASEDIR"
Expand Down
2 changes: 1 addition & 1 deletion release/bin/stop-all.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

export COMPONENTS="tablet nameserver"
export COMPONENTS="tablet nameserver apiserver"
BASEDIR="$(dirname "$( cd "$( dirname "$0" )" && pwd )")"
for COMPONENT in $COMPONENTS; do
PID_FILE="$BASEDIR/bin/$COMPONENT.pid"
Expand Down
5 changes: 3 additions & 2 deletions release/conf/apiserver.flags
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# apiserver.conf
--endpoint=127.0.0.1:8080
--role=apiserver
--zk_cluster=127.0.0.1:7181
--zk_root_path=/openmldb_cluster
--nameserver=127.0.0.1:6527
#--zk_cluster=127.0.0.1:7181
#--zk_root_path=/openmldb_cluster

--openmldb_log_dir=./logs
--log_file_count=24
Expand Down
2 changes: 2 additions & 0 deletions src/catalog/tablet_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ bool TabletTableHandler::Init(const ClientManager& client_manager) {

bool TabletTableHandler::UpdateIndex(
const ::google::protobuf::RepeatedPtrField<::openmldb::common::ColumnKey>& indexs) {
index_list_.Clear();
index_hint_.clear();
if (!SchemaAdapter::ConvertIndex(indexs, &index_list_)) {
LOG(WARNING) << "fail to conver index to sql index";
return false;
Expand Down
9 changes: 3 additions & 6 deletions src/client/ns_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,21 +236,18 @@ bool NsClient::AddTableField(const std::string& table_name, const ::openmldb::co
return false;
}

bool NsClient::CreateProcedure(const ::openmldb::api::ProcedureInfo& sp_info, uint64_t request_timeout,
std::string* msg) {
if (msg == nullptr) return false;
base::Status NsClient::CreateProcedure(const ::openmldb::api::ProcedureInfo& sp_info, uint64_t request_timeout) {
::openmldb::api::CreateProcedureRequest request;
::openmldb::nameserver::GeneralResponse response;
::openmldb::api::ProcedureInfo* sp_info_ptr = request.mutable_sp_info();
sp_info_ptr->CopyFrom(sp_info);
request.set_timeout_ms(request_timeout);
bool ok = client_.SendRequest(&::openmldb::nameserver::NameServer_Stub::CreateProcedure, &request, &response,
request_timeout, 1);
*msg = response.msg();
if (!ok || response.code() != 0) {
return false;
return base::Status(base::ReturnCode::kError, response.msg());
}
return true;
return {};
}

bool NsClient::CreateTable(const ::openmldb::nameserver::TableInfo& table_info, std::string& msg) {
Expand Down
2 changes: 1 addition & 1 deletion src/client/ns_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ class NsClient : public Client {
bool DropProcedure(const std::string& db_name, const std::string& sp_name,
std::string& msg); // NOLINT

bool CreateProcedure(const ::openmldb::api::ProcedureInfo& sp_info, uint64_t request_timeout, std::string* msg);
base::Status CreateProcedure(const ::openmldb::api::ProcedureInfo& sp_info, uint64_t request_timeout);

bool ShowProcedure(const std::string& db_name, const std::string& sp_name, std::vector<api::ProcedureInfo>* infos,
std::string* msg);
Expand Down
37 changes: 29 additions & 8 deletions src/cmd/openmldb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
#include "base/glog_wapper.h"
#include "base/hash.h"
#include "base/ip.h"
#include "base/kv_iterator.h"
#include "base/linenoise.h"
#include "base/kv_iterator.h"
#include "base/server_name.h"
#include "base/strings.h"
#if defined(__linux__) || defined(__mac_tablet__)
Expand Down Expand Up @@ -60,6 +60,7 @@ using Schema = ::google::protobuf::RepeatedPtrField<::openmldb::common::ColumnDe
using TabletClient = openmldb::client::TabletClient;

DECLARE_string(endpoint);
DECLARE_string(nameserver);
DECLARE_int32(port);
DECLARE_string(zk_cluster);
DECLARE_string(zk_root_path);
Expand Down Expand Up @@ -4824,14 +4825,34 @@ void StartAPIServer() {
}

auto api_service = std::make_unique<::openmldb::apiserver::APIServerImpl>();
::openmldb::sdk::ClusterOptions cluster_options;
cluster_options.zk_cluster = FLAGS_zk_cluster;
cluster_options.zk_path = FLAGS_zk_root_path;
if (!api_service->Init(cluster_options)) {
PDLOG(WARNING, "Fail to init");
exit(1);
if (!FLAGS_nameserver.empty()) {
std::vector<std::string> vec;
boost::split(vec, FLAGS_nameserver, boost::is_any_of(":"));
if (vec.size() != 2) {
PDLOG(WARNING, "Invalid nameserver format");
exit(1);
}
int32_t port = 0;
try {
port = boost::lexical_cast<uint32_t>(vec[1]);
} catch (std::exception const& e) {
PDLOG(WARNING, "Invalid nameserver format");
exit(1);
}
auto sdk = new ::openmldb::sdk::StandAloneSDK(vec[0], port);
if (!sdk->Init() || !api_service->Init(sdk)) {
PDLOG(WARNING, "Fail to init");
exit(1);
}
} else {
::openmldb::sdk::ClusterOptions cluster_options;
cluster_options.zk_cluster = FLAGS_zk_cluster;
cluster_options.zk_path = FLAGS_zk_root_path;
if (!api_service->Init(cluster_options)) {
PDLOG(WARNING, "Fail to init");
exit(1);
}
}

brpc::ServerOptions options;
options.num_threads = FLAGS_thread_pool_size;
brpc::Server server;
Expand Down
36 changes: 23 additions & 13 deletions src/cmd/sql_cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,7 @@ void HandleCmd(const hybridse::node::CmdPlanNode* cmd_node) {
std::cout << "ERROR: Please enter database first" << std::endl;
return;
}
// TODO: Should support table name with database name
// TODO(denglong): Should support table name with database name
auto table = cs->GetTableInfo(db, cmd_node->GetArgs()[0]);
if (table == nullptr) {
std::cerr << "table " << cmd_node->GetArgs()[0] << " does not exist" << std::endl;
Expand Down Expand Up @@ -1034,11 +1034,7 @@ base::Status HandleDeploy(const hybridse::node::DeployPlanNode* deploy_node) {
return {base::ReturnCode::kError, "table " + kv.first + " load data failed"};
}
}
std::string msg;
if (!ns->CreateProcedure(sp_info, FLAGS_request_timeout_ms, &msg)) {
return {base::ReturnCode::kError, msg};
}
return {};
return ns->CreateProcedure(sp_info, FLAGS_request_timeout_ms);
}

void SetVariable(const std::string& key, const hybridse::node::ConstNode* value) {
Expand Down Expand Up @@ -1267,19 +1263,33 @@ void HandleSQL(const std::string& sql) {
std::cout << info->GetPhysicalPlan() << std::endl;
return;
}
case hybridse::node::kPlanTypeCreate:
case hybridse::node::kPlanTypeCreate: {
if (db.empty()) {
std::cout << "ERROR: Please use database first" << std::endl;
return;
}
auto create_node = dynamic_cast<hybridse::node::CreatePlanNode*>(node);
auto status = sr->HandleSQLCreateTable(create_node, db, cs->GetNsClient());
if (status.OK()) {
sr->RefreshCatalog();
std::cout << "SUCCEED: Create successfully" << std::endl;
} else {
std::cout << "ERROR: " << status.msg << std::endl;
}
return;
}
case hybridse::node::kPlanTypeCreateSp: {
if (db.empty()) {
std::cout << "ERROR: Please use database first" << std::endl;
return;
}
::hybridse::sdk::Status status;
bool ok = sr->ExecuteDDL(db, sql, &status);
if (!ok) {
std::cout << "ERROR: Fail to execute ddl" << std::endl;
} else {
auto create_node = dynamic_cast<hybridse::node::CreateProcedurePlanNode*>(node);
auto status = sr->HandleSQLCreateProcedure(create_node, db, sql, cs->GetNsClient());
if (status.OK()) {
sr->RefreshCatalog();
std::cout << "SUCCEED: Create successfully" << std::endl;
} else {
std::cout << "ERROR: " << status.msg << std::endl;
}
return;
}
Expand All @@ -1293,7 +1303,7 @@ void HandleSQL(const std::string& sql) {
return;
}
case hybridse::node::kPlanTypeInsert: {
// TODO: Should support table name with database name
// TODO(denglong): Should support table name with database name
if (db.empty()) {
std::cout << "ERROR: Please use database first" << std::endl;
return;
Expand Down
35 changes: 29 additions & 6 deletions src/cmd/sql_cmd_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ DEFINE_string(cmd, "", "Set cmd");
DECLARE_string(host);
DECLARE_int32(port);

::openmldb::sdk::StandaloneEnv env;

namespace openmldb {
namespace cmd {

Expand Down Expand Up @@ -171,12 +173,6 @@ TEST_F(SqlCmdTest, select_into_outfile) {
}

TEST_F(SqlCmdTest, deploy) {
::openmldb::sdk::StandaloneEnv env;
env.SetUp();
FLAGS_host = "127.0.0.1";
FLAGS_port = env.GetNsPort();
StandAloneInit();

HandleSQL("create database test1;");
HandleSQL("use test1;");
std::string create_sql = "create table trans (c1 string, c3 int, c4 bigint, c5 float, c6 double, c7 timestamp, "
Expand All @@ -195,6 +191,28 @@ TEST_F(SqlCmdTest, deploy) {
hybridse::node::PlanNode *node = plan_trees[0];
auto status = HandleDeploy(dynamic_cast<hybridse::node::DeployPlanNode*>(node));
ASSERT_TRUE(status.OK());
std::string msg;
ASSERT_FALSE(cs->GetNsClient()->DropTable("test1", "trans", msg));
ASSERT_TRUE(cs->GetNsClient()->DropProcedure("test1", "demo", msg));
ASSERT_TRUE(cs->GetNsClient()->DropTable("test1", "trans", msg));
}

TEST_F(SqlCmdTest, create_without_index_col) {
HandleSQL("create database test2;");
HandleSQL("use test2;");
std::string create_sql = "create table trans (c1 string, c3 int, c4 bigint, c5 float, c6 double, c7 timestamp, "
"c8 date, index(ts=c7));";
hybridse::node::NodeManager node_manager;
hybridse::base::Status sql_status;
hybridse::node::PlanNodeList plan_trees;
hybridse::plan::PlanAPI::CreatePlanTreeFromScript(create_sql, plan_trees, &node_manager, sql_status);
ASSERT_EQ(0, sql_status.code);
hybridse::node::PlanNode *node = plan_trees[0];
auto status = sr->HandleSQLCreateTable(dynamic_cast<hybridse::node::CreatePlanNode*>(node),
"test2", cs->GetNsClient());
ASSERT_TRUE(status.OK());
std::string msg;
ASSERT_TRUE(cs->GetNsClient()->DropTable("test2", "trans", msg));
}

} // namespace cmd
Expand All @@ -210,6 +228,11 @@ int main(int argc, char** argv) {
int ok = ::openmldb::cmd::mc_->SetUp(1);
sleep(1);
srand(time(NULL));
env.SetUp();
FLAGS_host = "127.0.0.1";
FLAGS_port = env.GetNsPort();
::openmldb::cmd::StandAloneInit();

ok = RUN_ALL_TESTS();
::openmldb::cmd::mc_->Close();
return ok;
Expand Down
1 change: 1 addition & 0 deletions src/flags.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ DEFINE_uint32(tablet_offline_check_interval, 1000, "config the check interval of
DEFINE_string(zk_cluster, "", "config the zookeeper cluster eg ip:2181,ip2:2181,ip3:2181");
DEFINE_string(zk_root_path, "/openmldb", "config the root path of zookeeper");
DEFINE_string(tablet, "", "config the endpoint of tablet");
DEFINE_string(nameserver, "", "config the endpoint of nameserver");
DEFINE_int32(zk_keep_alive_check_interval, 15000, "config the interval of keep alive check");
DEFINE_string(host, "", "used in stand-alone mode, config the name server ip");
DEFINE_int32(port, 0, "used in stand-alone mode, config the name server port");
Expand Down
5 changes: 5 additions & 0 deletions src/sdk/db_sdk.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ class DBSDK {
// create engine, then build the catalog
// TODO(hw): should prevent double init
virtual bool Init() = 0;

virtual bool IsClusterMode() const = 0;
bool Refresh() { return BuildCatalog(); }

inline uint64_t GetClusterVersion() { return cluster_version_.load(std::memory_order_relaxed); }
Expand Down Expand Up @@ -124,6 +126,7 @@ class ClusterSDK : public DBSDK {

~ClusterSDK() override;
bool Init() override;
bool IsClusterMode() const override { return true; }

protected:
bool BuildCatalog() override;
Expand Down Expand Up @@ -153,6 +156,8 @@ class StandAloneSDK : public DBSDK {
~StandAloneSDK() override { pool_.Stop(false); }
bool Init() override;

bool IsClusterMode() const override { return false; }

protected:
// Before connecting to ns, we only have the host&port
// NOTICE: when we call this method, we do not have the correct ns client, do not GetNsClient.
Expand Down
49 changes: 32 additions & 17 deletions src/sdk/node_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ namespace sdk {

using hybridse::plan::PlanAPI;

bool NodeAdapter::TransformToTableDef(::hybridse::node::CreatePlanNode* create_node,
bool NodeAdapter::TransformToTableDef(::hybridse::node::CreatePlanNode* create_node, bool allow_empty_col_index,
::openmldb::nameserver::TableInfo* table, hybridse::base::Status* status) {
if (create_node == NULL || table == NULL || status == NULL) return false;
std::string table_name = create_node->GetTableName();
Expand All @@ -57,6 +57,7 @@ bool NodeAdapter::TransformToTableDef(::hybridse::node::CreatePlanNode* create_n
table->set_partition_num(create_node->GetPartitionNum());
table->set_format_version(1);
int no_ts_cnt = 0;
bool has_generate_index = false;
for (auto column_desc : column_desc_list) {
switch (column_desc->GetType()) {
case hybridse::node::kColumnDesc: {
Expand Down Expand Up @@ -110,13 +111,6 @@ bool NodeAdapter::TransformToTableDef(::hybridse::node::CreatePlanNode* create_n

case hybridse::node::kColumnIndex: {
auto* column_index = dynamic_cast<hybridse::node::ColumnIndexNode*>(column_desc);

if (column_index->GetKey().empty()) {
status->msg = "CREATE common: INDEX KEY empty";
status->code = hybridse::common::kUnsupportSql;
return false;
}

std::string index_name = column_index->GetName();
// index in `create table` won't set name
DCHECK(index_name.empty());
Expand All @@ -126,11 +120,31 @@ bool NodeAdapter::TransformToTableDef(::hybridse::node::CreatePlanNode* create_n
status->code = hybridse::common::kUnsupportSql;
return false;
}

::openmldb::common::ColumnKey* index = table->add_column_key();
if (column_index->GetKey().empty()) {
if (allow_empty_col_index && !has_generate_index && !column_index->GetTs().empty()) {
const auto& ts_name = column_index->GetTs();
for (const auto& kv : column_names) {
if (kv.first != ts_name && kv.second->data_type() != openmldb::type::DataType::kFloat &&
kv.second->data_type() != openmldb::type::DataType::kDouble) {
index->add_col_name(kv.first);
has_generate_index = true;
break;
}
}
if (!has_generate_index) {
status->msg = "CREATE common: can not found index col";
status->code = hybridse::common::kUnsupportSql;
return false;
}
} else {
status->msg = "CREATE common: INDEX KEY empty";
status->code = hybridse::common::kUnsupportSql;
return false;
}
}
index_names.insert(index_name);
column_index->SetName(index_name);

::openmldb::common::ColumnKey* index = table->add_column_key();
if (!TransformToColumnKey(column_index, column_names, index, status)) {
return false;
}
Expand Down Expand Up @@ -213,17 +227,18 @@ bool NodeAdapter::TransformToColumnKey(hybridse::node::ColumnIndexNode* column_i
index->set_index_name(column_index->GetName());

for (const auto& key : column_index->GetKey()) {
// if no column_names, skip check
if (!column_names.empty()) {
if (column_names.find(key) == column_names.end()) {
status->msg = "column " + key + " does not exist";
index->add_col_name(key);
}
// if no column_names, skip check
if (!column_names.empty()) {
for (const auto& col : index->col_name()) {
if (column_names.find(col) == column_names.end()) {
status->msg = "column " + col + " does not exist";
status->code = hybridse::common::kUnsupportSql;
return false;
}
}
index->add_col_name(key);
}

::openmldb::common::TTLSt* ttl_st = index->mutable_ttl();
if (!column_index->ttl_type().empty()) {
std::string ttl_type = column_index->ttl_type();
Expand Down
Loading