Skip to content

Commit

Permalink
[FLINK-17865][checkpoint] Increase default size of 'state.backend.fs.…
Browse files Browse the repository at this point in the history
…memory-threshold'

This closes #12373.
  • Loading branch information
Myasuka authored and carp84 committed May 28, 2020
1 parent 79cb1a1 commit 6f9042b
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 36 deletions.
6 changes: 3 additions & 3 deletions docs/_includes/generated/checkpointing_configuration.html
Expand Up @@ -22,9 +22,9 @@
</tr>
<tr>
<td><h5>state.backend.fs.memory-threshold</h5></td>
<td style="word-wrap: break-word;">1024</td>
<td>Integer</td>
<td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file.</td>
<td style="word-wrap: break-word;">20 kb</td>
<td>MemorySize</td>
<td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.</td>
</tr>
<tr>
<td><h5>state.backend.fs.write-buffer-size</h5></td>
Expand Down
6 changes: 3 additions & 3 deletions docs/_includes/generated/expert_state_backends_section.html
Expand Up @@ -16,9 +16,9 @@
</tr>
<tr>
<td><h5>state.backend.fs.memory-threshold</h5></td>
<td style="word-wrap: break-word;">1024</td>
<td>Integer</td>
<td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file.</td>
<td style="word-wrap: break-word;">20 kb</td>
<td>MemorySize</td>
<td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.</td>
</tr>
<tr>
<td><h5>state.backend.fs.write-buffer-size</h5></td>
Expand Down
Expand Up @@ -140,11 +140,12 @@ public class CheckpointingOptions {
/** The minimum size of state data files. All state chunks smaller than that
* are stored inline in the root checkpoint metadata file. */
@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
public static final ConfigOption<Integer> FS_SMALL_FILE_THRESHOLD = ConfigOptions
public static final ConfigOption<MemorySize> FS_SMALL_FILE_THRESHOLD = ConfigOptions
.key("state.backend.fs.memory-threshold")
.defaultValue(1024)
.memoryType()
.defaultValue(MemorySize.parse("20kb"))
.withDescription("The minimum size of state data files. All state chunks smaller than that are stored" +
" inline in the root checkpoint metadata file.");
" inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.");

/**
* The default size of the write buffer for the checkpoint streams that write to file systems.
Expand Down
Expand Up @@ -93,7 +93,7 @@ private static CheckpointStorageLocation createSavepointLocation(Path location)
location,
location,
reference,
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue(),
(int) CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
CheckpointingOptions.FS_WRITE_BUFFER_SIZE.defaultValue());
}
}
Expand Down
Expand Up @@ -97,7 +97,7 @@ def test_get_min_file_size_threshold(self):

state_backend = FsStateBackend("file://var/checkpoints/")

self.assertEqual(state_backend.get_min_file_size_threshold(), 1024)
self.assertEqual(state_backend.get_min_file_size_threshold(), 20480)

state_backend = FsStateBackend("file://var/checkpoints/", file_state_size_threshold=2048)

Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.TernaryBoolean;

import org.slf4j.LoggerFactory;
Expand All @@ -55,6 +56,7 @@
import java.net.URI;
import java.util.Collection;

import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -363,22 +365,24 @@ private FsStateBackend(FsStateBackend original, ReadableConfig configuration, Cl
this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
configuration.get(CheckpointingOptions.ASYNC_SNAPSHOTS));

final int sizeThreshold = original.fileStateThreshold >= 0 ?
original.fileStateThreshold :
configuration.get(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);
if (getValidFileStateThreshold(original.fileStateThreshold) >= 0) {
this.fileStateThreshold = original.fileStateThreshold;
} else {
final int configuredStateThreshold =
getValidFileStateThreshold(configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes());

if (sizeThreshold >= 0 && sizeThreshold <= MAX_FILE_STATE_THRESHOLD) {
this.fileStateThreshold = sizeThreshold;
}
else {
this.fileStateThreshold = CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
if (configuredStateThreshold >= 0) {
this.fileStateThreshold = configuredStateThreshold;
} else {
this.fileStateThreshold = MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes());

// because this is the only place we (unlikely) ever log, we lazily
// create the logger here
LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
// because this is the only place we (unlikely) ever log, we lazily
// create the logger here
LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
"Ignoring invalid file size threshold value ({}): {} - using default value {} instead.",
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold,
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue());
FS_SMALL_FILE_THRESHOLD.key(), configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes(),
FS_SMALL_FILE_THRESHOLD.defaultValue());
}
}

final int bufferSize = original.writeBufferSize >= 0 ?
Expand All @@ -388,6 +392,13 @@ private FsStateBackend(FsStateBackend original, ReadableConfig configuration, Cl
this.writeBufferSize = Math.max(bufferSize, this.fileStateThreshold);
}

private int getValidFileStateThreshold(long fileStateThreshold) {
if (fileStateThreshold >= 0 && fileStateThreshold <= MAX_FILE_STATE_THRESHOLD) {
return (int) fileStateThreshold;
}
return -1;
}

// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -432,7 +443,7 @@ public Path getCheckpointPath() {
public int getMinFileSizeThreshold() {
return fileStateThreshold >= 0 ?
fileStateThreshold :
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
}

/**
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -227,7 +228,7 @@ public void testLoadFileSystemStateBackend() throws Exception {
final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
final Path expectedCheckpointsPath = new Path(checkpointDir);
final Path expectedSavepointsPath = new Path(savepointDir);
final int threshold = 1000000;
final MemorySize threshold = MemorySize.parse("900kb");
final int minWriteBufferSize = 1024;
final boolean async = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();

Expand All @@ -237,15 +238,15 @@ public void testLoadFileSystemStateBackend() throws Exception {
config1.setString(backendKey, "filesystem");
config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config1.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
config1.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize);
config1.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);

final Configuration config2 = new Configuration();
config2.setString(backendKey, FsStateBackendFactory.class.getName());
config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config2.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
config2.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize);
config2.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);

Expand All @@ -262,10 +263,10 @@ public void testLoadFileSystemStateBackend() throws Exception {
assertEquals(expectedCheckpointsPath, fs2.getCheckpointPath());
assertEquals(expectedSavepointsPath, fs1.getSavepointPath());
assertEquals(expectedSavepointsPath, fs2.getSavepointPath());
assertEquals(threshold, fs1.getMinFileSizeThreshold());
assertEquals(threshold, fs2.getMinFileSizeThreshold());
assertEquals(Math.max(threshold, minWriteBufferSize), fs1.getWriteBufferSize());
assertEquals(Math.max(threshold, minWriteBufferSize), fs2.getWriteBufferSize());
assertEquals(threshold.getBytes(), fs1.getMinFileSizeThreshold());
assertEquals(threshold.getBytes(), fs2.getMinFileSizeThreshold());
assertEquals(Math.max(threshold.getBytes(), minWriteBufferSize), fs1.getWriteBufferSize());
assertEquals(Math.max(threshold.getBytes(), minWriteBufferSize), fs2.getWriteBufferSize());
assertEquals(async, fs1.isUsingAsynchronousSnapshots());
assertEquals(async, fs2.isUsingAsynchronousSnapshots());
}
Expand Down Expand Up @@ -293,7 +294,7 @@ public void testLoadFileSystemStateBackendMixed() throws Exception {
config.setString(backendKey, "jobmanager"); // this should not be picked up
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 20); // this should not be picked up
config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("20")); // this should not be picked up
config.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 3000000); // this should not be picked up

final StateBackend loadedBackend =
Expand Down
Expand Up @@ -576,7 +576,7 @@ public void open(Configuration parameters) throws Exception {
if (data == null) {
// We need this to be large, because we want to test with files
Random rand = new Random(getRuntimeContext().getIndexOfThisSubtask());
data = new byte[CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue() + 1];
data = new byte[(int) CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes() + 1];
rand.nextBytes(data);
}
}
Expand Down Expand Up @@ -833,7 +833,7 @@ private Configuration getFileBasedCheckpointsConfig(final String savepointDir) {
final Configuration config = new Configuration();
config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
return config;
}
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
Expand Down Expand Up @@ -109,7 +110,7 @@ private Configuration getConfiguration() throws Exception {

config.setString(CheckpointingOptions.STATE_BACKEND, "memory");
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 300L);

Expand Down

0 comments on commit 6f9042b

Please sign in to comment.