[SPARK-24709][SQL] schema_of_json() - schema inference from an example
## What changes were proposed in this pull request?

In this PR, I propose to add a new function, *schema_of_json()*, which infers the schema of a JSON string literal. The result of the function is a string containing the schema in DDL format.

One of the use cases is combining *schema_of_json()* with *from_json()*. Currently, _from_json()_ requires a schema as a mandatory argument. With *schema_of_json()*, the schema can instead be derived from an example JSON string that has the same structure as the column passed to _from_json()_. For instance:

```sql
select from_json(json_column, schema_of_json('{"c1": [0], "c2": [{"c3":0}]}'))
from json_table;
```
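
The same combination works through the Dataset API, via the `schema_of_json` and `from_json(e: Column, schema: Column)` functions added to `functions.scala` below. A minimal Scala sketch, adapted from the new `JsonFunctionsSuite` test (it assumes a `SparkSession` in scope as `spark`):

```scala
import org.apache.spark.sql.functions.{from_json, lit, schema_of_json}
import spark.implicits._

// A Dataset[String] with a single column named "value".
val in = Seq("""{"a": [1, 2, 3]}""").toDS()

// schema_of_json infers struct<a:array<bigint>> from the example literal,
// and from_json then applies that schema to the actual column.
val parsed = in.select(from_json($"value", schema_of_json(lit("""{"a": [1]}"""))) as "parsed")
```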

## How was this patch tested?

Added new tests to `JsonFunctionsSuite` and `JsonExpressionsSuite`, and new SQL tests to `json-functions.sql`.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21686 from MaxGekk/infer_schema_json.
MaxGekk authored and HyukjinKwon committed Jul 4, 2018
1 parent 5585c57 commit 776f299
Showing 11 changed files with 163 additions and 18 deletions.
27 changes: 27 additions & 0 deletions python/pyspark/sql/functions.py
@@ -2189,11 +2189,16 @@ def from_json(col, schema, options={}):
     >>> df = spark.createDataFrame(data, ("key", "value"))
     >>> df.select(from_json(df.value, schema).alias("json")).collect()
     [Row(json=[Row(a=1)])]
+    >>> schema = schema_of_json(lit('''{"a": 0}'''))
+    >>> df.select(from_json(df.value, schema).alias("json")).collect()
+    [Row(json=Row(a=1))]
     """
 
     sc = SparkContext._active_spark_context
     if isinstance(schema, DataType):
         schema = schema.json()
+    elif isinstance(schema, Column):
+        schema = _to_java_column(schema)
     jc = sc._jvm.functions.from_json(_to_java_column(col), schema, options)
     return Column(jc)
 
@@ -2235,6 +2240,28 @@ def to_json(col, options={}):
     return Column(jc)
 
 
+@ignore_unicode_prefix
+@since(2.4)
+def schema_of_json(col):
+    """
+    Parses a column containing a JSON string and infers its schema in DDL format.
+
+    :param col: string column in json format
+
+    >>> from pyspark.sql.types import *
+    >>> data = [(1, '{"a": 1}')]
+    >>> df = spark.createDataFrame(data, ("key", "value"))
+    >>> df.select(schema_of_json(df.value).alias("json")).collect()
+    [Row(json=u'struct<a:bigint>')]
+    >>> df.select(schema_of_json(lit('{"a": 0}')).alias("json")).collect()
+    [Row(json=u'struct<a:bigint>')]
+    """
+
+    sc = SparkContext._active_spark_context
+    jc = sc._jvm.functions.schema_of_json(_to_java_column(col))
+    return Column(jc)
+
+
 @since(1.5)
 def size(col):
     """
@@ -505,6 +505,7 @@ object FunctionRegistry {
     // json
     expression[StructsToJson]("to_json"),
     expression[JsonToStructs]("from_json"),
+    expression[SchemaOfJson]("schema_of_json"),
 
     // cast
     expression[Cast]("cast"),
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, CharArrayWriter, InputStreamReader, StringWriter}
+import java.io._
 
 import scala.util.parsing.combinator.RegexParsers
 
@@ -28,7 +28,8 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
 import org.apache.spark.sql.catalyst.json._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, BadRecordException, FailFastMode, GenericArrayData, MapData}
+import org.apache.spark.sql.catalyst.json.JsonInferSchema.inferField
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -525,17 +526,19 @@ case class JsonToStructs(
   override def nullable: Boolean = true
 
   // Used in `FunctionRegistry`
-  def this(child: Expression, schema: Expression) =
+  def this(child: Expression, schema: Expression, options: Map[String, String]) =
     this(
-      schema = JsonExprUtils.validateSchemaLiteral(schema),
-      options = Map.empty[String, String],
+      schema = JsonExprUtils.evalSchemaExpr(schema),
+      options = options,
       child = child,
       timeZoneId = None,
       forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
 
+  def this(child: Expression, schema: Expression) = this(child, schema, Map.empty[String, String])
+
   def this(child: Expression, schema: Expression, options: Expression) =
     this(
-      schema = JsonExprUtils.validateSchemaLiteral(schema),
+      schema = JsonExprUtils.evalSchemaExpr(schema),
       options = JsonExprUtils.convertToMapData(options),
       child = child,
       timeZoneId = None,
@@ -744,11 +747,44 @@ case class StructsToJson(
   override def inputTypes: Seq[AbstractDataType] = TypeCollection(ArrayType, StructType) :: Nil
 }
 
+/**
+ * A function that infers the schema of a JSON string.
+ */
+@ExpressionDescription(
+  usage = "_FUNC_(json[, options]) - Returns schema in the DDL format of JSON string.",
+  examples = """
+    Examples:
+      > SELECT _FUNC_('[{"col":0}]');
+       array<struct<col:int>>
+  """,
+  since = "2.4.0")
+case class SchemaOfJson(child: Expression)
+  extends UnaryExpression with String2StringExpression with CodegenFallback {
+
+  private val jsonOptions = new JSONOptions(Map.empty, "UTC")
+  private val jsonFactory = new JsonFactory()
+  jsonOptions.setJacksonOptions(jsonFactory)
+
+  override def convert(v: UTF8String): UTF8String = {
+    val dt = Utils.tryWithResource(CreateJacksonParser.utf8String(jsonFactory, v)) { parser =>
+      parser.nextToken()
+      inferField(parser, jsonOptions)
+    }
+
+    UTF8String.fromString(dt.catalogString)
+  }
+}
+
 object JsonExprUtils {
 
-  def validateSchemaLiteral(exp: Expression): DataType = exp match {
+  def evalSchemaExpr(exp: Expression): DataType = exp match {
     case Literal(s, StringType) => DataType.fromDDL(s.toString)
-    case e => throw new AnalysisException(s"Expected a string literal instead of $e")
+    case e @ SchemaOfJson(_: Literal) =>
+      val ddlSchema = e.eval().asInstanceOf[UTF8String]
+      DataType.fromDDL(ddlSchema.toString)
+    case e => throw new AnalysisException(
+      "Schema should be specified in DDL format as a string literal" +
+      s" or output of the schema_of_json function instead of ${e.sql}")
   }
 
   def convertToMapData(exp: Expression): Map[String, String] = exp match {
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.execution.datasources.json
+package org.apache.spark.sql.catalyst.json
 
 import java.util.Comparator
 
@@ -25,7 +25,6 @@ import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
-import org.apache.spark.sql.catalyst.json.JSONOptions
 import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -103,7 +102,7 @@ private[sql] object JsonInferSchema {
   /**
    * Infer the type of a json document from the parser's token stream
    */
-  private def inferField(parser: JsonParser, configOptions: JSONOptions): DataType = {
+  def inferField(parser: JsonParser, configOptions: JSONOptions): DataType = {
     import com.fasterxml.jackson.core.JsonToken._
     parser.getCurrentToken match {
       case null | VALUE_NULL => NullType
@@ -706,4 +706,11 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
       assert(schemaToCompare == schema)
     }
   }
+
+  test("SPARK-24709: infer schema of json strings") {
+    checkEvaluation(SchemaOfJson(Literal.create("""{"col":0}""")), "struct<col:bigint>")
+    checkEvaluation(
+      SchemaOfJson(Literal.create("""{"col0":["a"], "col1": {"col2": "b"}}""")),
+      "struct<col0:array<string>,col1:struct<col2:string>>")
+  }
 }
@@ -33,7 +33,7 @@ import org.apache.spark.input.{PortableDataStream, StreamInputFormat}
 import org.apache.spark.rdd.{BinaryFileRDD, RDD}
 import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JsonInferSchema, JSONOptions}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
42 changes: 42 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/functions.scala
@@ -3381,6 +3381,48 @@ object functions {
     from_json(e, dataType, options)
   }
 
+  /**
+   * (Scala-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def from_json(e: Column, schema: Column): Column = {
+    from_json(e, schema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * (Java-specific) Parses a column containing a JSON string into a `MapType` with `StringType`
+   * as keys type, `StructType` or `ArrayType` of `StructType`s with the specified schema.
+   * Returns `null`, in the case of an unparseable string.
+   *
+   * @param e a string column containing JSON data.
+   * @param schema the schema to use when parsing the json string
+   * @param options options to control how the json is parsed. Accepts the same options as the
+   *                json data source.
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def from_json(e: Column, schema: Column, options: java.util.Map[String, String]): Column = {
+    withExpr(new JsonToStructs(e.expr, schema.expr, options.asScala.toMap))
+  }
+
+  /**
+   * Parses a column containing a JSON string and infers its schema.
+   *
+   * @param e a string column containing JSON data.
+   *
+   * @group collection_funcs
+   * @since 2.4.0
+   */
+  def schema_of_json(e: Column): Column = withExpr(new SchemaOfJson(e.expr))
+
   /**
    * (Scala-specific) Converts a column containing a `StructType`, `ArrayType` of `StructType`s,
    * a `MapType` or `ArrayType` of `MapType`s into a JSON string with the specified schema.
@@ -35,3 +35,7 @@ DROP VIEW IF EXISTS jsonTable;
 -- from_json - complex types
 select from_json('{"a":1, "b":2}', 'map<string, int>');
 select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>');
+
+-- infer schema of json literal
+select schema_of_json('{"c1":0, "c2":[1]}');
+select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'));
@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 28
+-- Number of queries: 30
 
 
 -- !query 0
@@ -183,7 +183,7 @@ select from_json('{"a":1}', 1)
 struct<>
 -- !query 17 output
 org.apache.spark.sql.AnalysisException
-Expected a string literal instead of 1;; line 1 pos 7
+Schema should be specified in DDL format as a string literal or output of the schema_of_json function instead of 1;; line 1 pos 7


-- !query 18
@@ -274,3 +274,19 @@ select from_json('{"a":1, "b":"2"}', 'struct<a:int,b:string>')
 struct<jsontostructs({"a":1, "b":"2"}):struct<a:int,b:string>>
 -- !query 27 output
 {"a":1,"b":"2"}
+
+
+-- !query 28
+select schema_of_json('{"c1":0, "c2":[1]}')
+-- !query 28 schema
+struct<schemaofjson({"c1":0, "c2":[1]}):string>
+-- !query 28 output
+struct<c1:bigint,c2:array<bigint>>
+
+
+-- !query 29
+select from_json('{"c1":[1, 2, 3]}', schema_of_json('{"c1":[0]}'))
+-- !query 29 schema
+struct<jsontostructs({"c1":[1, 2, 3]}):struct<c1:array<bigint>>>
+-- !query 29 output
+{"c1":[1,2,3]}
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import org.apache.spark.sql.functions.{from_json, lit, map, struct, to_json}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 
@@ -311,7 +311,7 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
     val errMsg1 = intercept[AnalysisException] {
       df3.selectExpr("from_json(value, 1)")
     }
-    assert(errMsg1.getMessage.startsWith("Expected a string literal instead of"))
+    assert(errMsg1.getMessage.startsWith("Schema should be specified in DDL format as a string"))
     val errMsg2 = intercept[AnalysisException] {
       df3.selectExpr("""from_json(value, 'time InvalidType')""")
     }
@@ -392,4 +392,17 @@ class JsonFunctionsSuite extends QueryTest with SharedSQLContext {
     checkAnswer(Seq("""{"{"f": 1}": "a"}""").toDS().select(from_json($"value", schema)),
       Row(null))
   }
+
+  test("SPARK-24709: infers schemas of json strings and pass them to from_json") {
+    val in = Seq("""{"a": [1, 2, 3]}""").toDS()
+    val out = in.select(from_json('value, schema_of_json(lit("""{"a": [1]}"""))) as "parsed")
+    val expected = StructType(StructField(
+      "parsed",
+      StructType(StructField(
+        "a",
+        ArrayType(LongType, true), true) :: Nil),
+      true) :: Nil)
+
+    assert(out.schema == expected)
+  }
 }
@@ -31,11 +31,11 @@ import org.apache.hadoop.io.compress.GzipCodec
 import org.apache.spark.{SparkException, TestUtils}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{functions => F, _}
-import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions}
+import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JsonInferSchema, JSONOptions}
+import org.apache.spark.sql.catalyst.json.JsonInferSchema.compatibleType
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.ExternalRDD
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.datasources.json.JsonInferSchema.compatibleType
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._