[SPARK-28293][SQL] Implement Spark's own GetTableTypesOperation
## What changes were proposed in this pull request?

The table types currently come from Hive, which causes problems: Spark does not support `index_table`, and different Hive versions support different sets of table types.
Built with Hive 1.2.1:
![image](https://user-images.githubusercontent.com/5399861/60792689-be38b880-a198-11e9-82b8-868992a505e3.png)
Built with Hive 2.3.5:
![image](https://user-images.githubusercontent.com/5399861/60792727-d4467900-a198-11e9-952c-210bb7bb3bed.png)

This PR implements Spark's own `GetTableTypesOperation`, which derives the supported table types (`TABLE` and `VIEW`) from Spark's `CatalogTableType` instead of Hive.
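
For context, a JDBC client reaches this operation through `DatabaseMetaData.getTableTypes`. Below is a minimal client-side sketch; the connection URL, port, and database name are assumptions for illustration, not part of this change:

```scala
// Minimal sketch: fetch the table types exposed by the Thrift server,
// assuming it listens on localhost:10000 (the default).
import java.sql.DriverManager

object ShowTableTypes {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default")
    try {
      // Issues a GetTableTypes request, now answered by SparkGetTableTypesOperation.
      val rs = conn.getMetaData.getTableTypes
      while (rs.next()) {
        println(rs.getString("TABLE_TYPE")) // expected: TABLE, VIEW
      }
    } finally {
      conn.close()
    }
  }
}
```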

## How was this patch tested?

Unit tests and manual tests:
![image](https://user-images.githubusercontent.com/5399861/60793368-2a67ec00-a19a-11e9-9511-c67483dcc370.png)

Closes #25073 from wangyum/SPARK-28293.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
wangyum authored and gatorsmile committed Jul 24, 2019
1 parent 167fa04 commit 045191e
Showing 8 changed files with 154 additions and 13 deletions.
@@ -561,6 +561,8 @@ object CatalogTableType {
  val EXTERNAL = new CatalogTableType("EXTERNAL")
  val MANAGED = new CatalogTableType("MANAGED")
  val VIEW = new CatalogTableType("VIEW")

  val tableTypes = Seq(EXTERNAL, MANAGED, VIEW)
}


@@ -0,0 +1,85 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.thriftserver

import java.util.UUID

import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetTableTypesOperation
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.util.{Utils => SparkUtils}

/**
 * Spark's own GetTableTypesOperation
 *
 * @param sqlContext SQLContext to use
 * @param parentSession a HiveSession from SessionManager
 */
private[hive] class SparkGetTableTypesOperation(
    sqlContext: SQLContext,
    parentSession: HiveSession)
  extends GetTableTypesOperation(parentSession) with SparkMetadataOperationUtils with Logging {

  private var statementId: String = _

  override def close(): Unit = {
    super.close()
    HiveThriftServer2.listener.onOperationClosed(statementId)
  }

  override def runInternal(): Unit = {
    statementId = UUID.randomUUID().toString
    val logMsg = "Listing table types"
    logInfo(s"$logMsg with $statementId")
    setState(OperationState.RUNNING)
    // Always use the latest class loader provided by executionHive's state.
    val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

    if (isAuthV2Enabled) {
      authorizeMetaGets(HiveOperationType.GET_TABLETYPES, null)
    }

    HiveThriftServer2.listener.onStatementStart(
      statementId,
      parentSession.getSessionHandle.getSessionId.toString,
      logMsg,
      statementId,
      parentSession.getUsername)

    try {
      val tableTypes = CatalogTableType.tableTypes.map(tableTypeString).toSet
      tableTypes.foreach { tableType =>
        rowSet.addRow(Array[AnyRef](tableType))
      }
      setState(OperationState.FINISHED)
    } catch {
      case e: HiveSQLException =>
        setState(OperationState.ERROR)
        HiveThriftServer2.listener.onStatementError(
          statementId, e.getMessage, SparkUtils.exceptionString(e))
        throw e
    }
    HiveThriftServer2.listener.onStatementFinish(statementId)
  }
}
@@ -53,7 +53,7 @@ private[hive] class SparkGetTablesOperation(
    tableName: String,
    tableTypes: JList[String])
  extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes)
-    with Logging{
+    with SparkMetadataOperationUtils with Logging {

  private var statementId: String = _

@@ -146,11 +146,4 @@ private[hive] class SparkGetTablesOperation(
      rowSet.addRow(rowData)
    }
  }
-
-  private def tableTypeString(tableType: CatalogTableType): String = tableType match {
-    case EXTERNAL | MANAGED => "TABLE"
-    case VIEW => "VIEW"
-    case t =>
-      throw new IllegalArgumentException(s"Unknown table type is found at showCreateHiveTable: $t")
-  }
}
@@ -0,0 +1,34 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.thriftserver

import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogTableType.{EXTERNAL, MANAGED, VIEW}

/**
 * Utils for metadata operations.
 */
private[hive] trait SparkMetadataOperationUtils {

  def tableTypeString(tableType: CatalogTableType): String = tableType match {
    case EXTERNAL | MANAGED => "TABLE"
    case VIEW => "VIEW"
    case t =>
      throw new IllegalArgumentException(s"Unknown table type is found: $t")
  }
}
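
As a usage illustration (not part of the commit), any metadata operation in this package can mix in the trait to obtain the client-visible type strings; the class name below is hypothetical:

```scala
package org.apache.spark.sql.hive.thriftserver

import org.apache.spark.sql.catalyst.catalog.CatalogTableType

// Hypothetical example showing how the trait's mapping collapses the three
// catalog table types into the two strings reported over JDBC.
private[hive] class ExampleMetadataOperation extends SparkMetadataOperationUtils {
  def clientTableTypes: Set[String] =
    CatalogTableType.tableTypes.map(tableTypeString).toSet // Set("TABLE", "VIEW")
}
```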
@@ -21,13 +21,13 @@ import java.util.{List => JList, Map => JMap}
import java.util.concurrent.ConcurrentHashMap

import org.apache.hive.service.cli._
-import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, GetColumnsOperation, GetSchemasOperation, MetadataOperation, Operation, OperationManager}
+import org.apache.hive.service.cli.operation._
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveUtils
-import org.apache.spark.sql.hive.thriftserver.{ReflectionUtils, SparkExecuteStatementOperation, SparkGetColumnsOperation, SparkGetSchemasOperation, SparkGetTablesOperation}
+import org.apache.spark.sql.hive.thriftserver._
import org.apache.spark.sql.internal.SQLConf

/**
@@ -100,14 +100,25 @@ private[thriftserver] class SparkSQLOperationManager()
      columnName: String): GetColumnsOperation = synchronized {
    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
-      s" initialized or had already closed.")
+      " initialized or had already closed.")
    val operation = new SparkGetColumnsOperation(sqlContext, parentSession,
      catalogName, schemaName, tableName, columnName)
    handleToOperation.put(operation.getHandle, operation)
    logDebug(s"Created GetColumnsOperation with session=$parentSession.")
    operation
  }

  override def newGetTableTypesOperation(
      parentSession: HiveSession): GetTableTypesOperation = synchronized {
    val sqlContext = sessionToContexts.get(parentSession.getSessionHandle)
    require(sqlContext != null, s"Session handle: ${parentSession.getSessionHandle} has not been" +
      " initialized or had already closed.")
    val operation = new SparkGetTableTypesOperation(sqlContext, parentSession)
    handleToOperation.put(operation.getHandle, operation)
    logDebug(s"Created GetTableTypesOperation with session=$parentSession.")
    operation
  }

  def setConfMap(conf: SQLConf, confMap: java.util.Map[String, String]): Unit = {
    val iterator = confMap.entrySet().iterator()
    while (iterator.hasNext) {
@@ -166,4 +166,20 @@ class SparkMetadataOperationSuite extends HiveThriftJdbcTest {
      checkResult(metaData.getColumns(null, "%", "table_not_exist", null), Seq.empty)
    }
  }

  test("Spark's own GetTableTypesOperation(SparkGetTableTypesOperation)") {
    def checkResult(rs: ResultSet, tableTypes: Seq[String]): Unit = {
      for (i <- tableTypes.indices) {
        assert(rs.next())
        assert(rs.getString("TABLE_TYPE") === tableTypes(i))
      }
      // Make sure there are no more elements
      assert(!rs.next())
    }

    withJdbcStatement() { statement =>
      val metaData = statement.getConnection.getMetaData
      checkResult(metaData.getTableTypes, Seq("TABLE", "VIEW"))
    }
  }
}
@@ -39,7 +39,7 @@ public class GetTableTypesOperation extends MetadataOperation {
  protected static TableSchema RESULT_SET_SCHEMA = new TableSchema()
      .addStringColumn("TABLE_TYPE", "Table type name.");

-  private final RowSet rowSet;
+  protected final RowSet rowSet;
  private final TableTypeMapping tableTypeMapping;

  protected GetTableTypesOperation(HiveSession parentSession) {
@@ -39,7 +39,7 @@ public class GetTableTypesOperation extends MetadataOperation {
  protected static TableSchema RESULT_SET_SCHEMA = new TableSchema()
      .addStringColumn("TABLE_TYPE", "Table type name.");

-  private final RowSet rowSet;
+  protected final RowSet rowSet;
  private final TableTypeMapping tableTypeMapping;

  protected GetTableTypesOperation(HiveSession parentSession) {
