From f6472c79e5ff693f805e9a97452e5e2134f63d0c Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 17 Feb 2017 18:05:02 +0900 Subject: [PATCH 1/3] Add from_json in FunctionRegistry --- .../catalyst/analysis/FunctionRegistry.scala | 1 + .../expressions/jsonExpressions.scala | 36 +++++- .../sql-tests/inputs/json-functions.sql | 13 +++ .../sql-tests/results/json-functions.sql.out | 107 +++++++++++++++++- .../apache/spark/sql/JsonFunctionsSuite.scala | 35 +++++- 5 files changed, 186 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 0dcb44081f608..0486e67dbdf86 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -426,6 +426,7 @@ object FunctionRegistry { // json expression[StructToJson]("to_json"), + expression[JsonToStruct]("from_json"), // Cast aliases (SPARK-16730) castAlias("boolean", BooleanType), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 18b5f2f7ed2e8..a63bbb0f7a35b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -26,6 +26,7 @@ import com.fasterxml.jackson.core._ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.json._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes} @@ -483,6 +484,17 @@ case class JsonTuple(children: Seq[Expression]) /** * Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema. */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.", + extended = """ + Examples: + > SELECT _FUNC_('{"a":1}', 'a INT'); + {"a":1} + > SELECT _FUNC_('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')); + {"time":"2015-08-26 00:00:00.0"} + """) +// scalastyle:on line.size.limit case class JsonToStruct( schema: DataType, options: Map[String, String], @@ -494,6 +506,21 @@ case class JsonToStruct( def this(schema: DataType, options: Map[String, String], child: Expression) = this(schema, options, child, None) + // Used in `FunctionRegistry` + def this(child: Expression, schema: Expression) = + this( + schema = JsonExprUtils.validateSchemaLiteral(schema), + options = Map.empty[String, String], + child = child, + timeZoneId = None) + + def this(child: Expression, schema: Expression, options: Expression) = + this( + schema = JsonExprUtils.validateSchemaLiteral(schema), + options = JsonExprUtils.convertToMapData(options), + child = child, + timeZoneId = None) + override def checkInputDataTypes(): TypeCheckResult = schema match { case _: StructType | ArrayType(_: StructType, _) => super.checkInputDataTypes() @@ -589,7 +616,7 @@ case class StructToJson( def this(child: Expression) = this(Map.empty, child, None) def this(child: Expression, options: Expression) = this( - options = StructToJson.convertToMapData(options), + options = JsonExprUtils.convertToMapData(options), child = child, timeZoneId = None) @@ -634,7 +661,12 @@ case class StructToJson( override def inputTypes: Seq[AbstractDataType] = StructType :: Nil } -object StructToJson { +object JsonExprUtils { + + def validateSchemaLiteral(exp: Expression): StructType = exp match { + case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString) + case e => throw new AnalysisException(s"Must be a string literal, but: $e") + } def convertToMapData(exp: Expression): Map[String, String] = exp match { case m: CreateMap diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql index 9308560451bf5..83243c5e5a12f 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql @@ -5,4 +5,17 @@ select to_json(named_struct('a', 1, 'b', 2)); select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy')); -- Check if errors handled select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE')); +select to_json(named_struct('a', 1, 'b', 2), map('mode', 1)); select to_json(); + +-- from_json +describe function from_json; +describe function extended from_json; +select from_json('{"a":1}', 'a INT'); +select from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')); +-- Check if errors handled +select from_json('{"a":1}', 1); +select from_json('{"a":1}', 'a InvalidType'); +select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE')); +select from_json('{"a":1}', 'a INT', map('mode', 1)); +select from_json(); diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index d8aa4fb9fa788..52a0214b1f1cd 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 6 +-- Number of queries: 16 -- !query 0 @@ -55,9 +55,112 @@ Must use a map() function for options;; line 1 pos 7 -- !query 5 -select to_json() +select to_json(named_struct('a', 1, 'b', 2), map('mode', 1)) -- !query 5 schema struct<> -- !query 5 output org.apache.spark.sql.AnalysisException +A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7 + + +-- !query 6 +select to_json() +-- !query 6 schema +struct<> +-- !query 6 output +org.apache.spark.sql.AnalysisException Invalid number of arguments for function to_json; line 1 pos 7 + + +-- !query 7 +describe function from_json +-- !query 7 schema +struct +-- !query 7 output +Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct +Function: from_json +Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`. + + +-- !query 8 +describe function extended from_json +-- !query 8 schema +struct +-- !query 8 output +Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct +Extended Usage: + Examples: + > SELECT from_json('{"a":1}', 'a INT'); + {"a":1} + > SELECT from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')); + {"time":"2015-08-26 00:00:00.0"} + +Function: from_json +Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`. + + +-- !query 9 +select from_json('{"a":1}', 'a INT') +-- !query 9 schema +struct> +-- !query 9 output +{"a":1} + + +-- !query 10 +select from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) +-- !query 10 schema +struct> +-- !query 10 output +{"time":2015-08-26 00:00:00.0} + + +-- !query 11 +select from_json('{"a":1}', 1) +-- !query 11 schema +struct<> +-- !query 11 output +org.apache.spark.sql.AnalysisException +Must be a string literal, but: 1;; line 1 pos 7 + + +-- !query 12 +select from_json('{"a":1}', 'a InvalidType') +-- !query 12 schema +struct<> +-- !query 12 output +org.apache.spark.sql.AnalysisException + +DataType invalidtype() is not supported.(line 1, pos 2) + +== SQL == +a InvalidType +--^^^ +; line 1 pos 7 + + +-- !query 13 +select from_json('{"a":1}', 'a INT', named_struct('mode', 'PERMISSIVE')) +-- !query 13 schema +struct<> +-- !query 13 output +org.apache.spark.sql.AnalysisException +Must use a map() function for options;; line 1 pos 7 + + +-- !query 14 +select from_json('{"a":1}', 'a INT', map('mode', 1)) +-- !query 14 schema +struct<> +-- !query 14 output +org.apache.spark.sql.AnalysisException +A type of keys and values in map() must be string, but got MapType(StringType,IntegerType,false);; line 1 pos 7 + + +-- !query 15 +select from_json() +-- !query 15 schema +struct<> +-- !query 15 output +org.apache.spark.sql.AnalysisException +Invalid number of arguments for function from_json; line 1 pos 7 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index cdea3b9a0f79f..17f39fd9b588c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -202,12 +202,12 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { val df1 = Seq(Tuple1(Tuple1(1))).toDF("a") checkAnswer( df1.selectExpr("to_json(a)"), - Row("""{"_1":1}""") :: Nil) + Row( """{"_1":1}""") :: Nil) val df2 = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a") checkAnswer( df2.selectExpr("to_json(a, map('timestampFormat', 'dd/MM/yyyy HH:mm'))"), - Row("""{"_1":"26/08/2015 18:00"}""") :: Nil) + Row( """{"_1":"26/08/2015 18:00"}""") :: Nil) val errMsg1 = intercept[AnalysisException] { df2.selectExpr("to_json(a, named_struct('a', 1))") @@ -220,4 +220,35 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { assert(errMsg2.getMessage.startsWith( "A type of keys and values in map() must be string, but got")) } + + test("SPARK-19967 Support from_json in SQL") { + val df1 = Seq("""{"a": 1}""").toDS() + checkAnswer( + df1.selectExpr(s"from_json(value, 'a INT')"), + Row(Row(1)) :: Nil) + + val df2 = Seq("""{"time": "26/08/2015 18:00"}""").toDS() + checkAnswer( + df2.selectExpr( + s"from_json(value, 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy HH:mm'))"), + Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))) + + val errMsg1 = intercept[AnalysisException] { + df2.selectExpr(s"from_json(value, 1)") + } + assert(errMsg1.getMessage.startsWith("Must be a string literal, but:")) + val errMsg2 = intercept[AnalysisException] { + df2.selectExpr(s"""from_json(value, 'time InvalidType')""") + } + assert(errMsg2.getMessage.contains("DataType invalidtype() is not supported")) + val errMsg3 = intercept[AnalysisException] { + df2.selectExpr(s"from_json(value, 'time Timestamp', named_struct('a', 1))") + } + assert(errMsg3.getMessage.startsWith("Must use a map() function for options")) + val errMsg4 = intercept[AnalysisException] { + df2.selectExpr(s"from_json(value, 'time Timestamp', map('a', 1))") + } + assert(errMsg4.getMessage.startsWith( + "A type of keys and values in map() must be string, but got")) + } } From 439c6f7c4f15eaf3b6660e9ff2d7fbdf9e953e27 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 17 Mar 2017 10:14:14 +0900 Subject: [PATCH 2/3] Apply comments --- .../expressions/jsonExpressions.scala | 6 ++--- .../apache/spark/sql/JsonFunctionsSuite.scala | 27 +++++++++++-------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index a63bbb0f7a35b..37e4bb5060436 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -489,8 +489,8 @@ case class JsonTuple(children: Seq[Expression]) usage = "_FUNC_(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.", extended = """ Examples: - > SELECT _FUNC_('{"a":1}', 'a INT'); - {"a":1} + > SELECT _FUNC_('{"a":1, "b":0.8}', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} > SELECT _FUNC_('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')); {"time":"2015-08-26 00:00:00.0"} """) @@ -665,7 +665,7 @@ object JsonExprUtils { def validateSchemaLiteral(exp: Expression): StructType = exp match { case Literal(s, StringType) => CatalystSqlParser.parseTableSchema(s.toString) - case e => throw new AnalysisException(s"Must be a string literal, but: $e") + case e => throw new AnalysisException(s"Expected a string literal instead of $e") } def convertToMapData(exp: Expression): Map[String, String] = exp match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala index 17f39fd9b588c..2345b82081161 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala @@ -202,12 +202,12 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { val df1 = Seq(Tuple1(Tuple1(1))).toDF("a") checkAnswer( df1.selectExpr("to_json(a)"), - Row( """{"_1":1}""") :: Nil) + Row("""{"_1":1}""") :: Nil) val df2 = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a") checkAnswer( df2.selectExpr("to_json(a, map('timestampFormat', 'dd/MM/yyyy HH:mm'))"), - Row( """{"_1":"26/08/2015 18:00"}""") :: Nil) + Row("""{"_1":"26/08/2015 18:00"}""") :: Nil) val errMsg1 = intercept[AnalysisException] { df2.selectExpr("to_json(a, named_struct('a', 1))") @@ -224,29 +224,34 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext { test("SPARK-19967 Support from_json in SQL") { val df1 = Seq("""{"a": 1}""").toDS() checkAnswer( - df1.selectExpr(s"from_json(value, 'a INT')"), + df1.selectExpr("from_json(value, 'a INT')"), Row(Row(1)) :: Nil) - val df2 = Seq("""{"time": "26/08/2015 18:00"}""").toDS() + val df2 = Seq("""{"c0": "a", "c1": 1, "c2": {"c20": 3.8, "c21": 8}}""").toDS() + checkAnswer( + df2.selectExpr("from_json(value, 'c0 STRING, c1 INT, c2 STRUCT')"), + Row(Row("a", 1, Row(3.8, 8))) :: Nil) + + val df3 = Seq("""{"time": "26/08/2015 18:00"}""").toDS() checkAnswer( - df2.selectExpr( - s"from_json(value, 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy HH:mm'))"), + df3.selectExpr( + "from_json(value, 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy HH:mm'))"), Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))) val errMsg1 = intercept[AnalysisException] { - df2.selectExpr(s"from_json(value, 1)") + df3.selectExpr("from_json(value, 1)") } - assert(errMsg1.getMessage.startsWith("Must be a string literal, but:")) + assert(errMsg1.getMessage.startsWith("Expected a string literal instead of")) val errMsg2 = intercept[AnalysisException] { - df2.selectExpr(s"""from_json(value, 'time InvalidType')""") + df3.selectExpr("""from_json(value, 'time InvalidType')""") } assert(errMsg2.getMessage.contains("DataType invalidtype() is not supported")) val errMsg3 = intercept[AnalysisException] { - df2.selectExpr(s"from_json(value, 'time Timestamp', named_struct('a', 1))") + df3.selectExpr("from_json(value, 'time Timestamp', named_struct('a', 1))") } assert(errMsg3.getMessage.startsWith("Must use a map() function for options")) val errMsg4 = intercept[AnalysisException] { - df2.selectExpr(s"from_json(value, 'time Timestamp', map('a', 1))") + df3.selectExpr("from_json(value, 'time Timestamp', map('a', 1))") } assert(errMsg4.getMessage.startsWith( "A type of keys and values in map() must be string, but got")) From ce39a9dae6d322d0b800b260b9a4822d9e0e1f1d Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Fri, 17 Mar 2017 13:07:28 +0900 Subject: [PATCH 3/3] Update test cases --- .../test/resources/sql-tests/results/json-functions.sql.out | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index 52a0214b1f1cd..b57cbbc1d843b 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -90,8 +90,8 @@ struct Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct Extended Usage: Examples: - > SELECT from_json('{"a":1}', 'a INT'); - {"a":1} + > SELECT from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE'); + {"a":1, "b":0.8} > SELECT from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')); {"time":"2015-08-26 00:00:00.0"} @@ -121,7 +121,7 @@ select from_json('{"a":1}', 1) struct<> -- !query 11 output org.apache.spark.sql.AnalysisException -Must be a string literal, but: 1;; line 1 pos 7 +Expected a string literal instead of 1;; line 1 pos 7 -- !query 12