6 changes: 3 additions & 3 deletions docs/sql-programming-guide.md
@@ -304,9 +304,9 @@ registered as a table. Tables can be used in subsequent SQL statements.

Spark SQL supports automatically converting an RDD of
[JavaBeans](http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly) into a DataFrame.
The `BeanInfo`, obtained using reflection, defines the schema of the table. Currently, Spark SQL
does not support JavaBeans that contain `Map` field(s). Nested JavaBeans and `List` or `Array`
fields are supported though. You can create a JavaBean by creating a class that implements
The `BeanInfo`, obtained using reflection, defines the schema of the table. Spark SQL supports
fields of type `List`, `Array` and `Map`, as well as nested JavaBeans; JavaBeans can also be used as collection elements.
You can create a JavaBean by creating a class that implements
Serializable and has getters and setters for all of its fields.

{% include_example schema_inferring java/org/apache/spark/examples/sql/JavaSparkSQLExample.java %}
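As a rough illustration of what the updated paragraph describes (the class and field names below are invented for this sketch and are not part of the bundled Spark examples), a bean like the following would now have all of its fields converted, including the nested beans inside the array, List and Map fields:

import java.io.Serializable;
import java.util.List;
import java.util.Map;

public class Person implements Serializable {
  public static class Address implements Serializable {
    private String city;
    public String getCity() { return city; }
    public void setCity(String city) { this.city = city; }
  }

  private String name;
  private Address home;                   // nested JavaBean
  private Address[] previousHomes;        // array of nested JavaBeans
  private List<Address> visited;          // List of nested JavaBeans
  private Map<String, Address> contacts;  // Map with nested JavaBean values

  public String getName() { return name; }
  public void setName(String name) { this.name = name; }
  public Address getHome() { return home; }
  public void setHome(Address home) { this.home = home; }
  public Address[] getPreviousHomes() { return previousHomes; }
  public void setPreviousHomes(Address[] previousHomes) { this.previousHomes = previousHomes; }
  public List<Address> getVisited() { return visited; }
  public void setVisited(List<Address> visited) { this.visited = visited; }
  public Map<String, Address> getContacts() { return contacts; }
  public void setContacts(Map<String, Address> contacts) { this.contacts = contacts; }
}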
48 changes: 44 additions & 4 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -17,8 +17,10 @@

package org.apache.spark.sql

import java.lang.reflect.{Array => JavaArray, ParameterizedType, Type}
import java.util.Properties

import scala.collection.JavaConverters._
import scala.collection.immutable
import scala.reflect.runtime.universe.TypeTag

@@ -30,6 +32,7 @@ import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData}
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.internal.{SessionState, SharedState, SQLConf}
import org.apache.spark.sql.sources.BaseRelation
@@ -1098,12 +1101,20 @@ object SQLContext {
data: Iterator[_],
beanClass: Class[_],
attrs: Seq[AttributeReference]): Iterator[InternalRow] = {
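// Extracts the actual type arguments of `interface` from the declared generic type `t`
// (e.g. the element type of a java.util.List); any other declared type is unsupported.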
def interfaceParameters(t: Type, interface: Class[_], dataType: DataType): Array[Type] =
t match {
case parType: ParameterizedType if parType.getRawType == interface =>
parType.getActualTypeArguments
case _ => throw new UnsupportedOperationException(
s"Type ${t.getTypeName} is not supported for data type ${dataType.simpleString}. " +
s"Expected ${interface.getName}")
}
def createStructConverter(cls: Class[_], fieldTypes: Seq[DataType]): Any => InternalRow = {
val methodConverters =
JavaTypeInference.getJavaBeanReadableProperties(cls).zip(fieldTypes)
.map { case (property, fieldType) =>
val method = property.getReadMethod
method -> createConverter(method.getReturnType, fieldType)
method -> createConverter(method.getGenericReturnType, fieldType)
}
value =>
if (value == null) {
@@ -1115,9 +1126,38 @@
})
}
}
def createConverter(cls: Class[_], dataType: DataType): Any => Any = dataType match {
case struct: StructType => createStructConverter(cls, struct.map(_.dataType))
case _ => CatalystTypeConverters.createToCatalystConverter(dataType)
def createConverter(t: Type, dataType: DataType): Any => Any = (t, dataType) match {
Member: BTW, how about we put this method in CatalystTypeConverters? It looks like a Catalyst converter for beans, and a few Java types such as java.lang.Iterable, java.math.BigDecimal and java.math.BigInteger are already handled there.

Member: I'm okay with moving this to CatalystTypeConverters, but note that unfortunately CatalystTypeConverters doesn't seem to work properly with nested beans as we are trying to support them here.

Member: Yea, I was just thinking of moving this function there; it looks ugly that this file is getting this long.

Contributor Author: I took a quick look at CatalystTypeConverters and I believe there would be a problem in not being able to reliably distinguish Java beans from other arbitrary classes. We might use setters or set fields directly on objects that are not prepared for such manipulation, potentially creating hard-to-find errors. This method already assumes a Java bean, so that problem is not present here. Isn't that so?

case (cls: Class[_], struct: StructType) =>
Member: Wait, can we reuse JavaTypeInference.serializerFor and make a projection, rather than reimplementing the whole logic here?

Member:
// TODO: we should only collect properties that have getter and setter. However, some tests
// pass in scala case class as java bean class which doesn't have getter and setter.
We should drop support for getter-only or setter-only properties. Adding @cloud-fan here as well.

Member: Reusing JavaTypeInference.serializerFor would be great, but currently it behaves a little differently. At least it doesn't support java.lang.Iterable[_], so we can't use it immediately. We need to extend it to support Iterable (and also deserializerFor).

HyukjinKwon (Member), Oct 17, 2018: Hm, how about we fix them together while we are here? I also checked another difference, which is beans without a getter and/or setter, but I think this is something we should fix in 3.0.

Contributor Author: Frankly, I was not really sure about serializing sets as arrays, as the result stops behaving like a set, but I found a PR (#18416) where this seems to have been permitted, so I will go ahead and add that.

// bean type
createStructConverter(cls, struct.map(_.dataType))
case (arrayType: Class[_], array: ArrayType) if arrayType.isArray =>
// array type
val converter = createConverter(arrayType.getComponentType, array.elementType)
value => new GenericArrayData(
(0 until JavaArray.getLength(value)).map(i =>
converter(JavaArray.get(value, i))).toArray)
case (_, array: ArrayType) =>
viirya (Member), Oct 6, 2018: Can you add a few code comments explaining why there are two cases for ArrayType?

Contributor Author: Sorry, I should have added a check for cls.isArray in the array case. That would make it clearer. I will also add a comment to each case with the actual type expected for that conversion.

// java.util.List type
val cls = classOf[java.util.List[_]]
ueshin (Member), Oct 12, 2018: Seems like JavaTypeInference.inferDataType() supports java.lang.Iterable, not only List, but the serializer/deserializer don't. I'm not sure whether we should change inferDataType(). This issue would be in a separate PR anyway, though.

Contributor Author: I think you are right. It would be better to change it to avoid confusion. I also agree with a separate PR for that.

Member: On second thought, we should use java.lang.Iterable here. We can convert Iterable to ArrayType, as ArrayConverter tries to do. If we use java.util.List here, it leads to behavior changes for lists of primitives.

val params = interfaceParameters(t, cls, dataType)
val converter = createConverter(params(0), array.elementType)
value => new GenericArrayData(
value.asInstanceOf[java.util.List[_]].asScala.map(converter).toArray)
case (_, map: MapType) =>
// java.util.Map type
val cls = classOf[java.util.Map[_, _]]
val params = interfaceParameters(t, cls, dataType)
val keyConverter = createConverter(params(0), map.keyType)
val valueConverter = createConverter(params(1), map.valueType)
value => {
val (keys, values) = value.asInstanceOf[java.util.Map[_, _]].asScala.unzip[Any, Any]
new ArrayBasedMapData(
new GenericArrayData(keys.map(keyConverter).toArray),
new GenericArrayData(values.map(valueConverter).toArray))
}
case _ =>
// other types
CatalystTypeConverters.createToCatalystConverter(dataType)
}
val dataConverter = createStructConverter(beanClass, attrs.map(_.dataType))
data.map(dataConverter)
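For context, a minimal end-to-end sketch of how this bean-to-row conversion gets exercised from the public API; it assumes the illustrative Person bean sketched in the documentation section above and a local SparkSession, and none of these names come from the patch itself:

import java.util.Arrays;
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class NestedBeanDemo {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("nested-bean-demo")
        .master("local[*]")
        .getOrCreate();

    Person.Address address = new Person.Address();
    address.setCity("Berlin");

    Person person = new Person();
    person.setName("Alice");
    person.setHome(address);
    person.setPreviousHomes(new Person.Address[] { address });
    person.setVisited(Arrays.asList(address));
    person.setContacts(Collections.singletonMap("work", address));

    // Bean-to-row conversion happens here; with this change the nested Address
    // values inside the array, List and Map fields are converted as well.
    Dataset<Row> df = spark.createDataFrame(Collections.singletonList(person), Person.class);
    df.printSchema();
    df.show(false);

    spark.stop();
  }
}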
@@ -136,6 +136,9 @@ public static class Bean implements Serializable {
private BigInteger e = new BigInteger("1234567");
private NestedBean f = new NestedBean();
private NestedBean g = null;
private NestedBean[] h = new NestedBean[] { new NestedBean() };
private List<NestedBean> i = Collections.singletonList(new NestedBean());
private Map<Integer, NestedBean> j = Collections.singletonMap(1, new NestedBean());

public double getA() {
return a;
@@ -163,6 +166,18 @@ public NestedBean getG() {
return g;
}

public NestedBean[] getH() {
return h;
}

public List<NestedBean> getI() {
return i;
}

public Map<Integer, NestedBean> getJ() {
return j;
}

public static class NestedBean implements Serializable {
private int a = 1;

@@ -196,7 +211,18 @@ void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {
schema.apply("f"));
Assert.assertEquals(new StructField("g", nestedBeanType, true, Metadata.empty()),
schema.apply("g"));
Row first = df.select("a", "b", "c", "d", "e", "f", "g").first();
ArrayType nestedBeanTypeList = new ArrayType(nestedBeanType, true);
Assert.assertEquals(
new StructField("h", nestedBeanTypeList, true, Metadata.empty()),
schema.apply("h"));
Assert.assertEquals(
new StructField("i", nestedBeanTypeList, true, Metadata.empty()),
schema.apply("i"));
Assert.assertEquals(
new StructField("j", new MapType(IntegerType$.MODULE$, nestedBeanType, true),
true, Metadata.empty()),
schema.apply("j"));
Row first = df.select("a", "b", "c", "d", "e", "f", "g", "h", "i", "j").first();
Assert.assertEquals(bean.getA(), first.getDouble(0), 0.0);
// Now Java lists and maps are converted to Scala Seq's and Map's. Once we get a Seq below,
// verify that it has the expected length, and contains expected elements.
@@ -220,6 +246,21 @@ void validateDataFrameWithBeans(Bean bean, Dataset<Row> df) {
Row nested = first.getStruct(5);
Assert.assertEquals(bean.getF().getA(), nested.getInt(0));
Assert.assertTrue(first.isNullAt(6));
List<Row> nestedList = first.getList(7);
Assert.assertEquals(bean.getH().length, nestedList.size());
for (int i = 0; i < bean.getH().length; ++i) {
Assert.assertEquals(bean.getH()[i].getA(), nestedList.get(i).getInt(0));
}
nestedList = first.getList(8);
Assert.assertEquals(bean.getI().size(), nestedList.size());
for (int i = 0; i < bean.getI().size(); ++i) {
Assert.assertEquals(bean.getI().get(i).getA(), nestedList.get(i).getInt(0));
}
Map<Integer, Row> nestedMap = first.getJavaMap(9);
Assert.assertEquals(bean.getJ().size(), nestedMap.size());
for (Map.Entry<Integer, Bean.NestedBean> entry : bean.getJ().entrySet()) {
Assert.assertEquals(entry.getValue().getA(), nestedMap.get(entry.getKey()).getInt(0));
}
}

@Test