[FLINK-29880][hive] Introduce auto compaction for Hive sink in batch mode #21703
Conversation
0a2a9f6 to 079e6bd (compare)
@flinkbot run azure
@luoyuxia Thanks for your contribution, I left some comments.
            final int parallelism)
            throws IOException {
        return dataStream
                .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
Suggested change:
- .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
+ .map(value -> ((Row) converter.toExternal(value)))
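As context for dropping the cast: a lambda only needs an explicit functional-interface cast when the compiler cannot otherwise infer the target type. A minimal stdlib-only sketch, where the `MapFn` interface and `applyOnce` helper are hypothetical stand-ins for Flink's `MapFunction` and the `map` call, not the real API:

```java
public class LambdaCastSketch {
    // Hypothetical stand-in for Flink's MapFunction<IN, OUT> interface.
    interface MapFn<I, O> {
        O map(I value);
    }

    // Hypothetical helper that plays the role of DataStream#map here.
    static <I, O> O applyOnce(MapFn<I, O> fn, I value) {
        return fn.map(value);
    }

    public static void main(String[] args) {
        // Form 1: explicit cast to the functional interface.
        String a = applyOnce((MapFn<Integer, String>) v -> "row-" + v, 1);
        // Form 2: the cast is redundant, since the parameter position
        // already fixes the lambda's target type.
        String b = applyOnce(v -> "row-" + v, 1);
        System.out.println(a + " " + b);
    }
}
```

Both forms compile to the same behavior; whether Flink's own type extraction still needs the explicit `MapFunction` cast is a separate question for the real code.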
            DynamicTableSink.DataStructureConverter converter,
            FileSystemOutputFormat<Row> fileSystemOutputFormat,
            final int parallelism)
            throws IOException {
There is no need to throw an exception here.
            boolean isToLocal,
            boolean overwrite,
            final int compactParallelism)
            throws IOException {
Ditto
        DataStream<CoordinatorInput> writerDataStream =
                dataStream
                        .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
Suggested change:
- .map((MapFunction<RowData, Row>) value -> (Row) converter.toExternal(value))
+ .map(value -> (Row) converter.toExternal(value))
public class BatchSink {
    private BatchSink() {}

    public static DataStreamSink<Row> createBatchNoAutoCompactSink(
I think `createBatchSink` is enough.
I still prefer `createBatchNoAutoCompactSink`, which is consistent with `createBatchCompactSink`.
        PartitionCommitPolicyFactory partitionCommitPolicyFactory =
                new PartitionCommitPolicyFactory(
                        conf.get(HiveOptions.SINK_PARTITION_COMMIT_POLICY_KIND),
Should we also check that the commit policy is not null when the table has a partition key, like the streaming sink does?
No, we don't need to. In batch mode, it will always commit partitions even though the metastore policy has been configured.
        catalogTable.getOptions().forEach(conf::setString);
        boolean autoCompaction = conf.getBoolean(FileSystemConnectorOptions.AUTO_COMPACTION);
        if (autoCompaction) {
            if (batchShuffleMode != BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
I think in pipeline shuffle mode, auto compaction can also work.
Thanks for pointing it out. After thinking it over, yes, it should still work in pipelined shuffle mode.
    public static final ConfigOption<MemorySize> COMPACT_SMALL_FILES_AVG_SIZE =
            key("compaction.small-files.avg-size")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(16))
How did you get this default value? Is it reasonable for users?
I'm not sure. But it comes from Hive, so I think it should be reasonable at least for Hive users.
    private TableEnvironment createNoBlockingModeTableEnv() {
        EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
        settings.getConfiguration()
                .set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED);
I think this option is job level, so we don't need to create a new TableEnvironment.
tableEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED);
@lsyldliu Thanks for reviewing. I have addressed your comments.
    }

    @Test
    public void testNoCompaction() throws Exception {
I think we should use a JUnit parameterized test to cover the two cases: ALL_EXCHANGES_PIPELINED and ALL_EXCHANGES_BLOCKING.
Indeed, I didn't intend to add a case to cover ALL_EXCHANGES_BLOCKING, for it'll increase the test time. We always try to reduce the test time, as the Hive module already costs much time. Also, from the perspective of the file compaction pipeline, the shuffle mode makes no difference. And if we cover these two cases, what about the other shuffle modes?
I think we should test it manually.
Yes, I have tested it manually. The test passes for the different shuffle modes.
#### Batch Mode

When it's in batch mode and auto compaction is enabled, after finishing writing files, Flink will calculate the average size of written files for each partition. And if the average size is less than the threshold configured, Flink will then try to compact these files to files with a target size. The following is the table's options for file compactions.
Suggested change:
- threshold configured, Flink will then try to compact these files to files with a target size. The following is the table's options for file compactions.
+ configured threshold, then Flink will try to compact these files to files with the target size. The following are the table's options for file compaction.
I accept it, except that I still think we should use "a target size" instead of "the target size".
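For reference, the trigger condition described in the quoted documentation (compact a partition when its average written-file size falls below `compaction.small-files.avg-size`) can be sketched with plain Java; the class and method names here are illustrative, not the PR's actual code:

```java
import java.util.List;

public class CompactionTriggerSketch {
    // Compact a partition's files when their average size is below the threshold.
    static boolean shouldCompact(List<Long> fileSizesBytes, long avgSizeThresholdBytes) {
        if (fileSizesBytes.isEmpty()) {
            return false;
        }
        long total = fileSizesBytes.stream().mapToLong(Long::longValue).sum();
        return total / fileSizesBytes.size() < avgSizeThresholdBytes;
    }

    public static void main(String[] args) {
        long threshold = 16L << 20;  // 16 MB, the option's default value
        // Three 4 MB files: average 4 MB < 16 MB, so compaction is triggered.
        System.out.println(shouldCompact(List.of(4L << 20, 4L << 20, 4L << 20), threshold));
        // Two 32 MB files: average 32 MB >= 16 MB, so the files are left as-is.
        System.out.println(shouldCompact(List.of(32L << 20, 32L << 20), threshold));
    }
}
```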
#### Stream Mode

In stream mode, the behavior is same to `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
Suggested change:
- In stream mode, the behavior is same to `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
+ In stream mode, the behavior is the same as `FileSystem` sink. Please refer to [File Compaction]({{< ref "docs/connectors/table/filesystem" >}}#file-compaction) for more details.
        <td>Integer</td>
        <td>
          The parallelism to compact files. If not set, it will use the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink-parallelism">sink parallelism</a>.
          When use <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, the parallelism may be small, which will cause taking much time to finish compaction.
Suggested change:
- When use <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, the parallelism may be small, which will cause taking much time to finish compaction.
+ When using <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, the parallelism of the compact operator deduced by the scheduler may be small, which will cause taking much time to finish compaction.
        <td>
          The parallelism to compact files. If not set, it will use the <a href="{{< ref "docs/connectors/table/filesystem" >}}#sink-parallelism">sink parallelism</a>.
          When use <a href="{{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler">adaptive batch scheduler</a>, the parallelism may be small, which will cause taking much time to finish compaction.
          In such case, please remember to set this value to a bigger value manually.
Suggested change:
- In such case, please remember to set this value to a bigger value manually.
+ In such a case, please remember to set this option to a bigger value manually.
#### Batch Mode

在批模式,并且自动合并小文件已经开启的情况下,在结束写 Hive 表后,Flink 会计算每个分区下的文件平均大小,如果文件的平均大小小于用户指定的一个阈值,Flink 则会将这些文件合并成指定大小的文件。下面是文件合并涉及到的参数:
Suggested change:
- 在批模式,并且自动合并小文件已经开启的情况下,在结束写 Hive 表后,Flink 会计算每个分区下的文件平均大小,如果文件的平均大小小于用户指定的一个阈值,Flink 则会将这些文件合并成指定大小的文件。下面是文件合并涉及到的参数:
+ 在批模式,并且自动合并小文件已经开启的情况下,在结束写 Hive 表后,Flink 会计算每个分区下文件的平均大小,如果文件的平均大小小于用户指定的一个阈值,Flink 则会将这些文件合并成指定大小的文件。下面是文件合并涉及到的参数:
@flinkbot run azure
        <td>no</td>
        <td style="word-wrap: break-word;">false</td>
        <td>Boolean</td>
        <td>Whether to enable automatic compaction in Hive sink or not. The data will be written to temporary files. The temporary files are invisible before compaction.</td>
Suggested change:
- <td>Whether to enable automatic compaction in Hive sink or not. The data will be written to temporary files. The temporary files are invisible before compaction.</td>
+ <td>Whether to enable automatic compaction in Hive sink or not. The data will be written to temporary files first. The temporary files are invisible before compaction.</td>
        <td>yes</td>
        <td style="word-wrap: break-word;">16MB</td>
        <td>MemorySize</td>
        <td>合并文件的阈值,当文件的平均大小小于该阈值, Flink 将对这些文件进行合并. 默认值是 16MB.</td>
Suggested change:
- <td>合并文件的阈值,当文件的平均大小小于该阈值, Flink 将对这些文件进行合并. 默认值是 16MB.</td>
+ <td>合并文件的阈值,当文件的平均大小小于该阈值,Flink 将对这些文件进行合并。 默认值是 16MB。</td>
Please note the difference between Chinese and English punctuation marks.
LGTM
@wuchong Could you please help merge? Thanks.
What is the purpose of the change

Introduce auto compaction for Hive sink in batch mode.

Brief change log

- Introduce options compaction.small-files.avg-size / compaction.file-size / compaction.parallelism for auto compaction.
- Introduce BatchFileWriter -> BatchCompactCoordinator -> BatchCompactOperator -> BatchPartitionCommitter, which will support auto compaction.
- Extract the common logic of BatchCompactOperator and CompactOperator to CompactFileUtils#doCompact.

Verifying this change

Added ITCase in HiveTableCompactSinkITCase.

Does this pull request potentially affect one of the following parts:

- @Public(Evolving): (yes / no)
- Documentation
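A rough, Flink-free sketch of the four-stage flow named in the change log (writer -> compact coordinator -> compact operator -> partition committer). Every class, method, and value below is an illustrative stand-in, not the PR's actual operator API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchCompactPipelineSketch {
    // Stand-in for a temporary file emitted by the batch file writer.
    record WrittenFile(String partition, long sizeBytes) {}

    // Stage 1 (writer): emits per-partition temporary files (hard-coded sample data).
    static List<WrittenFile> write() {
        return List.of(
                new WrittenFile("p=1", 4L << 20),
                new WrittenFile("p=1", 4L << 20),
                new WrittenFile("p=2", 64L << 20));
    }

    // Stage 2 (coordinator): group files by partition and select the partitions
    // whose average file size is below the small-files threshold.
    static Map<String, List<WrittenFile>> planCompaction(List<WrittenFile> files, long threshold) {
        Map<String, List<WrittenFile>> byPartition = new HashMap<>();
        for (WrittenFile f : files) {
            byPartition.computeIfAbsent(f.partition(), k -> new ArrayList<>()).add(f);
        }
        Map<String, List<WrittenFile>> toCompact = new HashMap<>();
        byPartition.forEach((p, fs) -> {
            long avg = fs.stream().mapToLong(WrittenFile::sizeBytes).sum() / fs.size();
            if (avg < threshold) {
                toCompact.put(p, fs);
            }
        });
        return toCompact;
    }

    // Stage 3 (compact operator): merge each selected partition's files into one.
    static List<WrittenFile> compact(Map<String, List<WrittenFile>> plan) {
        List<WrittenFile> out = new ArrayList<>();
        plan.forEach((p, fs) ->
                out.add(new WrittenFile(p, fs.stream().mapToLong(WrittenFile::sizeBytes).sum())));
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<WrittenFile>> plan = planCompaction(write(), 16L << 20);
        // Stage 4 (committer) would then commit partitions and make files visible.
        System.out.println(plan.keySet());        // only p=1 is small enough to compact
        System.out.println(compact(plan).size());
    }
}
```

This only models the data flow between the stages; the real operators also have to deal with temporary-file paths, overwrite semantics, and partition commit policies.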