Hive: Fix RetryingMetaStoreClient for Hive 2.1 (#3403)
Conversation
Got a little greedy trying to use just one version of the API across Hive 1, 2, and 3. It seems that in Hive 1 (the Spark 2 tests), HiveMetaHookLoader cannot be null, so I put back the original Hive 1 API call:
```diff
   try {
-    return GET_CLIENT.invoke(hiveConf, true);
+    return GET_CLIENT.invoke(hiveConf, null, HiveMetaStoreClient.class.getName());
```
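For reference, GET_CLIENT is a DynMethods handle over RetryingMetaStoreClient.getProxy, and the three-argument call above assumes the handle was rebound to the three-argument overload. A minimal sketch of that binding (an assumed reconstruction; the actual impl list in HiveClientPool may differ):

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.iceberg.common.DynMethods;

class ClientBinding {  // illustrative wrapper, not the real HiveClientPool
  // Binds the static getProxy(HiveConf, HiveMetaHookLoader, String) overload;
  // DynMethods resolves it reflectively, so Iceberg can compile against one
  // Hive version and run against others.
  static final DynMethods.StaticMethod GET_CLIENT =
      DynMethods.builder("getProxy")
          .impl(RetryingMetaStoreClient.class,
              HiveConf.class, HiveMetaHookLoader.class, String.class)
          .buildStatic();
}
```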
Passing null as the HiveMetaHookLoader will cause another exception:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unable to create a sink for writing table 'default_catalog.default_database.ods_mysql_cdc_hive_exec_job'.
Table options are:
...
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:364) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:213) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:818) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:248) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1060) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1138) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_112]
at javax.security.auth.Subject.doAs(Subject.java:422) [?:1.8.0_112]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754) [flink-shaded-hadoop-2-uber-2.7.5-7.0.jar:2.7.5-7.0]
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) [flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1138) [flink-dist_2.12-1.12.1.jar:1.12.1]
Caused by: org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.ods_mysql_cdc_hive_exec_job'.
Table options are:
...
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:166) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:368) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:220) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:164) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.Iterator.foreach(Iterator.scala:937) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.Iterator.foreach$(Iterator.scala:937) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.IterableLike.foreach(IterableLike.scala:70) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.IterableLike.foreach$(IterableLike.scala:69) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:675) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:759) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at com.huya.dc.walrus.lakehouse.flink.sql.FlinkSQLSubmitter.executeSQL(FlinkSQLSubmitter.java:156) ~[?:?]
at com.huya.dc.walrus.lakehouse.flink.sql.FlinkSQLSubmitter.main(FlinkSQLSubmitter.java:112) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 11 more
Caused by: org.apache.iceberg.hive.RuntimeMetaException: Failed to connect to Hive Metastore
at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:73) ~[?:?]
at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:35) ~[?:?]
at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125) ~[?:?]
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56) ~[?:?]
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51) ~[?:?]
at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:76) ~[?:?]
at org.apache.iceberg.hive.HiveCatalog.loadNamespaceMetadata(HiveCatalog.java:386) ~[?:?]
at org.apache.iceberg.flink.FlinkCatalog.getDatabase(FlinkCatalog.java:173) ~[?:?]
at org.apache.iceberg.flink.FlinkCatalog.databaseExists(FlinkCatalog.java:185) ~[?:?]
at org.apache.iceberg.flink.FlinkDynamicTableFactory.createTableLoader(FlinkDynamicTableFactory.java:163) ~[?:?]
at org.apache.iceberg.flink.FlinkDynamicTableFactory.createDynamicTableSink(FlinkDynamicTableFactory.java:112) ~[?:?]
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:163) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:368) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:220) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:164) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.Iterator.foreach(Iterator.scala:937) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.Iterator.foreach$(Iterator.scala:937) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.IterableLike.foreach(IterableLike.scala:70) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.IterableLike.foreach$(IterableLike.scala:69) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:675) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:759) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at com.huya.dc.walrus.lakehouse.flink.sql.FlinkSQLSubmitter.executeSQL(FlinkSQLSubmitter.java:156) ~[?:?]
at com.huya.dc.walrus.lakehouse.flink.sql.FlinkSQLSubmitter.main(FlinkSQLSubmitter.java:112) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 11 more
Caused by: java.lang.IllegalArgumentException: Object : null is not an instance of interface org.apache.hadoop.hive.metastore.HiveMetaHookLoader
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1644) ~[?:?]
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:83) ~[?:?]
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133) ~[?:?]
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104) ~[?:?]
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:97) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:65) ~[?:?]
at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:77) ~[?:?]
at org.apache.iceberg.common.DynMethods$StaticMethod.invoke(DynMethods.java:196) ~[?:?]
at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:56) ~[?:?]
at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:35) ~[?:?]
at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:125) ~[?:?]
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:56) ~[?:?]
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:51) ~[?:?]
at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:76) ~[?:?]
at org.apache.iceberg.hive.HiveCatalog.loadNamespaceMetadata(HiveCatalog.java:386) ~[?:?]
at org.apache.iceberg.flink.FlinkCatalog.getDatabase(FlinkCatalog.java:173) ~[?:?]
at org.apache.iceberg.flink.FlinkCatalog.databaseExists(FlinkCatalog.java:185) ~[?:?]
at org.apache.iceberg.flink.FlinkDynamicTableFactory.createTableLoader(FlinkDynamicTableFactory.java:163) ~[?:?]
at org.apache.iceberg.flink.FlinkDynamicTableFactory.createDynamicTableSink(FlinkDynamicTableFactory.java:112) ~[?:?]
at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:163) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:368) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:220) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.planner.delegation.PlannerBase.$anonfun$translate$1(PlannerBase.scala:164) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.Iterator.foreach(Iterator.scala:937) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.Iterator.foreach$(Iterator.scala:937) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.IterableLike.foreach(IterableLike.scala:70) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.IterableLike.foreach$(IterableLike.scala:69) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.TraversableLike.map(TraversableLike.scala:233) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.TraversableLike.map$(TraversableLike.scala:226) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at scala.collection.AbstractTraversable.map(Traversable.scala:104) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) ~[flink-table-blink_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1267) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:675) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:759) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665) ~[flink-table_2.12-1.12.0.jar:1.12.0]
at com.huya.dc.walrus.lakehouse.flink.sql.FlinkSQLSubmitter.executeSQL(FlinkSQLSubmitter.java:156) ~[?:?]
at com.huya.dc.walrus.lakehouse.flink.sql.FlinkSQLSubmitter.main(FlinkSQLSubmitter.java:112) ~[?:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_112]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_112]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_112]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_112]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:347) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
... 11 more
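The final Caused by shows the real incompatibility: per the frames above, Hive 2.1's RetryingMetaStoreClient constructor routes through MetaStoreUtils.newInstance, which type-checks every constructor argument and rejects the null hook loader. A simplified paraphrase of that check (a hedged sketch, not the exact Hive source):

```java
// Simplified paraphrase of MetaStoreUtils.newInstance (Hive 2.1): each argument
// must be an instance of its declared parameter type, and isInstance(null) is
// always false, so a null HiveMetaHookLoader fails here before any RPC happens.
static <T> T newInstance(Class<T> theClass, Class<?>[] parameterTypes, Object[] initargs) {
  for (int i = 0; i < parameterTypes.length; i++) {
    if (!(parameterTypes[i].isInstance(initargs[i]))) {
      throw new IllegalArgumentException("Object : " + initargs[i]
          + " is not an instance of " + parameterTypes[i]);
    }
  }
  try {
    return theClass.getDeclaredConstructor(parameterTypes).newInstance(initargs);
  } catch (ReflectiveOperationException e) {
    throw new RuntimeException(e);
  }
}
```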
I used DynMethods.builder("getProxy").impl(RetryingMetaStoreClient.class, HiveConf.class, Class[].class, Object[].class, String.class).buildStatic(), which works on HMS 2.1.1. But I am not sure this API will work on other versions; I hope that helps.
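Spelled out, that suggestion binds the four-argument overload, which passes the underlying client's constructor parameter types and arguments explicitly. A sketch of the commenter's approach (confirmed on HMS 2.1.1 only):

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;
import org.apache.iceberg.common.DynMethods;

class ProxyByCtorArgs {  // illustrative wrapper for the commenter's suggestion
  // Binds getProxy(HiveConf, Class<?>[], Object[], String): the caller supplies
  // constructor parameter types and values directly, which sidesteps the
  // hook-loader overloads entirely.
  static final DynMethods.StaticMethod GET_PROXY =
      DynMethods.builder("getProxy")
          .impl(RetryingMetaStoreClient.class,
              HiveConf.class, Class[].class, Object[].class, String.class)
          .buildStatic();
}
```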
What version of Hive does it use? Also, did you try with my latest commit here? I saw it fail this way in my first attempt on the Hive 1.x branch too, and fixed it in my second attempt by making it prefer another method that does not pass null.
Yes, I tested it using the latest commit, but it still fails. Our Hive version is 2.1.1 too.
Trying to use a non-null HiveMetaHookLoader (not sure if that HiveMetaHookLoader changed between Hive versions).
Looks like it passes for at least the tested Hive 1 and 2 versions in the Iceberg test environment. @Reo-LEI do you want to try this one?
Thanks @szehon-ho, I will try this today or tomorrow.
@szehon-ho, is this good to go?
```diff
   try {
-    return GET_CLIENT.invoke(hiveConf, true);
+    return GET_CLIENT.invoke(hiveConf, (HiveMetaHookLoader) tbl -> null, HiveMetaStoreClient.class.getName());
```
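The key difference from the earlier attempt is the hook loader: `(HiveMetaHookLoader) tbl -> null` is non-null, so it passes the instanceof check shown above, yet its getHook always returns null, so no metastore hooks ever run. A direct-call equivalent as a sketch (the class and method names here are illustrative; the PR itself goes through the GET_CLIENT handle):

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;

public class NoOpHookClientFactory {  // illustrative name, not from the PR
  public static IMetaStoreClient newClient(HiveConf hiveConf) throws Exception {
    // Non-null loader that never loads a hook: satisfies Hive 2.1's argument
    // check while preserving the old "no hooks" behavior.
    HiveMetaHookLoader noOpLoader = tbl -> null;
    return RetryingMetaStoreClient.getProxy(
        hiveConf, noOpLoader, HiveMetaStoreClient.class.getName());
  }
}
```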
I tested this in Hive 2.1.1 and it works. I think this approach is OK.
Thanks, @szehon-ho for fixing this. And thanks @Reo-LEI for testing!
Fixes #3363: RetryingMetaStoreClient is broken for Hive 2.1.
The root cause is the change from https://issues.apache.org/jira/browse/HIVE-12918, which was fixed in Hive 2.3 (the version we build against) by https://issues.apache.org/jira/browse/HIVE-15081.
This change switches to the more stable getProxy method in RetryingMetaStoreClient: https://github.com/apache/hive/blob/rel/release-2.3.8/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java#L95.
I'm usually hesitant to use @VisibleForTesting methods, but as @sunchao pointed out offline to me, for Hive it means the method is at least tested :), unlike the unmarked one.
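For anyone wanting to sanity-check the stable overload against a live metastore, a hypothetical smoke test (not from this PR; assumes a reachable HMS and a hive-site.xml with hive.metastore.uris on the classpath):

```java
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaHookLoader;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.RetryingMetaStoreClient;

public class MetastoreSmokeTest {  // hypothetical helper, not part of this PR
  public static void main(String[] args) throws Exception {
    HiveConf conf = new HiveConf();  // picks up hive.metastore.uris from hive-site.xml
    // The stable overload: getProxy(HiveConf, HiveMetaHookLoader, String),
    // called with the same no-op hook loader the PR uses.
    IMetaStoreClient client = RetryingMetaStoreClient.getProxy(
        conf, (HiveMetaHookLoader) tbl -> null, HiveMetaStoreClient.class.getName());
    try {
      System.out.println("Databases: " + client.getAllDatabases());
    } finally {
      client.close();
    }
  }
}
```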