[#3980] feat(flink-connector): Support partition for hive table operation #4096

Merged
FANNG1 merged 13 commits into apache:main from coolderli:issue-3980
Jul 22, 2024

Conversation

@coolderli
Contributor

add UTs

What changes were proposed in this pull request?

  • Support partitions when creating Hive tables through the Flink connector.

Why are the changes needed?

  • Fix: #3980

Does this PR introduce any user-facing change?

  • no

How was this patch tested?

  • add ITs
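As an illustration of what this PR enables (not taken from the PR itself; the table and column names below are hypothetical), a partitioned Hive table can be created through Flink SQL once partitions are supported:

```sql
-- Hypothetical example: a Hive table partitioned on dt and country,
-- created via Flink SQL against the Gravitino-backed Hive catalog.
CREATE TABLE sample_table (
    user_id BIGINT,
    amount DOUBLE,
    dt STRING,
    country STRING
) PARTITIONED BY (dt, country);
```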

@coolderli
Contributor Author

@FANNG1 Please help review this. Thanks.

@FANNG1
Contributor

FANNG1 commented Jul 11, 2024

LGTM, except for a few comments.

@coolderli
Contributor Author

coolderli commented Jul 11, 2024

@FANNG1 I fixed it. Please take a look.

@FANNG1
Contributor

FANNG1 commented Jul 15, 2024

@coolderli, it seems you added support for some partition interfaces, like listPartitions, since the last review; is that expected?

@FANNG1
Contributor

FANNG1 commented Jul 17, 2024

@coolderli, could you rebase the code?

@coolderli
Contributor Author

@coolderli, it seems you added support for some partition interfaces, like listPartitions, since the last review; is that expected?

@FANNG1 I think it's necessary. I hit the following exception, so I had to implement it.

FlinkHiveCatalogIT > testHivePartitionTable() FAILED
    java.lang.UnsupportedOperationException
        at org.apache.gravitino.flink.connector.catalog.BaseCatalog.listPartitions(BaseCatalog.java:292)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkRecomputeStatisticsProgram.getPartitionsTableStats(FlinkRecomputeStatisticsProgram.java:180)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkRecomputeStatisticsProgram.recomputeStatistics(FlinkRecomputeStatisticsProgram.java:144)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkRecomputeStatisticsProgram.recomputeStatistics(FlinkRecomputeStatisticsProgram.java:100)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkRecomputeStatisticsProgram.access$000(FlinkRecomputeStatisticsProgram.java:66)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkRecomputeStatisticsProgram$1.visit(FlinkRecomputeStatisticsProgram.java:75)
        at org.apache.calcite.rel.core.TableScan.accept(TableScan.java:180)
        at org.apache.flink.table.planner.plan.utils.DefaultRelShuttle.$anonfun$visit$1(RelShuttles.scala:37)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.utils.DefaultRelShuttle.visit(RelShuttles.scala:35)
        at org.apache.calcite.rel.RelHomogeneousShuttle.visit(RelHomogeneousShuttle.java:66)
        at org.apache.calcite.rel.logical.LogicalProject.accept(LogicalProject.java:135)
        at org.apache.flink.table.planner.plan.utils.DefaultRelShuttle.$anonfun$visit$1(RelShuttles.scala:37)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.utils.DefaultRelShuttle.visit(RelShuttles.scala:35)
        at org.apache.calcite.rel.RelHomogeneousShuttle.visit(RelHomogeneousShuttle.java:90)
        at org.apache.calcite.rel.logical.LogicalSort.accept(LogicalSort.java:86)
        at org.apache.flink.table.planner.plan.utils.DefaultRelShuttle.$anonfun$visit$1(RelShuttles.scala:37)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.utils.DefaultRelShuttle.visit(RelShuttles.scala:35)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkRecomputeStatisticsProgram.optimize(FlinkRecomputeStatisticsProgram.java:80)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkRecomputeStatisticsProgram.optimize(FlinkRecomputeStatisticsProgram.java:66)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.immutable.Range.foreach(Range.scala:155)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
        at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
        at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
        at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
        at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeTree(BatchCommonSubGraphBasedOptimizer.scala:93)
        at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.optimizeBlock(BatchCommonSubGraphBasedOptimizer.scala:58)
        at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1(BatchCommonSubGraphBasedOptimizer.scala:45)
        at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.$anonfun$doOptimize$1$adapted(BatchCommonSubGraphBasedOptimizer.scala:45)
        at scala.collection.immutable.List.foreach(List.scala:388)
        at org.apache.flink.table.planner.plan.optimize.BatchCommonSubGraphBasedOptimizer.doOptimize(BatchCommonSubGraphBasedOptimizer.scala:45)
        at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:87)
        at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:324)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1277)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:1055)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1120)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
        at org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT.sql(FlinkEnvIT.java:159)
        at org.apache.gravitino.flink.connector.integration.test.hive.FlinkHiveCatalogIT.lambda$testHivePartitionTable$3(FlinkHiveCatalogIT.java:360)
        at org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT.doWithSchema(FlinkEnvIT.java:175)
        at org.apache.gravitino.flink.connector.integration.test.hive.FlinkHiveCatalogIT.testHivePartitionTable(FlinkHiveCatalogIT.java:307)
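For context on the trace above: Flink's statistics program calls the catalog's listPartitions during planning, so the connector cannot leave it unimplemented. The core of the work is mapping Hive-style partition names into partition specs. The sketch below is a minimal, self-contained illustration of that mapping only; `PartitionSpec` and the method names are simplified stand-ins, not the real Flink or Gravitino APIs.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class ListPartitionsSketch {
  // Stand-in for Flink's CatalogPartitionSpec: an ordered key -> value map.
  public record PartitionSpec(Map<String, String> spec) {}

  // Hive-style partition names look like "dt=2024-07-22/country=US".
  public static PartitionSpec fromHivePartitionName(String name) {
    Map<String, String> spec = new LinkedHashMap<>();
    for (String kv : name.split("/")) {
      String[] parts = kv.split("=", 2);
      spec.put(parts[0], parts[1]);
    }
    return new PartitionSpec(spec);
  }

  // A listPartitions implementation would map every partition name
  // returned by the metastore into a spec like this.
  public static List<PartitionSpec> listPartitions(List<String> hivePartitionNames) {
    return hivePartitionNames.stream()
        .map(ListPartitionsSketch::fromHivePartitionName)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    System.out.println(listPartitions(List.of("dt=2024-07-22/country=US")));
  }
}
```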


* properties to store the partition transform, so we can implement this interface to achieve more
* partition transform.
*/
public abstract class PartitionConverter {
Contributor

interface?

Contributor Author

@FANNG1 I think the abstract class is OK. Other classes won't implement it. What do you think?

Contributor

An interface is more appropriate here, in my view.

Contributor Author

Done
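To make the interface-vs-abstract-class discussion concrete, here is a hypothetical, self-contained sketch of what a `PartitionConverter` interface might look like: a two-way mapping between Flink partition keys and Gravitino partition transforms. The type and method names are illustrative assumptions, not the PR's actual signatures, and `IdentityTransform` is a stdlib stand-in for Gravitino's transform type.

```java
import java.util.List;
import java.util.stream.Collectors;

public class PartitionConverterSketch {
  // Stand-in for Gravitino's transform type: identity partitioning on a column.
  public record IdentityTransform(String fieldName) {}

  // As an interface, alternative partition-transform strategies can be
  // plugged in without inheriting from a base class.
  public interface PartitionConverter {
    List<IdentityTransform> toGravitinoPartitions(List<String> flinkPartitionKeys);

    List<String> toFlinkPartitionKeys(List<IdentityTransform> transforms);
  }

  // Default strategy: each Flink partition key becomes an identity transform.
  public static final PartitionConverter IDENTITY = new PartitionConverter() {
    @Override
    public List<IdentityTransform> toGravitinoPartitions(List<String> keys) {
      return keys.stream().map(IdentityTransform::new).collect(Collectors.toList());
    }

    @Override
    public List<String> toFlinkPartitionKeys(List<IdentityTransform> transforms) {
      return transforms.stream().map(IdentityTransform::fieldName).collect(Collectors.toList());
    }
  };

  public static void main(String[] args) {
    var transforms = IDENTITY.toGravitinoPartitions(List.of("dt", "country"));
    System.out.println(IDENTITY.toFlinkPartitionKeys(transforms)); // prints [dt, country]
  }
}
```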

@coolderli
Contributor Author

@FANNG1 I removed getFunction and kept getXXStatistic. Please take a look. Thanks.

@coolderli
Contributor Author

getFunction is also needed.

FlinkHiveCatalogIT > testHivePartitionTable() FAILED
    org.apache.flink.table.api.ValidationException: SQL validation failed. null
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
        at org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:728)
        at org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT.sql(FlinkEnvIT.java:159)
        at org.apache.gravitino.flink.connector.integration.test.hive.FlinkHiveCatalogIT.lambda$testHivePartitionTable$3(FlinkHiveCatalogIT.java:360)
        at org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT.doWithSchema(FlinkEnvIT.java:175)
        at org.apache.gravitino.flink.connector.integration.test.hive.FlinkHiveCatalogIT.testHivePartitionTable(FlinkHiveCatalogIT.java:307)

        Caused by:
        java.lang.UnsupportedOperationException
            at org.apache.gravitino.flink.connector.catalog.BaseCatalog.getFunction(BaseCatalog.java:358)
            at org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:617)
            at org.apache.flink.table.catalog.FunctionCatalog.resolvePreciseFunctionReference(FunctionCatalog.java:593)
            at org.apache.flink.table.catalog.FunctionCatalog.lambda$resolveAmbiguousFunctionReference$5(FunctionCatalog.java:671)
            at java.util.Optional.orElseGet(Optional.java:267)
            at org.apache.flink.table.catalog.FunctionCatalog.resolveAmbiguousFunctionReference(FunctionCatalog.java:671)
            at org.apache.flink.table.catalog.FunctionCatalog.lookupFunction(FunctionCatalog.java:424)
            at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:99)
            at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.makeNullaryCall(SqlValidatorImpl.java:1829)
            at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:6430)
            at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.visit(SqlValidatorImpl.java:6414)
            at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:324)
            at org.apache.calcite.sql.validate.SqlValidatorImpl$Expander.go(SqlValidatorImpl.java:6423)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.expand(SqlValidatorImpl.java:6004)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validateOrderList(SqlValidatorImpl.java:4150)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3621)
            at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64)
            at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1050)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1025)
            at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:248)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1000)
            at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)
            at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
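The trace above shows why getFunction is needed: during SQL validation, Flink's FunctionCatalog probes the catalog's getFunction and relies on a FunctionNotExistException to fall back to built-in functions, whereas an UnsupportedOperationException aborts validation. Below is a simplified, self-contained illustration of that contract; `MiniCatalog` and `tryResolve` are stub stand-ins, not the real Flink classes.

```java
import java.util.Optional;

public class GetFunctionSketch {
  public static class FunctionNotExistException extends Exception {}

  // Stand-in for the part of Flink's Catalog interface under discussion.
  public interface MiniCatalog {
    Object getFunction(String path) throws FunctionNotExistException;
  }

  // Roughly what function resolution does: FunctionNotExistException means
  // "not found here, keep resolving"; any other exception fails validation.
  public static Optional<Object> tryResolve(MiniCatalog catalog, String path) {
    try {
      return Optional.of(catalog.getFunction(path));
    } catch (FunctionNotExistException e) {
      return Optional.empty(); // fall back to built-in functions
    }
  }

  public static void main(String[] args) {
    // A catalog with no functions must throw FunctionNotExistException,
    // not UnsupportedOperationException, for resolution to continue.
    MiniCatalog empty = path -> { throw new FunctionNotExistException(); };
    System.out.println(tryResolve(empty, "mydb.myfunc").isEmpty()); // prints true
  }
}
```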

@FANNG1 FANNG1 merged commit 529429e into apache:main Jul 22, 2024
@FANNG1
Contributor

FANNG1 commented Jul 22, 2024

@coolderli , merged to main, thanks for your contribution!



Development

Successfully merging this pull request may close these issues.

[Subtask][flink-connector] Support partition for hive table operation
