Skip to content

Commit

Permalink
[FLINK-35359][config] Make enum options Enum type
Browse files Browse the repository at this point in the history
  • Loading branch information
Sxnan authored and xintongsong committed May 23, 2024
1 parent 58f0813 commit fa8590d
Show file tree
Hide file tree
Showing 62 changed files with 263 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
</tr>
<tr>
<td><h5>taskmanager.network.compression.codec</h5></td>
<td style="word-wrap: break-word;">"LZ4"</td>
<td>String</td>
<td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "ZSTD" are supported now. Through tpc-ds test of these three algorithms, the results show that "LZ4" algorithm has the highest compression and decompression speed, but the compression ratio is the lowest. "ZSTD" has the highest compression ratio, but the compression and decompression speed is the slowest, and LZO is between the two. Also note that this option is experimental and might be changed in the future.</td>
<td style="word-wrap: break-word;">LZ4</td>
<td><p>Enum</p></td>
<td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "ZSTD" are supported now. Through tpc-ds test of these three algorithms, the results show that "LZ4" algorithm has the highest compression and decompression speed, but the compression ratio is the lowest. "ZSTD" has the highest compression ratio, but the compression and decompression speed is the slowest, and LZO is between the two. Also note that this option is experimental and might be changed in the future.<br /><br />Possible values:<ul><li>"LZ4"</li><li>"LZO"</li><li>"ZSTD"</li></ul></td>
</tr>
<tr>
<td><h5>taskmanager.network.detailed-metrics</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
</tr>
<tr>
<td><h5>taskmanager.network.compression.codec</h5></td>
<td style="word-wrap: break-word;">"LZ4"</td>
<td>String</td>
<td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "ZSTD" are supported now. Through tpc-ds test of these three algorithms, the results show that "LZ4" algorithm has the highest compression and decompression speed, but the compression ratio is the lowest. "ZSTD" has the highest compression ratio, but the compression and decompression speed is the slowest, and LZO is between the two. Also note that this option is experimental and might be changed in the future.</td>
<td style="word-wrap: break-word;">LZ4</td>
<td><p>Enum</p></td>
<td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "ZSTD" are supported now. Through tpc-ds test of these three algorithms, the results show that "LZ4" algorithm has the highest compression and decompression speed, but the compression ratio is the lowest. "ZSTD" has the highest compression ratio, but the compression and decompression speed is the slowest, and LZO is between the two. Also note that this option is experimental and might be changed in the future.<br /><br />Possible values:<ul><li>"LZ4"</li><li>"LZO"</li><li>"ZSTD"</li></ul></td>
</tr>
<tr>
<td><h5>taskmanager.network.detailed-metrics</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@
<tbody>
<tr>
<td><h5>table.optimizer.agg-phase-strategy</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
<td style="word-wrap: break-word;">"AUTO"</td>
<td>String</td>
<td style="word-wrap: break-word;">AUTO</td>
<td><p>Enum</p></td>
<td>Strategy for aggregate phase. Only AUTO, TWO_PHASE or ONE_PHASE can be set.
AUTO: No special enforcer for aggregate stage. Whether to choose two stage aggregate or one stage aggregate depends on cost.
TWO_PHASE: Enforce to use two stage aggregate which has localAggregate and globalAggregate. Note that if aggregate call does not support optimize into two phase, we will still use one stage aggregate.
ONE_PHASE: Enforce to use one stage aggregate which only has CompleteGlobalAggregate.</td>
ONE_PHASE: Enforce to use one stage aggregate which only has CompleteGlobalAggregate.<br /><br />Possible values:<ul><li>"AUTO"</li><li>"ONE_PHASE"</li><li>"TWO_PHASE"</li></ul></td>
</tr>
<tr>
<td><h5>table.optimizer.bushy-join-reorder-threshold</h5><br> <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,10 @@ public class NettyShuffleEnvironmentOptions {
/** The codec to be used when compressing shuffle data. */
@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
@Experimental
public static final ConfigOption<String> SHUFFLE_COMPRESSION_CODEC =
public static final ConfigOption<CompressionCodec> SHUFFLE_COMPRESSION_CODEC =
key("taskmanager.network.compression.codec")
.stringType()
.defaultValue("LZ4")
.enumType(CompressionCodec.class)
.defaultValue(CompressionCodec.LZ4)
.withDescription(
"The codec to be used when compressing shuffle data, only \"LZ4\", \"LZO\" "
+ "and \"ZSTD\" are supported now. Through tpc-ds test of these "
Expand All @@ -121,6 +121,13 @@ public class NettyShuffleEnvironmentOptions {
+ "speed is the slowest, and LZO is between the two. Also note "
+ "that this option is experimental and might be changed in the future.");

/** Supported compression codec. */
public enum CompressionCodec {
LZ4,
LZO,
ZSTD
}

/**
* Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
* lengths.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.io.compression;

import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;

import io.airlift.compress.lzo.LzoCompressor;
import io.airlift.compress.lzo.LzoDecompressor;
import io.airlift.compress.zstd.ZstdCompressor;
Expand All @@ -35,24 +37,14 @@ public interface BlockCompressionFactory {

BlockDecompressor getDecompressor();

/** Name of {@link BlockCompressionFactory}. */
enum CompressionFactoryName {
LZ4,
LZO,
ZSTD
}

/**
* Creates {@link BlockCompressionFactory} according to the configuration.
*
* @param compressionFactoryName supported compression codecs.
* @param compressionName supported compression codecs.
*/
static BlockCompressionFactory createBlockCompressionFactory(String compressionFactoryName) {

checkNotNull(compressionFactoryName);
static BlockCompressionFactory createBlockCompressionFactory(CompressionCodec compressionName) {

CompressionFactoryName compressionName =
CompressionFactoryName.valueOf(compressionFactoryName.toUpperCase());
checkNotNull(compressionName);

BlockCompressionFactory blockCompressionFactory;
switch (compressionName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.io.network.buffer;

import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
Expand All @@ -39,7 +40,7 @@ public class BufferCompressor {
/** The backup array of intermediate buffer. */
private final byte[] internalBufferArray;

public BufferCompressor(int bufferSize, String factoryName) {
public BufferCompressor(int bufferSize, CompressionCodec factoryName) {
checkArgument(bufferSize > 0);
checkNotNull(factoryName);
// the size of this intermediate heap buffer will be gotten from the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.buffer;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
Expand All @@ -40,7 +41,7 @@ public class BufferDecompressor {
/** The backup array of intermediate buffer. */
private final byte[] internalBufferArray;

public BufferDecompressor(int bufferSize, String factoryName) {
public BufferDecompressor(int bufferSize, CompressionCodec factoryName) {
checkArgument(bufferSize > 0);
checkNotNull(factoryName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
import org.apache.flink.runtime.io.disk.FileChannelManager;
Expand Down Expand Up @@ -69,7 +70,7 @@ public class ResultPartitionFactory {

private final boolean batchShuffleCompressionEnabled;

private final String compressionCodec;
private final CompressionCodec compressionCodec;

private final int maxBuffersPerChannel;

Expand Down Expand Up @@ -98,7 +99,7 @@ public ResultPartitionFactory(
int floatingNetworkBuffersPerGate,
int networkBufferSize,
boolean batchShuffleCompressionEnabled,
String compressionCodec,
CompressionCodec compressionCodec,
int maxBuffersPerChannel,
int sortShuffleMinBuffers,
int sortShuffleMinParallelism,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
Expand Down Expand Up @@ -96,7 +97,7 @@ public class SingleInputGateFactory {

private final boolean batchShuffleCompressionEnabled;

private final String compressionCodec;
private final CompressionCodec compressionCodec;

private final int networkBufferSize;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
Expand Down Expand Up @@ -101,7 +102,7 @@ public class NettyShuffleEnvironmentConfiguration {

private final boolean batchShuffleCompressionEnabled;

private final String compressionCodec;
private final CompressionCodec compressionCodec;

private final int maxBuffersPerChannel;

Expand Down Expand Up @@ -135,7 +136,7 @@ public NettyShuffleEnvironmentConfiguration(
String[] tempDirs,
BoundedBlockingSubpartitionType blockingSubpartitionType,
boolean batchShuffleCompressionEnabled,
String compressionCodec,
CompressionCodec compressionCodec,
int maxBuffersPerChannel,
long batchShuffleReadMemoryBytes,
int sortShuffleMinBuffers,
Expand Down Expand Up @@ -259,7 +260,7 @@ public boolean isSSLEnabled() {
return nettyConfig != null && nettyConfig.getSSLEnabled();
}

public String getCompressionCodec() {
public CompressionCodec getCompressionCodec() {
return compressionCodec;
}

Expand Down Expand Up @@ -377,7 +378,7 @@ public static NettyShuffleEnvironmentConfiguration fromConfiguration(

boolean batchShuffleCompressionEnabled =
configuration.get(NettyShuffleEnvironmentOptions.BATCH_SHUFFLE_COMPRESSION_ENABLED);
String compressionCodec =
CompressionCodec compressionCodec =
configuration.get(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC);

int maxNumConnections =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

package org.apache.flink.runtime.io.compression;

import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;

import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

Expand All @@ -32,9 +34,9 @@
class BlockCompressionTest {
private static Stream<BlockCompressionFactory> compressCodecGenerator() {
return Stream.of(
BlockCompressionFactory.createBlockCompressionFactory("LZ4"),
BlockCompressionFactory.createBlockCompressionFactory("LZO"),
BlockCompressionFactory.createBlockCompressionFactory("ZSTD"));
BlockCompressionFactory.createBlockCompressionFactory(CompressionCodec.LZ4),
BlockCompressionFactory.createBlockCompressionFactory(CompressionCodec.LZO),
BlockCompressionFactory.createBlockCompressionFactory(CompressionCodec.ZSTD));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
Expand Down Expand Up @@ -82,7 +83,7 @@ public class NettyShuffleEnvironmentBuilder {

private int maxOverdraftBuffersPerGate = 0;

private String compressionCodec = "LZ4";
private CompressionCodec compressionCodec = CompressionCodec.LZ4;

private ResourceID taskManagerLocation = ResourceID.generate();

Expand Down Expand Up @@ -195,7 +196,7 @@ public NettyShuffleEnvironmentBuilder setMaxOverdraftBuffersPerGate(
return this;
}

public NettyShuffleEnvironmentBuilder setCompressionCodec(String compressionCodec) {
public NettyShuffleEnvironmentBuilder setCompressionCodec(CompressionCodec compressionCodec) {
this.compressionCodec = compressionCodec;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.io.network.buffer;

import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
Expand Down Expand Up @@ -60,30 +61,30 @@ class BufferCompressionTest {
public static Collection<Object[]> parameters() {
return Arrays.asList(
new Object[][] {
{true, "LZ4", true, false},
{true, "LZ4", false, true},
{true, "LZ4", false, false},
{false, "LZ4", true, false},
{false, "LZ4", false, true},
{false, "LZ4", false, false},
{true, "ZSTD", true, false},
{true, "ZSTD", false, true},
{true, "ZSTD", false, false},
{false, "ZSTD", true, false},
{false, "ZSTD", false, true},
{false, "ZSTD", false, false},
{true, "LZO", true, false},
{true, "LZO", false, true},
{true, "LZO", false, false},
{false, "LZO", true, false},
{false, "LZO", false, true},
{false, "LZO", false, false}
{true, CompressionCodec.LZ4, true, false},
{true, CompressionCodec.LZ4, false, true},
{true, CompressionCodec.LZ4, false, false},
{false, CompressionCodec.LZ4, true, false},
{false, CompressionCodec.LZ4, false, true},
{false, CompressionCodec.LZ4, false, false},
{true, CompressionCodec.ZSTD, true, false},
{true, CompressionCodec.ZSTD, false, true},
{true, CompressionCodec.ZSTD, false, false},
{false, CompressionCodec.ZSTD, true, false},
{false, CompressionCodec.ZSTD, false, true},
{false, CompressionCodec.ZSTD, false, false},
{true, CompressionCodec.LZO, true, false},
{true, CompressionCodec.LZO, false, true},
{true, CompressionCodec.LZO, false, false},
{false, CompressionCodec.LZO, true, false},
{false, CompressionCodec.LZO, false, true},
{false, CompressionCodec.LZO, false, false}
});
}

public BufferCompressionTest(
boolean isDirect,
String compressionCodec,
CompressionCodec compressionCodec,
boolean compressToOriginalBuffer,
boolean decompressToOriginalBuffer) {
this.compressToOriginalBuffer = compressToOriginalBuffer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
Expand Down Expand Up @@ -208,8 +209,10 @@ void testReceiveBuffer() throws Exception {
@ValueSource(strings = {"LZ4", "LZO", "ZSTD"})
void testReceiveCompressedBuffer(final String compressionCodec) throws Exception {
int bufferSize = 1024;
BufferCompressor compressor = new BufferCompressor(bufferSize, compressionCodec);
BufferDecompressor decompressor = new BufferDecompressor(bufferSize, compressionCodec);
BufferCompressor compressor =
new BufferCompressor(bufferSize, CompressionCodec.valueOf(compressionCodec));
BufferDecompressor decompressor =
new BufferDecompressor(bufferSize, CompressionCodec.valueOf(compressionCodec));
NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize);
SingleInputGate inputGate =
new SingleInputGateBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.io.network.netty;

import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
Expand Down Expand Up @@ -149,8 +150,9 @@ void testBufferResponseWithReadOnlySlice() {
@ParameterizedTest
@ValueSource(strings = {"LZ4", "LZO", "ZSTD"})
void testCompressedBufferResponse(final String codecFactoryName) {
compressor = new BufferCompressor(BUFFER_SIZE, codecFactoryName);
decompressor = new BufferDecompressor(BUFFER_SIZE, codecFactoryName);
compressor = new BufferCompressor(BUFFER_SIZE, CompressionCodec.valueOf(codecFactoryName));
decompressor =
new BufferDecompressor(BUFFER_SIZE, CompressionCodec.valueOf(codecFactoryName));
testBufferResponse(false, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.io.network.partition;

import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.testutils.CheckedThread;
Expand Down Expand Up @@ -62,7 +63,7 @@ class BoundedBlockingSubpartitionWriteReadTest {

private static final int BUFFER_SIZE = 1024 * 1024;

private static final String COMPRESSION_CODEC = "LZ4";
private static final CompressionCodec COMPRESSION_CODEC = CompressionCodec.LZ4;

private static final BufferDecompressor decompressor =
new BufferDecompressor(BUFFER_SIZE, COMPRESSION_CODEC);
Expand Down
Loading

0 comments on commit fa8590d

Please sign in to comment.