Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[AMORO-2553] Parallelize reading of EQ delete files and cache them on optimizers #2584

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,16 @@ public class OptimizerProperties {
public static final String OPTIMIZER_MEMORY_STORAGE_SIZE = "memory-storage-size";
public static final String MAX_INPUT_FILE_SIZE_PER_THREAD = "max-input-file-size-per-thread";
public static final Long MAX_INPUT_FILE_SIZE_PER_THREAD_DEFAULT = 512 * 1024 * 1024L; // 512MB

/** Property key that switches the optimizer cache on or off. */
public static final String OPTIMIZER_CACHE_ENABLED = "cache-enabled";
/** The optimizer cache is enabled unless configured otherwise. */
public static final boolean OPTIMIZER_CACHE_ENABLED_DEFAULT = true;

/** Property key for the cache timeout. */
public static final String OPTIMIZER_CACHE_TIMEOUT = "cache-timeout";
/** Default cache timeout; the value is a number of minutes. */
public static final long OPTIMIZER_CACHE_TIMEOUT_DEFAULT = 10;

/** Property key for the maximum size of a single cache entry. */
public static final String OPTIMIZER_CACHE_MAX_ENTRY_SIZE = "cache-max-entry-size";
/** Default maximum size of a single cache entry, expressed in MB. */
public static final long OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT = 64; // 64 MB

/** Property key for the maximum total size of the cache. */
public static final String OPTIMIZER_CACHE_MAX_TOTAL_SIZE = "cache-max-total-size";
/** Default maximum total size of the cache, expressed in MB. */
public static final long OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT = 128; // 128 MB
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@

package com.netease.arctic.optimizer.common;

import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_ENABLED_DEFAULT;
import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT;
import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT;
import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT_DEFAULT;

import com.netease.arctic.api.OptimizerProperties;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.kohsuke.args4j.CmdLineException;
Expand Down Expand Up @@ -85,6 +90,30 @@ public class OptimizerConfig implements Serializable {
@Option(name = "-id", aliases = "--" + OptimizerProperties.RESOURCE_ID, usage = "Resource id")
private String resourceId;

/**
 * Whether the optimizer cache is enabled; enabled by default.
 *
 * <p>The default args4j boolean handler can only switch a flag on, which makes an option whose
 * default is {@code true} impossible to turn off from the command line. The explicit boolean
 * handler keeps a bare {@code -ce} meaning "enabled" and additionally accepts an explicit value
 * such as {@code -ce false}.
 */
@Option(
    name = "-ce",
    aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_ENABLED,
    handler = org.kohsuke.args4j.spi.ExplicitBooleanOptionHandler.class,
    usage = "Whether to cache delete files, default true (pass 'false' to disable)")
private boolean cacheEnabled = OPTIMIZER_CACHE_ENABLED_DEFAULT;

/**
 * Cache timeout expressed in minutes.
 *
 * <p>The usage text states the unit and the default so that the command-line help is
 * self-explanatory, in line with the other cache options.
 */
@Option(
    name = "-ct",
    aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_TIMEOUT,
    usage = "Cache timeout in minutes, default 10")
private long cacheTimeout = OPTIMIZER_CACHE_TIMEOUT_DEFAULT; // 10 Min

/** Maximum size of a single cache entry, expressed in MB; set with {@code -cmes}. */
@Option(
name = "-cmes",
aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE,
usage = "Cache max entry size, default 64MB")
private long cacheMaxEntrySize = OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT;

/** Maximum total size of the cache, expressed in MB; set with {@code -cmts}. */
@Option(
name = "-cmts",
aliases = "--" + OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE,
usage = "Cache max total size, default 128MB")
private long cacheMaxTotalSize = OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT;

public OptimizerConfig() {}

public OptimizerConfig(String[] args) throws CmdLineException {
Expand Down Expand Up @@ -164,6 +193,38 @@ public void setResourceId(String resourceId) {
this.resourceId = resourceId;
}

/**
 * Tells whether the optimizer cache is switched on.
 *
 * @return {@code true} when caching is enabled
 */
public boolean isCacheEnabled() {
  return this.cacheEnabled;
}

/**
 * Switches the optimizer cache on or off.
 *
 * @param cacheEnabled {@code true} to enable caching
 */
public void setCacheEnabled(boolean cacheEnabled) {
this.cacheEnabled = cacheEnabled;
}

/**
 * Returns the configured cache timeout.
 *
 * @return the cache timeout, in minutes
 */
public long getCacheTimeout() {
  return this.cacheTimeout;
}

/**
 * Sets the cache timeout.
 *
 * @param cacheTimeout the cache timeout, in minutes
 */
public void setCacheTimeout(long cacheTimeout) {
this.cacheTimeout = cacheTimeout;
}

/**
 * Returns the configured limit for a single cache entry.
 *
 * @return the maximum entry size, in MB
 */
public long getCacheMaxEntrySize() {
  return this.cacheMaxEntrySize;
}

/**
 * Sets the limit for a single cache entry.
 *
 * @param cacheMaxEntrySize the maximum entry size, in MB
 */
public void setCacheMaxEntrySize(long cacheMaxEntrySize) {
this.cacheMaxEntrySize = cacheMaxEntrySize;
}

/**
 * Returns the configured limit for the whole cache.
 *
 * @return the maximum total cache size, in MB
 */
public long getCacheMaxTotalSize() {
  return this.cacheMaxTotalSize;
}

/**
 * Sets the limit for the whole cache.
 *
 * @param cacheMaxTotalSize the maximum total cache size, in MB
 */
public void setCacheMaxTotalSize(long cacheMaxTotalSize) {
this.cacheMaxTotalSize = cacheMaxTotalSize;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
Expand All @@ -176,6 +237,10 @@ public String toString() {
.add("rocksDBBasePath", diskStoragePath)
.add("memoryStorageSize", memoryStorageSize)
.add("resourceId", resourceId)
.add("cacheEnabled", cacheEnabled)
.add("cacheTimeout", cacheTimeout)
.add("cacheMaxEntrySize", cacheMaxEntrySize)
.add("cacheMaxTotalSize", cacheMaxTotalSize)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package com.netease.arctic.optimizer.common;

import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT;
import static com.netease.arctic.api.OptimizerProperties.OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT;

import org.junit.Assert;
import org.junit.Test;
import org.kohsuke.args4j.CmdLineException;
Expand All @@ -28,7 +31,8 @@ public class TestOptimizerConfig {

@Test
public void testParseArguments() throws CmdLineException {
String cmd = "-a thrift://127.0.0.1:1260 -p 11 -g g1 -hb 2000 -eds -dsp /tmp/arctic -msz 512";
String cmd =
"-a thrift://127.0.0.1:1260 -p 11 -g g1 -hb 2000 -eds -dsp /tmp/arctic -msz 512 -ce -ct 10 -cmes 64 -cmts 128";
String[] args = cmd.split(" ");
OptimizerConfig optimizerConfig = new OptimizerConfig(args);
Assert.assertEquals("thrift://127.0.0.1:1260", optimizerConfig.getAmsUrl());
Expand All @@ -38,6 +42,10 @@ public void testParseArguments() throws CmdLineException {
Assert.assertTrue(optimizerConfig.isExtendDiskStorage());
Assert.assertEquals("/tmp/arctic", optimizerConfig.getDiskStoragePath());
Assert.assertEquals(512, optimizerConfig.getMemoryStorageSize());
Assert.assertTrue(optimizerConfig.isCacheEnabled());
Assert.assertEquals(10, optimizerConfig.getCacheTimeout());
Assert.assertEquals(64, optimizerConfig.getCacheMaxEntrySize());
Assert.assertEquals(128, optimizerConfig.getCacheMaxTotalSize());
}

@Test
Expand All @@ -50,6 +58,7 @@ public void testSetAndGet() {
String diskStoragePath = "/tmp";
long memoryStorageSize = 1024;
String resourceId = UUID.randomUUID().toString();
long cacheTimeout = 10;

config.setAmsUrl(amsUrl);
config.setExecutionParallel(executionParallel);
Expand All @@ -59,6 +68,10 @@ public void testSetAndGet() {
config.setDiskStoragePath(diskStoragePath);
config.setMemoryStorageSize(memoryStorageSize);
config.setResourceId(resourceId);
config.setCacheEnabled(true);
config.setCacheTimeout(cacheTimeout);
config.setCacheMaxEntrySize(OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT);
config.setCacheMaxTotalSize(OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT);

Assert.assertEquals(amsUrl, config.getAmsUrl());
Assert.assertEquals(executionParallel, config.getExecutionParallel());
Expand All @@ -68,6 +81,10 @@ public void testSetAndGet() {
Assert.assertEquals(diskStoragePath, config.getDiskStoragePath());
Assert.assertEquals(memoryStorageSize, config.getMemoryStorageSize());
Assert.assertEquals(resourceId, config.getResourceId());
Assert.assertTrue(config.isCacheEnabled());
Assert.assertEquals(cacheTimeout, config.getCacheTimeout());
Assert.assertEquals(OPTIMIZER_CACHE_MAX_ENTRY_SIZE_DEFAULT, config.getCacheMaxEntrySize());
Assert.assertEquals(OPTIMIZER_CACHE_MAX_TOTAL_SIZE_DEFAULT, config.getCacheMaxTotalSize());
}

@Test(expected = CmdLineException.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public static class TestOptimizingExecutorFactory
public void initialize(Map<String, String> properties) {}

@Override
public OptimizingExecutor createExecutor(TestOptimizingInput input) {
public OptimizingExecutor<?> createExecutor(TestOptimizingInput input) {
return new TestOptimizingExecutor(input);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.netease.arctic.optimizer.flink;

import com.netease.arctic.io.reader.OptimizerExecutorCache;
import com.netease.arctic.optimizer.common.Optimizer;
import com.netease.arctic.optimizer.common.OptimizerConfig;
import com.netease.arctic.optimizer.common.OptimizerToucher;
Expand All @@ -33,6 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;

public class FlinkOptimizer extends Optimizer {
private static final Logger LOG = LoggerFactory.getLogger(FlinkOptimizer.class);

Expand All @@ -50,6 +53,13 @@ public static void main(String[] args) throws CmdLineException {
// calculate optimizer memory allocation
calcOptimizerMemory(optimizerConfig, env);

// Create the optimizer executor cache only when it is enabled in the configuration.
// The timeout is configured in minutes; both size limits are configured in MB and are
// scaled by 1024 * 1024 before being handed to the cache.
// NOTE(review): this call happens inside main(); presumably the optimizing executors may run
// in a different JVM than the one executing main(), in which case a cache created here would
// not exist there — confirm where OptimizerExecutorCache is expected to live.
if (optimizerConfig.isCacheEnabled()) {
OptimizerExecutorCache.create(
Duration.ofMinutes(optimizerConfig.getCacheTimeout()),
optimizerConfig.getCacheMaxEntrySize() * 1024 * 1024,
optimizerConfig.getCacheMaxTotalSize() * 1024 * 1024);
}

Optimizer optimizer = new FlinkOptimizer(optimizerConfig);
env.addSource(new FlinkToucher(optimizer.getToucher()))
.setParallelism(1)
Expand Down
Loading
Loading