[FLINK-19365][hive] Migrate Hive source to FLIP-27 source interface for batch #13915
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request.

Automated Checks: last check on commit 197131c (Wed Nov 04 03:50:36 UTC 2020). Warnings:

Mention the bot in a comment to re-run the automated checks.

Review Progress: please see the Pull Request Review Guide for a full explanation of the review process. The bot is tracking the review progress through labels, which are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands: the @flinkbot bot supports the following commands:
JobConf jobConf,
CatalogTable catalogTable,
List<HiveTablePartition> partitions,
int[] projectedFields,
Already has producedType, I think projectedFields can be removed.
HiveMapredSplitReader still requires it. We can remove it when we refactor HiveMapredSplitReader.
I think you can create projectedFields from producedType easily.
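For reference, here is a minimal sketch of how the indices could be derived, assuming the table's full field names are available; the helper class, its name, and its parameters are illustrative and not code from this PR:

    import java.util.List;

    import org.apache.flink.table.types.logical.RowType;

    final class ProjectionSketch {
        // Maps each field of the produced row type to its index in the table's
        // full field list, which is what projectedFields encodes today.
        static int[] projectedFields(RowType producedType, List<String> allFieldNames) {
            return producedType.getFieldNames().stream()
                    .mapToInt(allFieldNames::indexOf)
                    .toArray();
        }
    }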
}

private void serialize(ObjectOutputStream outputStream, HiveSourceSplit split) throws IOException {
    byte[] superBytes = FileSourceSplitSerializer.INSTANCE.serialize(new FileSourceSplit(
Why do you want to create a new FileSourceSplit? Why not just use HiveSourceSplit?
I think it's better to reuse FileSourceSplitSerializer to serialize the super class, but FileSourceSplitSerializer doesn't accept sub-classes. Or we can just call InstantiationUtil.serializeObject to serialize the whole object. WDYT?
> but FileSourceSplitSerializer doesn't accept sub-classes.

Oh, so it is.
I am OK to keep this as it is.
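For context, the InstantiationUtil alternative mentioned above would look roughly like the sketch below. It assumes the split type implements java.io.Serializable; the wrapper class is purely illustrative:

    import java.io.IOException;
    import java.io.Serializable;

    import org.apache.flink.util.InstantiationUtil;

    final class WholeObjectSerdeSketch {
        // Java-serializes the entire split in one call, Hive-specific fields included,
        // so no per-field handling or super-class delegation is needed.
        static <T extends Serializable> byte[] serialize(T split) throws IOException {
            return InstantiationUtil.serializeObject(split);
        }

        static <T extends Serializable> T deserialize(byte[] bytes, ClassLoader cl)
                throws IOException, ClassNotFoundException {
            return InstantiationUtil.deserializeObject(bytes, cl);
        }
    }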
partitionKeys,
jobConfWrapper.conf().get(HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname,
        HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultStrVal),
(PartitionValueConverter) (colName, valStr, type) -> split.getHiveTablePartition().getPartitionSpec().get(colName),
This will fail when partition values cannot be found in the path. I created #13919 to refactor this.
JobConf clonedConf = new JobConf(jobConfWrapper.conf());
addSchemaToConf(clonedConf);
HiveTableInputSplit oldSplit = new HiveTableInputSplit(-1, split.toMapRedSplit(), clonedConf, split.getHiveTablePartition());
hiveMapredSplitReader = new HiveMapredSplitReader(clonedConf, partitionKeys, fieldTypes, selectedFields, oldSplit, hiveShim);
If you don't want to migrate HiveMapredSplitReader now, can you create a JIRA for this?
Just created https://issues.apache.org/jira/browse/FLINK-19965 for that
The branch was force-pushed from c85d007 to 54abdd1.
@JingsongLi Thanks for the review. I have rebased and addressed your comments.
new org.apache.flink.core.fs.Path[1],
new HiveSourceFileEnumerator.Provider(partitions, new JobConfWrapper(jobConf)),
DEFAULT_SPLIT_ASSIGNER,
createBulkFormat(new JobConf(jobConf), catalogTable, hiveVersion, producedDataType, useMapRedReader, limit),
I think you should use LimitableBulkFormat here.
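For reference, applying that suggestion might look like the sketch below. The LimitableBulkFormat.create(format, limit) factory and the import paths are assumptions based on the filesystem connector and should be checked against the actual API; HiveSourceSplit is the split class introduced in this PR:

    import org.apache.flink.connector.file.src.reader.BulkFormat;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.table.filesystem.LimitableBulkFormat;

    final class LimitWrappingSketch {
        // Wraps the Hive format so reading stops once `limit` rows have been emitted,
        // instead of threading the limit through the Hive readers themselves.
        static BulkFormat<RowData, HiveSourceSplit> applyLimit(
                BulkFormat<RowData, HiveSourceSplit> hiveFormat, Long limit) {
            return LimitableBulkFormat.create(hiveFormat, limit);
        }
    }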
parallelism = Math.min(parallelism, (int) limit / 1000);
int limit(Long limit) {
    if (limit != null) {
        parallelism = Math.min(parallelism, limit.intValue() / 1000);
Can you change limit.intValue() / 1000 to (int) (limit / 1000)?
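For what it's worth, the reason to divide before the narrowing cast is that limit is a Long, and intValue() wraps around for values above Integer.MAX_VALUE, whereas dividing first keeps the intermediate result in range. A small self-contained example:

    public class LimitCastDemo {
        public static void main(String[] args) {
            Long limit = 3_000_000_000L;                // larger than Integer.MAX_VALUE
            int overflowed = limit.intValue() / 1000;   // intValue() wraps to a negative value first
            int correct = (int) (limit / 1000);         // divide in long arithmetic, then narrow
            System.out.println(overflowed);             // -1294967
            System.out.println(correct);                // 3000000
        }
    }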
private final RowType producedDataType;
private final boolean useMapRedReader;
// We should limit the input read count of the splits, null represents no limit.
private final Long limit;
Remove the limit-related logic from this class.
private final DataType[] fieldTypes;
private final String hiveVersion;
private final HiveShim hiveShim;
private final RowType producedDataType;
Rename to producedRowType?
String hiveVersion,
boolean useMapRedReader,
boolean isStreamingSource,
RowType producedDataType) {
Rename to producedRowType here as well?
}

private boolean reachLimit() {
    return limit != null && numRead >= limit;
Please remove the limit-related logic.
Looks good to me. Thanks @lirui-apache for the update.
What is the purpose of the change

Migrate the Hive connector to the FLIP-27 source for batch reading.

Brief change log
- HiveSourceSplit, a sub-class of FileSourceSplit. It remembers the partition which a split belongs to.
- HiveSourceSplitSerializer, the SerDe for HiveSourceSplit.
- HiveSource, a sub-class of AbstractFileSource that works with RowData and HiveSourceSplit.
- HiveSourceFileEnumerator, an implementation of FileEnumerator that generates splits based on partitions rather than paths.
- HiveBulkFormatAdapter, an implementation of BulkFormat<RowData, HiveSourceSplit>. It delegates the reading to other BulkFormat instances based on the information we have about each partition to read (see the sketch after this list).
- PartitionValueConverter is changed to take the partition column name when converting the partition value. This makes it easier for Hive to reuse ParquetColumnarRowInputFormat because the partition values have been pre-computed.
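As a rough illustration of the HiveBulkFormatAdapter delegation described above, the sketch below dispatches on per-partition format information; the class names and the exact selection criteria are assumptions for illustration, not the classes in this PR:

    // Illustrative only: decides which concrete reader a partition is delegated to.
    enum DelegateFormat { VECTORIZED_PARQUET, VECTORIZED_ORC, MAPRED_FALLBACK }

    final class FormatDispatchSketch {
        static DelegateFormat choose(String serdeLib, boolean useMapRedReader) {
            if (useMapRedReader) {
                return DelegateFormat.MAPRED_FALLBACK;    // forced by configuration
            }
            String lib = serdeLib.toLowerCase();
            if (lib.contains("parquet")) {
                return DelegateFormat.VECTORIZED_PARQUET; // reuse the vectorized Parquet reader
            }
            if (lib.contains("orc")) {
                return DelegateFormat.VECTORIZED_ORC;     // reuse the vectorized ORC reader
            }
            return DelegateFormat.MAPRED_FALLBACK;        // anything else falls back to mapred
        }
    }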
Verifying this change
Existing tests
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation