From db52d7243784fe7499ba0bc14f9242e9b30db2d7 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Tue, 18 Nov 2025 17:02:44 -0800 Subject: [PATCH 1/4] Add slt for INSERT INTO --- .../integrations/datafusion/src/table/mod.rs | 19 +++ crates/sqllogictest/src/engine/datafusion.rs | 72 ++++++++++- .../testdata/schedules/df_test.toml | 6 +- .../testdata/slts/df_test/insert_into.slt | 119 ++++++++++++++++++ .../testdata/slts/df_test/show_tables.slt | 6 + 5 files changed, 220 insertions(+), 2 deletions(-) create mode 100644 crates/sqllogictest/testdata/slts/df_test/insert_into.slt diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 8527668d6c..08d232f678 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -104,6 +104,25 @@ impl IcebergTableProvider { let table = self.catalog.load_table(&self.table_ident).await?; Ok(IcebergMetadataTableProvider { table, r#type }) } + + /// Reload the table from the catalog if needed to get fresh metadata. + /// This is useful after INSERT operations that modify the table. + pub(crate) async fn reload_if_needed(&self) -> Result { + if let Some(catalog) = &self.catalog { + // Reload the table from the catalog to get the latest metadata + let table = catalog.load_table(self.table.identifier()).await?; + let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); + Ok(IcebergTableProvider { + table, + snapshot_id: self.snapshot_id, + schema, + catalog: self.catalog.clone(), + }) + } else { + // If no catalog is available, return a clone of self + Ok(self.clone()) + } + } } #[async_trait] diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index b3e37d9206..c89c705a0f 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -22,8 +22,9 @@ use std::sync::Arc; use datafusion::catalog::CatalogProvider; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_sqllogictest::DataFusion; -use iceberg::CatalogBuilder; +use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation}; use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; +use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec}; use iceberg_datafusion::IcebergCatalogProvider; use indicatif::ProgressBar; use toml::Table as TomlTable; @@ -84,8 +85,77 @@ impl DataFusionEngine { ) .await?; + // Create a test namespace for INSERT INTO tests + let namespace = NamespaceIdent::new("default".to_string()); + catalog + .create_namespace(&namespace, HashMap::new()) + .await?; + + // Create test tables + Self::create_unpartitioned_table(&catalog, &namespace).await?; + Self::create_partitioned_table(&catalog, &namespace).await?; + Ok(Arc::new( IcebergCatalogProvider::try_new(Arc::new(catalog)).await?, )) } + + /// Create an unpartitioned test table with id and name columns + async fn create_unpartitioned_table( + catalog: &impl Catalog, + namespace: &NamespaceIdent, + ) -> anyhow::Result<()> { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + catalog + .create_table( + namespace, + TableCreation::builder() + .name("test_table".to_string()) + .schema(schema) + .build(), + ) + .await?; + + Ok(()) + } + + /// Create a partitioned test table with id, category, and value columns + /// Partitioned by category using identity transform + async fn create_partitioned_table( + catalog: &impl Catalog, + namespace: &NamespaceIdent, + ) -> anyhow::Result<()> { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::optional(3, "value", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build()?; + + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(2, "category", Transform::Identity)? + .build(); + + catalog + .create_table( + namespace, + TableCreation::builder() + .name("test_partitioned_table".to_string()) + .schema(schema) + .partition_spec(partition_spec) + .build(), + ) + .await?; + + Ok(()) + } } diff --git a/crates/sqllogictest/testdata/schedules/df_test.toml b/crates/sqllogictest/testdata/schedules/df_test.toml index 0733744951..df5e638d5a 100644 --- a/crates/sqllogictest/testdata/schedules/df_test.toml +++ b/crates/sqllogictest/testdata/schedules/df_test.toml @@ -20,4 +20,8 @@ df = { type = "datafusion" } [[steps]] engine = "df" -slt = "df_test/show_tables.slt" \ No newline at end of file +slt = "df_test/show_tables.slt" + +[[steps]] +engine = "df" +slt = "df_test/insert_into.slt" diff --git a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt new file mode 100644 index 0000000000..fe41467862 --- /dev/null +++ b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt @@ -0,0 +1,119 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Verify the table is initially empty +query IT rowsort +SELECT * FROM default.default.test_table +---- + +# Insert a single row and verify the count +query I +INSERT INTO default.default.test_table VALUES (1, 'Alice') +---- +1 + +# Verify the inserted row +query IT rowsort +SELECT * FROM default.default.test_table +---- +1 Alice + +# Insert multiple rows and verify the count +query I +INSERT INTO default.default.test_table VALUES (2, 'Bob'), (3, 'Charlie') +---- +2 + +# Verify all rows +query IT rowsort +SELECT * FROM default.default.test_table +---- +1 Alice +2 Bob +3 Charlie + +# Insert with NULL value and verify the count +query I +INSERT INTO default.default.test_table VALUES (4, NULL) +---- +1 + +# Verify NULL handling +query IT rowsort +SELECT * FROM default.default.test_table +---- +1 Alice +2 Bob +3 Charlie +4 NULL + +# Test partitioned table - verify initially empty +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table +---- + +# Insert single row into partitioned table +query I +INSERT INTO default.default.test_partitioned_table VALUES (1, 'electronics', 'laptop') +---- +1 + +# Verify the inserted row in partitioned table +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table +---- +1 electronics laptop + +# Insert multiple rows with different partition values +query I +INSERT INTO default.default.test_partitioned_table VALUES (2, 'electronics', 'phone'), (3, 'books', 'novel'), (4, 'books', 'textbook'), (5, 'clothing', 'shirt') +---- +4 + +# Verify all rows in partitioned table +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table +---- +1 electronics laptop +2 electronics phone +3 books novel +4 books textbook +5 clothing shirt + +# Insert with NULL value in optional column +query I +INSERT INTO default.default.test_partitioned_table VALUES (6, 'electronics', NULL) +---- +1 + +# Verify NULL handling in partitioned table +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table +---- +1 electronics laptop +2 electronics phone +3 books novel +4 books textbook +5 clothing shirt +6 electronics NULL + +# Verify partition filtering works +query ITT rowsort +SELECT * FROM default.default.test_partitioned_table WHERE category = 'books' +---- +3 books novel +4 books textbook diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt index 34709d7359..d31c441652 100644 --- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt +++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt @@ -25,6 +25,12 @@ datafusion information_schema routines VIEW datafusion information_schema schemata VIEW datafusion information_schema tables VIEW datafusion information_schema views VIEW +default default test_partitioned_table BASE TABLE +default default test_partitioned_table$manifests BASE TABLE +default default test_partitioned_table$snapshots BASE TABLE +default default test_table BASE TABLE +default default test_table$manifests BASE TABLE +default default test_table$snapshots BASE TABLE default information_schema columns VIEW default information_schema df_settings VIEW default information_schema parameters VIEW From 9a354122c62727675a8d6c73bd50f8201b10449b Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 27 Nov 2025 17:26:23 -0800 Subject: [PATCH 2/4] Add datafusion insert into slt --- .../integrations/datafusion/src/table/mod.rs | 19 ------------------- 1 file changed, 19 deletions(-) diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 08d232f678..8527668d6c 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -104,25 +104,6 @@ impl IcebergTableProvider { let table = self.catalog.load_table(&self.table_ident).await?; Ok(IcebergMetadataTableProvider { table, r#type }) } - - /// Reload the table from the catalog if needed to get fresh metadata. - /// This is useful after INSERT operations that modify the table. - pub(crate) async fn reload_if_needed(&self) -> Result { - if let Some(catalog) = &self.catalog { - // Reload the table from the catalog to get the latest metadata - let table = catalog.load_table(self.table.identifier()).await?; - let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); - Ok(IcebergTableProvider { - table, - snapshot_id: self.snapshot_id, - schema, - catalog: self.catalog.clone(), - }) - } else { - // If no catalog is available, return a clone of self - Ok(self.clone()) - } - } } #[async_trait] From 225b3eef1ad7e2280e3e8c6b0b69ff1080e04b78 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 27 Nov 2025 17:29:38 -0800 Subject: [PATCH 3/4] better naming --- crates/sqllogictest/src/engine/datafusion.rs | 2 +- .../testdata/slts/df_test/insert_into.slt | 14 +++++++------- .../testdata/slts/df_test/show_tables.slt | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index c89c705a0f..8bd867b301 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -116,7 +116,7 @@ impl DataFusionEngine { .create_table( namespace, TableCreation::builder() - .name("test_table".to_string()) + .name("test_unpartitioned_table".to_string()) .schema(schema) .build(), ) diff --git a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt index fe41467862..2ba33afcd1 100644 --- a/crates/sqllogictest/testdata/slts/df_test/insert_into.slt +++ b/crates/sqllogictest/testdata/slts/df_test/insert_into.slt @@ -17,30 +17,30 @@ # Verify the table is initially empty query IT rowsort -SELECT * FROM default.default.test_table +SELECT * FROM default.default.test_unpartitioned_table ---- # Insert a single row and verify the count query I -INSERT INTO default.default.test_table VALUES (1, 'Alice') +INSERT INTO default.default.test_unpartitioned_table VALUES (1, 'Alice') ---- 1 # Verify the inserted row query IT rowsort -SELECT * FROM default.default.test_table +SELECT * FROM default.default.test_unpartitioned_table ---- 1 Alice # Insert multiple rows and verify the count query I -INSERT INTO default.default.test_table VALUES (2, 'Bob'), (3, 'Charlie') +INSERT INTO default.default.test_unpartitioned_table VALUES (2, 'Bob'), (3, 'Charlie') ---- 2 # Verify all rows query IT rowsort -SELECT * FROM default.default.test_table +SELECT * FROM default.default.test_unpartitioned_table ---- 1 Alice 2 Bob @@ -48,13 +48,13 @@ SELECT * FROM default.default.test_table # Insert with NULL value and verify the count query I -INSERT INTO default.default.test_table VALUES (4, NULL) +INSERT INTO default.default.test_unpartitioned_table VALUES (4, NULL) ---- 1 # Verify NULL handling query IT rowsort -SELECT * FROM default.default.test_table +SELECT * FROM default.default.test_unpartitioned_table ---- 1 Alice 2 Bob diff --git a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt index d31c441652..c5da5f6276 100644 --- a/crates/sqllogictest/testdata/slts/df_test/show_tables.slt +++ b/crates/sqllogictest/testdata/slts/df_test/show_tables.slt @@ -28,9 +28,9 @@ datafusion information_schema views VIEW default default test_partitioned_table BASE TABLE default default test_partitioned_table$manifests BASE TABLE default default test_partitioned_table$snapshots BASE TABLE -default default test_table BASE TABLE -default default test_table$manifests BASE TABLE -default default test_table$snapshots BASE TABLE +default default test_unpartitioned_table BASE TABLE +default default test_unpartitioned_table$manifests BASE TABLE +default default test_unpartitioned_table$snapshots BASE TABLE default information_schema columns VIEW default information_schema df_settings VIEW default information_schema parameters VIEW From ecf04b05f1d60eb495ad8dd98ec2ecc2f0cf24a2 Mon Sep 17 00:00:00 2001 From: Shawn Chang Date: Thu, 27 Nov 2025 17:32:40 -0800 Subject: [PATCH 4/4] fmt fix --- crates/sqllogictest/src/engine/datafusion.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs index 8bd867b301..f084c10e33 100644 --- a/crates/sqllogictest/src/engine/datafusion.rs +++ b/crates/sqllogictest/src/engine/datafusion.rs @@ -22,9 +22,9 @@ use std::sync::Arc; use datafusion::catalog::CatalogProvider; use datafusion::prelude::{SessionConfig, SessionContext}; use datafusion_sqllogictest::DataFusion; -use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation}; use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder}; use iceberg::spec::{NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionSpec}; +use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation}; use iceberg_datafusion::IcebergCatalogProvider; use indicatif::ProgressBar; use toml::Table as TomlTable; @@ -87,9 +87,7 @@ impl DataFusionEngine { // Create a test namespace for INSERT INTO tests let namespace = NamespaceIdent::new("default".to_string()); - catalog - .create_namespace(&namespace, HashMap::new()) - .await?; + catalog.create_namespace(&namespace, HashMap::new()).await?; // Create test tables Self::create_unpartitioned_table(&catalog, &namespace).await?; @@ -134,8 +132,7 @@ impl DataFusionEngine { let schema = Schema::builder() .with_fields(vec![ NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), - NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)) - .into(), + NestedField::required(2, "category", Type::Primitive(PrimitiveType::String)).into(), NestedField::optional(3, "value", Type::Primitive(PrimitiveType::String)).into(), ]) .build()?;