-
Notifications
You must be signed in to change notification settings - Fork 13.9k
[FLINK-13290][table-planner-blink][hbase] SinkCodeGenerator should not compare row type field names and enable blink planner for hbase IT case #9275
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
cc @carp84 could you help review the hbase part? |
|
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. DetailsThe Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
| return descriptorProperties.asMap(); | ||
| } | ||
|
|
||
| @Ignore |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why ignore?
| public class HBaseSinkITCase extends HBaseTestingClusterAutostarter { | ||
| private static final long serialVersionUID = 1L; | ||
|
|
||
| private static final String BLINK_PLANNER = "Blink"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use enum instead?
| val fieldIndexProcessCode = | ||
| if (getCompositeTypes(convertOutputType).map(fromTypeInfoToLogicalType) sameElements | ||
| inputTypeInfo.getFieldTypes.map(fromTypeInfoToLogicalType)) { | ||
| if (getCompositeTypes(convertOutputType) sameElements inputTypeInfo.getFieldTypes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it not work now...
inputTypeInfo is a BaseRowTypeInfo with internal type informations.
And convertOutputType is the type information from sink and users.
Consider Types.STRING and BinaryStringTypeInfo, Types.DECIMAL and DecimalTypeInfo.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And I think maybe you can try LogicalType.asSerializableString
| public void testHBaseLookupTableSource() throws Exception { | ||
| EnvironmentSettings settings = EnvironmentSettings.newInstance() | ||
| // only blink planner support lookup table source | ||
| .useBlinkPlanner().inStreamingMode().build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use parameterized test to both cover blink-planner and flink-planner just like the way in HBaseSinkITCase, and remove the testHBaseLookupFunction test.
9e7c51e to
cef04c0
Compare
|
Hi @carp84 , @JingsongLi , I have updated the PR. I split it into 3 commits:
cc @twalthr , it would be great if you can help to review 0a59bfc. |
twalthr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @wuchong, I had a look at 0a59bfc and some questions regarding the use case for this utility method. Wouldn't it be enough to just go through the types returned by getChildren() for structured, distinct type, and row types? If we skip the field names we could also skip more logical attributes such as isFinal, description, etc. and focus on the pure "more physical" types.
| } | ||
|
|
||
| public static boolean areTypeEqualsWithoutNames(LogicalType thisType, LogicalType thatType) { | ||
| if (thisType == null || thatType == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to add a precondition here. Types should not be null. If they are null, it is a bug that should throw an exception.
| } | ||
| } | ||
|
|
||
| private static class TypeEquivalenceIgnoreNamesExtractor extends Extractor<Boolean> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should directly extend from LogicalTypeDefaultVisitor because we override the default method anyway.
| * Returns true if tow LogicalType can equal. This is the same with {@link LogicalType#equals(Object)}. | ||
| */ | ||
| private boolean canEqual(LogicalType thatType) { | ||
| if (thisType == null || thatType == null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above regarding null checking
| if (!canEqual(thatType)) { | ||
| return false; | ||
| } | ||
| boolean thisIsFinal = ((DistinctType) thisType).isFinal(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Distinct types are always final. We don't need to check that.
| return logicalType.accept(SINGLE_FIELD_INTERVAL_EXTRACTOR); | ||
| } | ||
|
|
||
| public static boolean areTypeEqualsWithoutNames(LogicalType thisType, LogicalType thatType) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: rename to hasEqualTypesWithoutNames
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reason to use hasEqualTypesWithoutNames? It looks like whether it contains some equal types (might children types are equal).
| RowType.RowField thatField = thatFields.get(i); | ||
| // ignore field names | ||
| if (!thisField.getType().equals(thatField.getType()) | ||
| || !Objects.equals(thisField.getDescription(), thatField.getDescription())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do we check the description? Isn't it as meaningless as the field names?
| return thisIsFinal == thatType.isFinal() && | ||
| thisObjectIdentifier.equals(thatType.getObjectIdentifier()) && | ||
| thisDescription.equals(thatType.getDescription()) && | ||
| areTypeEqualsWithoutNames(thisSourceType, thatType.getSourceType()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can just compare the source type. Everything else is logical meta data, no?
|
Hi @twalthr , thanks for the advice. The implementation of type equals has been much simpler. Please have a look again. |
carp84
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for further refactoring the patch @wuchong , added some more comments.
|
|
||
| package org.apache.flink.addons.hbase.util; | ||
|
|
||
| import org.apache.flink.addons.hbase.HBaseConnectorITCase2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The L21-L26 imports are unused and should be removed, let alone L21 introduces a compilation error.
The same applies to the L32 import.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oops, it seems this issue is already resolved by the later force push, and now the code here looks good.
| }; | ||
| // split keys | ||
| byte[][] splitKeys = new byte[][]{ Bytes.toBytes(4) }; | ||
| createTable(tableName, families, splitKeys); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: we could extract L101-L111 out to remove some duplicated codes.
| // https://github.com/apache/hbase/blob/master/hbase-server/src/test/java/org/apache/hadoop/hbase/filter/FilterTestingCluster.java | ||
| // | ||
| public class HBaseTestingClusterAutostarter extends TestLogger implements Serializable { | ||
| public abstract class HBaseTestingClusterAutostarter extends AbstractTestBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor: HBaseTestingClusterAutostarter -> HBaseTestingClusterAutoStarter
| @Test | ||
| public void testTableInputFormat() throws Exception { | ||
| if (BLINK_PLANNER.equals(planner)) { | ||
| // this case is for testing TableInputFormat which is not works for flink-table |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is a little bit ambiguous, from the code logic it seems we should say "this case is for testing TableInputFormat which only applies to flink-table planner" or so?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not exactly. I mean TableInputFormat is a runtime implementation, not for flink table, not relative to what the planner is. You can see the test is only verified on DataSet, not on TableEnvironment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After some offline discussion, the case is irrelative to planner actually, but in case that later change somehow making the test case relative to planner implementation (although low possibility), let's just don't skip the test for any planner.
twalthr
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update @wuchong. The code looks beautiful now. The only concern left is the naming of the method and visitor. Because it does more than just ignoring names now. How about hasEqualAtomicTypes?
Add a areTypesCompatible() method to LogicalTypeChecks. This will compare two LogicalTypes without field names and other logical attributes (e.g. description, isFinal).
…re row type field names
This commit combines HBaseTableSourceITCase and HBaseLookupFunctionITCase and HBaseConnectorITCase into one class. This can save much cluster initialization time for us.
|
As discussed with @twalthr , we would like to go with PR updated and squashed. |
carp84
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The latest changes LGTM. Thanks for the efforts @wuchong !
This commit combines HBaseTableSourceITCase and HBaseLookupFunctionITCase and HBaseConnectorITCase into one class. This can save much cluster initialization time for us. This closes #9275
This commit combines HBaseTableSourceITCase and HBaseLookupFunctionITCase and HBaseConnectorITCase into one class. This can save much cluster initialization time for us. This closes apache#9275
What is the purpose of the change
Fix
SinkCodeGeneratorshouldn't compare type field names. Also we added blink planner for hbase IT cases.Brief change log
SinkCodeGeneratorstill use type information to compareVerifying this change
SinkCodeGeneratorDoes this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)Documentation