diff --git a/cases/plan/cmd.yaml b/cases/plan/cmd.yaml index bcffc51507c..8f215bacddc 100644 --- a/cases/plan/cmd.yaml +++ b/cases/plan/cmd.yaml @@ -754,3 +754,11 @@ cases: +-node[CMD] +-cmd_type: show create table +-args: [db1, t1] + + - id: callstmt + sql: call sp(1, cast("ab" as string)) + expect: + plan_tree_str: | + +-[kPlanTypeCallStmt] + +-procedure_name: [sp] + +-arguments: 1, string(ab) diff --git a/cases/query/fail_query.yaml b/cases/query/fail_query.yaml index 415fa203127..2fc90ba4058 100644 --- a/cases/query/fail_query.yaml +++ b/cases/query/fail_query.yaml @@ -70,3 +70,53 @@ cases: expect: success: false msg: unsupport join type RightJoin + - id: 4 + mode: batch-unsupport + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + sql: | + SELECT * from t1 + CONFIG (execute_mode = 'request', values = []) + expect: + success: false + msg: | + FAILED_PRECONDITION: element number of the request values must not empty + - id: 5 + mode: batch-unsupport + inputs: + - name: t1 + columns: ["c1 string","c2 int","c4 timestamp"] + indexs: ["index1:c1:c4"] + rows: + - ["aa",20,1000] + - ["bb",30,1000] + sql: | + SELECT * from t1 + CONFIG (execute_mode = 'request', values = ("c1")) + expect: + success: false + msg: | + INTERNAL: pass in expr number do not match, expect 3 but got 1: (c1) + + # TODO: open this case after #3847 + # - id: 6 + # mode: batch-unsupport + # inputs: + # - name: t1 + # columns: ["c1 string","c2 int","c4 date"] + # indexs: ["index1:c1"] + # rows: + # - ["aa",20,1000] + # - ["bb",30,1000] + # sql: | + # SELECT * from t1 + # CONFIG (execute_mode = 'request', values = ("c1", 19, 12) ) + # expect: + # success: false + # msg: | + # xx diff --git a/cases/query/simple_query.yaml b/cases/query/simple_query.yaml index ea110b2be66..8a4c5294f33 100644 --- a/cases/query/simple_query.yaml +++ b/cases/query/simple_query.yaml @@ -985,3 +985,29 @@ cases: data: | 1, 10, 1000 
2, 30, 2000 + + - id: 108 + mode: request-unsupport + inputs: + # t1 as request table, only one request row concerned + # from SQL config options + - name: t1 + columns: ["id int32", "b int32", "ts int64"] + indexs: ["idx2:b:ts"] + data: | + 1, 10, 1000 + - name: t2 + columns: ["id int32", "b int32", "ts int64"] + indexs: ["idx2:b:ts"] + data: | + 3, 10, 1000 + 4, 30, 2000 + sql: | + select t1.*, t2.id as ix from t1 + last join t2 on t1.b = t2.b + config (execute_mode = 'request', values = (5, 10, timestamp(4000))) + expect: + columns: ["id int32", "b int32", "ts int64", "ix int32"] + order: id + data: | + 5, 10, 4000, 3 diff --git a/hybridse/examples/toydb/src/cmd/toydb_run_engine.cc b/hybridse/examples/toydb/src/cmd/toydb_run_engine.cc index a3da3076968..f0c80177356 100644 --- a/hybridse/examples/toydb/src/cmd/toydb_run_engine.cc +++ b/hybridse/examples/toydb/src/cmd/toydb_run_engine.cc @@ -14,9 +14,10 @@ * limitations under the License. */ -#include "absl/strings/match.h" #include "testing/toydb_engine_test_base.h" +#include "gflags/gflags.h" + DEFINE_string(yaml_path, "", "Yaml filepath to load cases from"); DEFINE_string(runner_mode, "batch", "Specify runner mode, can be batch or request"); @@ -82,14 +83,15 @@ int RunSingle(const std::string& yaml_path) { if (!FLAGS_case_id.empty() && FLAGS_case_id != sql_case.id()) { continue; } - EngineMode mode; + EngineMode default_mode; if (absl::EqualsIgnoreCase(FLAGS_runner_mode, "batch")) { - mode = kBatchMode; + default_mode = kBatchMode; } else if (absl::EqualsIgnoreCase(FLAGS_runner_mode, "request")) { - mode = kRequestMode; + default_mode = kRequestMode; } else { - mode = kBatchRequestMode; + default_mode = kBatchRequestMode; } + auto mode = Engine::TryDetermineEngineMode(sql_case.sql_str_, default_mode); int ret = DoRunEngine(sql_case, options, mode); if (ret != ENGINE_TEST_RET_SUCCESS) { return ret; diff --git a/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc 
b/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc index 35a595b431e..b0834aeac22 100644 --- a/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc +++ b/hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc @@ -194,6 +194,7 @@ void BatchRequestEngineCheck(const SqlCase& sql_case, void EngineCheck(const SqlCase& sql_case, const EngineOptions& options, EngineMode engine_mode) { + engine_mode = hybridse::vm::Engine::TryDetermineEngineMode(sql_case.sql_str(), engine_mode); if (engine_mode == kBatchMode) { ToydbBatchEngineTestRunner engine_test(sql_case, options); engine_test.RunCheck(); diff --git a/hybridse/include/node/node_enum.h b/hybridse/include/node/node_enum.h index ad8f03bf422..dbf7a2bb847 100644 --- a/hybridse/include/node/node_enum.h +++ b/hybridse/include/node/node_enum.h @@ -98,6 +98,7 @@ enum SqlNodeType { kColumnSchema, kCreateUserStmt, kAlterUserStmt, + kCallStmt, kSqlNodeTypeLast, // debug type }; @@ -143,8 +144,9 @@ enum ExprType { kExprIn, kExprEscaped, kExprArray, - kExprArrayElement, // extract value from a array or map, with `[]` operator - kExprFake, // not a real one + kExprArrayElement, // extract value from a array or map, with `[]` operator + kExprStructCtorParens, // (expr1, expr2, ...) 
+ kExprFake, // not a real one kExprLast = kExprFake, }; @@ -344,6 +346,7 @@ enum PlanType { kPlanTypeShow, kPlanTypeCreateUser, kPlanTypeAlterUser, + kPlanTypeCallStmt, kUnknowPlan = -1, }; diff --git a/hybridse/include/node/plan_node.h b/hybridse/include/node/plan_node.h index c4fcf3beadb..ec82b6a586f 100644 --- a/hybridse/include/node/plan_node.h +++ b/hybridse/include/node/plan_node.h @@ -809,6 +809,22 @@ class AlterTableStmtPlanNode : public LeafPlanNode { std::vector actions_; }; +class CallStmtPlan : public LeafPlanNode { + public: + CallStmtPlan(const std::vector names, const std::vector args) + : LeafPlanNode(kPlanTypeCallStmt), procedure_name_(names), arguments_(args) {} + ~CallStmtPlan() override {} + + const std::vector &procedure_name() const { return procedure_name_; } + const std::vector &arguments() const { return arguments_; } + + void Print(std::ostream &output, const std::string &org_tab) const override; + + private: + const std::vector procedure_name_; + const std::vector arguments_; +}; + bool PlanEquals(const PlanNode *left, const PlanNode *right); bool PlanListEquals(const std::vector &list1, const std::vector &list2); void PrintPlanVector(std::ostream &output, const std::string &tab, PlanNodeList vec, const std::string vector_name, diff --git a/hybridse/include/node/sql_node.h b/hybridse/include/node/sql_node.h index b21b68bb49b..25cb334d438 100644 --- a/hybridse/include/node/sql_node.h +++ b/hybridse/include/node/sql_node.h @@ -45,10 +45,10 @@ class LlvmUdfGenBase; namespace hybridse { namespace node { -class ConstNode; +class ExprNode; class WithClauseEntry; -typedef std::unordered_map OptionsMap; +typedef std::unordered_map OptionsMap; // Global methods std::string NameOfSqlNodeType(const SqlNodeType &type); @@ -2578,6 +2578,7 @@ class FnReturnStmt : public FnNode { ExprNode *return_expr_; }; +// DEPRECATED! 
class StructExpr : public ExprNode { public: explicit StructExpr(const std::string &name) : ExprNode(kExprStruct), class_name_(name) {} @@ -2598,6 +2599,26 @@ class StructExpr : public ExprNode { FnNodeList *methods_; }; +// (expr1, expr2, ...) +class StructCtorWithParens : public ExprNode { + public: + explicit StructCtorWithParens(absl::Span fields) + : ExprNode(kExprStructCtorParens) { + for (auto e : fields) { + AddChild(e); + } + } + ~StructCtorWithParens() override {} + + absl::Span fields() const { return children_; } + + // LOW priority + // void Print(std::ostream &output, const std::string &org_tab) const override; + const std::string GetExprString() const override; + StructCtorWithParens *ShadowCopy(NodeManager *nm) const override; + Status InferAttr(ExprAnalysisContext *ctx) override; +}; + class ExternalFnDefNode : public FnDefNode { public: ExternalFnDefNode(const std::string &name, void *fn_ptr, const node::TypeNode *ret_type, bool ret_nullable, @@ -3005,6 +3026,20 @@ class InputParameterNode : public SqlNode { bool is_constant_; }; +class CallStmt : public SqlNode { + public: + CallStmt(const std::vector names, const std::vector args) + : SqlNode(kCallStmt, 0, 0), procedure_name_(names), arguments_(args) {} + ~CallStmt() override {} + + const std::vector &procedure_name() const { return procedure_name_; } + const std::vector &arguments() const { return arguments_; } + + private: + const std::vector procedure_name_; + const std::vector arguments_; +}; + std::string ExprString(const ExprNode *expr); std::string MakeExprWithTable(const ExprNode *expr, const std::string db); const bool IsNullPrimary(const ExprNode *expr); diff --git a/hybridse/include/plan/plan_api.h b/hybridse/include/plan/plan_api.h index 371d0dd32fa..7007396e10b 100644 --- a/hybridse/include/plan/plan_api.h +++ b/hybridse/include/plan/plan_api.h @@ -16,6 +16,7 @@ #ifndef HYBRIDSE_INCLUDE_PLAN_PLAN_API_H_ #define HYBRIDSE_INCLUDE_PLAN_PLAN_API_H_ +#include #include #include @@ -49,6 
+50,8 @@ class PlanAPI { static const std::string GenerateName(const std::string prefix, int id); }; +absl::Status ParseStatement(absl::string_view, std::unique_ptr*); + // Parse the input str and SQL type and convert to TypeNode representation // // unimplemnted, reserved for later usage diff --git a/hybridse/include/vm/engine.h b/hybridse/include/vm/engine.h index 6829de40040..09586a8b03d 100644 --- a/hybridse/include/vm/engine.h +++ b/hybridse/include/vm/engine.h @@ -20,9 +20,10 @@ #include #include #include +#include #include #include -#include + #include "base/spin_lock.h" #include "codec/fe_row_codec.h" #include "vm/catalog.h" @@ -36,6 +37,7 @@ using ::hybridse::codec::Row; inline constexpr const char* LONG_WINDOWS = "long_windows"; +class SqlContext; class Engine; /// \brief An options class for controlling engine behaviour. class EngineOptions { @@ -148,7 +150,7 @@ class RunSession { } /// Return query related compile information. - virtual std::shared_ptr GetCompileInfo() { + virtual std::shared_ptr GetCompileInfo() const { return compile_info_; } @@ -352,6 +354,8 @@ class Engine { /// \brief Create an Engine with a specific Catalog object. explicit Engine(const std::shared_ptr& cl); + ~Engine(); + /// \brief Create an Engine a specific Catalog object, configuring it with EngineOptions Engine(const std::shared_ptr& cl, const EngineOptions& options); @@ -360,7 +364,9 @@ class Engine { static void InitializeUnsafeRowOptFlag(bool isUnsafeRowOpt); - ~Engine(); + /// Determine the engine mode for `sql`: `sql` may contain an option defining + /// execute_mode; `default_mode` is used if it does not, or on error. + static EngineMode TryDetermineEngineMode(absl::string_view sql, EngineMode default_mode); /// \brief Compile sql in db and stored the results in the session bool Get(const std::string& sql, const std::string& db, @@ -418,6 +424,13 @@ class Engine { EngineOptions GetEngineOptions(); private: + /// extract request rows info in SQL. + /// A SQL e.g 'SELECT ... 
FROM t1 CONFIG (execute_mode = "request", values = ...)' + /// request row info is given by the 'values' option, in one of these formats: + /// 1. [(col1_expr, col2_expr, ... ), (...), ...] + /// 2. (col1_expr, col2_expr, ... ) + static absl::Status ExtractRequestRowsInSQL(SqlContext* ctx); + std::shared_ptr GetCacheLocked(const std::string& db, const std::string& sql, EngineMode engine_mode); diff --git a/hybridse/include/vm/engine_context.h b/hybridse/include/vm/engine_context.h index 4e197a5659a..98514ceb7e9 100644 --- a/hybridse/include/vm/engine_context.h +++ b/hybridse/include/vm/engine_context.h @@ -24,8 +24,9 @@ namespace hybridse { namespace vm { -enum EngineMode { kBatchMode, kRequestMode, kMockRequestMode, kBatchRequestMode }; +enum EngineMode { kBatchMode, kRequestMode, kMockRequestMode, kBatchRequestMode, kOffline }; std::string EngineModeName(EngineMode mode); +absl::StatusOr UnparseEngineMode(absl::string_view); struct BatchRequestInfo { // common column indices in batch request mode diff --git a/hybridse/include/vm/physical_op.h b/hybridse/include/vm/physical_op.h index eaea21050f2..3122b5ea1ca 100644 --- a/hybridse/include/vm/physical_op.h +++ b/hybridse/include/vm/physical_op.h @@ -1835,14 +1835,14 @@ class PhysicalSelectIntoNode : public PhysicalUnaryNode { return nullptr; } auto it = options_->find(option); - return it == options_->end() ? nullptr : it->second; + return it == options_->end() ? nullptr : it->second->GetAsOrNull(); } const hybridse::node::ConstNode *GetConfigOption(const std::string &option) const { if (!config_options_) { return nullptr; } auto it = config_options_->find(option); - return it == config_options_->end() ? nullptr : it->second; + return it == config_options_->end() ? nullptr : it->second->GetAsOrNull(); } std::string query_str_, out_file_; @@ -1878,7 +1878,7 @@ class PhysicalLoadDataNode : public PhysicalOpNode { return nullptr; } auto it = options_->find(option); - return it == options_->end() ? 
nullptr : it->second; + return it == options_->end() ? nullptr : it->second->GetAsOrNull(); } std::string file_; diff --git a/hybridse/include/vm/sql_ctx.h b/hybridse/include/vm/sql_ctx.h index 25182b86647..bcdfebd865d 100644 --- a/hybridse/include/vm/sql_ctx.h +++ b/hybridse/include/vm/sql_ctx.h @@ -20,6 +20,7 @@ #include #include #include +#include #include "node/node_manager.h" #include "vm/engine_context.h" @@ -36,7 +37,7 @@ class ClusterJob; struct SqlContext { // mode: batch|request|batch request - ::hybridse::vm::EngineMode engine_mode; + ::hybridse::vm::EngineMode engine_mode = EngineMode::kBatchMode; bool is_cluster_optimized = false; bool is_batch_request_optimized = false; bool enable_expr_optimize = false; @@ -73,6 +74,16 @@ struct SqlContext { ::hybridse::node::NodeManager nm; ::hybridse::udf::UdfLibrary* udf_library = nullptr; + // ref to request row expressions defined in SQL `CONFIG (execute_mode = 'request', values = ... )`. + // those expressions are used to construct the request row if needed. + // can be: + // 1. Array [ StructCtorWithParens ] + // 2. 
StructCtorWithParens + const node::ExprNode* request_expressions = nullptr; + // compiled request rows from SQL CONFIG clause, `values` option + // request_rows is populated if engine mode is kRequestMode or kBatchRequestMode + std::vector request_rows; + ::hybridse::vm::BatchRequestInfo batch_request_info; std::shared_ptr> options; diff --git a/hybridse/src/codegen/insert_row_builder.cc b/hybridse/src/codegen/insert_row_builder.cc index a4dd41eb4aa..a01dfae7c12 100644 --- a/hybridse/src/codegen/insert_row_builder.cc +++ b/hybridse/src/codegen/insert_row_builder.cc @@ -25,6 +25,7 @@ #include "absl/cleanup/cleanup.h" #include "absl/status/status.h" #include "absl/strings/str_join.h" +#include "absl/strings/substitute.h" #include "base/fe_status.h" #include "codegen/buf_ir_builder.h" #include "codegen/context.h" @@ -57,6 +58,10 @@ absl::StatusOr> InsertRowBuilder::ComputeRow(absl::Span< } absl::StatusOr InsertRowBuilder::ComputeRowUnsafe(absl::Span values) { + if (schema_->size() != values.size()) { + return absl::FailedPreconditionError( + absl::Substitute("invalid expression number, expect $0, but got $1", schema_->size(), values.size())); + } absl::Cleanup clean = [&]() { fn_counter_++; }; DLOG(INFO) << absl::StrJoin(values, ", ", [](std::string* str, const node::ExprNode* const expr) { diff --git a/hybridse/src/node/expr_node.cc b/hybridse/src/node/expr_node.cc index bfc46b2867e..75213ed4203 100644 --- a/hybridse/src/node/expr_node.cc +++ b/hybridse/src/node/expr_node.cc @@ -1216,5 +1216,21 @@ Status ArrayElementExpr::InferAttr(ExprAnalysisContext* ctx) { } ExprNode *ArrayElementExpr::array() const { return GetChild(0); } ExprNode *ArrayElementExpr::position() const { return GetChild(1); } + +StructCtorWithParens* StructCtorWithParens::ShadowCopy(NodeManager* nm) const { + return nm->MakeNode(fields()); +} +const std::string StructCtorWithParens::GetExprString() const { + return absl::StrCat( + "(", + absl::StrJoin(fields(), ", ", + [](std::string* out, const 
ExprNode* e) { absl::StrAppend(out, e->GetExprString()); }), + ")"); +} + +Status StructCtorWithParens::InferAttr(ExprAnalysisContext* ctx) { + // TODO + return {}; +} } // namespace node } // namespace hybridse diff --git a/hybridse/src/node/plan_node.cc b/hybridse/src/node/plan_node.cc index c829ab880e5..2f871e4db59 100644 --- a/hybridse/src/node/plan_node.cc +++ b/hybridse/src/node/plan_node.cc @@ -19,6 +19,7 @@ #include #include "absl/algorithm/container.h" +#include "absl/strings/str_join.h" namespace hybridse { namespace node { @@ -228,6 +229,8 @@ std::string NameOfPlanNodeType(const PlanType &type) { return "kPlanTypeCreateUser"; case kPlanTypeAlterUser: return "kPlanTypeAlterUser"; + case kPlanTypeCallStmt: + return "kPlanTypeCallStmt"; case kUnknowPlan: return std::string("kUnknow"); } @@ -867,5 +870,16 @@ void AlterTableStmtPlanNode::Print(std::ostream &output, const std::string &org_ } } +void CallStmtPlan::Print(std::ostream &output, const std::string &org_tab) const { + LeafPlanNode::Print(output, org_tab); + output << "\n"; + auto tab = org_tab + INDENT; + PrintValue(output, tab, procedure_name_, "procedure_name", false); + output << "\n"; + PrintValue(output, tab, + absl::StrJoin(arguments_, ", ", + [](std::string *out, const ExprNode *n) { absl::StrAppend(out, n->GetExprString()); }), + "arguments", true); +} } // namespace node } // namespace hybridse diff --git a/hybridse/src/node/sql_node.cc b/hybridse/src/node/sql_node.cc index 4e35793e1b4..80fc868752b 100644 --- a/hybridse/src/node/sql_node.cc +++ b/hybridse/src/node/sql_node.cc @@ -145,9 +145,10 @@ static absl::flat_hash_map CreateExprTypeNamesMap() {kExprEscaped, "escape"}, {kExprArray, "array"}, {kExprArrayElement, "array element"}, + {kExprStructCtorParens, "struct with parens"}, }; for (auto kind = 0; kind < ExprType::kExprLast; ++kind) { - DCHECK(map.find(static_cast(kind)) != map.end()); + DCHECK(map.find(static_cast(kind)) != map.end()); } return map; } @@ -1191,6 +1192,7 @@ static 
absl::flat_hash_map CreateSqlNodeTypeToNa {kWithClauseEntry, "kWithClauseEntry"}, {kAlterTableStmt, "kAlterTableStmt"}, {kColumnSchema, "kColumnSchema"}, + {kCallStmt, "kCallStmt"}, }; for (auto kind = 0; kind < SqlNodeType::kSqlNodeTypeLast; ++kind) { DCHECK(map.find(static_cast(kind)) != map.end()) @@ -2763,7 +2765,7 @@ std::string SetOptionsAction::DebugString() const { } absl::StrAppend(&output, kv.first); absl::StrAppend(&output, "="); - absl::StrAppend(&output, kv.second->GetAsString()); + absl::StrAppend(&output, kv.second->GetExprString()); } return absl::Substitute("SetOptionsAction ($0)", output); } diff --git a/hybridse/src/plan/planner.cc b/hybridse/src/plan/planner.cc index 8dd57b6f9e5..b2a57b4128c 100644 --- a/hybridse/src/plan/planner.cc +++ b/hybridse/src/plan/planner.cc @@ -39,6 +39,8 @@ inline bool IsCurRowRelativeWinFun(absl::string_view fn_name) { absl::EqualsIgnoreCase("lead", fn_name); } +base::Status ConvertCall(const node::CallStmt* call, node::NodeManager* nm, node::CallStmtPlan** out); + Planner::Planner(node::NodeManager *manager, const bool is_batch_mode, const bool is_cluster_optimized, const bool enable_batch_window_parallelization, const std::unordered_map* extra_options) @@ -774,8 +776,6 @@ base::Status SimplePlanner::CreatePlanTree(const NodePointVector &parser_trees, break; } case ::hybridse::node::kSetStmt: { - CHECK_TRUE(is_batch_mode_, common::kPlanError, - "Non-support SET Op in online serving"); node::PlanNode *set_plan_node = nullptr; CHECK_STATUS(CreateSetPlanNode(dynamic_cast(parser_tree), &set_plan_node)); plan_trees.push_back(set_plan_node); @@ -805,9 +805,19 @@ base::Status SimplePlanner::CreatePlanTree(const NodePointVector &parser_trees, case ::hybridse::node::kAlterTableStmt: { node::AlterTableStmtPlanNode* out = nullptr; CHECK_STATUS(ConvertGuard( - parser_tree, &out, [this](const node::AlterTableStmt *from, node::AlterTableStmtPlanNode **out) { - *out = node_manager_->MakeNode(from->db_, from->table_, - 
from->actions_); + parser_tree, &out, + [](const node::AlterTableStmt *from, node::NodeManager *nm, node::AlterTableStmtPlanNode **out) { + *out = nm->MakeNode(from->db_, from->table_, from->actions_); + return base::Status::OK(); + })); + plan_trees.push_back(out); + break; + } + case ::hybridse::node::kCallStmt: { + node::CallStmtPlan *out = nullptr; + CHECK_STATUS(ConvertGuard( + parser_tree, &out, [](const node::CallStmt *from, node::NodeManager *nm, node::CallStmtPlan **out) { + *out = nm->MakeNode(from->procedure_name(), from->arguments()); return base::Status::OK(); })); plan_trees.push_back(out); diff --git a/hybridse/src/plan/planner.h b/hybridse/src/plan/planner.h index 6da3068fdd8..64eb875e429 100644 --- a/hybridse/src/plan/planner.h +++ b/hybridse/src/plan/planner.h @@ -25,8 +25,6 @@ #include "absl/status/statusor.h" #include "base/fe_status.h" -#include "gflags/gflags.h" -#include "glog/logging.h" #include "node/node_manager.h" #include "node/plan_node.h" #include "node/sql_node.h" @@ -68,7 +66,7 @@ class Planner { ABSL_MUST_USE_RESULT base::Status ConvertGuard(const node::SqlNode *node, OutputType **output, ConvertFn &&func) { auto specific_node = dynamic_cast>>(node); CHECK_TRUE(specific_node != nullptr, common::kUnsupportSql, "unable to cast"); - return func(specific_node, output); + return func(specific_node, node_manager_, output); } static absl::StatusOr IsTable(node::PlanNode *node); diff --git a/hybridse/src/planv2/ast_node_converter.cc b/hybridse/src/planv2/ast_node_converter.cc index 46bb285f993..a8453e1221c 100644 --- a/hybridse/src/planv2/ast_node_converter.cc +++ b/hybridse/src/planv2/ast_node_converter.cc @@ -63,6 +63,9 @@ static base::Status ConvertSchemaNode(const zetasql::ASTColumnSchema* stmt, node node::ColumnSchemaNode** out); static base::Status ConvertArrayElement(const zetasql::ASTArrayElement* expr, node::NodeManager* nm, node::ArrayElementExpr** out); +static base::Status ConvertCallStmt(const zetasql::ASTCallStatement*, 
node::NodeManager*, node::CallStmt**); +static base::Status ConvertStructCtor(const zetasql::ASTStructConstructorWithParens*, node::NodeManager*, + node::StructCtorWithParens**); /// Used to convert zetasql ASTExpression Node into our ExprNode base::Status ConvertExprNode(const zetasql::ASTExpression* ast_expression, node::NodeManager* node_manager, @@ -542,6 +545,14 @@ base::Status ConvertExprNode(const zetasql::ASTExpression* ast_expression, node: ConvertArrayExpr); } + case zetasql::AST_STRUCT_CONSTRUCTOR_WITH_PARENS: { + node::StructCtorWithParens* expr = nullptr; + CHECK_STATUS(ConvertGuard(ast_expression, node_manager, &expr, + ConvertStructCtor)); + *output = expr; + return base::Status::OK(); + } + default: { FAIL_STATUS(common::kUnsupportSql, "Unsupport ASTExpression ", ast_expression->GetNodeKindString()) } @@ -889,6 +900,13 @@ base::Status ConvertStatement(const zetasql::ASTStatement* statement, node::Node ConvertGuard(statement, node_manager, output, ConvertAlterTableStmt)); break; } + case zetasql::AST_CALL_STATEMENT: { + node::CallStmt* call = nullptr; + CHECK_STATUS( + ConvertGuard(statement, node_manager, &call, ConvertCallStmt)); + *output = call; + break; + } default: { FAIL_STATUS(common::kSqlAstError, "Un-support statement type: ", statement->GetNodeKindString()); } @@ -2178,9 +2196,7 @@ base::Status ConvertAstOptionsListToMap(const zetasql::ASTOptionsList* options, auto entry_value = entry->value(); node::ExprNode* value = nullptr; CHECK_STATUS(ConvertExprNode(entry_value, node_manager, &value)); - CHECK_TRUE(value->GetExprType() == node::kExprPrimary, common::kSqlAstError, - "Unsupported value other than const type: ", entry_value->DebugString()); - options_map->emplace(key, dynamic_cast(value)); + options_map->emplace(key, value); } return base::Status::OK(); } @@ -2556,5 +2572,38 @@ base::Status ConvertArrayElement(const zetasql::ASTArrayElement* expr, node::Nod return {}; } +base::Status ConvertCallStmt(const zetasql::ASTCallStatement* 
call, node::NodeManager* nm, node::CallStmt** out) { + std::vector names; + CHECK_STATUS(AstPathExpressionToStringList(call->procedure_name(), names)); + + std::vector args; + for (auto arg : call->arguments()) { + if (arg->expr()) { + node::ExprNode* e; + CHECK_STATUS(ConvertExprNode(arg->expr(), nm, &e)); + args.push_back(e); + } else { + CHECK_TRUE(false, common::kSqlAstError, + "unsupported argument type for call statement: ", arg->DebugString()); + } + } + + *out = nm->MakeNode(names, args); + + return {}; +} + +base::Status ConvertStructCtor(const zetasql::ASTStructConstructorWithParens* expr, node::NodeManager* nm, + node::StructCtorWithParens** out) { + std::vector converted; + for (auto e : expr->field_expressions()) { + node::ExprNode* ce; + CHECK_STATUS(ConvertExprNode(e, nm, &ce)); + converted.push_back(ce); + } + *out = nm->MakeNode(converted); + return {}; +} + } // namespace plan } // namespace hybridse diff --git a/hybridse/src/planv2/ast_node_converter_test.cc b/hybridse/src/planv2/ast_node_converter_test.cc index 178117728ad..4255cc19524 100644 --- a/hybridse/src/planv2/ast_node_converter_test.cc +++ b/hybridse/src/planv2/ast_node_converter_test.cc @@ -742,7 +742,7 @@ TEST_F(ASTNodeConverterTest, ConvertCreateFunctionOKTest) { auto option = create_fun_stmt->Options(); ASSERT_EQ(option->size(), 1); ASSERT_EQ(option->begin()->first, "PATH"); - ASSERT_EQ(option->begin()->second->GetAsString(), "/tmp/libmyfun.so"); + ASSERT_EQ(option->begin()->second->GetExprString(), "/tmp/libmyfun.so"); std::string sql2 = "CREATE AGGREGATE FUNCTION fun1 (x BIGINT) RETURNS STRING OPTIONS (PATH='/tmp/libmyfun.so');"; create_fun_stmt = nullptr; diff --git a/hybridse/src/planv2/plan_api.cc b/hybridse/src/planv2/plan_api.cc index 6f3cdfe5871..a563222f82d 100644 --- a/hybridse/src/planv2/plan_api.cc +++ b/hybridse/src/planv2/plan_api.cc @@ -15,6 +15,7 @@ */ #include "plan/plan_api.h" +#include "absl/status/status.h" #include "absl/strings/substitute.h" #include 
"planv2/ast_node_converter.h" #include "planv2/planner_v2.h" @@ -46,7 +47,19 @@ base::Status PlanAPI::CreatePlanTreeFromScript(vm::SqlContext *ctx) { auto planner_ptr = std::make_unique(&ctx->nm, ctx->engine_mode == vm::kBatchMode, ctx->is_cluster_optimized, ctx->enable_batch_window_parallelization, ctx->options.get()); - return planner_ptr->CreateASTScriptPlan(script, ctx->logical_plan); + CHECK_STATUS(planner_ptr->CreateASTScriptPlan(script, ctx->logical_plan)); + + for (auto plan : ctx->logical_plan) { + if (plan->GetType() == node::kPlanTypeQuery) { + auto query = dynamic_cast(plan); + if (query && query->config_options_ && query->config_options_->count("values") != 0) { + ctx->request_expressions = query->config_options_->at("values"); + break; + } + } + } + + return {}; } bool PlanAPI::CreatePlanTreeFromScript(const std::string &sql, PlanNodeList &plan_trees, NodeManager *node_manager, @@ -89,6 +102,22 @@ const std::string PlanAPI::GenerateName(const std::string prefix, int id) { return name; } +absl::Status ParseStatement(absl::string_view sql, std::unique_ptr* out) { + zetasql::ParserOptions parser_opts; + zetasql::LanguageOptions language_opts; + language_opts.EnableLanguageFeature(zetasql::FEATURE_V_1_3_COLUMN_DEFAULT_VALUE); + parser_opts.set_language_options(&language_opts); + auto zetasql_status = zetasql::ParseStatement(sql, parser_opts, out); + zetasql::ErrorLocation location; + if (!zetasql_status.ok()) { + zetasql::ErrorLocation location; + GetErrorLocation(zetasql_status, &location); + return absl::InvalidArgumentError(zetasql::FormatError(zetasql_status)); + } + + return absl::OkStatus(); +} + absl::StatusOr ParseTableColumSchema(absl::string_view str) { zetasql::ParserOptions parser_opts; zetasql::LanguageOptions language_opts; diff --git a/hybridse/src/planv2/planner_v2.h b/hybridse/src/planv2/planner_v2.h index 2555ffd66e2..a6f5e86d449 100644 --- a/hybridse/src/planv2/planner_v2.h +++ b/hybridse/src/planv2/planner_v2.h @@ -22,17 +22,12 @@ 
#include "base/fe_status.h" #include "node/node_manager.h" -#include "node/plan_node.h" #include "plan/planner.h" #include "zetasql/parser/parser.h" namespace hybridse { namespace plan { -using base::Status; -using node::NodePointVector; -using node::PlanNodeList; - class SimplePlannerV2 : public SimplePlanner { public: SimplePlannerV2(node::NodeManager *manager, bool is_batch_mode, bool is_cluster_optimized = false, diff --git a/hybridse/src/planv2/planner_v2_test.cc b/hybridse/src/planv2/planner_v2_test.cc index 2b83865b1bb..70356a75716 100644 --- a/hybridse/src/planv2/planner_v2_test.cc +++ b/hybridse/src/planv2/planner_v2_test.cc @@ -91,15 +91,14 @@ INSTANTIATE_TEST_SUITE_P(SQLCmdParserTest, PlannerV2Test, TEST_P(PlannerV2Test, PlannerSucessTest) { const auto& param = GetParam(); std::string sqlstr = param.sql_str(); - std::cout << sqlstr << std::endl; - base::Status status; - node::PlanNodeList plan_trees; - EXPECT_EQ(param.expect().success_, PlanAPI::CreatePlanTreeFromScript(sqlstr, plan_trees, manager_, status)) - << status; + vm::SqlContext ctx; + ctx.sql = sqlstr; + auto status = PlanAPI::CreatePlanTreeFromScript(&ctx); + EXPECT_EQ(param.expect().success_, status.isOK()) << status; if (param.expect().success_) { if (!param.expect().plan_tree_str_.empty()) { // HACK: weak implementation, but usually it works - EXPECT_EQ(param.expect().plan_tree_str_, plan_trees.at(0)->GetTreeString()); + EXPECT_EQ(param.expect().plan_tree_str_, ctx.logical_plan.at(0)->GetTreeString()); } } else { if (!param.expect().msg_.empty()) { @@ -117,10 +116,13 @@ TEST_P(PlannerV2Test, PlannerClusterOnlineServingOptTest) { LOG(INFO) << "Skip mode " << sql_case.mode(); return; } - base::Status status; - node::PlanNodeList plan_trees; + vm::SqlContext ctx; + ctx.sql = sqlstr; + ctx.engine_mode = vm::kRequestMode; + ctx.is_cluster_optimized = true; + auto status = PlanAPI::CreatePlanTreeFromScript(&ctx); // TODO(ace): many tests defined in 'cases/plan/' do not pass, should annotated in 
yaml file - plan::PlanAPI::CreatePlanTreeFromScript(sqlstr, plan_trees, manager_, status, false, true); + plan::PlanAPI::CreatePlanTreeFromScript(&ctx); } TEST_F(PlannerV2Test, SimplePlannerCreatePlanTest) { @@ -1880,9 +1882,11 @@ TEST_P(PlannerV2ErrorTest, RequestModePlanErrorTest) { LOG(INFO) << "Skip mode " << sql_case.mode() << " for request mode error test"; return; } - base::Status status; - node::PlanNodeList plan_trees; - EXPECT_FALSE(plan::PlanAPI::CreatePlanTreeFromScript(sqlstr, plan_trees, manager_, status, false, false)) << status; + vm::SqlContext ctx; + ctx.sql = sqlstr; + ctx.engine_mode = vm::kRequestMode; + auto status = plan::PlanAPI::CreatePlanTreeFromScript(&ctx); + EXPECT_FALSE(status.isOK()) << status; if (!sql_case.expect_.msg_.empty()) { EXPECT_EQ(absl::StripAsciiWhitespace(sql_case.expect_.msg_), status.msg); } @@ -1896,9 +1900,12 @@ TEST_P(PlannerV2ErrorTest, ClusterRequestModePlanErrorTest) { LOG(INFO) << "Skip mode " << sql_case.mode() << " for cluster request mode error test"; return; } - base::Status status; - node::PlanNodeList plan_trees; - EXPECT_FALSE(plan::PlanAPI::CreatePlanTreeFromScript(sqlstr, plan_trees, manager_, status, false, true)) << status; + vm::SqlContext ctx; + ctx.sql = sqlstr; + ctx.engine_mode = vm::kRequestMode; + ctx.is_cluster_optimized = true; + auto status = plan::PlanAPI::CreatePlanTreeFromScript(&ctx); + EXPECT_FALSE(status.isOK()) << status; if (!sql_case.expect_.msg_.empty()) { EXPECT_EQ(absl::StripAsciiWhitespace(sql_case.expect_.msg_), status.msg); } @@ -1911,9 +1918,11 @@ TEST_P(PlannerV2ErrorTest, BatchModePlanErrorTest) { LOG(INFO) << "Skip mode " << sql_case.mode() << " for batch mode error test"; return; } - base::Status status; - node::PlanNodeList plan_trees; - EXPECT_FALSE(plan::PlanAPI::CreatePlanTreeFromScript(sqlstr, plan_trees, manager_, status, true)) << status; + vm::SqlContext ctx; + ctx.sql = sqlstr; + ctx.engine_mode = vm::kBatchMode; + auto status = 
plan::PlanAPI::CreatePlanTreeFromScript(&ctx); + EXPECT_FALSE(status.isOK()) << status; if (!sql_case.expect_.msg_.empty()) { EXPECT_EQ(absl::StripAsciiWhitespace(sql_case.expect_.msg_), status.msg); } diff --git a/hybridse/src/testing/engine_test_base.cc b/hybridse/src/testing/engine_test_base.cc index f55b5454339..ea8b2db876c 100644 --- a/hybridse/src/testing/engine_test_base.cc +++ b/hybridse/src/testing/engine_test_base.cc @@ -17,11 +17,13 @@ #include +#include "absl/cleanup/cleanup.h" +#include "absl/strings/ascii.h" +#include "absl/time/clock.h" #include "base/texttable.h" +#include "google/protobuf/util/message_differencer.h" #include "plan/plan_api.h" -#include "boost/algorithm/string.hpp" #include "vm/sql_compiler.h" -#include "google/protobuf/util/message_differencer.h" namespace hybridse { namespace vm { @@ -406,19 +408,19 @@ Status EngineTestRunner::Compile() { CHECK_TRUE(parameter_schema_.empty(), common::kUnSupport, "Request or BatchRequest mode do not support parameterized query currently") } - struct timeval st; - struct timeval et; - gettimeofday(&st, nullptr); - Status status; - bool ok = engine_->Get(sql_str, sql_case_.db(), *session_, status); - gettimeofday(&et, nullptr); - double mill = (et.tv_sec - st.tv_sec) * 1000 + (et.tv_usec - st.tv_usec) / 1000.0; - DLOG(INFO) << "SQL Compile take " << mill << " milliseconds"; + + base::Status status; + bool ok = false; + { + absl::Time start = absl::Now(); + absl::Cleanup clean = [&start]() { DLOG(INFO) << "compile takes " << absl::Now() - start; }; + ok = engine_->Get(sql_str, sql_case_.db(), *session_, status); + } if (!ok || !status.isOK()) { DLOG(INFO) << status; if (!sql_case_.expect().msg_.empty()) { - EXPECT_EQ(sql_case_.expect().msg_, status.msg); + EXPECT_EQ(absl::StripAsciiWhitespace(sql_case_.expect().msg_), status.msg); } return_code_ = ENGINE_TEST_RET_COMPILE_ERROR; } else { diff --git a/hybridse/src/vm/engine.cc b/hybridse/src/vm/engine.cc index ac9ee9dcaaf..5c179fb79b6 100644 --- 
a/hybridse/src/vm/engine.cc +++ b/hybridse/src/vm/engine.cc @@ -25,20 +25,22 @@ #include "codec/fe_row_codec.h" #include "gflags/gflags.h" #include "llvm-c/Target.h" +#include "plan/plan_api.h" #include "udf/default_udf_library.h" #include "vm/internal/node_helper.h" #include "vm/local_tablet_handler.h" #include "vm/mem_catalog.h" #include "vm/runner_ctx.h" #include "vm/sql_compiler.h" +#include "zetasql/parser/parser.h" DECLARE_bool(enable_spark_unsaferow_format); +#define EXECUTE_MODE_OPT "execute_mode" +#define VALUES_OPT "values" namespace hybridse { namespace vm { -static bool LLVM_IS_INITIALIZED = false; - EngineOptions::EngineOptions() : keep_ir_(false), compile_only_(false), @@ -51,22 +53,24 @@ EngineOptions::EngineOptions() max_sql_cache_size_(50) { } +static absl::Status ExtractRows(const node::ExprNode* expr, const codec::Schema* sc, std::vector* out) + ABSL_ATTRIBUTE_NONNULL(); + Engine::Engine(const std::shared_ptr& catalog) : cl_(catalog), options_(), mu_(), lru_cache_() {} Engine::Engine(const std::shared_ptr& catalog, const EngineOptions& options) : cl_(catalog), options_(options), mu_(), lru_cache_() {} Engine::~Engine() {} -void Engine::InitializeGlobalLLVM() { - // not thread safe, but is generally fine to call multiple times - if (LLVM_IS_INITIALIZED) return; - +static bool InitializeLLVM() { absl::Time begin = absl::Now(); LLVMInitializeNativeTarget(); LLVMInitializeNativeAsmPrinter(); LOG(INFO) << "initialize llvm native target and asm printer, takes " << absl::Now() - begin; - LLVM_IS_INITIALIZED = true; + return true; } +void Engine::InitializeGlobalLLVM() { [[maybe_unused]] static bool LLVM_IS_INITIALIZED = InitializeLLVM(); } + void Engine::InitializeUnsafeRowOptFlag(bool isUnsafeRowOpt) { FLAGS_enable_spark_unsaferow_format = isUnsafeRowOpt; } @@ -190,6 +194,15 @@ bool Engine::Get(const std::string& sql, const std::string& db, RunSession& sess } } + { + auto s = ExtractRequestRowsInSQL(&sql_context); + if (!s.ok()) { + status.code = 
common::kCodegenError; + status.msg = s.ToString(); + return false; + } + } + SetCacheLocked(db, sql, session.engine_mode(), info); session.SetCompileInfo(info); if (session.is_debug_) { @@ -383,12 +396,72 @@ bool RunSession::SetCompileInfo(const std::shared_ptr& compile_info return true; } +absl::Status ExtractRows(const node::ExprNode* expr, const codec::Schema* sc, std::vector* out) { + switch (expr->GetExprType()) { + case node::kExprStructCtorParens: { + auto struct_expr = expr->GetAsOrNull(); + base::Status s; + auto jit = + std::shared_ptr(vm::HybridSeJitWrapper::CreateWithDefaultSymbols(&s)); + CHECK_STATUS_TO_ABSL(s); + codec::RowBuilder2 builder(jit.get(), {*sc}); + CHECK_STATUS_TO_ABSL(builder.Init()); + codec::Row r; + CHECK_STATUS_TO_ABSL(builder.Build(struct_expr->children_, &r)); + out->push_back(r); + break; + } + case node::kExprArray: { + auto arr = expr->GetAsOrNull(); + if (arr->GetChildNum() == 0) { + return absl::FailedPreconditionError("element number of the request values must not empty"); + } + for (auto e : arr->children_) { + CHECK_ABSL_STATUS(ExtractRows(e, sc, out)); + } + break; + } + default: { + // try build as request schema size = 1, necessary since AST parser simplify '(expr1)' to 'expr1'. + // this also allows syntax like 'values = [12, 12]', where request table schema is '[int]', + // it is a special rule to go with AST parser, not recommanded to users generally. 
+ base::Status s; + auto jit = + std::shared_ptr(vm::HybridSeJitWrapper::CreateWithDefaultSymbols(&s)); + CHECK_STATUS_TO_ABSL(s); + codec::RowBuilder2 builder(jit.get(), {*sc}); + CHECK_STATUS_TO_ABSL(builder.Init()); + codec::Row r; + CHECK_STATUS_TO_ABSL(builder.Build({const_cast(expr)}, &r)); + out->push_back(r); + } + } + + return absl::OkStatus(); +} + +absl::Status Engine::ExtractRequestRowsInSQL(SqlContext* sql_ctx) { + if ((sql_ctx->engine_mode == kRequestMode || sql_ctx->engine_mode == kBatchRequestMode) && + !sql_ctx->request_schema.empty() && sql_ctx->request_expressions != nullptr) { + // extract rows if request table and request values expression both exists + vm::Engine::InitializeGlobalLLVM(); + CHECK_ABSL_STATUS(ExtractRows(sql_ctx->request_expressions, &sql_ctx->request_schema, &sql_ctx->request_rows)); + } + return absl::OkStatus(); +} + int32_t RequestRunSession::Run(const Row& in_row, Row* out_row) { DLOG(INFO) << "Request Row Run with main task"; return Run(std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job->main_task_id(), in_row, out_row); } int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, Row* out_row) { + ::hybridse::codec::Row row = in_row; + std::vector<::hybridse::codec::Row>& sql_request_rows = + std::dynamic_pointer_cast(GetCompileInfo())->get_sql_context().request_rows; + if (!sql_request_rows.empty()) { + row = sql_request_rows.at(0); + } auto task = std::dynamic_pointer_cast(compile_info_) ->get_sql_context() .cluster_job->GetTask(task_id) @@ -398,7 +471,7 @@ int32_t RequestRunSession::Run(const uint32_t task_id, const Row& in_row, Row* o return -2; } DLOG(INFO) << "Request Row Run with task_id " << task_id; - RunnerContext ctx(std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job, in_row, + RunnerContext ctx(std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job, row, sp_name_, is_debug_); auto output = task->RunWithCache(ctx); if (!output) { @@ 
-418,8 +491,11 @@ int32_t BatchRequestRunSession::Run(const std::vector& request_batch, std:: } int32_t BatchRequestRunSession::Run(const uint32_t id, const std::vector& request_batch, std::vector& output) { + std::vector<::hybridse::codec::Row>& sql_request_rows = + std::dynamic_pointer_cast(GetCompileInfo())->get_sql_context().request_rows; + RunnerContext ctx(std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job, - request_batch, sp_name_, is_debug_); + sql_request_rows.empty() ? request_batch : sql_request_rows, sp_name_, is_debug_); auto task = std::dynamic_pointer_cast(compile_info_)->get_sql_context().cluster_job->GetTask(id).GetRoot(); if (nullptr == task) { @@ -548,5 +624,48 @@ std::shared_ptr LocalTablet::SubQuery(uint32_t task_id, const std: } return std::make_shared(task_id, session, in_rows, request_is_common); } + +EngineMode Engine::TryDetermineEngineMode(absl::string_view sql, EngineMode default_mode) { + // DESIGN ISSUE as compiler need EngineMode before compilation, + // give this method as a distinct function to SQL Compile function. 
+ std::unique_ptr ast; + auto s = hybridse::plan::ParseStatement(sql, &ast); + if (!s.ok()) { + return default_mode; + } + + EngineMode mode = default_mode; + if (ast->statement() && + ast->statement()->node_kind() == zetasql::AST_QUERY_STATEMENT ) { + auto query = ast->statement()->GetAsOrNull(); + if (query && query->config_clause()) { + auto options = query->config_clause()->options_list()->options_entries(); + bool values_arr_size_gt_1 = false; + for (auto kv : options) { + auto name = kv->name()->GetAsStringView(); + if (absl::EqualsIgnoreCase(name, EXECUTE_MODE_OPT)) { + auto val = kv->value()->GetAsOrNull(); + if (val) { + auto m = UnparseEngineMode(val->string_value()); + mode = m.value_or(default_mode); + } + } + + if (absl::EqualsIgnoreCase(name, VALUES_OPT)) { + auto arr_expr = kv->value()->GetAsOrNull(); + if (arr_expr) { + values_arr_size_gt_1 = arr_expr->elements().size() > 1; + } + } + } + + if (mode == vm::kRequestMode && values_arr_size_gt_1) { + mode = kBatchRequestMode; + } + } + } + + return mode; +} } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/engine_compile_test.cc b/hybridse/src/vm/engine_compile_test.cc index e30cc569f15..9e5a9f37867 100644 --- a/hybridse/src/vm/engine_compile_test.cc +++ b/hybridse/src/vm/engine_compile_test.cc @@ -709,6 +709,26 @@ TEST_F(EngineCompileTest, ExternalFunctionTest) { std::string sql2 = "select cut2(col0) from t1;"; ASSERT_TRUE(engine.Get(sql2, "simple_db", session, get_status)); } + +TEST_F(EngineCompileTest, DetermineEngineMode) { + EXPECT_EQ(vm::kRequestMode, Engine::TryDetermineEngineMode(R"(SELECT * from t1 +CONFIG (execute_mode = 'request', values = [12]))", + kBatchMode)); + + EXPECT_EQ(vm::kBatchRequestMode, Engine::TryDetermineEngineMode(R"(SELECT * from t1 +CONFIG (execute_mode = 'request', values = [(12), (12)]))", + kBatchMode)); + + EXPECT_EQ(vm::kBatchRequestMode, Engine::TryDetermineEngineMode(R"(SELECT * from t1 +CONFIG (execute_mode = 'batchrequest', values = 
[(12)]))", + kBatchMode)); + + // no option, default + EXPECT_EQ(vm::kRequestMode, Engine::TryDetermineEngineMode(R"(SELECT * FROM t1)", kRequestMode)); + // on error, default + EXPECT_EQ(vm::kBatchMode, Engine::TryDetermineEngineMode(R"(SELECT)", kBatchMode)); +} + } // namespace vm } // namespace hybridse diff --git a/hybridse/src/vm/engine_context.cc b/hybridse/src/vm/engine_context.cc new file mode 100644 index 00000000000..570726aa0eb --- /dev/null +++ b/hybridse/src/vm/engine_context.cc @@ -0,0 +1,73 @@ +/** + * Copyright (c) 2024 OpenMLDB authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "vm/engine_context.h" + +#include "absl/container/flat_hash_map.h" + +namespace hybridse { +namespace vm { + +static absl::flat_hash_map createModeMap() { + absl::flat_hash_map map = { + {"online", kBatchMode}, // 'online' default to batch + {"batch", kBatchMode}, + {"online_preview", kBatchMode}, + {"online_batch", kBatchMode}, + {"request", kRequestMode}, + {"online_request", kRequestMode}, + {"batch_request", kBatchRequestMode}, + {"batchrequest", kBatchRequestMode}, + {"online_batchrequest", kBatchRequestMode}, + {"offline", kOffline}, + }; + return map; +} + +static const auto& getModeMap() { + static const absl::flat_hash_map mode_map = *new auto(createModeMap()); + return mode_map; +} + +std::string EngineModeName(EngineMode mode) { + switch (mode) { + case kBatchMode: + return "kBatchMode"; + case kRequestMode: + return "kRequestMode"; + case kBatchRequestMode: + return "kBatchRequestMode"; + case kMockRequestMode: + return "kMockRequestMode"; + case kOffline: + return "kOffline"; + default: + return "unknown"; + } +} + +absl::StatusOr UnparseEngineMode(absl::string_view str) { + auto& m = getModeMap(); + auto it = m.find(str); + if (it != m.end()) { + return it->second; + } + + return absl::NotFoundError(absl::StrCat("no matching mode for string '", str, "'")); +} + +} // namespace vm +} // namespace hybridse diff --git a/hybridse/src/vm/sql_compiler.cc b/hybridse/src/vm/sql_compiler.cc index 7b53c50143c..2abe728cfca 100644 --- a/hybridse/src/vm/sql_compiler.cc +++ b/hybridse/src/vm/sql_compiler.cc @@ -133,21 +133,6 @@ bool SqlCompiler::Compile(SqlContext& ctx, Status& status) { // NOLINT return true; } -std::string EngineModeName(EngineMode mode) { - switch (mode) { - case kBatchMode: - return "kBatchMode"; - case kRequestMode: - return "kRequestMode"; - case kBatchRequestMode: - return "kBatchRequestMode"; - case kMockRequestMode: - return "kMockRequestMode"; - default: - return "unknown"; - } -} - Status 
SqlCompiler::BuildBatchModePhysicalPlan(SqlContext* ctx, const ::hybridse::node::PlanNodeList& plan_list, ::llvm::Module* llvm_module, udf::UdfLibrary* library, PhysicalOpNode** output) { diff --git a/src/cmd/sql_cmd_test.cc b/src/cmd/sql_cmd_test.cc index 149fdf9ab2b..a4067bfaa1d 100644 --- a/src/cmd/sql_cmd_test.cc +++ b/src/cmd/sql_cmd_test.cc @@ -4087,7 +4087,7 @@ struct DeploymentEnv { auto common_column_indices = std::make_shared(); auto row_batch = std::make_shared(rr->GetSchema(), common_column_indices); ASSERT_TRUE(row_batch->AddRow(rr)); - sr->CallSQLBatchRequestProcedure(db_, dp_name_, row_batch, &status); + sr_->CallSQLBatchRequestProcedure(db_, dp_name_, row_batch, &status); ASSERT_TRUE(status.IsOK()) << status.msg << "\n" << status.trace; } @@ -4095,15 +4095,37 @@ struct DeploymentEnv { hybridse::sdk::Status status; std::shared_ptr rr = std::make_shared(); GetRequestRow(&rr, dp_name_); - sr->CallProcedure(db_, dp_name_, rr, &status); + sr_->CallProcedure(db_, dp_name_, rr, &status); ASSERT_TRUE(status.IsOK()) << status.msg << "\n" << status.trace; } + void CallDeployProcedureWithCallStmt() { + hybridse::sdk::Status ss; + auto call = absl::Substitute( + // casting is mandatory util #3847 + "call $0('12', 99, cast(100 as int64), cast(77.7 as float), cast(88.8 as double), timestamp(8000), " + "date(null))", + dp_name_); + auto rs = sr_->ExecuteSQL(call, &ss); + ASSERT_TRUE(ss.IsOK()) << ss.ToString(); + ASSERT_EQ(rs->Size(), 1); + rs->Next(); + std::string col1; + ASSERT_TRUE(rs->GetString(0, &col1)); + EXPECT_EQ("12", col1); + int32_t col2 = 0; + ASSERT_TRUE(rs->GetInt32(1, &col2)); + EXPECT_EQ(99, col2); + int64_t col3 = 0; + ASSERT_TRUE(rs->GetInt64(2, &col3)); + EXPECT_EQ(100, col3); + HandleSQL(call); + } void CallProcedure() { hybridse::sdk::Status status; std::shared_ptr rr = std::make_shared(); GetRequestRow(&rr, procedure_name_); - sr->CallProcedure(db_, procedure_name_, rr, &status); + sr_->CallProcedure(db_, procedure_name_, rr, &status); 
ASSERT_TRUE(status.IsOK()) << status.msg << "\n" << status.trace; } @@ -4133,6 +4155,18 @@ struct DeploymentEnv { } }; +TEST_P(DBSDKTest, deploymentCall) { + auto cli = GetParam(); + cs = cli->cs; + sr = cli->sr; + DeploymentEnv env(sr); + + env.SetUp(); + + env.CallDeployProcedure(); + env.CallDeployProcedureWithCallStmt(); +} + class StripSpaceTest : public ::testing::TestWithParam> {}; std::vector> strip_cases = { diff --git a/src/sdk/node_adapter.cc b/src/sdk/node_adapter.cc index 8d8336fbaa3..c7c0d191922 100644 --- a/src/sdk/node_adapter.cc +++ b/src/sdk/node_adapter.cc @@ -793,10 +793,12 @@ absl::StatusOr NodeAdapter::ExtractUserOption(const hybridse::node: if (!absl::EqualsIgnoreCase(map.begin()->first, "password")) { return absl::InvalidArgumentError("invalid option " + map.begin()->first); } - if (map.begin()->second->GetDataType() != hybridse::node::kVarchar) { + auto& kv = *map.begin(); + auto cnode = kv.second->GetAsOrNull<::hybridse::node::ConstNode>(); + if (cnode == nullptr || cnode->GetDataType() != hybridse::node::kVarchar) { return absl::InvalidArgumentError("the value of password should be string"); } - return map.begin()->second->GetAsString(); + return cnode->GetAsString(); } } // namespace openmldb::sdk diff --git a/src/sdk/options_map_parser.h b/src/sdk/options_map_parser.h index 426b5fb4c15..991570593f9 100644 --- a/src/sdk/options_map_parser.h +++ b/src/sdk/options_map_parser.h @@ -47,7 +47,8 @@ class OptionsMapParser { if (options_map_.find(key) != options_map_.end()) { LOG(WARNING) << "option " << key << " already exists, won't replace"; } else { - options_map_.insert(std::pair(key, *(item.second))); // copy + options_map_.insert(std::pair( + key, *(item.second)->GetAsOrNull())); // copy } } } diff --git a/src/sdk/sql_cluster_router.cc b/src/sdk/sql_cluster_router.cc index e84d2b251f8..924a3169c28 100644 --- a/src/sdk/sql_cluster_router.cc +++ b/src/sdk/sql_cluster_router.cc @@ -2680,14 +2680,15 @@ std::shared_ptr 
SQLClusterRouter::ExecuteSQL( // functions we called later may not change the status if it's succeed. So if we pass error status here, we'll get a // fake error status->SetOK(); - hybridse::node::NodeManager node_manager; - hybridse::node::PlanNodeList plan_trees; - hybridse::base::Status sql_status; - hybridse::plan::PlanAPI::CreatePlanTreeFromScript(sql, plan_trees, &node_manager, sql_status); + hybridse::vm::SqlContext ctx; + ctx.sql = sql; + auto sql_status = hybridse::plan::PlanAPI::CreatePlanTreeFromScript(&ctx); if (!sql_status.isOK()) { COPY_PREPEND_AND_WARN(status, sql_status, "create logic plan tree failed"); return {}; } + + auto& plan_trees = ctx.logical_plan; auto ns_ptr = cluster_sdk_->GetNsClient(); if (!ns_ptr) { SET_STATUS_AND_WARN(status, StatusCode::kRuntimeError, "no ns client, retry or check ns process"); @@ -3082,6 +3083,43 @@ std::shared_ptr SQLClusterRouter::ExecuteSQL( } return {}; } + case hybridse::node::kPlanTypeCallStmt: { + auto* call = dynamic_cast(node); + std::string db = db_; + std::string procedure_name; + if (call->procedure_name().size() == 1) { + procedure_name = call->procedure_name().at(0); + } else if (call->procedure_name().size() == 2) { + db = call->procedure_name().at(0); + procedure_name = call->procedure_name().at(1); + } else { + *status = {StatusCode::kCmdError, + absl::StrCat("invalid procedure name: ", absl::StrJoin(call->procedure_name(), "."))}; + return {}; + } + auto req = GetRequestRowByProcedure(db, procedure_name, status); + if (!status->IsOK()) { + return {}; + } + auto sc = std::dynamic_pointer_cast(req->GetSchema()); + ::hybridse::vm::Engine::InitializeGlobalLLVM(); + hybridse::base::Status s; + auto jit = std::shared_ptr( + hybridse::vm::HybridSeJitWrapper::CreateWithDefaultSymbols(&s)); + if (!s.isOK()) { + APPEND_FROM_BASE(status, s, ""); + return {}; + } + hybridse::codec::SliceBuilder builder(jit.get(), &sc->GetSchema()); + hybridse::base::RefCountedSlice slice; + s = builder.Build(call->arguments(), 
&slice); + if (!s.isOK()) { + APPEND_FROM_BASE(status, s, ""); + return {}; + } + base::Slice sl(slice.data(), slice.size(), false); + return CallProcedure(db, procedure_name, sl, "", status); + } default: { *status = {StatusCode::kCmdError, "Unsupported command"}; return {}; @@ -3696,16 +3734,18 @@ hybridse::sdk::Status SQLClusterRouter::HandleCreateFunction(const hybridse::nod } fun->set_file((*option)["FILE"]->GetExprString()); if (auto iter = option->find("RETURN_NULLABLE"); iter != option->end()) { - if (iter->second->GetDataType() != hybridse::node::kBool) { + auto cnode = iter->second->GetAsOrNull(); + if (cnode == nullptr || cnode->GetDataType() != hybridse::node::kBool) { return {StatusCode::kCmdError, "return_nullable should be bool"}; } - fun->set_return_nullable(iter->second->GetBool()); + fun->set_return_nullable(cnode->GetBool()); } if (auto iter = option->find("ARG_NULLABLE"); iter != option->end()) { - if (iter->second->GetDataType() != hybridse::node::kBool) { + auto cnode = iter->second->GetAsOrNull(); + if (cnode == nullptr || cnode->GetDataType() != hybridse::node::kBool) { return {StatusCode::kCmdError, "arg_nullable should be bool"}; } - fun->set_arg_nullable(iter->second->GetBool()); + fun->set_arg_nullable(cnode->GetBool()); } hybridse::sdk::Status st; if (cluster_sdk_->IsClusterMode()) { @@ -3833,13 +3873,13 @@ hybridse::sdk::Status SQLClusterRouter::HandleDeploy(const std::string& db, Bias bias; iter = deploy_node->Options()->find(RANGE_BIAS_OPTION); if (iter != deploy_node->Options()->end()) { - if (!bias.SetRange(iter->second)) { + if (!bias.SetRange(iter->second->GetAsOrNull())) { return {StatusCode::kCmdError, "range bias '" + iter->second->GetExprString() + "' is illegal"}; } } iter = deploy_node->Options()->find(ROWS_BIAS_OPTION); if (iter != deploy_node->Options()->end()) { - if (!bias.SetRows(iter->second)) { + if (!bias.SetRows(iter->second->GetAsOrNull())) { return {StatusCode::kCmdError, "rows bias '" + 
iter->second->GetExprString() + "' is illegal"}; } } diff --git a/src/sdk/sql_sdk_test.cc b/src/sdk/sql_sdk_test.cc index 70d33a1fed1..50bafac7522 100644 --- a/src/sdk/sql_sdk_test.cc +++ b/src/sdk/sql_sdk_test.cc @@ -23,17 +23,11 @@ #include #include -#include "base/file_util.h" #include "base/glog_wrapper.h" -#include "codec/fe_row_codec.h" -#include "common/timer.h" #include "gflags/gflags.h" #include "sdk/mini_cluster.h" #include "sdk/sql_router.h" -#include "sdk/sql_sdk_base_test.h" -#include "test/base_test.h" #include "test/util.h" -#include "vm/catalog.h" namespace openmldb { namespace sdk { @@ -44,6 +38,7 @@ std::shared_ptr router_ = std::shared_ptr(); static void SetOnlineMode(std::shared_ptr router) { ::hybridse::sdk::Status status; router->ExecuteSQL("SET @@execute_mode='online';", &status); + ASSERT_TRUE(status.IsOK()) << status.msg; } static std::shared_ptr GetNewSQLRouter() { diff --git a/src/sdk/sql_sdk_test.h b/src/sdk/sql_sdk_test.h index 4945686b4c9..82c51294da4 100644 --- a/src/sdk/sql_sdk_test.h +++ b/src/sdk/sql_sdk_test.h @@ -18,7 +18,7 @@ #define SRC_SDK_SQL_SDK_TEST_H_ #include -#include "base/glog_wrapper.h" + #include "sdk/sql_sdk_base_test.h" namespace openmldb { @@ -58,6 +58,8 @@ INSTANTIATE_TEST_SUITE_P(SQLSDKLastJoinWhere, SQLSDKQueryTest, testing::ValuesIn(SQLSDKQueryTest::InitCases("cases/query/last_join_where.yaml"))); INSTANTIATE_TEST_SUITE_P(SQLSDKParameterizedQuery, SQLSDKQueryTest, testing::ValuesIn(SQLSDKQueryTest::InitCases("cases/query/parameterized_query.yaml"))); +INSTANTIATE_TEST_SUITE_P(SQLSimpleQuery, SQLSDKQueryTest, + testing::ValuesIn(SQLSDKQueryTest::InitCases("cases/query/simple_query.yaml"))); // Test Cluster INSTANTIATE_TEST_SUITE_P( diff --git a/src/tablet/tablet_impl.cc b/src/tablet/tablet_impl.cc index 8f4b8ab4c1e..580f984b1b0 100644 --- a/src/tablet/tablet_impl.cc +++ b/src/tablet/tablet_impl.cc @@ -1684,8 +1684,12 @@ void TabletImpl::ProcessQuery(bool is_sub, RpcController* ctrl, const openmldb:: } }; + 
hybridse::vm::EngineMode default_mode = + request->is_batch() ? hybridse::vm::EngineMode::kBatchMode : hybridse::vm::EngineMode::kRequestMode; + auto mode = hybridse::vm::Engine::TryDetermineEngineMode(request->sql(), default_mode); + ::hybridse::base::Status status; - if (request->is_batch()) { + if (mode == hybridse::vm::EngineMode::kBatchMode) { // convert repeated openmldb:type::DataType into hybridse::codec::Schema hybridse::codec::Schema parameter_schema; for (int i = 0; i < request->parameter_types().size(); i++) {