Support nested JavaBeans in SparkValueWriter
ScalaValueHandler now intercepts JavaValueHandler's recursion calls and attempts to
handle each value before handing it back to the JavaValueHandler. Nested beans now
work, and serialization errors are reported correctly.

fixes #1021, relates #1019
jbaiera committed Jul 24, 2017
1 parent 56582fd commit 5f7f7b1
Showing 5 changed files with 138 additions and 14 deletions.
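
The core of the change is visible in the ScalaValueWriter hunks below: JdkValueWriter recurses into nested values through a protected doWrite hook, and ScalaValueWriter now overrides that hook, so values buried inside maps, collections, and beans are routed back through the Scala-aware handling before being handed to the JDK writer. The following is a minimal, self-contained sketch of that interception pattern; JdkLikeWriter, BeanAwareWriter, and Person are simplified, hypothetical stand-ins, not the real es-hadoop classes.

object InterceptionSketch {

  // A tiny JavaBean-like class with a nested field.
  class Person(val name: String, val friend: Person)

  // Base writer: understands a few plain JDK types and recurses through a
  // protected hook, much like JdkValueWriter's doWrite(value, generator, parentField).
  class JdkLikeWriter {
    def write(value: Any): String = {
      val sb = new StringBuilder
      if (!hook(value, sb)) sys.error(s"cannot serialize [$value]")
      sb.toString
    }

    // Every nested value passes through this method, so a subclass can intercept it.
    protected def hook(value: Any, sb: StringBuilder): Boolean = value match {
      case null      => sb.append("null"); true
      case s: String => sb.append('"').append(s).append('"'); true
      case m: java.util.Map[_, _] =>
        val entries = m.asInstanceOf[java.util.Map[AnyRef, AnyRef]].entrySet().iterator()
        sb.append('{')
        while (entries.hasNext) {
          val e = entries.next()
          sb.append('"').append(e.getKey).append("\":")
          if (!hook(e.getValue, sb)) return false // recursion stays virtual
          if (entries.hasNext) sb.append(',')
        }
        sb.append('}')
        true
      case _ => false // unknown type: report failure instead of guessing
    }
  }

  // Scala-side writer: converts the "bean" to a map, then hands it back to the base
  // logic. Because the base recursion calls the overridable hook, nested Person values
  // re-enter this override as well -- the behaviour this commit restores for beans.
  class BeanAwareWriter extends JdkLikeWriter {
    override protected def hook(value: Any, sb: StringBuilder): Boolean = value match {
      case p: Person =>
        val asMap = new java.util.LinkedHashMap[String, Any]()
        asMap.put("name", p.name)
        asMap.put("friend", p.friend)
        hook(asMap, sb)
      case other => super.hook(other, sb)
    }
  }

  def main(args: Array[String]): Unit = {
    val nested = new Person("Benny", new Person("The Jets", null))
    // Prints: {"name":"Benny","friend":{"name":"The Jets","friend":null}}
    println(new BeanAwareWriter().write(nested))
  }
}
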
@@ -40,6 +40,10 @@ public boolean isSuccesful() {
return SUCCESFUL == this;
}

public Object getUnknownValue() {
return unknownValue;
}

public static Result SUCCESFUL() {
return SUCCESFUL;
}
@@ -37,13 +37,18 @@ class ScalaValueWriter(writeUnknownTypes: Boolean = false) extends JdkValueWrite
doWrite(value, generator, true)
}


override protected def doWrite(value: Any, generator: Generator, parentField: String): Result = {
doWrite(value, generator, true)
}

private def doWrite(value: Any, generator: Generator, acceptsJavaBeans: Boolean): Result = {
value match {
case null | None | () => generator.writeNull()
case Nil =>
generator.writeBeginArray(); generator.writeEndArray()

case Some(s: AnyRef) => return doWrite(s, generator, false)
case Some(s: AnyRef) => return doWrite(s, generator, true)

case m: Map[_, _] => {
generator.writeBeginObject()
@@ -62,7 +67,7 @@ class ScalaValueWriter(writeUnknownTypes: Boolean = false) extends JdkValueWrite
case i: Traversable[_] => {
generator.writeBeginArray()
for (v <- i) {
val result = doWrite(v, generator, false)
val result = doWrite(v, generator, true)
if (!result.isSuccesful()) {
return result
}
@@ -77,7 +82,7 @@ class ScalaValueWriter(writeUnknownTypes: Boolean = false) extends JdkValueWrite
case i: Array[_] => {
generator.writeBeginArray()
for (v <- i) {
val result = doWrite(v, generator, false)
val result = doWrite(v, generator, true)
if (!result.isSuccesful()) {
return result
}
@@ -88,15 +93,15 @@ class ScalaValueWriter(writeUnknownTypes: Boolean = false) extends JdkValueWrite
case p: Product => {
// handle case class
if (RU.isCaseClass(p)) {
val result = doWrite(RU.caseClassValues(p), generator, false)
val result = doWrite(RU.caseClassValues(p), generator, true)
if (!result.isSuccesful()) {
return result
}
} // normal product - treat it as a list/array
else {
generator.writeBeginArray()
for (t <- p.productIterator) {
val result = doWrite(t.asInstanceOf[AnyRef], generator, false)
val result = doWrite(t.asInstanceOf[AnyRef], generator, true)
if (!result.isSuccesful()) {
return result
}
@@ -111,13 +116,23 @@ class ScalaValueWriter(writeUnknownTypes: Boolean = false) extends JdkValueWrite
throw new EsHadoopIllegalArgumentException("Spark SQL types are not handled through basic RDD saveToEs() calls; typically this is a mistake(as the SQL schema will be ignored). Use 'org.elasticsearch.spark.sql' package instead")
}

// normal JDK types failed, try the JavaBean last
val result = super.write(value, generator)
if (!result.isSuccesful()) {
val result = super.doWrite(value, generator, generator.getParentPath)

// Normal JDK types failed, try the JavaBean last. The JavaBean logic accepts just about
// anything, even if it's not a real java bean. Check to see if the value that failed
// is the same value we're about to treat as a bean. If the failed value is not the current
// value, then the last call probably failed on a subfield of the current value that
// couldn't be serialized; There's a chance that we could treat a container object (map,
// list) like a java bean, which is improper. In these cases we should skip the javabean
// handling and just return the result
if (!result.isSuccesful() && result.getUnknownValue == value) {
if (acceptsJavaBeans && RU.isJavaBean(value)) {
return doWrite(RU.javaBeanAsMap(value), generator, false)
} else
return doWrite(RU.javaBeanAsMap(value), generator, true)
} else {
return result
}
} else {
return result
}
}
}
@@ -19,26 +19,42 @@
package org.elasticsearch.spark.serialization

import org.elasticsearch.hadoop.serialization.json.JacksonJsonGenerator
import org.junit.Assert._
import org.junit.Test
import org.junit.Assert._
import org.hamcrest.Matchers._
import java.io.ByteArrayOutputStream

import org.elasticsearch.hadoop.cfg.ConfigurationOptions
import org.elasticsearch.hadoop.cfg.Settings
import org.elasticsearch.hadoop.serialization.EsHadoopSerializationException
import org.elasticsearch.hadoop.util.TestSettings
import org.elasticsearch.spark.serialization.testbeans.Contact
import org.elasticsearch.spark.serialization.testbeans.ContactBook

class ScalaValueWriterTest {

private def serialize(value: AnyRef): String = {
private def serialize(value: AnyRef): String = serialize(value, null)

private def serialize(value: AnyRef, settings: Settings): String = {
val out = new ByteArrayOutputStream()
val generator = new JacksonJsonGenerator(out)

val writer = new ScalaValueWriter()
writer.write(value, generator)
if (settings != null) {
writer.setSettings(settings)
}
val result = writer.write(value, generator)
if (result.isSuccesful == false) {
throw new EsHadoopSerializationException("Could not serialize [" + result.getUnknownValue + "]")
}
generator.flush()

new String(out.toByteArray)
}

case class SimpleCaseClass(s: String)
class Garbage(i: Int) {
def doNothing(): Unit = ()
}

@Test
def testSimpleMap() {
@@ -75,4 +91,19 @@ class ScalaValueWriterTest {
assertEquals("""{"p":{"s":"bar"}}""", serialize(Map("p" -> SimpleCaseClass("bar"))))
}

@Test
def testNestedJavaBean(): Unit = {
val contacts = new java.util.LinkedHashMap[String, Contact]()
contacts.put("Benny", new Contact("Benny", "Some guy"))
contacts.put("The Jets", new Contact("The Jets", "Benny's associates"))
assertEquals("""{"contacts":{"Benny":{"name":"Benny","relation":"Some guy"},"The Jets":{"name":"The Jets","relation":"Benny's associates"}},"owner":"me"}""", serialize(new ContactBook("me", contacts)))
}

@Test(expected = classOf[EsHadoopSerializationException])
def testMapWithInvalidObject(): Unit = {
val map = new java.util.HashMap[String, Object]()
map.put("test", new Garbage(42))
serialize(map)
}

}
@@ -0,0 +1,36 @@
package org.elasticsearch.spark.serialization.testbeans;

import java.io.Serializable;

/**
* Bean object with contact info for tests.
*/
public class Contact implements Serializable {

private String name;
private String relation;

public Contact() {
}

public Contact(String name, String relation) {
this.name = name;
this.relation = relation;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public String getRelation() {
return relation;
}

public void setRelation(String relation) {
this.relation = relation;
}
}
@@ -0,0 +1,38 @@
package org.elasticsearch.spark.serialization.testbeans;

import java.io.Serializable;
import java.util.Map;

/**
* A collection of contacts
*/
public class ContactBook implements Serializable {

private String owner;
private Map<String, Contact> contacts;

public ContactBook() {

}

public ContactBook(String owner, Map<String, Contact> contacts) {
this.owner = owner;
this.contacts = contacts;
}

public String getOwner() {
return owner;
}

public void setOwner(String owner) {
this.owner = owner;
}

public Map<String, Contact> getContacts() {
return contacts;
}

public void setContacts(Map<String, Contact> contacts) {
this.contacts = contacts;
}
}
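
For context, the failure fixed here (#1021) showed up when saving an RDD of beans like the ones above. Below is a rough usage sketch, assuming a local Elasticsearch node on localhost:9200, the standard org.elasticsearch.spark._ implicits, and that the Contact and ContactBook test beans above are on the classpath; the resource name "spark/contacts" is only an example.

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark._ // adds saveToEs to RDDs
import org.elasticsearch.spark.serialization.testbeans.{Contact, ContactBook}

object NestedBeanSaveExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("nested-bean-example")
      .setMaster("local[2]")             // local mode, just for a quick test
      .set("es.nodes", "localhost:9200") // assumed local Elasticsearch node
    val sc = new SparkContext(conf)

    val contacts = new java.util.LinkedHashMap[String, Contact]()
    contacts.put("Benny", new Contact("Benny", "Some guy"))
    contacts.put("The Jets", new Contact("The Jets", "Benny's associates"))

    // Before this commit the nested Contact values failed to serialize;
    // with the fix the book is written as a nested JSON document.
    sc.makeRDD(Seq(new ContactBook("me", contacts))).saveToEs("spark/contacts")

    sc.stop()
  }
}
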
