diff --git a/Cargo.lock b/Cargo.lock index 0b183afc260..dcb3b9de8bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -581,11 +581,11 @@ dependencies = [ [[package]] name = "async-compression" -version = "0.4.9" +version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e9eabd7a98fe442131a17c316bd9349c43695e49e730c3c8e12cfb5f4da2693" +checksum = "9c90a406b4495d129f00461241616194cb8a032c8d1c53c657f0961d5f8e0498" dependencies = [ - "brotli 5.0.0", + "brotli 6.0.0", "bzip2", "flate2", "futures-core", @@ -1068,9 +1068,9 @@ dependencies = [ [[package]] name = "brotli" -version = "5.0.0" +version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19483b140a7ac7174d34b5a581b406c64f84da5409d3e09cf4fff604f9270e67" +checksum = "74f7971dbd9326d58187408ab83117d8ac1bb9c17b085fdacd1cf2f598719b6b" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", @@ -2589,7 +2589,7 @@ dependencies = [ "arrow-array", "arrow-ipc", "arrow-schema", - "async-compression 0.4.9", + "async-compression 0.4.10", "async-trait", "bytes", "bzip2", @@ -2641,7 +2641,7 @@ dependencies = [ "arrow-array", "arrow-ipc", "arrow-schema", - "async-compression 0.4.9", + "async-compression 0.4.10", "async-trait", "bytes", "bzip2", @@ -3577,9 +3577,9 @@ dependencies = [ [[package]] name = "errno" -version = "0.3.8" +version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a258e46cdc063eb8519c00b9fc845fc47bcfca4130e2f08e88665ceda8474245" +checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" dependencies = [ "libc", "windows-sys 0.52.0", @@ -4200,7 +4200,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "greptime-proto" version = "0.1.0" -source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=219b2409bb701f75b43fc0ba64967d2ed8e75491#219b2409bb701f75b43fc0ba64967d2ed8e75491" +source = 
"git+https://github.com/killme2008/greptime-proto.git?rev=57e186d572c6c5898e90d6ab9e91b0867c30d1da#57e186d572c6c5898e90d6ab9e91b0867c30d1da" dependencies = [ "prost 0.12.4", "serde", @@ -4550,7 +4550,7 @@ dependencies = [ "clap 4.5.4", "data-encoding", "itertools 0.10.5", - "prettyplease 0.2.19", + "prettyplease 0.2.20", "proc-macro2", "quote", "regex", @@ -6173,9 +6173,9 @@ dependencies = [ [[package]] name = "num" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3135b08af27d103b0a51f2ae0f8632117b7b185ccf931445affa8df530576a41" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" dependencies = [ "num-bigint", "num-complex", @@ -6214,9 +6214,9 @@ dependencies = [ [[package]] name = "num-complex" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23c6602fda94a57c990fe0df199a035d83576b496aa29f4e634a8ac6004e68a6" +checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495" dependencies = [ "num-traits", ] @@ -6270,11 +6270,10 @@ dependencies = [ [[package]] name = "num-rational" -version = "0.4.1" +version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0638a1c9d0a3c0914158145bc76cff373a75a627e6ecbfb71cbe6f453a5a19b0" +checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824" dependencies = [ - "autocfg", "num-bigint", "num-integer", "num-traits", @@ -6643,6 +6642,7 @@ dependencies = [ "sql", "sqlparser 0.44.0 (git+https://github.com/GreptimeTeam/sqlparser-rs.git?rev=c919990bf62ad38d2b0c0a3bc90b26ad919d51b0)", "store-api", + "substrait 0.7.2", "table", "tokio", "tonic 0.11.0", @@ -7022,9 +7022,9 @@ dependencies = [ [[package]] name = "petgraph" -version = "0.6.4" +version = "0.6.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1d3afd2628e69da2be385eb6f2fd57c8ac7977ceeff6dc166ff1657b0e386a9" +checksum = 
"b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset", "indexmap 2.2.6", @@ -7313,7 +7313,7 @@ dependencies = [ "parking_lot 0.12.2", "prost 0.12.4", "prost-build 0.12.4", - "prost-derive 0.12.4", + "prost-derive 0.12.5", "protobuf", "sha2", "smallvec", @@ -7386,9 +7386,9 @@ dependencies = [ [[package]] name = "prettyplease" -version = "0.2.19" +version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ac2cf0f2e4f42b49f5ffd07dae8d746508ef7526c13940e5f524012ae6c6550" +checksum = "5f12335488a2f3b0a83b14edad48dca9879ce89b2edd10e80237e4e852dd645e" dependencies = [ "proc-macro2", "syn 2.0.61", @@ -7439,9 +7439,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.81" +version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d1597b0c024618f09a9c3b8655b7e430397a36d23fdafec26d6965e9eec3eba" +checksum = "8ad3d49ab951a01fbaafe34f2ec74122942fe18a3f9814c3268f1bb72042131b" dependencies = [ "unicode-ident", ] @@ -7561,7 +7561,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0f5d036824e4761737860779c906171497f6d55681139d8312388f8fe398922" dependencies = [ "bytes", - "prost-derive 0.12.4", + "prost-derive 0.12.5", ] [[package]] @@ -7599,7 +7599,7 @@ dependencies = [ "multimap 0.10.0", "once_cell", "petgraph", - "prettyplease 0.2.19", + "prettyplease 0.2.20", "prost 0.12.4", "prost-types 0.12.4", "regex", @@ -7622,9 +7622,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.12.4" +version = "0.12.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19de2de2a00075bf566bee3bd4db014b11587e84184d3f7a791bc17f1a8e9e48" +checksum = "9554e3ab233f0a932403704f1a1d08c30d5ccd931adfdfa1e8b5a19b52c1d55a" dependencies = [ "anyhow", "itertools 0.12.1", @@ -8087,18 +8087,18 @@ dependencies = [ [[package]] name = "ref-cast" -version = "1.0.22" +version = "1.0.23" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4846d4c50d1721b1a3bef8af76924eef20d5e723647333798c1b519b3a9473f" +checksum = "ccf0a6f84d5f1d581da8b41b47ec8600871962f2a528115b542b362d4b744931" dependencies = [ "ref-cast-impl", ] [[package]] name = "ref-cast-impl" -version = "1.0.22" +version = "1.0.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fddb4f8d99b0a2ebafc65a87a69a7b9875e4b1ae1f00db265d300ef7f28bccc" +checksum = "bcc303e793d3734489387d205e9b186fac9c6cfacedd98cbb2e8a5943595f3e6" dependencies = [ "proc-macro2", "quote", @@ -8591,9 +8591,9 @@ dependencies = [ [[package]] name = "rustc-demangle" -version = "0.1.23" +version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" [[package]] name = "rustc-hash" @@ -8694,9 +8694,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.5.0" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "beb461507cee2c2ff151784c52762cf4d9ff6a61f3e80968600ed24fa837fa54" +checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" [[package]] name = "rustls-webpki" @@ -9318,18 +9318,18 @@ checksum = "a3f0bf26fd526d2a95683cd0f87bf103b8539e2ca1ef48ce002d67aad59aa0b4" [[package]] name = "serde" -version = "1.0.200" +version = "1.0.201" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddc6f9cc94d67c0e21aaf7eda3a010fd3af78ebf6e096aa6e2e13c79749cce4f" +checksum = "780f1cebed1629e4753a1a38a3c72d30b97ec044f0aef68cb26650a3c5cf363c" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.200" +version = "1.0.201" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "856f046b9400cee3c8c94ed572ecdb752444c24528c035cd35882aad6f492bcb" +checksum = 
"c5e405930b9796f1c00bee880d03fc7e0bb4b9a11afc776885ffe84320da2865" dependencies = [ "proc-macro2", "quote", @@ -9349,9 +9349,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.116" +version = "1.0.117" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e17db7126d17feb94eb3fad46bf1a96b034e8aacbc2e775fe81505f8b0b2813" +checksum = "455182ea6142b14f93f4bc5320a2b31c1f266b66a4a5c858b013302a5d8cbfc3" dependencies = [ "indexmap 2.2.6", "itoa", @@ -10310,7 +10310,7 @@ checksum = "f1e8440a1c9b95a7c9a00a19f78b980749e8c945eb880687a5d673cea83729c5" dependencies = [ "git2", "heck 0.4.1", - "prettyplease 0.2.19", + "prettyplease 0.2.20", "prost 0.12.4", "prost-build 0.12.4", "prost-types 0.12.4", @@ -10331,7 +10331,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba959c71b2a1a341a94e1f362615d7e5f1a4de9d25d82fceea8160f79f1e1dfb" dependencies = [ "heck 0.5.0", - "prettyplease 0.2.19", + "prettyplease 0.2.20", "prost 0.12.4", "prost-build 0.12.4", "prost-types 0.12.4", @@ -11189,7 +11189,7 @@ version = "0.10.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d021fc044c18582b9a2408cd0dd05b1596e3ecdb5c4df822bb0183545683889" dependencies = [ - "prettyplease 0.2.19", + "prettyplease 0.2.20", "proc-macro2", "prost-build 0.12.4", "quote", @@ -11202,7 +11202,7 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be4ef6dd70a610078cb4e338a0f79d06bc759ff1b22d2120c2ff02ae264ba9c2" dependencies = [ - "prettyplease 0.2.19", + "prettyplease 0.2.20", "proc-macro2", "prost-build 0.12.4", "quote", @@ -11249,7 +11249,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ - "async-compression 0.4.9", + "async-compression 0.4.10", "base64 0.21.7", "bitflags 2.5.0", "bytes", @@ -12605,18 +12605,18 @@ dependencies = [ 
[[package]] name = "zerocopy" -version = "0.7.33" +version = "0.7.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "087eca3c1eaf8c47b94d02790dd086cd594b912d2043d4de4bfdd466b3befb7c" +checksum = "ae87e3fcd617500e5d106f0380cf7b77f3c6092aae37191433159dda23cfb087" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.33" +version = "0.7.34" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f4b6c273f496d8fd4eaf18853e6b448760225dc030ff2c485a786859aea6393" +checksum = "15e934569e47891f7d9411f1a451d947a60e000ab3bd24fbb970f000387d1b3b" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 08ea98ff349..1ed5e93ac3d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -117,7 +117,7 @@ etcd-client = { git = "https://github.com/MichaelScofield/etcd-client.git", rev fst = "0.4.7" futures = "0.3" futures-util = "0.3" -greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "219b2409bb701f75b43fc0ba64967d2ed8e75491" } +greptime-proto = { git = "https://github.com/killme2008/greptime-proto.git", rev = "57e186d572c6c5898e90d6ab9e91b0867c30d1da" } humantime = "2.1" humantime-serde = "1.1" itertools = "0.10" diff --git a/src/api/src/helper.rs b/src/api/src/helper.rs index bf7786bcaf6..6a51aa71fe4 100644 --- a/src/api/src/helper.rs +++ b/src/api/src/helper.rs @@ -480,6 +480,8 @@ fn ddl_request_type(request: &DdlRequest) -> &'static str { Some(Expr::TruncateTable(_)) => "ddl.truncate_table", Some(Expr::CreateFlow(_)) => "ddl.create_flow", Some(Expr::DropFlow(_)) => "ddl.drop_flow", + Some(Expr::CreateView(_)) => "ddl.create_view", + Some(Expr::DropView(_)) => "ddl.drop_view", None => "ddl.empty", } } diff --git a/src/common/meta/src/ddl.rs b/src/common/meta/src/ddl.rs index a922ce02d91..8d8cb8d5a45 100644 --- a/src/common/meta/src/ddl.rs +++ b/src/common/meta/src/ddl.rs @@ -38,6 +38,7 @@ pub mod create_flow; pub mod create_logical_tables; pub 
mod create_table; mod create_table_template; +pub mod create_view; pub mod drop_database; pub mod drop_flow; pub mod drop_table; @@ -89,6 +90,7 @@ pub struct TableMetadataAllocatorContext { } /// Metadata allocated to a table. +#[derive(Default)] pub struct TableMetadata { /// Table id. pub table_id: TableId, diff --git a/src/common/meta/src/ddl/create_logical_tables/metadata.rs b/src/common/meta/src/ddl/create_logical_tables/metadata.rs index 2d61719d391..89f5c374afb 100644 --- a/src/common/meta/src/ddl/create_logical_tables/metadata.rs +++ b/src/common/meta/src/ddl/create_logical_tables/metadata.rs @@ -43,7 +43,7 @@ impl CreateLogicalTablesProcedure { } else { self.context .table_metadata_allocator - .allocate_table_id(task) + .allocate_table_id(&task.create_table.table_id) .await? }; task.set_table_id(table_id); diff --git a/src/common/meta/src/ddl/create_view.rs b/src/common/meta/src/ddl/create_view.rs new file mode 100644 index 00000000000..c9163dd5cab --- /dev/null +++ b/src/common/meta/src/ddl/create_view.rs @@ -0,0 +1,276 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use async_trait::async_trait; +use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu}; +use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status}; +use common_telemetry::info; +use serde::{Deserialize, Serialize}; +use snafu::{ensure, OptionExt, ResultExt}; +use strum::AsRefStr; +use table::metadata::{RawTableInfo, TableId, TableType}; +use table::table_reference::TableReference; + +use crate::ddl::utils::handle_retry_error; +use crate::ddl::{DdlContext, TableMetadata, TableMetadataAllocatorContext}; +use crate::error::{self, Result}; +use crate::key::table_name::TableNameKey; +use crate::lock_key::{CatalogLock, SchemaLock, TableNameLock}; +use crate::rpc::ddl::CreateViewTask; +use crate::{metrics, ClusterId}; + +// The proceudure to execute `[CreateViewTask]`. +pub struct CreateViewProcedure { + pub context: DdlContext, + pub creator: ViewCreator, +} + +impl CreateViewProcedure { + pub const TYPE_NAME: &'static str = "metasrv-procedure::CreateView"; + + pub fn new(cluster_id: ClusterId, task: CreateViewTask, context: DdlContext) -> Self { + Self { + context, + creator: ViewCreator::new(cluster_id, task), + } + } + + pub fn from_json(json: &str, context: DdlContext) -> ProcedureResult { + let data = serde_json::from_str(json).context(FromJsonSnafu)?; + + let creator = ViewCreator { data }; + + Ok(CreateViewProcedure { context, creator }) + } + + fn view_info(&self) -> &RawTableInfo { + &self.creator.data.task.view_info + } + + fn need_update(&self) -> bool { + self.creator.data.need_update + } + + pub(crate) fn view_id(&self) -> TableId { + self.view_info().ident.table_id + } + + #[cfg(any(test, feature = "testing"))] + pub fn set_allocated_metadata(&mut self, view_id: TableId) { + self.creator.set_allocated_metadata(view_id, false) + } + + /// On the prepare step, it performs: + /// - Checks whether the view exists. + /// - Allocates the view id. 
+ /// + /// Abort(non-retry): + /// - ViewName exists and `create_if_not_exists` is false. + /// - Failed to allocate [ViewMetadata]. + pub(crate) async fn on_prepare(&mut self) -> Result { + let expr = &self.creator.data.task.create_view; + let view_name_value = self + .context + .table_metadata_manager + .table_name_manager() + .get(TableNameKey::new( + &expr.catalog_name, + &expr.schema_name, + &expr.view_name, + )) + .await?; + + // If `view_id` is None, creating the new view, + // otherwise: + // - replaces the exists one when `or_replace` is true. + // - returns the exists one when `create_if_not_exists` is true. + // - throws the `[ViewAlreadyExistsSnafu]` error. + let mut view_id = None; + + if let Some(value) = view_name_value { + ensure!( + expr.create_if_not_exists || expr.or_replace, + error::ViewAlreadyExistsSnafu { + view_name: self.creator.data.table_ref().to_string(), + } + ); + + let exists_view_id = value.table_id(); + + if !expr.or_replace { + return Ok(Status::done_with_output(exists_view_id)); + } + view_id = Some(exists_view_id); + } + + if let Some(view_id) = view_id { + let view_info_value = self + .context + .table_metadata_manager + .table_info_manager() + .get(view_id) + .await? + .with_context(|| error::TableInfoNotFoundSnafu { + table: self.creator.data.table_ref().to_string(), + })?; + + // Ensure the exists one is view, we can't replace a table. + ensure!( + view_info_value.table_info.table_type == TableType::View, + error::TableAlreadyExistsSnafu { + table_name: self.creator.data.table_ref().to_string(), + } + ); + + self.creator.set_allocated_metadata(view_id, true); + } else { + // Allocate the new `view_id`. + let TableMetadata { table_id, .. 
} = self + .context + .table_metadata_allocator + .create_view( + &TableMetadataAllocatorContext { + cluster_id: self.creator.data.cluster_id, + }, + &None, + ) + .await?; + self.creator.set_allocated_metadata(table_id, false); + } + + self.creator.data.state = CreateViewState::CreateMetadata; + + Ok(Status::executing(true)) + } + + /// Creates view metadata + /// + /// Abort(not-retry): + /// - Failed to create view metadata. + async fn on_create_metadata(&mut self, ctx: &ProcedureContext) -> Result { + let view_id = self.view_id(); + let manager = &self.context.table_metadata_manager; + + if self.need_update() { + // Retrieve the current view info and try to update it. + let current_view_info = manager + .view_info_manager() + .get(view_id) + .await? + .with_context(|| error::ViewNotFoundSnafu { + view_name: self.creator.data.table_ref().to_string(), + })?; + let new_logical_plan = self.creator.data.task.raw_logical_plan().clone(); + manager + .update_view_info(view_id, ¤t_view_info, new_logical_plan) + .await?; + + info!("Updated view metadata for view {view_id}"); + } else { + let raw_view_info = self.view_info().clone(); + manager + .create_view_metadata(raw_view_info, self.creator.data.task.raw_logical_plan()) + .await?; + + info!( + "Created view metadata for view {view_id} with procedure: {}", + ctx.procedure_id + ); + } + + Ok(Status::done_with_output(view_id)) + } +} + +#[async_trait] +impl Procedure for CreateViewProcedure { + fn type_name(&self) -> &str { + Self::TYPE_NAME + } + + async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult { + let state = &self.creator.data.state; + + let _timer = metrics::METRIC_META_PROCEDURE_CREATE_VIEW + .with_label_values(&[state.as_ref()]) + .start_timer(); + + match state { + CreateViewState::Prepare => self.on_prepare().await, + CreateViewState::CreateMetadata => self.on_create_metadata(ctx).await, + } + .map_err(handle_retry_error) + } + + fn dump(&self) -> ProcedureResult { + 
serde_json::to_string(&self.creator.data).context(ToJsonSnafu) + } + + fn lock_key(&self) -> LockKey { + let table_ref = &self.creator.data.table_ref(); + + LockKey::new(vec![ + CatalogLock::Read(table_ref.catalog).into(), + SchemaLock::read(table_ref.catalog, table_ref.schema).into(), + TableNameLock::new(table_ref.catalog, table_ref.schema, table_ref.table).into(), + ]) + } +} + +/// The VIEW creator +pub struct ViewCreator { + /// The serializable data. + pub data: CreateViewData, +} + +impl ViewCreator { + pub fn new(cluster_id: u64, task: CreateViewTask) -> Self { + Self { + data: CreateViewData { + state: CreateViewState::Prepare, + cluster_id, + task, + need_update: false, + }, + } + } + + fn set_allocated_metadata(&mut self, view_id: TableId, need_update: bool) { + self.data.task.view_info.ident.table_id = view_id; + self.data.need_update = need_update; + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, AsRefStr, PartialEq)] +pub enum CreateViewState { + /// Prepares to create the table + Prepare, + /// Creates metadata + CreateMetadata, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CreateViewData { + pub state: CreateViewState, + pub task: CreateViewTask, + pub cluster_id: ClusterId, + /// Whether to update the view info. 
+ pub need_update: bool, +} + +impl CreateViewData { + fn table_ref(&self) -> TableReference<'_> { + self.task.table_ref() + } +} diff --git a/src/common/meta/src/ddl/table_meta.rs b/src/common/meta/src/ddl/table_meta.rs index 3d4e5113acb..2b55315ec4c 100644 --- a/src/common/meta/src/ddl/table_meta.rs +++ b/src/common/meta/src/ddl/table_meta.rs @@ -62,8 +62,11 @@ impl TableMetadataAllocator { } } - pub(crate) async fn allocate_table_id(&self, task: &CreateTableTask) -> Result { - let table_id = if let Some(table_id) = &task.create_table.table_id { + pub(crate) async fn allocate_table_id( + &self, + table_id: &Option, + ) -> Result { + let table_id = if let Some(table_id) = table_id { let table_id = table_id.id; ensure!( @@ -143,12 +146,26 @@ impl TableMetadataAllocator { Ok(PhysicalTableRouteValue::new(region_routes)) } + /// Create VIEW metadata + pub async fn create_view( + &self, + _ctx: &TableMetadataAllocatorContext, + table_id: &Option, + ) -> Result { + let table_id = self.allocate_table_id(table_id).await?; + + Ok(TableMetadata { + table_id, + ..Default::default() + }) + } + pub async fn create( &self, ctx: &TableMetadataAllocatorContext, task: &CreateTableTask, ) -> Result { - let table_id = self.allocate_table_id(task).await?; + let table_id = self.allocate_table_id(&task.create_table.table_id).await?; let table_route = self.create_table_route(ctx, table_id, task).await?; let region_wal_options = self.create_wal_options(&table_route)?; diff --git a/src/common/meta/src/ddl/tests.rs b/src/common/meta/src/ddl/tests.rs index 46019d8c25d..3c550883ffc 100644 --- a/src/common/meta/src/ddl/tests.rs +++ b/src/common/meta/src/ddl/tests.rs @@ -17,6 +17,7 @@ mod alter_table; mod create_flow; mod create_logical_tables; mod create_table; +mod create_view; mod drop_database; mod drop_flow; mod drop_table; diff --git a/src/common/meta/src/ddl/tests/create_table.rs b/src/common/meta/src/ddl/tests/create_table.rs index f9c464be0ab..b2756ceb405 100644 --- 
a/src/common/meta/src/ddl/tests/create_table.rs +++ b/src/common/meta/src/ddl/tests/create_table.rs @@ -40,7 +40,7 @@ use crate::kv_backend::memory::MemoryKvBackend; use crate::rpc::ddl::CreateTableTask; use crate::test_util::{new_ddl_context, new_ddl_context_with_kv_backend, MockDatanodeManager}; -fn test_create_table_task(name: &str) -> CreateTableTask { +pub(crate) fn test_create_table_task(name: &str) -> CreateTableTask { let create_table = TestCreateTableExprBuilder::default() .column_defs([ TestColumnDefBuilder::default() diff --git a/src/common/meta/src/ddl/tests/create_view.rs b/src/common/meta/src/ddl/tests/create_view.rs new file mode 100644 index 00000000000..693faddeb3f --- /dev/null +++ b/src/common/meta/src/ddl/tests/create_view.rs @@ -0,0 +1,227 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::assert_matches::assert_matches; +use std::sync::Arc; + +use api::v1::CreateViewExpr; +use common_error::ext::ErrorExt; +use common_error::status_code::StatusCode; +use common_procedure::{Context as ProcedureContext, Procedure, ProcedureId, Status}; +use common_procedure_test::MockContextProvider; +use table::metadata; +use table::metadata::{RawTableInfo, RawTableMeta, TableType}; + +use crate::ddl::create_table::CreateTableProcedure; +use crate::ddl::create_view::CreateViewProcedure; +use crate::ddl::test_util::datanode_handler::NaiveDatanodeHandler; +use crate::ddl::tests::create_table::test_create_table_task; +use crate::error::Error; +use crate::rpc::ddl::CreateViewTask; +use crate::test_util::{new_ddl_context, MockDatanodeManager}; + +fn test_create_view_task(name: &str) -> CreateViewTask { + let expr = CreateViewExpr { + catalog_name: "greptime".to_string(), + schema_name: "public".to_string(), + view_name: name.to_string(), + or_replace: false, + create_if_not_exists: false, + logical_plan: vec![1, 2, 3], + }; + + let view_info = RawTableInfo { + ident: metadata::TableIdent { + table_id: 0, + version: 0, + }, + name: expr.view_name.clone(), + desc: None, + catalog_name: expr.catalog_name.clone(), + schema_name: expr.schema_name.clone(), + meta: RawTableMeta::default(), + table_type: TableType::View, + }; + + CreateViewTask { + create_view: expr, + view_info, + } +} + +#[tokio::test] +async fn test_on_prepare_view_exists_err() { + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); + let cluster_id = 1; + let task = test_create_view_task("foo"); + assert!(!task.create_view.create_if_not_exists); + // Puts a value to table name key. 
+ ddl_context + .table_metadata_manager + .create_view_metadata(task.view_info.clone(), &task.create_view.logical_plan) + .await + .unwrap(); + let mut procedure = CreateViewProcedure::new(cluster_id, task, ddl_context); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, Error::ViewAlreadyExists { .. }); + assert_eq!(err.status_code(), StatusCode::TableAlreadyExists); +} + +#[tokio::test] +async fn test_on_prepare_with_create_if_view_exists() { + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); + let cluster_id = 1; + let mut task = test_create_view_task("foo"); + task.create_view.create_if_not_exists = true; + task.view_info.ident.table_id = 1024; + // Puts a value to table name key. + ddl_context + .table_metadata_manager + .create_view_metadata(task.view_info.clone(), &task.create_view.logical_plan) + .await + .unwrap(); + let mut procedure = CreateViewProcedure::new(cluster_id, task, ddl_context); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Done { output: Some(..) 
}); + let table_id = *status.downcast_output_ref::().unwrap(); + assert_eq!(table_id, 1024); +} + +#[tokio::test] +async fn test_on_prepare_without_create_if_table_exists() { + let node_manager = Arc::new(MockDatanodeManager::new(())); + let ddl_context = new_ddl_context(node_manager); + let cluster_id = 1; + let mut task = test_create_view_task("foo"); + task.create_view.create_if_not_exists = true; + let mut procedure = CreateViewProcedure::new(cluster_id, task, ddl_context); + let status = procedure.on_prepare().await.unwrap(); + assert_matches!(status, Status::Executing { persist: true }); + assert_eq!(procedure.view_id(), 1024); +} + +#[tokio::test] +async fn test_on_create_metadata() { + common_telemetry::init_default_ut_logging(); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager); + let cluster_id = 1; + let task = test_create_view_task("foo"); + assert!(!task.create_view.create_if_not_exists); + let mut procedure = CreateViewProcedure::new(cluster_id, task, ddl_context); + procedure.on_prepare().await.unwrap(); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + // Triggers procedure to create view metadata + let status = procedure.execute(&ctx).await.unwrap(); + let view_id = status.downcast_output_ref::().unwrap(); + assert_eq!(*view_id, 1024); +} + +#[tokio::test] +async fn test_replace_view_metadata() { + common_telemetry::init_default_ut_logging(); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager.clone()); + let cluster_id = 1; + let task = test_create_view_task("foo"); + assert!(!task.create_view.create_if_not_exists); + let mut procedure = CreateViewProcedure::new(cluster_id, task.clone(), ddl_context.clone()); + procedure.on_prepare().await.unwrap(); + let ctx = ProcedureContext { + procedure_id: 
ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + // Triggers procedure to create view metadata + let status = procedure.execute(&ctx).await.unwrap(); + let view_id = status.downcast_output_ref::().unwrap(); + assert_eq!(*view_id, 1024); + + let current_view_info = ddl_context + .table_metadata_manager + .view_info_manager() + .get(*view_id) + .await + .unwrap() + .unwrap(); + + assert_eq!(current_view_info.view_info, vec![1, 2, 3]); + + // Create new task to replace the exists one. + let mut task = test_create_view_task("foo"); + // The view already exists, prepare should fail + { + let mut procedure = CreateViewProcedure::new(cluster_id, task.clone(), ddl_context.clone()); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, Error::ViewAlreadyExists { .. }); + assert_eq!(err.status_code(), StatusCode::TableAlreadyExists); + } + + // Set `or_replce` to be `true` and try again + task.create_view.or_replace = true; + task.create_view.logical_plan = vec![4, 5, 6]; + let mut procedure = CreateViewProcedure::new(cluster_id, task, ddl_context.clone()); + procedure.on_prepare().await.unwrap(); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + // Triggers procedure to replace view metadata, but the view_id is unchanged. 
+ let status = procedure.execute(&ctx).await.unwrap(); + let view_id = status.downcast_output_ref::().unwrap(); + assert_eq!(*view_id, 1024); + + let current_view_info = ddl_context + .table_metadata_manager + .view_info_manager() + .get(*view_id) + .await + .unwrap() + .unwrap(); + + assert_eq!(current_view_info.view_info, vec![4, 5, 6]); +} + +#[tokio::test] +async fn test_replace_table() { + common_telemetry::init_default_ut_logging(); + let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler)); + let ddl_context = new_ddl_context(node_manager.clone()); + let cluster_id = 1; + + { + // Create a `foo` table. + let task = test_create_table_task("foo"); + let mut procedure = CreateTableProcedure::new(cluster_id, task, ddl_context.clone()); + procedure.on_prepare().await.unwrap(); + let ctx = ProcedureContext { + procedure_id: ProcedureId::random(), + provider: Arc::new(MockContextProvider::default()), + }; + procedure.execute(&ctx).await.unwrap(); + procedure.execute(&ctx).await.unwrap(); + } + + // Try to replace a view named `foo` too. + let mut task = test_create_view_task("foo"); + task.create_view.or_replace = true; + let mut procedure = CreateViewProcedure::new(cluster_id, task.clone(), ddl_context.clone()); + let err = procedure.on_prepare().await.unwrap_err(); + assert_matches!(err, Error::TableAlreadyExists { .. 
}); + assert_eq!(err.status_code(), StatusCode::TableAlreadyExists); +} diff --git a/src/common/meta/src/ddl_manager.rs b/src/common/meta/src/ddl_manager.rs index 2f19faa47d8..03cd00d13f5 100644 --- a/src/common/meta/src/ddl_manager.rs +++ b/src/common/meta/src/ddl_manager.rs @@ -29,6 +29,7 @@ use crate::ddl::create_database::CreateDatabaseProcedure; use crate::ddl::create_flow::CreateFlowProcedure; use crate::ddl::create_logical_tables::CreateLogicalTablesProcedure; use crate::ddl::create_table::CreateTableProcedure; +use crate::ddl::create_view::CreateViewProcedure; use crate::ddl::drop_database::DropDatabaseProcedure; use crate::ddl::drop_flow::DropFlowProcedure; use crate::ddl::drop_table::DropTableProcedure; @@ -45,12 +46,12 @@ use crate::key::table_name::TableNameKey; use crate::key::{DeserializedValueWithBytes, TableMetadataManagerRef}; use crate::rpc::ddl::DdlTask::{ AlterLogicalTables, AlterTable, CreateDatabase, CreateFlow, CreateLogicalTables, CreateTable, - DropDatabase, DropFlow, DropLogicalTables, DropTable, TruncateTable, + CreateView, DropDatabase, DropFlow, DropLogicalTables, DropTable, DropView, TruncateTable, }; use crate::rpc::ddl::{ - AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, DropDatabaseTask, - DropFlowTask, DropTableTask, QueryContext, SubmitDdlTaskRequest, SubmitDdlTaskResponse, - TruncateTableTask, + AlterTableTask, CreateDatabaseTask, CreateFlowTask, CreateTableTask, CreateViewTask, + DropDatabaseTask, DropFlowTask, DropTableTask, QueryContext, SubmitDdlTaskRequest, + SubmitDdlTaskResponse, TruncateTableTask, }; use crate::rpc::procedure; use crate::rpc::procedure::{MigrateRegionRequest, MigrateRegionResponse, ProcedureStateResponse}; @@ -122,6 +123,7 @@ impl DdlManager { let loaders: Vec<(&str, &BoxedProcedureLoaderFactory)> = procedure_loader!( CreateTableProcedure, CreateLogicalTablesProcedure, + CreateViewProcedure, CreateFlowProcedure, AlterTableProcedure, AlterLogicalTablesProcedure, @@ -142,8 +144,8 @@ 
impl DdlManager { Ok(()) } - #[tracing::instrument(skip_all)] /// Submits and executes an alter table task. + #[tracing::instrument(skip_all)] pub async fn submit_alter_table_task( &self, cluster_id: ClusterId, @@ -159,8 +161,8 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } - #[tracing::instrument(skip_all)] /// Submits and executes a create table task. + #[tracing::instrument(skip_all)] pub async fn submit_create_table_task( &self, cluster_id: ClusterId, @@ -175,8 +177,24 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } + /// Submits and executes a `[CreateViewTask]`. #[tracing::instrument(skip_all)] + pub async fn submit_create_view_task( + &self, + cluster_id: ClusterId, + create_view_task: CreateViewTask, + ) -> Result<(ProcedureId, Option)> { + let context = self.create_context(); + + let procedure = CreateViewProcedure::new(cluster_id, create_view_task, context); + + let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure)); + + self.submit_procedure(procedure_with_id).await + } + /// Submits and executes a create multiple logical table tasks. + #[tracing::instrument(skip_all)] pub async fn submit_create_logical_table_tasks( &self, cluster_id: ClusterId, @@ -197,8 +215,8 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } - #[tracing::instrument(skip_all)] /// Submits and executes alter multiple table tasks. + #[tracing::instrument(skip_all)] pub async fn submit_alter_logical_table_tasks( &self, cluster_id: ClusterId, @@ -219,8 +237,8 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } - #[tracing::instrument(skip_all)] /// Submits and executes a drop table task. + #[tracing::instrument(skip_all)] pub async fn submit_drop_table_task( &self, cluster_id: ClusterId, @@ -235,8 +253,8 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } - #[tracing::instrument(skip_all)] /// Submits and executes a create database task. 
+ #[tracing::instrument(skip_all)] pub async fn submit_create_database( &self, _cluster_id: ClusterId, @@ -255,8 +273,8 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } - #[tracing::instrument(skip_all)] /// Submits and executes a drop table task. + #[tracing::instrument(skip_all)] pub async fn submit_drop_database( &self, _cluster_id: ClusterId, @@ -273,8 +291,8 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } - #[tracing::instrument(skip_all)] /// Submits and executes a create flow task. + #[tracing::instrument(skip_all)] pub async fn submit_create_flow_task( &self, cluster_id: ClusterId, @@ -302,8 +320,8 @@ impl DdlManager { self.submit_procedure(procedure_with_id).await } - #[tracing::instrument(skip_all)] /// Submits and executes a truncate table task. + #[tracing::instrument(skip_all)] pub async fn submit_truncate_table_task( &self, cluster_id: ClusterId, @@ -649,6 +667,34 @@ async fn handle_alter_logical_table_tasks( }) } +/// Handle the `[CreateViewTask]` and returns the DDL response when success. +async fn handle_create_view_task( + ddl_manager: &DdlManager, + cluster_id: ClusterId, + create_view_task: CreateViewTask, +) -> Result { + let (id, output) = ddl_manager + .submit_create_view_task(cluster_id, create_view_task) + .await?; + + let procedure_id = id.to_string(); + let output = output.context(ProcedureOutputSnafu { + procedure_id: &procedure_id, + err_msg: "empty output", + })?; + let view_id = *(output.downcast_ref::().context(ProcedureOutputSnafu { + procedure_id: &procedure_id, + err_msg: "downcast to `u32`", + })?); + info!("View: {view_id} is created via procedure_id {id:?}"); + + Ok(SubmitDdlTaskResponse { + key: procedure_id.into(), + table_id: Some(view_id), + ..Default::default() + }) +} + /// TODO(dennis): let [`DdlManager`] implement [`ProcedureExecutor`] looks weird, find some way to refactor it. 
#[async_trait::async_trait] impl ProcedureExecutor for DdlManager { @@ -704,6 +750,12 @@ impl ProcedureExecutor for DdlManager { DropFlow(drop_flow_task) => { handle_drop_flow_task(self, cluster_id, drop_flow_task).await } + CreateView(create_view_task) => { + handle_create_view_task(self, cluster_id, create_view_task).await + } + DropView(_create_view_task) => { + todo!("implemented in the following PR"); + } } } .trace(span) diff --git a/src/common/meta/src/error.rs b/src/common/meta/src/error.rs index 02b80edc15f..fdd130f8c8d 100644 --- a/src/common/meta/src/error.rs +++ b/src/common/meta/src/error.rs @@ -313,6 +313,13 @@ pub enum Error { location: Location, }, + #[snafu(display("View already exists, view: {}", view_name))] + ViewAlreadyExists { + view_name: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Flow already exists: {}", flow_name))] FlowAlreadyExists { flow_name: String, @@ -350,6 +357,13 @@ pub enum Error { location: Location, }, + #[snafu(display("View not found: '{}'", view_name))] + ViewNotFound { + view_name: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Flow not found: '{}'", flow_name))] FlowNotFound { flow_name: String, @@ -378,6 +392,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid view info, err: {}", err_msg))] + InvalidViewInfo { + err_msg: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to get kv cache, err: {}", err_msg))] GetKvCache { err_msg: String }, @@ -638,6 +659,7 @@ impl ErrorExt for Error { | RouteInfoCorrupted { .. } | InvalidProtoMsg { .. } | InvalidTableMetadata { .. } + | InvalidViewInfo { .. } | MoveRegion { .. } | Unexpected { .. } | TableInfoNotFound { .. } @@ -688,8 +710,8 @@ impl ErrorExt for Error { FlowNotFound { .. } => StatusCode::FlowNotFound, FlowAlreadyExists { .. } => StatusCode::FlowAlreadyExists, - TableNotFound { .. } => StatusCode::TableNotFound, - TableAlreadyExists { .. 
} => StatusCode::TableAlreadyExists, + ViewNotFound { .. } | TableNotFound { .. } => StatusCode::TableNotFound, + ViewAlreadyExists { .. } | TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, SubmitProcedure { source, .. } | QueryProcedure { source, .. } diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 7fb1fedeff6..9090eb075f3 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -48,6 +48,9 @@ //! 9. Table flow key: `__flow/source_table/{table_id}/{flownode_id}/{flow_id}/{partition_id}` //! - Mapping source table's {table_id} to {flownode_id} //! - Used in `Flownode` booting. +//! 10. View info key: `__view_info/{view_id}` +//! - The value is a [ViewInfoValue] struct; it contains the encoded logical plan. +//! - This key is mainly used in constructing the view in Datanode and Frontend. //! //! All keys have related managers. The managers take care of the serialization and deserialization //! of keys and values, and the interaction with the underlying KV store backend. @@ -89,6 +92,7 @@ pub mod table_name; // TODO(weny): removes it. #[allow(deprecated)] pub mod table_region; +pub mod view_info; // TODO(weny): removes it. 
#[allow(deprecated)] pub mod table_route; @@ -117,6 +121,7 @@ use store_api::storage::RegionNumber; use table::metadata::{RawTableInfo, TableId}; use table_info::{TableInfoKey, TableInfoManager, TableInfoValue}; use table_name::{TableNameKey, TableNameManager, TableNameValue}; +use view_info::{ViewInfoKey, ViewInfoManager, ViewInfoValue}; use self::catalog_name::{CatalogManager, CatalogNameKey, CatalogNameValue}; use self::datanode_table::RegionInfo; @@ -142,6 +147,7 @@ pub const MAINTENANCE_KEY: &str = "maintenance"; const DATANODE_TABLE_KEY_PREFIX: &str = "__dn_table"; const TABLE_REGION_KEY_PREFIX: &str = "__table_region"; pub const TABLE_INFO_KEY_PREFIX: &str = "__table_info"; +pub const VIEW_INFO_KEY_PREFIX: &str = "__view_info"; pub const TABLE_NAME_KEY_PREFIX: &str = "__table_name"; pub const CATALOG_NAME_KEY_PREFIX: &str = "__catalog_name"; pub const SCHEMA_NAME_KEY_PREFIX: &str = "__schema_name"; @@ -166,6 +172,11 @@ lazy_static! { Regex::new(&format!("^{TABLE_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap(); } +lazy_static! { + static ref VIEW_INFO_KEY_PATTERN: Regex = + Regex::new(&format!("^{VIEW_INFO_KEY_PREFIX}/([0-9]+)$")).unwrap(); +} + lazy_static! { static ref TABLE_ROUTE_KEY_PATTERN: Regex = Regex::new(&format!("^{TABLE_ROUTE_PREFIX}/([0-9]+)$")).unwrap(); @@ -188,7 +199,7 @@ lazy_static! { static ref CATALOG_NAME_KEY_PATTERN: Regex = Regex::new(&format!( "^{CATALOG_NAME_KEY_PREFIX}/({NAME_PATTERN})$" )) - .unwrap(); + .unwrap(); } lazy_static! { @@ -196,7 +207,7 @@ lazy_static! { static ref SCHEMA_NAME_KEY_PATTERN:Regex=Regex::new(&format!( "^{SCHEMA_NAME_KEY_PREFIX}/({NAME_PATTERN})/({NAME_PATTERN})$" )) - .unwrap(); + .unwrap(); } /// The key of metadata. 
@@ -247,6 +258,7 @@ pub type TableMetadataManagerRef = Arc; pub struct TableMetadataManager { table_name_manager: TableNameManager, table_info_manager: TableInfoManager, + view_info_manager: ViewInfoManager, datanode_table_manager: DatanodeTableManager, catalog_manager: CatalogManager, schema_manager: SchemaManager, @@ -390,6 +402,7 @@ impl TableMetadataManager { TableMetadataManager { table_name_manager: TableNameManager::new(kv_backend.clone()), table_info_manager: TableInfoManager::new(kv_backend.clone()), + view_info_manager: ViewInfoManager::new(kv_backend.clone()), datanode_table_manager: DatanodeTableManager::new(kv_backend.clone()), catalog_manager: CatalogManager::new(kv_backend.clone()), schema_manager: SchemaManager::new(kv_backend.clone()), @@ -427,6 +440,10 @@ impl TableMetadataManager { &self.table_info_manager } + pub fn view_info_manager(&self) -> &ViewInfoManager { + &self.view_info_manager + } + pub fn datanode_table_manager(&self) -> &DatanodeTableManager { &self.datanode_table_manager } @@ -468,6 +485,69 @@ impl TableMetadataManager { Ok((table_info_value, table_route_value)) } + /// Creates metadata for view and returns an error if different metadata exists. + /// The caller MUST ensure it has the exclusive access to `TableNameKey`. + pub async fn create_view_metadata( + &self, + view_info: RawTableInfo, + raw_logical_plan: &Vec, + ) -> Result<()> { + let view_id = view_info.ident.table_id; + + // Creates view name. + let view_name = TableNameKey::new( + &view_info.catalog_name, + &view_info.schema_name, + &view_info.name, + ); + let create_table_name_txn = self + .table_name_manager() + .build_create_txn(&view_name, view_id)?; + + // Creates table info. 
+ let table_info_value = TableInfoValue::new(view_info); + + let (create_table_info_txn, on_create_table_info_failure) = self + .table_info_manager() + .build_create_txn(view_id, &table_info_value)?; + + // Creates view info + let view_info_value = ViewInfoValue::new(raw_logical_plan); + let (create_view_info_txn, on_create_view_info_failure) = self + .view_info_manager() + .build_create_txn(view_id, &view_info_value)?; + + let txn = Txn::merge_all(vec![ + create_table_name_txn, + create_table_info_txn, + create_view_info_txn, + ]); + + let mut r = self.kv_backend.txn(txn).await?; + + // Checks whether metadata was already created. + if !r.succeeded { + let mut set = TxnOpGetResponseSet::from(&mut r.responses); + let remote_table_info = on_create_table_info_failure(&mut set)? + .context(error::UnexpectedSnafu { + err_msg: "Reads the empty table info during the create table metadata", + })? + .into_inner(); + + let remote_view_info = on_create_view_info_failure(&mut set)? + .context(error::UnexpectedSnafu { + err_msg: "Reads the empty view info during the create view info", + })? + .into_inner(); + + let op_name = "the creating view metadata"; + ensure_values!(remote_table_info, table_info_value, op_name); + ensure_values!(remote_view_info, view_info_value, op_name); + } + + Ok(()) + } + /// Creates metadata for table and returns an error if different metadata exists. /// The caller MUST ensure it has the exclusive access to `TableNameKey`. pub async fn create_table_metadata( @@ -817,6 +897,37 @@ impl TableMetadataManager { Ok(()) } + /// Updates view info and returns an error if different metadata exists. + pub async fn update_view_info( + &self, + view_id: TableId, + current_view_info_value: &DeserializedValueWithBytes, + new_view_info: Vec, + ) -> Result<()> { + let new_view_info_value = current_view_info_value.update(new_view_info); + + // Updates view info. 
+ let (update_view_info_txn, on_update_view_info_failure) = self + .view_info_manager() + .build_update_txn(view_id, current_view_info_value, &new_view_info_value)?; + + let mut r = self.kv_backend.txn(update_view_info_txn).await?; + + // Checks whether metadata was already updated. + if !r.succeeded { + let mut set = TxnOpGetResponseSet::from(&mut r.responses); + let remote_view_info = on_update_view_info_failure(&mut set)? + .context(error::UnexpectedSnafu { + err_msg: "Reads the empty view info during the updating view info", + })? + .into_inner(); + + let op_name = "the updating view info"; + ensure_values!(remote_view_info, new_view_info_value, op_name); + } + Ok(()) + } + pub fn batch_update_table_info_value_chunk_size(&self) -> usize { self.kv_backend.max_txn_ops() } @@ -1025,6 +1136,7 @@ macro_rules! impl_meta_key_get_txn_op { impl_meta_key_get_txn_op! { TableNameKey<'_>, TableInfoKey, + ViewInfoKey, TableRouteKey, DatanodeTableKey } @@ -1049,6 +1161,7 @@ macro_rules! impl_optional_meta_value { impl_table_meta_value! 
{ TableNameValue, TableInfoValue, + ViewInfoValue, DatanodeTableValue, FlowInfoValue, FlowNameValue @@ -1080,7 +1193,7 @@ mod tests { use crate::key::table_info::TableInfoValue; use crate::key::table_name::TableNameKey; use crate::key::table_route::TableRouteValue; - use crate::key::{DeserializedValueWithBytes, TableMetadataManager}; + use crate::key::{DeserializedValueWithBytes, TableMetadataManager, ViewInfoValue}; use crate::kv_backend::memory::MemoryKvBackend; use crate::peer::Peer; use crate::rpc::router::{region_distribution, Region, RegionRoute, RegionStatus}; @@ -1836,4 +1949,87 @@ mod tests { let kvs = mem_kv.dump(); assert_eq!(kvs, expected_result); } + + #[tokio::test] + async fn test_create_update_view_info() { + let mem_kv = Arc::new(MemoryKvBackend::default()); + let table_metadata_manager = TableMetadataManager::new(mem_kv); + + let view_info: RawTableInfo = new_test_table_info(Vec::::new().into_iter()).into(); + + let view_id = view_info.ident.table_id; + + let logical_plan: Vec = vec![1, 2, 3]; + + // Create metadata + table_metadata_manager + .create_view_metadata(view_info.clone(), &logical_plan) + .await + .unwrap(); + + { + // assert view info + let current_view_info = table_metadata_manager + .view_info_manager() + .get(view_id) + .await + .unwrap() + .unwrap() + .into_inner(); + assert_eq!(current_view_info.view_info, logical_plan); + // assert table info + let current_table_info = table_metadata_manager + .table_info_manager() + .get(view_id) + .await + .unwrap() + .unwrap() + .into_inner(); + assert_eq!(current_table_info.table_info, view_info); + } + + let new_logical_plan: Vec = vec![4, 5, 6]; + let current_view_info_value = + DeserializedValueWithBytes::from_inner(ViewInfoValue::new(&logical_plan)); + // should be ok. + table_metadata_manager + .update_view_info(view_id, ¤t_view_info_value, new_logical_plan.clone()) + .await + .unwrap(); + // if table info was updated, it should be ok. 
+ table_metadata_manager + .update_view_info(view_id, ¤t_view_info_value, new_logical_plan.clone()) + .await + .unwrap(); + + // updated view_info should equal the `new_logical_plan` + let updated_view_info = table_metadata_manager + .view_info_manager() + .get(view_id) + .await + .unwrap() + .unwrap() + .into_inner(); + assert_eq!(updated_view_info.view_info, new_logical_plan); + + let wrong_view_info = logical_plan.clone(); + let wrong_view_info_value = + DeserializedValueWithBytes::from_inner(current_view_info_value.update(wrong_view_info)); + // if the current_view_info_value is wrong, it should return an error. + // The ABA problem. + assert!(table_metadata_manager + .update_view_info(view_id, &wrong_view_info_value, new_logical_plan.clone()) + .await + .is_err()); + + // The view_info is not changed. + let current_view_info = table_metadata_manager + .view_info_manager() + .get(view_id) + .await + .unwrap() + .unwrap() + .into_inner(); + assert_eq!(current_view_info.view_info, new_logical_plan); + } } diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs index 47b79ea60b5..979d789ab1c 100644 --- a/src/common/meta/src/key/table_route.rs +++ b/src/common/meta/src/key/table_route.rs @@ -56,7 +56,7 @@ pub enum TableRouteValue { Logical(LogicalTableRouteValue), } -#[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] +#[derive(Debug, PartialEq, Serialize, Deserialize, Clone, Default)] pub struct PhysicalTableRouteValue { pub region_routes: Vec, version: u64, diff --git a/src/common/meta/src/key/view_info.rs b/src/common/meta/src/key/view_info.rs new file mode 100644 index 00000000000..98c8a1a7317 --- /dev/null +++ b/src/common/meta/src/key/view_info.rs @@ -0,0 +1,265 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::fmt::Display; + +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; +use table::metadata::TableId; + +use super::VIEW_INFO_KEY_PATTERN; +use crate::error::{InvalidViewInfoSnafu, Result}; +use crate::key::txn_helper::TxnOpGetResponseSet; +use crate::key::{DeserializedValueWithBytes, MetaKey, TableMetaValue, VIEW_INFO_KEY_PREFIX}; +use crate::kv_backend::txn::Txn; +use crate::kv_backend::KvBackendRef; +use crate::rpc::store::BatchGetRequest; + +/// The VIEW logical plan encoded bytes +type RawViewLogicalPlan = Vec; + +/// The key stores the metadata of the view. +/// +/// The layout: `__view_info/{view_id}`. +#[derive(Debug, PartialEq)] +pub struct ViewInfoKey { + view_id: TableId, +} + +impl ViewInfoKey { + /// Returns a new `[ViewInfoKey]`. 
+ pub fn new(view_id: TableId) -> Self { + Self { view_id } + } +} + +impl Display for ViewInfoKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}/{}", VIEW_INFO_KEY_PREFIX, self.view_id) + } +} + +impl<'a> MetaKey<'a, ViewInfoKey> for ViewInfoKey { + fn to_bytes(&self) -> Vec { + self.to_string().into_bytes() + } + + fn from_bytes(bytes: &[u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + InvalidViewInfoSnafu { + err_msg: format!( + "ViewInfoKey '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = VIEW_INFO_KEY_PATTERN + .captures(key) + .context(InvalidViewInfoSnafu { + err_msg: format!("Invalid ViewInfoKey '{key}'"), + })?; + // Safety: pass the regex check above + let view_id = captures[1].parse::().unwrap(); + Ok(ViewInfoKey { view_id }) + } +} + +/// The VIEW info value that keeps the metadata. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct ViewInfoValue { + pub view_info: RawViewLogicalPlan, + version: u64, +} + +impl ViewInfoValue { + pub fn new(view_info: &RawViewLogicalPlan) -> Self { + Self { + view_info: view_info.clone(), + version: 0, + } + } + + pub(crate) fn update(&self, new_view_info: RawViewLogicalPlan) -> Self { + Self { + view_info: new_view_info, + version: self.version + 1, + } + } +} + +/// The `[ViewInfo]` manager +pub struct ViewInfoManager { + kv_backend: KvBackendRef, +} + +impl ViewInfoManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Builds a create view info transaction, it expected the `__view_info/{view_id}` wasn't occupied. 
+ pub(crate) fn build_create_txn( + &self, + view_id: TableId, + view_info_value: &ViewInfoValue, + ) -> Result<( + Txn, + impl FnOnce( + &mut TxnOpGetResponseSet, + ) -> Result>>, + )> { + let key = ViewInfoKey::new(view_id); + let raw_key = key.to_bytes(); + + let txn = Txn::put_if_not_exists(raw_key.clone(), view_info_value.try_as_raw_value()?); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)), + )) + } + + /// Builds an update view info transaction; it expects the remote value to equal the `current_view_info_value`. + /// It retrieves the latest value if the comparing failed. + pub(crate) fn build_update_txn( + &self, + view_id: TableId, + current_view_info_value: &DeserializedValueWithBytes, + new_view_info_value: &ViewInfoValue, + ) -> Result<( + Txn, + impl FnOnce( + &mut TxnOpGetResponseSet, + ) -> Result>>, + )> { + let key = ViewInfoKey::new(view_id); + let raw_key = key.to_bytes(); + let raw_value = current_view_info_value.get_raw_bytes(); + let new_raw_value: Vec = new_view_info_value.try_as_raw_value()?; + + let txn = Txn::compare_and_put(raw_key.clone(), raw_value, new_raw_value); + + Ok(( + txn, + TxnOpGetResponseSet::decode_with(TxnOpGetResponseSet::filter(raw_key)), + )) + } + + /// Get the `[ViewInfoValue]` by the view id + pub async fn get( + &self, + view_id: TableId, + ) -> Result>> { + let key = ViewInfoKey::new(view_id); + let raw_key = key.to_bytes(); + self.kv_backend + .get(&raw_key) + .await?
+ .map(|x| DeserializedValueWithBytes::from_inner_slice(&x.value)) + .transpose() + } + + /// Get the `[ViewInfoValue]` by the view id slice in batch + pub async fn batch_get(&self, view_ids: &[TableId]) -> Result> { + let lookup_table = view_ids + .iter() + .map(|id| (ViewInfoKey::new(*id).to_bytes(), id)) + .collect::>(); + + let resp = self + .kv_backend + .batch_get(BatchGetRequest { + keys: lookup_table.keys().cloned().collect::>(), + }) + .await?; + + let values = resp + .kvs + .iter() + .map(|kv| { + Ok(( + // Safety: must exist. + **lookup_table.get(kv.key()).unwrap(), + ViewInfoValue::try_from_raw_value(&kv.value)?, + )) + }) + .collect::>>()?; + + Ok(values) + } + + /// Returns batch of `DeserializedValueWithBytes`. + pub async fn batch_get_raw( + &self, + view_ids: &[TableId], + ) -> Result>> { + let lookup_table = view_ids + .iter() + .map(|id| (ViewInfoKey::new(*id).to_bytes(), id)) + .collect::>(); + + let resp = self + .kv_backend + .batch_get(BatchGetRequest { + keys: lookup_table.keys().cloned().collect::>(), + }) + .await?; + + let values = resp + .kvs + .iter() + .map(|kv| { + Ok(( + // Safety: must exist. 
+ **lookup_table.get(kv.key()).unwrap(), + DeserializedValueWithBytes::from_inner_slice(&kv.value)?, + )) + }) + .collect::>>()?; + + Ok(values) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_key_serialization() { + let key = ViewInfoKey::new(42); + let raw_key = key.to_bytes(); + assert_eq!(raw_key, b"__view_info/42"); + } + + #[test] + fn test_key_deserialization() { + let expected = ViewInfoKey::new(42); + let key = ViewInfoKey::from_bytes(b"__view_info/42").unwrap(); + assert_eq!(key, expected); + } + + #[test] + fn test_value_serialization() { + let value = ViewInfoValue { + view_info: vec![1, 2, 3], + version: 1, + }; + let serialized = value.try_as_raw_value().unwrap(); + let deserialized = ViewInfoValue::try_from_raw_value(&serialized).unwrap(); + assert_eq!(value, deserialized); + } +} diff --git a/src/common/meta/src/metrics.rs b/src/common/meta/src/metrics.rs index 726b4fd474c..0aa1d9a3cc6 100644 --- a/src/common/meta/src/metrics.rs +++ b/src/common/meta/src/metrics.rs @@ -39,6 +39,12 @@ lazy_static! 
{ &["step"] ) .unwrap(); + pub static ref METRIC_META_PROCEDURE_CREATE_VIEW: HistogramVec = register_histogram_vec!( + "greptime_meta_procedure_create_view", + "meta procedure create view", + &["step"] + ) + .unwrap(); pub static ref METRIC_META_PROCEDURE_CREATE_FLOW: HistogramVec = register_histogram_vec!( "greptime_meta_procedure_create_flow", "meta procedure create flow", diff --git a/src/common/meta/src/rpc/ddl.rs b/src/common/meta/src/rpc/ddl.rs index 50fcedb62be..e42639c381a 100644 --- a/src/common/meta/src/rpc/ddl.rs +++ b/src/common/meta/src/rpc/ddl.rs @@ -20,14 +20,16 @@ use api::v1::meta::{ AlterTableTask as PbAlterTableTask, AlterTableTasks as PbAlterTableTasks, CreateDatabaseTask as PbCreateDatabaseTask, CreateFlowTask as PbCreateFlowTask, CreateTableTask as PbCreateTableTask, CreateTableTasks as PbCreateTableTasks, - DdlTaskRequest as PbDdlTaskRequest, DdlTaskResponse as PbDdlTaskResponse, - DropDatabaseTask as PbDropDatabaseTask, DropFlowTask as PbDropFlowTask, - DropTableTask as PbDropTableTask, DropTableTasks as PbDropTableTasks, Partition, ProcedureId, + CreateViewTask as PbCreateViewTask, DdlTaskRequest as PbDdlTaskRequest, + DdlTaskResponse as PbDdlTaskResponse, DropDatabaseTask as PbDropDatabaseTask, + DropFlowTask as PbDropFlowTask, DropTableTask as PbDropTableTask, + DropTableTasks as PbDropTableTasks, DropViewTask as PbDropViewTask, Partition, ProcedureId, TruncateTableTask as PbTruncateTableTask, }; use api::v1::{ - AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, DropDatabaseExpr, DropFlowExpr, - DropTableExpr, QueryContext as PbQueryContext, TruncateTableExpr, + AlterExpr, CreateDatabaseExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr, + DropDatabaseExpr, DropFlowExpr, DropTableExpr, DropViewExpr, QueryContext as PbQueryContext, + TruncateTableExpr, }; use base64::engine::general_purpose; use base64::Engine as _; @@ -43,6 +45,7 @@ use crate::error::{self, Result}; use crate::key::FlowId; use 
crate::table_name::TableName; +/// DDL tasks #[derive(Debug, Clone)] pub enum DdlTask { CreateTable(CreateTableTask), @@ -56,6 +59,8 @@ pub enum DdlTask { DropDatabase(DropDatabaseTask), CreateFlow(CreateFlowTask), DropFlow(DropFlowTask), + CreateView(CreateViewTask), + DropView(DropViewTask), } impl DdlTask { @@ -148,6 +153,14 @@ impl DdlTask { table_id, }) } + + // Create a `[DdlTask::CreateView]` task. + pub fn new_create_view(create_view: CreateViewExpr, view_info: RawTableInfo) -> Self { + DdlTask::CreateView(CreateViewTask { + create_view, + view_info, + }) + } } impl TryFrom for DdlTask { @@ -197,6 +210,8 @@ impl TryFrom for DdlTask { } Task::CreateFlowTask(create_flow) => Ok(DdlTask::CreateFlow(create_flow.try_into()?)), Task::DropFlowTask(drop_flow) => Ok(DdlTask::DropFlow(drop_flow.try_into()?)), + Task::CreateViewTask(create_view) => Ok(DdlTask::CreateView(create_view.try_into()?)), + Task::DropViewTask(drop_view) => Ok(DdlTask::DropView(drop_view.try_into()?)), } } } @@ -213,7 +228,7 @@ impl TryFrom for PbDdlTaskRequest { fn try_from(request: SubmitDdlTaskRequest) -> Result { let task = match request.task { DdlTask::CreateTable(task) => Task::CreateTableTask(task.try_into()?), - DdlTask::DropTable(task) => Task::DropTableTask(task.try_into()?), + DdlTask::DropTable(task) => Task::DropTableTask(task.into()), DdlTask::AlterTable(task) => Task::AlterTableTask(task.try_into()?), DdlTask::TruncateTable(task) => Task::TruncateTableTask(task.try_into()?), DdlTask::CreateLogicalTables(tasks) => { @@ -227,8 +242,8 @@ impl TryFrom for PbDdlTaskRequest { DdlTask::DropLogicalTables(tasks) => { let tasks = tasks .into_iter() - .map(|task| task.try_into()) - .collect::>>()?; + .map(|task| task.into()) + .collect::>(); Task::DropTableTasks(PbDropTableTasks { tasks }) } @@ -244,6 +259,8 @@ impl TryFrom for PbDdlTaskRequest { DdlTask::DropDatabase(task) => Task::DropDatabaseTask(task.try_into()?), DdlTask::CreateFlow(task) => Task::CreateFlowTask(task.into()), 
DdlTask::DropFlow(task) => Task::DropFlowTask(task.into()), + DdlTask::CreateView(task) => Task::CreateViewTask(task.try_into()?), + DdlTask::DropView(task) => Task::DropViewTask(task.into()), }; Ok(Self { @@ -295,6 +312,138 @@ impl From for PbDdlTaskResponse { } } +/// A `CREATE VIEW` task. +#[derive(Debug, PartialEq, Clone)] +pub struct CreateViewTask { + pub create_view: CreateViewExpr, + pub view_info: RawTableInfo, +} + +impl CreateViewTask { + /// Returns the `[TableReference]` of view. + pub fn table_ref(&self) -> TableReference { + TableReference { + catalog: &self.create_view.catalog_name, + schema: &self.create_view.schema_name, + table: &self.create_view.view_name, + } + } + + pub fn raw_logical_plan(&self) -> &Vec { + &self.create_view.logical_plan + } +} + +impl TryFrom for CreateViewTask { + type Error = error::Error; + + fn try_from(pb: PbCreateViewTask) -> Result { + let view_info = serde_json::from_slice(&pb.view_info).context(error::SerdeJsonSnafu)?; + + Ok(CreateViewTask { + create_view: pb.create_view.context(error::InvalidProtoMsgSnafu { + err_msg: "expected create view", + })?, + view_info, + }) + } +} + +impl TryFrom for PbCreateViewTask { + type Error = error::Error; + + fn try_from(task: CreateViewTask) -> Result { + Ok(PbCreateViewTask { + create_view: Some(task.create_view), + view_info: serde_json::to_vec(&task.view_info).context(error::SerdeJsonSnafu)?, + }) + } +} + +impl Serialize for CreateViewTask { + fn serialize(&self, serializer: S) -> result::Result + where + S: serde::Serializer, + { + let view_info = serde_json::to_vec(&self.view_info) + .map_err(|err| serde::ser::Error::custom(err.to_string()))?; + + let pb = PbCreateViewTask { + create_view: Some(self.create_view.clone()), + view_info, + }; + let buf = pb.encode_to_vec(); + let encoded = general_purpose::STANDARD_NO_PAD.encode(buf); + serializer.serialize_str(&encoded) + } +} + +impl<'de> Deserialize<'de> for CreateViewTask { + fn deserialize(deserializer: D) -> 
result::Result + where + D: serde::Deserializer<'de>, + { + let encoded = String::deserialize(deserializer)?; + let buf = general_purpose::STANDARD_NO_PAD + .decode(encoded) + .map_err(|err| serde::de::Error::custom(err.to_string()))?; + let expr: PbCreateViewTask = PbCreateViewTask::decode(&*buf) + .map_err(|err| serde::de::Error::custom(err.to_string()))?; + + let expr = CreateViewTask::try_from(expr) + .map_err(|err| serde::de::Error::custom(err.to_string()))?; + + Ok(expr) + } +} + +/// A `DROP VIEW` task. +#[derive(Debug, PartialEq, Clone)] +pub struct DropViewTask { + pub catalog: String, + pub schema: String, + pub view: String, + pub view_id: TableId, + pub drop_if_exists: bool, +} + +impl TryFrom for DropViewTask { + type Error = error::Error; + + fn try_from(pb: PbDropViewTask) -> Result { + let expr = pb.drop_view.context(error::InvalidProtoMsgSnafu { + err_msg: "expected drop view", + })?; + + Ok(DropViewTask { + catalog: expr.catalog_name, + schema: expr.schema_name, + view: expr.view_name, + view_id: expr + .view_id + .context(error::InvalidProtoMsgSnafu { + err_msg: "expected view_id", + })? 
+ .id, + drop_if_exists: expr.drop_if_exists, + }) + } +} + +impl From for PbDropViewTask { + fn from(task: DropViewTask) -> Self { + PbDropViewTask { + drop_view: Some(DropViewExpr { + catalog_name: task.catalog, + schema_name: task.schema, + view_name: task.view, + view_id: Some(api::v1::TableId { id: task.view_id }), + drop_if_exists: task.drop_if_exists, + }), + } + } +} + #[derive(Debug, PartialEq, Serialize, Deserialize, Clone)] pub struct DropTableTask { pub catalog: String, @@ -346,11 +495,9 @@ impl TryFrom for DropTableTask { } } -impl TryFrom for PbDropTableTask { - type Error = error::Error; - - fn try_from(task: DropTableTask) -> Result { - Ok(PbDropTableTask { +impl From for PbDropTableTask { + fn from(task: DropTableTask) -> Self { + PbDropTableTask { drop_table: Some(DropTableExpr { catalog_name: task.catalog, schema_name: task.schema, @@ -358,7 +505,7 @@ impl TryFrom for PbDropTableTask { table_id: Some(api::v1::TableId { id: task.table_id }), drop_if_exists: task.drop_if_exists, }), - }) + } } } diff --git a/src/datatypes/src/schema/raw.rs b/src/datatypes/src/schema/raw.rs index d952bb9f3c7..eaca9617a06 100644 --- a/src/datatypes/src/schema/raw.rs +++ b/src/datatypes/src/schema/raw.rs @@ -20,7 +20,7 @@ use crate::schema::{ColumnSchema, Schema, SchemaBuilder}; /// Struct used to serialize and deserialize [`Schema`](crate::schema::Schema). /// /// This struct only contains necessary data to recover the Schema. -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] pub struct RawSchema { /// Schema of columns. 
pub column_schemas: Vec, diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 5768d34e7c2..f342762bbb0 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -511,7 +511,6 @@ pub fn check_permission( | Statement::ShowDatabases(_) | Statement::DropDatabase(_) | Statement::DropFlow(_) => {} - Statement::ShowCreateTable(stmt) => { validate_param(&stmt.table_name, query_ctx)?; } @@ -522,6 +521,9 @@ pub fn check_permission( // TODO: should also validate source table name here? validate_param(&stmt.sink_table_name, query_ctx)?; } + Statement::CreateView(stmt) => { + validate_param(&stmt.name, query_ctx)?; + } Statement::Alter(stmt) => { validate_param(stmt.table_name(), query_ctx)?; } diff --git a/src/frontend/src/instance/grpc.rs b/src/frontend/src/instance/grpc.rs index 75cdff69dd9..7be2c09ec1d 100644 --- a/src/frontend/src/instance/grpc.rs +++ b/src/frontend/src/instance/grpc.rs @@ -159,6 +159,17 @@ impl GrpcQueryHandler for Instance { .drop_flow(catalog_name, flow_name, drop_if_exists, ctx.clone()) .await? 
} + DdlExpr::CreateView(expr) => { + let _ = self + .statement_executor + .create_view_by_expr(expr, ctx.clone()) + .await?; + + Output::new_with_affected_rows(0) + } + DdlExpr::DropView(_) => { + todo!("implemented in the following PR") + } } } }; @@ -207,6 +218,12 @@ fn fill_catalog_and_schema_from_context(ddl_expr: &mut DdlExpr, ctx: &QueryConte expr.catalog_name = catalog.to_string(); } } + Expr::CreateView(expr) => { + check_and_fill!(expr); + } + Expr::DropView(expr) => { + check_and_fill!(expr); + } } } diff --git a/src/operator/Cargo.toml b/src/operator/Cargo.toml index aacbfeb685d..f99ea1c63b5 100644 --- a/src/operator/Cargo.toml +++ b/src/operator/Cargo.toml @@ -53,6 +53,7 @@ snafu.workspace = true sql.workspace = true sqlparser.workspace = true store-api.workspace = true +substrait.workspace = true table.workspace = true tokio.workspace = true tonic.workspace = true diff --git a/src/operator/src/error.rs b/src/operator/src/error.rs index 40bce2101f7..132c3fc36f0 100644 --- a/src/operator/src/error.rs +++ b/src/operator/src/error.rs @@ -35,6 +35,13 @@ pub enum Error { location: Location, }, + #[snafu(display("View already exists: `{name}`"))] + ViewAlreadyExists { + name: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to invalidate table cache"))] InvalidateTableCache { #[snafu(implicit)] @@ -129,6 +136,12 @@ pub enum Error { source: api::error::Error, }, + #[snafu(display("Invalid statement to create view"))] + InvalidViewStmt { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to convert column default constraint, column: {}", column_name))] ConvertColumnDefaultConstraint { column_name: String, @@ -637,6 +650,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid view name: {name}"))] + InvalidViewName { + name: String, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Do not support {} in multiple catalogs", ddl_name))] DdlWithMultiCatalogs { ddl_name: String, 
@@ -686,6 +706,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Failed to convert between logical plan and substrait plan"))] + SubstraitCodec { + #[snafu(implicit)] + location: Location, + source: substrait::error::Error, + }, } pub type Result = std::result::Result; @@ -714,10 +741,14 @@ impl ErrorExt for Error { | Error::SchemaIncompatible { .. } | Error::UnsupportedRegionRequest { .. } | Error::InvalidTableName { .. } - | Error::ConvertIdentifier { .. } - | Error::InvalidExpr { .. } => StatusCode::InvalidArguments, + | Error::InvalidViewName { .. } + | Error::InvalidExpr { .. } + | Error::InvalidViewStmt { .. } + | Error::ConvertIdentifier { .. } => StatusCode::InvalidArguments, - Error::TableAlreadyExists { .. } => StatusCode::TableAlreadyExists, + Error::TableAlreadyExists { .. } | Error::ViewAlreadyExists { .. } => { + StatusCode::TableAlreadyExists + } Error::NotSupported { .. } => StatusCode::Unsupported, @@ -744,6 +775,7 @@ impl ErrorExt for Error { Error::RequestInserts { source, .. } => source.status_code(), Error::RequestRegion { source, .. } => source.status_code(), Error::RequestDeletes { source, .. } => source.status_code(), + Error::SubstraitCodec { source, .. } => source.status_code(), Error::ColumnDataType { source, .. } | Error::InvalidColumnDef { source, .. 
} => { source.status_code() diff --git a/src/operator/src/expr_factory.rs b/src/operator/src/expr_factory.rs index e48a9ed1936..b34bedc7c90 100644 --- a/src/operator/src/expr_factory.rs +++ b/src/operator/src/expr_factory.rs @@ -18,8 +18,8 @@ use api::helper::ColumnDataTypeWrapper; use api::v1::alter_expr::Kind; use api::v1::{ AddColumn, AddColumns, AlterExpr, ChangeColumnType, ChangeColumnTypes, Column, ColumnDataType, - ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, DropColumn, DropColumns, RenameTable, - SemanticType, TableName, + ColumnDataTypeExtension, CreateFlowExpr, CreateTableExpr, CreateViewExpr, DropColumn, + DropColumns, RenameTable, SemanticType, TableName, }; use common_error::ext::BoxedError; use common_grpc_expr::util::ColumnExpr; @@ -36,7 +36,9 @@ use session::table_name::table_idents_to_full_name; use snafu::{ensure, OptionExt, ResultExt}; use sql::ast::{ColumnDef, ColumnOption, TableConstraint}; use sql::statements::alter::{AlterTable, AlterTableOperation}; -use sql::statements::create::{CreateExternalTable, CreateFlow, CreateTable, TIME_INDEX}; +use sql::statements::create::{ + CreateExternalTable, CreateFlow, CreateTable, CreateView, TIME_INDEX, +}; use sql::statements::{ column_def_to_schema, sql_column_def_to_grpc_column_def, sql_data_type_to_concrete_data_type, }; @@ -513,6 +515,28 @@ pub(crate) fn to_alter_expr( }) } +/// Try to cast the `[CreateViewExpr]` statement into gRPC `[CreateViewExpr]`. 
+pub fn to_create_view_expr( + stmt: CreateView, + logical_plan: Vec, + query_ctx: QueryContextRef, +) -> Result { + let (catalog_name, schema_name, view_name) = table_idents_to_full_name(&stmt.name, &query_ctx) + .map_err(BoxedError::new) + .context(ExternalSnafu)?; + + let expr = CreateViewExpr { + catalog_name, + schema_name, + view_name, + logical_plan, + create_if_not_exists: stmt.if_not_exists, + or_replace: stmt.or_replace, + }; + + Ok(expr) +} + pub fn to_create_flow_task_expr( create_flow: CreateFlow, query_ctx: &QueryContextRef, @@ -767,4 +791,54 @@ mod tests { ); assert!(change_column_type.target_type_extension.is_none()); } + + #[test] + fn test_to_create_view_expr() { + let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateView(stmt) = stmt else { + unreachable!() + }; + + let logical_plan = vec![1, 2, 3]; + + let expr = to_create_view_expr(stmt, logical_plan.clone(), QueryContext::arc()).unwrap(); + + assert_eq!("greptime", expr.catalog_name); + assert_eq!("public", expr.schema_name); + assert_eq!("test", expr.view_name); + assert!(!expr.create_if_not_exists); + assert!(!expr.or_replace); + assert_eq!(logical_plan, expr.logical_plan); + } + + #[test] + fn test_to_create_view_expr_complex() { + let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test.test_view AS SELECT * FROM NUMBERS"; + let stmt = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap() + .pop() + .unwrap(); + + let Statement::CreateView(stmt) = stmt else { + unreachable!() + }; + + let logical_plan = vec![1, 2, 3]; + + let expr = to_create_view_expr(stmt, logical_plan.clone(), QueryContext::arc()).unwrap(); + + assert_eq!("greptime", expr.catalog_name); + assert_eq!("test", expr.schema_name); + assert_eq!("test_view", expr.view_name); + assert!(expr.create_if_not_exists); + 
assert!(expr.or_replace); + assert_eq!(logical_plan, expr.logical_plan); + } } diff --git a/src/operator/src/metrics.rs b/src/operator/src/metrics.rs index 6774b26deca..932aca16800 100644 --- a/src/operator/src/metrics.rs +++ b/src/operator/src/metrics.rs @@ -41,4 +41,9 @@ lazy_static! { "table operator delete rows" ) .unwrap(); + pub static ref DIST_CREATE_VIEW: Histogram = register_histogram!( + "greptime_ddl_operator_create_view", + "DDL operator create view" + ) + .unwrap(); } diff --git a/src/operator/src/statement.rs b/src/operator/src/statement.rs index beed26fc90b..d1eb1d06426 100644 --- a/src/operator/src/statement.rs +++ b/src/operator/src/statement.rs @@ -179,6 +179,10 @@ impl StatementExecutor { ) .await } + Statement::CreateView(stmt) => { + let _ = self.create_view(stmt, query_ctx).await?; + Ok(Output::new_with_affected_rows(0)) + } Statement::Alter(alter_table) => self.alter_table(alter_table, query_ctx).await, Statement::DropTable(stmt) => { let (catalog, schema, table) = @@ -274,6 +278,13 @@ impl StatementExecutor { .context(PlanStatementSnafu) } + pub fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result { + self.query_engine + .planner() + .optimize(plan) + .context(PlanStatementSnafu) + } + #[tracing::instrument(skip_all)] async fn plan_exec(&self, stmt: QueryStatement, query_ctx: QueryContextRef) -> Result { let plan = self.plan(stmt, query_ctx.clone()).await?; diff --git a/src/operator/src/statement/ddl.rs b/src/operator/src/statement/ddl.rs index d86bbf9fafb..521e3563fa9 100644 --- a/src/operator/src/statement/ddl.rs +++ b/src/operator/src/statement/ddl.rs @@ -17,7 +17,7 @@ use std::sync::Arc; use api::helper::ColumnDataTypeWrapper; use api::v1::meta::CreateFlowTask as PbCreateFlowTask; -use api::v1::{column_def, AlterExpr, CreateFlowExpr, CreateTableExpr}; +use api::v1::{column_def, AlterExpr, CreateFlowExpr, CreateTableExpr, CreateViewExpr}; use catalog::CatalogManagerRef; use chrono::Utc; use 
common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; @@ -34,7 +34,7 @@ use common_meta::rpc::ddl::{ use common_meta::rpc::router::{Partition, Partition as MetaPartition}; use common_meta::table_name::TableName; use common_query::Output; -use common_telemetry::{info, tracing}; +use common_telemetry::{debug, info, tracing}; use common_time::Timezone; use datatypes::prelude::ConcreteDataType; use datatypes::schema::RawSchema; @@ -42,6 +42,7 @@ use datatypes::value::Value; use lazy_static::lazy_static; use partition::expr::{Operand, PartitionExpr, RestrictedOp}; use partition::partition::{PartitionBound, PartitionDef}; +use query::parser::QueryStatement; use query::sql::create_table_stmt; use regex::Regex; use session::context::QueryContextRef; @@ -49,11 +50,13 @@ use session::table_name::table_idents_to_full_name; use snafu::{ensure, OptionExt, ResultExt}; use sql::statements::alter::AlterTable; use sql::statements::create::{ - CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions, + CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, Partitions, }; use sql::statements::sql_value_to_value; +use sql::statements::statement::Statement; use sqlparser::ast::{Expr, Ident, Value as ParserValue}; use store_api::metric_engine_consts::{LOGICAL_TABLE_METADATA_KEY, METRIC_ENGINE_NAME}; +use substrait::{DFLogicalSubstraitConvertor, SubstraitPlan}; use table::dist_table::DistTable; use table::metadata::{self, RawTableInfo, RawTableMeta, TableId, TableInfo, TableType}; use table::requests::{AlterKind, AlterTableRequest, TableOptions}; @@ -65,8 +68,9 @@ use crate::error::{ CreateLogicalTablesSnafu, CreateTableInfoSnafu, DdlWithMultiCatalogsSnafu, DdlWithMultiSchemasSnafu, DeserializePartitionSnafu, EmptyDdlExprSnafu, FlowNotFoundSnafu, InvalidPartitionColumnsSnafu, InvalidPartitionRuleSnafu, InvalidTableNameSnafu, - ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, TableAlreadyExistsSnafu, - TableMetadataManagerSnafu, 
TableNotFoundSnafu, UnrecognizedTableOptionSnafu, + InvalidViewNameSnafu, InvalidViewStmtSnafu, ParseSqlValueSnafu, Result, SchemaNotFoundSnafu, + SubstraitCodecSnafu, TableAlreadyExistsSnafu, TableMetadataManagerSnafu, TableNotFoundSnafu, + UnrecognizedTableOptionSnafu, ViewAlreadyExistsSnafu, }; use crate::expr_factory; use crate::statement::show::create_partitions_stmt; @@ -374,6 +378,165 @@ impl StatementExecutor { .context(error::ExecuteDdlSnafu) } + #[tracing::instrument(skip_all)] + pub async fn create_view( + &self, + create_view: CreateView, + ctx: QueryContextRef, + ) -> Result { + // convert input into logical plan + let logical_plan = match &*create_view.query { + Statement::Query(query) => { + self.plan( + QueryStatement::Sql(Statement::Query(query.clone())), + ctx.clone(), + ) + .await? + } + Statement::Tql(query) => self.plan_tql(query.clone(), &ctx).await?, + _ => { + return InvalidViewStmtSnafu {}.fail(); + } + }; + let optimized_plan = self.optimize_logical_plan(logical_plan)?; + + // encode logical plan + let encoded_plan = DFLogicalSubstraitConvertor + .encode(&optimized_plan.unwrap_df_plan()) + .context(SubstraitCodecSnafu)?; + + let expr = + expr_factory::to_create_view_expr(create_view, encoded_plan.to_vec(), ctx.clone())?; + + self.create_view_by_expr(expr, ctx).await + } + + pub async fn create_view_by_expr( + &self, + expr: CreateViewExpr, + ctx: QueryContextRef, + ) -> Result { + let _timer = crate::metrics::DIST_CREATE_VIEW.start_timer(); + + let schema_exists = self + .table_metadata_manager + .schema_manager() + .exists(SchemaNameKey::new(&expr.catalog_name, &expr.schema_name)) + .await + .context(TableMetadataManagerSnafu)?; + + ensure!( + schema_exists, + SchemaNotFoundSnafu { + schema_info: &expr.schema_name, + } + ); + + // if view or table exists. + if let Some(table) = self + .catalog_manager + .table(&expr.catalog_name, &expr.schema_name, &expr.view_name) + .await + .context(CatalogSnafu)? 
+ { + let table_type = table.table_info().table_type; + + match (table_type, expr.create_if_not_exists, expr.or_replace) { + (TableType::View, true, false) => { + return Ok(table); + } + (TableType::View, false, false) => { + return ViewAlreadyExistsSnafu { + name: format_full_table_name( + &expr.catalog_name, + &expr.schema_name, + &expr.view_name, + ), + } + .fail(); + } + (TableType::View, _, true) => { + // Try to replace an exists view + } + _ => { + return TableAlreadyExistsSnafu { + table: format_full_table_name( + &expr.catalog_name, + &expr.schema_name, + &expr.view_name, + ), + } + .fail(); + } + } + } + + ensure!( + NAME_PATTERN_REG.is_match(&expr.view_name), + InvalidViewNameSnafu { + name: expr.view_name.clone(), + } + ); + + let view_name = TableName::new(&expr.catalog_name, &expr.schema_name, &expr.view_name); + + let mut view_info = RawTableInfo { + ident: metadata::TableIdent { + // The view id of distributed table is assigned by Meta, set "0" here as a placeholder. + table_id: 0, + version: 0, + }, + name: expr.view_name.clone(), + desc: None, + catalog_name: expr.catalog_name.clone(), + schema_name: expr.schema_name.clone(), + // The meta doesn't make sense for views, so using a default one. + meta: RawTableMeta::default(), + table_type: TableType::View, + }; + + let request = SubmitDdlTaskRequest { + query_context: ctx, + task: DdlTask::new_create_view(expr, view_info.clone()), + }; + + let resp = self + .procedure_executor + .submit_ddl_task(&ExecutorContext::default(), request) + .await + .context(error::ExecuteDdlSnafu)?; + + debug!( + "Submit creating view '{view_name}' task response: {:?}", + resp + ); + + let view_id = resp.table_id.context(error::UnexpectedSnafu { + violated: "expected table_id", + })?; + info!("Successfully created view '{view_name}' with view id {view_id}"); + + // Invalidates local cache ASAP. 
+ self.cache_invalidator + .invalidate( + &Context::default(), + &[ + CacheIdent::TableId(view_id), + CacheIdent::TableName(view_name.clone()), + ], + ) + .await + .context(error::InvalidateTableCacheSnafu)?; + + view_info.ident.table_id = view_id; + + let view_info = Arc::new(view_info.try_into().context(CreateTableInfoSnafu)?); + + let table = DistTable::table(view_info); + + Ok(table) + } + #[tracing::instrument(skip_all)] pub async fn drop_flow( &self, @@ -1175,7 +1338,7 @@ mod test { .unwrap_err() .to_string(), "Invalid partition columns when creating table 'my_table', \ - reason: partition column must belongs to primary keys or equals to time index", + reason: partition column must belongs to primary keys or equals to time index", ); } diff --git a/src/operator/src/statement/tql.rs b/src/operator/src/statement/tql.rs index 2d9e34bc9fe..72b2db641b8 100644 --- a/src/operator/src/statement/tql.rs +++ b/src/operator/src/statement/tql.rs @@ -20,6 +20,7 @@ use query::parser::{ PromQuery, QueryLanguageParser, ANALYZE_NODE_NAME, ANALYZE_VERBOSE_NODE_NAME, DEFAULT_LOOKBACK_STRING, EXPLAIN_NODE_NAME, EXPLAIN_VERBOSE_NODE_NAME, }; +use query::plan::LogicalPlan; use session::context::QueryContextRef; use snafu::ResultExt; use sql::statements::tql::Tql; @@ -28,8 +29,9 @@ use crate::error::{ExecLogicalPlanSnafu, ParseQuerySnafu, PlanStatementSnafu, Re use crate::statement::StatementExecutor; impl StatementExecutor { + /// Plan the given [Tql] query and return the [LogicalPlan]. #[tracing::instrument(skip_all)] - pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result { + pub async fn plan_tql(&self, tql: Tql, query_ctx: &QueryContextRef) -> Result { let stmt = match tql { Tql::Eval(eval) => { let promql = PromQuery { @@ -41,7 +43,7 @@ impl StatementExecutor { .lookback .unwrap_or_else(|| DEFAULT_LOOKBACK_STRING.to_string()), }; - QueryLanguageParser::parse_promql(&promql, &query_ctx).context(ParseQuerySnafu)? 
+ QueryLanguageParser::parse_promql(&promql, query_ctx).context(ParseQuerySnafu)? } Tql::Explain(explain) => { let promql = PromQuery { @@ -58,7 +60,7 @@ impl StatementExecutor { } .to_string(); let params = HashMap::from([("name".to_string(), explain_node_name)]); - QueryLanguageParser::parse_promql(&promql, &query_ctx) + QueryLanguageParser::parse_promql(&promql, query_ctx) .context(ParseQuerySnafu)? .post_process(params) .unwrap() @@ -80,18 +82,23 @@ impl StatementExecutor { } .to_string(); let params = HashMap::from([("name".to_string(), analyze_node_name)]); - QueryLanguageParser::parse_promql(&promql, &query_ctx) + QueryLanguageParser::parse_promql(&promql, query_ctx) .context(ParseQuerySnafu)? .post_process(params) .unwrap() } }; - let plan = self - .query_engine + self.query_engine .planner() .plan(stmt, query_ctx.clone()) .await - .context(PlanStatementSnafu)?; + .context(PlanStatementSnafu) + } + + /// Execute the given [Tql] query and return the result. + #[tracing::instrument(skip_all)] + pub(super) async fn execute_tql(&self, tql: Tql, query_ctx: QueryContextRef) -> Result { + let plan = self.plan_tql(tql, &query_ctx).await?; self.query_engine .execute(plan, query_ctx) .await diff --git a/src/query/src/plan.rs b/src/query/src/plan.rs index 0e2dd710e7a..34495dee989 100644 --- a/src/query/src/plan.rs +++ b/src/query/src/plan.rs @@ -87,6 +87,13 @@ impl LogicalPlan { .context(DataFusionSnafu) .map(LogicalPlan::DfPlan) } + + /// Unwrap the logical plan into a DataFusion logical plan + pub fn unwrap_df_plan(self) -> DfLogicalPlan { + match self { + LogicalPlan::DfPlan(plan) => plan, + } + } } impl From for LogicalPlan { diff --git a/src/query/src/planner.rs b/src/query/src/planner.rs index eb3cb255d6f..5f350a638d7 100644 --- a/src/query/src/planner.rs +++ b/src/query/src/planner.rs @@ -42,6 +42,8 @@ use crate::{DfContextProviderAdapter, QueryEngineContext}; pub trait LogicalPlanner: Send + Sync { async fn plan(&self, stmt: QueryStatement, query_ctx: 
QueryContextRef) -> Result; + fn optimize(&self, plan: LogicalPlan) -> Result; + fn as_any(&self) -> &dyn Any; } @@ -145,6 +147,14 @@ impl DfLogicalPlanner { .map_err(BoxedError::new) .context(QueryPlanSnafu) } + + #[tracing::instrument(skip_all)] + fn optimize_logical_plan(&self, plan: LogicalPlan) -> Result { + self.engine_state + .optimize_logical_plan(plan.unwrap_df_plan()) + .context(DataFusionSnafu) + .map(Into::into) + } } #[async_trait] @@ -157,6 +167,10 @@ impl LogicalPlanner for DfLogicalPlanner { } } + fn optimize(&self, plan: LogicalPlan) -> Result { + self.optimize_logical_plan(plan) + } + fn as_any(&self) -> &dyn Any { self } diff --git a/src/query/src/query_engine/state.rs b/src/query/src/query_engine/state.rs index 391ed2d0bb3..9fdee8fc0e3 100644 --- a/src/query/src/query_engine/state.rs +++ b/src/query/src/query_engine/state.rs @@ -157,6 +157,11 @@ impl QueryEngineState { }) } + /// Run the full logical plan optimize phase for the given plan. + pub fn optimize_logical_plan(&self, plan: DfLogicalPlan) -> DfResult { + self.session_state().optimize(&plan) + } + /// Register an udf function. /// Will override if the function with same name is already registered. 
pub fn register_function(&self, func: FunctionRef) { diff --git a/src/sql/src/parsers/create_parser.rs b/src/sql/src/parsers/create_parser.rs index 5797a32902c..152e6a81f7a 100644 --- a/src/sql/src/parsers/create_parser.rs +++ b/src/sql/src/parsers/create_parser.rs @@ -33,8 +33,8 @@ use crate::error::{ }; use crate::parser::{ParserContext, FLOW}; use crate::statements::create::{ - CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, Partitions, - TIME_INDEX, + CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, + Partitions, TIME_INDEX, }; use crate::statements::statement::Statement; use crate::statements::{get_data_type_by_alias_name, OptionMap}; @@ -70,6 +70,7 @@ impl<'a> ParserContext<'a> { .context(SyntaxSnafu)?; match self.parser.next_token().token { Token::Word(w) => match w.keyword { + Keyword::VIEW => self.parse_create_view(true), Keyword::NoKeyword => { let uppercase = w.value.to_uppercase(); match uppercase.as_str() { @@ -83,6 +84,11 @@ impl<'a> ParserContext<'a> { } } + Keyword::VIEW => { + let _ = self.parser.next_token(); + self.parse_create_view(false) + } + Keyword::NoKeyword => { let _ = self.parser.next_token(); let uppercase = w.value.to_uppercase(); @@ -91,13 +97,31 @@ impl<'a> ParserContext<'a> { _ => self.unsupported(w.to_string()), } } - _ => self.unsupported(w.to_string()), }, unexpected => self.unsupported(unexpected.to_string()), } } + /// Parse `CREAVE VIEW` statement. 
+ fn parse_create_view(&mut self, or_replace: bool) -> Result { + let if_not_exists = self.parse_if_not_exist()?; + let view_name = self.intern_parse_table_name()?; + + self.parser + .expect_keyword(Keyword::AS) + .context(SyntaxSnafu)?; + + let query = self.parse_query()?; + + Ok(Statement::CreateView(CreateView { + name: view_name, + or_replace, + query: Box::new(query), + if_not_exists, + })) + } + fn parse_create_external_table(&mut self) -> Result { let _ = self.parser.next_token(); self.parser @@ -1770,4 +1794,46 @@ non TIMESTAMP(6) TIME INDEX, _ => unreachable!(), } } + + #[test] + fn test_parse_create_view() { + let sql = "CREATE VIEW test AS SELECT * FROM NUMBERS"; + + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + match &result[0] { + Statement::CreateView(c) => { + assert_eq!(c.to_string(), sql); + assert!(!c.or_replace); + assert!(!c.if_not_exists); + assert_eq!("test", c.name.to_string()); + } + _ => unreachable!(), + } + + let sql = "CREATE OR REPLACE VIEW IF NOT EXISTS test AS SELECT * FROM NUMBERS"; + + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()) + .unwrap(); + match &result[0] { + Statement::CreateView(c) => { + assert_eq!(c.to_string(), sql); + assert!(c.or_replace); + assert!(c.if_not_exists); + assert_eq!("test", c.name.to_string()); + } + _ => unreachable!(), + } + } + + #[test] + fn test_parse_create_view_invalid_query() { + let sql = "CREATE VIEW test AS DELETE from demo"; + let result = + ParserContext::create_with_dialect(sql, &GreptimeDbDialect {}, ParseOptions::default()); + assert!(result.is_err()); + assert_matches!(result, Err(crate::error::Error::Syntax { .. 
})); + } } diff --git a/src/sql/src/statements/create.rs b/src/sql/src/statements/create.rs index fd52af19153..b4903a666c6 100644 --- a/src/sql/src/statements/create.rs +++ b/src/sql/src/statements/create.rs @@ -20,6 +20,7 @@ use sqlparser::ast::{Expr, Query}; use sqlparser_derive::{Visit, VisitMut}; use crate::ast::{ColumnDef, Ident, ObjectName, TableConstraint, Value as SqlValue}; +use crate::statements::statement::Statement; use crate::statements::OptionMap; const LINE_SEP: &str = ",\n"; @@ -284,6 +285,35 @@ impl Display for CreateFlow { } } +/// Create SQL view statement. +#[derive(Debug, PartialEq, Eq, Clone, Visit, VisitMut)] +pub struct CreateView { + /// View name + pub name: ObjectName, + /// The clause after `As` that defines the VIEW. + /// Can only be either [Statement::Query] or [Statement::Tql]. + pub query: Box, + /// Whether to replace existing VIEW + pub or_replace: bool, + /// Create VIEW only when it doesn't exists + pub if_not_exists: bool, +} + +impl Display for CreateView { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "CREATE ")?; + if self.or_replace { + write!(f, "OR REPLACE ")?; + } + write!(f, "VIEW ")?; + if self.if_not_exists { + write!(f, "IF NOT EXISTS ")?; + } + write!(f, "{} ", &self.name)?; + write!(f, "AS {}", &self.query) + } +} + #[cfg(test)] mod tests { use std::assert_matches::assert_matches; diff --git a/src/sql/src/statements/statement.rs b/src/sql/src/statements/statement.rs index ef05db0f535..aad9575afb8 100644 --- a/src/sql/src/statements/statement.rs +++ b/src/sql/src/statements/statement.rs @@ -21,7 +21,7 @@ use sqlparser_derive::{Visit, VisitMut}; use crate::error::{ConvertToDfStatementSnafu, Error}; use crate::statements::alter::AlterTable; use crate::statements::create::{ - CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, + CreateDatabase, CreateExternalTable, CreateFlow, CreateTable, CreateTableLike, CreateView, }; use crate::statements::delete::Delete; use 
crate::statements::describe::DescribeTable; @@ -56,6 +56,8 @@ pub enum Statement { CreateFlow(CreateFlow), // DROP FLOW DropFlow(DropFlow), + // CREATE VIEW ... AS + CreateView(CreateView), // DROP TABLE DropTable(DropTable), // DROP DATABASE @@ -126,6 +128,7 @@ impl Display for Statement { Statement::ShowCollation(kind) => { write!(f, "SHOW COLLATION {kind}") } + Statement::CreateView(s) => s.fmt(f), } } } diff --git a/src/table/src/metadata.rs b/src/table/src/metadata.rs index 6e91b37fd84..600945d6071 100644 --- a/src/table/src/metadata.rs +++ b/src/table/src/metadata.rs @@ -722,7 +722,7 @@ impl From for TableIdent { } /// Struct used to serialize and deserialize [`TableMeta`]. -#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] +#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize, Default)] pub struct RawTableMeta { pub schema: RawSchema, /// The indices of columns in primary key. Note that the index of timestamp column diff --git a/tests/cases/standalone/common/view/create.result b/tests/cases/standalone/common/view/create.result new file mode 100644 index 00000000000..dbcd435a742 --- /dev/null +++ b/tests/cases/standalone/common/view/create.result @@ -0,0 +1,148 @@ +--- test CREATE VIEW --- +CREATE DATABASE for_test_view; + +Affected Rows: 1 + +USE for_test_view; + +Affected Rows: 0 + +CREATE TABLE test_table(a STRING, ts TIMESTAMP TIME INDEX); + +Affected Rows: 0 + +CREATE VIEW test_view; + +Error: 2000(InvalidSyntax), sql parser error: Expected AS, found: ; at Line: 1, Column 22 + +CREATE VIEW test_view as DELETE FROM public.numbers; + +Error: 2000(InvalidSyntax), sql parser error: Expected SELECT, VALUES, or a subquery in the query body, found: DELETE at Line: 1, Column 26 + +--- Table already exists --- +CREATE VIEW test_table as SELECT * FROM public.numbers; + +Error: 4000(TableAlreadyExists), Table already exists: `greptime.for_test_view.test_table` + +--- Table already exists even when create_if_not_exists --- +CREATE VIEW IF NOT 
EXISTS test_table as SELECT * FROM public.numbers; + +Error: 4000(TableAlreadyExists), Table already exists: `greptime.for_test_view.test_table` + +--- Table already exists even when or_replace --- +CREATE OR REPLACE VIEW test_table as SELECT * FROM public.numbers; + +Error: 4000(TableAlreadyExists), Table already exists: `greptime.for_test_view.test_table` + +CREATE VIEW test_view as SELECT * FROM public.numbers; + +Affected Rows: 0 + +--- View already exists ---- +CREATE VIEW test_view as SELECT * FROM public.numbers; + +Error: 4000(TableAlreadyExists), View already exists: `greptime.for_test_view.test_view` + +CREATE VIEW IF NOT EXISTS test_view as SELECT * FROM public.numbers; + +Affected Rows: 0 + +CREATE OR REPLACE VIEW test_view as SELECT * FROM public.numbers; + +Affected Rows: 0 + +SHOW TABLES; + ++------------+ +| Tables | ++------------+ +| test_table | +| test_view | ++------------+ + +SHOW FULL TABLES; + ++------------+------------+ +| Tables | Table_type | ++------------+------------+ +| test_table | BASE TABLE | +| test_view | VIEW | ++------------+------------+ + +-- SQLNESS REPLACE (\s\d+\s) ID +SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; + ++---------------+--------------------+---------------------------------------+-----------------+----------+-------------+ +| table_catalog | table_schema | table_name | table_type | table_id | engine | ++---------------+--------------------+---------------------------------------+-----------------+----------+-------------+ +| greptime | information_schema | build_info | LOCAL TEMPORARY |ID | | +| greptime | information_schema | character_sets | LOCAL TEMPORARY |ID | | +| greptime | information_schema | check_constraints | LOCAL TEMPORARY |ID | | +| greptime | information_schema | cluster_info | LOCAL TEMPORARY |ID | | +| greptime | information_schema | collation_character_set_applicability | LOCAL TEMPORARY |ID | | +| greptime | information_schema | collations | LOCAL TEMPORARY |ID 
| | +| greptime | information_schema | column_privileges | LOCAL TEMPORARY |ID | | +| greptime | information_schema | column_statistics | LOCAL TEMPORARY |ID | | +| greptime | information_schema | columns | LOCAL TEMPORARY |ID | | +| greptime | information_schema | engines | LOCAL TEMPORARY |ID | | +| greptime | information_schema | events | LOCAL TEMPORARY |ID | | +| greptime | information_schema | files | LOCAL TEMPORARY |ID | | +| greptime | information_schema | global_status | LOCAL TEMPORARY |ID | | +| greptime | information_schema | key_column_usage | LOCAL TEMPORARY |ID | | +| greptime | public | numbers | LOCAL TEMPORARY |ID | test_engine | +| greptime | information_schema | optimizer_trace | LOCAL TEMPORARY |ID | | +| greptime | information_schema | parameters | LOCAL TEMPORARY |ID | | +| greptime | information_schema | partitions | LOCAL TEMPORARY |ID | | +| greptime | information_schema | profiling | LOCAL TEMPORARY |ID | | +| greptime | information_schema | referential_constraints | LOCAL TEMPORARY |ID | | +| greptime | information_schema | region_peers | LOCAL TEMPORARY |ID | | +| greptime | information_schema | routines | LOCAL TEMPORARY |ID | | +| greptime | information_schema | runtime_metrics | LOCAL TEMPORARY |ID | | +| greptime | information_schema | schema_privileges | LOCAL TEMPORARY |ID | | +| greptime | information_schema | schemata | LOCAL TEMPORARY |ID | | +| greptime | information_schema | session_status | LOCAL TEMPORARY |ID | | +| greptime | information_schema | table_constraints | LOCAL TEMPORARY |ID | | +| greptime | information_schema | table_privileges | LOCAL TEMPORARY |ID | | +| greptime | information_schema | tables | LOCAL TEMPORARY |ID | | +| greptime | for_test_view | test_table | BASE TABLE |ID | mito | +| greptime | for_test_view | test_view | VIEW |ID | | +| greptime | information_schema | triggers | LOCAL TEMPORARY |ID | | 
++---------------+--------------------+---------------------------------------+-----------------+----------+-------------+ + +-- SQLNESS REPLACE (\s\d+\s) ID +SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'VIEW'; + ++---------------+---------------+------------+------------+----------+--------+ +| table_catalog | table_schema | table_name | table_type | table_id | engine | ++---------------+---------------+------------+------------+----------+--------+ +| greptime | for_test_view | test_view | VIEW |ID | | ++---------------+---------------+------------+------------+----------+--------+ + +SHOW COLUMNS FROM test_view; + +++ +++ + +SHOW FULL COLUMNS FROM test_view; + +++ +++ + +SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'test_view'; + +++ +++ + +--- FIXED in the following PR --- +SELECT * FROM test_view; + +Error: 3001(EngineExecuteQuery), DataFusion error: Unsupported operation: get stream from a distributed table + +USE public; + +Affected Rows: 0 + +DROP DATABASE for_test_view; + +Affected Rows: 0 + diff --git a/tests/cases/standalone/common/view/create.sql b/tests/cases/standalone/common/view/create.sql new file mode 100644 index 00000000000..a01741f9166 --- /dev/null +++ b/tests/cases/standalone/common/view/create.sql @@ -0,0 +1,52 @@ +--- test CREATE VIEW --- + +CREATE DATABASE for_test_view; + +USE for_test_view; + +CREATE TABLE test_table(a STRING, ts TIMESTAMP TIME INDEX); + +CREATE VIEW test_view; + +CREATE VIEW test_view as DELETE FROM public.numbers; + +--- Table already exists --- +CREATE VIEW test_table as SELECT * FROM public.numbers; + +--- Table already exists even when create_if_not_exists --- +CREATE VIEW IF NOT EXISTS test_table as SELECT * FROM public.numbers; + +--- Table already exists even when or_replace --- +CREATE OR REPLACE VIEW test_table as SELECT * FROM public.numbers; + +CREATE VIEW test_view as SELECT * FROM public.numbers; + +--- View already exists ---- +CREATE VIEW test_view as SELECT * FROM 
public.numbers; + +CREATE VIEW IF NOT EXISTS test_view as SELECT * FROM public.numbers; + +CREATE OR REPLACE VIEW test_view as SELECT * FROM public.numbers; + +SHOW TABLES; + +SHOW FULL TABLES; + +-- SQLNESS REPLACE (\s\d+\s) ID +SELECT * FROM INFORMATION_SCHEMA.TABLES ORDER BY TABLE_NAME, TABLE_TYPE; + +-- SQLNESS REPLACE (\s\d+\s) ID +SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_TYPE = 'VIEW'; + +SHOW COLUMNS FROM test_view; + +SHOW FULL COLUMNS FROM test_view; + +SELECT * FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME = 'test_view'; + +--- FIXED in the following PR --- +SELECT * FROM test_view; + +USE public; + +DROP DATABASE for_test_view;