Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-6908] [SQL] Use isolated Hive client #5876

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 0 additions & 23 deletions dev/run-tests
Original file line number Diff line number Diff line change
Expand Up @@ -142,29 +142,6 @@ CURRENT_BLOCK=$BLOCK_BUILD

{
HIVE_BUILD_ARGS="$SBT_MAVEN_PROFILES_ARGS -Phive -Phive-thriftserver"
HIVE_12_BUILD_ARGS="$HIVE_BUILD_ARGS -Phive-0.12.0"

# First build with Hive 0.12.0 to ensure patches do not break the Hive 0.12.0 build
echo "[info] Compile with Hive 0.12.0"
[ -d "lib_managed" ] && rm -rf lib_managed
echo "[info] Building Spark with these arguments: $HIVE_12_BUILD_ARGS"

if [ "${AMPLAB_JENKINS_BUILD_TOOL}" == "maven" ]; then
build/mvn $HIVE_12_BUILD_ARGS clean package -DskipTests
else
# NOTE: echo "q" is needed because sbt on encountering a build file with failure
# (either resolution or compilation) prompts the user for input either q, r, etc
# to quit or retry. This echo is there to make it not block.
# NOTE: Do not quote $BUILD_MVN_PROFILE_ARGS or else it will be interpreted as a
# single argument!
# QUESTION: Why doesn't 'yes "q"' work?
# QUESTION: Why doesn't 'grep -v -e "^\[info\] Resolving"' work?
echo -e "q\n" \
| build/sbt $HIVE_12_BUILD_ARGS clean hive/compile hive-thriftserver/compile \
| grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
fi

# Then build with default Hive version (0.13.1) because tests are based on this version
echo "[info] Compile with Hive 0.13.1"
[ -d "lib_managed" ] && rm -rf lib_managed
echo "[info] Building Spark with these arguments: $HIVE_BUILD_ARGS"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,16 +148,6 @@ case class InsertIntoTable(
}
}

case class CreateTableAsSelect[T](
databaseName: Option[String],
tableName: String,
child: LogicalPlan,
allowExisting: Boolean,
desc: Option[T] = None) extends UnaryNode {
override def output: Seq[Attribute] = Seq.empty[Attribute]
override lazy val resolved: Boolean = databaseName != None && childrenResolved
}

/**
* A container for holding named common table expressions (CTEs) and a query plan.
* This operator will be removed during analysis and the relations will be substituted into child.
Expand All @@ -177,10 +167,10 @@ case class WriteToFile(
}

/**
* @param order The ordering expressions
* @param global True means global sorting apply for entire data set,
* @param order The ordering expressions
* @param global True means global sorting apply for entire data set,
* False means sorting only apply within the partition.
* @param child Child logical plan
* @param child Child logical plan
*/
case class Sort(
order: Seq[SortOrder],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute

/**
* A logical node that represents a non-query command to be executed by the system. For example,
* commands can be used by parsers to represent DDL operations.
* commands can be used by parsers to represent DDL operations. Commands, unlike queries, are
* eagerly executed.
*/
abstract class Command extends LeafNode {
self: Product =>
def output: Seq[Attribute] = Seq.empty
}
trait Command
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,15 @@

package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.logical.Command
import org.scalatest.FunSuite

private[sql] case class TestCommand(cmd: String) extends Command
private[sql] case class TestCommand(cmd: String) extends LogicalPlan with Command {
override def output: Seq[Attribute] = Seq.empty
override def children: Seq[LogicalPlan] = Seq.empty
}

private[sql] class SuperLongKeywordTestParser extends AbstractSparkSQLParser {
protected val EXECUTE = Keyword("THISISASUPERLONGKEYWORDTEST")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,6 @@ class DataFrame private[sql](
// happen right away to let these side effects take place eagerly.
case _: Command |
_: InsertIntoTable |
_: CreateTableAsSelect[_] |
_: CreateTableUsingAsSelect |
_: WriteToFile =>
LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
Expand Down
11 changes: 8 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ import org.apache.spark.{Partition, SparkContext}
* spark-sql> SELECT * FROM src LIMIT 1;
*
*-- Exception will be thrown and switch to dialect
*-- "sql" (for SQLContext) or
*-- "sql" (for SQLContext) or
*-- "hiveql" (for HiveContext)
* }}}
*/
Expand Down Expand Up @@ -107,7 +107,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* @return Spark SQL configuration
*/
protected[sql] def conf = tlSession.get().conf
protected[sql] def conf = currentSession().conf

/**
* Set Spark SQL configuration properties.
Expand Down Expand Up @@ -1189,13 +1189,17 @@ class SQLContext(@transient val sparkContext: SparkContext)
|${stringOrError(executedPlan)}
""".stripMargin.trim

override def toString: String =
override def toString: String = {
def output =
analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")

// TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
// however, the `toRdd` will cause the real execution, which is not what we want.
// We need to think about how to avoid the side effect.
s"""== Parsed Logical Plan ==
|${stringOrError(logical)}
|== Analyzed Logical Plan ==
|${stringOrError(output)}
|${stringOrError(analyzed)}
|== Optimized Logical Plan ==
|${stringOrError(optimizedPlan)}
Expand All @@ -1204,6 +1208,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
|Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
|== RDD ==
""".stripMargin.trim
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
* A logical command that is executed for its side-effects. `RunnableCommand`s are
* wrapped in `ExecutedCommand` during execution.
*/
trait RunnableCommand extends logical.Command {
trait RunnableCommand extends LogicalPlan with logical.Command {
self: Product =>

override def output: Seq[Attribute] = Seq.empty
override def children: Seq[LogicalPlan] = Seq.empty
def run(sqlContext: SQLContext): Seq[Row]
}

Expand Down
16 changes: 11 additions & 5 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,10 @@ private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRel
*/
private[sql] case class DescribeCommand(
table: LogicalPlan,
isExtended: Boolean) extends Command {
override val output = Seq(
isExtended: Boolean) extends LogicalPlan with Command {

override def children: Seq[LogicalPlan] = Seq.empty
override val output: Seq[Attribute] = Seq(
// Column names are based on Hive.
AttributeReference("col_name", StringType, nullable = false,
new MetadataBuilder().putString("comment", "name of the column").build())(),
Expand All @@ -292,7 +294,11 @@ private[sql] case class CreateTableUsing(
temporary: Boolean,
options: Map[String, String],
allowExisting: Boolean,
managedIfNoPath: Boolean) extends Command
managedIfNoPath: Boolean) extends LogicalPlan with Command {

override def output: Seq[Attribute] = Seq.empty
override def children: Seq[LogicalPlan] = Seq.empty
}

/**
* A node used to support CTAS statements and saveAsTable for the data source API.
Expand All @@ -318,7 +324,7 @@ private[sql] case class CreateTempTableUsing(
provider: String,
options: Map[String, String]) extends RunnableCommand {

def run(sqlContext: SQLContext): Seq[Row] = {
override def run(sqlContext: SQLContext): Seq[Row] = {
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
sqlContext.registerDataFrameAsTable(
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
Expand All @@ -333,7 +339,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
options: Map[String, String],
query: LogicalPlan) extends RunnableCommand {

def run(sqlContext: SQLContext): Seq[Row] = {
override def run(sqlContext: SQLContext): Seq[Row] = {
val df = DataFrame(sqlContext, query)
val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df)
sqlContext.registerDataFrameAsTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,16 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {

// It has a bug and it has been fixed by
// https://issues.apache.org/jira/browse/HIVE-7673 (in Hive 0.14 and trunk).
"input46"
"input46",

"combine1", // BROKEN

"part_inherit_tbl_props", // BROKEN
"part_inherit_tbl_props_with_star", // BROKEN

"nullformatCTAS", // NEED TO FINISH CTAS parser

"load_dyn_part14.*" // These work along but fail when run with other tests...
) ++ HiveShim.compatibilityBlackList

/**
Expand Down
Loading