diff --git a/bindings/c/src/table.rs b/bindings/c/src/table.rs index 78927579..c3f57fe4 100644 --- a/bindings/c/src/table.rs +++ b/bindings/c/src/table.rs @@ -48,6 +48,18 @@ unsafe fn free_table_wrapper(ptr: *mut T, get_inner: impl FnOnce(&T) -> *mut } } +// Helper to box a ReadBuilderState and return a raw pointer. +unsafe fn box_read_builder_state(state: ReadBuilderState) -> *mut paimon_read_builder { + let inner = Box::into_raw(Box::new(state)) as *mut c_void; + Box::into_raw(Box::new(paimon_read_builder { inner })) +} + +// Helper to box a TableReadState and return a raw pointer. +unsafe fn box_table_read_state(state: TableReadState) -> *mut paimon_table_read { + let inner = Box::into_raw(Box::new(state)) as *mut c_void; + Box::into_raw(Box::new(paimon_table_read { inner })) +} + // ======================= Table =============================== /// Free a paimon_table. @@ -74,8 +86,12 @@ pub unsafe extern "C" fn paimon_table_new_read_builder( }; } let table_ref = &*((*table).inner as *const Table); + let state = ReadBuilderState { + table: table_ref.clone(), + projected_columns: None, + }; paimon_result_read_builder { - read_builder: box_table_wrapper(table_ref, |inner| paimon_read_builder { inner }), + read_builder: box_read_builder_state(state), error: std::ptr::null_mut(), } } @@ -88,7 +104,57 @@ pub unsafe extern "C" fn paimon_table_new_read_builder( /// Only call with a read_builder returned from `paimon_table_new_read_builder`. #[no_mangle] pub unsafe extern "C" fn paimon_read_builder_free(rb: *mut paimon_read_builder) { - free_table_wrapper(rb, |r| r.inner); + if !rb.is_null() { + let wrapper = Box::from_raw(rb); + if !wrapper.inner.is_null() { + drop(Box::from_raw(wrapper.inner as *mut ReadBuilderState)); + } + } +} + +/// Set column projection for a ReadBuilder. +/// +/// The `columns` parameter is a null-terminated array of null-terminated C strings. +/// Output order follows the caller-specified order. 
Unknown or duplicate names +/// cause `paimon_read_builder_new_read()` to fail; an empty list is a valid +/// zero-column projection. +/// +/// # Safety +/// `rb` must be a valid pointer from `paimon_table_new_read_builder`, or null (returns error). +/// `columns` must be a null-terminated array of null-terminated C strings, or null for no projection. +#[no_mangle] +pub unsafe extern "C" fn paimon_read_builder_with_projection( + rb: *mut paimon_read_builder, + columns: *const *const std::ffi::c_char, +) -> *mut paimon_error { + if let Err(e) = check_non_null(rb, "rb") { + return e; + } + + let state = &mut *((*rb).inner as *mut ReadBuilderState); + + if columns.is_null() { + state.projected_columns = None; + return std::ptr::null_mut(); + } + + let mut col_names = Vec::new(); + let mut ptr = columns; + while !(*ptr).is_null() { + let c_str = std::ffi::CStr::from_ptr(*ptr); + match c_str.to_str() { + Ok(s) => col_names.push(s.to_string()), + Err(e) => { + return paimon_error::from_paimon(paimon::Error::ConfigInvalid { + message: format!("Invalid UTF-8 in column name: {e}"), + }); + } + } + ptr = ptr.add(1); + } + + state.projected_columns = Some(col_names); + std::ptr::null_mut() } /// Create a new TableScan from a ReadBuilder. 
@@ -105,9 +171,9 @@ pub unsafe extern "C" fn paimon_read_builder_new_scan( error: e, }; } - let table = &*((*rb).inner as *const Table); + let state = &*((*rb).inner as *const ReadBuilderState); paimon_result_table_scan { - scan: box_table_wrapper(table, |inner| paimon_table_scan { inner }), + scan: box_table_wrapper(&state.table, |inner| paimon_table_scan { inner }), error: std::ptr::null_mut(), } } @@ -126,13 +192,23 @@ pub unsafe extern "C" fn paimon_read_builder_new_read( error: e, }; } - let table = &*((*rb).inner as *const Table); - let rb_rust = table.new_read_builder(); + let state = &*((*rb).inner as *const ReadBuilderState); + let mut rb_rust = state.table.new_read_builder(); + + // Apply projection if set + if let Some(ref columns) = state.projected_columns { + let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect(); + rb_rust.with_projection(&col_refs); + } + match rb_rust.new_read() { - Ok(_) => { - let wrapper = box_table_wrapper(table, |inner| paimon_table_read { inner }); + Ok(table_read) => { + let read_state = TableReadState { + table: state.table.clone(), + read_type: table_read.read_type().to_vec(), + }; paimon_result_new_read { - read: wrapper, + read: box_table_read_state(read_state), error: std::ptr::null_mut(), } } @@ -226,7 +302,12 @@ pub unsafe extern "C" fn paimon_plan_num_splits(plan: *const paimon_plan) -> usi /// Only call with a read returned from `paimon_read_builder_new_read`. #[no_mangle] pub unsafe extern "C" fn paimon_table_read_free(read: *mut paimon_table_read) { - free_table_wrapper(read, |r| r.inner); + if !read.is_null() { + let wrapper = Box::from_raw(read); + if !wrapper.inner.is_null() { + drop(Box::from_raw(wrapper.inner as *mut TableReadState)); + } + } } /// Read table data as Arrow record batches via a streaming reader. 
@@ -261,31 +342,27 @@ pub unsafe extern "C" fn paimon_table_read_to_arrow( }; } - let table = &*((*read).inner as *const Table); + let state = &*((*read).inner as *const TableReadState); let plan_ref = &*((*plan).inner as *const Plan); let all_splits = plan_ref.splits(); let start = offset.min(all_splits.len()); let end = (offset.saturating_add(length)).min(all_splits.len()); let selected = &all_splits[start..end]; - let rb = table.new_read_builder(); - match rb.new_read() { - Ok(table_read) => match table_read.to_arrow(selected) { - Ok(stream) => { - let reader = Box::new(stream); - let wrapper = Box::new(paimon_record_batch_reader { - inner: Box::into_raw(reader) as *mut c_void, - }); - paimon_result_record_batch_reader { - reader: Box::into_raw(wrapper), - error: std::ptr::null_mut(), - } + // Create TableRead with the stored read_type (projection) + let table_read = paimon::table::TableRead::new(&state.table, state.read_type.clone()); + + match table_read.to_arrow(selected) { + Ok(stream) => { + let reader = Box::new(stream); + let wrapper = Box::new(paimon_record_batch_reader { + inner: Box::into_raw(reader) as *mut c_void, + }); + paimon_result_record_batch_reader { + reader: Box::into_raw(wrapper), + error: std::ptr::null_mut(), } - Err(e) => paimon_result_record_batch_reader { - reader: std::ptr::null_mut(), - error: paimon_error::from_paimon(e), - }, - }, + } Err(e) => paimon_result_record_batch_reader { reader: std::ptr::null_mut(), error: paimon_error::from_paimon(e), diff --git a/bindings/c/src/types.rs b/bindings/c/src/types.rs index 1cb2be7f..a95f9654 100644 --- a/bindings/c/src/types.rs +++ b/bindings/c/src/types.rs @@ -17,6 +17,9 @@ use std::ffi::c_void; +use paimon::spec::DataField; +use paimon::table::Table; + /// C-compatible byte buffer. #[repr(C)] #[derive(Clone, Copy)] @@ -68,6 +71,12 @@ pub struct paimon_read_builder { pub inner: *mut c_void, } +/// Internal state for ReadBuilder that stores table and projection columns. 
+pub(crate) struct ReadBuilderState { + pub table: Table, + pub projected_columns: Option<Vec<String>>, +} + #[repr(C)] pub struct paimon_table_scan { pub inner: *mut c_void, @@ -78,6 +87,12 @@ pub struct paimon_table_read { pub inner: *mut c_void, } +/// Internal state for TableRead that stores table and projected read type. +pub(crate) struct TableReadState { + pub table: Table, + pub read_type: Vec<DataField>, +} + #[repr(C)] pub struct paimon_plan { pub inner: *mut c_void, diff --git a/bindings/go/read_builder.go b/bindings/go/read_builder.go index 20b7a0dd..fde6bf53 100644 --- a/bindings/go/read_builder.go +++ b/bindings/go/read_builder.go @@ -21,6 +21,7 @@ package paimon import ( "context" + "runtime" "sync" "unsafe" @@ -44,6 +45,17 @@ func (rb *ReadBuilder) Close() { }) } +// WithProjection sets column projection by name. Output order follows the +// caller-specified order. Unknown or duplicate names cause NewRead() to fail; +// an empty list is a valid zero-column projection. +func (rb *ReadBuilder) WithProjection(columns []string) error { + if rb.inner == nil { + return ErrClosed + } + projFn := ffiReadBuilderWithProjection.symbol(rb.ctx) + return projFn(rb.inner, columns) +} + // NewScan creates a TableScan for planning which data files to read. 
func (rb *ReadBuilder) NewScan() (*TableScan, error) { if rb.inner == nil { @@ -85,6 +97,45 @@ var ffiReadBuilderFree = newFFI(ffiOpts{ } }) +var ffiReadBuilderWithProjection = newFFI(ffiOpts{ + sym: "paimon_read_builder_with_projection", + rType: &ffi.TypePointer, + aTypes: []*ffi.Type{&ffi.TypePointer, &ffi.TypePointer}, +}, func(ctx context.Context, ffiCall ffiCall) func(rb *paimonReadBuilder, columns []string) error { + return func(rb *paimonReadBuilder, columns []string) error { + var colPtrs []*byte + var cStrings [][]byte + + // Convert Go strings to null-terminated C strings + for _, col := range columns { + cStr := append([]byte(col), 0) + cStrings = append(cStrings, cStr) + colPtrs = append(colPtrs, &cStr[0]) + } + // Null-terminate the array + colPtrs = append(colPtrs, nil) + + var colsPtr unsafe.Pointer + if len(colPtrs) > 0 { + colsPtr = unsafe.Pointer(&colPtrs[0]) + } + + var errPtr *paimonError + ffiCall( + unsafe.Pointer(&errPtr), + unsafe.Pointer(&rb), + unsafe.Pointer(&colsPtr), + ) + // Ensure Go-managed buffers stay alive for the full native call. + runtime.KeepAlive(cStrings) + runtime.KeepAlive(colPtrs) + if errPtr != nil { + return parseError(ctx, errPtr) + } + return nil + } +}) + var ffiReadBuilderNewScan = newFFI(ffiOpts{ sym: "paimon_read_builder_new_scan", rType: &typeResultTableScan, diff --git a/bindings/go/tests/paimon_test.go b/bindings/go/tests/paimon_test.go index 28e3fc2c..ef38c2be 100644 --- a/bindings/go/tests/paimon_test.go +++ b/bindings/go/tests/paimon_test.go @@ -154,3 +154,114 @@ func TestReadLogTable(t *testing.T) { } } } + +// TestReadWithProjection reads only the "id" column via WithProjection and +// verifies that only the projected column is returned with correct values. 
+func TestReadWithProjection(t *testing.T) { + warehouse := os.Getenv("PAIMON_TEST_WAREHOUSE") + if warehouse == "" { + warehouse = "/tmp/paimon-warehouse" + } + + if _, err := os.Stat(warehouse); os.IsNotExist(err) { + t.Skipf("Skipping: warehouse %s does not exist (run 'make docker-up' first)", warehouse) + } + + catalog, err := paimon.NewFileSystemCatalog(warehouse) + if err != nil { + t.Fatalf("Failed to create catalog: %v", err) + } + defer catalog.Close() + + table, err := catalog.GetTable(paimon.NewIdentifier("default", "simple_log_table")) + if err != nil { + t.Fatalf("Failed to get table: %v", err) + } + defer table.Close() + + rb, err := table.NewReadBuilder() + if err != nil { + t.Fatalf("Failed to create read builder: %v", err) + } + defer rb.Close() + + // Set projection to only read "id" column + if err := rb.WithProjection([]string{"id"}); err != nil { + t.Fatalf("Failed to set projection: %v", err) + } + + scan, err := rb.NewScan() + if err != nil { + t.Fatalf("Failed to create scan: %v", err) + } + defer scan.Close() + + plan, err := scan.Plan() + if err != nil { + t.Fatalf("Failed to plan: %v", err) + } + defer plan.Close() + + splits := plan.Splits() + if len(splits) == 0 { + t.Fatal("Expected at least one split") + } + + read, err := rb.NewRead() + if err != nil { + t.Fatalf("Failed to create table read: %v", err) + } + defer read.Close() + + reader, err := read.NewRecordBatchReader(splits) + if err != nil { + t.Fatalf("Failed to create record batch reader: %v", err) + } + defer reader.Close() + + var ids []int32 + batchIdx := 0 + for { + record, err := reader.NextRecord() + if errors.Is(err, io.EOF) { + break + } + if err != nil { + t.Fatalf("Batch %d: failed to read next record: %v", batchIdx, err) + } + + // Verify schema only contains the projected column + schema := record.Schema() + if schema.NumFields() != 1 { + record.Release() + t.Fatalf("Batch %d: expected 1 field, got %d: %s", batchIdx, schema.NumFields(), schema) + } + if 
schema.Field(0).Name != "id" { + record.Release() + t.Fatalf("Batch %d: expected field 'id', got '%s'", batchIdx, schema.Field(0).Name) + } + + idCol := record.Column(0).(*array.Int32) + for j := 0; j < int(record.NumRows()); j++ { + ids = append(ids, idCol.Value(j)) + } + record.Release() + batchIdx++ + } + + if len(ids) == 0 { + t.Fatal("Expected at least one row, got 0") + } + + sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] }) + + expected := []int32{1, 2, 3} + if len(ids) != len(expected) { + t.Fatalf("Expected %d rows, got %d: %v", len(expected), len(ids), ids) + } + for i, exp := range expected { + if ids[i] != exp { + t.Errorf("Row %d: expected id=%d, got id=%d", i, exp, ids[i]) + } + } +} diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index e567d15c..fcb1497e 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -30,15 +30,21 @@ use paimon::table::Table; use crate::error::to_datafusion_error; -/// Execution plan that scans a Paimon table (read-only, no projection, no predicate, no limit). +/// Execution plan that scans a Paimon table with optional column projection. #[derive(Debug)] pub struct PaimonTableScan { table: Table, + /// Projected column names (if None, reads all columns). 
+ projected_columns: Option<Vec<String>>, plan_properties: PlanProperties, } impl PaimonTableScan { - pub(crate) fn new(schema: ArrowSchemaRef, table: Table) -> Self { + pub(crate) fn new( + schema: ArrowSchemaRef, + table: Table, + projected_columns: Option<Vec<String>>, + ) -> Self { let plan_properties = PlanProperties::new( EquivalenceProperties::new(schema.clone()), // TODO: Currently all Paimon splits are read in a single DataFusion partition, @@ -51,6 +57,7 @@ impl PaimonTableScan { ); Self { table, + projected_columns, plan_properties, } } @@ -91,9 +98,17 @@ impl ExecutionPlan for PaimonTableScan { ) -> DFResult<SendableRecordBatchStream> { let table = self.table.clone(); let schema = self.schema(); + let projected_columns = self.projected_columns.clone(); let fut = async move { - let read_builder = table.new_read_builder(); + let mut read_builder = table.new_read_builder(); + + // Apply projection if specified + if let Some(ref columns) = projected_columns { + let col_refs: Vec<&str> = columns.iter().map(|s| s.as_str()).collect(); + read_builder.with_projection(&col_refs); + } + let scan = read_builder.new_scan(); let plan = scan.plan().await.map_err(to_datafusion_error)?; let read = read_builder.new_read().map_err(to_datafusion_error)?; diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 04ccab64..1ba06f4b 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -21,9 +21,8 @@ use std::any::Any; use std::sync::Arc; use async_trait::async_trait; -use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::arrow::datatypes::{Field, Schema, SchemaRef as ArrowSchemaRef}; use datafusion::catalog::Session; -use datafusion::common::DataFusionError; use datafusion::datasource::{TableProvider, TableType}; use datafusion::error::Result as DFResult; use datafusion::logical_expr::Expr; @@ -35,8 +34,8 @@ use crate::schema::paimon_schema_to_arrow; /// Read-only table provider 
for a Paimon table. /// -/// Supports full table scan only (no write, no subset/reordered projection, no predicate -/// pushdown). +/// Supports full table scan and column projection. Predicate pushdown and writes +/// are not yet supported. #[derive(Debug, Clone)] pub struct PaimonTableProvider { table: Table, @@ -79,20 +78,22 @@ impl TableProvider for PaimonTableProvider { _filters: &[Expr], _limit: Option, ) -> DFResult> { - if let Some(projection) = projection { - let is_full_schema_projection = projection.len() == self.schema.fields().len() - && projection.iter().copied().eq(0..self.schema.fields().len()); - - if !is_full_schema_projection { - return Err(DataFusionError::NotImplemented( - "Paimon DataFusion integration does not yet support subset or reordered projections; use SELECT * until apache/paimon-rust#146 is implemented".to_string(), - )); - } - } + // Convert projection indices to column names and compute projected schema + let (projected_schema, projected_columns) = if let Some(indices) = projection { + let fields: Vec = indices + .iter() + .map(|&i| self.schema.field(i).clone()) + .collect(); + let column_names: Vec = fields.iter().map(|f| f.name().clone()).collect(); + (Arc::new(Schema::new(fields)), Some(column_names)) + } else { + (self.schema.clone(), None) + }; Ok(Arc::new(PaimonTableScan::new( - self.schema.clone(), + projected_schema, self.table.clone(), + projected_columns, ))) } } diff --git a/crates/integrations/datafusion/tests/read_tables.rs b/crates/integrations/datafusion/tests/read_tables.rs index f813d78d..97bad768 100644 --- a/crates/integrations/datafusion/tests/read_tables.rs +++ b/crates/integrations/datafusion/tests/read_tables.rs @@ -120,15 +120,39 @@ async fn test_read_primary_key_table_via_datafusion() { } #[tokio::test] -async fn test_subset_projection_returns_not_implemented() { - let error = collect_query("simple_log_table", "SELECT id FROM simple_log_table") +async fn test_projection_via_datafusion() { + let batches = 
collect_query("simple_log_table", "SELECT id FROM simple_log_table") .await - .expect_err("Subset projection should be rejected until projection support lands"); + .expect("Subset projection should succeed"); assert!( - error - .to_string() - .contains("does not yet support subset or reordered projections"), - "Expected explicit unsupported projection error, got: {error}" + !batches.is_empty(), + "Expected at least one batch from projected query" + ); + + let mut actual_ids = Vec::new(); + for batch in &batches { + let schema = batch.schema(); + let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect(); + assert_eq!( + field_names, + vec!["id"], + "Projected query should only return 'id' column" + ); + + let id_array = batch + .column_by_name("id") + .and_then(|col| col.as_any().downcast_ref::()) + .expect("Expected Int32Array for id column"); + for i in 0..id_array.len() { + actual_ids.push(id_array.value(i)); + } + } + + actual_ids.sort(); + assert_eq!( + actual_ids, + vec![1, 2, 3], + "Projected id values should match" ); } diff --git a/crates/paimon/src/api/auth/dlf_provider.rs b/crates/paimon/src/api/auth/dlf_provider.rs index e5a7ea0e..4f264d88 100644 --- a/crates/paimon/src/api/auth/dlf_provider.rs +++ b/crates/paimon/src/api/auth/dlf_provider.rs @@ -168,7 +168,7 @@ impl DLFECSTokenLoader { async fn get_token(&self, url: &str) -> Result { let token_json = self.http_client.get(url).await?; serde_json::from_str(&token_json).map_err(|e| Error::DataInvalid { - message: format!("Failed to parse token JSON: {}", e), + message: format!("Failed to parse token JSON: {e}"), source: None, }) } @@ -176,7 +176,7 @@ impl DLFECSTokenLoader { /// Build the token URL from base URL and role name. 
fn build_token_url(&self, role_name: &str) -> String { let base_url = self.ecs_metadata_url.trim_end_matches('/'); - format!("{}/{}", base_url, role_name) + format!("{base_url}/{role_name}") } } @@ -396,7 +396,7 @@ impl TokenHTTPClient { match self.client.get(url).send().await { Ok(response) if response.status().is_success() => { return response.text().await.map_err(|e| Error::DataInvalid { - message: format!("Failed to read response: {}", e), + message: format!("Failed to read response: {e}"), source: None, }); } @@ -404,7 +404,7 @@ impl TokenHTTPClient { last_error = format!("HTTP error: {}", response.status()); } Err(e) => { - last_error = format!("Request failed: {}", e); + last_error = format!("Request failed: {e}"); } } diff --git a/crates/paimon/src/api/auth/dlf_signer.rs b/crates/paimon/src/api/auth/dlf_signer.rs index 133970bf..ca49a283 100644 --- a/crates/paimon/src/api/auth/dlf_signer.rs +++ b/crates/paimon/src/api/auth/dlf_signer.rs @@ -236,7 +236,7 @@ impl DLFDefaultSigner { let sorted_headers = self.build_sorted_signed_headers_map(headers); for (key, value) in sorted_headers { - parts.push(format!("{}:{}", key, value)); + parts.push(format!("{key}:{value}")); } let content_sha256 = headers @@ -262,7 +262,7 @@ impl DLFDefaultSigner { let key = Self::trim(key); if !value.is_empty() { let value = Self::trim(value); - format!("{}={}", key, value) + format!("{key}={value}") } else { key.to_string() } @@ -480,7 +480,7 @@ impl DLFOpenApiSigner { let mut result = String::new(); for (key, value) in sorted_headers { - result.push_str(&format!("{}:{}\n", key, value)); + result.push_str(&format!("{key}:{value}\n")); } result } @@ -500,7 +500,7 @@ impl DLFOpenApiSigner { .map(|(key, value)| { let decoded_value = urlencoding::decode(value).unwrap_or_default(); if !decoded_value.is_empty() { - format!("{}={}", key, decoded_value) + format!("{key}={decoded_value}") } else { key.to_string() } diff --git a/crates/paimon/src/api/rest_api.rs 
b/crates/paimon/src/api/rest_api.rs index f7851117..be09e9a4 100644 --- a/crates/paimon/src/api/rest_api.rs +++ b/crates/paimon/src/api/rest_api.rs @@ -50,7 +50,7 @@ use super::rest_util::RESTUtil; fn validate_non_empty(value: &str, field_name: &str) -> Result<()> { if value.trim().is_empty() { return Err(crate::Error::ConfigInvalid { - message: format!("{} cannot be empty", field_name), + message: format!("{field_name} cannot be empty"), }); } Ok(()) diff --git a/crates/paimon/src/arrow/reader.rs b/crates/paimon/src/arrow/reader.rs index d7b07a7c..693aaf87 100644 --- a/crates/paimon/src/arrow/reader.rs +++ b/crates/paimon/src/arrow/reader.rs @@ -165,8 +165,7 @@ impl ArrowReader { batch.schema().index_of(name).map_err(|_| { Error::UnexpectedError { message: format!( - "Projected column '{}' not found in Parquet batch schema of file {}", - name, path_to_read + "Projected column '{name}' not found in Parquet batch schema of file {path_to_read}" ), source: None, } diff --git a/crates/paimon/src/table/read_builder.rs b/crates/paimon/src/table/read_builder.rs index 454807eb..69e16741 100644 --- a/crates/paimon/src/table/read_builder.rs +++ b/crates/paimon/src/table/read_builder.rs @@ -65,10 +65,7 @@ impl<'a> ReadBuilder<'a> { Some(projected) => self.resolve_projected_fields(projected)?, }; - Ok(TableRead { - table: self.table, - read_type, - }) + Ok(TableRead::new(self.table, read_type)) } fn resolve_projected_fields(&self, projected_fields: &[String]) -> Result> { @@ -91,10 +88,7 @@ impl<'a> ReadBuilder<'a> { for name in projected_fields { if !seen.insert(name.as_str()) { return Err(Error::ConfigInvalid { - message: format!( - "Duplicate projection column '{}' for table {}", - name, full_name - ), + message: format!("Duplicate projection column '{name}' for table {full_name}"), }); } @@ -121,6 +115,11 @@ pub struct TableRead<'a> { } impl<'a> TableRead<'a> { + /// Create a new TableRead with a specific read type (projected fields). 
+ pub fn new(table: &'a Table, read_type: Vec) -> Self { + Self { table, read_type } + } + /// Schema (fields) that this read will produce. pub fn read_type(&self) -> &[DataField] { &self.read_type diff --git a/crates/paimon/tests/mock_server.rs b/crates/paimon/tests/mock_server.rs index fee16c72..def0ada6 100644 --- a/crates/paimon/tests/mock_server.rs +++ b/crates/paimon/tests/mock_server.rs @@ -115,7 +115,7 @@ impl RESTServer { let err = ErrorResponse::new( None, None, - Some(format!("Warehouse {} not found", warehouse)), + Some(format!("Warehouse {warehouse} not found")), Some(404), ); return (StatusCode::NOT_FOUND, Json(err)).into_response(); @@ -253,7 +253,7 @@ impl RESTServer { if s.databases.remove(&name).is_some() { // Also remove all tables in this database - let prefix = format!("{}.", name); + let prefix = format!("{name}."); s.tables.retain(|key, _| !key.starts_with(&prefix)); s.no_permission_tables .retain(|key| !key.starts_with(&prefix)); @@ -296,7 +296,7 @@ impl RESTServer { return (StatusCode::NOT_FOUND, Json(err)).into_response(); } - let prefix = format!("{}.", db); + let prefix = format!("{db}."); let mut tables: Vec = s .tables .keys() @@ -353,7 +353,7 @@ impl RESTServer { return (StatusCode::NOT_FOUND, Json(err)).into_response(); } - let key = format!("{}.{}", db, table_name); + let key = format!("{db}.{table_name}"); if s.tables.contains_key(&key) { let err = ErrorResponse::new( Some("table".to_string()), @@ -385,7 +385,7 @@ impl RESTServer { ) -> impl IntoResponse { let s = state.inner.lock().unwrap(); - let key = format!("{}.{}", db, table); + let key = format!("{db}.{table}"); if s.no_permission_tables.contains(&key) { let err = ErrorResponse::new( Some("table".to_string()), @@ -426,7 +426,7 @@ impl RESTServer { ) -> impl IntoResponse { let mut s = state.inner.lock().unwrap(); - let key = format!("{}.{}", db, table); + let key = format!("{db}.{table}"); if s.no_permission_tables.contains(&key) { let err = ErrorResponse::new( 
Some("table".to_string()), @@ -556,7 +556,7 @@ impl RESTServer { ) }); - let key = format!("{}.{}", database, table); + let key = format!("{database}.{table}"); s.tables.entry(key).or_insert_with(|| { GetTableResponse::new( Some(table.to_string()), @@ -574,12 +574,11 @@ impl RESTServer { #[allow(dead_code)] pub fn add_no_permission_table(&self, database: &str, table: &str) { let mut s = self.inner.lock().unwrap(); - s.no_permission_tables - .insert(format!("{}.{}", database, table)); + s.no_permission_tables.insert(format!("{database}.{table}")); } /// Get the server URL. pub fn url(&self) -> Option { - self.addr.map(|a| format!("http://{}", a)) + self.addr.map(|a| format!("http://{a}")) } /// Get the warehouse path. #[allow(dead_code)] @@ -678,25 +677,25 @@ pub async fn start_mock_server( .route("/v1/config", get(RESTServer::get_config)) // Database routes .route( - &format!("{}/databases", prefix), + &format!("{prefix}/databases"), get(RESTServer::list_databases).post(RESTServer::create_database), ) .route( - &format!("{}/databases/:name", prefix), + &format!("{prefix}/databases/:name"), get(RESTServer::get_database) .post(RESTServer::alter_database) .delete(RESTServer::drop_database), ) .route( - &format!("{}/databases/:db/tables", prefix), + &format!("{prefix}/databases/:db/tables"), get(RESTServer::list_tables).post(RESTServer::create_table), ) .route( - &format!("{}/databases/:db/tables/:table", prefix), + &format!("{prefix}/databases/:db/tables/:table"), get(RESTServer::get_table).delete(RESTServer::drop_table), ) .route( - &format!("{}/tables/rename", prefix), + &format!("{prefix}/tables/rename"), axum::routing::post(RESTServer::rename_table), ) // ECS metadata endpoints (for token loader testing) @@ -717,7 +716,7 @@ pub async fn start_mock_server( let server_handle = tokio::spawn(async move { if let Err(e) = axum::serve(listener, app.into_make_service()).await { - eprintln!("mock server error: {}", e); + eprintln!("mock server error: {e}"); } }); diff 
--git a/crates/paimon/tests/rest_api_test.rs b/crates/paimon/tests/rest_api_test.rs index 9bd31fc4..753f5d1f 100644 --- a/crates/paimon/tests/rest_api_test.rs +++ b/crates/paimon/tests/rest_api_test.rs @@ -88,7 +88,7 @@ async fn test_create_database() { // Create new database let result = ctx.api.create_database("new_db", None).await; - assert!(result.is_ok(), "failed to create database: {:?}", result); + assert!(result.is_ok(), "failed to create database: {result:?}"); // Verify creation let dbs = ctx.api.list_databases().await.unwrap(); @@ -132,7 +132,7 @@ async fn test_error_responses_status_mapping() { ); assert_eq!(j.get("code").and_then(|v| v.as_u64()), Some(403)); } - Err(e) => panic!("Expected 403 response, got error: {:?}", e), + Err(e) => panic!("Expected 403 response, got error: {e:?}"), } // POST create existing database -> 409 @@ -168,7 +168,7 @@ async fn test_alter_database() { updates.insert("key2".to_string(), "value2".to_string()); let result = ctx.api.alter_database("default", vec![], updates).await; - assert!(result.is_ok(), "failed to alter database: {:?}", result); + assert!(result.is_ok(), "failed to alter database: {result:?}"); // Verify the updates by getting the database let db_resp = ctx.api.get_database("default").await.unwrap(); @@ -180,7 +180,7 @@ async fn test_alter_database() { .api .alter_database("default", vec!["key1".to_string()], HashMap::new()) .await; - assert!(result.is_ok(), "failed to remove key: {:?}", result); + assert!(result.is_ok(), "failed to remove key: {result:?}"); let db_resp = ctx.api.get_database("default").await.unwrap(); assert!(!db_resp.options.contains_key("key1")); @@ -211,7 +211,7 @@ async fn test_drop_database() { // Drop database let result = ctx.api.drop_database("to_drop").await; - assert!(result.is_ok(), "failed to drop database: {:?}", result); + assert!(result.is_ok(), "failed to drop database: {result:?}"); // Verify database is gone let dbs = ctx.api.list_databases().await.unwrap(); @@ -278,8 
+278,7 @@ async fn test_list_tables_empty_database() { let tables = ctx.api.list_tables("default").await.unwrap(); assert!( tables.is_empty(), - "expected empty tables list, got: {:?}", - tables + "expected empty tables list, got: {tables:?}" ); } @@ -323,7 +322,7 @@ async fn test_create_table() { .api .create_table(&Identifier::new("default", "new_table"), schema) .await; - assert!(result.is_ok(), "failed to create table: {:?}", result); + assert!(result.is_ok(), "failed to create table: {result:?}"); // Verify table exists let tables = ctx.api.list_tables("default").await.unwrap(); @@ -354,7 +353,7 @@ async fn test_drop_table() { .api .drop_table(&Identifier::new("default", "table_to_drop")) .await; - assert!(result.is_ok(), "failed to drop table: {:?}", result); + assert!(result.is_ok(), "failed to drop table: {result:?}"); // Verify table is gone let tables = ctx.api.list_tables("default").await.unwrap(); @@ -398,7 +397,7 @@ async fn test_rename_table() { &Identifier::new("default", "new_table"), ) .await; - assert!(result.is_ok(), "failed to rename table: {:?}", result); + assert!(result.is_ok(), "failed to rename table: {result:?}"); // Verify old table is gone let tables = ctx.api.list_tables("default").await.unwrap(); diff --git a/docs/src/getting-started.md b/docs/src/getting-started.md index c1dc287c..6e42ae3a 100644 --- a/docs/src/getting-started.md +++ b/docs/src/getting-started.md @@ -157,7 +157,7 @@ let df = ctx.sql("SELECT * FROM my_table").await?; df.show().await?; ``` -> **Note:** The DataFusion integration currently supports full table scans only. Column projection and predicate pushdown are not yet implemented. +> **Note:** The DataFusion integration supports full table scans and column projection. Predicate pushdown is not yet implemented. ## Building from Source