Skip to content

Commit

Permalink
feat(query): insert/mutation/update deduplicate based on label (datab…
Browse files Browse the repository at this point in the history
…endlabs#11610)

* feat: use label

* feat: use label

* fix: error

* fix: error

* feat: add update

* feat: add update

* feat: check label before whole pipeline

* feat: check label before whole pipeline

* fix: error

* fix: error

* fix: ci

* feat:lock

* fix:clippy

* fix: fmt

* fix: cargo

* fix: commit txn

* fix: error

* test: add

* test: add

* ci: add x

* fix: ci

* test: add copy test

* fix: adjust time

* fix: test

* fix: test

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
jun0315 and mergify[bot] authored Jun 1, 2023
1 parent 87a7969 commit b3d5654
Show file tree
Hide file tree
Showing 22 changed files with 154 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,11 @@ use common_meta_types::MatchSeqExt;
use common_meta_types::MetaError;
use common_meta_types::MetaId;
use common_meta_types::MetaNetworkError;
use common_meta_types::SeqV;
use common_meta_types::TxnCondition;
use common_meta_types::TxnGetRequest;
use common_meta_types::TxnOp;
use common_meta_types::TxnPutRequest;
use common_meta_types::TxnRequest;
use common_tracing::func_name;
use tracing::debug;
Expand Down Expand Up @@ -2585,6 +2587,12 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
txn_req.if_then.extend(match_operations)
}

if let Some(deduplicated_label) = req.deduplicated_label.clone() {
txn_req
.if_then
.push(build_upsert_table_deduplicated_label(deduplicated_label))
}

let (succ, responses) = send_txn(self, txn_req).await?;

debug!(id = debug(&tbid), succ = display(succ), "update_table_meta");
Expand Down Expand Up @@ -3436,6 +3444,18 @@ fn build_upsert_table_copied_file_info_conditions(
Ok((condition, if_then))
}

fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp {
let expire_at = Some(SeqV::<()>::now_ms() / 1000 + 24 * 60 * 60);
TxnOp {
request: Some(Request::Put(TxnPutRequest {
key: deduplicated_label,
value: 1_i8.to_le_bytes().to_vec(),
prev_value: false,
expire_at,
})),
}
}

fn set_update_expire_operation(
key: &TableCopiedFileNameIdent,
file_info: &TableCopiedFileInfo,
Expand Down
12 changes: 12 additions & 0 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1923,6 +1923,7 @@ impl SchemaApiTestSuite {
seq: MatchSeq::Exact(table_version),
new_table_meta: new_table_meta.clone(),
copied_files: None,
deduplicated_label: None,
})
.await?;

Expand All @@ -1943,6 +1944,7 @@ impl SchemaApiTestSuite {
seq: MatchSeq::Exact(table_version + 1),
new_table_meta: new_table_meta.clone(),
copied_files: None,
deduplicated_label: None,
})
.await;

Expand Down Expand Up @@ -1983,6 +1985,7 @@ impl SchemaApiTestSuite {
seq: MatchSeq::Exact(table_version),
new_table_meta: new_table_meta.clone(),
copied_files: Some(upsert_source_table),
deduplicated_label: None,
})
.await?;

Expand Down Expand Up @@ -2021,6 +2024,7 @@ impl SchemaApiTestSuite {
seq: MatchSeq::Exact(table_version),
new_table_meta: new_table_meta.clone(),
copied_files: Some(upsert_source_table),
deduplicated_label: None,
})
.await?;

Expand Down Expand Up @@ -2060,6 +2064,7 @@ impl SchemaApiTestSuite {
seq: MatchSeq::Exact(table_version),
new_table_meta: new_table_meta.clone(),
copied_files: Some(upsert_source_table),
deduplicated_label: None,
})
.await;
let err = result.unwrap_err();
Expand Down Expand Up @@ -2538,6 +2543,7 @@ impl SchemaApiTestSuite {
seq: MatchSeq::Any,
new_table_meta: table_meta.clone(),
copied_files: Some(req),
deduplicated_label: None,
};

let _ = mt.update_table_meta(req).await?;
Expand Down Expand Up @@ -3265,6 +3271,7 @@ impl SchemaApiTestSuite {
seq: MatchSeq::Any,
new_table_meta: table_meta(created_on),
copied_files: Some(req),
deduplicated_label: None,
};

let _ = mt.update_table_meta(req).await?;
Expand Down Expand Up @@ -3301,6 +3308,7 @@ impl SchemaApiTestSuite {
seq: MatchSeq::Any,
new_table_meta: table_meta(created_on),
copied_files: Some(req),
deduplicated_label: None,
};

let _ = mt.update_table_meta(req).await?;
Expand Down Expand Up @@ -4606,6 +4614,7 @@ impl SchemaApiTestSuite {
seq: MatchSeq::Any,
new_table_meta: table_meta(created_on),
copied_files: Some(req),
deduplicated_label: None,
};

let _ = mt.update_table_meta(req).await?;
Expand Down Expand Up @@ -4651,6 +4660,7 @@ impl SchemaApiTestSuite {
seq: MatchSeq::Any,
new_table_meta: table_meta(created_on),
copied_files: Some(req),
deduplicated_label: None,
};

let result = mt.update_table_meta(req).await;
Expand Down Expand Up @@ -4693,6 +4703,7 @@ impl SchemaApiTestSuite {
seq: MatchSeq::Any,
new_table_meta: table_meta(created_on),
copied_files: Some(req),
deduplicated_label: None,
};

mt.update_table_meta(req).await?;
Expand Down Expand Up @@ -4845,6 +4856,7 @@ where MT: SchemaApi
seq: MatchSeq::Any,
new_table_meta: self.table_meta(),
copied_files: Some(req),
deduplicated_label: None,
};

self.mt.update_table_meta(req).await?;
Expand Down
1 change: 1 addition & 0 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ pub struct UpdateTableMetaReq {
pub seq: MatchSeq,
pub new_table_meta: TableMeta,
pub copied_files: Option<UpsertTableCopiedFileReq>,
pub deduplicated_label: Option<String>,
}

impl UpsertTableOptionReq {
Expand Down
1 change: 1 addition & 0 deletions src/query/service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ jsonb = { workspace = true }
# common-meta-embedded = { path = "../../meta/embedded" }
aggregating-index = { path = "../ee-features/aggregating-index" }
common-license = { path = "../../common/license" }
common-meta-kvapi = { path = "../../meta/kvapi" }
common-meta-store = { path = "../../meta/store" }
common-meta-types = { path = "../../meta/types" }
common-metrics = { path = "../../common/metrics" }
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
mod grant;
mod stage;
mod table;
mod util;
pub use grant::validate_grant_object_exists;
pub use stage::try_purge_files;
pub use table::append2table;
pub use util::check_deduplicate_label;
43 changes: 43 additions & 0 deletions src/query/service/src/interpreters/common/util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_meta_kvapi::kvapi::KVApi;
use common_users::UserApiProvider;

/// Checks if a duplicate label exists in the meta store.
///
/// # Arguments
///
/// * `ctx` - The table context. Must implement the `TableContext` trait and be wrapped in an `Arc`.
///
/// # Returns
///
/// Returns a `Result` containing a `bool` indicating whether specific duplicate label exists (`true`) or not (`false`).
pub async fn check_deduplicate_label(ctx: Arc<dyn TableContext>) -> Result<bool> {
match ctx.get_settings().get_deduplicate_label()? {
None => Ok(false),
Some(deduplicate_label) => {
let kv_store = UserApiProvider::instance().get_meta_store_client();
let raw = kv_store.get_kv(&deduplicate_label).await?;
match raw {
None => Ok(false),
Some(_) => Ok(true),
}
}
}
}
5 changes: 5 additions & 0 deletions src/query/service/src/interpreters/interpreter_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use tracing::error;
use tracing::info;

use crate::interpreters::common::append2table;
use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::Interpreter;
use crate::interpreters::SelectInterpreter;
use crate::pipelines::processors::transforms::TransformAddConstColumns;
Expand Down Expand Up @@ -501,6 +502,10 @@ impl Interpreter for CopyInterpreter {
#[tracing::instrument(level = "debug", name = "copy_interpreter_execute_v2", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
if check_deduplicate_label(self.ctx.clone()).await? {
return Ok(PipelineBuildResult::create());
}

match &self.plan {
CopyPlan::IntoTable(plan) => self.build_copy_into_table_pipeline(plan).await,

Expand Down
4 changes: 4 additions & 0 deletions src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use parking_lot::Mutex;
use parking_lot::RwLock;

use crate::interpreters::common::append2table;
use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::pipelines::processors::transforms::TransformRuntimeCastSchema;
Expand Down Expand Up @@ -101,6 +102,9 @@ impl Interpreter for InsertInterpreter {

#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
if check_deduplicate_label(self.ctx.clone()).await? {
return Ok(PipelineBuildResult::create());
}
let plan = &self.plan;
let table = self
.ctx
Expand Down
5 changes: 5 additions & 0 deletions src/query/service/src/interpreters/interpreter_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use common_sql::plans::Plan;
use common_sql::plans::Replace;
use common_sql::NameResolutionContext;

use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::interpreter_copy::CopyInterpreter;
use crate::interpreters::interpreter_insert::ValueSource;
use crate::interpreters::Interpreter;
Expand Down Expand Up @@ -55,6 +56,10 @@ impl Interpreter for ReplaceInterpreter {

#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
if check_deduplicate_label(self.ctx.clone()).await? {
return Ok(PipelineBuildResult::create());
}

self.check_on_conflicts()?;

let plan = &self.plan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ impl Interpreter for AddTableColumnInterpreter {
seq: MatchSeq::Exact(table_version),
new_table_meta,
copied_files: None,
deduplicated_label: None,
};

let res = catalog.update_table_meta(table_info, req).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl Interpreter for DropTableColumnInterpreter {
seq: MatchSeq::Exact(table_version),
new_table_meta,
copied_files: None,
deduplicated_label: None,
};

let res = catalog.update_table_meta(table_info, req).await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl Interpreter for ModifyTableColumnInterpreter {
seq: MatchSeq::Exact(table_version),
new_table_meta,
copied_files: None,
deduplicated_label: None,
};

let res = catalog.update_table_meta(table_info, req).await?;
Expand Down
5 changes: 5 additions & 0 deletions src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use common_exception::Result;
use common_sql::executor::cast_expr_to_non_null_boolean;
use table_lock::TableLockHandlerWrapper;

use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;
Expand Down Expand Up @@ -48,6 +49,10 @@ impl Interpreter for UpdateInterpreter {
#[tracing::instrument(level = "debug", name = "update_interpreter_execute", skip(self), fields(ctx.id = self.ctx.get_id().as_str()))]
#[async_backtrace::framed]
async fn execute2(&self) -> Result<PipelineBuildResult> {
if check_deduplicate_label(self.ctx.clone()).await? {
return Ok(PipelineBuildResult::create());
}

let catalog_name = self.plan.catalog.as_str();
let db_name = self.plan.database.as_str();
let tbl_name = self.plan.table.as_str();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ impl TableContext for CtxDelegation {
}

fn get_settings(&self) -> Arc<Settings> {
todo!()
Settings::create("fake_settings".to_string())
}

fn get_shard_settings(&self) -> Arc<Settings> {
Expand Down
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,12 @@ impl DefaultSettings {
possible_values: None,
display_in_show_settings: true,
}),
("deduplicate_label", DefaultSettingValue {
value: UserSettingValue::String("".to_owned()),
desc: "Sql duplicate label for deduplication.",
possible_values: None,
display_in_show_settings: false,
}),
]);

Ok(Arc::new(DefaultSettings {
Expand Down
9 changes: 9 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,4 +357,13 @@ impl Settings {
pub fn set_enterprise_license(&self, val: String) -> Result<()> {
self.set_setting("enterprise_license".to_string(), val)
}

pub fn get_deduplicate_label(&self) -> Result<Option<String>> {
let deduplicate_label = self.try_get_string("deduplicate_label")?;
if deduplicate_label.is_empty() {
Ok(None)
} else {
Ok(Some(deduplicate_label))
}
}
}
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ impl FuseTable {
seq: MatchSeq::Exact(table_version),
new_table_meta,
copied_files: copied_files.clone(),
deduplicated_label: ctx.get_settings().get_deduplicate_label()?,
};

// 3. let's roll
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/operations/revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ impl FuseTable {
seq: MatchSeq::Exact(base_version),
new_table_meta: table_meta_to_be_committed,
copied_files: None,
deduplicated_label: None,
};

// 4. let's roll
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/operations/truncate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl FuseTable {
seq: MatchSeq::Exact(table_version),
new_table_meta,
copied_files: None,
deduplicated_label: None,
})
.await?;

Expand Down
Loading

0 comments on commit b3d5654

Please sign in to comment.