Address review remarks: AnalysisException & negative test
adrian-ionescu committed Apr 2, 2017
1 parent 3b031c7 commit 3ad0327
Showing 3 changed files with 23 additions and 17 deletions.
ExternalCatalogUtils.scala
@@ -142,7 +142,8 @@ object ExternalCatalogUtils {
         _.references.map(_.name).toSet.subsetOf(partitionColumnNames)
       }
       if (nonPartitionPruningPredicates.nonEmpty) {
-        sys.error("Expected only partition pruning predicates: " + nonPartitionPruningPredicates)
+        throw new AnalysisException("Expected only partition pruning predicates: " +
+          nonPartitionPruningPredicates)
       }
 
       val boundPredicate =
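Commentary (not part of the diff): `sys.error(msg)` is the Scala standard library's shorthand for `throw new RuntimeException(msg)`, so before this change a filter on a non-partition column surfaced as an unclassified runtime error. `AnalysisException` is Spark SQL's user-facing error type, which callers can match on. A minimal sketch of the difference from the caller's side; `listByFilter` is a hypothetical stand-in for any code path that reaches the check above, not a real Spark API:

```scala
import org.apache.spark.sql.AnalysisException

// Hypothetical wrapper around a call that reaches the pruning check.
def safeList(listByFilter: () => Seq[String]): Seq[String] =
  try listByFilter() catch {
    // Catchable as an analysis-time error now; with sys.error it was a
    // bare RuntimeException, indistinguishable from an internal failure.
    case e: AnalysisException =>
      println(s"rejected: ${e.getMessage}")
      Seq.empty
  }
```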
ExternalCatalogSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{FunctionAlreadyExistsException, NoSuchDatabaseException, NoSuchFunctionException}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -439,7 +440,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
   }
 
   test("list partitions by filter") {
-    val tz = TimeZone.getDefault().getID()
+    val tz = TimeZone.getDefault.getID
     val catalog = newBasicCatalog()
 
     def checkAnswer(table: CatalogTable, filters: Seq[Expression],
@@ -450,23 +451,28 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
       }
     }
 
-    def pcol(table: CatalogTable, name: String): Expression = {
-      val col = table.partitionSchema(name)
-      AttributeReference(col.name, col.dataType, col.nullable)()
-    }
     val tbl2 = catalog.getTable("db2", "tbl2")
 
     checkAnswer(tbl2, Seq.empty, Set(part1, part2))
-    checkAnswer(tbl2, Seq(EqualTo(pcol(tbl2, "a"), Literal(1))), Set(part1))
-    checkAnswer(tbl2, Seq(EqualTo(pcol(tbl2, "a"), Literal(2))), Set.empty)
-    checkAnswer(tbl2, Seq(In(pcol(tbl2, "a"), Seq(Literal(3)))), Set(part2))
-    checkAnswer(tbl2, Seq(Not(In(pcol(tbl2, "a"), Seq(Literal(4))))), Set(part1, part2))
-    checkAnswer(tbl2, Seq(
-      EqualTo(pcol(tbl2, "a"), Literal(1)),
-      EqualTo(pcol(tbl2, "b"), Literal("2"))), Set(part1))
-    checkAnswer(tbl2, Seq(
-      EqualTo(pcol(tbl2, "a"), Literal(1)),
-      EqualTo(pcol(tbl2, "b"), Literal("x"))), Set.empty)
+    checkAnswer(tbl2, Seq('a.int <= 1), Set(part1))
+    checkAnswer(tbl2, Seq('a.int === 2), Set.empty)
+    checkAnswer(tbl2, Seq(In('a.int * 10, Seq(30))), Set(part2))
+    checkAnswer(tbl2, Seq(Not(In('a.int, Seq(4)))), Set(part1, part2))
+    checkAnswer(tbl2, Seq('a.int === 1, 'b.string === "2"), Set(part1))
+    checkAnswer(tbl2, Seq('a.int === 1 && 'b.string === "2"), Set(part1))
+    checkAnswer(tbl2, Seq('a.int === 1, 'b.string === "x"), Set.empty)
+    checkAnswer(tbl2, Seq('a.int === 1 || 'b.string === "x"), Set(part1))
+
+    intercept[AnalysisException] {
+      try {
+        checkAnswer(tbl2, Seq('a.int > 0 && 'col1.int > 0), Set.empty)
+      } catch {
+        // HiveExternalCatalog may be the first one to notice and throw an exception, which will
+        // then be caught and converted into a RuntimeException with a descriptive message.
+        case ex: RuntimeException if ex.getMessage.contains("MetaException") =>
+          throw new AnalysisException("")
+      }
+    }
   }
 
   test("drop partitions") {
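The new negative test filters on `col1`, which is not a partition column of `tbl2`, and expects an `AnalysisException`. Because `ExternalCatalogSuite` is shared across catalog implementations, the Hive-backed catalog may reject the predicate first, surfacing a Hive `MetaException` wrapped in a `RuntimeException`; the `catch` block rethrows that case as `AnalysisException` so a single `intercept` covers both backends. A condensed, self-contained sketch of the pattern (illustrative stand-ins, not the Spark classes; ScalaTest's `AnyFunSuite` assumed):

```scala
import org.scalatest.funsuite.AnyFunSuite

class NormalizedFailureSketch extends AnyFunSuite {
  // Stand-in for AnalysisException: the exception type the contract promises.
  class ContractException(msg: String) extends Exception(msg)

  // Stand-in for a backend that fails with its own wrapped error,
  // the way HiveExternalCatalog surfaces a MetaException.
  def backendCall(): Unit =
    throw new RuntimeException("MetaException: non-partition column in filter")

  test("invalid filter is rejected with the contract's exception type") {
    intercept[ContractException] {
      try backendCall() catch {
        // Normalize the backend-specific failure so one intercept
        // works for every catalog implementation.
        case ex: RuntimeException if ex.getMessage.contains("MetaException") =>
          throw new ContractException(ex.getMessage)
      }
    }
  }
}
```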
HiveExternalCatalogSuite.scala
@@ -22,7 +22,6 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.types.StructType
 
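Note: the import removed here is the same Catalyst DSL import added to `ExternalCatalogSuite` above; with the filter test now living in the shared suite, this Hive-side suite presumably no longer references the DSL directly.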
