From ffa92ba187eb81f20dc25f139c674bd4a13baac5 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Fri, 17 Feb 2017 18:05:02 +0900
Subject: [PATCH 01/10] Add from_json/to_json in FunctionRegistry

---
 .../catalyst/analysis/FunctionRegistry.scala  |  4 ++
 .../expressions/jsonExpressions.scala         | 39 ++++++++++++++++++
 .../sql/catalyst/json/JacksonUtils.scala      | 35 ++++++++++++++++
 .../apache/spark/sql/JsonFunctionsSuite.scala | 40 +++++++++++++++++++
 4 files changed, 118 insertions(+)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 9c9465f6b8def..7c981331303bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -421,6 +421,10 @@ object FunctionRegistry {
     expression[BitwiseOr]("|"),
     expression[BitwiseXor]("^"),
 
+    // json
+    expression[StructToJson]("to_json"),
+    expression[JsonToStruct]("from_json"),
+
     // Cast aliases (SPARK-16730)
     castAlias("boolean", BooleanType),
     castAlias("tinyint", ByteType),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index dbff62efdddb6..8c329ad28b699 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -482,6 +482,15 @@ case class JsonTuple(children: Seq[Expression])
 /**
  * Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema.
  */
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = "_FUNC_(jsonStr, schema[, options]) - Return a `StructType` value with the given `jsonStr` and `schema`.",
+  extended = """
+    Examples:
+      > SELECT _FUNC_('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}');
+       {"a":1}
+  """)
+// scalastyle:on line.size.limit
 case class JsonToStruct(
     schema: DataType,
     options: Map[String, String],
@@ -493,6 +502,21 @@ case class JsonToStruct(
   def this(schema: DataType, options: Map[String, String], child: Expression) =
     this(schema, options, child, None)
 
+  // Used in `FunctionRegistry`
+  def this(child: Expression, schema: Expression) =
+    this(
+      schema = JacksonUtils.validateSchemaLiteral(schema),
+      options = Map.empty[String, String],
+      child = child,
+      timeZoneId = None)
+
+  def this(child: Expression, schema: Expression, options: Expression) =
+    this(
+      schema = JacksonUtils.validateSchemaLiteral(schema),
+      options = JacksonUtils.validateOptionsLiteral(options),
+      child = child,
+      timeZoneId = None)
+
   override def checkInputDataTypes(): TypeCheckResult = schema match {
     case _: StructType | ArrayType(_: StructType, _) =>
       super.checkInputDataTypes()
@@ -564,6 +588,13 @@ case class JsonToStruct(
 /**
  * Converts a [[StructType]] to a json output string.
  */
+@ExpressionDescription(
+  usage = "_FUNC_(expr[, options]) - Return a json string with a given `StructType` value",
+  extended = """
+    Examples:
+      > SELECT _FUNC_(named_struct('a', 1, 'b', 2));
+       {"a":1,"b":2}
+  """)
 case class StructToJson(
     options: Map[String, String],
     child: Expression,
@@ -573,6 +604,14 @@ case class StructToJson(
   def this(options: Map[String, String], child: Expression) =
     this(options, child, None)
 
+  // Used in `FunctionRegistry`
+  def this(child: Expression) = this(Map.empty, child, None)
+  def this(child: Expression, options: Expression) =
+    this(
+      options = JacksonUtils.validateOptionsLiteral(options),
+      child = child,
+      timeZoneId = None)
+
   @transient
   lazy val writer = new CharArrayWriter()
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 3b23c6cd2816f..d7017e77fc537 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -18,7 +18,14 @@
 package org.apache.spark.sql.catalyst.json
 
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
 
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.types._
 
 object JacksonUtils {
@@ -55,4 +62,32 @@ object JacksonUtils {
 
     schema.foreach(field => verifyType(field.name, field.dataType))
   }
+
+  private def validateStringLiteral(exp: Expression): String = exp match {
+    case Literal(s, StringType) => s.toString
+    case e => throw new AnalysisException("Must be a string literal, but: " + e)
+  }
+
+  def validateSchemaLiteral(exp: Expression): StructType =
+    DataType.fromJson(validateStringLiteral(exp)).asInstanceOf[StructType]
+
+  /**
+   * Convert a literal including a json option string (e.g., '{"mode": "PERMISSIVE", ...}')
+   * to Map-type data.
+   */
+  def validateOptionsLiteral(exp: Expression): Map[String, String] = {
+    val json = validateStringLiteral(exp)
+    parse(json) match {
+      case JObject(options) =>
+        options.map {
+          case (key, JString(value)) =>
+            key -> value
+          case _ =>
+            throw new AnalysisException(
+              s"""The format must be '{"key": "value", ...}', but ${json}""")
+        }.toMap
+      case _ =>
+        Map.empty
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 953d161ec2a1d..b8ff7d85816c4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -197,4 +197,44 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       .select(to_json($"struct").as("json"))
     checkAnswer(dfTwo, readBackTwo)
   }
+
+  test("SPARK-19637 Support to_json/from_json in SQL") {
+    // to_json
+    val df1 = Seq(Tuple1(Tuple1(1))).toDF("a")
+    checkAnswer(
+      df1.selectExpr("to_json(a)"),
+      Row("""{"_1":1}""") :: Nil)
+
+    val df2 = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
+    checkAnswer(
+      df2.selectExpr("""to_json(a, '{"timestampFormat": "dd/MM/yyyy HH:mm"}')"""),
+      Row("""{"_1":"26/08/2015 18:00"}""") :: Nil)
+
+    val errMsg1 = intercept[AnalysisException] {
+      df2.selectExpr("""to_json(a, '{"k": [{"k": "v"}]}')""").collect
+    }
+    assert(errMsg1.getMessage.startsWith(
+      "The format must be '{\"key\": \"value\", ...}', but {\"k\": [{\"k\": \"v\"}]}"))
+
+    // from_json
+    val df3 = Seq("""{"a": 1}""").toDS()
+    val schema1 = new StructType().add("a", IntegerType)
+    checkAnswer(
+      df3.selectExpr(s"from_json(value, '${schema1.json}')"),
+      Row(Row(1)) :: Nil)
+
+    val df4 = Seq("""{"time": "26/08/2015 18:00"}""").toDS()
+    val schema2 = new StructType().add("time", TimestampType)
+    checkAnswer(
+      df4.selectExpr(
+        s"""from_json(value, '${schema2.json}', """ +
+          """'{"timestampFormat": "dd/MM/yyyy HH:mm"}')"""),
+      Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))))
+
+    val errMsg2 = intercept[AnalysisException] {
+      df4.selectExpr(s"""from_json(value, '${schema2.json}', '{"k": [{"k": "v"}]}')""")
+    }
+    assert(errMsg2.getMessage.startsWith(
+      "The format must be '{\"key\": \"value\", ...}', but {\"k\": [{\"k\": \"v\"}]}"))
+  }
 }
From 7f12e948a9c2dc17380950c2658af5cbe4cde91a Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sat, 18 Feb 2017 23:38:38 +0900
Subject: [PATCH 02/10] Apply review comments

---
 .../sql/catalyst/json/JacksonUtils.scala      | 26 +++++++------------
 .../apache/spark/sql/JsonFunctionsSuite.scala |  4 +--
 2 files changed, 11 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index d7017e77fc537..9159d5986bc89 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.sql.catalyst.json
 
+import scala.util.{Failure, Success, Try}
+
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
+import org.json4s.jackson.JsonMethods.parse
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.expressions.Literal
+import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.types._
 
 object JacksonUtils {
@@ -76,18 +74,12 @@ object JacksonUtils {
    * to Map-type data.
    */
   def validateOptionsLiteral(exp: Expression): Map[String, String] = {
+    implicit val formats = org.json4s.DefaultFormats
     val json = validateStringLiteral(exp)
-    parse(json) match {
-      case JObject(options) =>
-        options.map {
-          case (key, JString(value)) =>
-            key -> value
-          case _ =>
-            throw new AnalysisException(
-              s"""The format must be '{"key": "value", ...}', but ${json}""")
-        }.toMap
-      case _ =>
-        Map.empty
+    Try(parse(json).extract[Map[String, String]]) match {
+      case Success(m) => m
+      case Failure(_) =>
+        throw new AnalysisException(s"""The format must be '{"key": "value", ...}', but ${json}"""")
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index b8ff7d85816c4..7ea5eec42f898 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -214,7 +214,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       df2.selectExpr("""to_json(a, '{"k": [{"k": "v"}]}')""").collect
     }
     assert(errMsg1.getMessage.startsWith(
-      "The format must be '{\"key\": \"value\", ...}', but {\"k\": [{\"k\": \"v\"}]}"))
+      """The format must be '{"key": "value", ...}', but {"k": [{"k": "v"}]}"""))
 
     // from_json
     val df3 = Seq("""{"a": 1}""").toDS()
@@ -235,6 +235,6 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       df4.selectExpr(s"""from_json(value, '${schema2.json}', '{"k": [{"k": "v"}]}')""")
     }
     assert(errMsg2.getMessage.startsWith(
-      "The format must be '{\"key\": \"value\", ...}', but {\"k\": [{\"k\": \"v\"}]}"))
+      """The format must be '{"key": "value", ...}', but {"k": [{"k": "v"}]}"""))
   }
 }
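Note on patch 02: the json4s `extract` is what collapses the manual JObject/JString matching. A flat JSON object extracts to `Map[String, String]`, and any nested value makes the extraction throw, which the surrounding `Try` funnels into the single error path. A standalone sketch of that behavior (assumes json4s on the classpath, as in Spark's catalyst module; the option strings are made up):

    import scala.util.Try
    import org.json4s.DefaultFormats
    import org.json4s.jackson.JsonMethods.parse

    implicit val formats = DefaultFormats

    // A flat {"key": "value"} object extracts cleanly to a Scala map...
    val ok = Try(parse("""{"mode": "PERMISSIVE"}""").extract[Map[String, String]])
    // ...while a nested value fails extraction, which the patch rewraps
    // as an AnalysisException.
    val bad = Try(parse("""{"k": [{"k": "v"}]}""").extract[Map[String, String]])

    println(ok)  // Success(Map(mode -> PERMISSIVE))
    println(bad) // Failure(org.json4s.MappingException: ...)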
From a960dfd737fdefe171ce8aac85cc3ca70241b537 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Mon, 20 Feb 2017 13:27:51 +0900
Subject: [PATCH 03/10] Apply review comments

---
 .../spark/sql/catalyst/expressions/jsonExpressions.scala  | 4 ++--
 .../org/apache/spark/sql/catalyst/json/JacksonUtils.scala | 8 +++-----
 2 files changed, 5 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 8c329ad28b699..730351e24f3c9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -484,7 +484,7 @@ case class JsonTuple(children: Seq[Expression])
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(jsonStr, schema[, options]) - Return a `StructType` value with the given `jsonStr` and `schema`.",
+  usage = "_FUNC_(jsonStr, schema[, options]) - Return a struct value with the given `jsonStr` and `schema`.",
   extended = """
     Examples:
       > SELECT _FUNC_('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}');
       {"a":1}
@@ -589,7 +589,7 @@ case class JsonToStruct(
  * Converts a [[StructType]] to a json output string.
  */
 @ExpressionDescription(
-  usage = "_FUNC_(expr[, options]) - Return a json string with a given `StructType` value",
+  usage = "_FUNC_(expr[, options]) - Return a json string with a given struct value",
   extended = """
     Examples:
       > SELECT _FUNC_(named_struct('a', 1, 'b', 2));
       {"a":1,"b":2}
   """)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 9159d5986bc89..6fb9240fa5049 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -63,7 +63,7 @@ object JacksonUtils {
 
   private def validateStringLiteral(exp: Expression): String = exp match {
     case Literal(s, StringType) => s.toString
-    case e => throw new AnalysisException("Must be a string literal, but: " + e)
+    case e => throw new AnalysisException(s"Must be a string literal, but: $e")
   }
 
   def validateSchemaLiteral(exp: Expression): StructType =
@@ -76,10 +76,8 @@ object JacksonUtils {
   def validateOptionsLiteral(exp: Expression): Map[String, String] = {
     implicit val formats = org.json4s.DefaultFormats
     val json = validateStringLiteral(exp)
-    Try(parse(json).extract[Map[String, String]]) match {
-      case Success(m) => m
-      case Failure(_) =>
-        throw new AnalysisException(s"""The format must be '{"key": "value", ...}', but ${json}"""")
+    Try(parse(json).extract[Map[String, String]]).getOrElse {
+      throw new AnalysisException(s"""The format must be '{"key": "value", ...}', but ${json}"""")
     }
   }
 }
From 1d494b2d184f13e21907d282d65063f384a8b407 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Mon, 20 Feb 2017 13:56:05 +0900
Subject: [PATCH 04/10] Add strToStructType in JacksonUtils

---
 .../spark/sql/catalyst/expressions/jsonExpressions.scala | 6 +++---
 .../apache/spark/sql/catalyst/json/JacksonUtils.scala    | 9 ++++++++-
 .../src/main/scala/org/apache/spark/sql/functions.scala  | 3 ++-
 3 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 730351e24f3c9..0d4c67156d18f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -330,7 +330,7 @@ case class GetJsonObject(json: Expression, path: Expression)
 
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(jsonStr, p1, p2, ..., pn) - Return a tuple like the function get_json_object, but it takes multiple names. All the input parameters and output column types are string.",
+  usage = "_FUNC_(jsonStr, p1, p2, ..., pn) - Returns a tuple like the function get_json_object, but it takes multiple names. All the input parameters and output column types are string.",
   extended = """
     Examples:
       > SELECT _FUNC_('{"a":1, "b":2}', 'a', 'b');
@@ -484,7 +484,7 @@ case class JsonTuple(children: Seq[Expression])
  */
 // scalastyle:off line.size.limit
 @ExpressionDescription(
-  usage = "_FUNC_(jsonStr, schema[, options]) - Return a struct value with the given `jsonStr` and `schema`.",
+  usage = "_FUNC_(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.",
   extended = """
     Examples:
      > SELECT _FUNC_('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}');
       {"a":1}
@@ -589,7 +589,7 @@ case class JsonToStruct(
  * Converts a [[StructType]] to a json output string.
 */
 @ExpressionDescription(
-  usage = "_FUNC_(expr[, options]) - Return a json string with a given struct value",
+  usage = "_FUNC_(expr[, options]) - Returns a json string with a given struct value",
   extended = """
     Examples:
       > SELECT _FUNC_(named_struct('a', 1, 'b', 2));
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 6fb9240fa5049..e187583e107fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -66,8 +66,15 @@ object JacksonUtils {
     case e => throw new AnalysisException(s"Must be a string literal, but: $e")
   }
 
+  def strToStructType(schemaAsJson: String): StructType = Try {
+    DataType.fromJson(schemaAsJson).asInstanceOf[StructType]
+  }.getOrElse {
+    throw new AnalysisException(
+      s"""Illegal json string for representing a schema: $schemaAsJson"""")
+  }
+
   def validateSchemaLiteral(exp: Expression): StructType =
-    DataType.fromJson(validateStringLiteral(exp)).asInstanceOf[StructType]
+    strToStructType(validateStringLiteral(exp))
 
   /**
    * Convert a literal including a json option string (e.g., '{"mode": "PERMISSIVE", ...}')
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index 201f726db3fad..ad6e1beb9f5d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.json.JacksonUtils
 import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.expressions.UserDefinedFunction
@@ -3059,7 +3060,7 @@ object functions {
    * @since 2.1.0
    */
   def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
-    from_json(e, DataType.fromJson(schema), options)
+    from_json(e, JacksonUtils.strToStructType(schema), options)
 
   /**
    * (Scala-specific) Converts a column containing a `StructType` into a JSON string with the
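Note on patch 04: `DataType.fromJson`, now wrapped by `strToStructType`, expects a schema serialized in Spark's own JSON format, i.e. exactly what `StructType.json` emits; the `Try` is there so that arbitrary JSON like '{"a": 1}' yields a readable AnalysisException instead of a raw parse or cast failure. A small round-trip sketch of that contract (standard Spark API; the printed JSON is abbreviated):

    import org.apache.spark.sql.types._

    val schema = new StructType().add("a", IntegerType)
    val asJson = schema.json
    // {"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,...}]}

    // Round-trips back to an equal StructType...
    assert(DataType.fromJson(asJson) == schema)

    // ...whereas plain JSON such as {"a": 1} is not a schema and throws,
    // which strToStructType rewraps as an AnalysisException.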
From 94bfe2d8650a5487619e9e2169894219fa652de5 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Mon, 20 Feb 2017 17:41:26 +0900
Subject: [PATCH 05/10] Apply review comments

---
 .../expressions/jsonExpressions.scala         |  10 +-
 .../sql/catalyst/json/JacksonUtils.scala      |  36 ++---
 .../sql-tests/inputs/json-functions.sql       |  19 +++
 .../sql-tests/results/json-functions.sql.out  | 142 ++++++++++++++++++
 .../apache/spark/sql/JsonFunctionsSuite.scala |  23 +--
 5 files changed, 199 insertions(+), 31 deletions(-)
 create mode 100644 sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
 create mode 100644 sql/core/src/test/resources/sql-tests/results/json-functions.sql.out

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 0d4c67156d18f..45516f135a84d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -489,6 +489,8 @@ case class JsonTuple(children: Seq[Expression])
     Examples:
       > SELECT _FUNC_('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}');
       {"a":1}
+      > SELECT _FUNC_('{"time":"26/08/2015"}', '{"type":"struct", "fields":[{"name":"time", "type":"timestamp", "nullable":true}]}', map('timestampFormat', 'dd/MM/yyyy'));
+      {"time":"2015-08-26 00:00:00.0"}
   """)
 // scalastyle:on line.size.limit
 case class JsonToStruct(
@@ -513,7 +515,7 @@ case class JsonToStruct(
   def this(child: Expression, schema: Expression, options: Expression) =
     this(
       schema = JacksonUtils.validateSchemaLiteral(schema),
-      options = JacksonUtils.validateOptionsLiteral(options),
+      options = JacksonUtils.validateMapData(options),
       child = child,
       timeZoneId = None)
 
@@ -588,13 +590,17 @@ case class JsonToStruct(
 /**
  * Converts a [[StructType]] to a json output string.
 */
+// scalastyle:off line.size.limit
 @ExpressionDescription(
   usage = "_FUNC_(expr[, options]) - Returns a json string with a given struct value",
   extended = """
     Examples:
       > SELECT _FUNC_(named_struct('a', 1, 'b', 2));
       {"a":1,"b":2}
+      > SELECT _FUNC_(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
+      {"time":"26/08/2015"}
   """)
+// scalastyle:on line.size.limit
 case class StructToJson(
     options: Map[String, String],
     child: Expression,
@@ -608,7 +614,7 @@ case class StructToJson(
   def this(child: Expression) = this(Map.empty, child, None)
   def this(child: Expression, options: Expression) =
     this(
-      options = JacksonUtils.validateOptionsLiteral(options),
+      options = JacksonUtils.validateMapData(options),
       child = child,
       timeZoneId = None)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index e187583e107fa..2021dfbb05199 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -17,13 +17,13 @@
 
 package org.apache.spark.sql.catalyst.json
 
-import scala.util.{Failure, Success, Try}
+import scala.util.Try
 
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
-import org.json4s.jackson.JsonMethods.parse
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{CreateMap, Expression, Literal}
+import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
 import org.apache.spark.sql.types._
 
 object JacksonUtils {
@@ -61,11 +61,6 @@ object JacksonUtils {
     schema.foreach(field => verifyType(field.name, field.dataType))
   }
 
-  private def validateStringLiteral(exp: Expression): String = exp match {
-    case Literal(s, StringType) => s.toString
-    case e => throw new AnalysisException(s"Must be a string literal, but: $e")
-  }
-
   def strToStructType(schemaAsJson: String): StructType = Try {
     DataType.fromJson(schemaAsJson).asInstanceOf[StructType]
   }.getOrElse {
     throw new AnalysisException(
       s"""Illegal json string for representing a schema: $schemaAsJson"""")
   }
 
-  def validateSchemaLiteral(exp: Expression): StructType =
-    strToStructType(validateStringLiteral(exp))
+  def validateSchemaLiteral(exp: Expression): StructType = exp match {
+    case Literal(s, StringType) => strToStructType(s.toString)
+    case e => throw new AnalysisException(s"Must be a string literal, but: $e")
+  }
 
-  /**
-   * Convert a literal including a json option string (e.g., '{"mode": "PERMISSIVE", ...}')
-   * to Map-type data.
-   */
-  def validateOptionsLiteral(exp: Expression): Map[String, String] = {
-    implicit val formats = org.json4s.DefaultFormats
-    val json = validateStringLiteral(exp)
-    Try(parse(json).extract[Map[String, String]]).getOrElse {
-      throw new AnalysisException(s"""The format must be '{"key": "value", ...}', but ${json}"""")
-    }
-  }
+  /** Convert a map literal to Map-type data. */
+  def validateMapData(exp: Expression): Map[String, String] = exp match {
+    case m: CreateMap => m.dataType.acceptsType(MapType(StringType, StringType, false))
+      val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData]
+      ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) =>
+        key.toString -> value.toString
+      }.toMap
+    case _ =>
+      throw new AnalysisException("Must use a map() function for options")
+  }
 }
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
new file mode 100644
index 0000000000000..9f18b8c69691b
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -0,0 +1,19 @@
+-- to_json
+describe function to_json;
+describe function extended to_json;
+select to_json(named_struct('a', 1, 'b', 2));
+select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
+-- Check if errors handled
+select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'));
+select to_json();
+
+-- from_json
+describe function from_json;
+describe function extended from_json;
+select from_json('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}');
+select from_json('{"time":"26/08/2015"}', '{"type":"struct", "fields":[{"name":"time", "type":"timestamp", "nullable":true}]}', map('timestampFormat', 'dd/MM/yyyy'));
+-- Check if errors handled
+select from_json('{"a":1}', 1);
+select from_json('{"a":1}', '{"a": 1}');
+select from_json('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}', named_struct('mode', 'PERMISSIVE'));
+select from_json();
diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
new file mode 100644
index 0000000000000..a1f5d5e26f249
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -0,0 +1,142 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 14
+
+
+-- !query 0
+describe function to_json
+-- !query 0 schema
+struct
+-- !query 0 output
+Class: org.apache.spark.sql.catalyst.expressions.StructToJson
+Function: to_json
+Usage: to_json(expr[, options]) - Returns a json string with a given struct value
+
+
+-- !query 1
+describe function extended to_json
+-- !query 1 schema
+struct
+-- !query 1 output
+Class: org.apache.spark.sql.catalyst.expressions.StructToJson
+Extended Usage:
+    Examples:
+      > SELECT to_json(named_struct('a', 1, 'b', 2));
+       {"a":1,"b":2}
+      > SELECT to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'));
+       {"time":"26/08/2015"}
+
+Function: to_json
+Usage: to_json(expr[, options]) - Returns a json string with a given struct value
+
+
+-- !query 2
+select to_json(named_struct('a', 1, 'b', 2))
+-- !query 2 schema
+struct
+-- !query 2 output
+{"a":1,"b":2}
+
+
+-- !query 3
+select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy'))
+-- !query 3 schema
+struct
+-- !query 3 output
+{"time":"26/08/2015"}
+
+
+-- !query 4
+select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'))
+-- !query 4 schema
+struct<>
+-- !query 4 output
+org.apache.spark.sql.AnalysisException
+Must use a map() function for options;; line 1 pos 7
+
+
+-- !query 5
+select to_json()
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+Invalid number of arguments for function to_json; line 1 pos 7
+
+
+-- !query 6
+describe function from_json
+-- !query 6 schema
+struct
+-- !query 6 output
+Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
+Function: from_json
+Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
+
+
+-- !query 7
+describe function extended from_json
+-- !query 7 schema
+struct
+-- !query 7 output
+Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
+Extended Usage:
+    Examples:
+      > SELECT from_json('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}');
+       {"a":1}
+      > SELECT from_json('{"time":"26/08/2015"}', '{"type":"struct", "fields":[{"name":"time", "type":"timestamp", "nullable":true}]}', map('timestampFormat', 'dd/MM/yyyy'));
+       {"time":"2015-08-26 00:00:00.0"}
+
+Function: from_json
+Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
+
+
+-- !query 8
+select from_json('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}')
+-- !query 8 schema
+struct>
+-- !query 8 output
+{"a":1}
+
+
+-- !query 9
+select from_json('{"time":"26/08/2015"}', '{"type":"struct", "fields":[{"name":"time", "type":"timestamp", "nullable":true}]}', map('timestampFormat', 'dd/MM/yyyy'))
+-- !query 9 schema
+struct>
+-- !query 9 output
+{"time":2015-08-26 00:00:00.0}
+
+
+-- !query 10
+select from_json('{"a":1}', 1)
+-- !query 10 schema
+struct<>
+-- !query 10 output
+org.apache.spark.sql.AnalysisException
+Must be a string literal, but: 1;; line 1 pos 7
+
+
+-- !query 11
+select from_json('{"a":1}', '{"a": 1}')
+-- !query 11 schema
+struct<>
+-- !query 11 output
+org.apache.spark.sql.AnalysisException
+Illegal json string for representing a schema: {"a": 1}";; line 1 pos 7
+
+
+-- !query 12
+select from_json('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}', named_struct('mode', 'PERMISSIVE'))
+-- !query 12 schema
+struct<>
+-- !query 12 output
+org.apache.spark.sql.AnalysisException
+Must use a map() function for options;; line 1 pos 7
+
+
+-- !query 13
+select from_json()
+-- !query 13 schema
+struct<>
+-- !query 13 output
+org.apache.spark.sql.AnalysisException
+Invalid number of arguments for function from_json; line 1 pos 7
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 7ea5eec42f898..cf4cc92ecef5a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -207,14 +207,13 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
 
     val df2 = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
     checkAnswer(
-      df2.selectExpr("""to_json(a, '{"timestampFormat": "dd/MM/yyyy HH:mm"}')"""),
+      df2.selectExpr("to_json(a, map('timestampFormat', 'dd/MM/yyyy HH:mm'))"),
       Row("""{"_1":"26/08/2015 18:00"}""") :: Nil)
 
     val errMsg1 = intercept[AnalysisException] {
-      df2.selectExpr("""to_json(a, '{"k": [{"k": "v"}]}')""").collect
+      df2.selectExpr("to_json(a, named_struct('a', 1))")
     }
-    assert(errMsg1.getMessage.startsWith(
-      """The format must be '{"key": "value", ...}', but {"k": [{"k": "v"}]}"""))
+    assert(errMsg1.getMessage.startsWith("Must use a map() function for options"))
 
     // from_json
     val df3 = Seq("""{"a": 1}""").toDS()
@@ -227,14 +226,20 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
     val schema2 = new StructType().add("time", TimestampType)
     checkAnswer(
       df4.selectExpr(
-        s"""from_json(value, '${schema2.json}', """ +
-          """'{"timestampFormat": "dd/MM/yyyy HH:mm"}')"""),
+        s"from_json(value, '${schema2.json}', map('timestampFormat', 'dd/MM/yyyy HH:mm'))"),
       Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))))
 
     val errMsg2 = intercept[AnalysisException] {
-      df4.selectExpr(s"""from_json(value, '${schema2.json}', '{"k": [{"k": "v"}]}')""")
+      df4.selectExpr(s"from_json(value, 1)")
     }
-    assert(errMsg2.getMessage.startsWith(
-      """The format must be '{"key": "value", ...}', but {"k": [{"k": "v"}]}"""))
+    assert(errMsg2.getMessage.startsWith("Must be a string literal, but:"))
+    val errMsg3 = intercept[AnalysisException] {
+      df4.selectExpr(s"""from_json(value, '{"a": 1}')""")
+    }
+    assert(errMsg3.getMessage.startsWith("Illegal json string for representing a schema:"))
+    val errMsg4 = intercept[AnalysisException] {
+      df4.selectExpr(s"from_json(value, '${schema2.json}', named_struct('a', 1))")
+    }
+    assert(errMsg4.getMessage.startsWith("Must use a map() function for options"))
   }
 }
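Note on patch 05: the move from JSON option strings to SQL map(...) literals leans on the fact that a `CreateMap` over literals is foldable, so it can be evaluated with no input row while the query is analyzed; the resulting `ArrayBasedMapData` then converts to an ordinary Scala map. A rough standalone illustration of that evaluation step (a sketch against catalyst internals, not a drop-in piece of the patch):

    import org.apache.spark.sql.catalyst.expressions.{CreateMap, Literal}
    import org.apache.spark.sql.catalyst.util.ArrayBasedMapData

    // map('timestampFormat', 'dd/MM/yyyy') parses into a CreateMap of literals.
    val m = CreateMap(Seq(Literal("timestampFormat"), Literal("dd/MM/yyyy")))

    // Foldable, so eval() needs no input row at analysis time.
    val data = m.eval().asInstanceOf[ArrayBasedMapData]
    val options = ArrayBasedMapData.toScalaMap(data).map { case (k, v) =>
      k.toString -> v.toString // keys and values are UTF8String at runtime
    }
    // options == Map("timestampFormat" -> "dd/MM/yyyy")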
From 53b758b0418d6f9d3a0e238b1ab0ffa07e373c60 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Fri, 3 Mar 2017 13:30:34 +0900
Subject: [PATCH 06/10] Drop from_json support

---
 .../catalyst/analysis/FunctionRegistry.scala  |  1 -
 .../expressions/jsonExpressions.scala         | 26 ------
 .../sql/catalyst/json/JacksonUtils.scala      |  7 +-
 .../sql-tests/inputs/json-functions.sql       | 11 ---
 .../sql-tests/results/json-functions.sql.out  | 81 +------------------
 .../apache/spark/sql/JsonFunctionsSuite.scala | 29 +------
 6 files changed, 3 insertions(+), 152 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 7c981331303bd..556fa9901701b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -423,7 +423,6 @@ object FunctionRegistry {
 
     // json
     expression[StructToJson]("to_json"),
-    expression[JsonToStruct]("from_json"),
 
     // Cast aliases (SPARK-16730)
     castAlias("boolean", BooleanType),
     castAlias("tinyint", ByteType),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 45516f135a84d..130b0ce9f536a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -482,17 +482,6 @@ case class JsonTuple(children: Seq[Expression])
 /**
  * Converts an json input string to a [[StructType]] or [[ArrayType]] with the specified schema.
 */
-// scalastyle:off line.size.limit
-@ExpressionDescription(
-  usage = "_FUNC_(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.",
-  extended = """
-    Examples:
-      > SELECT _FUNC_('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}');
-      {"a":1}
-      > SELECT _FUNC_('{"time":"26/08/2015"}', '{"type":"struct", "fields":[{"name":"time", "type":"timestamp", "nullable":true}]}', map('timestampFormat', 'dd/MM/yyyy'));
-      {"time":"2015-08-26 00:00:00.0"}
-  """)
-// scalastyle:on line.size.limit
 case class JsonToStruct(
     schema: DataType,
     options: Map[String, String],
@@ -504,21 +493,6 @@ case class JsonToStruct(
   def this(schema: DataType, options: Map[String, String], child: Expression) =
     this(schema, options, child, None)
 
-  // Used in `FunctionRegistry`
-  def this(child: Expression, schema: Expression) =
-    this(
-      schema = JacksonUtils.validateSchemaLiteral(schema),
-      options = Map.empty[String, String],
-      child = child,
-      timeZoneId = None)
-
-  def this(child: Expression, schema: Expression, options: Expression) =
-    this(
-      schema = JacksonUtils.validateSchemaLiteral(schema),
-      options = JacksonUtils.validateMapData(options),
-      child = child,
-      timeZoneId = None)
-
   override def checkInputDataTypes(): TypeCheckResult = schema match {
     case _: StructType | ArrayType(_: StructType, _) =>
       super.checkInputDataTypes()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 2021dfbb05199..0406ec5a95ec2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -22,7 +22,7 @@ import scala.util.Try
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{CreateMap, Expression, Literal}
+import org.apache.spark.sql.catalyst.expressions.{CreateMap, Expression}
 import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
 import org.apache.spark.sql.types._
 
@@ -68,11 +68,6 @@ object JacksonUtils {
     s"""Illegal json string for representing a schema: $schemaAsJson"""")
   }
 
-  def validateSchemaLiteral(exp: Expression): StructType = exp match {
-    case Literal(s, StringType) => strToStructType(s.toString)
-    case e => throw new AnalysisException(s"Must be a string literal, but: $e")
-  }
-
   /** Convert a map literal to Map-type data. */
   def validateMapData(exp: Expression): Map[String, String] = exp match {
     case m: CreateMap => m.dataType.acceptsType(MapType(StringType, StringType, false))
diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
index 9f18b8c69691b..9308560451bf5 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql
@@ -6,14 +6,3 @@ select to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), m
 -- Check if errors handled
 select to_json(named_struct('a', 1, 'b', 2), named_struct('mode', 'PERMISSIVE'));
 select to_json();
-
--- from_json
-describe function from_json;
-describe function extended from_json;
-select from_json('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}');
-select from_json('{"time":"26/08/2015"}', '{"type":"struct", "fields":[{"name":"time", "type":"timestamp", "nullable":true}]}', map('timestampFormat', 'dd/MM/yyyy'));
--- Check if errors handled
-select from_json('{"a":1}', 1);
-select from_json('{"a":1}', '{"a": 1}');
-select from_json('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}', named_struct('mode', 'PERMISSIVE'));
-select from_json();
diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
index a1f5d5e26f249..d8aa4fb9fa788 100644
--- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 14
+-- Number of queries: 6
 
 
 -- !query 0
@@ -61,82 +61,3 @@ struct<>
 -- !query 5 output
 org.apache.spark.sql.AnalysisException
 Invalid number of arguments for function to_json; line 1 pos 7
-
-
--- !query 6
-describe function from_json
--- !query 6 schema
-struct
--- !query 6 output
-Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
-Function: from_json
-Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
-
-
--- !query 7
-describe function extended from_json
--- !query 7 schema
-struct
--- !query 7 output
-Class: org.apache.spark.sql.catalyst.expressions.JsonToStruct
-Extended Usage:
-    Examples:
-      > SELECT from_json('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}');
-       {"a":1}
-      > SELECT from_json('{"time":"26/08/2015"}', '{"type":"struct", "fields":[{"name":"time", "type":"timestamp", "nullable":true}]}', map('timestampFormat', 'dd/MM/yyyy'));
-       {"time":"2015-08-26 00:00:00.0"}
-
-Function: from_json
-Usage: from_json(jsonStr, schema[, options]) - Returns a struct value with the given `jsonStr` and `schema`.
-
-
--- !query 8
-select from_json('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}')
--- !query 8 schema
-struct>
--- !query 8 output
-{"a":1}
-
-
--- !query 9
-select from_json('{"time":"26/08/2015"}', '{"type":"struct", "fields":[{"name":"time", "type":"timestamp", "nullable":true}]}', map('timestampFormat', 'dd/MM/yyyy'))
--- !query 9 schema
-struct>
--- !query 9 output
-{"time":2015-08-26 00:00:00.0}
-
-
--- !query 10
-select from_json('{"a":1}', 1)
--- !query 10 schema
-struct<>
--- !query 10 output
-org.apache.spark.sql.AnalysisException
-Must be a string literal, but: 1;; line 1 pos 7
-
-
--- !query 11
-select from_json('{"a":1}', '{"a": 1}')
--- !query 11 schema
-struct<>
--- !query 11 output
-org.apache.spark.sql.AnalysisException
-Illegal json string for representing a schema: {"a": 1}";; line 1 pos 7
-
-
--- !query 12
-select from_json('{"a":1}', '{"type":"struct", "fields":[{"name":"a", "type":"integer", "nullable":true}]}', named_struct('mode', 'PERMISSIVE'))
--- !query 12 schema
-struct<>
--- !query 12 output
-org.apache.spark.sql.AnalysisException
-Must use a map() function for options;; line 1 pos 7
-
-
--- !query 13
-select from_json()
--- !query 13 schema
-struct<>
--- !query 13 output
-org.apache.spark.sql.AnalysisException
-Invalid number of arguments for function from_json; line 1 pos 7
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index cf4cc92ecef5a..58655f5622d6e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -198,7 +198,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
     checkAnswer(dfTwo, readBackTwo)
   }
 
-  test("SPARK-19637 Support to_json/from_json in SQL") {
+  test("SPARK-19637 Support to_json in SQL") {
     // to_json
     val df1 = Seq(Tuple1(Tuple1(1))).toDF("a")
     checkAnswer(
@@ -214,32 +214,5 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       df2.selectExpr("to_json(a, named_struct('a', 1))")
     }
     assert(errMsg1.getMessage.startsWith("Must use a map() function for options"))
-
-    // from_json
-    val df3 = Seq("""{"a": 1}""").toDS()
-    val schema1 = new StructType().add("a", IntegerType)
-    checkAnswer(
-      df3.selectExpr(s"from_json(value, '${schema1.json}')"),
-      Row(Row(1)) :: Nil)
-
-    val df4 = Seq("""{"time": "26/08/2015 18:00"}""").toDS()
-    val schema2 = new StructType().add("time", TimestampType)
-    checkAnswer(
-      df4.selectExpr(
-        s"from_json(value, '${schema2.json}', map('timestampFormat', 'dd/MM/yyyy HH:mm'))"),
-      Row(Row(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0"))))
-
-    val errMsg2 = intercept[AnalysisException] {
-      df4.selectExpr(s"from_json(value, 1)")
-    }
-    assert(errMsg2.getMessage.startsWith("Must be a string literal, but:"))
-    val errMsg3 = intercept[AnalysisException] {
-      df4.selectExpr(s"""from_json(value, '{"a": 1}')""")
-    }
-    assert(errMsg3.getMessage.startsWith("Illegal json string for representing a schema:"))
-    val errMsg4 = intercept[AnalysisException] {
-      df4.selectExpr(s"from_json(value, '${schema2.json}', named_struct('a', 1))")
-    }
-    assert(errMsg4.getMessage.startsWith("Must use a map() function for options"))
   }
 }
From bc8b07e81b62979bf9a832b229bd512763f67c10 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Fri, 3 Mar 2017 14:39:42 +0900
Subject: [PATCH 07/10] Apply some comments

---
 .../org/apache/spark/sql/catalyst/json/JacksonUtils.scala | 7 -------
 .../src/main/scala/org/apache/spark/sql/functions.scala   | 3 +--
 .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala   | 1 -
 3 files changed, 1 insertion(+), 10 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 0406ec5a95ec2..269f3a2f1c3d3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -61,13 +61,6 @@ object JacksonUtils {
     schema.foreach(field => verifyType(field.name, field.dataType))
   }
 
-  def strToStructType(schemaAsJson: String): StructType = Try {
-    DataType.fromJson(schemaAsJson).asInstanceOf[StructType]
-  }.getOrElse {
-    throw new AnalysisException(
-      s"""Illegal json string for representing a schema: $schemaAsJson"""")
-  }
-
   /** Convert a map literal to Map-type data. */
   def validateMapData(exp: Expression): Map[String, String] = exp match {
     case m: CreateMap => m.dataType.acceptsType(MapType(StringType, StringType, false))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index ad6e1beb9f5d7..c881bdb85e6b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedFunction}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.json.JacksonUtils
 import org.apache.spark.sql.catalyst.plans.logical.BroadcastHint
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.expressions.UserDefinedFunction
@@ -3060,7 +3059,7 @@ object functions {
    * @since 2.1.0
    */
   def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
-    from_json(e, JacksonUtils.strToStructType(schema), options)
+    from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
 
   /**
    * (Scala-specific) Converts a column containing a `StructType` into a JSON string with the
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 58655f5622d6e..0738582e95922 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -199,7 +199,6 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
   }
 
   test("SPARK-19637 Support to_json in SQL") {
-    // to_json
     val df1 = Seq(Tuple1(Tuple1(1))).toDF("a")
     checkAnswer(
       df1.selectExpr("to_json(a)"),
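Note on patch 07: with `strToStructType` reverted, the String-schema overload of `from_json` is back on plain `DataType.fromJson`, so callers are expected to pass Spark's JSON schema representation (typically whatever `StructType.json` produced), not a DDL string. A hedged usage sketch of that public overload (assumes a live SparkSession named `spark` and its implicits; the column names are illustrative):

    import org.apache.spark.sql.functions.from_json
    import org.apache.spark.sql.types._
    import spark.implicits._

    val schema = new StructType().add("a", IntegerType)
    val options = new java.util.HashMap[String, String]()

    // The schema argument is Spark's JSON schema format, not DDL.
    val parsed = Seq("""{"a": 1}""").toDF("value")
      .select(from_json($"value", schema.json, options).as("s"))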
From 228ff37d2c2dcb1212de6fb26e39e3081a5c4364 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Fri, 3 Mar 2017 15:44:37 +0900
Subject: [PATCH 08/10] Remove unnecessary import

---
 .../scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index 269f3a2f1c3d3..b583ed1f2e554 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst.json
 
-import scala.util.Try
-
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
 
 import org.apache.spark.sql.AnalysisException
From 6daea903c8f94a1914c697226550c79a0ce69c93 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sat, 4 Mar 2017 08:39:20 +0900
Subject: [PATCH 09/10] Apply review comments

---
 .../expressions/jsonExpressions.scala         | 19 +++++++++++++++++--
 .../sql/catalyst/json/JacksonUtils.scala      | 14 --------------
 2 files changed, 17 insertions(+), 16 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 130b0ce9f536a..5ae59e2c3ff59 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -23,11 +23,12 @@ import scala.util.parsing.combinator.RegexParsers
 
 import com.fasterxml.jackson.core._
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{GenericArrayData, ParseModes}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, GenericArrayData, ParseModes}
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
@@ -588,7 +589,7 @@ case class StructToJson(
   def this(child: Expression) = this(Map.empty, child, None)
   def this(child: Expression, options: Expression) =
     this(
-      options = JacksonUtils.validateMapData(options),
+      options = StructToJson.convertToMapData(options),
      child = child,
      timeZoneId = None)
 
@@ -632,3 +633,17 @@ case class StructToJson(
 
   override def inputTypes: Seq[AbstractDataType] = StructType :: Nil
 }
+
+object StructToJson {
+
+  def convertToMapData(exp: Expression): Map[String, String] = exp match {
+    case m: CreateMap
+        if m.dataType.acceptsType(MapType(StringType, StringType, valueContainsNull = false)) =>
+      val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData]
+      ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) =>
+        key.toString -> value.toString
+      }
+    case _ =>
+      throw new AnalysisException("Must use a map() function for options")
+  }
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
index b583ed1f2e554..3b23c6cd2816f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonUtils.scala
@@ -19,9 +19,6 @@ package org.apache.spark.sql.catalyst.json
 
 import com.fasterxml.jackson.core.{JsonParser, JsonToken}
 
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{CreateMap, Expression}
-import org.apache.spark.sql.catalyst.util.ArrayBasedMapData
 import org.apache.spark.sql.types._
 
 object JacksonUtils {
@@ -58,15 +55,4 @@ object JacksonUtils {
 
     schema.foreach(field => verifyType(field.name, field.dataType))
   }
-
-  /** Convert a map literal to Map-type data. */
-  def validateMapData(exp: Expression): Map[String, String] = exp match {
-    case m: CreateMap => m.dataType.acceptsType(MapType(StringType, StringType, false))
-      val arrayMap = m.eval().asInstanceOf[ArrayBasedMapData]
-      ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) =>
-        key.toString -> value.toString
-      }.toMap
-    case _ =>
-      throw new AnalysisException("Must use a map() function for options")
-  }
 }
From 6d164740a6434b80d3559d72deca09729835e273 Mon Sep 17 00:00:00 2001
From: Takeshi Yamamuro
Date: Sat, 4 Mar 2017 12:58:36 +0900
Subject: [PATCH 10/10] Add error messages

---
 .../spark/sql/catalyst/expressions/jsonExpressions.scala | 3 +++
 .../src/main/scala/org/apache/spark/sql/functions.scala  | 2 +-
 .../scala/org/apache/spark/sql/JsonFunctionsSuite.scala  | 6 ++++++
 3 files changed, 10 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
index 5ae59e2c3ff59..18b5f2f7ed2e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
@@ -643,6 +643,9 @@ object StructToJson {
       ArrayBasedMapData.toScalaMap(arrayMap).map { case (key, value) =>
         key.toString -> value.toString
       }
+    case m: CreateMap =>
+      throw new AnalysisException(
+        s"A type of keys and values in map() must be string, but got ${m.dataType}")
     case _ =>
       throw new AnalysisException("Must use a map() function for options")
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
index c881bdb85e6b5..201f726db3fad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3059,7 +3059,7 @@ object functions {
    * @since 2.1.0
    */
   def from_json(e: Column, schema: String, options: java.util.Map[String, String]): Column =
-    from_json(e, DataType.fromJson(schema).asInstanceOf[StructType], options)
+    from_json(e, DataType.fromJson(schema), options)
 
   /**
    * (Scala-specific) Converts a column containing a `StructType` into a JSON string with the
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index 0738582e95922..cdea3b9a0f79f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -213,5 +213,11 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
       df2.selectExpr("to_json(a, named_struct('a', 1))")
     }
     assert(errMsg1.getMessage.startsWith("Must use a map() function for options"))
+
+    val errMsg2 = intercept[AnalysisException] {
+      df2.selectExpr("to_json(a, map('a', 1))")
+    }
+    assert(errMsg2.getMessage.startsWith(
+      "A type of keys and values in map() must be string, but got"))
   }
 }
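Note on the end state of the series: from_json stays DataFrame-only while to_json becomes callable from SQL, accepting an optional map() of string options and rejecting anything else with the two messages added in patch 10. A quick sketch of the resulting behavior, mirroring the final tests (assumes a live SparkSession named `spark`; expected output shown in comments):

    spark.sql("SELECT to_json(named_struct('a', 1, 'b', 2))").show(false)
    // {"a":1,"b":2}

    spark.sql(
      "SELECT to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), " +
        "map('timestampFormat', 'dd/MM/yyyy'))").show(false)
    // {"time":"26/08/2015"}

    // Options that are not a map() literal fail with
    // "Must use a map() function for options"; non-string map values fail with
    // "A type of keys and values in map() must be string, but got ...".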