diff --git a/src/dataflow-types/src/types.rs b/src/dataflow-types/src/types.rs index 81a49f7e7c753..5f26385ab0c89 100644 --- a/src/dataflow-types/src/types.rs +++ b/src/dataflow-types/src/types.rs @@ -1438,6 +1438,31 @@ pub mod sources { } } + /// Returns `true` if this connector yields data that is + /// append-only/monotonic. Append-monly means the source + /// never produces retractions. + // TODO(guswynn): consider enforcing this more completely at the + // parsing/typechecking level, by not using an `envelope` + // for sources like pg + pub fn append_only(&self) -> bool { + match self { + // Postgres can produce retractions (deletes) + SourceConnector::External { + connector: ExternalSourceConnector::Postgres(_), + .. + } => false, + // Local sources (i.e., tables) also support retractions (deletes) + SourceConnector::Local { .. } => false, + // Other sources the `None` envelope are append-only + SourceConnector::External { + envelope: SourceEnvelope::None(_), + .. + } => true, + // Other envelopes can produce retractions + _ => false, + } + } + pub fn name(&self) -> &'static str { match self { SourceConnector::External { connector, .. } => connector.name(), diff --git a/src/transform/src/dataflow.rs b/src/transform/src/dataflow.rs index 98408a96ec3f0..3a639748033dc 100644 --- a/src/transform/src/dataflow.rs +++ b/src/transform/src/dataflow.rs @@ -351,11 +351,7 @@ where pub fn optimize_dataflow_monotonic(dataflow: &mut DataflowDesc) -> Result<(), TransformError> { let mut monotonic = std::collections::HashSet::new(); for (source_id, source) in dataflow.source_imports.iter_mut() { - if let mz_dataflow_types::sources::SourceConnector::External { - envelope: mz_dataflow_types::sources::SourceEnvelope::None(_), - .. - } = source.description.connector - { + if source.description.connector.append_only() { monotonic.insert(source_id.clone()); } }