
[FLINK-32365][orc] Get ORC table statistics in parallel #22805

Merged (9 commits) on Jul 18, 2023

Conversation

@Baibaiwuguo (Contributor) commented Jun 16, 2023

What is the purpose of the change

Get ORC table statistics in parallel to improve the speed of collecting them.

Brief change log

Let HiveTableSource extend from SupportStatisticsReport

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (100MB)
  • Extended integration test for recovery after master (JobManager) failure
  • Added test that validates that TaskInfo is transferred only once across recoveries
  • Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot (Collaborator) commented Jun 16, 2023

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

@luoyuxia (Contributor) left a comment

@Baibaiwuguo Thanks for the contribution. Overall LGTM, but I'd like to have @swuferhong give it a review.

@@ -58,11 +63,15 @@ public static TableStats getTableStatistics(
long rowCount = 0;
Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
RowType producedRowType = (RowType) producedDataType.getLogicalType();
ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Contributor

I'm wondering whether we can make it configurable, with the default being Runtime.getRuntime().availableProcessors(), just like the configuration s3.upload.max.concurrent.uploads.
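
For illustration, a minimal sketch of what such an option could look like, assuming the ConfigOptions builder style used in HiveOptions; the key name and description here are assumptions, since they were only settled later in this thread:

import org.apache.flink.configuration.ConfigOption;
import static org.apache.flink.configuration.ConfigOptions.key;

// Sketch only: key name and wording are assumptions at this point in the discussion.
public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM =
        key("table.exec.hive.read-statistics.thread-num")
                .intType()
                .defaultValue(Runtime.getRuntime().availableProcessors())
                .withDescription(
                        "The number of threads used to scan files for table statistics;"
                                + " must be greater than 0.");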

@Baibaiwuguo (Contributor Author)

@luoyuxia I also think the thread count could be configured more sensibly. I will refine this.

@Baibaiwuguo (Contributor Author)

@luoyuxia I find the code is called in multiple places. If we make it configurable, we need to change more modules and add more parameters. If we set the parameter in the Hadoop config, both ORC and Parquet can use it. Could you give me some ideas?

@swuferhong (Contributor)

Hi, did you encounter the problem of slow ORC statistics reporting while using the Hive connector? If so, I think you can add this parameter to HiveOptions as a Flink conf, and set that Flink conf into the job conf in the method HiveSourceBuilder.setFlinkConfigurationToJobConf() (the jobConf will be added into the hadoopConf in the Hive source). By doing this, you can get the parameter from the hadoopConf; if it is not present there, you can fall back to Runtime.getRuntime().availableProcessors() as the default. WDYT, @luoyuxia.
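
A minimal sketch of the reading side of that idea; the key name is assumed to match the option suggested later in this thread, and getStatisticsThreadNum is a hypothetical helper, not the PR's actual code:

import org.apache.hadoop.conf.Configuration;

// Hypothetical helper: read the thread count from the Hadoop conf,
// falling back to the number of available processors when the key is absent.
static int getStatisticsThreadNum(Configuration hadoopConf) {
    return hadoopConf.getInt(
            "table.exec.hive.read-statistics.thread-num",
            Runtime.getRuntime().availableProcessors());
}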

@luoyuxia (Contributor)

I would also recommend only supporting the configuration in HiveOptions at first.

@Baibaiwuguo (Contributor Author)

@luoyuxia I added the config option to HiveOptions. I changed both ORC and Parquet.

@Baibaiwuguo (Contributor Author)

@dongjoon-hyun @swuferhong @luoyuxia Could you help review this when you are free?

@luoyuxia (Contributor)

@Baibaiwuguo The test fails.

@luoyuxia (Contributor) left a comment

@Baibaiwuguo Thanks for updating. I left minor comments. PTAL.

@@ -134,6 +134,13 @@ public class HiveOptions {
+ " custom: use policy class to create a commit policy."
+ " Support to configure multiple policies: 'metastore,success-file'.");

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM =
Contributor

Please remember to add the doc for the newly added option.

try {
long rowCount = 0;
Map<String, ColumnStatistics> columnStatisticsMap = new HashMap<>();
RowType producedRowType = (RowType) producedDataType.getLogicalType();

ExecutorService executorService = Executors.newFixedThreadPool(statisticsThreadNum);
Contributor

Executors.newFixedThreadPool(
                        statisticsThreadNum,
                        new ExecutorThreadFactory("orc-get-table-statistic-worker"));

?

try {
Map<String, Statistics<?>> columnStatisticsMap = new HashMap<>();
RowType producedRowType = (RowType) producedDataType.getLogicalType();
ExecutorService executorService = Executors.newFixedThreadPool(statisticsThreadNum);
Contributor

Ditto.

@@ -373,13 +374,18 @@ private TableStats getMapRedInputFormatStatistics(
.toLowerCase();
List<Path> files =
inputSplits.stream().map(FileSourceSplit::path).collect(Collectors.toList());
int statisticsThreadNum = flinkConf.get(TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM);
Contributor

Check that the thread num is not less than 1.
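
A minimal sketch of the suggested guard, assuming Flink's Preconditions utility; the option name matches the diff above:

import org.apache.flink.util.Preconditions;

int statisticsThreadNum = flinkConf.get(TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM);
// Fail fast on a non-positive thread count instead of letting the thread pool creation fail later.
Preconditions.checkArgument(
        statisticsThreadNum >= 1, "The thread number for reading statistics must be at least 1.");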

@Baibaiwuguo (Contributor Author)

I changed some of the logic. The task now reads the file footers in parallel and aggregates the statistics serially.
I added the doc for the newly added option. Please review it when you are free.
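
A minimal, self-contained sketch of that read-in-parallel, merge-serially pattern; readRowCountFromFooter is a hypothetical placeholder for the real ORC/Parquet footer read, not the PR's actual helper:

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelFooterScanSketch {

    public static long totalRowCount(List<Path> files, int statisticsThreadNum) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(statisticsThreadNum);
        try {
            List<Future<Long>> futures = new ArrayList<>();
            for (Path file : files) {
                // Read each file footer in parallel.
                futures.add(executorService.submit(() -> readRowCountFromFooter(file)));
            }
            long rowCount = 0;
            for (Future<Long> future : futures) {
                // Merge the per-file results serially on the calling thread.
                rowCount += future.get();
            }
            return rowCount;
        } finally {
            executorService.shutdownNow();
        }
    }

    private static long readRowCountFromFooter(Path file) {
        return 0L; // placeholder for the actual footer read
    }
}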

@Baibaiwuguo Baibaiwuguo requested a review from luoyuxia July 6, 2023 04:06
Conflicts:
	flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/HiveOptions.java
@luoyuxia (Contributor) commented Jul 6, 2023

I'll have a look when I'm free. Thanks.

@luoyuxia (Contributor) left a comment

@Baibaiwuguo Thanks for updating. I left minor comments. PTAL.
Please remember to append a commit to address my comments.

@@ -206,6 +206,10 @@ Users can do some performance tuning by tuning the split's size with the follow
- Currently, these configurations for tuning split size only works for the Hive table stored as ORC format.
{{< /hint >}}

### Read Table Statistics
Contributor

Please don't forget to also update the Chinese doc.

Contributor

Please also specify why we may need to scan the table to get statistics:
When the table statistic is not available from Hive metastore, we will then try to get the statistic by scanning the table.

Contributor Author

done

@@ -134,6 +134,13 @@ public class HiveOptions {
+ " custom: use policy class to create a commit policy."
+ " Support to configure multiple policies: 'metastore,success-file'.");

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM =
key("table.exec.hive.read-format-statistics.thread-num")
Contributor

After rethinking it, I don't think we need to expose the word "format", which may confuse users. So I'd like to rename it to
table.exec.hive.read-statistics.thread-num.
WDYT?

}
}
}
executorService.shutdownNow();
Contributor

Please use try {} finally {} to shut down the executorService.
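
A minimal sketch of that structure, reusing the names from the surrounding diff (fragment only, not the PR's exact code):

ExecutorService executorService = Executors.newFixedThreadPool(statisticsThreadNum);
try {
    // submit the footer-reading tasks and collect the results here
} finally {
    // release the worker threads even if collecting the statistics throws
    executorService.shutdownNow();
}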

}
executorService.shutdownNow();
Contributor

Ditto.

@Baibaiwuguo (Contributor Author)

@luoyuxia Thank you for the very careful and patient review. I have addressed your comments. PTAL.

@Baibaiwuguo Baibaiwuguo requested a review from luoyuxia July 6, 2023 13:02
@luoyuxia (Contributor) left a comment

@Baibaiwuguo Thanks for updating. I left a minor comment. Should LGTM in the next iteration.

@@ -135,7 +135,7 @@ public class HiveOptions {
+ " Support to configure multiple policies: 'metastore,success-file'.");

public static final ConfigOption<Integer> TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM =
Contributor

TABLE_EXEC_HIVE_READ_FORMAT_STATISTICS_THREAD_NUM -> TABLE_EXEC_HIVE_READ_STATISTICS_THREAD_NUM

Contributor Author

done

@@ -190,6 +190,10 @@ Flink 允许你灵活的配置并发推断策略。你可以在 `TableConfig`
- 目前上述参数仅适用于 ORC 格式的 Hive 表。
{{< /hint >}}

### 读取表统计信息

当hive metastore(如`orc`或`parquet`)中没有表的统计信息时,需要扫描表获取信息。你可以使用`table.exec.hive.read-statistics.thread-num`去配置扫描线程数。默认值是当前系统可用处理器数,你配置的值应该大于0。
Contributor

Suggestion:
当hive metastore 中没有表的统计信息时,Flink 会尝试扫描表来获取统计信息从而生成合适的执行计划。此过程可能会比较耗时,你可以使用table.exec.hive.read-statistics.thread-num去配置使用多少个线程去扫描表,默认值是当前系统可用处理器数,配置的值应该大于0。

@@ -208,7 +208,9 @@ Users can do some performance tuning by tuning the split's size with the follow

### Read Table Statistics

To obtain hive table statistics faster, When hive table format is `orc` or `parquet`. You can use `table.exec.hive.read-format-statistics.thread-num` to configure the thread number. The default value is the number of available processors in the current system and the configured value should be bigger than 0.
When the table statistic is not available from Hive metastore, such as `orc` or `parquet`. We will then try to get the statistic by scanning the table.
Contributor

Suggestion:
When the table statistic is not available from the Hive meta store, Flink will try to scan the table to get the statistic to generate a better execution plan. It may cost some time to get the statistic.

Contributor

Suggestion:
When the table statistic is not available from the Hive metastore, Flink will try to scan the table to get the statistic to generate a better execution plan. It may cost some time to get the statistic. To get it faster, you can use table.exec.hive.read-statistics.thread-num to configure how many threads to use to scan the table.
The default value is the number of available processors in the current system and the configured value should be bigger than 0.
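
For reference, a sketch of setting this option from code; the key name follows the suggestion above, and the value 8 is an arbitrary example:

import org.apache.flink.configuration.Configuration;

Configuration flinkConf = new Configuration();
// Example only: use 8 scanning threads instead of the default (number of available processors).
flinkConf.setString("table.exec.hive.read-statistics.thread-num", "8");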

Contributor Author

done

@Baibaiwuguo (Contributor Author)

@luoyuxia Thanks. I have addressed your comments.

@Baibaiwuguo Baibaiwuguo requested a review from luoyuxia July 7, 2023 07:52
@luoyuxia (Contributor) left a comment

@Baibaiwuguo Thanks for updating. LGTM.
But I would like to have @swuferhong give another final review.

@Baibaiwuguo (Contributor Author)

@luoyuxia Thanks for your review!

@Baibaiwuguo (Contributor Author)

@swuferhong Hi, thanks for your ideas. Can you help review it when you are free?

@swuferhong (Contributor) left a comment

Hi @Baibaiwuguo. LGTM +1

@luoyuxia merged commit 0f2ee60 into apache:master on Jul 18, 2023