[FLINK-29980] Handle partition keys directly in hive bulk format #21290
base: master
Conversation
force-pushed from 4c3b6bf to 0a9702c
force-pushed from 0a9702c to 8558e4d
@flinkbot run azure
Hi @luoyuxia, can you help review this PR? Thanks.
Thanks for the contribution, I'll have a look when I'm free.
@Aitozi Thanks for the contribution. I left some comments. PTAL.
BTW, could you please rebase onto master?
public FileInfoExtractorBulkFormat(
        BulkFormat<RowData, FileSourceSplit> wrapped,
        BulkFormat<RowData, SplitT> wrapped,
Why change this?
        DataType producedDataType,
        TypeInformation<RowData> producedTypeInformation,
        Map<String, FileSystemTableSource.FileInfoAccessor> metadataColumns,
        List<String> partitionColumns,
        String defaultPartName) {
        PartitionFieldExtractor<SplitT> partitionFieldExtractor) {
Why change this? It seems like a code refactor?
// Fill the metadata + partition columns row
List<FileInfoExtractor.PartitionColumn> partitionColumns =
This seems like another code refactor? Do we really need to refactor it? Is there any special reason?
/** Record mapper definition. */
@FunctionalInterface
public interface RecordMapper<I, O> {
Why move RecordMapper here? I think it's fine to keep it in its original place.
@@ -58,14 +56,10 @@ OrcColumnarRowInputFormat<VectorizedRowBatch, SplitT> createPartitionedFormat(
        Configuration hadoopConfig,
        RowType tableType,
        List<String> partitionKeys,
Then, as the TODO says, the partitionKeys code should be pruned.
After the partitionKeys code is pruned, the name and comment of this method should change as well.
        inputFormat,
        producedType,
        producedTypeInfo,
        new HashMap<>(),
nit: Collections.emptyMap()
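The reasoning behind this nit can be shown in a small standalone sketch: `Collections.emptyMap()` returns a shared immutable instance, so it avoids a fresh allocation per call and rejects accidental writes, whereas `new HashMap<>()` allocates a new mutable map every time.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class EmptyMapNit {
    public static void main(String[] args) {
        // new HashMap<>() allocates a fresh, mutable map on every call.
        Map<String, String> allocated = new HashMap<>();

        // Collections.emptyMap() returns a shared, immutable singleton:
        // no allocation, and accidental writes fail fast.
        Map<String, String> shared = Collections.emptyMap();

        System.out.println(allocated.isEmpty()); // true
        System.out.println(shared.isEmpty());    // true
        try {
            shared.put("k", "v");
        } catch (UnsupportedOperationException e) {
            System.out.println("writes are rejected");
        }
    }
}
```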
@@ -143,21 +139,6 @@ public HiveMapredSplitReader(

// construct reuse row
this.row = new GenericRowData(selectedFields.length);
selectedFields will contain the partition columns; we should exclude them here, since they are handled in the outer wrapper.
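The exclusion the reviewer describes could be sketched as follows. This is a minimal, self-contained illustration; the field names and the `selectPhysicalFields` helper are hypothetical, not actual Flink API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SelectedFieldsSketch {
    // Keep only the selected fields that are NOT partition columns,
    // since partition values are re-attached by the wrapping format.
    static int[] selectPhysicalFields(
            String[] allFields, int[] selectedFields, List<String> partitionKeys) {
        List<Integer> physical = new ArrayList<>();
        for (int idx : selectedFields) {
            if (!partitionKeys.contains(allFields[idx])) {
                physical.add(idx);
            }
        }
        return physical.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        String[] fields = {"id", "name", "dt"}; // "dt" is a partition column
        int[] physical = selectPhysicalFields(fields, new int[] {0, 1, 2}, Arrays.asList("dt"));
        System.out.println(Arrays.toString(physical)); // [0, 1]
    }
}
```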
@@ -294,56 +286,6 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException {
        });
}

@ParameterizedTest
@MethodSource("parameters")
void testPartitionValues(int rowGroupSize) throws IOException {
After removing the now-invalid tests from the parquet/orc input format tests, I think we should add a test for FileInfoExtractorBulkFormat to make sure it can get the partition columns correctly.
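The core of such a test is checking that partition values are recovered from the split path rather than read from the file itself. A self-contained sketch of that check, using a hypothetical `extractPartitionSpec` helper (not the actual Flink `PartitionFieldExtractor` API), could look like this:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PartitionPathSketch {
    // Parse Hive-style "key=value" segments from a partition path.
    static Map<String, String> extractPartitionSpec(String path) {
        Map<String, String> spec = new LinkedHashMap<>();
        for (String segment : path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) { // skip non-partition segments like "part-0.orc"
                spec.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return spec;
    }

    public static void main(String[] args) {
        Map<String, String> spec =
                extractPartitionSpec("/warehouse/t/dt=2022-11-10/hr=08/part-0.orc");
        System.out.println(spec); // {dt=2022-11-10, hr=08}
    }
}
```

A real test would feed such a path into FileInfoExtractorBulkFormat and assert the produced rows carry the expected partition values.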
}

/** Info of the partition column. */
public static class PartitionColumn implements Serializable {
Do we really need this class?
        producedRowFieldNames, mutableRowFieldNames, fixedRowFieldNames);
}

public List<PartitionColumn> getPartitionColumns() {
Can we just return the names of the partition columns? Then we wouldn't need PartitionColumn.
What is the purpose of the change
This is meant to leverage EnrichedRowData to handle the partition key logic in the Hive connector. After this, it will not depend on the parquet/orc formats to make up the RowData with the partition keys, and the formats module no longer has to care about filling in partition keys.

Brief change log
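The idea behind EnrichedRowData can be sketched with a plain index mapping (a simplified, self-contained illustration, not the actual Flink class): each output field of the produced row is routed either to the mutable row read from the file or to a fixed row holding the partition values.

```java
import java.util.Arrays;
import java.util.List;

public class EnrichedRowSketch {
    // For each produced field, a non-negative index i means "field i of the
    // mutable (file) row"; a negative index -(i + 1) means "field i of the
    // fixed row" (e.g. the partition values). This mirrors the mapping that
    // EnrichedRowData computes from its field-name lists.
    static int[] computeIndexMapping(
            List<String> produced, List<String> mutable, List<String> fixed) {
        int[] mapping = new int[produced.size()];
        for (int p = 0; p < produced.size(); p++) {
            int m = mutable.indexOf(produced.get(p));
            mapping[p] = (m >= 0) ? m : -(fixed.indexOf(produced.get(p)) + 1);
        }
        return mapping;
    }

    static Object[] assemble(Object[] mutableRow, Object[] fixedRow, int[] mapping) {
        Object[] out = new Object[mapping.length];
        for (int p = 0; p < mapping.length; p++) {
            out[p] = mapping[p] >= 0 ? mutableRow[mapping[p]] : fixedRow[-(mapping[p] + 1)];
        }
        return out;
    }

    public static void main(String[] args) {
        // Produced schema (id, name, dt); "dt" is a partition key in the fixed row.
        int[] mapping = computeIndexMapping(
                Arrays.asList("id", "name", "dt"),
                Arrays.asList("id", "name"),
                Arrays.asList("dt"));
        Object[] row = assemble(new Object[] {1, "a"}, new Object[] {"2022-11-10"}, mapping);
        System.out.println(Arrays.toString(row)); // [1, a, 2022-11-10]
    }
}
```

Because the mapping is computed once per split, each produced record only pays a cheap index lookup, and the underlying format never needs to know about partition columns.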
Verifying this change
This change is already covered by existing tests: HiveSourceITCase and HiveTableSourceITCase.