diff --git a/crates/paimon/src/spec/core_options.rs b/crates/paimon/src/spec/core_options.rs
index aafdf52a..a7adb1df 100644
--- a/crates/paimon/src/spec/core_options.rs
+++ b/crates/paimon/src/spec/core_options.rs
@@ -106,6 +106,17 @@ impl ChangelogProducer {
     }
 }
 
+/// Returns whether the `first-row` merge engine can be combined with `producer`.
+///
+/// Only [`ChangelogProducer::None`] and [`ChangelogProducer::Lookup`] are accepted;
+/// every other producer yields `false`.
+pub(crate) fn first_row_supports_changelog_producer(producer: ChangelogProducer) -> bool {
+    matches!(
+        producer,
+        ChangelogProducer::None | ChangelogProducer::Lookup
+    )
+}
+
 /// Format the bucket directory name for a given bucket number.
 /// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise `"bucket-{N}"`.
 pub fn bucket_dir_name(bucket: i32) -> String {
diff --git a/crates/paimon/src/spec/schema.rs b/crates/paimon/src/spec/schema.rs
index e6d5af99..76d09b4c 100644
--- a/crates/paimon/src/spec/schema.rs
+++ b/crates/paimon/src/spec/schema.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::spec::core_options::CoreOptions;
+use crate::spec::core_options::{first_row_supports_changelog_producer, CoreOptions};
 use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType};
 use crate::spec::PartialUpdateConfig;
 use serde::{Deserialize, Serialize};
@@ -148,6 +148,7 @@ impl TableSchema {
             }
         }
 
+        Schema::validate_first_row_changelog_producer(&new_schema.options)?;
         Ok(new_schema)
     }
 
@@ -263,6 +264,8 @@ pub fn escape_single_quotes(text: &str) -> String {
 pub const PRIMARY_KEY_OPTION: &str = "primary-key";
 /// Option key for partition in table options (same as [CoreOptions.PARTITION](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java)).
 pub const PARTITION_OPTION: &str = "partition";
+/// Option key for the merge engine in table options.
+const MERGE_ENGINE_OPTION: &str = "merge-engine";
 
 /// Schema of a table (logical DDL schema).
 ///
@@ -291,6 +294,7 @@ impl Schema {
         let fields = Self::normalize_fields(&fields, &partition_keys, &primary_keys)?;
         Self::validate_blob_fields(&fields, &partition_keys, &options)?;
         PartialUpdateConfig::new(&options).validate_create_mode(!primary_keys.is_empty())?;
+        Self::validate_first_row_changelog_producer(&options)?;
 
         Ok(Self {
             fields,
@@ -506,6 +510,53 @@ impl Schema {
         Ok(())
     }
 
+    /// Rejects `merge-engine=first-row` combined with an unsupported `changelog-producer`.
+    ///
+    /// Options whose `merge-engine` is absent or is not `first-row` (compared ASCII
+    /// case-insensitively) pass without the changelog producer being inspected.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`crate::Error::ConfigInvalid`] when `try_changelog_producer` fails or when
+    /// the parsed producer is not accepted by `first_row_supports_changelog_producer`.
+    fn validate_first_row_changelog_producer(
+        options: &HashMap<String, String>,
+    ) -> crate::Result<()> {
+        if !options
+            .get(MERGE_ENGINE_OPTION)
+            .is_some_and(|value| value.eq_ignore_ascii_case("first-row"))
+        {
+            return Ok(());
+        }
+
+        let changelog_producer = CoreOptions::new(options)
+            .try_changelog_producer()
+            .map_err(Self::options_error_to_config_invalid)?;
+        if first_row_supports_changelog_producer(changelog_producer) {
+            return Ok(());
+        }
+
+        Err(crate::Error::ConfigInvalid {
+            message: format!(
+                "merge-engine=first-row only supports changelog-producer=none or lookup, but found changelog-producer={}",
+                changelog_producer.as_str()
+            ),
+        })
+    }
+
+    /// Re-labels an option parsing error as [`crate::Error::ConfigInvalid`].
+    ///
+    /// An `Unsupported` error keeps its message as is; any other error contributes its
+    /// `to_string()` output as the message.
+    fn options_error_to_config_invalid(error: crate::Error) -> crate::Error {
+        match error {
+            crate::Error::Unsupported { message } => crate::Error::ConfigInvalid { message },
+            other => crate::Error::ConfigInvalid {
+                message: other.to_string(),
+            },
+        }
+    }
+
     /// Returns top-level Blob field names for create-time Blob contract checks.
     fn top_level_blob_field_names(fields: &[DataField]) -> Vec<&str> {
         fields
@@ -978,6 +1029,128 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_first_row_schema_validation_accepts_supported_changelog_producers() {
+        for producer in ["none", "lookup"] {
+            Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .primary_key(["id"])
+                .option("merge-engine", "first-row")
+                .option("changelog-producer", producer)
+                .build()
+                .unwrap();
+        }
+    }
+
+    #[test]
+    fn test_first_row_schema_validation_rejects_incompatible_changelog_producers() {
+        for producer in ["input", "full-compaction"] {
+            let err = Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .primary_key(["id"])
+                .option("merge-engine", "first-row")
+                .option("changelog-producer", producer)
+                .build()
+                .unwrap_err();
+
+            assert!(
+                matches!(err, crate::Error::ConfigInvalid { ref message }
+                    if message.contains("merge-engine=first-row")
+                        && message.contains("changelog-producer")
+                        && message.contains(producer)),
+                "first-row should reject changelog-producer={producer}, got {err:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_first_row_apply_changes_rejects_incompatible_changelog_producers() {
+        let table_schema = TableSchema::new(
+            0,
+            &Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .primary_key(["id"])
+                .option("merge-engine", "first-row")
+                .option("changelog-producer", "lookup")
+                .build()
+                .unwrap(),
+        );
+
+        for producer in ["input", "full-compaction"] {
+            let err = table_schema
+                .apply_changes(vec![crate::spec::SchemaChange::set_option(
+                    "changelog-producer".to_string(),
+                    producer.to_string(),
+                )])
+                .unwrap_err();
+
+            assert!(
+                matches!(err, crate::Error::ConfigInvalid { ref message }
+                    if message.contains("merge-engine=first-row")
+                        && message.contains("changelog-producer")
+                        && message.contains(producer)),
+                "first-row alter should reject changelog-producer={producer}, got {err:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_first_row_apply_changes_validates_final_options() {
+        let table_schema = TableSchema::new(
+            0,
+            &Schema::builder()
+                .column("id", DataType::Int(IntType::new()))
+                .column("value", DataType::Int(IntType::new()))
+                .primary_key(["id"])
+                .option("changelog-producer", "input")
+                .build()
+                .unwrap(),
+        );
+
+        let err = table_schema
+            .apply_changes(vec![crate::spec::SchemaChange::set_option(
+                "merge-engine".to_string(),
+                "first-row".to_string(),
+            )])
+            .unwrap_err();
+
+        assert!(
+            matches!(err, crate::Error::ConfigInvalid { ref message }
+                if message.contains("merge-engine=first-row")
+                    && message.contains("changelog-producer")
+                    && message.contains("input")),
+            "first-row alter should reject incompatible final options, got {err:?}"
+        );
+
+        let new_schema = table_schema
+            .apply_changes(vec![
+                crate::spec::SchemaChange::set_option(
+                    "merge-engine".to_string(),
+                    "first-row".to_string(),
+                ),
+                crate::spec::SchemaChange::set_option(
+                    "changelog-producer".to_string(),
+                    "lookup".to_string(),
+                ),
+            ])
+            .unwrap();
+
+        assert_eq!(
+            new_schema.options().get("merge-engine").map(String::as_str),
+            Some("first-row")
+        );
+        assert_eq!(
+            new_schema
+                .options()
+                .get("changelog-producer")
+                .map(String::as_str),
+            Some("lookup")
+        );
+    }
+
     #[test]
     fn test_schema_builder_column_row_type() {
         let row_type = RowType::new(vec![DataField::new(
diff --git a/crates/paimon/src/table/table_write.rs b/crates/paimon/src/table/table_write.rs
index f34c124c..fea6ff79 100644
--- a/crates/paimon/src/table/table_write.rs
+++ b/crates/paimon/src/table/table_write.rs
@@ -23,8 +23,8 @@
 use crate::arrow::build_target_arrow_schema;
 use crate::spec::PartitionComputer;
 use crate::spec::{
-    BinaryRow, ChangelogProducer, CoreOptions, DataType, MergeEngine, EMPTY_SERIALIZED_ROW,
-    POSTPONE_BUCKET,
+    first_row_supports_changelog_producer, BinaryRow, ChangelogProducer, CoreOptions, DataType,
+    MergeEngine, EMPTY_SERIALIZED_ROW, POSTPONE_BUCKET,
 };
 use crate::table::blob_file_writer::AppendBlobFileWriter;
 use crate::table::bucket_assigner::{BucketAssignerEnum, PartitionBucketKey};
@@ -224,6 +224,20 @@ impl TableWrite {
 
         let merge_engine = core_options.merge_engine()?;
 
+        // Schema create/alter validation does not cover schemas deserialized from JSON, so
+        // the first-row changelog-producer constraint is enforced again here.
+        if merge_engine == MergeEngine::FirstRow
+            && !first_row_supports_changelog_producer(changelog_producer)
+        {
+            return Err(crate::Error::Unsupported {
+                message: format!(
+                    "Table '{}' has incompatible table options: merge-engine=first-row only supports changelog-producer=none or lookup, but found changelog-producer={}",
+                    table.identifier().full_name(),
+                    changelog_producer.as_str()
+                ),
+            });
+        }
+
         if is_dynamic_cross_partition && merge_engine == MergeEngine::PartialUpdate {
             return Err(crate::Error::Unsupported {
                 message:
@@ -1564,6 +1578,23 @@ mod tests {
         TableSchema::new(0, &builder.build().unwrap())
     }
 
+    /// Builds a first-row schema, then swaps in `producer` through a serde_json round trip
+    /// so that the value is not checked by `Schema::builder()` validation.
+    fn loaded_first_row_schema_with_changelog_producer(producer: &str) -> TableSchema {
+        let schema = Schema::builder()
+            .column("id", DataType::Int(IntType::new()))
+            .column("value", DataType::Int(IntType::new()))
+            .primary_key(["id"])
+            .option("merge-engine", "first-row")
+            .option("changelog-producer", "lookup")
+            .build()
+            .unwrap();
+        let table_schema = TableSchema::new(0, &schema);
+        let mut value = serde_json::to_value(&table_schema).unwrap();
+        value["options"]["changelog-producer"] = serde_json::Value::String(producer.to_string());
+        serde_json::from_value(value).unwrap()
+    }
+
     fn ordinary_dynamic_pk_changelog_schema() -> TableSchema {
         let schema = Schema::builder()
             .column("pt", DataType::VarChar(VarCharType::string_type()))
@@ -1577,6 +1608,36 @@ mod tests {
         TableSchema::new(0, &schema)
     }
 
+    #[test]
+    fn test_table_write_rejects_loaded_first_row_with_incompatible_changelog_producer() {
+        let file_io = test_file_io();
+
+        for producer in ["input", "full-compaction"] {
+            let table = Table::new(
+                file_io.clone(),
+                Identifier::new("default", "test_first_row_changelog"),
+                "memory:/test_first_row_changelog".to_string(),
+                loaded_first_row_schema_with_changelog_producer(producer),
+                None,
+            );
+
+            let err = match TableWrite::new(&table, "test-user".to_string()) {
+                Ok(_) => panic!(
+                    "first-row should reject changelog-producer={producer} during write setup"
+                ),
+                Err(err) => err,
+            };
+
+            assert!(
+                matches!(err, crate::Error::Unsupported { ref message }
+                    if message.contains("incompatible table options")
+                        && message.contains("merge-engine=first-row")
+                        && message.contains(producer)),
+                "first-row runtime guard should reject changelog-producer={producer}, got {err:?}"
+            );
+        }
+    }
+
     #[tokio::test]
     async fn test_input_changelog_writes_raw_rows_separately_from_data_rows() {
         let file_io = test_file_io();