Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-17865][checkpoint] Increase default size of 'state.backend.fs.memory-threshold' #12373

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions docs/_includes/generated/checkpointing_configuration.html
Expand Up @@ -22,9 +22,9 @@
</tr>
<tr>
<td><h5>state.backend.fs.memory-threshold</h5></td>
<td style="word-wrap: break-word;">1024</td>
<td>Integer</td>
<td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file.</td>
<td style="word-wrap: break-word;">20 kb</td>
<td>MemorySize</td>
<td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.</td>
</tr>
<tr>
<td><h5>state.backend.fs.write-buffer-size</h5></td>
Expand Down
6 changes: 3 additions & 3 deletions docs/_includes/generated/expert_state_backends_section.html
Expand Up @@ -16,9 +16,9 @@
</tr>
<tr>
<td><h5>state.backend.fs.memory-threshold</h5></td>
<td style="word-wrap: break-word;">1024</td>
<td>Integer</td>
<td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file.</td>
<td style="word-wrap: break-word;">20 kb</td>
<td>MemorySize</td>
<td>The minimum size of state data files. All state chunks smaller than that are stored inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.</td>
</tr>
<tr>
<td><h5>state.backend.fs.write-buffer-size</h5></td>
Expand Down
Expand Up @@ -140,11 +140,12 @@ public class CheckpointingOptions {
/** The minimum size of state data files. All state chunks smaller than that
* are stored inline in the root checkpoint metadata file. */
@Documentation.Section(Documentation.Sections.EXPERT_STATE_BACKENDS)
public static final ConfigOption<Integer> FS_SMALL_FILE_THRESHOLD = ConfigOptions
public static final ConfigOption<MemorySize> FS_SMALL_FILE_THRESHOLD = ConfigOptions
.key("state.backend.fs.memory-threshold")
.defaultValue(1024)
.memoryType()
.defaultValue(MemorySize.parse("20kb"))
.withDescription("The minimum size of state data files. All state chunks smaller than that are stored" +
" inline in the root checkpoint metadata file.");
" inline in the root checkpoint metadata file. The max memory threshold for this configuration is 1MB.");

/**
* The default size of the write buffer for the checkpoint streams that write to file systems.
Expand Down
Expand Up @@ -93,7 +93,7 @@ private static CheckpointStorageLocation createSavepointLocation(Path location)
location,
location,
reference,
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue(),
(int) CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes(),
CheckpointingOptions.FS_WRITE_BUFFER_SIZE.defaultValue());
}
}
Expand Down
Expand Up @@ -97,7 +97,7 @@ def test_get_min_file_size_threshold(self):

state_backend = FsStateBackend("file://var/checkpoints/")

self.assertEqual(state_backend.get_min_file_size_threshold(), 1024)
self.assertEqual(state_backend.get_min_file_size_threshold(), 20480)

state_backend = FsStateBackend("file://var/checkpoints/", file_state_size_threshold=2048)

Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.TernaryBoolean;

import org.slf4j.LoggerFactory;
Expand All @@ -55,6 +56,7 @@
import java.net.URI;
import java.util.Collection;

import static org.apache.flink.configuration.CheckpointingOptions.FS_SMALL_FILE_THRESHOLD;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -363,22 +365,24 @@ private FsStateBackend(FsStateBackend original, ReadableConfig configuration, Cl
this.asynchronousSnapshots = original.asynchronousSnapshots.resolveUndefined(
configuration.get(CheckpointingOptions.ASYNC_SNAPSHOTS));

final int sizeThreshold = original.fileStateThreshold >= 0 ?
original.fileStateThreshold :
configuration.get(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD);
if (getValidFileStateThreshold(original.fileStateThreshold) >= 0) {
this.fileStateThreshold = original.fileStateThreshold;
} else {
final int configuredStateThreshold =
getValidFileStateThreshold(configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes());

if (sizeThreshold >= 0 && sizeThreshold <= MAX_FILE_STATE_THRESHOLD) {
this.fileStateThreshold = sizeThreshold;
}
else {
this.fileStateThreshold = CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
if (configuredStateThreshold >= 0) {
this.fileStateThreshold = configuredStateThreshold;
} else {
this.fileStateThreshold = MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes());

// because this is the only place we (unlikely) ever log, we lazily
// create the logger here
LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
// because this is the only place we (unlikely) ever log, we lazily
// create the logger here
LoggerFactory.getLogger(AbstractFileStateBackend.class).warn(
"Ignoring invalid file size threshold value ({}): {} - using default value {} instead.",
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.key(), sizeThreshold,
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue());
FS_SMALL_FILE_THRESHOLD.key(), configuration.get(FS_SMALL_FILE_THRESHOLD).getBytes(),
FS_SMALL_FILE_THRESHOLD.defaultValue());
}
}

final int bufferSize = original.writeBufferSize >= 0 ?
Expand All @@ -388,6 +392,13 @@ private FsStateBackend(FsStateBackend original, ReadableConfig configuration, Cl
this.writeBufferSize = Math.max(bufferSize, this.fileStateThreshold);
}

private int getValidFileStateThreshold(long fileStateThreshold) {
if (fileStateThreshold >= 0 && fileStateThreshold <= MAX_FILE_STATE_THRESHOLD) {
return (int) fileStateThreshold;
}
return -1;
}

// ------------------------------------------------------------------------
// Properties
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -432,7 +443,7 @@ public Path getCheckpointPath() {
public int getMinFileSizeThreshold() {
return fileStateThreshold >= 0 ?
fileStateThreshold :
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue();
MathUtils.checkedDownCast(FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes());
}

/**
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
Expand Down Expand Up @@ -227,7 +228,7 @@ public void testLoadFileSystemStateBackend() throws Exception {
final String savepointDir = new Path(tmp.newFolder().toURI()).toString();
final Path expectedCheckpointsPath = new Path(checkpointDir);
final Path expectedSavepointsPath = new Path(savepointDir);
final int threshold = 1000000;
final MemorySize threshold = MemorySize.parse("900kb");
final int minWriteBufferSize = 1024;
final boolean async = !CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue();

Expand All @@ -237,15 +238,15 @@ public void testLoadFileSystemStateBackend() throws Exception {
config1.setString(backendKey, "filesystem");
config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config1.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
config1.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize);
config1.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);

final Configuration config2 = new Configuration();
config2.setString(backendKey, FsStateBackendFactory.class.getName());
config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config2.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
config2.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, threshold);
config1.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, minWriteBufferSize);
config2.setBoolean(CheckpointingOptions.ASYNC_SNAPSHOTS, async);

Expand All @@ -262,10 +263,10 @@ public void testLoadFileSystemStateBackend() throws Exception {
assertEquals(expectedCheckpointsPath, fs2.getCheckpointPath());
assertEquals(expectedSavepointsPath, fs1.getSavepointPath());
assertEquals(expectedSavepointsPath, fs2.getSavepointPath());
assertEquals(threshold, fs1.getMinFileSizeThreshold());
assertEquals(threshold, fs2.getMinFileSizeThreshold());
assertEquals(Math.max(threshold, minWriteBufferSize), fs1.getWriteBufferSize());
assertEquals(Math.max(threshold, minWriteBufferSize), fs2.getWriteBufferSize());
assertEquals(threshold.getBytes(), fs1.getMinFileSizeThreshold());
assertEquals(threshold.getBytes(), fs2.getMinFileSizeThreshold());
assertEquals(Math.max(threshold.getBytes(), minWriteBufferSize), fs1.getWriteBufferSize());
assertEquals(Math.max(threshold.getBytes(), minWriteBufferSize), fs2.getWriteBufferSize());
assertEquals(async, fs1.isUsingAsynchronousSnapshots());
assertEquals(async, fs2.isUsingAsynchronousSnapshots());
}
Expand Down Expand Up @@ -293,7 +294,7 @@ public void testLoadFileSystemStateBackendMixed() throws Exception {
config.setString(backendKey, "jobmanager"); // this should not be picked up
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 20); // this should not be picked up
config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("20")); // this should not be picked up
config.setInteger(CheckpointingOptions.FS_WRITE_BUFFER_SIZE, 3000000); // this should not be picked up

final StateBackend loadedBackend =
Expand Down
Expand Up @@ -576,7 +576,7 @@ public void open(Configuration parameters) throws Exception {
if (data == null) {
// We need this to be large, because we want to test with files
Random rand = new Random(getRuntimeContext().getIndexOfThisSubtask());
data = new byte[CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue() + 1];
data = new byte[(int) CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue().getBytes() + 1];
rand.nextBytes(data);
}
}
Expand Down Expand Up @@ -833,7 +833,7 @@ private Configuration getFileBasedCheckpointsConfig(final String savepointDir) {
final Configuration config = new Configuration();
config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
return config;
}
Expand Down
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
Expand Down Expand Up @@ -109,7 +110,7 @@ private Configuration getConfiguration() throws Exception {

config.setString(CheckpointingOptions.STATE_BACKEND, "memory");
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
config.setInteger(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, 0);
config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 300L);

Expand Down