Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
571 changes: 275 additions & 296 deletions nodedb/src/control/planner/converter.rs

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions nodedb/src/control/planner/converter_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,12 @@ impl PlanConverter {
let vshard = VShardId::from_collection(&collection);

// Check for timeseries collection with time_bucket() in GROUP BY.
if self.is_timeseries(tenant_id, &collection)
&& let Some(bucket_interval_ms) = try_extract_time_bucket_interval(&agg.group_expr)
if matches!(
self.collection_type(tenant_id, &collection),
Some(nodedb_types::CollectionType::Columnar(
nodedb_types::columnar::ColumnarProfile::Timeseries { .. }
))
) && let Some(bucket_interval_ms) = try_extract_time_bucket_interval(&agg.group_expr)
{
// Extract time range from input filters.
let (time_range, filter_bytes) =
Expand Down
32 changes: 19 additions & 13 deletions nodedb/src/control/planner/dml.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
//! DML plan conversion: INSERT, UPDATE, DELETE.

use datafusion::prelude::*;
use nodedb_types::CollectionType;
use nodedb_types::columnar::ColumnarProfile;

use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::physical_plan::{ColumnarOp, DocumentOp, KvOp, TimeseriesOp};
Expand Down Expand Up @@ -76,19 +78,23 @@ impl PlanConverter {
let collection = dml.table_name.to_string().to_lowercase();
let vshard = VShardId::from_collection(&collection);

// KV collection DML routing.
if self.is_kv(tenant_id, &collection) {
return self.convert_kv_dml(dml, tenant_id, &collection, vshard);
}

// Timeseries collection DML routing.
if self.is_timeseries(tenant_id, &collection) {
return self.convert_timeseries_dml(dml, tenant_id, &collection, vshard);
}

// Plain columnar: route inserts to ColumnarOp::Insert.
if self.is_plain_columnar(tenant_id, &collection) {
return self.convert_columnar_dml(dml, &collection, tenant_id, vshard);
// Dispatch by collection type — exhaustive match ensures new
// types get a compile error instead of silent misrouting.
match self.collection_type(tenant_id, &collection) {
Some(CollectionType::KeyValue(_)) => {
return self.convert_kv_dml(dml, tenant_id, &collection, vshard);
}
Some(CollectionType::Columnar(ColumnarProfile::Timeseries { .. })) => {
return self.convert_timeseries_dml(dml, tenant_id, &collection, vshard);
}
Some(CollectionType::Columnar(ColumnarProfile::Plain)) => {
return self.convert_columnar_dml(dml, &collection, tenant_id, vshard);
}
Some(CollectionType::Columnar(ColumnarProfile::Spatial { .. })) => {
return self.convert_columnar_dml(dml, &collection, tenant_id, vshard);
}
// Document (schemaless/strict) or unknown catalog.
Some(CollectionType::Document(_)) | None => {}
}

// Strict and schemaless document collections both use DocumentOp.
Expand Down
12 changes: 9 additions & 3 deletions nodedb/src/control/server/ilp_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,13 +344,19 @@ async fn flush_ilp_batch(
.collect();

if !fields.is_empty()
&& let Some(catalog) = state.credentials.catalog()
&& let Some(catalog) = state.credentials.catalog().as_ref()
&& let Ok(Some(mut coll)) =
catalog.get_collection(tenant_id.as_u32(), &collection)
&& coll.fields.len() != fields.len()
&& coll.fields != fields
{
coll.fields = fields;
let _ = catalog.put_collection(&coll);
if let Err(e) = catalog.put_collection(&coll) {
tracing::warn!(
collection = %collection,
error = %e,
"failed to propagate ILP schema to catalog",
);
}
}
}
}
Expand Down
113 changes: 69 additions & 44 deletions nodedb/src/control/server/native/dispatch/plan_builder/document.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,46 @@
//! Document engine plan builders.

use nodedb_types::CollectionType;
use nodedb_types::columnar::ColumnarProfile;
use nodedb_types::protocol::TextFields;

use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::physical_plan::{DocumentOp, KvOp, TimeseriesOp};

use super::{DispatchCtx, is_kv, is_timeseries, require_doc_id};
use super::{DispatchCtx, collection_type, require_doc_id};

pub(crate) fn build_point_get(
ctx: &DispatchCtx<'_>,
fields: &TextFields,
collection: &str,
) -> crate::Result<PhysicalPlan> {
let doc_id = require_doc_id(fields)?;
if is_kv(ctx, collection) {
return Ok(PhysicalPlan::Kv(KvOp::Get {
match collection_type(ctx, collection) {
Some(CollectionType::KeyValue(_)) => Ok(PhysicalPlan::Kv(KvOp::Get {
collection: collection.to_string(),
key: doc_id.into_bytes(),
rls_filters: Vec::new(),
}));
}
if is_timeseries(ctx, collection) {
return Err(crate::Error::BadRequest {
detail:
"PointGet not supported on timeseries collections (use SQL SELECT with time range)"
})),
Some(CollectionType::Columnar(ColumnarProfile::Timeseries { .. })) => {
Err(crate::Error::BadRequest {
detail: "PointGet not supported on timeseries collections \
(use SQL SELECT with time range)"
.to_string(),
});
})
}
Some(CollectionType::Columnar(_)) => Err(crate::Error::BadRequest {
detail: "PointGet not supported on columnar collections \
(use SQL SELECT with filters)"
.to_string(),
}),
Some(CollectionType::Document(_)) | None => {
Ok(PhysicalPlan::Document(DocumentOp::PointGet {
collection: collection.to_string(),
document_id: doc_id,
rls_filters: Vec::new(),
}))
}
}
Ok(PhysicalPlan::Document(DocumentOp::PointGet {
collection: collection.to_string(),
document_id: doc_id,
rls_filters: Vec::new(),
}))
}

pub(crate) fn build_point_put(
Expand All @@ -41,28 +50,35 @@ pub(crate) fn build_point_put(
) -> crate::Result<PhysicalPlan> {
let doc_id = require_doc_id(fields)?;
let value = fields.data.clone().unwrap_or_default();
if is_kv(ctx, collection) {
return Ok(PhysicalPlan::Kv(KvOp::Put {
match collection_type(ctx, collection) {
Some(CollectionType::KeyValue(_)) => Ok(PhysicalPlan::Kv(KvOp::Put {
collection: collection.to_string(),
key: doc_id.into_bytes(),
value,
ttl_ms: 0,
}));
}
if is_timeseries(ctx, collection) {
let json_str = String::from_utf8_lossy(&value);
let ilp_line = format!("{collection} value={json_str}\n");
return Ok(PhysicalPlan::Timeseries(TimeseriesOp::Ingest {
collection: collection.to_string(),
payload: ilp_line.into_bytes(),
format: "ilp".to_string(),
}));
})),
Some(CollectionType::Columnar(ColumnarProfile::Timeseries { .. })) => {
let json_str = String::from_utf8_lossy(&value);
let ilp_line = format!("{collection} value={json_str}\n");
Ok(PhysicalPlan::Timeseries(TimeseriesOp::Ingest {
collection: collection.to_string(),
payload: ilp_line.into_bytes(),
format: "ilp".to_string(),
}))
}
Some(CollectionType::Columnar(_)) => Err(crate::Error::BadRequest {
detail: "PointPut not supported on columnar collections \
(use SQL INSERT)"
.to_string(),
}),
Some(CollectionType::Document(_)) | None => {
Ok(PhysicalPlan::Document(DocumentOp::PointPut {
collection: collection.to_string(),
document_id: doc_id,
value,
}))
}
}
Ok(PhysicalPlan::Document(DocumentOp::PointPut {
collection: collection.to_string(),
document_id: doc_id,
value,
}))
}

pub(crate) fn build_point_delete(
Expand All @@ -71,21 +87,30 @@ pub(crate) fn build_point_delete(
collection: &str,
) -> crate::Result<PhysicalPlan> {
let doc_id = require_doc_id(fields)?;
if is_kv(ctx, collection) {
return Ok(PhysicalPlan::Kv(KvOp::Delete {
match collection_type(ctx, collection) {
Some(CollectionType::KeyValue(_)) => Ok(PhysicalPlan::Kv(KvOp::Delete {
collection: collection.to_string(),
keys: vec![doc_id.into_bytes()],
}));
}
if is_timeseries(ctx, collection) {
return Err(crate::Error::BadRequest {
detail: "PointDelete not supported on timeseries collections (append-only; use retention policies)".to_string(),
});
})),
Some(CollectionType::Columnar(ColumnarProfile::Timeseries { .. })) => {
Err(crate::Error::BadRequest {
detail: "PointDelete not supported on timeseries collections \
(append-only; use retention policies)"
.to_string(),
})
}
Some(CollectionType::Columnar(_)) => Err(crate::Error::BadRequest {
detail: "PointDelete not supported on columnar collections \
(append-only)"
.to_string(),
}),
Some(CollectionType::Document(_)) | None => {
Ok(PhysicalPlan::Document(DocumentOp::PointDelete {
collection: collection.to_string(),
document_id: doc_id,
}))
}
}
Ok(PhysicalPlan::Document(DocumentOp::PointDelete {
collection: collection.to_string(),
document_id: doc_id,
}))
}

pub(crate) fn build_range_scan(
Expand Down
31 changes: 13 additions & 18 deletions nodedb/src/control/server/native/dispatch/plan_builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,24 +102,19 @@ pub(crate) fn build_plan(

// ── Shared helpers ──────────────────────────────────────────────────

/// Check if a collection is KV type via catalog lookup.
pub(super) fn is_kv(ctx: &DispatchCtx<'_>, collection: &str) -> bool {
if let Some(catalog) = ctx.state.credentials.catalog()
&& let Ok(Some(coll)) = catalog.get_collection(ctx.identity.tenant_id.as_u32(), collection)
{
return coll.collection_type.is_kv();
}
false
}

/// Check if a collection is timeseries type via catalog lookup.
pub(super) fn is_timeseries(ctx: &DispatchCtx<'_>, collection: &str) -> bool {
if let Some(catalog) = ctx.state.credentials.catalog()
&& let Ok(Some(coll)) = catalog.get_collection(ctx.identity.tenant_id.as_u32(), collection)
{
return coll.collection_type.is_timeseries();
}
false
/// Single catalog lookup returning the collection's storage type.
///
/// Returns `None` when: no catalog available, collection not found,
/// or catalog read error. Callers treat `None` as "default to document".
pub(super) fn collection_type(
ctx: &DispatchCtx<'_>,
collection: &str,
) -> Option<nodedb_types::CollectionType> {
let catalog = ctx.state.credentials.catalog().as_ref()?;
let coll = catalog
.get_collection(ctx.identity.tenant_id.as_u32(), collection)
.ok()??;
Some(coll.collection_type.clone())
}

/// Extract document_id from request fields.
Expand Down
13 changes: 11 additions & 2 deletions nodedb/src/control/server/pgwire/ddl/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,14 +213,23 @@ pub async fn dispatch_register_if_needed(
return;
};

// Determine storage mode from collection type.
// Determine storage mode from collection type — exhaustive match
// ensures new CollectionType variants get a compile error here.
let storage_mode = match &coll.collection_type {
nodedb_types::CollectionType::Document(nodedb_types::DocumentMode::Strict(schema)) => {
crate::bridge::physical_plan::StorageMode::Strict {
schema: schema.clone(),
}
}
_ => crate::bridge::physical_plan::StorageMode::Schemaless,
nodedb_types::CollectionType::KeyValue(config) => {
crate::bridge::physical_plan::StorageMode::Strict {
schema: config.schema.clone(),
}
}
nodedb_types::CollectionType::Document(nodedb_types::DocumentMode::Schemaless)
| nodedb_types::CollectionType::Columnar(_) => {
crate::bridge::physical_plan::StorageMode::Schemaless
}
};

// Parse index paths from FIELDS clause (if any).
Expand Down
8 changes: 4 additions & 4 deletions nodedb/src/control/server/pgwire/ddl/collection_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,10 @@ fn parse_write_statement(
if !coll.fields.is_empty() {
return None;
}
// Skip strict/columnar collections — they need schema-aware insert,
// not schemaless document insert. For now, dispatch them to the Data
// Plane which validates against the stored schema.
if coll.collection_type.is_strict() || coll.collection_type.is_columnar() {
// Skip non-schemaless collections — they need schema-aware insert
// (strict, columnar, timeseries, spatial) or engine-specific insert
// (KV). Dispatch these to DataFusion / Data Plane instead.
if !coll.collection_type.is_schemaless() {
return None;
}
}
Expand Down
Loading