25 commits
d6d19e9
pr example
dwmclary Jul 22, 2014
5d34e37
merge resolved
dwmclary Oct 10, 2014
f7d166a
added toJSON method
dwmclary Nov 11, 2014
626a5b1
added toJSON to SchemaRDD
dwmclary Nov 11, 2014
424f130
tests pass, ready for pull and PR
dwmclary Nov 11, 2014
319e3ba
updated to upstream master with merged SPARK-4228
dwmclary Nov 11, 2014
1d171aa
upated missing brace on if statement
dwmclary Nov 11, 2014
aaeba58
added toJSON to pyspark SchemaRDD
dwmclary Nov 12, 2014
6c94a54
added toJSON to pyspark SchemaRDD
dwmclary Nov 12, 2014
5e5eb1b
switched to Jackson for JSON processing
dwmclary Nov 15, 2014
149dafd
wrapped scala toJSON in sql.py
dwmclary Nov 15, 2014
4d11c0c
JsonFactory rewrite of toJSON for SchemaRDD
dwmclary Nov 15, 2014
6af72d1
deleted extaneous comment
dwmclary Nov 15, 2014
11d2016
formatting and unicode deserialization default fixed
dwmclary Nov 16, 2014
1b11980
Map and UserDefinedTypes partially done
dwmclary Nov 16, 2014
8f7bfb6
Map type added to SchemaRDD.toJSON
dwmclary Nov 16, 2014
4387dd5
Added UserDefinedType, cleaned up case formatting
dwmclary Nov 17, 2014
2ee1e70
moved rowToJSON to JsonRDD
dwmclary Nov 17, 2014
47ceff6
cleaned up scala style issues
dwmclary Nov 18, 2014
4a651f0
cleaned PEP and Scala style failures. Moved tests to JsonSuite
dwmclary Nov 18, 2014
1a5fd30
removing SPARK-4228 from SQLQuerySuite
dwmclary Nov 18, 2014
6598cee
adding complex type queries
dwmclary Nov 18, 2014
f9471d3
added pyspark doc and doctest
dwmclary Nov 19, 2014
cac2879
move pyspark comment and doctest to correct location
dwmclary Nov 20, 2014
d714e1d
fixed PEP 8 error
dwmclary Nov 20, 2014
17 changes: 16 additions & 1 deletion python/pyspark/sql.py
@@ -45,7 +45,7 @@

from pyspark.rdd import RDD
from pyspark.serializers import BatchedSerializer, AutoBatchedSerializer, PickleSerializer, \
CloudPickleSerializer
CloudPickleSerializer, UTF8Deserializer
from pyspark.storagelevel import StorageLevel
from pyspark.traceback_utils import SCCallSiteSync

@@ -1870,6 +1870,21 @@ def limit(self, num):
rdd = self._jschema_rdd.baseSchemaRDD().limit(num).toJavaSchemaRDD()
return SchemaRDD(rdd, self.sql_ctx)

def toJSON(self, use_unicode=False):
"""Convert a SchemaRDD into a MappedRDD of JSON documents; one document per row.

>>> srdd1 = sqlCtx.jsonRDD(json)
>>> sqlCtx.registerRDDAsTable(srdd1, "table1")
>>> srdd2 = sqlCtx.sql( "SELECT * from table1")
>>> srdd2.toJSON().take(1)[0] == '{"field1":1,"field2":"row1","field3":{"field4":11}}'
True
>>> srdd3 = sqlCtx.sql( "SELECT field3.field4 from table1")
>>> srdd3.toJSON().collect() == ['{"field4":11}', '{"field4":22}', '{"field4":33}']
True
"""
rdd = self._jschema_rdd.baseSchemaRDD().toJSON()
Contributor (Davies Liu) commented:
Could you put some simple tests (also will be examples in docs) here?

Author (dwmclary) replied:
Sure thing.

return RDD(rdd.toJavaRDD(), self._sc, UTF8Deserializer(use_unicode))

def saveAsParquetFile(self, path):
"""Save the contents as a Parquet file, preserving the schema.

24 changes: 21 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -17,24 +17,28 @@

package org.apache.spark.sql

import java.util.{List => JList}

import org.apache.spark.api.python.SerDeUtil
import java.util.{Map => JMap, List => JList}
import java.io.StringWriter

import scala.collection.JavaConversions._

import com.fasterxml.jackson.core.JsonFactory

import net.razorvine.pickle.Pickler

import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types.UserDefinedType
Contributor commented:
Unused imports.

import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
import org.apache.spark.storage.StorageLevel

@@ -131,6 +135,20 @@ class SchemaRDD(
*/
lazy val schema: StructType = queryExecution.analyzed.schema

/** Returns a new RDD of JSON strings, one string per row
*
* @group schema
*/
def toJSON: RDD[String] = {
val rowSchema = this.schema
this.mapPartitions { iter =>
val jsonFactory = new JsonFactory()
iter.map(JsonRDD.rowToJSON(rowSchema, jsonFactory))
}

Contributor commented:
Extra line.

}


// =======================================================================
// Query DSL
// =======================================================================
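For orientation, here is a minimal usage sketch of the new SchemaRDD.toJSON, including the round trip the reviewers ask the tests to cover. It is not part of the diff; the sqlContext value and the "people" table are assumed to exist and the names are illustrative only.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SchemaRDD

// Assumed setup: a SQLContext named sqlContext with a registered table "people".
val people: SchemaRDD = sqlContext.sql("SELECT name, age FROM people")

// Each Row becomes one compact JSON document, e.g. {"name":"Alice","age":30}
val json: RDD[String] = people.toJSON

// Round-trip the JSON strings back into a SchemaRDD through the existing
// jsonRDD entry point, which is the code path the review asks the tests to exercise.
val roundTripped: SchemaRDD = sqlContext.jsonRDD(json)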
@@ -125,6 +125,12 @@ class JavaSchemaRDD(

// Transformations (return a new RDD)

/**
 * Return a new RDD that is the schema transformed to JSON
 */
def toJSON(): JavaRDD[String] =
  baseSchemaRDD.toJSON.toJavaRDD

Contributor commented on the doc comment:
Returns an RDD with each row transformed to a JSON string.

/**
* Return a new RDD that is reduced into `numPartitions` partitions.
*/
60 changes: 60 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -20,12 +20,15 @@ package org.apache.spark.sql.json
import org.apache.spark.sql.catalyst.types.decimal.Decimal
import org.apache.spark.sql.types.util.DataTypeConversions

import java.io.StringWriter

import scala.collection.Map
import scala.collection.convert.Wrappers.{JMapWrapper, JListWrapper}
import scala.math.BigDecimal
import java.sql.{Date, Timestamp}

import com.fasterxml.jackson.core.JsonProcessingException
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.databind.ObjectMapper

import org.apache.spark.rdd.RDD
@@ -424,4 +427,61 @@ private[sql] object JsonRDD extends Logging {

row
}

/** Transforms a single Row to JSON using Jackson
*
* @param rowSchema the schema object used for conversion
* @param jsonFactory a JsonFactory object to construct a JsonGenerator
* @param row the row to convert
*/
private[sql] def rowToJSON(rowSchema: StructType, jsonFactory: JsonFactory)(row: Row): String = {
val writer = new StringWriter()
val gen = jsonFactory.createGenerator(writer)

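// Write one value of the given DataType; arrays, maps and structs recurse through valWriter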
def valWriter: (DataType, Any) => Unit = {
case (_, null) | (NullType, _) => gen.writeNull()
case (StringType, v: String) => gen.writeString(v)
case (TimestampType, v: java.sql.Timestamp) => gen.writeString(v.toString)
case (IntegerType, v: Int) => gen.writeNumber(v)
case (ShortType, v: Short) => gen.writeNumber(v)
case (FloatType, v: Float) => gen.writeNumber(v)
case (DoubleType, v: Double) => gen.writeNumber(v)
case (LongType, v: Long) => gen.writeNumber(v)
case (DecimalType(), v: scala.math.BigDecimal) => gen.writeNumber(v.bigDecimal)
case (DecimalType(), v: java.math.BigDecimal) => gen.writeNumber(v)
case (ByteType, v: Byte) => gen.writeNumber(v.toInt)
case (BinaryType, v: Array[Byte]) => gen.writeBinary(v)
case (BooleanType, v: Boolean) => gen.writeBoolean(v)
case (DateType, v) => gen.writeString(v.toString)
case (udt: UserDefinedType[_], v) => valWriter(udt.sqlType, v)

case (ArrayType(ty, _), v: Seq[_] ) =>
gen.writeStartArray()
v.foreach(valWriter(ty,_))
gen.writeEndArray()

case (MapType(kv,vv, _), v: Map[_,_]) =>
gen.writeStartObject
v.foreach { p =>
gen.writeFieldName(p._1.toString)
valWriter(vv,p._2)
}
gen.writeEndObject

case (StructType(ty), v: Seq[_]) =>
gen.writeStartObject()
ty.zip(v).foreach {
case (_, null) =>
case (field, v) =>
gen.writeFieldName(field.name)
valWriter(field.dataType, v)
}
gen.writeEndObject()
}

valWriter(rowSchema, row)
gen.close()
writer.toString
}

}
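To make the Jackson calls above easier to follow, here is a standalone sketch of the streaming-generator pattern that rowToJSON relies on. It is illustrative only and not part of the patch; the field names and values are made up.

import java.io.StringWriter
import com.fasterxml.jackson.core.JsonFactory

// One JsonGenerator writes one JSON document into a StringWriter.
val factory = new JsonFactory()
val writer = new StringWriter()
val gen = factory.createGenerator(writer)

gen.writeStartObject()
gen.writeFieldName("f1")
gen.writeNumber(1)
gen.writeFieldName("f2")
gen.writeString("row1")
gen.writeEndObject()
gen.close()

// Jackson emits compact JSON by default, matching the strings asserted in the tests below.
assert(writer.toString == "{\"f1\":1,\"f2\":\"row1\"}")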
122 changes: 122 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.types.decimal.Decimal
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.json.JsonRDD.{enforceCorrectType, compatibleType}
import org.apache.spark.sql.{Row, SQLConf, QueryTest}
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.test.TestSQLContext._

Expand Down Expand Up @@ -779,4 +780,125 @@ class JsonSuite extends QueryTest {
Seq(null, null, null, Seq(Seq(null, Seq(1, 2, 3)))) :: Nil
)
}

test("SPARK-4228 SchemaRDD to JSON")
{
Contributor (Yin Huai) commented:
Move it to the line above.

Also, can you make changes according to @marmbrus's comments on tests (round-trip our existing datasets with this code path)?

Author (dwmclary) replied:
I'm already working on it. Shouldn't take too much longer.

val schema1 = StructType(
StructField("f1", IntegerType, false) ::
StructField("f2", StringType, false) ::
StructField("f3", BooleanType, false) ::
StructField("f4", ArrayType(StringType), nullable = true) ::
StructField("f5", IntegerType, true) :: Nil)

val rowRDD1 = unparsedStrings.map { r =>
val values = r.split(",").map(_.trim)
val v5 = try values(3).toInt catch {
case _: NumberFormatException => null
}
Row(values(0).toInt, values(1), values(2).toBoolean, r.split(",").toList, v5)
}

val schemaRDD1 = applySchema(rowRDD1, schema1)
schemaRDD1.registerTempTable("applySchema1")
val schemaRDD2 = schemaRDD1.toSchemaRDD
val result = schemaRDD2.toJSON.collect()
assert(result(0) == "{\"f1\":1,\"f2\":\"A1\",\"f3\":true,\"f4\":[\"1\",\" A1\",\" true\",\" null\"]}")
assert(result(3) == "{\"f1\":4,\"f2\":\"D4\",\"f3\":true,\"f4\":[\"4\",\" D4\",\" true\",\" 2147483644\"],\"f5\":2147483644}")

val schema2 = StructType(
StructField("f1", StructType(
StructField("f11", IntegerType, false) ::
StructField("f12", BooleanType, false) :: Nil), false) ::
StructField("f2", MapType(StringType, IntegerType, true), false) :: Nil)

val rowRDD2 = unparsedStrings.map { r =>
val values = r.split(",").map(_.trim)
val v4 = try values(3).toInt catch {
case _: NumberFormatException => null
}
Row(Row(values(0).toInt, values(2).toBoolean), Map(values(1) -> v4))
}

val schemaRDD3 = applySchema(rowRDD2, schema2)
schemaRDD3.registerTempTable("applySchema2")
val schemaRDD4 = schemaRDD3.toSchemaRDD
val result2 = schemaRDD4.toJSON.collect()

assert(result2(1) == "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
assert(result2(3) == "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")

val jsonSchemaRDD = jsonRDD(primitiveFieldAndType)
val primTable = jsonRDD(jsonSchemaRDD.toJSON)
primTable.registerTempTable("primativeTable")
checkAnswer(
sql("select * from primativeTable"),
(BigDecimal("92233720368547758070"),
true,
1.7976931348623157E308,
10,
21474836470L,
"this is a simple string.") :: Nil
)

val complexJsonSchemaRDD = jsonRDD(complexFieldAndType1)
val compTable = jsonRDD(complexJsonSchemaRDD.toJSON)
compTable.registerTempTable("complexTable")
// Access elements of a primitive array.
checkAnswer(
sql("select arrayOfString[0], arrayOfString[1], arrayOfString[2] from complexTable"),
("str1", "str2", null) :: Nil
)

// Access an array of null values.
checkAnswer(
sql("select arrayOfNull from complexTable"),
Seq(Seq(null, null, null, null)) :: Nil
)

// Access elements of a BigInteger array (we use DecimalType internally).
checkAnswer(
sql("select arrayOfBigInteger[0], arrayOfBigInteger[1], arrayOfBigInteger[2] from complexTable"),
(BigDecimal("922337203685477580700"), BigDecimal("-922337203685477580800"), null) :: Nil
)

// Access elements of an array of arrays.
checkAnswer(
sql("select arrayOfArray1[0], arrayOfArray1[1] from complexTable"),
(Seq("1", "2", "3"), Seq("str1", "str2")) :: Nil
)

// Access elements of an array of arrays.
checkAnswer(
sql("select arrayOfArray2[0], arrayOfArray2[1] from complexTable"),
(Seq(1.0, 2.0, 3.0), Seq(1.1, 2.1, 3.1)) :: Nil
)

// Access elements of an array inside a field with the type of ArrayType(ArrayType).
checkAnswer(
sql("select arrayOfArray1[1][1], arrayOfArray2[1][1] from complexTable"),
("str2", 2.1) :: Nil
)

// Access a struct and fields inside of it.
checkAnswer(
sql("select struct, struct.field1, struct.field2 from complexTable"),
Row(
Row(true, BigDecimal("92233720368547758070")),
true,
BigDecimal("92233720368547758070")) :: Nil
)

// Access an array field of a struct.
checkAnswer(
sql("select structWithArrayFields.field1, structWithArrayFields.field2 from complexTable"),
(Seq(4, 5, 6), Seq("str1", "str2")) :: Nil
)

// Access elements of an array field of a struct.
checkAnswer(
sql("select structWithArrayFields.field1[1], structWithArrayFields.field2[3] from complexTable"),
(5, null) :: Nil
)

}
}