[AMORO-2553] Read eq delete files and cache them on executors
zhongqishang committed Feb 28, 2024
1 parent bfdbee7 commit 8fc1a68
Showing 11 changed files with 1,065 additions and 9 deletions.
@@ -143,7 +143,7 @@ public static class TestOptimizingExecutorFactory
public void initialize(Map<String, String> properties) {}

@Override
public OptimizingExecutor createExecutor(TestOptimizingInput input) {
public OptimizingExecutor<?> createExecutor(TestOptimizingInput input) {
return new TestOptimizingExecutor(input);
}
}
@@ -49,6 +49,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.math.LongMath;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.Filter;
import org.apache.paimon.shade.guava30.com.google.common.hash.BloomFilter;
@@ -65,6 +66,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
@@ -108,10 +110,15 @@ public abstract class CombinedDeleteFilter<T extends StructLike> {
private final long dataRecordCnt;
private final boolean filterEqDelete;

private final String tableName;
private final ExecutorCache cache;

protected CombinedDeleteFilter(
String tableName,
RewriteFilesInput rewriteFilesInput,
Schema tableSchema,
StructLikeCollections structLikeCollections) {
this.tableName = tableName;
this.input = rewriteFilesInput;
this.dataRecordCnt =
Arrays.stream(rewriteFilesInput.dataFiles()).mapToLong(ContentFile::recordCount).sum();
@@ -152,6 +159,7 @@ protected CombinedDeleteFilter(
this.structLikeCollections = structLikeCollections;
}
this.filterEqDelete = filterEqDelete();
this.cache = ExecutorCache.getOrCreate();
}

/**
@@ -258,7 +266,7 @@ private Predicate<StructForDelete<T>> applyEqDeletes() {
eqDeletes,
s ->
CloseableIterable.transform(
openFile(s, deleteSchema),
getOrReadEqDeletes(s, deleteSchema),
r -> new RecordWithLsn(s.dataSequenceNumber(), r)))),
RecordWithLsn::recordCopy);

@@ -305,6 +313,37 @@ private Predicate<StructForDelete<T>> applyEqDeletes() {
return isInDeleteSet;
}

private CloseableIterable<Record> getOrReadEqDeletes(DeleteFile deleteFile, Schema projection) {
long estimatedSize = estimateEqDeletesSize(deleteFile, projection);
if (cache != null && estimatedSize < cache.maxEntrySize()) {
String cacheKey = deleteFile.path().toString();
return getOrLoad(cacheKey, () -> openFile(deleteFile, projection), estimatedSize);
} else {
return openFile(deleteFile, projection);
}
}

private CloseableIterable<Record> getOrLoad(
String key, Supplier<CloseableIterable<Record>> valueSupplier, long valueSize) {
return cache.getOrLoad(tableName, key, valueSupplier, valueSize);
}

// Estimates the memory (in bytes) required to cache the equality deletes.
// Copied from Iceberg 1.5.0.
private long estimateEqDeletesSize(DeleteFile deleteFile, Schema projection) {
try {
long recordCount = deleteFile.recordCount();
int recordSize = estimateRecordSize(projection);
return LongMath.checkedMultiply(recordCount, recordSize);
} catch (ArithmeticException e) {
return Long.MAX_VALUE;
}
}

private int estimateRecordSize(Schema schema) {
return schema.columns().stream().mapToInt(TypeUtil::estimateSize).sum();
}

private CloseableIterable<StructForDelete<T>> applyEqDeletes(
CloseableIterable<StructForDelete<T>> records) {
Predicate<StructForDelete<T>> remainingRows = applyEqDeletes().negate();
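For context, here is a minimal, self-contained sketch of the size-gated read path above. All names are hypothetical; the commit caches a CloseableIterable<Record>, while this sketch materializes a plain list so it stays runnable on its own. An eq-delete file is cached only when its overflow-safe size estimate fits under the per-entry limit; otherwise it is read directly.

import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical stand-in for the size-gated read path in CombinedDeleteFilter.
public class SizeGatedReadSketch {

  // Mirrors the 64 MB per-entry default that ExecutorCache uses in this commit.
  private static final long MAX_ENTRY_SIZE = 64L * 1024 * 1024;

  private static final ConcurrentMap<String, List<String>> CACHE = new ConcurrentHashMap<>();

  // Overflow-safe estimate (recordCount * perRecordSize), saturating at Long.MAX_VALUE
  // so an overflowing product is treated as "too large to cache".
  static long estimateSize(long recordCount, int perRecordSize) {
    try {
      return Math.multiplyExact(recordCount, (long) perRecordSize);
    } catch (ArithmeticException e) {
      return Long.MAX_VALUE;
    }
  }

  static List<String> getOrRead(
      String path, long recordCount, int perRecordSize, Supplier<List<String>> reader) {
    if (estimateSize(recordCount, perRecordSize) < MAX_ENTRY_SIZE) {
      // Small enough: load once, then serve repeated reads from the cache.
      return CACHE.computeIfAbsent(path, k -> reader.get());
    }
    // Oversized entries bypass the cache and are read directly every time.
    return reader.get();
  }

  public static void main(String[] args) {
    List<String> deletes =
        getOrRead(
            "s3://bucket/eq-delete-0.parquet", 1_000L, 100,
            () -> List.of("deleted-key-1", "deleted-key-2"));
    System.out.println(deletes);
  }
}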
208 changes: 208 additions & 0 deletions core/src/main/java/com/netease/arctic/io/reader/ExecutorCache.java
@@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.io.reader;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
* Copied from Spark's SparkExecutorCache.
*
* <p>An executor cache for reducing the computation and IO overhead in tasks.
*
* <p>Unlike the Spark original, which is configured through Spark SQL properties, this port is
* created with fixed defaults. It supports both a limit on the total cache size and a maximum
* size for individual entries, and it automatically evicts entries after a specified duration of
* inactivity. The configuration is fixed at initialization time; subsequent changes have no
* effect.
*
* <p>The cache is accessed and populated via {@link #getOrLoad(String, String, Supplier, long)}. If
* the value is not present in the cache, it is computed using the provided supplier and stored in
* the cache, subject to the defined size constraints. When a key is added, it must be associated
* with a particular group ID. Once the group is no longer needed, it is recommended to explicitly
* invalidate its state by calling {@link #invalidate(String)} instead of relying on automatic
* eviction.
*
* <p>Note that this class employs the singleton pattern to ensure only one cache exists per JVM.
*/
public class ExecutorCache {

private static final Logger LOG = LoggerFactory.getLogger(ExecutorCache.class);

private static volatile ExecutorCache instance = null;
public static boolean enabled = true;
public static String lastTableName = "";
private final Duration timeout;
private final long maxEntrySize;
private final long maxTotalSize;
private volatile Cache<String, CacheValue> state;

private ExecutorCache(Duration duration, long maxEntrySize, long maxTotalSize) {
this.timeout = duration;
this.maxEntrySize = maxEntrySize;
this.maxTotalSize = maxTotalSize;
}

/**
* Returns the cache, creating it on first access.
*
* <p>Note this method returns null if caching is disabled.
*/
public static ExecutorCache getOrCreate() {
if (instance == null) {
if (ExecutorCache.enabled) {
synchronized (ExecutorCache.class) {
if (instance == null) {
ExecutorCache.instance =
new ExecutorCache(
Duration.ofMinutes(10),
64 * 1024 * 1024, // 64 MB
128 * 1024 * 1024 // 128 MB
);
}
}
}
}

return instance;
}

/** Returns the cache if already created or null otherwise. */
public static ExecutorCache get() {
return instance;
}

public static void setEnabled(boolean enabled) {
ExecutorCache.enabled = enabled;
}

/** Returns the max entry size in bytes that will be considered for caching. */
public long maxEntrySize() {
return maxEntrySize;
}

/**
* Gets the cached value for the key or populates the cache with a new mapping.
*
* @param group a group ID
* @param key a cache key
* @param valueSupplier a supplier to compute the value
* @param valueSize an estimated memory size of the value in bytes
* @return the cached or computed value
*/
public <V> V getOrLoad(String group, String key, Supplier<V> valueSupplier, long valueSize) {
if (valueSize > maxEntrySize) {
LOG.debug("{} exceeds max entry size: {} > {}", key, valueSize, maxEntrySize);
return valueSupplier.get();
}

String internalKey = group + "_" + key;
CacheValue value = state().get(internalKey, loadFunc(valueSupplier, valueSize));
Preconditions.checkNotNull(value, "Loaded value must not be null");
return value.get();
}

private <V> Function<String, CacheValue> loadFunc(Supplier<V> valueSupplier, long valueSize) {
return key -> {
long start = System.currentTimeMillis();
V value = valueSupplier.get();
long end = System.currentTimeMillis();
LOG.debug("Loaded {} with size {} in {} ms", key, valueSize, (end - start));
return new CacheValue(value, valueSize);
};
}

/**
* Invalidates all keys associated with the given group ID.
*
* @param group a group ID
*/
public void invalidate(String group) {
if (state != null) {
List<String> internalKeys = findInternalKeys(group);
LOG.info("Invalidating {} keys associated with {}", internalKeys.size(), group);
internalKeys.forEach(internalKey -> state.invalidate(internalKey));
LOG.info("Current cache stats {}", state.stats());
}
}

/** Invalidates all cached keys. */
public void invalidateAll() {
if (state != null) {
state.invalidateAll();
}
}

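// Note: this is a prefix match, so keys belonging to any group whose name starts with
// the given group (e.g. "db.table1" vs "db.table10") would also be returned.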
public List<String> findInternalKeys(String group) {
return state.asMap().keySet().stream()
.filter(internalKey -> internalKey.startsWith(group))
.collect(Collectors.toList());
}

private Cache<String, CacheValue> state() {
if (state == null) {
synchronized (this) {
if (state == null) {
LOG.info("Initializing cache state");
this.state = initState();
}
}
}

return state;
}

private Cache<String, CacheValue> initState() {
return Caffeine.newBuilder()
.expireAfterAccess(timeout)
.maximumWeight(maxTotalSize)
.weigher((key, value) -> ((CacheValue) value).weight())
.recordStats()
.removalListener((key, value, cause) -> LOG.debug("Evicted {} ({})", key, cause))
.build();
}

@VisibleForTesting
static class CacheValue {
private final Object value;
private final long size;

CacheValue(Object value, long size) {
this.value = value;
this.size = size;
}

@SuppressWarnings("unchecked")
public <V> V get() {
return (V) value;
}

public int weight() {
return (int) Math.min(size, Integer.MAX_VALUE);
}
}
}
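For reference, a hypothetical caller of the class above, shown only to illustrate the group/key contract; the class name and the group, key, and size values are made up.

package com.netease.arctic.io.reader;

// Hypothetical illustration of the group/key contract; not part of the commit.
public class ExecutorCacheUsageSketch {
  public static void main(String[] args) {
    ExecutorCache cache = ExecutorCache.getOrCreate(); // null when caching is disabled
    if (cache != null) {
      String value =
          cache.getOrLoad(
              "db.sample_table", // group ID: here, a table name
              "hdfs://nn/warehouse/eq-delete-0.avro", // cache key: here, a delete-file path
              () -> "loaded-records", // the supplier runs only on a cache miss
              1024L); // caller-estimated value size in bytes
      System.out.println(value);
      cache.invalidate("db.sample_table"); // drop all keys for the group when done
    }
  }
}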
@@ -70,6 +70,7 @@ public class GenericCombinedIcebergDataReader implements OptimizingDataReader {
protected RewriteFilesInput input;

public GenericCombinedIcebergDataReader(
String tableName,
ArcticFileIO fileIO,
Schema tableSchema,
PartitionSpec spec,
@@ -88,7 +89,7 @@ public GenericCombinedIcebergDataReader(
this.reuseContainer = reuseContainer;
this.input = rewriteFilesInput;
this.deleteFilter =
new GenericDeleteFilter(rewriteFilesInput, tableSchema, structLikeCollections);
new GenericDeleteFilter(tableName, rewriteFilesInput, tableSchema, structLikeCollections);
}

@Override
@@ -289,10 +290,11 @@ public CombinedDeleteFilter<Record> getDeleteFilter() {
protected class GenericDeleteFilter extends CombinedDeleteFilter<Record> {

public GenericDeleteFilter(
String tableName,
RewriteFilesInput rewriteFilesInput,
Schema tableSchema,
StructLikeCollections structLikeCollections) {
super(rewriteFilesInput, tableSchema, structLikeCollections);
super(tableName, rewriteFilesInput, tableSchema, structLikeCollections);
}

@Override
@@ -61,6 +61,7 @@ protected StructLike partition() {
@Override
protected OptimizingDataReader dataReader() {
return new GenericCombinedIcebergDataReader(
table.name(),
io,
table.schema(),
table.spec(),
@@ -18,6 +18,7 @@

package com.netease.arctic.optimizing;

import com.netease.arctic.table.ArcticTable;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

import java.util.Map;
@@ -32,9 +33,10 @@ public void initialize(Map<String, String> properties) {
}

@Override
public OptimizingExecutor createExecutor(RewriteFilesInput input) {
public OptimizingExecutor<?> createExecutor(RewriteFilesInput input) {
OptimizingInputProperties optimizingConfig = OptimizingInputProperties.parse(properties);
return new IcebergRewriteExecutor(
input, input.getTable(), optimizingConfig.getStructLikeCollections());
ArcticTable table = input.getTable();
disposeCache(table.name());
return new IcebergRewriteExecutor(input, table, optimizingConfig.getStructLikeCollections());
}
}
@@ -18,8 +18,11 @@

package com.netease.arctic.optimizing;

import com.netease.arctic.io.reader.ExecutorCache;

import java.io.Serializable;
import java.util.Map;
import java.util.Objects;

/** A factory to create {@link OptimizingExecutor} */
public interface OptimizingExecutorFactory<I extends TableOptimizing.OptimizingInput>
@@ -32,5 +35,15 @@ public interface OptimizingExecutorFactory<I extends TableOptimizing.OptimizingInput>
void initialize(Map<String, String> properties);

/** Creates an executor for the given input. */
OptimizingExecutor createExecutor(I input);
OptimizingExecutor<?> createExecutor(I input);

default void disposeCache(String table) {
if (ExecutorCache.enabled) {
ExecutorCache executorCache = ExecutorCache.getOrCreate();
if (!Objects.equals(ExecutorCache.lastTableName, "")
&& !Objects.equals(ExecutorCache.lastTableName, table)) {
executorCache.invalidateAll();
}
}
}
}
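The default disposeCache above flushes the cache whenever work switches to a different table. Note that the hunk shown never updates lastTableName itself; presumably one of the changed files not shown here does. The following self-contained toy, with hypothetical names throughout, adds that update so the pattern round-trips:

import java.util.Objects;

// Toy model of the "invalidate on table switch" pattern behind disposeCache.
public class TableSwitchSketch {
  private static String lastTableName = "";

  static void disposeCacheFor(String table, Runnable invalidateAll) {
    if (!Objects.equals(lastTableName, "") && !Objects.equals(lastTableName, table)) {
      invalidateAll.run(); // a different table arrived: drop the previous table's entries
    }
    lastTableName = table; // remember which table we are now executing against
  }

  public static void main(String[] args) {
    disposeCacheFor("db.t1", () -> System.out.println("invalidated"));
    disposeCacheFor("db.t1", () -> System.out.println("invalidated")); // same table: no-op
    disposeCacheFor("db.t2", () -> System.out.println("invalidated")); // switch: invalidates
  }
}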
