@luoyuxia
Contributor

@luoyuxia luoyuxia commented Mar 13, 2023

What is the purpose of the change

This change makes the Hive dialect use the public interfaces provided for pluggable dialects.
Once it is merged, we can remove the provided dependency on flink-table-planner in FLINK-31413.

Brief change log

The basic idea comes from FLIP-216.

  • In the first commit, we introduce a slim module named flink-table-calcite-bridge, which ships the Calcite dependency and provides a CalciteContext used by other plugin modules. In the next PR, we can then make hive-connector depend on flink-table-calcite-bridge.

  • In the second commit, we introduce or expose some public interfaces, such as DialectFactory/CatalogRegistry, for pluggable dialects.
    In addition, we remove ExtendedOperationExecutor here. As a result, the logic in HiveExtendedOperationExecutor is wrapped in HiveExecutableOperation, whose execute method delegates the execution logic to HiveExtendedOperationExecutor. This is a hack and will be refactored in another PR.

  • In the third commit, we make HiveParser use the public interface CatalogRegistry and the internal interface CalciteContext instead of internal classes/interfaces such as CatalogManager/FlinkPlannerImpl/PlannerContext.
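
The delegation described for the second commit can be pictured with the minimal Java sketch below. All types are simplified, hypothetical stand-ins (`ExecutableOp`, `LegacyExecutor`, `DelegatingOp`); they are not the actual signatures of Flink's ExecutableOperation or HiveExtendedOperationExecutor. The sketch only shows an operation whose execute method forwards to a legacy executor.

```java
import java.util.Optional;

// Hypothetical stand-in for an operation that knows how to execute itself.
interface ExecutableOp {
    String execute();
}

// Hypothetical stand-in for the legacy executor that still holds the real logic.
final class LegacyExecutor {
    // Returns a result only for statements it recognizes.
    Optional<String> executeOperation(String statement) {
        if (statement.startsWith("SHOW")) {
            return Optional.of("executed: " + statement);
        }
        return Optional.empty();
    }
}

// Wrapper operation: execute() simply delegates to the legacy executor.
final class DelegatingOp implements ExecutableOp {
    private final LegacyExecutor executor;
    private final String statement;

    DelegatingOp(LegacyExecutor executor, String statement) {
        this.executor = executor;
        this.statement = statement;
    }

    @Override
    public String execute() {
        return executor
                .executeOperation(statement)
                .orElseThrow(
                        () -> new IllegalStateException("Unsupported statement: " + statement));
    }
}

final class DelegationSketch {
    public static void main(String[] args) {
        ExecutableOp op = new DelegatingOp(new LegacyExecutor(), "SHOW TABLES");
        System.out.println(op.execute());
    }
}
```

The wrapper keeps the caller-facing type self-contained while the legacy logic stays in one place until it is refactored.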

Verifying this change

The change has been verified by existing tests.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@flinkbot
Collaborator

flinkbot commented Mar 13, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@luoyuxia luoyuxia marked this pull request as draft March 13, 2023 06:08
@luoyuxia luoyuxia marked this pull request as ready for review March 13, 2023 06:21
@luoyuxia luoyuxia changed the title Flink 31409 [FLINK-31409][hive] Hive connector should use public interfaces for Hive dialct Mar 13, 2023
@luoyuxia luoyuxia changed the title [FLINK-31409][hive] Hive connector should use public interfaces for Hive dialct [FLINK-31409][hive] Hive dialect should use public interfaces Mar 13, 2023
Comment on lines 46 to 60
"mvn dependency:tree" as of Calcite 1.26.0:
[INFO] +- org.apache.calcite:calcite-core:jar:1.26.0:compile
[INFO] | +- org.apache.calcite:calcite-linq4j:jar:1.26.0:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.12.1:compile
[INFO] | +- org.apiguardian:apiguardian-api:jar:1.1.0:compile
[INFO] | +- com.esri.geometry:esri-geometry-api:jar:2.2.0:runtime
[INFO] | +- com.fasterxml.jackson.core:jackson-core:jar:2.12.1:runtime
[INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.12.1:runtime
[INFO] | +- com.jayway.jsonpath:json-path:jar:2.4.0:runtime
[INFO] | | \- net.minidev:json-smart:jar:2.3:runtime
[INFO] | | \- net.minidev:accessors-smart:jar:1.2:runtime
[INFO] | | \- org.ow2.asm:asm:jar:5.0.4:runtime
[INFO] | +- commons-codec:commons-codec:jar:1.10:runtime
[INFO] | +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] | \- commons-io:commons-io:jar:2.4:compile
Contributor

This should list the Calcite 1.29.0 dependencies, as in https://github.com/apache/flink/pull/22166/files#diff-6d81d722fce4e303d49b2136ae8e137f74ed3f4ece4d2a990445bcf6a2388392L132-L147

Suggested change
"mvn dependency:tree" as of Calcite 1.26.0:
[INFO] +- org.apache.calcite:calcite-core:jar:1.26.0:compile
[INFO] | +- org.apache.calcite:calcite-linq4j:jar:1.26.0:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.12.1:compile
[INFO] | +- org.apiguardian:apiguardian-api:jar:1.1.0:compile
[INFO] | +- com.esri.geometry:esri-geometry-api:jar:2.2.0:runtime
[INFO] | +- com.fasterxml.jackson.core:jackson-core:jar:2.12.1:runtime
[INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.12.1:runtime
[INFO] | +- com.jayway.jsonpath:json-path:jar:2.4.0:runtime
[INFO] | | \- net.minidev:json-smart:jar:2.3:runtime
[INFO] | | \- net.minidev:accessors-smart:jar:1.2:runtime
[INFO] | | \- org.ow2.asm:asm:jar:5.0.4:runtime
[INFO] | +- commons-codec:commons-codec:jar:1.10:runtime
[INFO] | +- org.apache.commons:commons-lang3:jar:3.3.2:compile
[INFO] | \- commons-io:commons-io:jar:2.4:compile
"mvn dependency:tree" as of Calcite 1.29.0:
[INFO] +- org.apache.calcite:calcite-core:jar:1.29.0:compile
[INFO] | +- org.apache.calcite:calcite-linq4j:jar:1.29.0:compile
[INFO] | +- com.esri.geometry:esri-geometry-api:jar:2.2.0:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-annotations:jar:2.13.4:compile
[INFO] | +- org.apache.calcite.avatica:avatica-core:jar:1.20.0:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-core:jar:2.13.4:compile
[INFO] | +- com.fasterxml.jackson.core:jackson-databind:jar:2.13.4.2:compile
[INFO] | +- com.jayway.jsonpath:json-path:jar:2.4.0:runtime
[INFO] | | \- net.minidev:json-smart:jar:2.3:runtime
[INFO] | | \- net.minidev:accessors-smart:jar:1.2:runtime
[INFO] | | \- org.ow2.asm:asm:jar:5.0.4:runtime
[INFO] | +- commons-codec:commons-codec:jar:1.15:runtime
[INFO] | \- commons-io:commons-io:jar:2.11.0:compile

@luoyuxia luoyuxia force-pushed the FLINK-31409 branch 2 times, most recently from ceb3607 to 2b0107c Compare March 13, 2023 12:45
@luoyuxia
Contributor Author

@wuchong @lincoln-lil Could you please help review this when you have time?

Contributor

@lincoln-lil lincoln-lil left a comment

@luoyuxia thanks for driving the Hive decoupling! Overall it looks good to me, and I've left some minor comments.

parser
}

def getOperationTreeBuilder: OperationTreeBuilder = {
Contributor

this can be 'private'

)
}

def getTableReferenceExpression(path: String): Optional[TableReferenceExpression] = {
Contributor

ditto

Optional.of(ApiExpressionUtils.tableRef(path, queryOperation))
}
} catch {
case _: SqlParserException => Optional.empty()
Contributor

Why is the exception here not thrown directly? Are there any test cases related to this exception path?

Contributor Author

It's copied from TableEnvironmentImpl. I think we can also copy the comment here.
I haven't found any test cases related to this exception path. Since it's copied from the existing codebase, maybe we can add a test in a dedicated PR if needed.

Contributor

Creating a follow-up issue to add a related test sounds good.
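
For readers unfamiliar with the pattern discussed in this thread, here is a minimal Java sketch of a lookup that swallows a parse failure and reports "not found" instead of propagating the exception. `PathParseException` is a hypothetical stand-in for SqlParserException, and `parseTablePath`/`getTableReference` are illustrative helpers, not Flink code.

```java
import java.util.Optional;

// Hypothetical stand-in for a parser exception.
final class PathParseException extends RuntimeException {
    PathParseException(String message) {
        super(message);
    }
}

final class TableRefLookup {
    // Illustrative "parser": rejects empty paths and paths containing spaces.
    static String parseTablePath(String path) {
        if (path == null || path.isEmpty() || path.contains(" ")) {
            throw new PathParseException("Cannot parse table path: '" + path + "'");
        }
        return path;
    }

    // Returns a reference if the path parses, or an empty Optional otherwise.
    static Optional<String> getTableReference(String path) {
        try {
            return Optional.of("tableRef(" + parseTablePath(path) + ")");
        } catch (PathParseException e) {
            // The path is not a valid identifier, so treat it as "no such table".
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(getTableReference("db.t"));
        System.out.println(getTableReference("not a path"));
    }
}
```

A test for this path would only need to pass an unparsable path and assert that the result is empty, which is what the follow-up issue could cover.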

resultRows = tableResultInternal.collectInternal();
rowDataToStringConverter = tableResultInternal.getRowDataToStringConverter();
} else {
// sometime, the tableResult maybe not an instance of TableResultInternal
Contributor

nit: 'sometimes'

Contributor

@lincoln-lil lincoln-lil left a comment

LGTM +1

@lincoln-lil lincoln-lil self-requested a review March 21, 2023 04:01
@lincoln-lil
Contributor

@luoyuxia it seems the latest change failed the style check, PTAL.
Otherwise +1.

@luoyuxia
Contributor Author

@flinkbot run azure

FunctionCatalog getFunctionCatalog();

/**
* Create the {@link RelOptTable.ToRelContext} used t0 convert a table into a relational
Member

Suggested change
* Create the {@link RelOptTable.ToRelContext} used t0 convert a table into a relational
* Create a new instance of {@link RelOptTable.ToRelContext} used to convert a table into a relational

Parser create(Context context);

/** Create an extended operation executor. */
default ExtendedOperationExecutor createExtendedOperationExecutor(Context context) {
Member

I'm thinking of removing the ExtendedOperationExecutor because we have introduced ExecutableOperation which is a self-contained way to support the operation execution. In this way, we can rename DialectFactory back to ParserFactory to align with the original FLIP proposal.

Contributor Author

Removing ExtendedOperationExecutor is a good idea; it makes everything simpler. I have updated the PR with the removal and the renaming.
Hopefully the internal ExecutableOperation interface won't change frequently.

} else {
// sometimes, the tableResult maybe not an instance of TableResultInternal
// in the case of pluggable dialect implements method
// ExtendedOperationExecutor#executeOperation, and it doesn't return TableResultInternal
Member

Which case doesn't return TableResultInternal?

Contributor Author

Previously, ExtendedOperationExecutor#executeOperation was marked as PublicEvolving, so its return type had to be TableResult, which is also marked as PublicEvolving. Since the interface doesn't require implementers to return TableResultInternal, an implementer can return anything that implements TableResult.

But now I have removed ExtendedOperationExecutor and reverted this change.
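
The case described above (a public result type whose implementations may or may not also implement the internal one) can be sketched as follows. `PublicResult` and `InternalResult` are hypothetical stand-ins for TableResult and TableResultInternal, with invented methods; the sketch only illustrates the instanceof check plus a generic fallback, and does not reflect the code that was reverted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the public result type.
interface PublicResult {
    List<Object[]> collect();
}

// Hypothetical stand-in for the internal result type with a richer API.
interface InternalResult extends PublicResult {
    List<String> collectFormatted();
}

final class ResultPrinter {
    // Uses the internal API when available, otherwise falls back to the public one.
    static List<String> toLines(PublicResult result) {
        if (result instanceof InternalResult) {
            return ((InternalResult) result).collectFormatted();
        }
        // Fallback: the implementer only promised the public interface.
        List<String> lines = new ArrayList<>();
        for (Object[] row : result.collect()) {
            lines.add(Arrays.toString(row));
        }
        return lines;
    }

    public static void main(String[] args) {
        PublicResult plain =
                () -> Arrays.asList(new Object[] {1, "a"}, new Object[] {2, "b"});
        System.out.println(toLines(plain));
    }
}
```

With the public executor interface removed, the fallback branch is no longer needed, which is why the change could be reverted.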

* @return the content of the execution result.
*/
TableResultInternal executeInternal(Operation operation);
TableResult executeInternal(Operation operation);
Member

Why do we need to change this?

Contributor Author

@luoyuxia luoyuxia Mar 24, 2023

Previously, ExtendedOperationExecutor#executeOperation was marked as PublicEvolving, so its return type had to be TableResult, which is also marked as PublicEvolving. That is why this method was also changed to return TableResult.

I have now reverted this change, since ExtendedOperationExecutor has been removed.

Comment on lines 180 to 191
private def getOperationTreeBuilder: OperationTreeBuilder = {
OperationTreeBuilderImpl.create(
tableConfig,
classLoader,
functionCatalog.asLookup(f => getParser.parseIdentifier(f)),
catalogManager.getDataTypeFactory,
(path: String) => getTableReferenceExpression(path),
(s: String, inputRowType: RowType, outputType) =>
getParser.parseSqlExpression(s, inputRowType, outputType),
isStreamingMode
)
}
Member

The creation of OperationTreeBuilderImpl is very complex, and the logic is duplicated. This makes it difficult to keep the logic consistent in the future.

Besides, the extracted OperationTreeBuilder seems incomplete (e.g., join) and is never used. I would prefer to introduce OperationTreeBuilder only when we actually need it; otherwise, it only increases the maintenance overhead. What do you think?

Contributor Author

I agree. We can add it in the future when we actually need it. That does make everything simpler.

Btw, the reason OperationTreeBuilder lacks join is that JoinType is not marked as PublicEvolving, so we can't expose it in OperationTreeBuilder.

@luoyuxia luoyuxia force-pushed the FLINK-31409 branch 2 times, most recently from 636f992 to 7612cca Compare March 24, 2023 09:35
@luoyuxia
Contributor Author

I reorganized the commits instead of appending new ones, because appended commits would make this change hard to track. I also updated the brief change log in the PR description.

Member

@wuchong wuchong left a comment

Thank you for the update. I think this is in good shape. While reviewing the code, I found that the methods of CatalogRegistry are not smooth to use. For example:

  1. the ResolvedCatalogBaseTable vs ContextResolvedTable issue mentioned above,
  2. the Optional return type adds little value, because we always throw an exception when the Optional result is empty,
  3. it takes a long method chain to get the current catalog, even though this is a frequent call.

I'm thinking about revisiting the design of CatalogRegistry into this:

/** A catalog registry for dealing with catalogs. */
@PublicEvolving
public interface CatalogRegistry {
    /** Get the current database. */
    String getCurrentDatabase();

    /** Gets the current catalog. */
    String getCurrentCatalog();

    /**
     * Returns the full name of the given table path, this name may be padded with current
     * catalog/database name based on the {@code identifier's} length.
     *
     * @param identifier an unresolved identifier
     * @return a fully qualified object identifier
     */
    ObjectIdentifier qualifyIdentifier(UnresolvedIdentifier identifier);

    /** Gets the current catalog. */
    Catalog getCurrentCatalog();

    /**
     * Gets a catalog by name.
     *
     * @param catalogName name of the catalog to retrieve
     * @return the requested catalog
     * @throws CatalogNotExistException if the catalog does not exist
     */
    Catalog getCatalogOrError(String catalogName);

    /**
     * Gets a fully qualified and resolved table with context by name. If the path is not yet fully
     * qualified use {@link #qualifyIdentifier(UnresolvedIdentifier)} first.
     *
     * @param objectIdentifier full path of the table to retrieve
     * @return resolved table with context that the path points to
     * @throws TableNotExistException if the table does not exist
     */
    ContextResolvedTable getTableOrError(ObjectIdentifier objectIdentifier);

    /**
     * Return whether the table with a fully qualified table path is temporary or not.
     *
     * @param objectIdentifier full path of the table
     * @return the table is temporary or not.
     */
    boolean isTemporaryTable(ObjectIdentifier objectIdentifier);

    /**
     * Retrieves a partition with a fully qualified table path and partition spec.
     *
     * @param tableIdentifier full path of the table to retrieve
     * @param partitionSpec full partition spec
     * @return partition in the table.
     */
    Optional<CatalogPartition> getPartition(
            ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec);
    
    /**
     * Retrieves a partition with a fully qualified table path and partition spec.
     *
     * @param tableIdentifier full path of the table to retrieve
     * @param partitionSpec full partition spec
     * @return partition in the table.
     * @throws PartitionNotExistException if the partition does not exist
     */
    CatalogPartition getPartitionOrError(
            ObjectIdentifier tableIdentifier, CatalogPartitionSpec partitionSpec);
}

What do you think?
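
The `...OrError` convention proposed above can be layered on top of an Optional-returning lookup. The following is a minimal Java sketch with hypothetical names (`SimpleRegistry`, `InMemoryRegistry`, a `String` in place of a real catalog object); it is not the actual CatalogRegistry, and it throws IllegalArgumentException only as a placeholder for a dedicated exception type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical registry: one Optional-returning lookup plus an OrError convenience method.
interface SimpleRegistry {
    Optional<String> getCatalog(String catalogName);

    // Convenience for callers that always fail on a missing catalog.
    default String getCatalogOrError(String catalogName) {
        return getCatalog(catalogName)
                .orElseThrow(
                        () ->
                                new IllegalArgumentException(
                                        "Catalog '" + catalogName + "' does not exist."));
    }
}

final class InMemoryRegistry implements SimpleRegistry {
    private final Map<String, String> catalogs = new HashMap<>();

    InMemoryRegistry register(String name, String description) {
        catalogs.put(name, description);
        return this;
    }

    @Override
    public Optional<String> getCatalog(String catalogName) {
        return Optional.ofNullable(catalogs.get(catalogName));
    }

    public static void main(String[] args) {
        SimpleRegistry registry = new InMemoryRegistry().register("hive", "Hive catalog");
        System.out.println(registry.getCatalogOrError("hive"));
        System.out.println(registry.getCatalog("missing").isPresent());
    }
}
```

Callers that can tolerate a missing entry keep using the Optional variant, while callers that always fail on absence avoid repeating the same `orElseThrow` boilerplate.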

Collections.emptyMap());
}

private ContextResolvedTable getContextResolvedTable(ObjectIdentifier tableIdentifier) {
Member

It seems we still need to depend on ContextResolvedTable, which is an @Internal class. The getContextResolvedTable method somewhat duplicates CatalogManager#getTable. Besides, I don't favor the verbose method signature of CatalogRegistry#getResolvedCatalogBaseTable. What do you think about replacing CatalogRegistry#getResolvedCatalogBaseTable with CatalogRegistry#getTableOrError? This would also make things easier for callers, since they would not have to deal with an empty result.

Contributor Author

Yes, we still need to depend on ContextResolvedTable because it is needed to construct SinkModifyOperation, which is also @Internal. In fact, the Hive dialect still uses many internal operations. Maybe we need to make them public at some point, but that is another topic.
Considering that our plan is to first decouple Hive from flink-table-planner, I think we can keep them for now and see how much these internal interfaces/classes affect the maintenance/development of the Hive dialect before taking further steps.
Btw, this made me aware that the title of the PR/JIRA is not accurate; it should be "Hive dialect shouldn't use interfaces in table-planner to build LogicPlan".

@Override
public ExtendedOperationExecutor createExtendedOperationExecutor(Context context) {
return new HiveOperationExecutor(context.getCatalogManager(), context.getPlannerContext());
return new HiveParser((CalciteContext) context);
Member

Please add a comment about the hard cast.
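
One way to document such a cast is sketched below in Java. `ParserContext` and `CalciteLikeContext` are hypothetical stand-ins for the public context and CalciteContext, and the explicit instanceof check is an optional extra assumed here for illustration; the PR itself may simply add a comment next to the cast.

```java
// Hypothetical stand-in for the public context handed to the parser factory.
interface ParserContext {}

// Hypothetical stand-in for the richer, internal context.
interface CalciteLikeContext extends ParserContext {
    String plannerName();
}

final class HardCastSketch {
    static String createParser(ParserContext context) {
        // The planner is assumed to always pass a CalciteLikeContext here. The public
        // interface cannot express that, so the cast is documented and guarded.
        if (!(context instanceof CalciteLikeContext)) {
            throw new IllegalArgumentException(
                    "Expected a CalciteLikeContext but got " + context.getClass().getName());
        }
        CalciteLikeContext calciteContext = (CalciteLikeContext) context;
        return "parser for " + calciteContext.plannerName();
    }

    public static void main(String[] args) {
        CalciteLikeContext context = () -> "planner";
        System.out.println(createParser(context));
    }
}
```

The guard turns a bare ClassCastException into a message that names the expected type, which is easier to diagnose if another planner ever supplies a different context.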

@luoyuxia
Contributor Author

Thanks for the advice. I like the getOrError idea, which makes things easier for callers.
So I added getCatalogOrError to the interface, since the Hive dialect always throws an error if the catalog doesn't exist.

But for some other methods, such as getTableOrError, HiveParser won't always throw an exception.
So we would still need another method, getTable, that returns an Optional.

As for getPartitionOrError, it would throw PartitionNotExistException, which is not a runtime exception.
Callers like HiveParser would still need to catch the exception and wrap it in a RuntimeException. To me, that doesn't make the method much easier to call, at least in HiveParser with the existing codebase.

Besides, I'm trying to expose as few interfaces as possible here; the current ones don't seem that inconvenient to use, and some of the extra interfaces seem verbose/duplicated to me.

Btw, ContextResolvedTable is an Internal class, so we still can't return ContextResolvedTable from CatalogRegistry#getResolvedCatalogBaseTable. To simplify the name, I renamed it to CatalogRegistry#getCatalogBaseTable.
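
The concern about a checked exception can be illustrated with a small Java sketch. `PartitionMissingException` is a hypothetical checked exception standing in for PartitionNotExistException, and the lookup logic is invented purely for illustration.

```java
// Hypothetical checked exception.
final class PartitionMissingException extends Exception {
    PartitionMissingException(String message) {
        super(message);
    }
}

final class PartitionLookup {
    // An OrError-style method that declares a checked exception.
    static String getPartitionOrError(String table, String spec)
            throws PartitionMissingException {
        if (!"dt=2023-03-13".equals(spec)) {
            throw new PartitionMissingException(
                    "Partition " + spec + " of table " + table + " does not exist.");
        }
        return table + "/" + spec;
    }

    // What a caller has to write anyway: catch the checked exception and rethrow unchecked.
    static String getPartitionUnchecked(String table, String spec) {
        try {
            return getPartitionOrError(table, spec);
        } catch (PartitionMissingException e) {
            throw new RuntimeException("Failed to resolve partition", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(getPartitionUnchecked("db.t", "dt=2023-03-13"));
    }
}
```

Because the try/catch remains at every call site, the OrError variant saves little compared with handling an empty Optional, which matches the reasoning above.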

@luoyuxia
Contributor Author

@wuchong Thanks for your review. I have addressed your comments in 9e508a, 9a5350.

Member

@wuchong wuchong left a comment

@luoyuxia thank you for the update. LGTM.

@luoyuxia luoyuxia closed this in f476675 Apr 27, 2023
@luoyuxia
Contributor Author

Thanks all for the review. :-)

xishuaidelin pushed a commit to xishuaidelin/flink that referenced this pull request Apr 27, 2023