Removed Java UserDefinedType, and made UDTs private[spark] for now
jkbradley committed Oct 31, 2014
1 parent 9523f5d · commit 81ecfc3
Showing 12 changed files with 103 additions and 193 deletions.
@@ -95,10 +95,10 @@ object ScalaReflection {
// Java appends a '$' to the object name but Scala does not).
val udt = Utils.classForName(className)
.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance()
UDTRegistry.registerType(udt)
//UDTRegistry.registerType(udt)
Schema(udt, nullable = true)
case t if UDTRegistry.udtRegistry.contains(t) =>
Schema(UDTRegistry.udtRegistry(t), nullable = true)
//case t if UDTRegistry.udtRegistry.contains(t) =>
//Schema(UDTRegistry.udtRegistry(t), nullable = true)
case t if t <:< typeOf[Option[_]] =>
val TypeRef(_, _, Seq(optType)) = t
Schema(schemaFor(optType).dataType, nullable = true)
@@ -505,6 +505,19 @@ case class MapType(
/**
* ::DeveloperApi::
* The data type for User Defined Types (UDTs).
*
* This interface allows a user to make their own classes more interoperable with SparkSQL;
* e.g., by creating a [[UserDefinedType]] for a class X, it becomes possible to create
* a SchemaRDD which has class X in the schema.
*
* For SparkSQL to recognize UDTs, the UDT must be registered in
* [[org.apache.spark.sql.catalyst.UDTRegistry]]. This registration can be done either
* explicitly by calling [[org.apache.spark.sql.catalyst.UDTRegistry.registerType()]]
* before using the UDT with SparkSQL, or implicitly by annotating the UDT with
* [[org.apache.spark.sql.catalyst.annotation.SQLUserDefinedType]].
*
* The conversion via `serialize` occurs when instantiating a `SchemaRDD` from another RDD.
* The conversion via `deserialize` occurs when reading from a `SchemaRDD`.
*/
@DeveloperApi
abstract class UserDefinedType[UserType] extends DataType with Serializable {
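For reference, here is a minimal sketch of the annotation-based path described in the new doc comment. `Point` and `PointUDT` are hypothetical, and the member signatures (`sqlType`, `serialize`, `deserialize`) are assumed rather than taken from this commit; they only illustrate the serialize/deserialize conversions the doc comment mentions.

```scala
// Hypothetical sketch: `Point` and `PointUDT` are invented, and the exact abstract
// members of UserDefinedType are assumed, not taken from this commit.
import org.apache.spark.sql.catalyst.annotation.SQLUserDefinedType
import org.apache.spark.sql.catalyst.types._

@SQLUserDefinedType(udt = classOf[PointUDT])
case class Point(x: Double, y: Double)

class PointUDT extends UserDefinedType[Point] {
  // Underlying SQL storage type used inside a SchemaRDD.
  override val sqlType: DataType = ArrayType(DoubleType, containsNull = false)

  // Called when a SchemaRDD is instantiated from an RDD[Point].
  override def serialize(obj: Any): Any = obj match {
    case Point(x, y) => Seq(x, y)
  }

  // Called when Point values are read back out of a SchemaRDD.
  override def deserialize(datum: Any): Point = datum match {
    case Seq(x: Double, y: Double) => Point(x, y)
  }
}
```

With the annotation in place, a class like `Point` can appear in a SchemaRDD schema without an explicit registration call, which is what makes the removed `UDTRegistry` paths in this commit unnecessary.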
@@ -17,6 +17,7 @@

package org.apache.spark.sql.api.java;

import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -27,7 +28,7 @@
* To get/create specific data type, users should use singleton objects and factory methods
* provided by this class.
*/
public abstract class DataType {
public abstract class DataType implements Serializable {

/**
* Gets the StringType object.
@@ -17,6 +17,8 @@

package org.apache.spark.sql.api.java;

import java.io.Serializable;

import org.apache.spark.annotation.DeveloperApi;

/**
@@ -27,7 +29,7 @@
* TODO: Do we need to provide DataType#createUserDefinedType methods?
*/
@DeveloperApi
public abstract class UserDefinedType<UserType> extends DataType {
public abstract class UserDefinedType<UserType> extends DataType implements Serializable {

protected UserDefinedType() { // TODO?
}
14 changes: 1 addition & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{UDTRegistry, ScalaReflection}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl.ExpressionConversions
import org.apache.spark.sql.catalyst.expressions._
@@ -468,15 +468,3 @@ class SQLContext(@transient val sparkContext: SparkContext)
new SchemaRDD(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
}
}

object SQLContext {

/**
* Registers a User-Defined Type (UDT) so that schemas can include this type.
* UDTs can override built-in types.
*/
def registerUDT(udt: UserDefinedType[_]): Unit = {
UDTRegistry.registerType(udt)
}

}
@@ -23,13 +23,13 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.catalyst.UDTRegistry
import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.types.util.DataTypeConversions
import org.apache.spark.sql.{SQLContext, StructType => SStructType}
import org.apache.spark.sql.{StructType => SStructType, SQLContext}
import org.apache.spark.sql.catalyst.annotation.SQLUserDefinedType
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.types.util.DataTypeConversions
import org.apache.spark.sql.types.util.DataTypeConversions.asScalaDataType
import org.apache.spark.util.Utils

@@ -89,7 +89,7 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
* Applies a schema to an RDD of Java Beans.
*/
def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
val schema = getSchema(beanClass)
val attributeSeq = getSchema(beanClass)
val className = beanClass.getName
val rowRdd = rdd.rdd.mapPartitions { iter =>
// BeanInfo is not serializable so we must rediscover it remotely for each partition.
@@ -100,11 +100,13 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {

iter.map { row =>
new GenericRow(
extractors.map(e => DataTypeConversions.convertJavaToCatalyst(e.invoke(row))).toArray[Any]
extractors.zip(attributeSeq).map { case (e, attr) =>
DataTypeConversions.convertJavaToCatalyst(e.invoke(row), attr.dataType)
}.toArray[Any]
): ScalaRow
}
}
new JavaSchemaRDD(sqlContext, LogicalRDD(schema, rowRdd)(sqlContext))
new JavaSchemaRDD(sqlContext, LogicalRDD(attributeSeq, rowRdd)(sqlContext))
}

/**
@@ -199,6 +201,8 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
fields.map { property =>
val (dataType, nullable) = property.getPropertyType match {
case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
(c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
case c: Class[_] if c == classOf[java.lang.String] =>
(org.apache.spark.sql.StringType, true)
case c: Class[_] if c == java.lang.Short.TYPE =>
@@ -241,13 +245,3 @@ class JavaSQLContext(val sqlContext: SQLContext) extends UDFRegistration {
}
}
}

object JavaSQLContext {
/**
* Registers a User-Defined Type (UDT) so that schemas can include this type.
* UDTs can override built-in types.
*/
def registerUDT(udt: UserDefinedType[_]): Unit = {
UDTRegistry.registerType(UDTWrappers.wrapAsScala(udt))
}
}
@@ -101,7 +101,7 @@ class Row(private[spark] val row: ScalaRow) extends Serializable {
override def equals(other: Any): Boolean = other match {
case that: Row =>
(that canEqual this) &&
row == that.row
row.equals(that.row)
case _ => false
}

@@ -17,14 +17,15 @@

package org.apache.spark.sql.api.java

import org.apache.spark.sql.{DataType => ScalaDataType, UserDefinedType => ScalaUserDefinedType}
import org.apache.spark.sql.catalyst.types.{UserDefinedType => ScalaUserDefinedType}
import org.apache.spark.sql.{DataType => ScalaDataType}
import org.apache.spark.sql.types.util.DataTypeConversions

/**
* Scala wrapper for a Java UserDefinedType
*/
private[sql] class JavaToScalaUDTWrapper[UserType](val javaUDT: UserDefinedType[UserType])
extends ScalaUserDefinedType[UserType] {
extends ScalaUserDefinedType[UserType] with Serializable {

/** Underlying storage type for this UDT */
val sqlType: ScalaDataType = DataTypeConversions.asScalaDataType(javaUDT.sqlType())
@@ -42,7 +43,7 @@ private[sql] class JavaToScalaUDTWrapper[UserType](val javaUDT: UserDefinedType[
* Java wrapper for a Scala UserDefinedType
*/
private[sql] class ScalaToJavaUDTWrapper[UserType](val scalaUDT: ScalaUserDefinedType[UserType])
extends UserDefinedType[UserType] {
extends UserDefinedType[UserType] with Serializable {

/** Underlying storage type for this UDT */
val sqlType: DataType = DataTypeConversions.asJavaDataType(scalaUDT.sqlType)
@@ -58,13 +59,17 @@ private[sql] class ScalaToJavaUDTWrapper[UserType](val scalaUDT: ScalaUserDefine

private[sql] object UDTWrappers {

def wrapAsScala(udtType: UserDefinedType[_]): JavaToScalaUDTWrapper[_] = {
// TODO: Check if we can unwrap instead of wrapping.
new JavaToScalaUDTWrapper(udtType)
def wrapAsScala(udtType: UserDefinedType[_]): ScalaUserDefinedType[_] = {
udtType match {
case t: ScalaToJavaUDTWrapper[_] => t.scalaUDT
case _ => new JavaToScalaUDTWrapper(udtType)
}
}

def wrapAsJava(udtType: ScalaUserDefinedType[_]): ScalaToJavaUDTWrapper[_] = {
// TODO: Check if we can unwrap instead of wrapping.
new ScalaToJavaUDTWrapper(udtType)
def wrapAsJava(udtType: ScalaUserDefinedType[_]): UserDefinedType[_] = {
udtType match {
case t: JavaToScalaUDTWrapper[_] => t.javaUDT
case _ => new ScalaToJavaUDTWrapper(udtType)
}
}
}
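The updated `wrapAsScala` / `wrapAsJava` bodies unwrap an existing wrapper instead of wrapping it a second time. A hypothetical round trip, with `javaUdt` standing in for any Java-side UDT instance, sketches the effect:

```scala
// Hypothetical illustration; `javaUdt` is some org.apache.spark.sql.api.java.UserDefinedType.
val scalaView = UDTWrappers.wrapAsScala(javaUdt)   // wraps the Java UDT once
val backAgain = UDTWrappers.wrapAsJava(scalaView)  // unwraps: returns `javaUdt` itself, not a nested wrapper
```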
24 changes: 0 additions & 24 deletions sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -414,28 +414,4 @@ package object sql {
*/
@DeveloperApi
val StructField = catalyst.types.StructField

/**
* :: DeveloperApi ::
*
* The data type for User Defined Types (UDTs).
*
* This interface allows a user to make their own classes more interoperable with SparkSQL;
* e.g., by creating a [[UserDefinedType]] for a class X, it becomes possible to create
* a SchemaRDD which has class X in the schema.
*
* For SparkSQL to recognize UDTs, the UDT must be registered in
* [[org.apache.spark.sql.catalyst.UDTRegistry]]. This registration can be done either
* explicitly by calling [[org.apache.spark.sql.catalyst.UDTRegistry.registerType()]]
* before using the UDT with SparkSQL, or implicitly by annotating the UDT with
* [[org.apache.spark.sql.catalyst.annotation.SQLUserDefinedType]].
*
* The conversion via `serialize` occurs when instantiating a `SchemaRDD` from another RDD.
* The conversion via `deserialize` occurs when reading from a `SchemaRDD`.
*/
@DeveloperApi
type UserDefinedType[UserType] = catalyst.types.UserDefinedType[UserType]

@DeveloperApi
type SQLUserDefinedType = catalyst.annotation.SQLUserDefinedType
}
@@ -17,10 +17,14 @@

package org.apache.spark.sql.types.util

import scala.collection.JavaConverters._

import org.apache.spark.sql._
import org.apache.spark.sql.api.java.{DataType => JDataType, StructField => JStructField, UDTWrappers, JavaToScalaUDTWrapper}
import org.apache.spark.sql.api.java.{DataType => JDataType, StructField => JStructField,
UDTWrappers}
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.types.UserDefinedType

import scala.collection.JavaConverters._

protected[sql] object DataTypeConversions {

@@ -116,9 +120,10 @@
}

/** Converts Java objects to catalyst rows / types */
def convertJavaToCatalyst(a: Any): Any = a match {
case d: java.math.BigDecimal => BigDecimal(d)
case other => other
def convertJavaToCatalyst(a: Any, dataType: DataType): Any = (a, dataType) match {
case (obj, udt: UserDefinedType[_]) => ScalaReflection.convertToCatalyst(obj, udt) // Scala type
case (d: java.math.BigDecimal, _) => BigDecimal(d)
case (other, _) => other
}
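As a hedged sketch of what the new `dataType` argument buys: when the target type is a UDT, the Java value is routed through the UDT's serialization path rather than passed through unchanged. Reusing the hypothetical `Point` / `PointUDT` from the earlier sketch:

```scala
// Hypothetical; assumes ScalaReflection.convertToCatalyst defers to the UDT's serialize().
val catalystValue =
  DataTypeConversions.convertJavaToCatalyst(Point(1.0, 2.0), new PointUDT)
// expected result: the UDT's sqlType representation, e.g. Seq(1.0, 2.0)
```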

/** Converts Java objects to catalyst rows / types */
