use sparkPlan for checking
Davies Liu committed Jan 15, 2016
1 parent 908c8cb commit 1feab20
Showing 16 changed files with 34 additions and 34 deletions.
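
For context: in Spark's `QueryExecution`, `sparkPlan` is the physical plan as produced by the planner, while `executedPlan` is that same plan after the execution-preparation rules (such as inserting exchanges) have been applied, so it can contain extra or wrapped operators. The diff below mechanically swaps `executedPlan` for `sparkPlan` wherever tests and helpers pattern-match on plan operators. A minimal sketch of the pattern, not part of this commit, assuming a Spark 1.6-era `sqlContext` in scope and made-up DataFrames:

```scala
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.execution.joins.BroadcastHashJoin

// Hypothetical DataFrames, not taken from the diff.
val df1 = sqlContext.range(100).selectExpr("id AS key", "id AS value")
val df2 = sqlContext.range(10).selectExpr("id AS key", "id AS value")
val df = df1.join(broadcast(df2), "key")

// sparkPlan: the physical plan straight from the planner.
val planned = df.queryExecution.sparkPlan

// executedPlan: the same plan after preparation rules have run
// (e.g. adding exchanges), so it may contain additional operators.
val prepared = df.queryExecution.executedPlan

// The updated tests collect operators from sparkPlan.
assert(planned.collect { case j: BroadcastHashJoin => j }.size == 1)
```

The same substitution appears in each of the hunks that follow.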
@@ -36,12 +36,12 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
import testImplicits._

def rddIdOf(tableName: String): Int = {
- val executedPlan = sqlContext.table(tableName).queryExecution.executedPlan
- executedPlan.collect {
+ val plan = sqlContext.table(tableName).queryExecution.sparkPlan
+ plan.collect {
case InMemoryColumnarTableScan(_, _, relation) =>
relation.cachedColumnBuffers.id
case _ =>
- fail(s"Table $tableName is not cached\n" + executedPlan)
+ fail(s"Table $tableName is not cached\n" + plan)
}.head
}

@@ -123,15 +123,15 @@ class DataFrameJoinSuite extends QueryTest with SharedSQLContext {
val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")

// equijoin - should be converted into broadcast join
- val plan1 = df1.join(broadcast(df2), "key").queryExecution.executedPlan
+ val plan1 = df1.join(broadcast(df2), "key").queryExecution.sparkPlan
assert(plan1.collect { case p: BroadcastHashJoin => p }.size === 1)

// no join key -- should not be a broadcast join
- val plan2 = df1.join(broadcast(df2)).queryExecution.executedPlan
+ val plan2 = df1.join(broadcast(df2)).queryExecution.sparkPlan
assert(plan2.collect { case p: BroadcastHashJoin => p }.size === 0)

// planner should not crash without a join
- broadcast(df1).queryExecution.executedPlan
+ broadcast(df1).queryExecution.sparkPlan

// SPARK-12275: no physical plan for BroadcastHint in some condition
withTempPath { path =>
@@ -246,7 +246,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
private def testCodeGen(sqlText: String, expectedResults: Seq[Row]): Unit = {
val df = sql(sqlText)
// First, check if we have GeneratedAggregate.
- val hasGeneratedAgg = df.queryExecution.executedPlan
+ val hasGeneratedAgg = df.queryExecution.sparkPlan
.collect { case _: aggregate.TungstenAggregate => true }
.nonEmpty
if (!hasGeneratedAgg) {
@@ -791,11 +791,11 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {

test("SPARK-11111 null-safe join should not use cartesian product") {
val df = sql("select count(*) from testData a join testData b on (a.key <=> b.key)")
- val cp = df.queryExecution.executedPlan.collect {
+ val cp = df.queryExecution.sparkPlan.collect {
case cp: CartesianProduct => cp
}
assert(cp.isEmpty, "should not use CartesianProduct for null-safe join")
- val smj = df.queryExecution.executedPlan.collect {
+ val smj = df.queryExecution.sparkPlan.collect {
case smj: SortMergeJoin => smj
}
assert(smj.size > 0, "should use SortMergeJoin")
@@ -94,7 +94,7 @@ class PlannerSuite extends SharedSQLContext {
"""
|SELECT l.a, l.b
|FROM testData2 l JOIN (SELECT * FROM testLimit LIMIT 1) r ON (l.a = r.key)
- """.stripMargin).queryExecution.executedPlan
+ """.stripMargin).queryExecution.sparkPlan

val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
val sortMergeJoins = planned.collect { case join: SortMergeJoin => join }
@@ -147,7 +147,7 @@ class PlannerSuite extends SharedSQLContext {

val a = testData.as("a")
val b = sqlContext.table("tiny").as("b")
- val planned = a.join(b, $"a.key" === $"b.key").queryExecution.executedPlan
+ val planned = a.join(b, $"a.key" === $"b.key").queryExecution.sparkPlan

val broadcastHashJoins = planned.collect { case join: BroadcastHashJoin => join }
val sortMergeJoins = planned.collect { case join: SortMergeJoin => join }
@@ -168,7 +168,7 @@
sqlContext.registerDataFrameAsTable(df, "testPushed")

withTempTable("testPushed") {
- val exp = sql("select * from testPushed where key = 15").queryExecution.executedPlan
+ val exp = sql("select * from testPushed where key = 15").queryExecution.sparkPlan
assert(exp.toString.contains("PushedFilters: [EqualTo(key,15)]"))
}
}
@@ -31,7 +31,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
setupTestData()

test("simple columnar query") {
- val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan
+ val plan = sqlContext.executePlan(testData.logicalPlan).sparkPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)

checkAnswer(scan, testData.collect().toSeq)
@@ -48,7 +48,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}

test("projection") {
- val plan = sqlContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
+ val plan = sqlContext.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)

checkAnswer(scan, testData.collect().map {
@@ -57,7 +57,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}

test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
- val plan = sqlContext.executePlan(testData.logicalPlan).executedPlan
+ val plan = sqlContext.executePlan(testData.logicalPlan).sparkPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)

checkAnswer(scan, testData.collect().toSeq)
@@ -114,7 +114,7 @@ class PartitionBatchPruningSuite extends SparkFunSuite with SharedSQLContext {
df.collect().map(_(0)).toArray
}

- val (readPartitions, readBatches) = df.queryExecution.executedPlan.collect {
+ val (readPartitions, readBatches) = df.queryExecution.sparkPlan.collect {
case in: InMemoryColumnarTableScan => (in.readPartitions.value, in.readBatches.value)
}.head

@@ -62,7 +62,7 @@ class BroadcastJoinSuite extends QueryTest with BeforeAndAfterAll {
// Comparison at the end is for broadcast left semi join
val joinExpression = df1("key") === df2("key") && df1("value") > df2("value")
val df3 = df1.join(broadcast(df2), joinExpression, joinType)
- val plan = df3.queryExecution.executedPlan
+ val plan = df3.queryExecution.sparkPlan
assert(plan.collect { case p: T => p }.size === 1)
plan.executeCollect()
}
@@ -615,7 +615,7 @@ class HiveContext private[hive](
* Returns the result as a hive compatible sequence of strings. For native commands, the
* execution is simply passed back to Hive.
*/
- def stringResult(): Seq[String] = executedPlan match {
+ def stringResult(): Seq[String] = sparkPlan match {
case ExecutedCommand(desc: DescribeHiveTableCommand) =>
// If it is a describe command for a Hive table, we want to have the output format
// be similar with Hive.
@@ -107,7 +107,7 @@ private[hive] trait HiveStrategies {
DescribeHiveTableCommand(t, describe.output, describe.isExtended)) :: Nil

case o: LogicalPlan =>
- val resultPlan = context.executePlan(o).executedPlan
+ val resultPlan = context.executePlan(o).sparkPlan
ExecutedCommand(RunnableDescribeCommand(
resultPlan, describe.output, describe.isExtended)) :: Nil
}
@@ -30,12 +30,12 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton {
import hiveContext._

def rddIdOf(tableName: String): Int = {
- val executedPlan = table(tableName).queryExecution.executedPlan
- executedPlan.collect {
+ val plan = table(tableName).queryExecution.sparkPlan
+ plan.collect {
case InMemoryColumnarTableScan(_, _, relation) =>
relation.cachedColumnBuffers.id
case _ =>
- fail(s"Table $tableName is not cached\n" + executedPlan)
+ fail(s"Table $tableName is not cached\n" + plan)
}.head
}

@@ -477,15 +477,15 @@ abstract class HiveComparisonTest
// If this query is reading other tables that were created during this test run
// also print out the query plans and results for those.
val computedTablesMessages: String = try {
- val tablesRead = new TestHive.QueryExecution(query).executedPlan.collect {
+ val tablesRead = new TestHive.QueryExecution(query).sparkPlan.collect {
case ts: HiveTableScan => ts.relation.tableName
}.toSet

TestHive.reset()
val executions = queryList.map(new TestHive.QueryExecution(_))
executions.foreach(_.toRdd)
val tablesGenerated = queryList.zip(executions).flatMap {
- case (q, e) => e.executedPlan.collect {
+ case (q, e) => e.sparkPlan.collect {
case i: InsertIntoHiveTable if tablesRead contains i.table.tableName =>
(q, e, i)
}
@@ -43,7 +43,7 @@ class HiveTypeCoercionSuite extends HiveComparisonTest {

test("[SPARK-2210] boolean cast on boolean value should be removed") {
val q = "select cast(cast(key=0 as boolean) as boolean) from src"
- val project = TestHive.sql(q).queryExecution.executedPlan.collect {
+ val project = TestHive.sql(q).queryExecution.sparkPlan.collect {
case e: Project => e
}.head

@@ -144,7 +144,7 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter {
expectedScannedColumns: Seq[String],
expectedPartValues: Seq[Seq[String]]): Unit = {
test(s"$testCaseName - pruning test") {
- val plan = new TestHive.QueryExecution(sql).executedPlan
+ val plan = new TestHive.QueryExecution(sql).sparkPlan
val actualOutputColumns = plan.output.map(_.name)
val (actualScannedColumns, actualPartValues) = plan.collect {
case p @ HiveTableScan(columns, relation, _) =>
@@ -190,11 +190,11 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {

test(s"conversion is working") {
assert(
- sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+ sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
case _: HiveTableScan => true
}.isEmpty)
assert(
- sql("SELECT * FROM normal_parquet").queryExecution.executedPlan.collect {
+ sql("SELECT * FROM normal_parquet").queryExecution.sparkPlan.collect {
case _: PhysicalRDD => true
}.nonEmpty)
}
@@ -305,7 +305,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin)

val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
- df.queryExecution.executedPlan match {
+ df.queryExecution.sparkPlan match {
case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation].getCanonicalName} and " +
@@ -335,7 +335,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin)

val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
- df.queryExecution.executedPlan match {
+ df.queryExecution.sparkPlan match {
case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation, _, _)) => // OK
case o => fail("test_insert_parquet should be converted to a " +
s"${classOf[ParquetRelation].getCanonicalName} and " +
@@ -149,7 +149,7 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {

sqlContext.range(2).select('id as 'a, 'id as 'b).write.partitionBy("b").parquet(path)
val df = sqlContext.read.parquet(path).filter('a === 0).select('b)
- val physicalPlan = df.queryExecution.executedPlan
+ val physicalPlan = df.queryExecution.sparkPlan

assert(physicalPlan.collect { case p: execution.Project => p }.length === 1)
assert(physicalPlan.collect { case p: execution.Filter => p }.length === 1)
@@ -156,9 +156,9 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
test(s"pruning and filtering: df.select(${projections.mkString(", ")}).where($filter)") {
val df = partitionedDF.where(filter).select(projections: _*)
val queryExecution = df.queryExecution
- val executedPlan = queryExecution.executedPlan
+ val sparkPlan = queryExecution.sparkPlan

- val rawScan = executedPlan.collect {
+ val rawScan = sparkPlan.collect {
case p: PhysicalRDD => p
} match {
case Seq(scan) => scan
@@ -177,7 +177,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest with Predicat
assert(requiredColumns === SimpleTextRelation.requiredColumns)

val nonPushedFilters = {
- val boundFilters = executedPlan.collect {
+ val boundFilters = sparkPlan.collect {
case f: execution.Filter => f
} match {
case Nil => Nil