Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions crates/paimon/src/spec/core_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,13 @@ impl ChangelogProducer {
}
}

/// Returns `true` when `producer` is a changelog producer that the `first-row`
/// merge engine accepts.
///
/// Only [`ChangelogProducer::None`] and [`ChangelogProducer::Lookup`] are
/// accepted; every other variant yields `false`. Keeping the rule in one
/// predicate lets every caller that enforces the first-row restriction agree
/// on the same set of producers.
pub(crate) fn first_row_supports_changelog_producer(producer: ChangelogProducer) -> bool {
matches!(
producer,
ChangelogProducer::None | ChangelogProducer::Lookup
)
}

/// Format the bucket directory name for a given bucket number.
/// Returns `"bucket-postpone"` for `POSTPONE_BUCKET` (-2), otherwise `"bucket-{N}"`.
pub fn bucket_dir_name(bucket: i32) -> String {
Expand Down
161 changes: 160 additions & 1 deletion crates/paimon/src/spec/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use crate::spec::core_options::CoreOptions;
use crate::spec::core_options::{first_row_supports_changelog_producer, CoreOptions};
use crate::spec::types::{ArrayType, DataType, MapType, MultisetType, RowType};
use crate::spec::PartialUpdateConfig;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -148,6 +148,7 @@ impl TableSchema {
}
}

Schema::validate_first_row_changelog_producer(&new_schema.options)?;
Ok(new_schema)
}

Expand Down Expand Up @@ -263,6 +264,7 @@ pub fn escape_single_quotes(text: &str) -> String {
pub const PRIMARY_KEY_OPTION: &str = "primary-key";
/// Option key for partition in table options (same as [CoreOptions.PARTITION](https://github.com/apache/paimon/blob/release-1.3/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java)).
pub const PARTITION_OPTION: &str = "partition";
const MERGE_ENGINE_OPTION: &str = "merge-engine";

/// Schema of a table (logical DDL schema).
///
Expand Down Expand Up @@ -291,6 +293,7 @@ impl Schema {
let fields = Self::normalize_fields(&fields, &partition_keys, &primary_keys)?;
Self::validate_blob_fields(&fields, &partition_keys, &options)?;
PartialUpdateConfig::new(&options).validate_create_mode(!primary_keys.is_empty())?;
Self::validate_first_row_changelog_producer(&options)?;

Ok(Self {
fields,
Expand Down Expand Up @@ -506,6 +509,40 @@ impl Schema {
Ok(())
}

/// Checks that `options` do not pair `merge-engine=first-row` with a
/// changelog producer that the first-row engine cannot work with.
///
/// Options whose `merge-engine` entry is missing, or is anything other than
/// `first-row` (compared ASCII case-insensitively), are accepted without
/// looking at the changelog producer at all.
///
/// # Errors
///
/// Returns `Error::ConfigInvalid` when the changelog producer cannot be
/// resolved from `options`, or when it resolves to a producer rejected by
/// `first_row_supports_changelog_producer`.
fn validate_first_row_changelog_producer(
    options: &HashMap<String, String>,
) -> crate::Result<()> {
    // The restriction only exists for the first-row merge engine.
    let uses_first_row = matches!(
        options.get(MERGE_ENGINE_OPTION),
        Some(engine) if engine.eq_ignore_ascii_case("first-row")
    );
    if !uses_first_row {
        return Ok(());
    }

    // Resolve the producer through CoreOptions; resolution failures are
    // reported as configuration errors like the check below.
    let producer = CoreOptions::new(options)
        .try_changelog_producer()
        .map_err(Self::options_error_to_config_invalid)?;

    if first_row_supports_changelog_producer(producer) {
        Ok(())
    } else {
        Err(crate::Error::ConfigInvalid {
            message: format!(
                "merge-engine=first-row only supports changelog-producer=none or lookup, but found changelog-producer={}",
                producer.as_str()
            ),
        })
    }
}

fn options_error_to_config_invalid(error: crate::Error) -> crate::Error {
match error {
crate::Error::Unsupported { message } => crate::Error::ConfigInvalid { message },
other => crate::Error::ConfigInvalid {
message: other.to_string(),
},
}
}

/// Returns top-level Blob field names for create-time Blob contract checks.
fn top_level_blob_field_names(fields: &[DataField]) -> Vec<&str> {
fields
Expand Down Expand Up @@ -978,6 +1015,128 @@ mod tests {
}
}

#[test]
fn test_first_row_schema_validation_accepts_supported_changelog_producers() {
    // Every producer that first-row supports must survive schema construction.
    for producer in ["none", "lookup"] {
        let built = Schema::builder()
            .column("id", DataType::Int(IntType::new()))
            .column("value", DataType::Int(IntType::new()))
            .primary_key(["id"])
            .option("merge-engine", "first-row")
            .option("changelog-producer", producer)
            .build();

        // Panics with the validation error if the combination was rejected.
        built.unwrap();
    }
}

#[test]
fn test_first_row_schema_validation_rejects_incompatible_changelog_producers() {
    for producer in ["input", "full-compaction"] {
        let build_result = Schema::builder()
            .column("id", DataType::Int(IntType::new()))
            .column("value", DataType::Int(IntType::new()))
            .primary_key(["id"])
            .option("merge-engine", "first-row")
            .option("changelog-producer", producer)
            .build();
        let err = build_result.unwrap_err();

        // The failure must be a config error naming both options and the
        // offending producer value.
        let is_expected_rejection = match &err {
            crate::Error::ConfigInvalid { message } => {
                message.contains("merge-engine=first-row")
                    && message.contains("changelog-producer")
                    && message.contains(producer)
            }
            _ => false,
        };
        assert!(
            is_expected_rejection,
            "first-row should reject changelog-producer={producer}, got {err:?}"
        );
    }
}

#[test]
fn test_first_row_apply_changes_rejects_incompatible_changelog_producers() {
    // Start from a valid first-row table so that only the altered option can
    // be responsible for a rejection.
    let valid_schema = Schema::builder()
        .column("id", DataType::Int(IntType::new()))
        .column("value", DataType::Int(IntType::new()))
        .primary_key(["id"])
        .option("merge-engine", "first-row")
        .option("changelog-producer", "lookup")
        .build()
        .unwrap();
    let table_schema = TableSchema::new(0, &valid_schema);

    for producer in ["input", "full-compaction"] {
        let change = crate::spec::SchemaChange::set_option(
            "changelog-producer".to_string(),
            producer.to_string(),
        );
        let err = table_schema.apply_changes(vec![change]).unwrap_err();

        let is_expected_rejection = match &err {
            crate::Error::ConfigInvalid { message } => {
                message.contains("merge-engine=first-row")
                    && message.contains("changelog-producer")
                    && message.contains(producer)
            }
            _ => false,
        };
        assert!(
            is_expected_rejection,
            "first-row alter should reject changelog-producer={producer}, got {err:?}"
        );
    }
}

#[test]
fn test_first_row_apply_changes_validates_final_options() {
    // Shorthand for building a set-option schema change from string slices.
    let set_option = |key: &str, value: &str| {
        crate::spec::SchemaChange::set_option(key.to_string(), value.to_string())
    };

    // A table that is valid today only because it is not (yet) first-row.
    let input_schema = Schema::builder()
        .column("id", DataType::Int(IntType::new()))
        .column("value", DataType::Int(IntType::new()))
        .primary_key(["id"])
        .option("changelog-producer", "input")
        .build()
        .unwrap();
    let table_schema = TableSchema::new(0, &input_schema);

    // Switching to first-row while keeping the incompatible producer fails.
    let err = table_schema
        .apply_changes(vec![set_option("merge-engine", "first-row")])
        .unwrap_err();
    let is_expected_rejection = match &err {
        crate::Error::ConfigInvalid { message } => {
            message.contains("merge-engine=first-row")
                && message.contains("changelog-producer")
                && message.contains("input")
        }
        _ => false,
    };
    assert!(
        is_expected_rejection,
        "first-row alter should reject incompatible final options, got {err:?}"
    );

    // Changing both options in one batch is judged on the resulting options.
    let new_schema = table_schema
        .apply_changes(vec![
            set_option("merge-engine", "first-row"),
            set_option("changelog-producer", "lookup"),
        ])
        .unwrap();

    let altered_options = new_schema.options();
    assert_eq!(
        altered_options.get("merge-engine").map(String::as_str),
        Some("first-row")
    );
    assert_eq!(
        altered_options
            .get("changelog-producer")
            .map(String::as_str),
        Some("lookup")
    );
}

#[test]
fn test_schema_builder_column_row_type() {
let row_type = RowType::new(vec![DataField::new(
Expand Down
61 changes: 59 additions & 2 deletions crates/paimon/src/table/table_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
use crate::arrow::build_target_arrow_schema;
use crate::spec::PartitionComputer;
use crate::spec::{
BinaryRow, ChangelogProducer, CoreOptions, DataType, MergeEngine, EMPTY_SERIALIZED_ROW,
POSTPONE_BUCKET,
first_row_supports_changelog_producer, BinaryRow, ChangelogProducer, CoreOptions, DataType,
MergeEngine, EMPTY_SERIALIZED_ROW, POSTPONE_BUCKET,
};
use crate::table::blob_file_writer::AppendBlobFileWriter;
use crate::table::bucket_assigner::{BucketAssignerEnum, PartitionBucketKey};
Expand Down Expand Up @@ -224,6 +224,18 @@ impl TableWrite {

let merge_engine = core_options.merge_engine()?;

if merge_engine == MergeEngine::FirstRow
&& !first_row_supports_changelog_producer(changelog_producer)
{
return Err(crate::Error::Unsupported {
message: format!(
"Table '{}' has incompatible table options: merge-engine=first-row only supports changelog-producer=none or lookup, but found changelog-producer={}",
table.identifier().full_name(),
changelog_producer.as_str()
),
});
}

if is_dynamic_cross_partition && merge_engine == MergeEngine::PartialUpdate {
return Err(crate::Error::Unsupported {
message:
Expand Down Expand Up @@ -1564,6 +1576,21 @@ mod tests {
TableSchema::new(0, &builder.build().unwrap())
}

/// Builds a first-row `TableSchema` whose `changelog-producer` option is
/// overwritten with `producer` after construction.
///
/// The schema is first built with the `lookup` producer, then serialized to
/// JSON, patched, and deserialized again, so `producer` is never passed to the
/// schema builder itself.
fn loaded_first_row_schema_with_changelog_producer(producer: &str) -> TableSchema {
    let valid_table_schema = TableSchema::new(
        0,
        &Schema::builder()
            .column("id", DataType::Int(IntType::new()))
            .column("value", DataType::Int(IntType::new()))
            .primary_key(["id"])
            .option("merge-engine", "first-row")
            .option("changelog-producer", "lookup")
            .build()
            .unwrap(),
    );

    // Patch the option on the serialized form, then load it back.
    let mut json = serde_json::to_value(&valid_table_schema).unwrap();
    let patched_producer = serde_json::Value::String(producer.to_string());
    json["options"]["changelog-producer"] = patched_producer;
    serde_json::from_value(json).unwrap()
}

fn ordinary_dynamic_pk_changelog_schema() -> TableSchema {
let schema = Schema::builder()
.column("pt", DataType::VarChar(VarCharType::string_type()))
Expand All @@ -1577,6 +1604,36 @@ mod tests {
TableSchema::new(0, &schema)
}

#[test]
fn test_table_write_rejects_loaded_first_row_with_incompatible_changelog_producer() {
    let file_io = test_file_io();

    for producer in ["input", "full-compaction"] {
        // A table whose stored options already hold the incompatible pair.
        let loaded_schema = loaded_first_row_schema_with_changelog_producer(producer);
        let table = Table::new(
            file_io.clone(),
            Identifier::new("default", "test_first_row_changelog"),
            "memory:/test_first_row_changelog".to_string(),
            loaded_schema,
            None,
        );

        // Setting up a writer must fail before any data is written.
        let Err(err) = TableWrite::new(&table, "test-user".to_string()) else {
            panic!("first-row should reject changelog-producer={producer} during write setup");
        };

        let is_expected_rejection = match &err {
            crate::Error::Unsupported { message } => {
                message.contains("incompatible table options")
                    && message.contains("merge-engine=first-row")
                    && message.contains(producer)
            }
            _ => false,
        };
        assert!(
            is_expected_rejection,
            "first-row runtime guard should reject changelog-producer={producer}, got {err:?}"
        );
    }
}

#[tokio::test]
async fn test_input_changelog_writes_raw_rows_separately_from_data_rows() {
let file_io = test_file_io();
Expand Down
Loading