[3.0][flink-cdc-composer] Connector JAR discovery utilities#2662
[3.0][flink-cdc-composer] Connector JAR discovery utilities#2662PatrickRen merged 5 commits intoapache:masterfrom
Conversation
f7080b6 to
edced27
Compare
PatrickRen
left a comment
There was a problem hiding this comment.
@GOODBOY008 Thanks for the PR! I left some comments.
| private FactoryDiscoveryUtils() {} | ||
|
|
||
| /** Returns the {@link Factory} for the given identifier. */ | ||
| public static Factory getFactoryByIdentifier(String identifier) { |
There was a problem hiding this comment.
Thinking about the scenario: MySQL connector defines two classes: MySQLDataSourceFactory and MySQLDataSinkFactory, and both factories use "mysql" as the identifier. In this case this method will throw exception because two factories are found.
I think using generic type here would be more friendly to callers, like this one in Flink. To resolve the issue above, callers can use getFactoryByIdentifier("mysql", DataSourceFactory.class) to get the only MySQLDataSourceFactory.
|
|
||
| @Override | ||
| public void handleEventFromOperator(int i, int i1, OperatorEvent operatorEvent) | ||
| public void notifyCheckpointAborted(long checkpointId) { |
There was a problem hiding this comment.
Maybe splitting this into a hotfix commit, or sync with @ruanhang1993 ?
c8372f6 to
6e6d2ed
Compare
PatrickRen
left a comment
There was a problem hiding this comment.
@GOODBOY008 Thanks for the update! I left some comments. Also please add tests for new utilities.
| /** Returns the {@link Factory} for the given identifier. */ | ||
| @SuppressWarnings("unchecked") | ||
| public static <T extends Factory> T getFactoryByIdentifier( | ||
| String identifier, Class<T> implementClass) { |
There was a problem hiding this comment.
What about using factoryClass instead of implementClass for the second argument? The function uses a factory interface for searching an implementation.
| factoryList.add(factory); | ||
| } | ||
| } catch (Throwable e) { | ||
| if (e.getCause() instanceof NoClassDefFoundError) { |
There was a problem hiding this comment.
I don't quite get the logic of handling NoClassDefFoundError here. The same logic in Flink is for handling a special case where flink-connector-files is missing. I think we don't have that issue in our project, so maybe just throw the exception directly?
6e6d2ed to
6ff9469
Compare
PatrickRen
left a comment
There was a problem hiding this comment.
@GOODBOY008 Thanks for the update! LGTM overall. I pushed a commit to use AssertJ-style assertions in FactoryDiscoveryUtilsTest. Will merge the PR after all CI tests pass
After #2661 (review) merged will carry on.
[3.0][flink-cdc-composer] Connector JAR discovery utilities.
Close #2620