Skip to content

Commit

Permalink
[AMORO-2553] Parallelize reading of EQ delete files and cache them on…
Browse files Browse the repository at this point in the history
… optimizers
  • Loading branch information
zhongqishang committed Mar 25, 2024
1 parent b749c9d commit 3ab149a
Show file tree
Hide file tree
Showing 17 changed files with 729 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,16 @@ public class OptimizerProperties {
public static final String OPTIMIZER_MEMORY_STORAGE_SIZE = "memory-storage-size";
public static final String MAX_INPUT_FILE_SIZE_PER_THREAD = "max-input-file-size-per-thread";
public static final Long MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT = 512 * 1024 * 1024L; // 512MB

public static final String OPTIMIZER_CACHE_ENABLED = "cache-enabled";
public static final boolean OPTIMIZER_CACHE_ENABLED_DEFAULT = true;

public static final String OPTIMIZER_CACHE_TIMEOUT = "cache-timeout";
public static final long OPTIMIZER_CACHE_TIMEOUT_DEFAULT = 10;

public static final String OPTIMIZER_CACHE_MAX_ENTRY_SIZE = "cache-max-entry-size";
public static final long OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT = 64; // 64 MB

public static final String OPTIMIZER_CACHE_MAX_TOTAL_SIZE = "cache-max-total-size";
public static final long OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT = 128; // 128 MB
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

package com.netease.arctic.optimizer.common;

import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_ENABLED_DEFAULT;
import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT;
import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT;
import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT_DEFAULT;

import com.netease.arctic.api.OptimizerProperties;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.kohsuke.args4j.CmdLineException;
Expand Down Expand Up @@ -85,6 +90,30 @@ public class OptimizerConfig implements Serializable {
@Option(name = "-id", aliases = "--" + OptimizerProperties.RESOURCE_ID, usage = "Resource id")
private String resourceId;

@Option(
name = "-ce",
aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_ENABLED,
usage = "Whether cache position delete files, default true")
private boolean cacheEnabled = OPTIMIZER_CACHE_ENABLED_DEFAULT;

@Option(
name = "-ct",
aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT,
usage = "Memory storage size limit when extending disk storage(MB), default 512MB")
private long cacheTimeout = OPTIMIZER_CACHE_TIMEOUT_DEFAULT; // 10 Min

@Option(
name = "-cmes",
aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE,
usage = "Memory storage size limit when extending disk storage(MB), default 512MB")
private long cacheMaxEntrySize = OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT;

@Option(
name = "-cmts",
aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE,
usage = "Memory storage size limit when extending disk storage(MB), default 512MB")
private long cacheMaxTotalSize = OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT;

public OptimizerConfig() {}

public OptimizerConfig(String[] args) throws CmdLineException {
Expand Down Expand Up @@ -164,6 +193,38 @@ public void setResourceId(String resourceId) {
this.resourceId = resourceId;
}

public boolean isCacheEnabled() {
return cacheEnabled;
}

public void setCacheEnabled(boolean cacheEnabled) {
this.cacheEnabled = cacheEnabled;
}

public long getCacheTimeout() {
return cacheTimeout;
}

public void setCacheTimeout(long cacheTimeout) {
this.cacheTimeout = cacheTimeout;
}

public long getCacheMaxEntrySize() {
return cacheMaxEntrySize;
}

public void setCacheMaxEntrySize(long cacheMaxEntrySize) {
this.cacheMaxEntrySize = cacheMaxEntrySize;
}

public long getCacheMaxTotalSize() {
return cacheMaxTotalSize;
}

public void setCacheMaxTotalSize(long cacheMaxTotalSize) {
this.cacheMaxTotalSize = cacheMaxTotalSize;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand All @@ -176,6 +237,10 @@ public String toString() {
.add("rocksDBBasePath", diskStoragePath)
.add("memoryStorageSize", memoryStorageSize)
.add("resourceId", resourceId)
.add("cacheEnabled", cacheEnabled)
.add("cacheTimeout", cacheTimeout)
.add("cacheMaxEntrySize", cacheMaxEntrySize)
.add("cacheMaxTotalSize", cacheMaxTotalSize)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package com.netease.arctic.optimizer.common;

import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT;
import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT;

import org.junit.Assert;
import org.junit.Test;
import org.kohsuke.args4j.CmdLineException;
Expand All @@ -28,7 +31,8 @@ public class TestOptimizerConfig {

@Test
public void testParseArguments() throws CmdLineException {
String cmd = "-a thrift://127.0.0.1:1260 -p 11 -g g1 -hb 2000 -eds -dsp /tmp/arctic -msz 512";
String cmd =
"-a thrift://127.0.0.1:1260 -p 11 -g g1 -hb 2000 -eds -dsp /tmp/arctic -msz 512 -ce -ct 10 -cmes 64 -cmts 128";
String[] args = cmd.split(" ");
OptimizerConfig optimizerConfig = new OptimizerConfig(args);
Assert.assertEquals("thrift://127.0.0.1:1260", optimizerConfig.getAmsUrl());
Expand All @@ -38,6 +42,10 @@ public void testParseArguments() throws CmdLineException {
Assert.assertTrue(optimizerConfig.isExtendDiskStorage());
Assert.assertEquals("/tmp/arctic", optimizerConfig.getDiskStoragePath());
Assert.assertEquals(512, optimizerConfig.getMemoryStorageSize());
Assert.assertTrue(optimizerConfig.isCacheEnabled());
Assert.assertEquals(10, optimizerConfig.getCacheTimeout());
Assert.assertEquals(64, optimizerConfig.getCacheMaxEntrySize());
Assert.assertEquals(128, optimizerConfig.getCacheMaxTotalSize());
}

@Test
Expand All @@ -50,6 +58,7 @@ public void testSetAndGet() {
String diskStoragePath = "/tmp";
long memoryStorageSize = 1024;
String resourceId = UUID.randomUUID().toString();
long cacheTimeout = 10;

config.setAmsUrl(amsUrl);
config.setExecutionParallel(executionParallel);
Expand All @@ -59,6 +68,10 @@ public void testSetAndGet() {
config.setDiskStoragePath(diskStoragePath);
config.setMemoryStorageSize(memoryStorageSize);
config.setResourceId(resourceId);
config.setCacheEnabled(true);
config.setCacheTimeout(cacheTimeout);
config.setCacheMaxEntrySize(OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT);
config.setCacheMaxTotalSize(OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT);

Assert.assertEquals(amsUrl, config.getAmsUrl());
Assert.assertEquals(executionParallel, config.getExecutionParallel());
Expand All @@ -68,6 +81,10 @@ public void testSetAndGet() {
Assert.assertEquals(diskStoragePath, config.getDiskStoragePath());
Assert.assertEquals(memoryStorageSize, config.getMemoryStorageSize());
Assert.assertEquals(resourceId, config.getResourceId());
Assert.assertTrue(config.isCacheEnabled());
Assert.assertEquals(cacheTimeout, config.getCacheTimeout());
Assert.assertEquals(OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT, config.getCacheMaxEntrySize());
Assert.assertEquals(OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT, config.getCacheMaxTotalSize());
}

@Test(expected = CmdLineException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public static class TestOptimizingExecutorFactory
public void initialize(Map<String, String> properties) {}

@Override
public OptimizingExecutor createExecutor(TestOptimizingInput input) {
public OptimizingExecutor<?> createExecutor(TestOptimizingInput input) {
return new TestOptimizingExecutor(input);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.netease.arctic.optimizer.flink;

import com.netease.arctic.io.reader.OptimizerExecutorCache;
import com.netease.arctic.optimizer.common.Optimizer;
import com.netease.arctic.optimizer.common.OptimizerConfig;
import com.netease.arctic.optimizer.common.OptimizerToucher;
Expand All @@ -33,6 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

public class FlinkOptimizer extends Optimizer {
private static final Logger LOG = LoggerFactory.getLogger(FlinkOptimizer.class);

Expand All @@ -50,6 +53,13 @@ public static void main(String[] args) throws CmdLineException {
// calculate optimizer memory allocation
calcOptimizerMemory(optimizerConfig, env);

if (optimizerConfig.isCacheEnabled()) {
OptimizerExecutorCache.create(
Duration.ofMinutes(optimizerConfig.getCacheTimeout()),
optimizerConfig.getCacheMaxEntrySize() * 1024 * 1024,
optimizerConfig.getCacheMaxTotalSize() * 1024 * 1024);
}

Optimizer optimizer = new FlinkOptimizer(optimizerConfig);
env.addSource(new FlinkToucher(optimizer.getToucher()))
.setParallelism(1)
Expand Down
Loading

0 comments on commit 3ab149a

Please sign in to comment.