Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion core/src/main/java/org/apache/iceberg/MetricsConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,18 @@

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import javax.annotation.concurrent.Immutable;
import org.apache.iceberg.MetricsModes.MetricsMode;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Type;
Expand All @@ -50,6 +54,7 @@ public final class MetricsConfig implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(MetricsConfig.class);
// joins the parts of a nested column name with '.'
private static final Joiner DOT = Joiner.on('.');
// splits a dotted column name back into its parts; the counterpart of DOT
private static final Splitter DOT_SPLITTER = Splitter.on('.');

// Disable metrics by default for wide tables to prevent excessive metadata
private static final MetricsMode DEFAULT_MODE =
Expand All @@ -65,13 +70,49 @@ public final class MetricsConfig implements Serializable {
DEFAULT_MODE);

// Modes configured per column, keyed by the original (unsanitized) column name.
private final Map<String, MetricsMode> columnModes;
// Column names are sanitized when written to data files (see
// AvroSchemaUtil.makeCompatibleName). Metrics mode lookups that resolve column names from a data
// file schema (e.g. when computing metrics for imported or migrated files) therefore query the
// sanitized name, while columnModes is keyed by the original name. This map holds the sanitized
// form of every column as an alias to the same mode so those lookups resolve correctly. It is
// kept separate from columnModes so it never reaches validateReferencedColumns.
private final Map<String, MetricsMode> sanitizedColumnModes;
// Mode returned by columnMode when a name has no entry in either map above.
private final MetricsMode defaultMode;

private MetricsConfig(Map<String, MetricsMode> columnModes, MetricsMode defaultMode) {
this.columnModes = SerializableMap.copyOf(columnModes).immutableMap();
this.sanitizedColumnModes =
SerializableMap.copyOf(sanitizedAliases(this.columnModes)).immutableMap();
this.defaultMode = defaultMode;
}

/**
 * Builds the sanitized-name aliases for a set of configured column modes.
 *
 * <p>Only columns whose sanitized name differs from the configured name get an alias; names that
 * are already compatible are left to the original map.
 *
 * @param columnModes modes keyed by original column name
 * @return a new mutable map from sanitized column name to the mode of the originating column
 */
private static Map<String, MetricsMode> sanitizedAliases(Map<String, MetricsMode> columnModes) {
  Map<String, MetricsMode> aliases = Maps.newHashMap();
  for (Map.Entry<String, MetricsMode> configured : columnModes.entrySet()) {
    String originalName = configured.getKey();
    String aliasName = sanitizeColumnName(originalName);
    if (aliasName.equals(originalName)) {
      // already a compatible name, so the original entry serves both lookups
      continue;
    }

    // keep the first alias registered if two configured names sanitize to the same string; an
    // explicit entry for the sanitized name itself lives in columnModes, which columnMode
    // consults before these aliases
    aliases.putIfAbsent(aliasName, configured.getValue());
  }

  return aliases;
}

/**
 * Sanitizes each dot-separated part of a column name with {@link
 * AvroSchemaUtil#makeCompatibleName(String)} and joins the parts back with dots.
 *
 * @param columnName a possibly dotted column name
 * @return the sanitized name, or {@code columnName} unchanged if any part is empty
 */
private static String sanitizeColumnName(String columnName) {
  List<String> compatibleParts = Lists.newArrayList();
  for (String segment : DOT_SPLITTER.split(columnName)) {
    if (!segment.isEmpty()) {
      compatibleParts.add(AvroSchemaUtil.makeCompatibleName(segment));
    } else {
      // an empty segment (leading, trailing or doubled dot) is not a resolvable path, so hand
      // the name back untouched and let the caller skip the alias
      return columnName;
    }
  }

  return DOT.join(compatibleParts);
}

/**
 * Returns the default metrics configuration held in the static {@code DEFAULT} constant.
 *
 * @return the default {@link MetricsConfig}
 */
public static MetricsConfig getDefault() {
return DEFAULT;
}
Expand Down Expand Up @@ -322,6 +363,13 @@ public void validateReferencedColumns(Schema schema) {
}

/**
 * Returns the metrics mode for a column.
 *
 * <p>The name is first looked up among the configured column modes, which are keyed by the
 * original column name. If there is no entry, the sanitized aliases are consulted so that names
 * resolved from a data file schema still match. The default mode is returned when neither map
 * has an entry.
 *
 * @param columnAlias column name to look up, in either its original or sanitized form
 * @return the mode configured for the column, or the default mode
 */
public MetricsMode columnMode(String columnAlias) {
  MetricsMode mode = columnModes.get(columnAlias);
  if (mode != null) {
    return mode;
  }

  // fall back to the sanitized name so lookups from a data file schema resolve (see
  // sanitizedColumnModes); only the original-named columnModes feeds validateReferencedColumns
  return sanitizedColumnModes.getOrDefault(columnAlias, defaultMode);
}
}
58 changes: 58 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestMetricsModes.java
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,64 @@ public void testMetricsConfigInferredDefaultModeLimit() throws IOException {
assertThat(config.columnMode("col3")).isEqualTo(None.get());
}

@TestTemplate
public void testMetricsConfigInferredDefaultModeForEscapedColumn() throws IOException {
  // "$data" cannot be used as an Avro/Parquet field name, so writers store it as "_x24data";
  // lookups driven by a data file schema therefore ask for the sanitized form (issue #11950).
  String originalName = "$data";
  String sanitizedName = "_x24data";

  Schema schema =
      new Schema(
          required(1, "col1", Types.IntegerType.get()),
          required(2, originalName, Types.IntegerType.get()),
          required(3, "col3", Types.IntegerType.get()));

  Table table =
      TestTables.create(
          tableDir,
          "test",
          schema,
          PartitionSpec.unpartitioned(),
          SortOrder.unsorted(),
          formatVersion);

  // cap inferred defaults at two columns so the third column ends up with metrics disabled
  table
      .updateProperties()
      .set(TableProperties.METRICS_MAX_INFERRED_COLUMN_DEFAULTS, "2")
      .commit();

  MetricsConfig metricsConfig = MetricsConfig.forTable(table);

  // both spellings of the escaped column must receive the inferred default
  assertThat(metricsConfig.columnMode(originalName)).isEqualTo(Truncate.withLength(16));
  assertThat(metricsConfig.columnMode(sanitizedName)).isEqualTo(Truncate.withLength(16));
  assertThat(metricsConfig.columnMode("col3")).isEqualTo(None.get());
}

@TestTemplate
public void testMetricsConfigExplicitOverrideForEscapedColumn() throws IOException {
  Schema singleColumnSchema = new Schema(required(1, "$data", Types.IntegerType.get()));

  Table table =
      TestTables.create(
          tableDir,
          "test",
          singleColumnSchema,
          PartitionSpec.unpartitioned(),
          SortOrder.unsorted(),
          formatVersion);

  // configure the mode under the original column name only
  String modeProperty = TableProperties.METRICS_MODE_COLUMN_CONF_PREFIX + "$data";
  table.updateProperties().set(modeProperty, "full").commit();

  MetricsConfig metricsConfig = MetricsConfig.forTable(table);

  // the override must apply to the original name and to its sanitized form alike
  assertThat(metricsConfig.columnMode("$data")).isEqualTo(Full.get());
  assertThat(metricsConfig.columnMode("_x24data")).isEqualTo(Full.get());
}

@TestTemplate
public void testMetricsVariantSupported() {
assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
Expand Down
32 changes: 32 additions & 0 deletions parquet/src/test/java/org/apache/iceberg/parquet/TestParquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,38 @@ public void testMetricsMissingColumnStatisticsInRowGroups() throws IOException {
assertThat(metrics.upperBounds()).isEmpty();
}

@Test
public void testFileMetricsResolvesEscapedColumnName() throws IOException {
  // Regression for #11950: "$data" is not a valid Parquet name, so the file schema stores it as
  // "_x24data". Metrics computed from the footer (the add_files / migrate path) must still pick
  // up the mode that was configured under the original column name.
  String originalName = "$data";
  Schema schema = new Schema(optional(1, originalName, Types.StringType.get()));

  // the record is filled through the Avro schema, addressing the field by its sanitized name
  org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
  GenericData.Record row = new GenericData.Record(avroSchema);
  row.put(AvroSchemaUtil.makeCompatibleName(originalName), "value");

  File parquetFile = createTempFile(temp);
  write(parquetFile, schema, Collections.emptyMap(), ParquetAvroWriter::buildWriter, row);

  // keyed by the original column name, as MetricsConfig.forTable would produce; with a "none"
  // default, a missed lookup would silently drop every stat for the escaped column
  MetricsConfig metricsConfig =
      MetricsConfig.fromProperties(
          ImmutableMap.of(
              "write.metadata.metrics.default", "none",
              "write.metadata.metrics.column.$data", "full"));

  Metrics fileMetrics = ParquetUtil.fileMetrics(Files.localInput(parquetFile), metricsConfig);

  assertThat(fileMetrics.valueCounts()).containsEntry(1, 1L);
  assertThat(fileMetrics.nullValueCounts()).containsEntry(1, 0L);
  assertThat(fileMetrics.lowerBounds()).containsKey(1);
  assertThat(fileMetrics.upperBounds()).containsKey(1);
}

@Test
public void testNumberOfBytesWritten() throws IOException {
Schema schema = new Schema(optional(1, "intCol", IntegerType.get()));
Expand Down