diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
new file mode 100644
index 0000000000000..f0e6a8f332188
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * Holder for experimental methods for the bravest. We make NO guarantee about the binary
+ * compatibility and source compatibility of methods here.
+ */
+@Experimental
+class ExperimentalMethods protected[sql](sqlContext: SQLContext) {
+
+  /**
+   * Allows extra strategies to be injected into the query planner at runtime. Note this API
+   * should be considered experimental and is not intended to be stable across releases.
+   */
+  @Experimental
+  var extraStrategies: Seq[Strategy] = Nil
+
+}
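Editor's note: the `extraStrategies` hook that previously sat directly on `SQLContext` is now reached through `sqlContext.experimental`. Below is a minimal sketch of wiring in a custom strategy, assuming an existing `SQLContext` named `sqlContext`; the `NoOpStrategy` object is hypothetical and only illustrates the shape of the API, not part of this patch.

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical strategy for illustration: it matches nothing, so planning
// always falls through to the built-in strategies.
object NoOpStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

// New injection point (previously `sqlContext.extraStrategies`).
sqlContext.experimental.extraStrategies = Seq(NoOpStrategy)
```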
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index d9f3b3a53f582..279671ced0a17 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,15 +17,16 @@ package org.apache.spark.sql
 
+import java.beans.Introspector
 import java.util.Properties
 
 import scala.collection.immutable
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
-import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
+import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.analysis._
@@ -36,9 +37,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.json._
-import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.{BaseRelation, DDLParser, DataSourceStrategy, LogicalRelation}
+import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation, DDLParser, DataSourceStrategy}
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * :: AlphaComponent ::
@@ -59,7 +60,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
   self =>
 
   // Note that this is a lazy val so we can override the default value in subclasses.
-  private[sql] lazy val conf: SQLConf = new SQLConf
+  protected[sql] lazy val conf: SQLConf = new SQLConf
 
   /** Set Spark SQL configuration properties. */
   def setConf(props: Properties): Unit = conf.setConf(props)
@@ -117,15 +118,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
     case _ =>
   }
 
-  /**
-   * :: DeveloperApi ::
-   * Allows catalyst LogicalPlans to be executed as a SchemaRDD. Note that the LogicalPlan
-   * interface is considered internal, and thus not guaranteed to be stable. As a result, using
-   * them directly is not recommended.
-   */
-  @DeveloperApi
-  implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)
-
   /**
    * Creates a SchemaRDD from an RDD of case classes.
    *
@@ -139,8 +131,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new SchemaRDD(this, LogicalRDD(attributeSeq, rowRDD)(self))
   }
 
-  implicit def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
-    logicalPlanToSparkQuery(LogicalRelation(baseRelation))
+  /**
+   * Convert a [[BaseRelation]] created for external data sources into a [[SchemaRDD]].
+   */
+  def baseRelationToSchemaRDD(baseRelation: BaseRelation): SchemaRDD = {
+    new SchemaRDD(this, LogicalRelation(baseRelation))
   }
 
   /**
@@ -181,6 +176,43 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new SchemaRDD(this, logicalPlan)
   }
 
+  /**
+   * Applies a schema to an RDD of Java Beans.
+   *
+   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
+   *          SELECT * queries will return the columns in an undefined order.
+   */
+  def applySchema(rdd: RDD[_], beanClass: Class[_]): SchemaRDD = {
+    val attributeSeq = getSchema(beanClass)
+    val className = beanClass.getName
+    val rowRdd = rdd.mapPartitions { iter =>
+      // BeanInfo is not serializable so we must rediscover it remotely for each partition.
+      val localBeanInfo = Introspector.getBeanInfo(
+        Class.forName(className, true, Utils.getContextOrSparkClassLoader))
+      val extractors =
+        localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)
+
+      iter.map { row =>
+        new GenericRow(
+          extractors.zip(attributeSeq).map { case (e, attr) =>
+            DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType)
+          }.toArray[Any]
+        ) : Row
+      }
+    }
+    new SchemaRDD(this, LogicalRDD(attributeSeq, rowRdd)(this))
+  }
+
+  /**
+   * Applies a schema to an RDD of Java Beans.
+   *
+   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
+   *          SELECT * queries will return the columns in an undefined order.
+   */
+  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): SchemaRDD = {
+    applySchema(rdd.rdd, beanClass)
+  }
+
   /**
    * Loads a Parquet file, returning the result as a [[SchemaRDD]].
   *
@@ -259,41 +291,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
     applySchema(rowRDD, appliedSchema)
   }
 
-  /**
-   * :: Experimental ::
-   * Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
-   * This registered table can be used as the target of future `insertInto` operations.
-   *
-   * {{{
-   *   val sqlContext = new SQLContext(...)
-   *   import sqlContext._
-   *
-   *   case class Person(name: String, age: Int)
-   *   createParquetFile[Person]("path/to/file.parquet").registerTempTable("people")
-   *   sql("INSERT INTO people SELECT 'michael', 29")
-   * }}}
-   *
-   * @tparam A A case class type that describes the desired schema of the parquet file to be
-   *           created.
-   * @param path The path where the directory containing parquet metadata should be created.
-   *             Data inserted into this table will also be stored at this location.
-   * @param allowExisting When false, an exception will be thrown if this directory already exists.
-   * @param conf A Hadoop configuration object that can be used to specify options to the parquet
-   *             output format.
-   *
-   * @group userf
-   */
-  @Experimental
-  def createParquetFile[A <: Product : TypeTag](
-      path: String,
-      allowExisting: Boolean = true,
-      conf: Configuration = new Configuration()): SchemaRDD = {
-    new SchemaRDD(
-      this,
-      ParquetRelation.createEmpty(
-        path, ScalaReflection.attributesFor[A], allowExisting, conf, this))
-  }
-
   /**
    * Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
    * during the lifetime of this instance of SQLContext.
@@ -336,12 +333,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new SchemaRDD(this, catalog.lookupRelation(Seq(tableName)))
 
   /**
-   * :: DeveloperApi ::
-   * Allows extra strategies to be injected into the query planner at runtime. Note this API
-   * should be consider experimental and is not intended to be stable across releases.
+   * A collection of methods that are considered experimental, but can be used to hook into
+   * the query planner for advanced functionalities.
    */
-  @DeveloperApi
-  var extraStrategies: Seq[Strategy] = Nil
+  val experimental: ExperimentalMethods = new ExperimentalMethods(this)
 
   protected[sql] class SparkPlanner extends SparkStrategies {
     val sparkContext: SparkContext = self.sparkContext
@@ -353,7 +348,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     def numPartitions = self.conf.numShufflePartitions
 
     def strategies: Seq[Strategy] =
-      extraStrategies ++ (
+      experimental.extraStrategies ++ (
       DataSourceStrategy ::
       DDLStrategy ::
      TakeOrdered ::
@@ -479,14 +474,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * have the same format as the one generated by `toString` in scala.
    * It is only used by PySpark.
    */
-  private[sql] def parseDataType(dataTypeString: String): DataType = {
+  protected[sql] def parseDataType(dataTypeString: String): DataType = {
     DataType.fromJson(dataTypeString)
   }
 
   /**
   * Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
   */
-  private[sql] def applySchemaToPythonRDD(
+  protected[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schemaString: String): SchemaRDD = {
     val schema = parseDataType(schemaString).asInstanceOf[StructType]
@@ -496,7 +491,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
   * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
   */
-  private[sql] def applySchemaToPythonRDD(
+  protected[sql] def applySchemaToPythonRDD(
       rdd: RDD[Array[Any]],
       schema: StructType): SchemaRDD = {
@@ -527,4 +522,43 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
     new SchemaRDD(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
   }
+
+  /**
+   * Returns a Catalyst Schema for the given java bean class.
+   */
+  protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
+    // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
+    val beanInfo = Introspector.getBeanInfo(beanClass)
+
+    // Note: The ordering of elements may differ from when the schema is inferred in Scala.
+    //       This is because beanInfo.getPropertyDescriptors gives no guarantees about
+    //       element ordering.
+    val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+    fields.map { property =>
+      val (dataType, nullable) = property.getPropertyType match {
+        case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
+          (c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
+        case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
+        case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
+        case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
+        case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
+        case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
+        case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
+        case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
+        case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)
+
+        case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
+        case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
+        case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
+        case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
+        case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
+        case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
+        case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
+        case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
+        case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
+        case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
+      }
+      AttributeReference(property.getName, dataType, nullable)()
+    }
+  }
 }
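Editor's note: a rough usage sketch for the new bean-based `applySchema`. The `Person` bean, the `SparkContext`, and the sample data are illustrative only and are not part of this patch.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Hypothetical JavaBean-style class; the Introspector-discovered getters/setters drive schema inference.
class Person extends Serializable {
  private var name: String = _
  private var age: Int = _
  def getName: String = name
  def setName(n: String): Unit = { name = n }
  def getAge: Int = age
  def setAge(a: Int): Unit = { age = a }
}

val sc = new SparkContext("local", "beanExample")  // assumed local context, for illustration
val sqlContext = new SQLContext(sc)

val people = sc.parallelize(Seq(("Alice", 30), ("Bob", 25))).map { case (n, a) =>
  val p = new Person
  p.setName(n)
  p.setAge(a)
  p
}

// `name` becomes a nullable StringType column and `age` a non-nullable IntegerType column,
// per the getSchema mapping added above.
val peopleSchemaRDD = sqlContext.applySchema(people, classOf[Person])
peopleSchemaRDD.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age > 26").collect()
```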
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index af6b07bd6c2f4..52a31f01a4358 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -20,11 +20,11 @@ package org.apache.spark.sql.execution
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.{SQLConf, SQLContext}
 
 /**
  * A logical command that is executed for its side-effects. `RunnableCommand`s are
@@ -137,14 +137,12 @@ case class CacheTableCommand(
     isLazy: Boolean)
   extends RunnableCommand {
 
   override def run(sqlContext: SQLContext) = {
-    import sqlContext._
-
-    plan.foreach(_.registerTempTable(tableName))
-    cacheTable(tableName)
+    plan.foreach(p => new SchemaRDD(sqlContext, p).registerTempTable(tableName))
+    sqlContext.cacheTable(tableName)
 
     if (!isLazy) {
       // Performs eager caching
-      table(tableName).count()
+      sqlContext.table(tableName).count()
     }
 
     Seq.empty[Row]
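Editor's note: `CacheTableCommand` above implements the `CACHE TABLE` statement. A hedged sketch of the SQL surface it serves follows; table names are illustrative and the exact accepted syntax depends on the Spark version.

```scala
// Assuming `sqlContext` already has a registered table named "people".
sqlContext.sql("CACHE TABLE cachedPeople AS SELECT * FROM people")    // isLazy = false: materialized eagerly
sqlContext.sql("CACHE LAZY TABLE lazyPeople AS SELECT * FROM people") // isLazy = true: materialized on first use
sqlContext.sql("SELECT COUNT(*) FROM cachedPeople").collect()
```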
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 4cc9641c4d9e0..381298caba6f2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -22,7 +22,7 @@ import scala.util.parsing.combinator.syntactical.StandardTokenParsers
 import scala.util.parsing.combinator.PackratParsers
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SchemaRDD, SQLContext}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.SqlLexical
 import org.apache.spark.sql.execution.RunnableCommand
@@ -234,8 +234,7 @@ private [sql] case class CreateTempTableUsing(
 
   def run(sqlContext: SQLContext) = {
     val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
-
-    sqlContext.baseRelationToSchemaRDD(resolved.relation).registerTempTable(tableName)
+    new SchemaRDD(sqlContext, LogicalRelation(resolved.relation)).registerTempTable(tableName)
     Seq.empty
   }
 }
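Editor's note: `CreateTempTableUsing` above backs data source DDL of roughly the following shape; `com.example.MyDataSource` stands in for any `RelationProvider` implementation and, like the path, is hypothetical.

```scala
sqlContext.sql(
  """
    |CREATE TEMPORARY TABLE myTempTable
    |USING com.example.MyDataSource
    |OPTIONS (path '/tmp/some/data')
  """.stripMargin)

sqlContext.sql("SELECT * FROM myTempTable").collect()
```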
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
index 8c80be106f3cb..f9c082216085d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestSQLContext.scala
@@ -17,8 +17,11 @@ package org.apache.spark.sql.test
 
+import scala.language.implicitConversions
+
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.sql.{SQLConf, SQLContext}
+import org.apache.spark.sql.{SchemaRDD, SQLConf, SQLContext}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
 /** A SQLContext that can be used for local testing. */
 object TestSQLContext
@@ -29,7 +32,16 @@ object TestSQLContext
       new SparkConf().set("spark.sql.testkey", "true"))) {
 
   /** Fewer partitions to speed up testing. */
-  private[sql] override lazy val conf: SQLConf = new SQLConf {
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
     override def numShufflePartitions: Int = this.getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
   }
+
+  /**
+   * Turn a logical plan into a SchemaRDD. This should be removed once we have an easier way to
+   * construct SchemaRDD directly out of local data without relying on implicits.
+   */
+  protected[sql] implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = {
+    new SchemaRDD(this, plan)
+  }
+
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
deleted file mode 100644
index c87d762751e6d..0000000000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql
-
-import _root_.java.io.File
-
-/* Implicits */
-import org.apache.spark.sql.test.TestSQLContext._
-
-class InsertIntoSuite extends QueryTest {
-  TestData // Initialize TestData
-  import TestData._
-
-  test("insertInto() created parquet file") {
-    val testFilePath = File.createTempFile("sparkSql", "pqt")
-    testFilePath.delete()
-    testFilePath.deleteOnExit()
-    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-    testFile.registerTempTable("createAndInsertTest")
-
-    // Add some data.
-    testData.insertInto("createAndInsertTest")
-
-    // Make sure its there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertTest"),
-      testData.collect().toSeq
-    )
-
-    // Add more data.
-    testData.insertInto("createAndInsertTest")
-
-    // Make sure all data is there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq ++ testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertTest"),
-      testData.collect().toSeq ++ testData.collect().toSeq
-    )
-
-    // Now overwrite.
-    testData.insertInto("createAndInsertTest", overwrite = true)
-
-    // Make sure its there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertTest"),
-      testData.collect().toSeq
-    )
-
-    testFilePath.delete()
-  }
-
-  test("INSERT INTO parquet table") {
-    val testFilePath = File.createTempFile("sparkSql", "pqt")
-    testFilePath.delete()
-    testFilePath.deleteOnExit()
-    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-    testFile.registerTempTable("createAndInsertSQLTest")
-
-    sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
-
-    // Make sure its there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertSQLTest"),
-      testData.collect().toSeq
-    )
-
-    // Append more data.
-    sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
-
-    // Make sure all data is there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq ++ testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertSQLTest"),
-      testData.collect().toSeq ++ testData.collect().toSeq
-    )
-
-    sql("INSERT OVERWRITE INTO createAndInsertSQLTest SELECT * FROM testData")
-
-    // Make sure its there for a new instance of parquet file.
-    checkAnswer(
-      parquetFile(testFilePath.getCanonicalPath),
-      testData.collect().toSeq
-    )
-
-    // Make sure the registered table has also been updated.
-    checkAnswer(
-      sql("SELECT * FROM createAndInsertSQLTest"),
-      testData.collect().toSeq
-    )
-
-    testFilePath.delete()
-  }
-
-  test("Double create fails when allowExisting = false") {
-    val testFilePath = File.createTempFile("sparkSql", "pqt")
-    testFilePath.delete()
-    testFilePath.deleteOnExit()
-    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-
-    intercept[RuntimeException] {
-      createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = false)
-    }
-
-    testFilePath.delete()
-  }
-
-  test("Double create does not fail when allowExisting = true") {
-    val testFilePath = File.createTempFile("sparkSql", "pqt")
-    testFilePath.delete()
-    testFilePath.deleteOnExit()
-    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
-
-    createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = true)
-
-    testFilePath.delete()
-  }
-}
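Editor's note: with `createParquetFile` removed, the Parquet round trip the deleted suite exercised can still be covered through the write/read path that remains. A rough sketch against `TestSQLContext` follows; the `Record` case class and the temporary path are illustrative only.

```scala
import java.io.File

import org.apache.spark.sql.test.TestSQLContext._

case class Record(key: Int, value: String)

val tempPath = File.createTempFile("sparkSql", "pqt")
tempPath.delete()

// Write data out as Parquet, read it back, and register it for SQL queries.
val records = sparkContext.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
records.saveAsParquetFile(tempPath.getCanonicalPath)
parquetFile(tempPath.getCanonicalPath).registerTempTable("records")
sql("SELECT COUNT(*) FROM records").collect()
```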
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index fe781ec05fb6f..3a073a6b7057e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -402,23 +402,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     Utils.deleteRecursively(file)
   }
 
-  test("Insert (overwrite) via Scala API") {
-    val dirname = Utils.createTempDir()
-    val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
-      .map(i => TestRDDEntry(i, s"val_$i"))
-    source_rdd.registerTempTable("source")
-    val dest_rdd = createParquetFile[TestRDDEntry](dirname.toString)
-    dest_rdd.registerTempTable("dest")
-    sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
-    val rdd_copy1 = sql("SELECT * FROM dest").collect()
-    assert(rdd_copy1.size === 100)
-
-    sql("INSERT INTO dest SELECT * FROM source")
-    val rdd_copy2 = sql("SELECT * FROM dest").collect().sortBy(_.getInt(0))
-    assert(rdd_copy2.size === 200)
-    Utils.deleteRecursively(dirname)
-  }
-
   test("Insert (appending) to same table via Scala API") {
     sql("INSERT INTO testsource SELECT * FROM testsource")
     val double_rdd = sql("SELECT * FROM testsource").collect()
@@ -902,15 +885,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
     Utils.deleteRecursively(tmpdir)
   }
 
-  test("Querying on empty parquet throws exception (SPARK-3536)") {
-    val tmpdir = Utils.createTempDir()
-    Utils.deleteRecursively(tmpdir)
-    createParquetFile[TestRDDEntry](tmpdir.toString()).registerTempTable("tmpemptytable")
-    val result1 = sql("SELECT * FROM tmpemptytable").collect()
-    assert(result1.size === 0)
-    Utils.deleteRecursively(tmpdir)
-  }
-
   test("read/write fixed-length decimals") {
     for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
       val tempDir = getTempFilePath("parquetTest").getCanonicalPath
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
index daa7ca65cd993..4c081fb4510b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite2.scala
@@ -34,19 +34,6 @@ class ParquetQuerySuite2 extends QueryTest with ParquetTest {
     }
   }
 
-  test("insertion") {
-    withTempDir { dir =>
-      val data = (0 until 10).map(i => (i, i.toString))
-      withParquetTable(data, "t") {
-        createParquetFile[(Int, String)](dir.toString).registerTempTable("dest")
-        withTempTable("dest") {
-          sql("INSERT OVERWRITE INTO dest SELECT * FROM t")
-          checkAnswer(table("dest"), data)
-        }
-      }
-    }
-  }
-
   test("appending") {
     val data = (0 until 10).map(i => (i, i.toString))
     withParquetTable(data, "t") {
@@ -98,13 +85,4 @@ class ParquetQuerySuite2 extends QueryTest with ParquetTest {
       checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
     }
   }
-
-  test("SPARK-3536 regression: query empty Parquet file shouldn't throw") {
-    withTempDir { dir =>
-      createParquetFile[(Int, String)](dir.toString).registerTempTable("t")
-      withTempTable("t") {
-        checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row])
-      }
-    }
-  }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index bf56e60cf995f..a9a20a54bebe8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -71,7 +71,7 @@ class LocalHiveContext(sc: SparkContext) extends HiveContext(sc) {
 class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   self =>
 
-  private[sql] override lazy val conf: SQLConf = new SQLConf {
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
     override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
   }
 
@@ -348,7 +348,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     val hivePlanner = new SparkPlanner with HiveStrategies {
       val hiveContext = self
 
-      override def strategies: Seq[Strategy] = extraStrategies ++ Seq(
+      override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
        DataSourceStrategy,
        HiveCommandStrategy(self),
        HiveDDLStrategy,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 52e1f0d94fbd4..47431cef03e13 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -102,7 +102,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
     new this.QueryExecution { val logical = plan }
 
   /** Fewer partitions to speed up testing. */
-  private[sql] override lazy val conf: SQLConf = new SQLConf {
+  protected[sql] override lazy val conf: SQLConf = new SQLConf {
     override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
     override def dialect: String = getConf(SQLConf.DIALECT, "hiveql")
   }
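Editor's note: the `private[sql]` to `protected[sql]` changes in the Hive and test contexts above are what allow subclasses to tune `SQLConf`. A minimal sketch of the same pattern follows; the subpackage and class name are hypothetical, and `protected[sql]` still limits this to code compiled under the `org.apache.spark.sql` package hierarchy.

```scala
package org.apache.spark.sql.myext  // hypothetical subpackage, for illustration only

import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLConf, SQLContext}

class TunedSQLContext(sc: SparkContext) extends SQLContext(sc) {

  /** Mirrors the TestSQLContext/TestHiveContext pattern: fewer shuffle partitions for small jobs. */
  protected[sql] override lazy val conf: SQLConf = new SQLConf {
    override def numShufflePartitions: Int = getConf(SQLConf.SHUFFLE_PARTITIONS, "4").toInt
  }
}
```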