diff --git a/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerExecutor.java b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerExecutor.java index 2b9df930d4..f290fcefa2 100644 --- a/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerExecutor.java +++ b/ams/optimizer/common/src/test/java/com/netease/arctic/optimizer/common/TestOptimizerExecutor.java @@ -143,7 +143,7 @@ public static class TestOptimizingExecutorFactory public void initialize(Map properties) {} @Override - public OptimizingExecutor createExecutor(TestOptimizingInput input) { + public OptimizingExecutor createExecutor(TestOptimizingInput input) { return new TestOptimizingExecutor(input); } } diff --git a/core/src/main/java/com/netease/arctic/io/reader/CombinedDeleteFilter.java b/core/src/main/java/com/netease/arctic/io/reader/CombinedDeleteFilter.java index c727e12298..c4e724543c 100644 --- a/core/src/main/java/com/netease/arctic/io/reader/CombinedDeleteFilter.java +++ b/core/src/main/java/com/netease/arctic/io/reader/CombinedDeleteFilter.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.math.LongMath; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.Filter; import org.apache.paimon.shade.guava30.com.google.common.hash.BloomFilter; @@ -65,6 +66,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -108,10 +110,15 @@ public abstract class CombinedDeleteFilter { private final long dataRecordCnt; private final boolean filterEqDelete; + private final String tableName; + private final ExecutorCache cache; + protected CombinedDeleteFilter( + String tableName, RewriteFilesInput rewriteFilesInput, Schema tableSchema, StructLikeCollections structLikeCollections) { + this.tableName = tableName; this.input = rewriteFilesInput; this.dataRecordCnt = Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum(); @@ -152,6 +159,7 @@ protected CombinedDeleteFilter( this.structLikeCollections = structLikeCollections; } this.filterEqDelete = filterEqDelete(); + this.cache = ExecutorCache.getOrCreate(); } /** @@ -258,7 +266,7 @@ private Predicate> applyEqDeletes() { eqDeletes, s -> CloseableIterable.transform( - openFile(s, deleteSchema), + getOrReadEqDeletes(s, deleteSchema), r -> new RecordWithLsn(s.dataSequenceNumber(), r)))), RecordWithLsn::recordCopy); @@ -305,6 +313,37 @@ private Predicate> applyEqDeletes() { return isInDeleteSet; } + private CloseableIterable getOrReadEqDeletes(DeleteFile deleteFile, Schema projection) { + long estimatedSize = estimateEqDeletesSize(deleteFile, projection); + if (cache != null && estimatedSize < cache.maxEntrySize()) { + String cacheKey = deleteFile.path().toString(); + return getOrLoad(cacheKey, () -> openFile(deleteFile, projection), estimatedSize); + } else { + return openFile(deleteFile, projection); + } + } + + private CloseableIterable getOrLoad( + String key, Supplier> valueSupplier, long valueSize) { + return cache.getOrLoad(tableName, key, valueSupplier, valueSize); + } + + // estimates the memory required to cache equality deletes (in bytes) + // copy from iceberg 1.5.0 + private long 
estimateEqDeletesSize(DeleteFile deleteFile, Schema projection) { + try { + long recordCount = deleteFile.recordCount(); + int recordSize = estimateRecordSize(projection); + return LongMath.checkedMultiply(recordCount, recordSize); + } catch (ArithmeticException e) { + return Long.MAX_VALUE; + } + } + + private int estimateRecordSize(Schema schema) { + return schema.columns().stream().mapToInt(TypeUtil::estimateSize).sum(); + } + private CloseableIterable> applyEqDeletes( CloseableIterable> records) { Predicate> remainingRows = applyEqDeletes().negate(); diff --git a/core/src/main/java/com/netease/arctic/io/reader/ExecutorCache.java b/core/src/main/java/com/netease/arctic/io/reader/ExecutorCache.java new file mode 100644 index 0000000000..3a5a42b63a --- /dev/null +++ b/core/src/main/java/com/netease/arctic/io/reader/ExecutorCache.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.io.reader; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * Copy from SparkExecutorCache + * + *

An executor cache for reducing the computation and IO overhead in tasks.
+ *

The cache supports limits on both the total cache size and the maximum size of individual
+ * entries, and it automatically evicts entries after a specified duration of inactivity. In this
+ * copy the expiration timeout and the size limits are fixed when the instance is created
+ * (currently 10 minutes of inactivity, a 64 MB per-entry limit and a 128 MB total limit), so
+ * subsequent changes to the configuration have no effect on an existing instance.
+ *

The cache is accessed and populated via {@link #getOrLoad(String, String, Supplier, long)}. If
+ * the value is not present in the cache, it is computed using the provided supplier and stored in
+ * the cache, subject to the defined size constraints. When a key is added, it must be associated
+ * with a particular group ID. Once the group is no longer needed, it is recommended to explicitly
+ * invalidate its state by calling {@link #invalidate(String)} instead of relying on automatic
+ * eviction.
+ *
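+ * <p>A usage sketch, for illustration only (variable names such as {@code tableName},
+ * {@code deleteFile} and {@code openFile} mirror the caller in {@code CombinedDeleteFilter}
+ * elsewhere in this patch and are not part of this class):
+ *
+ * <pre>{@code
+ * ExecutorCache cache = ExecutorCache.getOrCreate();
+ * CloseableIterable<Record> deletes;
+ * if (cache != null && estimatedSize < cache.maxEntrySize()) {
+ *   deletes = cache.getOrLoad(
+ *       tableName, deleteFile.path().toString(),
+ *       () -> openFile(deleteFile, projection), estimatedSize);
+ * } else {
+ *   deletes = openFile(deleteFile, projection);
+ * }
+ * // ... and once the group (here, the table) is finished with:
+ * if (cache != null) {
+ *   cache.invalidate(tableName);
+ * }
+ * }</pre>
+ *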

Note that this class employs the singleton pattern to ensure only one cache exists per JVM.
+ */
+public class ExecutorCache {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ExecutorCache.class);
+
+  private static volatile ExecutorCache instance = null;
+  public static boolean enabled = true;
+  public static String lastTableName = "";
+  private final Duration timeout;
+  private final long maxEntrySize;
+  private final long maxTotalSize;
+  private volatile Cache<String, CacheValue> state;
+
+  private ExecutorCache(Duration duration, long maxEntrySize, long maxTotalSize) {
+    this.timeout = duration;
+    this.maxEntrySize = maxEntrySize;
+    this.maxTotalSize = maxTotalSize;
+  }
+
+  /**
+   * Returns the cache if created or creates and returns it.
+   *

+   * <p>Note this method returns null if caching is disabled.
+   */
+  public static ExecutorCache getOrCreate() {
+    if (instance == null) {
+      if (ExecutorCache.enabled) {
+        synchronized (ExecutorCache.class) {
+          if (instance == null) {
+            ExecutorCache.instance =
+                new ExecutorCache(
+                    Duration.ofMinutes(10),
+                    64 * 1024 * 1024, // 64 MB
+                    128 * 1024 * 1024 // 128 MB
+                    );
+          }
+        }
+      }
+    }
+
+    return instance;
+  }
+
+  /** Returns the cache if already created or null otherwise. */
+  public static ExecutorCache get() {
+    return instance;
+  }
+
+  public static void setEnabled(boolean enabled) {
+    ExecutorCache.enabled = enabled;
+  }
+
+  /** Returns the max entry size in bytes that will be considered for caching. */
+  public long maxEntrySize() {
+    return maxEntrySize;
+  }
+
+  /**
+   * Gets the cached value for the key or populates the cache with a new mapping.
+   *
+   * @param group a group ID
+   * @param key a cache key
+   * @param valueSupplier a supplier to compute the value
+   * @param valueSize an estimated memory size of the value in bytes
+   * @return the cached or computed value
+   */
+  public <V> V getOrLoad(String group, String key, Supplier<V> valueSupplier, long valueSize) {
+    if (valueSize > maxEntrySize) {
+      LOG.debug("{} exceeds max entry size: {} > {}", key, valueSize, maxEntrySize);
+      return valueSupplier.get();
+    }
+
+    String internalKey = group + "_" + key;
+    CacheValue value = state().get(internalKey, loadFunc(valueSupplier, valueSize));
+    Preconditions.checkNotNull(value, "Loaded value must not be null");
+    return value.get();
+  }
+
+  private <V> Function<String, CacheValue> loadFunc(Supplier<V> valueSupplier, long valueSize) {
+    return key -> {
+      long start = System.currentTimeMillis();
+      V value = valueSupplier.get();
+      long end = System.currentTimeMillis();
+      LOG.debug("Loaded {} with size {} in {} ms", key, valueSize, (end - start));
+      return new CacheValue(value, valueSize);
+    };
+  }
+
+  /**
+   * Invalidates all keys associated with the given group ID.
+   *
+   * @param group a group ID
+   */
+  public void invalidate(String group) {
+    if (state != null) {
+      List<String> internalKeys = findInternalKeys(group);
+      LOG.info("Invalidating {} keys associated with {}", internalKeys.size(), group);
+      internalKeys.forEach(internalKey -> state.invalidate(internalKey));
+      LOG.info("Current cache stats {}", state.stats());
+    }
+  }
+
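+  // Illustrative note on the key layout used above: getOrLoad stores each entry under
+  // group + "_" + key, e.g. group "db.sample" and key "<delete-file-path>" become
+  // "db.sample_<delete-file-path>", and invalidate(group) removes every entry whose
+  // internal key starts with that group prefix.
+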
+  /** Invalidates all cached keys. */
+  public void invalidateAll() {
+    state.asMap().keySet().forEach(internalKey -> state.invalidate(internalKey));
+  }
+
+  public List<String> findInternalKeys(String group) {
+    return state.asMap().keySet().stream()
+        .filter(internalKey -> internalKey.startsWith(group))
+        .collect(Collectors.toList());
+  }
+
+  private Cache<String, CacheValue> state() {
+    if (state == null) {
+      synchronized (this) {
+        if (state == null) {
+          LOG.info("Initializing cache state");
+          this.state = initState();
+        }
+      }
+    }
+
+    return state;
+  }
+
+  private Cache<String, CacheValue> initState() {
+    return Caffeine.newBuilder()
+        .expireAfterAccess(timeout)
+        .maximumWeight(maxTotalSize)
+        .weigher((key, value) -> ((CacheValue) value).weight())
+        .recordStats()
+        .removalListener((key, value, cause) -> LOG.debug("Evicted {} ({})", key, cause))
+        .build();
+  }
+
+  @VisibleForTesting
+  static class CacheValue {
+    private final Object value;
+    private final long size;
+
+    CacheValue(Object value, long size) {
+      this.value = value;
+      this.size = size;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <V> V get() {
+      return (V) value;
+    }
+
+    public int weight() {
+      return (int) Math.min(size, Integer.MAX_VALUE);
+    }
+  }
+}
diff --git a/core/src/main/java/com/netease/arctic/io/reader/GenericCombinedIcebergDataReader.java b/core/src/main/java/com/netease/arctic/io/reader/GenericCombinedIcebergDataReader.java
index 375fc28cb5..6d90d651e6 100644
--- a/core/src/main/java/com/netease/arctic/io/reader/GenericCombinedIcebergDataReader.java
+++ b/core/src/main/java/com/netease/arctic/io/reader/GenericCombinedIcebergDataReader.java
@@ -70,6 +70,7 @@ public class GenericCombinedIcebergDataReader implements OptimizingDataReader {
   protected RewriteFilesInput input;
 
   public GenericCombinedIcebergDataReader(
+      String tableName,
       ArcticFileIO fileIO,
       Schema tableSchema,
       PartitionSpec spec,
@@ -88,7 +89,7 @@ public GenericCombinedIcebergDataReader(
     this.reuseContainer = reuseContainer;
     this.input = rewriteFilesInput;
     this.deleteFilter =
-        new GenericDeleteFilter(rewriteFilesInput, tableSchema, structLikeCollections);
+        new GenericDeleteFilter(tableName, rewriteFilesInput, tableSchema, structLikeCollections);
   }
 
   @Override
@@ -289,10 +290,11 @@ public CombinedDeleteFilter getDeleteFilter() {
   protected class GenericDeleteFilter extends CombinedDeleteFilter {
 
     public GenericDeleteFilter(
+        String tableName,
         RewriteFilesInput rewriteFilesInput,
         Schema tableSchema,
         StructLikeCollections structLikeCollections) {
-      super(rewriteFilesInput, tableSchema, structLikeCollections);
+      super(tableName, rewriteFilesInput, tableSchema, structLikeCollections);
    }
 
     @Override
diff --git a/core/src/main/java/com/netease/arctic/optimizing/IcebergRewriteExecutor.java b/core/src/main/java/com/netease/arctic/optimizing/IcebergRewriteExecutor.java
index 54bf68d97d..6674ede62f 100644
--- a/core/src/main/java/com/netease/arctic/optimizing/IcebergRewriteExecutor.java
+++ b/core/src/main/java/com/netease/arctic/optimizing/IcebergRewriteExecutor.java
@@ -61,6 +61,7 @@ protected StructLike partition() {
   @Override
   protected OptimizingDataReader dataReader() {
     return new GenericCombinedIcebergDataReader(
+        table.name(),
         io,
         table.schema(),
         table.spec(),
diff --git a/core/src/main/java/com/netease/arctic/optimizing/IcebergRewriteExecutorFactory.java b/core/src/main/java/com/netease/arctic/optimizing/IcebergRewriteExecutorFactory.java
index f858a224dc..fe185b5f3f 100644
--- a/core/src/main/java/com/netease/arctic/optimizing/IcebergRewriteExecutorFactory.java
+++ 
b/core/src/main/java/com/netease/arctic/optimizing/IcebergRewriteExecutorFactory.java @@ -18,6 +18,7 @@ package com.netease.arctic.optimizing; +import com.netease.arctic.table.ArcticTable; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import java.util.Map; @@ -32,9 +33,10 @@ public void initialize(Map properties) { } @Override - public OptimizingExecutor createExecutor(RewriteFilesInput input) { + public OptimizingExecutor createExecutor(RewriteFilesInput input) { OptimizingInputProperties optimizingConfig = OptimizingInputProperties.parse(properties); - return new IcebergRewriteExecutor( - input, input.getTable(), optimizingConfig.getStructLikeCollections()); + ArcticTable table = input.getTable(); + disposeCache(table.name()); + return new IcebergRewriteExecutor(input, table, optimizingConfig.getStructLikeCollections()); } } diff --git a/core/src/main/java/com/netease/arctic/optimizing/OptimizingExecutorFactory.java b/core/src/main/java/com/netease/arctic/optimizing/OptimizingExecutorFactory.java index 01fbf896c5..fdb1eb7dda 100644 --- a/core/src/main/java/com/netease/arctic/optimizing/OptimizingExecutorFactory.java +++ b/core/src/main/java/com/netease/arctic/optimizing/OptimizingExecutorFactory.java @@ -18,8 +18,11 @@ package com.netease.arctic.optimizing; +import com.netease.arctic.io.reader.ExecutorCache; + import java.io.Serializable; import java.util.Map; +import java.util.Objects; /** A factory to create {@link OptimizingExecutor} */ public interface OptimizingExecutorFactory @@ -32,5 +35,15 @@ public interface OptimizingExecutorFactory properties); /** Create factory by input */ - OptimizingExecutor createExecutor(I input); + OptimizingExecutor createExecutor(I input); + + default void disposeCache(String table) { + if (ExecutorCache.enabled) { + ExecutorCache executorCache = ExecutorCache.getOrCreate(); + if (!Objects.equals(ExecutorCache.lastTableName, "") + && !Objects.equals(ExecutorCache.lastTableName, table)) { + executorCache.invalidateAll(); + } + } + } } diff --git a/core/src/main/java/org/apache/iceberg/types/TypeUtil.java b/core/src/main/java/org/apache/iceberg/types/TypeUtil.java new file mode 100644 index 0000000000..98dc25ae30 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/types/TypeUtil.java @@ -0,0 +1,781 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.types; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; + +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +public class TypeUtil { + + private static final int HEADER_SIZE = 12; + + private TypeUtil() {} + + /** + * Project extracts particular fields from a schema by ID. + * + *

Unlike {@link TypeUtil#select(Schema, Set)}, project will pick out only the fields + * enumerated. Structs that are explicitly projected are empty unless sub-fields are explicitly + * projected. Maps and lists cannot be explicitly selected in fieldIds. + * + * @param schema to project fields from + * @param fieldIds list of explicit fields to extract + * @return the schema with all fields fields not selected removed + */ + public static Schema project(Schema schema, Set fieldIds) { + Preconditions.checkNotNull(schema, "Schema cannot be null"); + + Types.StructType result = project(schema.asStruct(), fieldIds); + if (schema.asStruct().equals(result)) { + return schema; + } else if (result != null) { + if (schema.getAliases() != null) { + return new Schema(result.fields(), schema.getAliases()); + } else { + return new Schema(result.fields()); + } + } + return new Schema(Collections.emptyList(), schema.getAliases()); + } + + public static Types.StructType project(Types.StructType struct, Set fieldIds) { + Preconditions.checkNotNull(struct, "Struct cannot be null"); + Preconditions.checkNotNull(fieldIds, "Field ids cannot be null"); + + Type result = visit(struct, new PruneColumns(fieldIds, false)); + if (struct.equals(result)) { + return struct; + } else if (result != null) { + return result.asStructType(); + } + + return Types.StructType.of(); + } + + public static Schema select(Schema schema, Set fieldIds) { + Preconditions.checkNotNull(schema, "Schema cannot be null"); + + Types.StructType result = select(schema.asStruct(), fieldIds); + if (Objects.equals(schema.asStruct(), result)) { + return schema; + } else if (result != null) { + if (schema.getAliases() != null) { + return new Schema(result.fields(), schema.getAliases()); + } else { + return new Schema(result.fields()); + } + } + + return new Schema(ImmutableList.of(), schema.getAliases()); + } + + public static Types.StructType select(Types.StructType struct, Set fieldIds) { + Preconditions.checkNotNull(struct, "Struct cannot be null"); + Preconditions.checkNotNull(fieldIds, "Field ids cannot be null"); + + Type result = visit(struct, new PruneColumns(fieldIds, true)); + if (struct.equals(result)) { + return struct; + } else if (result != null) { + return result.asStructType(); + } + + return Types.StructType.of(); + } + + public static Set getProjectedIds(Schema schema) { + return ImmutableSet.copyOf(getIdsInternal(schema.asStruct(), true)); + } + + public static Set getProjectedIds(Type type) { + if (type.isPrimitiveType()) { + return ImmutableSet.of(); + } + return ImmutableSet.copyOf(getIdsInternal(type, true)); + } + + private static Set getIdsInternal(Type type, boolean includeStructIds) { + return visit(type, new GetProjectedIds(includeStructIds)); + } + + public static Types.StructType selectNot(Types.StructType struct, Set fieldIds) { + Set projectedIds = getIdsInternal(struct, false); + projectedIds.removeAll(fieldIds); + return project(struct, projectedIds); + } + + public static Schema selectNot(Schema schema, Set fieldIds) { + Set projectedIds = getIdsInternal(schema.asStruct(), false); + projectedIds.removeAll(fieldIds); + return project(schema, projectedIds); + } + + public static Schema join(Schema left, Schema right) { + List joinedColumns = Lists.newArrayList(left.columns()); + for (Types.NestedField rightColumn : right.columns()) { + Types.NestedField leftColumn = left.findField(rightColumn.fieldId()); + + if (leftColumn == null) { + joinedColumns.add(rightColumn); + } else { + Preconditions.checkArgument( + 
leftColumn.equals(rightColumn), + "Schemas have different columns with same id: %s, %s", + leftColumn, + rightColumn); + } + } + + return new Schema(joinedColumns); + } + + public static Map indexByName(Types.StructType struct) { + IndexByName indexer = new IndexByName(); + visit(struct, indexer); + return indexer.byName(); + } + + public static Map indexNameById(Types.StructType struct) { + IndexByName indexer = new IndexByName(); + visit(struct, indexer); + return indexer.byId(); + } + + public static Map indexQuotedNameById( + Types.StructType struct, Function quotingFunc) { + IndexByName indexer = new IndexByName(quotingFunc); + visit(struct, indexer); + return indexer.byId(); + } + + public static Map indexByLowerCaseName(Types.StructType struct) { + Map indexByLowerCaseName = Maps.newHashMap(); + indexByName(struct) + .forEach( + (name, integer) -> indexByLowerCaseName.put(name.toLowerCase(Locale.ROOT), integer)); + return indexByLowerCaseName; + } + + public static Map indexById(Types.StructType struct) { + return visit(struct, new IndexById()); + } + + public static Map indexParents(Types.StructType struct) { + return ImmutableMap.copyOf(visit(struct, new IndexParents())); + } + + /** + * Assigns fresh ids from the {@link NextID nextId function} for all fields in a type. + * + * @param type a type + * @param nextId an id assignment function + * @return an structurally identical type with new ids assigned by the nextId function + */ + public static Type assignFreshIds(Type type, NextID nextId) { + return TypeUtil.visit(type, new AssignFreshIds(nextId)); + } + + /** + * Assigns fresh ids from the {@link NextID nextId function} for all fields in a schema. + * + * @param schema a schema + * @param nextId an id assignment function + * @return a structurally identical schema with new ids assigned by the nextId function + */ + public static Schema assignFreshIds(Schema schema, NextID nextId) { + Types.StructType struct = + TypeUtil.visit(schema.asStruct(), new AssignFreshIds(nextId)).asStructType(); + return new Schema(struct.fields(), refreshIdentifierFields(struct, schema)); + } + + /** + * Assigns fresh ids from the {@link NextID nextId function} for all fields in a schema. + * + * @param schemaId an ID assigned to this schema + * @param schema a schema + * @param nextId an id assignment function + * @return a structurally identical schema with new ids assigned by the nextId function + */ + public static Schema assignFreshIds(int schemaId, Schema schema, NextID nextId) { + Types.StructType struct = + TypeUtil.visit(schema.asStruct(), new AssignFreshIds(nextId)).asStructType(); + return new Schema(schemaId, struct.fields(), refreshIdentifierFields(struct, schema)); + } + + /** + * Assigns ids to match a given schema, and fresh ids from the {@link NextID nextId function} for + * all other fields. + * + * @param schema a schema + * @param baseSchema a schema with existing IDs to copy by name + * @param nextId an id assignment function + * @return a structurally identical schema with new ids assigned by the nextId function + */ + public static Schema assignFreshIds(Schema schema, Schema baseSchema, NextID nextId) { + Types.StructType struct = + TypeUtil.visit(schema.asStruct(), new AssignFreshIds(schema, baseSchema, nextId)) + .asStructType(); + return new Schema(struct.fields(), refreshIdentifierFields(struct, schema)); + } + + /** + * Get the identifier fields in the fresh schema based on the identifier fields in the base + * schema. 
+ * + * @param freshSchema fresh schema + * @param baseSchema base schema + * @return identifier fields in the fresh schema + */ + public static Set refreshIdentifierFields( + Types.StructType freshSchema, Schema baseSchema) { + Map nameToId = TypeUtil.indexByName(freshSchema); + Set identifierFieldNames = baseSchema.identifierFieldNames(); + identifierFieldNames.forEach( + name -> + Preconditions.checkArgument( + nameToId.containsKey(name), + "Cannot find ID for identifier field %s in schema %s", + name, + freshSchema)); + return identifierFieldNames.stream().map(nameToId::get).collect(Collectors.toSet()); + } + + /** + * Assigns strictly increasing fresh ids for all fields in a schema, starting from 1. + * + * @param schema a schema + * @return a structurally identical schema with new ids assigned strictly increasing from 1 + */ + public static Schema assignIncreasingFreshIds(Schema schema) { + AtomicInteger lastColumnId = new AtomicInteger(0); + return TypeUtil.assignFreshIds(schema, lastColumnId::incrementAndGet); + } + + /** + * Reassigns ids in a schema from another schema. + * + *

Ids are determined by field names. If a field in the schema cannot be found in the source + * schema, this will throw IllegalArgumentException. + * + *

This will not alter a schema's structure, nullability, or types. + * + * @param schema the schema to have ids reassigned + * @param idSourceSchema the schema from which field ids will be used + * @return an structurally identical schema with field ids matching the source schema + * @throws IllegalArgumentException if a field cannot be found (by name) in the source schema + */ + public static Schema reassignIds(Schema schema, Schema idSourceSchema) { + return reassignIds(schema, idSourceSchema, true); + } + + /** + * Reassigns doc in a schema from another schema. + * + *

Doc are determined by field id. If a field in the schema cannot be found in the source + * schema, this will throw IllegalArgumentException. + * + *

This will not alter a schema's structure, nullability, or types. + * + * @param schema the schema to have doc reassigned + * @param docSourceSchema the schema from which field doc will be used + * @return an structurally identical schema with field ids matching the source schema + * @throws IllegalArgumentException if a field cannot be found (by id) in the source schema + */ + public static Schema reassignDoc(Schema schema, Schema docSourceSchema) { + CustomOrderSchemaVisitor visitor = new ReassignDoc(docSourceSchema); + return new Schema( + visitor + .schema(schema, new VisitFuture<>(schema.asStruct(), visitor)) + .asStructType() + .fields()); + } + + /** + * Reassigns ids in a schema from another schema. + * + *

Ids are determined by field names. If a field in the schema cannot be found in the source + * schema, this will throw IllegalArgumentException. + * + *

This will not alter a schema's structure, nullability, or types. + * + * @param schema the schema to have ids reassigned + * @param idSourceSchema the schema from which field ids will be used + * @return an structurally identical schema with field ids matching the source schema + * @throws IllegalArgumentException if a field cannot be found (by name) in the source schema + */ + public static Schema reassignIds(Schema schema, Schema idSourceSchema, boolean caseSensitive) { + Types.StructType struct = + visit(schema, new ReassignIds(idSourceSchema, null, caseSensitive)).asStructType(); + return new Schema(struct.fields(), refreshIdentifierFields(struct, schema)); + } + + public static Schema reassignOrRefreshIds(Schema schema, Schema idSourceSchema) { + return reassignOrRefreshIds(schema, idSourceSchema, true); + } + + public static Schema reassignOrRefreshIds( + Schema schema, Schema idSourceSchema, boolean caseSensitive) { + AtomicInteger highest = new AtomicInteger(idSourceSchema.highestFieldId()); + Types.StructType struct = + visit(schema, new ReassignIds(idSourceSchema, highest::incrementAndGet, caseSensitive)) + .asStructType(); + return new Schema(struct.fields(), refreshIdentifierFields(struct, schema)); + } + + public static Type find(Schema schema, Predicate predicate) { + return visit(schema, new FindTypeVisitor(predicate)); + } + + public static boolean isPromotionAllowed(Type from, Type.PrimitiveType to) { + // Warning! Before changing this function, make sure that the type change doesn't introduce + // compatibility problems in partitioning. + if (from.equals(to)) { + return true; + } + + switch (from.typeId()) { + case INTEGER: + return to.typeId() == Type.TypeID.LONG; + + case FLOAT: + return to.typeId() == Type.TypeID.DOUBLE; + + case DECIMAL: + Types.DecimalType fromDecimal = (Types.DecimalType) from; + if (to.typeId() != Type.TypeID.DECIMAL) { + return false; + } + + Types.DecimalType toDecimal = (Types.DecimalType) to; + return fromDecimal.scale() == toDecimal.scale() + && fromDecimal.precision() <= toDecimal.precision(); + } + + return false; + } + + /** + * Check whether we could write the iceberg table with the user-provided write schema. + * + * @param tableSchema the table schema written in iceberg meta data. + * @param writeSchema the user-provided write schema. + * @param checkNullability If true, not allow to write optional values to a required field. + * @param checkOrdering If true, not allow input schema to have different ordering than table + * schema. + */ + public static void validateWriteSchema( + Schema tableSchema, Schema writeSchema, Boolean checkNullability, Boolean checkOrdering) { + String errMsg = "Cannot write incompatible dataset to table with schema:"; + checkSchemaCompatibility(errMsg, tableSchema, writeSchema, checkNullability, checkOrdering); + } + + /** + * Validates whether the provided schema is compatible with the expected schema. + * + * @param context the schema context (e.g. 
row ID) + * @param expectedSchema the expected schema + * @param providedSchema the provided schema + * @param checkNullability whether to check field nullability + * @param checkOrdering whether to check field ordering + */ + public static void validateSchema( + String context, + Schema expectedSchema, + Schema providedSchema, + boolean checkNullability, + boolean checkOrdering) { + String errMsg = + String.format("Provided %s schema is incompatible with expected schema:", context); + checkSchemaCompatibility( + errMsg, expectedSchema, providedSchema, checkNullability, checkOrdering); + } + + private static void checkSchemaCompatibility( + String errMsg, + Schema schema, + Schema providedSchema, + boolean checkNullability, + boolean checkOrdering) { + List errors; + if (checkNullability) { + errors = CheckCompatibility.writeCompatibilityErrors(schema, providedSchema, checkOrdering); + } else { + errors = CheckCompatibility.typeCompatibilityErrors(schema, providedSchema, checkOrdering); + } + + if (!errors.isEmpty()) { + StringBuilder sb = new StringBuilder(); + sb.append(errMsg) + .append("\n") + .append(schema) + .append("\n") + .append("Provided schema:") + .append("\n") + .append(providedSchema) + .append("\n") + .append("Problems:"); + for (String error : errors) { + sb.append("\n* ").append(error); + } + throw new IllegalArgumentException(sb.toString()); + } + } + + /** + * Estimates the number of bytes a value for a given field may occupy in memory. + * + *
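+   * <p>For example, with the per-type sizes used below, an {@code int} field is estimated at 4
+   * bytes and a {@code string} field at 54 bytes, so a two-column (int, string) record is
+   * estimated at 4 + 54 = 58 bytes; one million such records come to roughly 58 MB, which is how
+   * callers such as {@code CombinedDeleteFilter#estimateEqDeletesSize} in this patch decide
+   * whether a delete file fits under the cache's per-entry limit.
+   *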

This method approximates the memory size based on heuristics and the internal Java + * representation defined by {@link Type.TypeID}. It is important to note that the actual size + * might differ from this estimation. The method is designed to handle a variety of data types, + * including primitive types, strings, and nested types such as structs, maps, and lists. + * + * @param field a field for which to estimate the size + * @return the estimated size in bytes of the field's value in memory + */ + public static int estimateSize(Types.NestedField field) { + return estimateSize(field.type()); + } + + private static int estimateSize(Type type) { + switch (type.typeId()) { + case BOOLEAN: + // the size of a boolean variable is virtual machine dependent + // it is common to believe booleans occupy 1 byte in most JVMs + return 1; + case INTEGER: + case FLOAT: + case DATE: + // ints and floats occupy 4 bytes + // dates are internally represented as ints + return 4; + case LONG: + case DOUBLE: + case TIME: + case TIMESTAMP: + // longs and doubles occupy 8 bytes + // times and timestamps are internally represented as longs + return 8; + case STRING: + // 12 (header) + 6 (fields) + 16 (array overhead) + 20 (10 chars, 2 bytes each) = 54 bytes + return 54; + case UUID: + // 12 (header) + 16 (two long variables) = 28 bytes + return 28; + case FIXED: + return ((Types.FixedType) type).length(); + case BINARY: + return 80; + case DECIMAL: + // 12 (header) + (12 + 12 + 4) (BigInteger) + 4 (scale) = 44 bytes + return 44; + case STRUCT: + Types.StructType struct = (Types.StructType) type; + return HEADER_SIZE + struct.fields().stream().mapToInt(TypeUtil::estimateSize).sum(); + case LIST: + Types.ListType list = (Types.ListType) type; + return HEADER_SIZE + 5 * estimateSize(list.elementType()); + case MAP: + Types.MapType map = (Types.MapType) type; + int entrySize = HEADER_SIZE + estimateSize(map.keyType()) + estimateSize(map.valueType()); + return HEADER_SIZE + 5 * entrySize; + default: + return 16; + } + } + + /** Interface for passing a function that assigns column IDs. 
*/ + public interface NextID { + int get(); + } + + public static class SchemaVisitor { + public void beforeField(Types.NestedField field) {} + + public void afterField(Types.NestedField field) {} + + public void beforeListElement(Types.NestedField elementField) { + beforeField(elementField); + } + + public void afterListElement(Types.NestedField elementField) { + afterField(elementField); + } + + public void beforeMapKey(Types.NestedField keyField) { + beforeField(keyField); + } + + public void afterMapKey(Types.NestedField keyField) { + afterField(keyField); + } + + public void beforeMapValue(Types.NestedField valueField) { + beforeField(valueField); + } + + public void afterMapValue(Types.NestedField valueField) { + afterField(valueField); + } + + public T schema(Schema schema, T structResult) { + return null; + } + + public T struct(Types.StructType struct, List fieldResults) { + return null; + } + + public T field(Types.NestedField field, T fieldResult) { + return null; + } + + public T list(Types.ListType list, T elementResult) { + return null; + } + + public T map(Types.MapType map, T keyResult, T valueResult) { + return null; + } + + public T primitive(Type.PrimitiveType primitive) { + return null; + } + } + + public static T visit(Schema schema, SchemaVisitor visitor) { + return visitor.schema(schema, visit(schema.asStruct(), visitor)); + } + + public static T visit(Type type, SchemaVisitor visitor) { + switch (type.typeId()) { + case STRUCT: + Types.StructType struct = type.asNestedType().asStructType(); + List results = Lists.newArrayListWithExpectedSize(struct.fields().size()); + for (Types.NestedField field : struct.fields()) { + visitor.beforeField(field); + T result; + try { + result = visit(field.type(), visitor); + } finally { + visitor.afterField(field); + } + results.add(visitor.field(field, result)); + } + return visitor.struct(struct, results); + + case LIST: + Types.ListType list = type.asNestedType().asListType(); + T elementResult; + + Types.NestedField elementField = list.field(list.elementId()); + visitor.beforeListElement(elementField); + try { + elementResult = visit(list.elementType(), visitor); + } finally { + visitor.afterListElement(elementField); + } + + return visitor.list(list, elementResult); + + case MAP: + Types.MapType map = type.asNestedType().asMapType(); + T keyResult; + T valueResult; + + Types.NestedField keyField = map.field(map.keyId()); + visitor.beforeMapKey(keyField); + try { + keyResult = visit(map.keyType(), visitor); + } finally { + visitor.afterMapKey(keyField); + } + + Types.NestedField valueField = map.field(map.valueId()); + visitor.beforeMapValue(valueField); + try { + valueResult = visit(map.valueType(), visitor); + } finally { + visitor.afterMapValue(valueField); + } + + return visitor.map(map, keyResult, valueResult); + + default: + return visitor.primitive(type.asPrimitiveType()); + } + } + + public static class CustomOrderSchemaVisitor { + public T schema(Schema schema, Supplier structResult) { + return null; + } + + public T struct(Types.StructType struct, Iterable fieldResults) { + return null; + } + + public T field(Types.NestedField field, Supplier fieldResult) { + return null; + } + + public T list(Types.ListType list, Supplier elementResult) { + return null; + } + + public T map(Types.MapType map, Supplier keyResult, Supplier valueResult) { + return null; + } + + public T primitive(Type.PrimitiveType primitive) { + return null; + } + } + + private static class VisitFuture implements Supplier { + private final Type type; + 
private final CustomOrderSchemaVisitor visitor; + + private VisitFuture(Type type, CustomOrderSchemaVisitor visitor) { + this.type = type; + this.visitor = visitor; + } + + @Override + public T get() { + return visit(type, visitor); + } + } + + private static class VisitFieldFuture implements Supplier { + private final Types.NestedField field; + private final CustomOrderSchemaVisitor visitor; + + private VisitFieldFuture(Types.NestedField field, CustomOrderSchemaVisitor visitor) { + this.field = field; + this.visitor = visitor; + } + + @Override + public T get() { + return visitor.field(field, new VisitFuture<>(field.type(), visitor)); + } + } + + public static T visit(Schema schema, CustomOrderSchemaVisitor visitor) { + return visitor.schema(schema, new VisitFuture<>(schema.asStruct(), visitor)); + } + + /** + * Used to traverse types with traversals other than post-order. + * + *

This passes a {@link Supplier} to each {@link CustomOrderSchemaVisitor visitor} method that + * returns the result of traversing child types. Structs are passed an {@link Iterable} that + * traverses child fields during iteration. + * + *

An example use is assigning column IDs, which should be done with a pre-order traversal. + * + * @param type a type to traverse with a visitor + * @param visitor a custom order visitor + * @param the type returned by the visitor + * @return the result of traversing the given type with the visitor + */ + public static T visit(Type type, CustomOrderSchemaVisitor visitor) { + switch (type.typeId()) { + case STRUCT: + Types.StructType struct = type.asNestedType().asStructType(); + List> results = + Lists.newArrayListWithExpectedSize(struct.fields().size()); + for (Types.NestedField field : struct.fields()) { + results.add(new VisitFieldFuture<>(field, visitor)); + } + + return visitor.struct(struct, Iterables.transform(results, VisitFieldFuture::get)); + + case LIST: + Types.ListType list = type.asNestedType().asListType(); + return visitor.list(list, new VisitFuture<>(list.elementType(), visitor)); + + case MAP: + Types.MapType map = type.asNestedType().asMapType(); + return visitor.map( + map, + new VisitFuture<>(map.keyType(), visitor), + new VisitFuture<>(map.valueType(), visitor)); + + default: + return visitor.primitive(type.asPrimitiveType()); + } + } + + static int decimalMaxPrecision(int numBytes) { + Preconditions.checkArgument( + numBytes >= 0 && numBytes < 24, "Unsupported decimal length: %s", numBytes); + return MAX_PRECISION[numBytes]; + } + + public static int decimalRequiredBytes(int precision) { + Preconditions.checkArgument( + precision >= 0 && precision < 40, "Unsupported decimal precision: %s", precision); + return REQUIRED_LENGTH[precision]; + } + + private static final int[] MAX_PRECISION = new int[24]; + private static final int[] REQUIRED_LENGTH = new int[40]; + + static { + // for each length, calculate the max precision + for (int len = 0; len < MAX_PRECISION.length; len += 1) { + MAX_PRECISION[len] = (int) Math.floor(Math.log10(Math.pow(2, 8 * len - 1) - 1)); + } + + // for each precision, find the first length that can hold it + for (int precision = 0; precision < REQUIRED_LENGTH.length; precision += 1) { + REQUIRED_LENGTH[precision] = -1; + for (int len = 0; len < MAX_PRECISION.length; len += 1) { + // find the first length that can hold the precision + if (precision <= MAX_PRECISION[len]) { + REQUIRED_LENGTH[precision] = len; + break; + } + } + if (REQUIRED_LENGTH[precision] < 0) { + throw new IllegalStateException( + "Could not find required length for precision " + precision); + } + } + } +} diff --git a/core/src/test/java/com/netease/arctic/io/TestIcebergCombinedReader.java b/core/src/test/java/com/netease/arctic/io/TestIcebergCombinedReader.java index 572914cf5b..4e24427379 100644 --- a/core/src/test/java/com/netease/arctic/io/TestIcebergCombinedReader.java +++ b/core/src/test/java/com/netease/arctic/io/TestIcebergCombinedReader.java @@ -188,6 +188,7 @@ public void initDataAndReader() throws IOException { public void readAllData() throws IOException { GenericCombinedIcebergDataReader dataReader = new GenericCombinedIcebergDataReader( + getArcticTable().name(), getArcticTable().io(), getArcticTable().schema(), getArcticTable().spec(), @@ -209,6 +210,7 @@ public void readAllData() throws IOException { public void readAllDataNegate() throws IOException { GenericCombinedIcebergDataReader dataReader = new GenericCombinedIcebergDataReader( + getArcticTable().name(), getArcticTable().io(), getArcticTable().schema(), getArcticTable().spec(), @@ -232,6 +234,7 @@ public void readAllDataNegate() throws IOException { public void readOnlyData() throws IOException { 
GenericCombinedIcebergDataReader dataReader = new GenericCombinedIcebergDataReader( + getArcticTable().name(), getArcticTable().io(), getArcticTable().schema(), getArcticTable().spec(), @@ -251,6 +254,7 @@ public void readOnlyData() throws IOException { public void readOnlyDataNegate() throws IOException { GenericCombinedIcebergDataReader dataReader = new GenericCombinedIcebergDataReader( + getArcticTable().name(), getArcticTable().io(), getArcticTable().schema(), getArcticTable().spec(), @@ -271,6 +275,7 @@ public void readDataEnableFilterEqDelete() throws IOException { CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 100L; GenericCombinedIcebergDataReader dataReader = new GenericCombinedIcebergDataReader( + getArcticTable().name(), getArcticTable().io(), getArcticTable().schema(), getArcticTable().spec(), diff --git a/core/src/test/java/com/netease/arctic/io/TestIcebergCombinedReaderVariousTypes.java b/core/src/test/java/com/netease/arctic/io/TestIcebergCombinedReaderVariousTypes.java index 70d988201d..08b6477633 100644 --- a/core/src/test/java/com/netease/arctic/io/TestIcebergCombinedReaderVariousTypes.java +++ b/core/src/test/java/com/netease/arctic/io/TestIcebergCombinedReaderVariousTypes.java @@ -136,6 +136,7 @@ public void valid() throws IOException { CloseableIterable readData = new GenericCombinedIcebergDataReader( + table.name(), table.io(), table.schema(), table.spec(), @@ -186,6 +187,7 @@ public void readDataEnableFilterEqDelete() throws IOException { CombinedDeleteFilter.FILTER_EQ_DELETE_TRIGGER_RECORD_COUNT = 100L; GenericCombinedIcebergDataReader reader = new GenericCombinedIcebergDataReader( + table.name(), table.io(), table.schema(), table.spec(), diff --git a/mixed/hive/src/main/java/com/netease/arctic/hive/optimizing/MixFormatRewriteExecutorFactory.java b/mixed/hive/src/main/java/com/netease/arctic/hive/optimizing/MixFormatRewriteExecutorFactory.java index c8221bf3b3..233a9cb6a3 100644 --- a/mixed/hive/src/main/java/com/netease/arctic/hive/optimizing/MixFormatRewriteExecutorFactory.java +++ b/mixed/hive/src/main/java/com/netease/arctic/hive/optimizing/MixFormatRewriteExecutorFactory.java @@ -22,6 +22,7 @@ import com.netease.arctic.optimizing.OptimizingExecutorFactory; import com.netease.arctic.optimizing.OptimizingInputProperties; import com.netease.arctic.optimizing.RewriteFilesInput; +import com.netease.arctic.table.ArcticTable; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import java.util.Map; @@ -38,8 +39,10 @@ public void initialize(Map properties) { } @Override - public OptimizingExecutor createExecutor(RewriteFilesInput input) { + public OptimizingExecutor createExecutor(RewriteFilesInput input) { OptimizingInputProperties optimizingConfig = OptimizingInputProperties.parse(properties); + ArcticTable table = input.getTable(); + disposeCache(table.name()); return new MixFormatRewriteExecutor( input, input.getTable(),