From f98aea2b59025f4a41e28fac8e2b2b689ddf4d27 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 23 Jun 2018 19:16:22 +0200 Subject: [PATCH 1/9] Initial implementation of the infer_schema function --- .../catalyst/analysis/FunctionRegistry.scala | 1 + .../expressions/aggregate/InferSchema.scala | 165 ++++++++++++++++++ .../sql/catalyst}/json/JsonInferSchema.scala | 47 +++-- .../datasources/json/JsonDataSource.scala | 4 +- 4 files changed, 196 insertions(+), 21 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala rename sql/{core/src/main/scala/org/apache/spark/sql/execution/datasources => catalyst/src/main/scala/org/apache/spark/sql/catalyst}/json/JsonInferSchema.scala (92%) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 8abc616c1a3f7..e8384c6447115 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -308,6 +308,7 @@ object FunctionRegistry { expression[RegrSlope]("regr_slope"), expression[RegrR2]("regr_r2"), expression[RegrIntercept]("regr_intercept"), + expression[InferSchema]("infer_schema"), // string functions expression[Ascii]("ascii"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala new file mode 100644 index 0000000000000..f8e16da31cd04 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions.aggregate + +import scala.util.Try + +import com.fasterxml.jackson.core.JsonFactory + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, JsonExprUtils} +import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JsonInferSchema, JSONOptions} +import org.apache.spark.sql.catalyst.json.JsonInferSchema.compatibleRootType +import org.apache.spark.sql.catalyst.util.DropMalformedMode +import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType} +import org.apache.spark.unsafe.types.UTF8String + +case class InferSchema( + child: Expression, + inputType: String, + options: Map[String, String], + override val mutableAggBufferOffset: Int, + override val inputAggBufferOffset: Int) extends ImperativeAggregate { + + require(inputType.toLowerCase == "json", "Only JSON format is supported") + + def this(child: Expression) = { + this( + child = child, + inputType = "json", + options = Map.empty[String, String], + mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + def this(child: Expression, options: Expression) = { + this( + child = child, + inputType = "json", + options = JsonExprUtils.convertToMapData(options), + 
mutableAggBufferOffset = 0, + inputAggBufferOffset = 0) + } + + override def nullable: Boolean = true + + override def children: Seq[Expression] = Seq(child) + + override def dataType: DataType = StringType + + override lazy val deterministic: Boolean = false + + override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = + copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = + copy(inputAggBufferOffset = newInputAggBufferOffset) + + // Note: although this simply copies aggBufferAttributes, this common code can not be placed + // in the superclass because that will lead to initialization ordering issues. + override val inputAggBufferAttributes: Seq[AttributeReference] = { + aggBufferAttributes.map(_.newInstance()) + } + + override def aggBufferSchema: StructType = { + StructType(Seq(StructField("inferred_schema", StringType))) + } + private val indexOfSchemaField = 0 + + override def aggBufferAttributes: Seq[AttributeReference] = { + Seq(AttributeReference("inferred_schema", StringType)()) + } + + override def initialize(mutableAggBuffer: InternalRow): Unit = { + mutableAggBuffer.setNullAt(mutableAggBufferOffset + indexOfSchemaField) + } + + override def update(mutableAggBuffer: InternalRow, inputRow: InternalRow): Unit = { + val inputValue = child.eval(inputRow) + if (inputValue != null) { + val currentSchema = inferSchema(inputValue) + val bufferOffset = mutableAggBufferOffset + indexOfSchemaField + val mergedSchema = if (mutableAggBuffer.isNullAt(bufferOffset)) { + currentSchema + } else { + val inferredSchema = mutableAggBuffer.getUTF8String(bufferOffset) + mergeSchemas(inferredSchema, currentSchema) + } + + mergedSchema.foreach(schema => mutableAggBuffer.update(bufferOffset, schema)) + } + } + + override def merge(mutableAggBuffer: InternalRow, inputAggBuffer: InternalRow): Unit = { + val inputOffset = inputAggBufferOffset 
+ indexOfSchemaField + if (!inputAggBuffer.isNullAt(inputOffset)) { + val inputSchema = Some(inputAggBuffer.getUTF8String(inputOffset)) + val bufferOffset = mutableAggBufferOffset + indexOfSchemaField + val mergedSchema = if (mutableAggBuffer.isNullAt(bufferOffset)) { + inputSchema + } else { + val bufferSchema = mutableAggBuffer.getUTF8String(bufferOffset) + mergeSchemas(bufferSchema, inputSchema) + } + + mergedSchema.foreach(schema => mutableAggBuffer.update(bufferOffset, schema)) + } + } + + override def eval(input: InternalRow = null): Any = { + if (input == null) { + null + } else { + input.getUTF8String(indexOfSchemaField) + } + } + + private val jsonOptions = new JSONOptions( + parameters = options + ("mode" -> DropMalformedMode.name), + defaultTimeZoneId = "UTC") + private val jsonFactory = new JsonFactory() + + private def inferSchema(input: Any): Option[UTF8String] = input match { + case jsonRow: UTF8String => + val dataType = JsonInferSchema.inferForRow( + row = jsonRow, + configOptions = jsonOptions, + createParser = CreateJacksonParser.utf8String, + factory = jsonFactory) + + dataType.map(dt => UTF8String.fromString(dt.catalogString)) + case other => + throw new IllegalArgumentException(s"Wrong input type ${other.getClass.getCanonicalName}") + } + + private def mergeSchemas( + inferredSchema: UTF8String, + currentSchema: Option[UTF8String]): Option[UTF8String] = { + currentSchema.flatMap { schema => + val parseMode = jsonOptions.parseMode + val columnNameOfCorruptRecord = jsonOptions.columnNameOfCorruptRecord + val typeMerger = compatibleRootType(columnNameOfCorruptRecord, parseMode) + val inferredType = DataType.fromDDL(inferredSchema.toString) + val currentType = DataType.fromDDL(schema.toString) + + Try(typeMerger(inferredType, currentType)) + .map(dt => UTF8String.fromString(dt.catalogString)) + .toOption + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala similarity index 92% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index f6edc7bfb3750..962798e39f31f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.sql.execution.datasources.json +package org.apache.spark.sql.catalyst.json import java.util.Comparator @@ -25,12 +25,36 @@ import org.apache.spark.SparkException import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.TypeCoercion import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil -import org.apache.spark.sql.catalyst.json.JSONOptions import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode} import org.apache.spark.sql.types._ import org.apache.spark.util.Utils private[sql] object JsonInferSchema { + def inferForRow[T]( + row: T, + configOptions: JSONOptions, + createParser: (JsonFactory, T) => JsonParser, + factory: JsonFactory): Option[DataType] = { + val parseMode = configOptions.parseMode + val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord + + try { + Utils.tryWithResource(createParser(factory, row)) { parser => + parser.nextToken() + Some(inferField(parser, configOptions)) + } + } catch { + case e @ (_: RuntimeException | _: JsonProcessingException) => parseMode match { + case PermissiveMode => + Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType)))) + case DropMalformedMode => + None + case FailFastMode => + throw new SparkException("Malformed records are detected in schema inference. 
" + + s"Parse Mode: ${FailFastMode.name}.", e) + } + } + } /** * Infer the type of a collection of json records in three stages: @@ -51,22 +75,7 @@ private[sql] object JsonInferSchema { val factory = new JsonFactory() configOptions.setJacksonOptions(factory) iter.flatMap { row => - try { - Utils.tryWithResource(createParser(factory, row)) { parser => - parser.nextToken() - Some(inferField(parser, configOptions)) - } - } catch { - case e @ (_: RuntimeException | _: JsonProcessingException) => parseMode match { - case PermissiveMode => - Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType)))) - case DropMalformedMode => - None - case FailFastMode => - throw new SparkException("Malformed records are detected in schema inference. " + - s"Parse Mode: ${FailFastMode.name}.", e) - } - } + inferForRow(row, configOptions, createParser, factory) }.reduceOption(typeMerger).toIterator } @@ -246,7 +255,7 @@ private[sql] object JsonInferSchema { /** * Remove top-level ArrayType wrappers and merge the remaining schemas */ - private def compatibleRootType( + def compatibleRootType( columnNameOfCorruptRecords: String, parseMode: ParseMode): (DataType, DataType) => DataType = { // Since we support array of json objects at the top level, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 3b6df45e949e8..220c626d2a874 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -27,13 +27,13 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat - import org.apache.spark.TaskContext + import org.apache.spark.input.{PortableDataStream, 
StreamInputFormat} import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} +import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JsonInferSchema, JSONOptions} import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.text.TextFileFormat From a0c9a1137c5444890f048bd480d63496e31ec599 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 23 Jun 2018 19:29:24 +0200 Subject: [PATCH 2/9] Move typeMerger out of the merge function --- .../expressions/aggregate/InferSchema.scala | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala index f8e16da31cd04..ea11efe7a4b42 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala @@ -147,19 +147,20 @@ case class InferSchema( throw new IllegalArgumentException(s"Wrong input type ${other.getClass.getCanonicalName}") } + private val typeMerger = compatibleRootType( + jsonOptions.columnNameOfCorruptRecord, + jsonOptions.parseMode) + private def mergeSchemas( inferredSchema: UTF8String, currentSchema: Option[UTF8String]): Option[UTF8String] = { currentSchema.flatMap { schema => - val parseMode = jsonOptions.parseMode - val columnNameOfCorruptRecord = jsonOptions.columnNameOfCorruptRecord - val typeMerger = compatibleRootType(columnNameOfCorruptRecord, parseMode) - val inferredType = DataType.fromDDL(inferredSchema.toString) - val currentType = 
DataType.fromDDL(schema.toString) - - Try(typeMerger(inferredType, currentType)) - .map(dt => UTF8String.fromString(dt.catalogString)) - .toOption + Try { + val inferredType = DataType.fromDDL(inferredSchema.toString) + val currentType = DataType.fromDDL(schema.toString) + + typeMerger(inferredType, currentType) + }.map(dt => UTF8String.fromString(dt.catalogString)).toOption } } } From 17a1f98448194af984de43d4dedad99271e25189 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sat, 23 Jun 2018 22:14:38 +0200 Subject: [PATCH 3/9] SQL test for the infer_schema function --- .../sql/catalyst/json/JsonInferSchema.scala | 2 +- .../datasources/json/JsonDataSource.scala | 2 +- .../sql-tests/inputs/json-functions.sql | 18 ++++++++ .../sql-tests/results/json-functions.sql.out | 43 ++++++++++++++++++- .../datasources/json/JsonSuite.scala | 4 +- 5 files changed, 64 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala index 962798e39f31f..c163eefcc6d47 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, Pars import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -private[sql] object JsonInferSchema { +object JsonInferSchema { def inferForRow[T]( row: T, configOptions: JSONOptions, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala index 220c626d2a874..2fee2128ba1f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonDataSource.scala @@ -27,8 +27,8 @@ import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.hadoop.io.Text import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.spark.TaskContext +import org.apache.spark.TaskContext import org.apache.spark.input.{PortableDataStream, StreamInputFormat} import org.apache.spark.rdd.{BinaryFileRDD, RDD} import org.apache.spark.sql.{Dataset, Encoders, SparkSession} diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql index dc15d13cd1dd3..1749e35a6c35b 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql @@ -35,3 +35,21 @@ DROP VIEW IF EXISTS jsonTable; -- from_json - complex types select from_json('{"a":1, "b":2}', 'map'); select from_json('{"a":1, "b":"2"}', 'struct'); + +-- schema inferring +create temporary view json_structs(json) as select * from values + ('{"a":1}'), + ('{"a" 2}'), + ('{"a": 3}'), + (NULL); + +select infer_schema(json) from json_structs; + +create temporary view json_array(json) as select * from values + ('{"a": [1, 2, 3], "b": ["1"]}'), + ('{"a": [4, 5], "b": []}'), + ('{"a": []}'), + ('{}'), + (NULL); + +select infer_schema(json) from json_array; diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index 2b3288dc5a137..faf30f431eb14 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 28 +-- Number of queries: 32 -- !query 0 @@ -274,3 +274,44 @@ select from_json('{"a":1, "b":"2"}', 'struct') struct> 
-- !query 27 output {"a":1,"b":"2"} + + +-- !query 28 +create temporary view json_structs(json) as select * from values + ('{"a":1}'), + ('{"a" 2}'), + ('{"a": 3}'), + (NULL) +-- !query 28 schema +struct<> +-- !query 28 output + + + +-- !query 29 +select infer_schema(json) from json_structs +-- !query 29 schema +struct<inferschema(json):string> +-- !query 29 output +struct<a:bigint> + + +-- !query 30 +create temporary view json_array(json) as select * from values + ('{"a": [1, 2, 3], "b": ["1"]}'), + ('{"a": [4, 5], "b": []}'), + ('{"a": []}'), + ('{}'), + (NULL) +-- !query 30 schema +struct<> +-- !query 30 output + + + +-- !query 31 +select infer_schema(json) from json_array +-- !query 31 schema +struct<inferschema(json):string> +-- !query 31 output +struct<a:array<bigint>,b:array<string>> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala index a8a4a524a97f9..0d50e8367c86e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala @@ -31,11 +31,11 @@ import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.{SparkException, TestUtils} import org.apache.spark.rdd.RDD import org.apache.spark.sql.{functions => F, _} -import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, JSONOptions} +import org.apache.spark.sql.catalyst.json._ +import org.apache.spark.sql.catalyst.json.JsonInferSchema.compatibleType import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.ExternalRDD import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.execution.datasources.json.JsonInferSchema.compatibleType import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ From 45fc2e419dda2e53f5ff7e7ecbbee64d2bf23cf7 Mon Sep 17 00:00:00
2001 From: Maxim Gekk Date: Sun, 24 Jun 2018 12:31:44 +0200 Subject: [PATCH 4/9] Pretty name is changed to infer_schema --- .../sql/catalyst/expressions/aggregate/InferSchema.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala index ea11efe7a4b42..82662ae3c936b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala @@ -57,12 +57,10 @@ case class InferSchema( } override def nullable: Boolean = true - override def children: Seq[Expression] = Seq(child) - override def dataType: DataType = StringType - override lazy val deterministic: Boolean = false + override def prettyName: String = "infer_schema" override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = copy(mutableAggBufferOffset = newMutableAggBufferOffset) From 4db679927e35cc41e2160b91d2435dc653f368a9 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 24 Jun 2018 12:40:01 +0200 Subject: [PATCH 5/9] Refactoring --- .../expressions/aggregate/InferSchema.scala | 14 +++++--------- .../sql-tests/results/json-functions.sql.out | 4 ++-- 2 files changed, 7 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala index 82662ae3c936b..17b014cc21595 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala @@ -26,7 +26,7 @@ import 
org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JsonInferSchema, JSONOptions} import org.apache.spark.sql.catalyst.json.JsonInferSchema.compatibleRootType import org.apache.spark.sql.catalyst.util.DropMalformedMode -import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType} +import org.apache.spark.sql.types.{DataType, StringType, StructType} import org.apache.spark.unsafe.types.UTF8String case class InferSchema( @@ -68,21 +68,17 @@ case class InferSchema( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = copy(inputAggBufferOffset = newInputAggBufferOffset) + override def aggBufferAttributes: Seq[AttributeReference] = { + Seq(AttributeReference("infer_schema", StringType)()) + } // Note: although this simply copies aggBufferAttributes, this common code can not be placed // in the superclass because that will lead to initialization ordering issues. 
override val inputAggBufferAttributes: Seq[AttributeReference] = { aggBufferAttributes.map(_.newInstance()) } - - override def aggBufferSchema: StructType = { - StructType(Seq(StructField("inferred_schema", StringType))) - } + override val aggBufferSchema: StructType = StructType.fromAttributes(aggBufferAttributes) private val indexOfSchemaField = 0 - override def aggBufferAttributes: Seq[AttributeReference] = { - Seq(AttributeReference("inferred_schema", StringType)()) - } - override def initialize(mutableAggBuffer: InternalRow): Unit = { mutableAggBuffer.setNullAt(mutableAggBufferOffset + indexOfSchemaField) } diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index faf30f431eb14..2a36e53525eea 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -291,7 +291,7 @@ struct<> -- !query 29 select infer_schema(json) from json_structs -- !query 29 schema -struct<inferschema(json):string> +struct<infer_schema(json):string> -- !query 29 output struct<a:bigint> @@ -312,6 +312,6 @@ struct<> -- !query 31 select infer_schema(json) from json_array -- !query 31 schema -struct<inferschema(json):string> +struct<infer_schema(json):string> -- !query 31 output struct<a:array<bigint>,b:array<string>> From 7e5ad618b6fba583db85dd1bdb251cc824c80bc8 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 24 Jun 2018 13:32:32 +0200 Subject: [PATCH 6/9] bug fix --- .../sql/catalyst/expressions/aggregate/InferSchema.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala index 17b014cc21595..b6698191ee06f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala +++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala @@ -68,8 +68,8 @@ case class InferSchema( override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = copy(inputAggBufferOffset = newInputAggBufferOffset) - override def aggBufferAttributes: Seq[AttributeReference] = { - Seq(AttributeReference("infer_schema", StringType)()) + override val aggBufferAttributes: Seq[AttributeReference] = { + Seq(AttributeReference("infer_schema_agg_buffer", StringType)()) } // Note: although this simply copies aggBufferAttributes, this common code can not be placed // in the superclass because that will lead to initialization ordering issues. From 96e5cd33fbc4711302f9f0cf47e851df66fda524 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 24 Jun 2018 14:05:12 +0200 Subject: [PATCH 7/9] Added description for InferSchema --- .../catalyst/expressions/aggregate/InferSchema.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala index b6698191ee06f..ae43716bbe6b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala @@ -22,26 +22,28 @@ import scala.util.Try import com.fasterxml.jackson.core.JsonFactory import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, JsonExprUtils} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, ExpressionDescription, JsonExprUtils} import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JsonInferSchema, JSONOptions} import 
org.apache.spark.sql.catalyst.json.JsonInferSchema.compatibleRootType import org.apache.spark.sql.catalyst.util.DropMalformedMode import org.apache.spark.sql.types.{DataType, StringType, StructType} import org.apache.spark.unsafe.types.UTF8String +@ExpressionDescription( + usage = """_FUNC_(expr,[options]) - Infers schema for JSON `expr` by using JSON `options`.""") case class InferSchema( child: Expression, - inputType: String, + inputFormat: String, options: Map[String, String], override val mutableAggBufferOffset: Int, override val inputAggBufferOffset: Int) extends ImperativeAggregate { - require(inputType.toLowerCase == "json", "Only JSON format is supported") + require(inputFormat.toLowerCase == "json", "Only JSON format is supported") def this(child: Expression) = { this( child = child, - inputType = "json", + inputFormat = "json", options = Map.empty[String, String], mutableAggBufferOffset = 0, inputAggBufferOffset = 0) @@ -50,7 +52,7 @@ case class InferSchema( def this(child: Expression, options: Expression) = { this( child = child, - inputType = "json", + inputFormat = "json", options = JsonExprUtils.convertToMapData(options), mutableAggBufferOffset = 0, inputAggBufferOffset = 0) From 333139da49951df1aee39aeabc286a162dd92ad9 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Sun, 24 Jun 2018 14:05:46 +0200 Subject: [PATCH 8/9] Drop views --- .../sql-tests/inputs/json-functions.sql | 2 ++ .../sql-tests/results/json-functions.sql.out | 28 +++++++++++++++---- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql index 1749e35a6c35b..92980dabf22c4 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/json-functions.sql @@ -44,6 +44,7 @@ create temporary view json_structs(json) as select * from values (NULL); select infer_schema(json) from json_structs; 
+drop view if exists json_structs; create temporary view json_array(json) as select * from values ('{"a": [1, 2, 3], "b": ["1"]}'), @@ -53,3 +54,4 @@ create temporary view json_array(json) as select * from values (NULL); select infer_schema(json) from json_array; +drop view if exists json_array; diff --git a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out index 2a36e53525eea..086eca14a6d95 100644 --- a/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/json-functions.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 32 +-- Number of queries: 34 -- !query 0 @@ -297,21 +297,37 @@ struct<a:bigint> -- !query 30 +drop view if exists json_structs +-- !query 30 schema +struct<> +-- !query 30 output + + + +-- !query 31 create temporary view json_array(json) as select * from values ('{"a": [1, 2, 3], "b": ["1"]}'), ('{"a": [4, 5], "b": []}'), ('{"a": []}'), ('{}'), (NULL) --- !query 30 schema +-- !query 31 schema struct<> --- !query 30 output +-- !query 31 output --- !query 31 +-- !query 32 select infer_schema(json) from json_array --- !query 31 schema +-- !query 32 schema struct<infer_schema(json):string> --- !query 31 output +-- !query 32 output struct<a:array<bigint>,b:array<string>> + + +-- !query 33 +drop view if exists json_array +-- !query 33 schema +struct<> +-- !query 33 output + From 0e3d1de37bfc73cf1cc5aa33b2f5d80aa154a06b Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Tue, 26 Jun 2018 13:49:05 +0200 Subject: [PATCH 9/9] Added an example for the function --- .../sql/catalyst/expressions/aggregate/InferSchema.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala index ae43716bbe6b1..f36e0d9a9f46e
100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/InferSchema.scala @@ -30,7 +30,14 @@ import org.apache.spark.sql.types.{DataType, StringType, StructType} import org.apache.spark.unsafe.types.UTF8String @ExpressionDescription( - usage = """_FUNC_(expr,[options]) - Infers schema for JSON `expr` by using JSON `options`.""") + usage = """_FUNC_(expr, [options]) - Infers schema for JSON `expr` by using JSON `options`.""", + examples = """ + Examples: + > CREATE TEMPORARY VIEW json_table(json) AS SELECT * FROM VALUES ('{"a":1}'), ('{"a": 3}'); + > SELECT _FUNC_(json) FROM json_table; + struct<a:bigint> + """, + since = "2.4.0") case class InferSchema( child: Expression, inputFormat: String,