Remove createParquetFile and add applySchema for Java to SQLContext.
rxin committed Jan 15, 2015
1 parent ecd6685 commit a326a1a
Showing 4 changed files with 79 additions and 245 deletions.
116 changes: 79 additions & 37 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,15 +17,16 @@

package org.apache.spark.sql

import java.beans.Introspector
import java.util.Properties

import scala.collection.immutable
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag

import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
@@ -36,9 +37,9 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.execution._
import org.apache.spark.sql.json._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation, DDLParser, DataSourceStrategy}
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

/**
* :: AlphaComponent ::
@@ -175,6 +176,43 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, logicalPlan)
}

/**
* Applies a schema to an RDD of Java Beans.
*
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
* SELECT * queries will return the columns in an undefined order.
*/
def applySchema(rdd: RDD[_], beanClass: Class[_]): SchemaRDD = {
val attributeSeq = getSchema(beanClass)
val className = beanClass.getName
val rowRdd = rdd.mapPartitions { iter =>
// BeanInfo is not serializable so we must rediscover it remotely for each partition.
val localBeanInfo = Introspector.getBeanInfo(
Class.forName(className, true, Utils.getContextOrSparkClassLoader))
val extractors =
localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)

iter.map { row =>
new GenericRow(
extractors.zip(attributeSeq).map { case (e, attr) =>
DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType)
}.toArray[Any]
) : Row
}
}
new SchemaRDD(this, LogicalRDD(attributeSeq, rowRdd)(this))
}

/**
* Applies a schema to an RDD of Java Beans.
*
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
* SELECT * queries will return the columns in an undefined order.
*/
def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): SchemaRDD = {
applySchema(rdd.rdd, beanClass)
}

/**
* Loads a Parquet file, returning the result as a [[SchemaRDD]].
*
@@ -253,41 +291,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
applySchema(rowRDD, appliedSchema)
}

/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
* This registered table can be used as the target of future `insertInto` operations.
*
* {{{
* val sqlContext = new SQLContext(...)
* import sqlContext._
*
* case class Person(name: String, age: Int)
* createParquetFile[Person]("path/to/file.parquet").registerTempTable("people")
* sql("INSERT INTO people SELECT 'michael', 29")
* }}}
*
* @tparam A A case class type that describes the desired schema of the parquet file to be
* created.
* @param path The path where the directory containing parquet metadata should be created.
* Data inserted into this table will also be stored at this location.
* @param allowExisting When false, an exception will be thrown if this directory already exists.
* @param conf A Hadoop configuration object that can be used to specify options to the parquet
* output format.
*
* @group userf
*/
@Experimental
def createParquetFile[A <: Product : TypeTag](
path: String,
allowExisting: Boolean = true,
conf: Configuration = new Configuration()): SchemaRDD = {
new SchemaRDD(
this,
ParquetRelation.createEmpty(
path, ScalaReflection.attributesFor[A], allowExisting, conf, this))
}

/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
* during the lifetime of this instance of SQLContext.
@@ -519,4 +522,43 @@ class SQLContext(@transient val sparkContext: SparkContext)

new SchemaRDD(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
}

/**
* Returns a Catalyst Schema for the given java bean class.
*/
protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
// TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
val beanInfo = Introspector.getBeanInfo(beanClass)

// Note: The ordering of elements may differ from when the schema is inferred in Scala.
// This is because beanInfo.getPropertyDescriptors gives no guarantees about
// element ordering.
val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
fields.map { property =>
val (dataType, nullable) = property.getPropertyType match {
case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
(c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
case c: Class[_] if c == classOf[java.lang.String] => (StringType, true)
case c: Class[_] if c == java.lang.Short.TYPE => (ShortType, false)
case c: Class[_] if c == java.lang.Integer.TYPE => (IntegerType, false)
case c: Class[_] if c == java.lang.Long.TYPE => (LongType, false)
case c: Class[_] if c == java.lang.Double.TYPE => (DoubleType, false)
case c: Class[_] if c == java.lang.Byte.TYPE => (ByteType, false)
case c: Class[_] if c == java.lang.Float.TYPE => (FloatType, false)
case c: Class[_] if c == java.lang.Boolean.TYPE => (BooleanType, false)

case c: Class[_] if c == classOf[java.lang.Short] => (ShortType, true)
case c: Class[_] if c == classOf[java.lang.Integer] => (IntegerType, true)
case c: Class[_] if c == classOf[java.lang.Long] => (LongType, true)
case c: Class[_] if c == classOf[java.lang.Double] => (DoubleType, true)
case c: Class[_] if c == classOf[java.lang.Byte] => (ByteType, true)
case c: Class[_] if c == classOf[java.lang.Float] => (FloatType, true)
case c: Class[_] if c == classOf[java.lang.Boolean] => (BooleanType, true)
case c: Class[_] if c == classOf[java.math.BigDecimal] => (DecimalType(), true)
case c: Class[_] if c == classOf[java.sql.Date] => (DateType, true)
case c: Class[_] if c == classOf[java.sql.Timestamp] => (TimestampType, true)
}
AttributeReference(property.getName, dataType, nullable)()
}
}
}
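
Usage note (added for illustration, not part of the commit): a minimal sketch of how the new Java-bean `applySchema` overload above might be called from Scala. The `Person` bean, the SparkContext `sc`, and the `people` table name are assumptions made for this example.

    import org.apache.spark.sql.SQLContext
    import scala.beans.BeanProperty

    // A JavaBean-style class: applySchema discovers its columns through the getters.
    class Person extends Serializable {
      @BeanProperty var name: String = _   // reference type -> nullable StringType
      @BeanProperty var age: Int = _       // primitive Int  -> non-nullable IntegerType
    }

    val sqlContext = new SQLContext(sc)    // assumes an existing SparkContext `sc`

    val people = sc.parallelize(1 to 3).map { i =>
      val p = new Person
      p.setName(s"name_$i")
      p.setAge(20 + i)
      p
    }

    // Column order is undefined (see the scaladoc warning above), so name columns explicitly.
    val schemaRDD = sqlContext.applySchema(people, classOf[Person])
    schemaRDD.registerTempTable("people")
    sqlContext.sql("SELECT name, age FROM people WHERE age > 21").collect()

Because bean property order is not guaranteed, queries against such tables should select columns by name rather than rely on `SELECT *`.
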
160 changes: 0 additions & 160 deletions sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala

This file was deleted.

@@ -402,23 +402,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
Utils.deleteRecursively(file)
}

test("Insert (overwrite) via Scala API") {
val dirname = Utils.createTempDir()
val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
.map(i => TestRDDEntry(i, s"val_$i"))
source_rdd.registerTempTable("source")
val dest_rdd = createParquetFile[TestRDDEntry](dirname.toString)
dest_rdd.registerTempTable("dest")
sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
val rdd_copy1 = sql("SELECT * FROM dest").collect()
assert(rdd_copy1.size === 100)

sql("INSERT INTO dest SELECT * FROM source")
val rdd_copy2 = sql("SELECT * FROM dest").collect().sortBy(_.getInt(0))
assert(rdd_copy2.size === 200)
Utils.deleteRecursively(dirname)
}

test("Insert (appending) to same table via Scala API") {
sql("INSERT INTO testsource SELECT * FROM testsource")
val double_rdd = sql("SELECT * FROM testsource").collect()
@@ -902,15 +885,6 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA
Utils.deleteRecursively(tmpdir)
}

test("Querying on empty parquet throws exception (SPARK-3536)") {
val tmpdir = Utils.createTempDir()
Utils.deleteRecursively(tmpdir)
createParquetFile[TestRDDEntry](tmpdir.toString()).registerTempTable("tmpemptytable")
val result1 = sql("SELECT * FROM tmpemptytable").collect()
assert(result1.size === 0)
Utils.deleteRecursively(tmpdir)
}

test("read/write fixed-length decimals") {
for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
val tempDir = getTempFilePath("parquetTest").getCanonicalPath
@@ -34,19 +34,6 @@ class ParquetQuerySuite2 extends QueryTest with ParquetTest {
}
}

test("insertion") {
withTempDir { dir =>
val data = (0 until 10).map(i => (i, i.toString))
withParquetTable(data, "t") {
createParquetFile[(Int, String)](dir.toString).registerTempTable("dest")
withTempTable("dest") {
sql("INSERT OVERWRITE INTO dest SELECT * FROM t")
checkAnswer(table("dest"), data)
}
}
}
}

test("appending") {
val data = (0 until 10).map(i => (i, i.toString))
withParquetTable(data, "t") {
@@ -98,13 +85,4 @@ class ParquetQuerySuite2 extends QueryTest with ParquetTest {
checkAnswer(sql(s"SELECT _1 FROM t WHERE _1 < 10"), (1 to 9).map(Row.apply(_)))
}
}

test("SPARK-3536 regression: query empty Parquet file shouldn't throw") {
withTempDir { dir =>
createParquetFile[(Int, String)](dir.toString).registerTempTable("t")
withTempTable("t") {
checkAnswer(sql("SELECT * FROM t"), Seq.empty[Row])
}
}
}
}
