From 34a0bdfb6e573cccfccb5d9fe7a90c094a70b007 Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Tue, 19 May 2026 15:25:54 +0700 Subject: [PATCH] Core: Fix metrics mode lookup for escaped column names MetricsConfig keys per-column metrics modes by the original Iceberg column name, but MetricsUtil.metricsMode resolves a field id to a name using whatever schema it is given. When metrics are computed from a Parquet footer (the add_files / migrate / snapshot path via ParquetUtil.fileMetrics), the schema is reconstructed from the file and carries sanitized names (AvroSchemaUtil.makeCompatibleName, e.g. $event_time -> _x24event_time). The lookup then misses and silently falls back to the default mode, which is none once a table exceeds write.metadata.metrics.max-inferred-column-defaults, so column bounds and value counts are dropped for escaped columns. Explicit per-column overrides on escaped names are ignored on the same path. This registers, alongside the original-name column modes, a separate alias map keyed by the sanitized form of each column name (sanitizing each dotted path component). columnMode now falls back to this alias map, so lookups resolve whether the caller passes the original name (write path) or the sanitized name (footer path). The alias map is kept separate from columnModes so validateReferencedColumns continues to validate only user-supplied original names against the table schema. Adds regression tests in TestMetricsModes (inferred default and explicit override for an escaped column, queried by both names) and an end-to-end TestParquet test exercising ParquetUtil.fileMetrics on a file with an escaped column. 
Closes #11950 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../org/apache/iceberg/MetricsConfig.java | 50 +++++++++++++++- .../org/apache/iceberg/TestMetricsModes.java | 58 +++++++++++++++++++ .../apache/iceberg/parquet/TestParquet.java | 32 ++++++++++ 3 files changed, 139 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/MetricsConfig.java b/core/src/main/java/org/apache/iceberg/MetricsConfig.java index 2b55bcbeab22..6db3db0caa4f 100644 --- a/core/src/main/java/org/apache/iceberg/MetricsConfig.java +++ b/core/src/main/java/org/apache/iceberg/MetricsConfig.java @@ -26,14 +26,18 @@ import java.io.Serializable; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.function.Supplier; import javax.annotation.concurrent.Immutable; import org.apache.iceberg.MetricsModes.MetricsMode; +import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Splitter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; @@ -50,6 +54,7 @@ public final class MetricsConfig implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(MetricsConfig.class); private static final Joiner DOT = Joiner.on('.'); + private static final Splitter DOT_SPLITTER = Splitter.on('.'); // Disable metrics by default for wide tables to prevent excessive metadata private static final MetricsMode DEFAULT_MODE = @@ -65,13 +70,49 @@ public final class MetricsConfig implements Serializable { DEFAULT_MODE); private final Map columnModes; + // Column names are sanitized 
when written to data files (see + // AvroSchemaUtil.makeCompatibleName). Metrics mode lookups that resolve column names from a data + // file schema (e.g. when computing metrics for imported or migrated files) therefore query the + // sanitized name, while columnModes is keyed by the original name. This map holds the sanitized + // form of each column whose sanitized name differs as an alias to the same mode so those lookups + // resolve. It is kept separate from columnModes so it never reaches validateReferencedColumns. + private final Map sanitizedColumnModes; private final MetricsMode defaultMode; private MetricsConfig(Map columnModes, MetricsMode defaultMode) { this.columnModes = SerializableMap.copyOf(columnModes).immutableMap(); + this.sanitizedColumnModes = + SerializableMap.copyOf(sanitizedAliases(this.columnModes)).immutableMap(); this.defaultMode = defaultMode; } + private static Map sanitizedAliases(Map columnModes) { + Map aliases = Maps.newHashMap(); + for (Map.Entry entry : columnModes.entrySet()) { + String sanitized = sanitizeColumnName(entry.getKey()); + if (!sanitized.equals(entry.getKey())) { + // explicit modes win in columnMode; here the first of any colliding aliases is kept + aliases.putIfAbsent(sanitized, entry.getValue()); + } + } + + return aliases; + } + + private static String sanitizeColumnName(String columnName) { + List sanitizedParts = Lists.newArrayList(); + for (String part : DOT_SPLITTER.split(columnName)) { + if (part.isEmpty()) { + // not a resolvable dotted path; AvroSchemaUtil rejects empty names, so add no alias + return columnName; + } + + sanitizedParts.add(AvroSchemaUtil.makeCompatibleName(part)); + } + + return DOT.join(sanitizedParts); + } + public static MetricsConfig getDefault() { return DEFAULT; } @@ -322,6 +363,13 @@ public void validateReferencedColumns(Schema schema) { } public MetricsMode columnMode(String columnAlias) { - return columnModes.getOrDefault(columnAlias, defaultMode); + MetricsMode mode = columnModes.get(columnAlias); +
if (mode != null) { + return mode; + } + + // fall back to the sanitized name so lookups from a data file schema resolve (see + // sanitizedColumnModes); only the original-named columnModes feeds validateReferencedColumns + return sanitizedColumnModes.getOrDefault(columnAlias, defaultMode); } } diff --git a/core/src/test/java/org/apache/iceberg/TestMetricsModes.java b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java index 1d03e342dfb4..c1e573cea7cc 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetricsModes.java +++ b/core/src/test/java/org/apache/iceberg/TestMetricsModes.java @@ -199,6 +199,64 @@ public void testMetricsConfigInferredDefaultModeLimit() throws IOException { assertThat(config.columnMode("col3")).isEqualTo(None.get()); } + @TestTemplate + public void testMetricsConfigInferredDefaultModeForEscapedColumn() throws IOException { + // "$data" is not a valid Avro/Parquet name and is sanitized to "_x24data" when written, so a + // data file schema resolves the column mode by the sanitized name (see issue #11950). 
+ Schema schema = + new Schema( + required(1, "col1", Types.IntegerType.get()), + required(2, "$data", Types.IntegerType.get()), + required(3, "col3", Types.IntegerType.get())); + + Table table = + TestTables.create( + tableDir, + "test", + schema, + PartitionSpec.unpartitioned(), + SortOrder.unsorted(), + formatVersion); + + // only infer a default for the first two columns, disabling metrics for the rest + table + .updateProperties() + .set(TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS, "2") + .commit(); + + MetricsConfig config = MetricsConfig.forTable(table); + + // the inferred default applies whether the column is queried by its original or sanitized name + assertThat(config.columnMode("$data")).isEqualTo(Truncate.withLength(16)); + assertThat(config.columnMode("_x24data")).isEqualTo(Truncate.withLength(16)); + assertThat(config.columnMode("col3")).isEqualTo(None.get()); + } + + @TestTemplate + public void testMetricsConfigExplicitOverrideForEscapedColumn() throws IOException { + Schema schema = new Schema(required(1, "$data", Types.IntegerType.get())); + + Table table = + TestTables.create( + tableDir, + "test", + schema, + PartitionSpec.unpartitioned(), + SortOrder.unsorted(), + formatVersion); + + table + .updateProperties() + .set(TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX + "$data", "full") + .commit(); + + MetricsConfig config = MetricsConfig.forTable(table); + + // the explicit override is honored whether queried by the original or sanitized name + assertThat(config.columnMode("$data")).isEqualTo(Full.get()); + assertThat(config.columnMode("_x24data")).isEqualTo(Full.get()); + } + @TestTemplate public void testMetricsVariantSupported() { assumeThat(formatVersion).isGreaterThanOrEqualTo(3); diff --git a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java index 5f1e0c83cc0f..1742322dc87a 100644 --- 
a/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java +++ b/parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java @@ -159,6 +159,38 @@ public void testMetricsMissingColumnStatisticsInRowGroups() throws IOException { assertThat(metrics.upperBounds()).isEmpty(); } + @Test + public void testFileMetricsResolvesEscapedColumnName() throws IOException { + // Regression for #11950: "$data" is not a valid Parquet name and is sanitized to "_x24data" + // in the file schema. When metrics are computed from the footer (the add_files / migrate + // path), the metrics mode must still resolve from the configured original column name. + Schema schema = new Schema(optional(1, "$data", Types.StringType.get())); + String sanitizedName = AvroSchemaUtil.makeCompatibleName("$data"); + + File file = createTempFile(temp); + + org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct()); + GenericData.Record record = new GenericData.Record(avroSchema); + record.put(sanitizedName, "value"); + + write(file, schema, Collections.emptyMap(), ParquetAvroWriter::buildWriter, record); + + // config keyed by the original column name, as MetricsConfig.forTable would produce; the + // "none" default means a missed lookup silently drops all stats for the escaped column + MetricsConfig metricsConfig = + MetricsConfig.fromProperties( + ImmutableMap.of( + "write.metadata.metrics.default", "none", + "write.metadata.metrics.column.$data", "full")); + + Metrics metrics = ParquetUtil.fileMetrics(Files.localInput(file), metricsConfig); + + assertThat(metrics.valueCounts()).containsEntry(1, 1L); + assertThat(metrics.nullValueCounts()).containsEntry(1, 0L); + assertThat(metrics.lowerBounds()).containsKey(1); + assertThat(metrics.upperBounds()).containsKey(1); + } + @Test public void testNumberOfBytesWritten() throws IOException { Schema schema = new Schema(optional(1, "intCol", IntegerType.get()));