
HIVE-24404: Invoke getUserName before getMSC to avoid metastore client connection been closed and retry #1685

Closed
wants to merge 1 commit

Conversation

artiship

@artiship artiship commented Nov 18, 2020

What changes were proposed in this pull request?

Modify Hive.java to ensure the getUserName method is invoked before the getMSC method.

Why are the changes needed?

Dropping a partition via Spark SQL consistently throws an exception.

Hive version: 2.3.7
Spark version: 3.0.1

SQL:

alter table mydb.some_table drop if exists partition(dt = '2020-11-12',hh = '17');

exception:

20/11/12 19:37:57 WARN SessionState: METASTORE_FILTER_HOOK will be ignored, since hive.security.authorization.manager is set to instance of HiveAuthorizerFactory.
20/11/12 19:37:57 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect (1 of 1) after 1s. listPartitionsWithAuthInfo
org.apache.thrift.transport.TTransportException: Cannot write to null outputStream
	at org.apache.thrift.transport.TIOStreamTransport.write(TIOStreamTransport.java:142)
	at org.apache.thrift.protocol.TBinaryProtocol.writeI32(TBinaryProtocol.java:185)
	at org.apache.thrift.protocol.TBinaryProtocol.writeMessageBegin(TBinaryProtocol.java:116)
	at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:70)
	at org.apache.thrift.TServiceClient.sendBase(TServiceClient.java:62)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.send_get_partitions_ps_with_auth(ThriftHiveMetastore.java:2562)
	at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_partitions_ps_with_auth(ThriftHiveMetastore.java:2549)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.listPartitionsWithAuthInfo(HiveMetaStoreClient.java:1209)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:173)
	at com.sun.proxy.$Proxy32.listPartitionsWithAuthInfo(Unknown Source)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.hadoop.hive.metastore.HiveMetaStoreClient$SynchronizedHandler.invoke(HiveMetaStoreClient.java:2336)
	at com.sun.proxy.$Proxy32.listPartitionsWithAuthInfo(Unknown Source)
	at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2555)
	at org.apache.hadoop.hive.ql.metadata.Hive.getPartitions(Hive.java:2581)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$dropPartitions$2(HiveClientImpl.scala:628)
	at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:245)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at scala.collection.TraversableLike.flatMap(TraversableLike.scala:245)
	at scala.collection.TraversableLike.flatMap$(TraversableLike.scala:242)
	at scala.collection.AbstractTraversable.flatMap(Traversable.scala:108)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$dropPartitions$1(HiveClientImpl.scala:622)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$withHiveState$1(HiveClientImpl.scala:294)
	at org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:227)
	at org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:226)
	at org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:276)
	at org.apache.spark.sql.hive.client.HiveClientImpl.dropPartitions(HiveClientImpl.scala:617)
	at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$dropPartitions$1(HiveExternalCatalog.scala:1018)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:103)
	at org.apache.spark.sql.hive.HiveExternalCatalog.dropPartitions(HiveExternalCatalog.scala:1015)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.dropPartitions(ExternalCatalogWithListener.scala:211)
	at org.apache.spark.sql.catalyst.catalog.SessionCatalog.dropPartitions(SessionCatalog.scala:988)
	at org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand.run(ddl.scala:581)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:229)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:607)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:602)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:64)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:377)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:496)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:490)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:490)
	at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:336)
	at org.apache.hadoop.hive.cli.CliDriver.processReader(CliDriver.java:474)
	at org.apache.hadoop.hive.cli.CliDriver.processFile(CliDriver.java:490)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:208)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Response code
Time taken: 4.192 seconds

The problem originates in the getPartitions method: the metastore client returned by getMSC is later closed by the getUserName method. getUserName closes the current client while setting up authentication (setAuth), which leaves the underlying Thrift transport closed.

public List<Partition> getPartitions(Table tbl, Map<String, String> partialPartSpec,
    short limit)
throws HiveException {
  if (!tbl.isPartitioned()) {
    throw new HiveException(ErrorMsg.TABLE_NOT_PARTITIONED, tbl.getTableName());
  }

  List<String> partialPvals = MetaStoreUtils.getPvals(tbl.getPartCols(), partialPartSpec);

  List<org.apache.hadoop.hive.metastore.api.Partition> partitions = null;
  try {
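    // getMSC() returns the shared metastore client, but the getUserName()
    // call below can close that client as a side effect, leaving the
    // underlying Thrift transport dead by the time the RPC is sent.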
    partitions = getMSC().listPartitionsWithAuthInfo(tbl.getDbName(), tbl.getTableName(),
        partialPvals, limit, getUserName(), getGroupNames());
  } catch (Exception e) {
    throw new HiveException(e);
  }

  List<Partition> qlPartitions = new ArrayList<Partition>();
  for (org.apache.hadoop.hive.metastore.api.Partition p : partitions) {
    qlPartitions.add( new Partition(tbl, p));
  }

  return qlPartitions;
}

This PR is simple: it just moves the getUserName call ahead of getMSC to prevent the connection from being closed.
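For illustration, a minimal sketch of the reordered calls (names and structure taken from the snippet above; the actual patch may differ in detail):

  // Resolve the user and group names first: getUserName() can close and
  // recreate the metastore client as a side effect, so it must run before
  // getMSC() hands out a client whose transport we are about to use.
  String userName = getUserName();
  List<String> groupNames = getGroupNames();

  List<org.apache.hadoop.hive.metastore.api.Partition> partitions = null;
  try {
    partitions = getMSC().listPartitionsWithAuthInfo(tbl.getDbName(), tbl.getTableName(),
        partialPvals, limit, userName, groupNames);
  } catch (Exception e) {
    throw new HiveException(e);
  }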

Does this PR introduce any user-facing change?

No.

How was this patch tested?

I tested it manually using the reproduction steps above; the connection-lost exception no longer occurs.

@artiship
Author

@kgyrtkirk The failed tests seem to be unrelated to my modification.

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
Feel free to reach out on the dev@hive.apache.org list if the patch is in need of reviews.

@abstractdog
Contributor

abstractdog commented Dec 16, 2022

Reopened this PR. @artiship, can you please rebase your patch on the latest master to get some test results?
This fix solved the same issue with Spark 3.3 and a newer Hive version (a Cloudera internal version), so it really makes sense.
Is there a chance you can add a unit test, e.g. in TestHive.java, that validates the fix? It might be complicated, since it needs hacking/mocking of the underlying connection, but it's worth a try.
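For reference, a rough sketch of what such a test might look like (hypothetical: the test name, table setup, and assertion are illustrative, and a real test would additionally need to force the code path where getUserName() closes the client):

  // Hypothetical sketch for TestHive.java; Hive.get(), getTable(), and
  // getPartitions() are real Hive.java methods, the rest is illustrative.
  public void testGetPartitionsSurvivesUserNameResolution() throws Exception {
    Hive hive = Hive.get(hiveConf);
    Table table = hive.getTable("default", "some_partitioned_table");
    // Before the fix, getPartitions() could fail with "Cannot write to null
    // outputStream" because getUserName() closed the client that getMSC()
    // had already returned.
    List<Partition> partitions =
        hive.getPartitions(table, new HashMap<String, String>(), (short) -1);
    assertNotNull(partitions);
  }

The getPartitions(Table, Map, short) overload used here matches the snippet quoted in the PR description.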

@abstractdog
Contributor

started a PR against master: #3883

@github-actions

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.
Feel free to reach out on the dev@hive.apache.org list if the patch is in need of reviews.

@github-actions github-actions bot added the stale label Feb 19, 2023
@artiship
Author

@abstractdog I just noticed that you got this fix merged. Thanks!

@artiship artiship closed this Feb 22, 2023