From 83eb5a8d138fbc1e9b36966ccd6d3e8e9118fa29 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 28 Apr 2026 09:57:31 +0800 Subject: [PATCH 1/2] feat(datafusion): add TRUNCATE TABLE and DROP PARTITION SQL support Wire existing TableCommit::truncate_table() and truncate_partitions() APIs to the DataFusion SQL layer, supporting: - TRUNCATE TABLE db.t - TRUNCATE TABLE db.t PARTITION (col = val, ...) - ALTER TABLE db.t DROP PARTITION (col = val, ...) Co-Authored-By: Claude Opus 4.6 --- .../datafusion/src/sql_handler.rs | 286 +++++++++++++++++- 1 file changed, 285 insertions(+), 1 deletion(-) diff --git a/crates/integrations/datafusion/src/sql_handler.rs b/crates/integrations/datafusion/src/sql_handler.rs index ba679a1e..1ff9d272 100644 --- a/crates/integrations/datafusion/src/sql_handler.rs +++ b/crates/integrations/datafusion/src/sql_handler.rs @@ -29,6 +29,9 @@ //! - `ALTER TABLE db.t DROP COLUMN col` //! - `ALTER TABLE db.t RENAME COLUMN old TO new` //! - `ALTER TABLE db.t RENAME TO new_name` +//! - `ALTER TABLE db.t DROP PARTITION (col = val, ...)` +//! - `TRUNCATE TABLE db.t` +//! 
- `TRUNCATE TABLE db.t PARTITION (col = val, ...)` use std::collections::HashMap; use std::sync::Arc; @@ -45,7 +48,7 @@ use datafusion::prelude::{DataFrame, SessionContext}; use datafusion::sql::sqlparser::ast::{ AlterTableOperation, ColumnDef, CreateTable, CreateTableOptions, Delete, Expr as SqlExpr, FromTable, Insert, Merge, ObjectName, RenameTableNameKind, Reset, ResetStatement, Set, - SqlOption, Statement, TableFactor, TableObject, Update, Value as SqlValue, + SqlOption, Statement, TableFactor, TableObject, Truncate, Update, Value as SqlValue, }; use datafusion::sql::sqlparser::dialect::GenericDialect; use datafusion::sql::sqlparser::parser::Parser; @@ -195,6 +198,7 @@ impl PaimonSqlHandler { } self.ctx.sql(sql).await } + Statement::Truncate(truncate) => self.handle_truncate_table(truncate).await, _ => self.ctx.sql(sql).await, } } @@ -326,6 +330,16 @@ impl PaimonSqlHandler { } } } + // DropPartitions is a data operation (not a schema change), so we handle it + // separately and return early — it cannot be combined with schema changes. + // `if_exists` is intentionally ignored: the underlying overwrite is a no-op + // when the partition doesn't exist, which matches IF EXISTS semantics. 
+ AlterTableOperation::DropPartitions { + partitions, + if_exists: _, + } => { + return self.handle_drop_partitions(&identifier, partitions).await; + } other => { return Err(DataFusionError::Plan(format!( "Unsupported ALTER TABLE operation: {other}" @@ -556,6 +570,76 @@ impl PaimonSqlHandler { crate::merge_into::ok_result(&self.ctx, row_count) } + async fn handle_truncate_table(&self, truncate: &Truncate) -> DFResult { + if truncate.table_names.len() > 1 { + return Err(DataFusionError::Plan( + "TRUNCATE TABLE does not support multiple tables".to_string(), + )); + } + let target = truncate.table_names.first().ok_or_else(|| { + DataFusionError::Plan("TRUNCATE TABLE requires a table name".to_string()) + })?; + let identifier = self.resolve_table_name(&target.name)?; + let table = self + .catalog + .get_table(&identifier) + .await + .map_err(to_datafusion_error)?; + + let wb = table.new_write_builder(); + let commit = wb.new_commit(); + + if let Some(partitions) = &truncate.partitions { + if !partitions.is_empty() { + let partition_values = parse_partition_values( + partitions, + table.schema().fields(), + table.schema().partition_keys(), + )?; + commit + .truncate_partitions(partition_values) + .await + .map_err(to_datafusion_error)?; + return ok_result(&self.ctx); + } + } + + commit.truncate_table().await.map_err(to_datafusion_error)?; + ok_result(&self.ctx) + } + + async fn handle_drop_partitions( + &self, + identifier: &Identifier, + partitions: &[SqlExpr], + ) -> DFResult { + if partitions.is_empty() { + return Err(DataFusionError::Plan( + "DROP PARTITIONS requires at least one partition specification".to_string(), + )); + } + let table = self + .catalog + .get_table(identifier) + .await + .map_err(to_datafusion_error)?; + + let partition_values = parse_partition_values( + partitions, + table.schema().fields(), + table.schema().partition_keys(), + )?; + + let wb = table.new_write_builder(); + let commit = wb.new_commit(); + commit + 
.truncate_partitions(partition_values) + .await + .map_err(to_datafusion_error)?; + + ok_result(&self.ctx) + } + /// Resolve an ObjectName like `paimon.db.table` or `db.table` to a Paimon Identifier. fn resolve_table_name(&self, name: &ObjectName) -> DFResult { let parts: Vec = name @@ -973,6 +1057,60 @@ fn extract_options(opts: &CreateTableOptions) -> DFResult> .collect() } +/// Parse partition expressions (`col = val, ...`) into partition value maps +/// suitable for `TableCommit::truncate_partitions`. +/// +/// All expressions are treated as belonging to a single partition specification. +/// For multiple partitions, callers should invoke this once per partition clause. +fn parse_partition_values( + exprs: &[SqlExpr], + all_fields: &[PaimonDataField], + partition_keys: &[String], +) -> DFResult>>> { + let field_map: HashMap<&str, &PaimonDataField> = + all_fields.iter().map(|f| (f.name(), f)).collect(); + + let mut partition = HashMap::new(); + for expr in exprs { + let (col_name, val_expr) = match expr { + SqlExpr::BinaryOp { + left, + op: datafusion::sql::sqlparser::ast::BinaryOperator::Eq, + right, + } => { + let col = match left.as_ref() { + SqlExpr::Identifier(ident) => ident.value.clone(), + other => { + return Err(DataFusionError::Plan(format!( + "Expected column name in partition spec, got: {other}" + ))) + } + }; + (col, right.as_ref()) + } + other => { + return Err(DataFusionError::Plan(format!( + "Expected 'column = value' in partition spec, got: {other}" + ))) + } + }; + + if !partition_keys.iter().any(|k| k == &col_name) { + return Err(DataFusionError::Plan(format!( + "Column '{col_name}' is not a partition column" + ))); + } + + let field = field_map.get(col_name.as_str()).ok_or_else(|| { + DataFusionError::Plan(format!("Column '{col_name}' not found in table schema")) + })?; + let datum = sql_expr_to_datum(val_expr, field.data_type())?; + partition.insert(col_name, Some(datum)); + } + + Ok(vec![partition]) +} + /// Parse static partition 
assignments from `PARTITION (col = val, ...)` expressions. /// Dynamic partition columns (bare identifiers without `= val`) are skipped — /// they will be read from the source query. @@ -2307,4 +2445,150 @@ mod tests { let opts = handler.dynamic_options().read().unwrap(); assert!(opts.is_empty()); } + + // ==================== TRUNCATE TABLE / DROP PARTITIONS tests ==================== + + async fn setup_fs_handler() -> (tempfile::TempDir, PaimonSqlHandler) { + use paimon::{CatalogOptions, FileSystemCatalog, Options}; + + let temp_dir = tempfile::TempDir::new().unwrap(); + let warehouse = format!("file://{}", temp_dir.path().display()); + let mut options = Options::new(); + options.set(CatalogOptions::WAREHOUSE, warehouse); + let catalog = Arc::new(FileSystemCatalog::new(options).unwrap()); + + let handler = + PaimonSqlHandler::new(SessionContext::new(), catalog.clone(), "paimon").unwrap(); + handler.sql("CREATE SCHEMA paimon.test_db").await.unwrap(); + + (temp_dir, handler) + } + + #[tokio::test] + async fn test_truncate_table() { + let (_tmp, handler) = setup_fs_handler().await; + + handler + .sql("CREATE TABLE paimon.test_db.t1 (id INT, value INT)") + .await + .unwrap(); + handler + .sql("INSERT INTO paimon.test_db.t1 VALUES (1, 10), (2, 20)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + handler + .sql("TRUNCATE TABLE paimon.test_db.t1") + .await + .unwrap(); + + let batches = handler + .sql("SELECT * FROM paimon.test_db.t1") + .await + .unwrap() + .collect() + .await + .unwrap(); + let total: usize = batches.iter().map(|b| b.num_rows()).sum(); + assert_eq!(total, 0); + } + + #[tokio::test] + async fn test_truncate_table_partition() { + let (_tmp, handler) = setup_fs_handler().await; + + handler + .sql("CREATE TABLE paimon.test_db.t2 (pt VARCHAR, id INT) PARTITIONED BY (pt)") + .await + .unwrap(); + handler + .sql("INSERT INTO paimon.test_db.t2 VALUES ('a', 1), ('a', 2), ('b', 3), ('b', 4)") + .await + .unwrap() + .collect() + .await + .unwrap(); 
+ + handler + .sql("TRUNCATE TABLE paimon.test_db.t2 PARTITION (pt = 'a')") + .await + .unwrap(); + + let batches = handler + .sql("SELECT pt, id FROM paimon.test_db.t2 ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let pts = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ids = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((pts.value(i).to_string(), ids.value(i))); + } + } + assert_eq!(rows, vec![("b".to_string(), 3), ("b".to_string(), 4)]); + } + + #[tokio::test] + async fn test_alter_table_drop_partitions() { + let (_tmp, handler) = setup_fs_handler().await; + + handler + .sql("CREATE TABLE paimon.test_db.t3 (pt VARCHAR, id INT) PARTITIONED BY (pt)") + .await + .unwrap(); + handler + .sql("INSERT INTO paimon.test_db.t3 VALUES ('a', 1), ('a', 2), ('b', 3), ('b', 4)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + handler + .sql("ALTER TABLE paimon.test_db.t3 DROP PARTITION (pt = 'b')") + .await + .unwrap(); + + let batches = handler + .sql("SELECT pt, id FROM paimon.test_db.t3 ORDER BY id") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let mut rows = Vec::new(); + for batch in &batches { + let pts = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let ids = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + for i in 0..batch.num_rows() { + rows.push((pts.value(i).to_string(), ids.value(i))); + } + } + assert_eq!(rows, vec![("a".to_string(), 1), ("a".to_string(), 2)]); + } } From 20a7f30393083f4b81545168187ff6bfbd70164c Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Tue, 28 Apr 2026 15:49:39 +0800 Subject: [PATCH 2/2] Fix comments: honor IF EXISTS for TRUNCATE TABLE / DROP PARTITION and reject incomplete partition specs --- .../datafusion/src/sql_handler.rs | 166 +++++++++++++++--- 1 file changed, 139 insertions(+), 27 deletions(-) diff --git a/crates/integrations/datafusion/src/sql_handler.rs 
b/crates/integrations/datafusion/src/sql_handler.rs index 1ff9d272..8993c134 100644 --- a/crates/integrations/datafusion/src/sql_handler.rs +++ b/crates/integrations/datafusion/src/sql_handler.rs @@ -330,15 +330,17 @@ impl PaimonSqlHandler { } } } - // DropPartitions is a data operation (not a schema change), so we handle it - // separately and return early — it cannot be combined with schema changes. - // `if_exists` is intentionally ignored: the underlying overwrite is a no-op - // when the partition doesn't exist, which matches IF EXISTS semantics. AlterTableOperation::DropPartitions { partitions, - if_exists: _, + if_exists: partition_if_exists, } => { - return self.handle_drop_partitions(&identifier, partitions).await; + return self + .handle_drop_partitions( + &identifier, + partitions, + if_exists || *partition_if_exists, + ) + .await; } other => { return Err(DataFusionError::Plan(format!( @@ -580,28 +582,33 @@ impl PaimonSqlHandler { DataFusionError::Plan("TRUNCATE TABLE requires a table name".to_string()) })?; let identifier = self.resolve_table_name(&target.name)?; - let table = self - .catalog - .get_table(&identifier) - .await - .map_err(to_datafusion_error)?; + let table = match self.catalog.get_table(&identifier).await { + Ok(t) => t, + Err(e) if truncate.if_exists && is_table_not_exist(&e) => { + return ok_result(&self.ctx); + } + Err(e) => return Err(to_datafusion_error(e)), + }; let wb = table.new_write_builder(); let commit = wb.new_commit(); if let Some(partitions) = &truncate.partitions { - if !partitions.is_empty() { - let partition_values = parse_partition_values( - partitions, - table.schema().fields(), - table.schema().partition_keys(), - )?; - commit - .truncate_partitions(partition_values) - .await - .map_err(to_datafusion_error)?; - return ok_result(&self.ctx); + if partitions.is_empty() { + return Err(DataFusionError::Plan( + "PARTITION clause requires at least one column = value".to_string(), + )); } + let partition_values = 
parse_partition_values( + partitions, + table.schema().fields(), + table.schema().partition_keys(), + )?; + commit + .truncate_partitions(partition_values) + .await + .map_err(to_datafusion_error)?; + return ok_result(&self.ctx); } commit.truncate_table().await.map_err(to_datafusion_error)?; @@ -612,17 +619,20 @@ impl PaimonSqlHandler { &self, identifier: &Identifier, partitions: &[SqlExpr], + if_exists: bool, ) -> DFResult { if partitions.is_empty() { return Err(DataFusionError::Plan( "DROP PARTITIONS requires at least one partition specification".to_string(), )); } - let table = self - .catalog - .get_table(identifier) - .await - .map_err(to_datafusion_error)?; + let table = match self.catalog.get_table(identifier).await { + Ok(t) => t, + Err(e) if if_exists && is_table_not_exist(&e) => { + return ok_result(&self.ctx); + } + Err(e) => return Err(to_datafusion_error(e)), + }; let partition_values = parse_partition_values( partitions, @@ -1057,6 +1067,10 @@ fn extract_options(opts: &CreateTableOptions) -> DFResult> .collect() } +fn is_table_not_exist(e: &paimon::Error) -> bool { + matches!(e, paimon::Error::TableNotExist { .. }) +} + /// Parse partition expressions (`col = val, ...`) into partition value maps /// suitable for `TableCommit::truncate_partitions`. /// @@ -1108,6 +1122,18 @@ fn parse_partition_values( partition.insert(col_name, Some(datum)); } + let missing: Vec<&str> = partition_keys + .iter() + .filter(|k| !partition.contains_key(k.as_str())) + .map(|k| k.as_str()) + .collect(); + if !missing.is_empty() { + return Err(DataFusionError::Plan(format!( + "Incomplete partition spec: missing keys [{}]. 
All partition columns must be specified.", + missing.join(", ") + ))); + } + Ok(vec![partition]) } @@ -2591,4 +2617,90 @@ mod tests { } assert_eq!(rows, vec![("a".to_string(), 1), ("a".to_string(), 2)]); } + + #[tokio::test] + async fn test_truncate_table_incomplete_partition_spec() { + let (_tmp, handler) = setup_fs_handler().await; + + handler + .sql("CREATE TABLE paimon.test_db.t_multi (pt1 VARCHAR, pt2 VARCHAR, id INT) PARTITIONED BY (pt1, pt2)") + .await + .unwrap(); + handler + .sql("INSERT INTO paimon.test_db.t_multi VALUES ('a', 'x', 1)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let err = handler + .sql("TRUNCATE TABLE paimon.test_db.t_multi PARTITION (pt1 = 'a')") + .await + .unwrap_err(); + assert!( + err.to_string().contains("Incomplete partition spec"), + "Expected incomplete partition spec error, got: {err}" + ); + } + + #[tokio::test] + async fn test_truncate_table_if_exists_nonexistent() { + let (_tmp, handler) = setup_fs_handler().await; + + handler + .sql("TRUNCATE TABLE IF EXISTS paimon.test_db.nonexistent") + .await + .unwrap(); + } + + #[tokio::test] + async fn test_truncate_table_nonexistent_without_if_exists() { + let (_tmp, handler) = setup_fs_handler().await; + + let err = handler + .sql("TRUNCATE TABLE paimon.test_db.nonexistent") + .await + .unwrap_err(); + assert!( + err.to_string().contains("does not exist"), + "Expected table-not-exist error, got: {err}" + ); + } + + #[tokio::test] + async fn test_alter_table_if_exists_drop_partition_nonexistent() { + let (_tmp, handler) = setup_fs_handler().await; + + handler + .sql("ALTER TABLE IF EXISTS paimon.test_db.nonexistent DROP PARTITION (pt = 'a')") + .await + .unwrap(); + } + + #[tokio::test] + async fn test_drop_partition_incomplete_spec() { + let (_tmp, handler) = setup_fs_handler().await; + + handler + .sql("CREATE TABLE paimon.test_db.t_dp (pt1 VARCHAR, pt2 VARCHAR, id INT) PARTITIONED BY (pt1, pt2)") + .await + .unwrap(); + handler + .sql("INSERT INTO 
paimon.test_db.t_dp VALUES ('a', 'x', 1)") + .await + .unwrap() + .collect() + .await + .unwrap(); + + let err = handler + .sql("ALTER TABLE paimon.test_db.t_dp DROP PARTITION (pt1 = 'a')") + .await + .unwrap_err(); + assert!( + err.to_string().contains("Incomplete partition spec"), + "Expected incomplete partition spec error, got: {err}" + ); + } }