From 18af241d5f4dd23736b5cdc3983254ab874dfd90 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Mon, 30 Mar 2026 21:31:16 +0800 Subject: [PATCH 1/2] refactor(planner): consolidate catalog lookups behind a single collection_type() method Replace four separate is_timeseries / is_plain_columnar / is_spatial / is_kv predicate methods with a single collection_type() call that performs one catalog read. Each call site is updated to match exhaustively on CollectionType, so new collection variants produce a compile error instead of silently falling through to document routing. Filter(TableScan) and bare TableScan paths in the converter now share the same dispatch logic. DML routing in dml.rs and point-operation builders in the native dispatch layer follow the same pattern. Helper functions serialize_predicate_filters, serialize_scan_filters, and resolve_scan_projection are extracted to eliminate repeated inline serialization blocks. --- nodedb/src/control/planner/converter.rs | 571 +++++++++--------- .../src/control/planner/converter_helpers.rs | 8 +- nodedb/src/control/planner/dml.rs | 32 +- .../native/dispatch/plan_builder/document.rs | 113 ++-- .../native/dispatch/plan_builder/mod.rs | 31 +- .../control/server/pgwire/ddl/collection.rs | 13 +- .../server/pgwire/ddl/collection_insert.rs | 8 +- 7 files changed, 397 insertions(+), 379 deletions(-) diff --git a/nodedb/src/control/planner/converter.rs b/nodedb/src/control/planner/converter.rs index 9ef09907..9829d641 100644 --- a/nodedb/src/control/planner/converter.rs +++ b/nodedb/src/control/planner/converter.rs @@ -1,5 +1,7 @@ use datafusion::logical_expr::{FetchType, LogicalPlan}; use datafusion::prelude::*; +use nodedb_types::CollectionType; +use nodedb_types::columnar::ColumnarProfile; use crate::bridge::envelope::PhysicalPlan; use crate::bridge::physical_plan::{ @@ -39,60 +41,21 @@ impl PlanConverter { } } - /// Check if a collection is a timeseries collection. 
- pub(super) fn is_timeseries(&self, tenant_id: TenantId, collection: &str) -> bool { - if let Some(ref creds) = self.credentials - && let Some(catalog) = creds.catalog() - && let Ok(Some(coll)) = catalog.get_collection(tenant_id.as_u32(), collection) - { - return coll.collection_type.is_timeseries(); - } - false - } - - /// Check if a collection is a plain columnar collection (not timeseries, not spatial). - pub(super) fn is_plain_columnar(&self, tenant_id: TenantId, collection: &str) -> bool { - if let Some(ref creds) = self.credentials - && let Some(catalog) = creds.catalog() - && let Ok(Some(coll)) = catalog.get_collection(tenant_id.as_u32(), collection) - { - return coll.collection_type.is_columnar() - && !coll.collection_type.is_timeseries() - && !matches!( - coll.collection_type, - nodedb_types::CollectionType::Columnar( - nodedb_types::columnar::ColumnarProfile::Spatial { .. } - ) - ); - } - false - } - - /// Check if a collection is a spatial columnar collection. - pub(super) fn is_spatial(&self, tenant_id: TenantId, collection: &str) -> bool { - if let Some(ref creds) = self.credentials - && let Some(catalog) = creds.catalog() - && let Ok(Some(coll)) = catalog.get_collection(tenant_id.as_u32(), collection) - { - return matches!( - coll.collection_type, - nodedb_types::CollectionType::Columnar( - nodedb_types::columnar::ColumnarProfile::Spatial { .. } - ) - ); - } - false - } - - /// Check if a collection is a KV collection. - pub(super) fn is_kv(&self, tenant_id: TenantId, collection: &str) -> bool { - if let Some(ref creds) = self.credentials - && let Some(catalog) = creds.catalog() - && let Ok(Some(coll)) = catalog.get_collection(tenant_id.as_u32(), collection) - { - return coll.collection_type.is_kv(); - } - false + /// Single catalog lookup returning the collection's storage type. + /// + /// Returns `None` when: no catalog available, collection not found, + /// or catalog read error. Callers treat `None` as "default to document". 
+ pub(super) fn collection_type( + &self, + tenant_id: TenantId, + collection: &str, + ) -> Option { + let creds = self.credentials.as_ref()?; + let catalog = creds.catalog().as_ref()?; + let coll = catalog + .get_collection(tenant_id.as_u32(), collection) + .ok()??; + Some(coll.collection_type.clone()) } /// Convert a DataFusion logical plan into one or more physical tasks. @@ -181,96 +144,144 @@ impl PlanConverter { return Ok(vec![task]); } - // Check if the filter predicate can be converted to a point get - // before recursing into the input. + // Filter(TableScan): dispatch by collection type. if let LogicalPlan::TableScan(scan) = filter.input.as_ref() { let collection = scan.table_name.to_string().to_lowercase(); let vshard = VShardId::from_collection(&collection); + let coll_type = self.collection_type(tenant_id, &collection); + + match coll_type { + Some(CollectionType::KeyValue(_)) => { + // Try O(1) hash lookup: WHERE = . + if let Some(key_bytes) = extract_equality_key(&filter.predicate) { + return Ok(vec![PhysicalTask { + tenant_id, + vshard_id: vshard, + plan: PhysicalPlan::Kv(KvOp::Get { + collection, + key: key_bytes, + rls_filters: Vec::new(), + }), + }]); + } + + // Fall back to KV scan with filters. + let filter_bytes = serialize_predicate_filters(&filter.predicate)?; + let limit = scan.fetch.unwrap_or(1000); + return Ok(vec![PhysicalTask { + tenant_id, + vshard_id: vshard, + plan: PhysicalPlan::Kv(KvOp::Scan { + collection, + cursor: Vec::new(), + count: limit, + filters: filter_bytes, + match_pattern: None, + }), + }]); + } + Some(CollectionType::Columnar(ColumnarProfile::Timeseries { .. })) => { + // Combine Filter predicate with any pushed-down + // TableScan filters, then extract time-range bounds. 
+ let mut all_filters = vec![filter.predicate.clone()]; + all_filters.extend(scan.filters.iter().cloned()); + let (time_range, filter_bytes) = + super::converter_helpers::extract_timeseries_filters(&all_filters)?; + let limit = scan.fetch.unwrap_or(10_000); + let projection = resolve_scan_projection(scan); - // KV routing: try PK point-get first, then fall back to scan. - // Note: is_kv() requires catalog access. If the catalog is - // unavailable, this returns false and the query falls through - // to document scan instead of KV scan. - if self.is_kv(tenant_id, &collection) { - // Try O(1) hash lookup: WHERE = . - if let Some(key_bytes) = extract_equality_key(&filter.predicate) { return Ok(vec![PhysicalTask { tenant_id, vshard_id: vshard, - plan: PhysicalPlan::Kv(KvOp::Get { + plan: PhysicalPlan::Timeseries(TimeseriesOp::Scan { collection, - key: key_bytes, + time_range, + projection, + limit, + filters: filter_bytes, + bucket_interval_ms: 0, rls_filters: Vec::new(), }), }]); } + Some(CollectionType::Columnar(ColumnarProfile::Plain)) => { + let filter_bytes = serialize_predicate_filters(&filter.predicate)?; + let limit = scan.fetch.unwrap_or(10_000); + let projection = resolve_scan_projection(scan); - // Fall back to KV scan with filters. - let filters = expr_to_scan_filters(&filter.predicate); - let filter_bytes = rmp_serde::to_vec_named(&filters).map_err(|e| { - crate::Error::Serialization { - format: "msgpack".into(), - detail: format!("kv filter serialization: {e}"), + return Ok(vec![PhysicalTask { + tenant_id, + vshard_id: vshard, + plan: PhysicalPlan::Columnar(ColumnarOp::Scan { + collection, + projection, + limit, + filters: filter_bytes, + rls_filters: Vec::new(), + }), + }]); + } + Some(CollectionType::Columnar(ColumnarProfile::Spatial { .. })) => { + // Spatial ST_* predicates are handled above + // by try_extract_spatial_scan. Non-spatial + // WHERE on spatial collections → columnar scan. 
+ let filter_bytes = serialize_predicate_filters(&filter.predicate)?; + let limit = scan.fetch.unwrap_or(10_000); + let projection = resolve_scan_projection(scan); + + return Ok(vec![PhysicalTask { + tenant_id, + vshard_id: vshard, + plan: PhysicalPlan::Columnar(ColumnarOp::Scan { + collection, + projection, + limit, + filters: filter_bytes, + rls_filters: Vec::new(), + }), + }]); + } + // Document (schemaless/strict) or unknown: try + // point-get, range-scan, then full document scan. + Some(CollectionType::Document(_)) | None => { + if let Some(task) = self.try_point_get( + &collection, + std::slice::from_ref(&filter.predicate), + tenant_id, + vshard, + )? { + return Ok(vec![task]); } - })?; - let limit = scan.fetch.unwrap_or(1000); - return Ok(vec![PhysicalTask { - tenant_id, - vshard_id: vshard, - plan: PhysicalPlan::Kv(KvOp::Scan { - collection, - cursor: Vec::new(), - count: limit, - filters: filter_bytes, - match_pattern: None, - }), - }]); - } - if let Some(task) = self.try_point_get( - &collection, - std::slice::from_ref(&filter.predicate), - tenant_id, - vshard, - )? { - return Ok(vec![task]); - } + if let Some(task) = try_range_scan_from_predicate( + &collection, + &filter.predicate, + tenant_id, + vshard, + ) { + return Ok(vec![task]); + } - // Try secondary index: equality or range on a non-id field → RangeScan. - if let Some(task) = try_range_scan_from_predicate( - &collection, - &filter.predicate, - tenant_id, - vshard, - ) { - return Ok(vec![task]); - } + let filter_bytes = serialize_predicate_filters(&filter.predicate)?; + let limit = scan.fetch.unwrap_or(1000); - // Not a point get or indexed scan — emit DocumentScan with filters. 
- let filters = expr_to_scan_filters(&filter.predicate); - let filter_bytes = rmp_serde::to_vec_named(&filters).map_err(|e| { - crate::Error::Serialization { - format: "msgpack".into(), - detail: format!("filter serialization: {e}"), + return Ok(vec![PhysicalTask { + tenant_id, + vshard_id: vshard, + plan: PhysicalPlan::Document(DocumentOp::Scan { + collection, + limit, + offset: 0, + sort_keys: Vec::new(), + filters: filter_bytes, + distinct: false, + projection: Vec::new(), + computed_columns: Vec::new(), + window_functions: Vec::new(), + }), + }]); } - })?; - let limit = scan.fetch.unwrap_or(1000); - - return Ok(vec![PhysicalTask { - tenant_id, - vshard_id: vshard, - plan: PhysicalPlan::Document(DocumentOp::Scan { - collection, - limit, - offset: 0, - sort_keys: Vec::new(), - filters: filter_bytes, - distinct: false, - projection: Vec::new(), - computed_columns: Vec::new(), - window_functions: Vec::new(), - }), - }]); + } } // Filter wrapping Aggregate = HAVING clause. if matches!(filter.input.as_ref(), LogicalPlan::Aggregate(_)) { @@ -338,191 +349,119 @@ impl PlanConverter { LogicalPlan::TableScan(scan) => { let collection = scan.table_name.to_string().to_lowercase(); let vshard = VShardId::from_collection(&collection); + let coll_type = self.collection_type(tenant_id, &collection); - // Timeseries routing: if collection is timeseries, emit TimeseriesScan. - if self.is_timeseries(tenant_id, &collection) { - let limit = scan.fetch.unwrap_or(10_000); - let (time_range, filter_bytes) = - super::converter_helpers::extract_timeseries_filters(&scan.filters)?; - - // Resolve column projection from indices to names. - let projection = scan - .projection - .as_ref() - .map(|indices| { - let schema = scan.source.schema(); - indices - .iter() - .filter_map(|&idx| { - schema.fields().get(idx).map(|f| f.name().clone()) - }) - .collect::>() - }) - .unwrap_or_default(); + match coll_type { + Some(CollectionType::Columnar(ColumnarProfile::Timeseries { .. 
})) => { + let limit = scan.fetch.unwrap_or(10_000); + let (time_range, filter_bytes) = + super::converter_helpers::extract_timeseries_filters(&scan.filters)?; + let projection = resolve_scan_projection(scan); - return Ok(vec![PhysicalTask { - tenant_id, - vshard_id: vshard, - plan: PhysicalPlan::Timeseries(TimeseriesOp::Scan { - collection, - time_range, - projection, - limit, - filters: filter_bytes, - bucket_interval_ms: 0, - rls_filters: Vec::new(), - }), - }]); - } - - // Plain columnar routing: uses same infrastructure as timeseries - // but without time-range constraints or bucketing. - if self.is_plain_columnar(tenant_id, &collection) { - let limit = scan.fetch.unwrap_or(10_000); - let filter_bytes = if !scan.filters.is_empty() { - let mut all_filters = Vec::new(); - for f in &scan.filters { - all_filters.extend(expr_to_scan_filters(f)); - } - rmp_serde::to_vec_named(&all_filters).map_err(|e| { - crate::Error::Serialization { - format: "msgpack".into(), - detail: format!("columnar filter serialization: {e}"), - } - })? - } else { - Vec::new() - }; - - let projection = scan - .projection - .as_ref() - .map(|indices| { - let schema = scan.source.schema(); - indices - .iter() - .filter_map(|&idx| { - schema.fields().get(idx).map(|f| f.name().clone()) - }) - .collect::>() - }) - .unwrap_or_default(); - - return Ok(vec![PhysicalTask { - tenant_id, - vshard_id: vshard, - plan: PhysicalPlan::Columnar(ColumnarOp::Scan { - collection, - projection, - limit, - filters: filter_bytes, - rls_filters: Vec::new(), - }), - }]); - } - - // Spatial columnar routing: bare table scans on spatial collections - // read from columnar memtable. ST_* predicate queries go through - // SpatialOp::Scan (handled in the Filter case above). 
- if self.is_spatial(tenant_id, &collection) { - let limit = scan.fetch.unwrap_or(10_000); - let projection = scan - .projection - .as_ref() - .map(|indices| { - let schema = scan.source.schema(); - indices - .iter() - .filter_map(|&idx| { - schema.fields().get(idx).map(|f| f.name().clone()) - }) - .collect::>() - }) - .unwrap_or_default(); - - return Ok(vec![PhysicalTask { - tenant_id, - vshard_id: vshard, - plan: PhysicalPlan::Columnar(ColumnarOp::Scan { - collection, - projection, - limit, - filters: Vec::new(), - rls_filters: Vec::new(), - }), - }]); - } - - // KV routing: if collection is KV, emit KvScan. - if self.is_kv(tenant_id, &collection) { - let limit = scan.fetch.unwrap_or(1000); - return Ok(vec![PhysicalTask { - tenant_id, - vshard_id: vshard, - plan: PhysicalPlan::Kv(KvOp::Scan { - collection, - cursor: Vec::new(), - count: limit, - filters: Vec::new(), - match_pattern: None, - }), - }]); - } - - // Check for filter pushdown: equality on id → point get. - if let Some(task) = - self.try_point_get(&collection, &scan.filters, tenant_id, vshard)? - { - return Ok(vec![task]); - } + Ok(vec![PhysicalTask { + tenant_id, + vshard_id: vshard, + plan: PhysicalPlan::Timeseries(TimeseriesOp::Scan { + collection, + time_range, + projection, + limit, + filters: filter_bytes, + bucket_interval_ms: 0, + rls_filters: Vec::new(), + }), + }]) + } + Some(CollectionType::Columnar(ColumnarProfile::Plain)) => { + let limit = scan.fetch.unwrap_or(10_000); + let filter_bytes = serialize_scan_filters(&scan.filters)?; + let projection = resolve_scan_projection(scan); - // Default: full document scan. - let limit = scan.fetch.unwrap_or(1000); + Ok(vec![PhysicalTask { + tenant_id, + vshard_id: vshard, + plan: PhysicalPlan::Columnar(ColumnarOp::Scan { + collection, + projection, + limit, + filters: filter_bytes, + rls_filters: Vec::new(), + }), + }]) + } + Some(CollectionType::Columnar(ColumnarProfile::Spatial { .. 
})) => { + let limit = scan.fetch.unwrap_or(10_000); + let projection = resolve_scan_projection(scan); - // Convert any TableScan filters to scan filters. - let filter_bytes = if !scan.filters.is_empty() { - let mut all_filters = Vec::new(); - for f in &scan.filters { - all_filters.extend(expr_to_scan_filters(f)); + Ok(vec![PhysicalTask { + tenant_id, + vshard_id: vshard, + plan: PhysicalPlan::Columnar(ColumnarOp::Scan { + collection, + projection, + limit, + filters: Vec::new(), + rls_filters: Vec::new(), + }), + }]) } - rmp_serde::to_vec_named(&all_filters).map_err(|e| { - crate::Error::Serialization { - format: "msgpack".into(), - detail: format!("filter serialization: {e}"), + Some(CollectionType::KeyValue(_)) => { + let limit = scan.fetch.unwrap_or(1000); + Ok(vec![PhysicalTask { + tenant_id, + vshard_id: vshard, + plan: PhysicalPlan::Kv(KvOp::Scan { + collection, + cursor: Vec::new(), + count: limit, + filters: Vec::new(), + match_pattern: None, + }), + }]) + } + Some(CollectionType::Document(_)) | None => { + // Try filter pushdown: equality on id → point get. + if let Some(task) = + self.try_point_get(&collection, &scan.filters, tenant_id, vshard)? + { + return Ok(vec![task]); } - })? 
- } else { - Vec::new() - }; - Ok(vec![PhysicalTask { - tenant_id, - vshard_id: vshard, - plan: PhysicalPlan::Document(DocumentOp::Scan { - collection, - limit, - offset: 0, - sort_keys: Vec::new(), - filters: filter_bytes, - distinct: false, - projection: Vec::new(), - computed_columns: Vec::new(), - window_functions: Vec::new(), - }), - }]) + let limit = scan.fetch.unwrap_or(1000); + let filter_bytes = serialize_scan_filters(&scan.filters)?; + + Ok(vec![PhysicalTask { + tenant_id, + vshard_id: vshard, + plan: PhysicalPlan::Document(DocumentOp::Scan { + collection, + limit, + offset: 0, + sort_keys: Vec::new(), + filters: filter_bytes, + distinct: false, + projection: Vec::new(), + computed_columns: Vec::new(), + window_functions: Vec::new(), + }), + }]) + } + } } LogicalPlan::Limit(limit_plan) => { let mut tasks = self.convert(&limit_plan.input, tenant_id)?; - // Extract LIMIT (fetch) value. + // Extract LIMIT (fetch) value — propagate to all scan types. if let Ok(FetchType::Literal(Some(n))) = limit_plan.get_fetch_type() { for task in &mut tasks { match &mut task.plan { - PhysicalPlan::Document(DocumentOp::Scan { limit, .. }) => *limit = n, - PhysicalPlan::Document(DocumentOp::RangeScan { limit, .. }) => { - *limit = n - } + PhysicalPlan::Document(DocumentOp::Scan { limit, .. }) + | PhysicalPlan::Document(DocumentOp::RangeScan { limit, .. }) + | PhysicalPlan::Columnar(ColumnarOp::Scan { limit, .. }) + | PhysicalPlan::Timeseries(TimeseriesOp::Scan { limit, .. }) + | PhysicalPlan::Spatial(SpatialOp::Scan { limit, .. }) => *limit = n, + PhysicalPlan::Kv(KvOp::Scan { count, .. }) => *count = n, _ => {} } } @@ -643,6 +582,46 @@ impl PlanConverter { } } +/// Resolve column projection from DataFusion's index-based representation +/// to named columns. Returns empty vec for "select all". 
+fn resolve_scan_projection(scan: &datafusion::logical_expr::TableScan) -> Vec { + scan.projection + .as_ref() + .map(|indices| { + let schema = scan.source.schema(); + indices + .iter() + .filter_map(|&idx| schema.fields().get(idx).map(|f| f.name().clone())) + .collect::>() + }) + .unwrap_or_default() +} + +/// Serialize a single Filter predicate expression into ScanFilter bytes. +fn serialize_predicate_filters(predicate: &Expr) -> crate::Result> { + let filters = expr_to_scan_filters(predicate); + rmp_serde::to_vec_named(&filters).map_err(|e| crate::Error::Serialization { + format: "msgpack".into(), + detail: format!("filter serialization: {e}"), + }) +} + +/// Serialize pushed-down TableScan filter expressions into ScanFilter bytes. +/// Returns empty vec when no filters are present. +fn serialize_scan_filters(scan_filters: &[Expr]) -> crate::Result> { + if scan_filters.is_empty() { + return Ok(Vec::new()); + } + let mut all_filters = Vec::new(); + for f in scan_filters { + all_filters.extend(expr_to_scan_filters(f)); + } + rmp_serde::to_vec_named(&all_filters).map_err(|e| crate::Error::Serialization { + format: "msgpack".into(), + detail: format!("filter serialization: {e}"), + }) +} + /// Extract serialized filter predicates from a LogicalPlan (for recursive CTE terms). fn extract_filters_from_plan(plan: &LogicalPlan) -> Vec { match plan { diff --git a/nodedb/src/control/planner/converter_helpers.rs b/nodedb/src/control/planner/converter_helpers.rs index c010f9bb..02da2fbe 100644 --- a/nodedb/src/control/planner/converter_helpers.rs +++ b/nodedb/src/control/planner/converter_helpers.rs @@ -178,8 +178,12 @@ impl PlanConverter { let vshard = VShardId::from_collection(&collection); // Check for timeseries collection with time_bucket() in GROUP BY. 
- if self.is_timeseries(tenant_id, &collection) - && let Some(bucket_interval_ms) = try_extract_time_bucket_interval(&agg.group_expr) + if matches!( + self.collection_type(tenant_id, &collection), + Some(nodedb_types::CollectionType::Columnar( + nodedb_types::columnar::ColumnarProfile::Timeseries { .. } + )) + ) && let Some(bucket_interval_ms) = try_extract_time_bucket_interval(&agg.group_expr) { // Extract time range from input filters. let (time_range, filter_bytes) = diff --git a/nodedb/src/control/planner/dml.rs b/nodedb/src/control/planner/dml.rs index 5aa63079..7c967d7f 100644 --- a/nodedb/src/control/planner/dml.rs +++ b/nodedb/src/control/planner/dml.rs @@ -1,6 +1,8 @@ //! DML plan conversion: INSERT, UPDATE, DELETE. use datafusion::prelude::*; +use nodedb_types::CollectionType; +use nodedb_types::columnar::ColumnarProfile; use crate::bridge::envelope::PhysicalPlan; use crate::bridge::physical_plan::{ColumnarOp, DocumentOp, KvOp, TimeseriesOp}; @@ -76,19 +78,23 @@ impl PlanConverter { let collection = dml.table_name.to_string().to_lowercase(); let vshard = VShardId::from_collection(&collection); - // KV collection DML routing. - if self.is_kv(tenant_id, &collection) { - return self.convert_kv_dml(dml, tenant_id, &collection, vshard); - } - - // Timeseries collection DML routing. - if self.is_timeseries(tenant_id, &collection) { - return self.convert_timeseries_dml(dml, tenant_id, &collection, vshard); - } - - // Plain columnar: route inserts to ColumnarOp::Insert. - if self.is_plain_columnar(tenant_id, &collection) { - return self.convert_columnar_dml(dml, &collection, tenant_id, vshard); + // Dispatch by collection type — exhaustive match ensures new + // types get a compile error instead of silent misrouting. + match self.collection_type(tenant_id, &collection) { + Some(CollectionType::KeyValue(_)) => { + return self.convert_kv_dml(dml, tenant_id, &collection, vshard); + } + Some(CollectionType::Columnar(ColumnarProfile::Timeseries { .. 
})) => { + return self.convert_timeseries_dml(dml, tenant_id, &collection, vshard); + } + Some(CollectionType::Columnar(ColumnarProfile::Plain)) => { + return self.convert_columnar_dml(dml, &collection, tenant_id, vshard); + } + Some(CollectionType::Columnar(ColumnarProfile::Spatial { .. })) => { + return self.convert_columnar_dml(dml, &collection, tenant_id, vshard); + } + // Document (schemaless/strict) or unknown catalog. + Some(CollectionType::Document(_)) | None => {} } // Strict and schemaless document collections both use DocumentOp. diff --git a/nodedb/src/control/server/native/dispatch/plan_builder/document.rs b/nodedb/src/control/server/native/dispatch/plan_builder/document.rs index a0c57307..241dffe6 100644 --- a/nodedb/src/control/server/native/dispatch/plan_builder/document.rs +++ b/nodedb/src/control/server/native/dispatch/plan_builder/document.rs @@ -1,11 +1,13 @@ //! Document engine plan builders. +use nodedb_types::CollectionType; +use nodedb_types::columnar::ColumnarProfile; use nodedb_types::protocol::TextFields; use crate::bridge::envelope::PhysicalPlan; use crate::bridge::physical_plan::{DocumentOp, KvOp, TimeseriesOp}; -use super::{DispatchCtx, is_kv, is_timeseries, require_doc_id}; +use super::{DispatchCtx, collection_type, require_doc_id}; pub(crate) fn build_point_get( ctx: &DispatchCtx<'_>, @@ -13,25 +15,32 @@ pub(crate) fn build_point_get( collection: &str, ) -> crate::Result<PhysicalPlan> { let doc_id = require_doc_id(fields)?; - if is_kv(ctx, collection) { - return Ok(PhysicalPlan::Kv(KvOp::Get { + match collection_type(ctx, collection) { + Some(CollectionType::KeyValue(_)) => Ok(PhysicalPlan::Kv(KvOp::Get { collection: collection.to_string(), key: doc_id.into_bytes(), rls_filters: Vec::new(), - })); - } - if is_timeseries(ctx, collection) { - return Err(crate::Error::BadRequest { - detail: - "PointGet not supported on timeseries collections (use SQL SELECT with time range)" + })), + Some(CollectionType::Columnar(ColumnarProfile::Timeseries { .. 
})) => { + Err(crate::Error::BadRequest { + detail: "PointGet not supported on timeseries collections \ + (use SQL SELECT with time range)" .to_string(), - }); + }) + } + Some(CollectionType::Columnar(_)) => Err(crate::Error::BadRequest { + detail: "PointGet not supported on columnar collections \ + (use SQL SELECT with filters)" + .to_string(), + }), + Some(CollectionType::Document(_)) | None => { + Ok(PhysicalPlan::Document(DocumentOp::PointGet { + collection: collection.to_string(), + document_id: doc_id, + rls_filters: Vec::new(), + })) + } } - Ok(PhysicalPlan::Document(DocumentOp::PointGet { - collection: collection.to_string(), - document_id: doc_id, - rls_filters: Vec::new(), - })) } pub(crate) fn build_point_put( @@ -41,28 +50,35 @@ pub(crate) fn build_point_put( ) -> crate::Result<PhysicalPlan> { let doc_id = require_doc_id(fields)?; let value = fields.data.clone().unwrap_or_default(); - if is_kv(ctx, collection) { - return Ok(PhysicalPlan::Kv(KvOp::Put { + match collection_type(ctx, collection) { + Some(CollectionType::KeyValue(_)) => Ok(PhysicalPlan::Kv(KvOp::Put { collection: collection.to_string(), key: doc_id.into_bytes(), value, ttl_ms: 0, - })); - } - if is_timeseries(ctx, collection) { - let json_str = String::from_utf8_lossy(&value); - let ilp_line = format!("{collection} value={json_str}\n"); - return Ok(PhysicalPlan::Timeseries(TimeseriesOp::Ingest { - collection: collection.to_string(), - payload: ilp_line.into_bytes(), - format: "ilp".to_string(), - })); + })), + Some(CollectionType::Columnar(ColumnarProfile::Timeseries { .. 
})) => { + let json_str = String::from_utf8_lossy(&value); + let ilp_line = format!("{collection} value={json_str}\n"); + Ok(PhysicalPlan::Timeseries(TimeseriesOp::Ingest { + collection: collection.to_string(), + payload: ilp_line.into_bytes(), + format: "ilp".to_string(), + })) + } + Some(CollectionType::Columnar(_)) => Err(crate::Error::BadRequest { + detail: "PointPut not supported on columnar collections \ + (use SQL INSERT)" + .to_string(), + }), + Some(CollectionType::Document(_)) | None => { + Ok(PhysicalPlan::Document(DocumentOp::PointPut { + collection: collection.to_string(), + document_id: doc_id, + value, + })) + } } - Ok(PhysicalPlan::Document(DocumentOp::PointPut { - collection: collection.to_string(), - document_id: doc_id, - value, - })) } pub(crate) fn build_point_delete( @@ -71,21 +87,30 @@ pub(crate) fn build_point_delete( collection: &str, ) -> crate::Result<PhysicalPlan> { let doc_id = require_doc_id(fields)?; - if is_kv(ctx, collection) { - return Ok(PhysicalPlan::Kv(KvOp::Delete { + match collection_type(ctx, collection) { + Some(CollectionType::KeyValue(_)) => Ok(PhysicalPlan::Kv(KvOp::Delete { collection: collection.to_string(), keys: vec![doc_id.into_bytes()], - })); - } - if is_timeseries(ctx, collection) { - return Err(crate::Error::BadRequest { - detail: "PointDelete not supported on timeseries collections (append-only; use retention policies)".to_string(), - }); + })), + Some(CollectionType::Columnar(ColumnarProfile::Timeseries { .. 
})) => { + Err(crate::Error::BadRequest { + detail: "PointDelete not supported on timeseries collections \ + (append-only; use retention policies)" + .to_string(), + }) + } + Some(CollectionType::Columnar(_)) => Err(crate::Error::BadRequest { + detail: "PointDelete not supported on columnar collections \ + (append-only)" + .to_string(), + }), + Some(CollectionType::Document(_)) | None => { + Ok(PhysicalPlan::Document(DocumentOp::PointDelete { + collection: collection.to_string(), + document_id: doc_id, + })) + } } - Ok(PhysicalPlan::Document(DocumentOp::PointDelete { - collection: collection.to_string(), - document_id: doc_id, - })) } pub(crate) fn build_range_scan( diff --git a/nodedb/src/control/server/native/dispatch/plan_builder/mod.rs b/nodedb/src/control/server/native/dispatch/plan_builder/mod.rs index dc98d65c..ac772f76 100644 --- a/nodedb/src/control/server/native/dispatch/plan_builder/mod.rs +++ b/nodedb/src/control/server/native/dispatch/plan_builder/mod.rs @@ -102,24 +102,19 @@ pub(crate) fn build_plan( // ── Shared helpers ────────────────────────────────────────────────── -/// Check if a collection is KV type via catalog lookup. -pub(super) fn is_kv(ctx: &DispatchCtx<'_>, collection: &str) -> bool { - if let Some(catalog) = ctx.state.credentials.catalog() - && let Ok(Some(coll)) = catalog.get_collection(ctx.identity.tenant_id.as_u32(), collection) - { - return coll.collection_type.is_kv(); - } - false -} - -/// Check if a collection is timeseries type via catalog lookup. -pub(super) fn is_timeseries(ctx: &DispatchCtx<'_>, collection: &str) -> bool { - if let Some(catalog) = ctx.state.credentials.catalog() - && let Ok(Some(coll)) = catalog.get_collection(ctx.identity.tenant_id.as_u32(), collection) - { - return coll.collection_type.is_timeseries(); - } - false +/// Single catalog lookup returning the collection's storage type. +/// +/// Returns `None` when: no catalog available, collection not found, +/// or catalog read error. 
Callers treat `None` as "default to document". +pub(super) fn collection_type( ctx: &DispatchCtx<'_>, collection: &str, +) -> Option<nodedb_types::CollectionType> { + let catalog = ctx.state.credentials.catalog().as_ref()?; + let coll = catalog + .get_collection(ctx.identity.tenant_id.as_u32(), collection) + .ok()??; + Some(coll.collection_type.clone()) } /// Extract document_id from request fields. diff --git a/nodedb/src/control/server/pgwire/ddl/collection.rs b/nodedb/src/control/server/pgwire/ddl/collection.rs index d78c9d54..b68677dc 100644 --- a/nodedb/src/control/server/pgwire/ddl/collection.rs +++ b/nodedb/src/control/server/pgwire/ddl/collection.rs @@ -213,14 +213,23 @@ pub async fn dispatch_register_if_needed( return; }; - // Determine storage mode from collection type. + // Determine storage mode from collection type — exhaustive match + // ensures new CollectionType variants get a compile error here. let storage_mode = match &coll.collection_type { nodedb_types::CollectionType::Document(nodedb_types::DocumentMode::Strict(schema)) => { crate::bridge::physical_plan::StorageMode::Strict { schema: schema.clone(), } } - _ => crate::bridge::physical_plan::StorageMode::Schemaless, + nodedb_types::CollectionType::KeyValue(config) => { + crate::bridge::physical_plan::StorageMode::Strict { + schema: config.schema.clone(), + } + } + nodedb_types::CollectionType::Document(nodedb_types::DocumentMode::Schemaless) + | nodedb_types::CollectionType::Columnar(_) => { + crate::bridge::physical_plan::StorageMode::Schemaless + } }; // Parse index paths from FIELDS clause (if any). 
diff --git a/nodedb/src/control/server/pgwire/ddl/collection_insert.rs b/nodedb/src/control/server/pgwire/ddl/collection_insert.rs index 2dee261a..9bbb305e 100644 --- a/nodedb/src/control/server/pgwire/ddl/collection_insert.rs +++ b/nodedb/src/control/server/pgwire/ddl/collection_insert.rs @@ -49,10 +49,10 @@ fn parse_write_statement( if !coll.fields.is_empty() { return None; } - // Skip strict/columnar collections — they need schema-aware insert, - // not schemaless document insert. For now, dispatch them to the Data - // Plane which validates against the stored schema. - if coll.collection_type.is_strict() || coll.collection_type.is_columnar() { + // Skip non-schemaless collections — they need schema-aware insert + // (strict, columnar, timeseries, spatial) or engine-specific insert + // (KV). Dispatch these to DataFusion / Data Plane instead. + if !coll.collection_type.is_schemaless() { return None; } } From 4bc69cbdb7962cc534244e162cd6f1863f6c2d48 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Mon, 30 Mar 2026 21:44:42 +0800 Subject: [PATCH 2/2] fix(ilp): correct schema propagation and remove unwrap in ILP ingest path In ilp_listener.rs, fix schema comparison to use content equality instead of length comparison, replace silent error swallowing on catalog writes with tracing::warn!, fix catalog() call to use .as_ref(), and collapse nested if chains into let-chain conditions per clippy. In timeseries.rs, replace .unwrap() on memtable lookups with proper let-else guards that return a structured Internal error response, and convert the third unwrap in the schema-columns branch to a let-chain condition so a missing memtable no longer panics. 
--- nodedb/src/control/server/ilp_listener.rs | 57 +++++++------- .../src/data/executor/handlers/timeseries.rs | 74 +++++++++++-------- 2 files changed, 74 insertions(+), 57 deletions(-) diff --git a/nodedb/src/control/server/ilp_listener.rs b/nodedb/src/control/server/ilp_listener.rs index ee60db6d..95f7ecd2 100644 --- a/nodedb/src/control/server/ilp_listener.rs +++ b/nodedb/src/control/server/ilp_listener.rs @@ -326,33 +326,36 @@ async fn flush_ilp_batch( ) .await?; - if !response.payload.is_empty() { - if let Ok(v) = serde_json::from_slice::(&response.payload) { - total_accepted += v.get("accepted").and_then(|a| a.as_u64()).unwrap_or(0); - - if let Some(schema_cols) = v.get("schema_columns").and_then(|s| s.as_array()) { - let fields: Vec<(String, String)> = schema_cols - .iter() - .filter_map(|pair| { - let arr = pair.as_array()?; - Some(( - arr.first()?.as_str()?.to_string(), - arr.get(1)?.as_str()?.to_string(), - )) - }) - .collect(); - - if !fields.is_empty() { - if let Some(catalog) = state.credentials.catalog() { - if let Ok(Some(mut coll)) = - catalog.get_collection(tenant_id.as_u32(), &collection) - { - if coll.fields.len() != fields.len() { - coll.fields = fields; - let _ = catalog.put_collection(&coll); - } - } - } + if !response.payload.is_empty() + && let Ok(v) = serde_json::from_slice::(&response.payload) + { + total_accepted += v.get("accepted").and_then(|a| a.as_u64()).unwrap_or(0); + + if let Some(schema_cols) = v.get("schema_columns").and_then(|s| s.as_array()) { + let fields: Vec<(String, String)> = schema_cols + .iter() + .filter_map(|pair| { + let arr = pair.as_array()?; + Some(( + arr.first()?.as_str()?.to_string(), + arr.get(1)?.as_str()?.to_string(), + )) + }) + .collect(); + + if !fields.is_empty() + && let Some(catalog) = state.credentials.catalog().as_ref() + && let Ok(Some(mut coll)) = + catalog.get_collection(tenant_id.as_u32(), &collection) + && coll.fields != fields + { + coll.fields = fields; + if let Err(e) = 
catalog.put_collection(&coll) { + tracing::warn!( + collection = %collection, + error = %e, + "failed to propagate ILP schema to catalog", + ); } } } diff --git a/nodedb/src/data/executor/handlers/timeseries.rs b/nodedb/src/data/executor/handlers/timeseries.rs index a763ff3f..dcd6c0ff 100644 --- a/nodedb/src/data/executor/handlers/timeseries.rs +++ b/nodedb/src/data/executor/handlers/timeseries.rs @@ -416,48 +416,62 @@ impl CoreLoop { self.columnar_memtables.insert(collection.to_string(), mt); } - let mt = self.columnar_memtables.get_mut(collection).unwrap(); + let Some(mt) = self.columnar_memtables.get_mut(collection) else { + return self.response_error( + task, + ErrorCode::Internal { + detail: format!("memtable missing after init: {collection}"), + }, + ); + }; let mut series_keys = HashMap::new(); let (accepted, rejected) = ilp_ingest::ingest_batch(mt, &lines, &mut series_keys, now_ms); // Check if memtable needs flushing. - let mt = self.columnar_memtables.get(collection).unwrap(); + let Some(mt) = self.columnar_memtables.get(collection) else { + return self.response_error( + task, + ErrorCode::Internal { + detail: format!("memtable missing after ingest: {collection}"), + }, + ); + }; if mt.memory_bytes() >= 64 * 1024 * 1024 { self.flush_ts_collection(collection, now_ms); } self.checkpoint_coordinator .mark_dirty("timeseries", accepted); - let result = if is_new_memtable { - let mt = self.columnar_memtables.get(collection).unwrap(); - let schema_columns: Vec = mt - .schema() - .columns - .iter() - .map(|(name, col_type)| { - let type_str = match col_type { - ColumnType::Timestamp => "TIMESTAMP", - ColumnType::Float64 => "FLOAT", - ColumnType::Int64 => "BIGINT", - ColumnType::Symbol => "VARCHAR", - }; - serde_json::json!([name, type_str]) + let result = + if is_new_memtable && let Some(mt) = self.columnar_memtables.get(collection) { + let schema_columns: Vec = mt + .schema() + .columns + .iter() + .map(|(name, col_type)| { + let type_str = match col_type { + 
ColumnType::Timestamp => "TIMESTAMP", + ColumnType::Float64 => "FLOAT", + ColumnType::Int64 => "BIGINT", + ColumnType::Symbol => "VARCHAR", + }; + serde_json::json!([name, type_str]) + }) + .collect(); + serde_json::json!({ + "accepted": accepted, + "rejected": rejected, + "collection": collection, + "schema_columns": schema_columns, }) - .collect(); - serde_json::json!({ - "accepted": accepted, - "rejected": rejected, - "collection": collection, - "schema_columns": schema_columns, - }) - } else { - serde_json::json!({ - "accepted": accepted, - "rejected": rejected, - "collection": collection, - }) - }; + } else { + serde_json::json!({ + "accepted": accepted, + "rejected": rejected, + "collection": collection, + }) + }; let json = serde_json::to_vec(&result).unwrap_or_default(); Response { request_id: task.request.request_id,