From 06362b9980c5acc04c5fd071503f5eda91e06b84 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Thu, 15 Oct 2015 13:34:15 +0900 Subject: [PATCH 1/3] [SPARK-11124] JsonParser/Generator should be closed for resource recycle --- .../scala/org/apache/spark/util/Utils.scala | 3 + .../expressions/jsonExpressions.scala | 56 +++++++++---------- .../datasources/json/InferSchema.scala | 7 ++- .../datasources/json/JacksonParser.scala | 39 +++++++------ 4 files changed, 55 insertions(+), 50 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index bd7e51c3b5100..394f42ddf757e 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2153,6 +2153,9 @@ private[spark] object Utils extends Logging { conf.getInt("spark.executor.instances", 0) == 0 } + def withResource[R <: Closeable, T](resource: R, f: () => T): T = { + try f.apply() finally resource.close() + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 0770fab0ae901..8b445a0eefe73 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.types.{StructField, StructType, StringType, DataType} import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils import scala.util.parsing.combinator.RegexParsers @@ -135,16 +136,19 @@ case class GetJsonObject(json: Expression, path: Expression) if (parsed.isDefined) { try { val parser = jsonFactory.createParser(jsonStr.getBytes) - val output = new ByteArrayOutputStream() - val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8) - parser.nextToken() - val matched = evaluatePath(parser, generator, RawStyle, parsed.get) - generator.close() - if (matched) { - UTF8String.fromBytes(output.toByteArray) - } else { - null - } + Utils.withResource(parser, () => { + val output = new ByteArrayOutputStream() + val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8) + Utils.withResource(generator, () => { + parser.nextToken() + val matched = evaluatePath(parser, generator, RawStyle, parsed.get) + if (matched) { + UTF8String.fromBytes(output.toByteArray) + } else { + null + } + }) + }) } catch { case _: JsonProcessingException => null } @@ -251,16 +255,18 @@ case class GetJsonObject(json: Expression, path: Expression) // modified slightly if there is only a single element written val buffer = new StringWriter() val flattenGenerator = jsonFactory.createGenerator(buffer) - flattenGenerator.writeStartArray() var dirty = 0 - while (p.nextToken() != END_ARRAY) { - // track the number of array elements and only emit an outer array if - // we've written more than one element, this matches Hive's behavior - dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0) - } - flattenGenerator.writeEndArray() - flattenGenerator.close() + Utils.withResource(flattenGenerator, () => { + flattenGenerator.writeStartArray() + + while (p.nextToken() != END_ARRAY) { + // track the number of array elements and only emit an outer array if + // we've 
written more than one element, this matches Hive's behavior + dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0) + } + flattenGenerator.writeEndArray() + }) val buf = buffer.getBuffer if (dirty > 1) { @@ -371,12 +377,7 @@ case class JsonTuple(children: Seq[Expression]) try { val parser = jsonFactory.createParser(json.getBytes) - - try { - parseRow(parser, input) - } finally { - parser.close() - } + Utils.withResource(parser, () => parseRow(parser, input)) } catch { case _: JsonProcessingException => nullRow @@ -421,12 +422,7 @@ case class JsonTuple(children: Seq[Expression]) // write the output directly to UTF8 encoded byte array if (parser.nextToken() != JsonToken.VALUE_NULL) { val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8) - - try { - copyCurrentStructure(generator, parser) - } finally { - generator.close() - } + Utils.withResource(generator, () => copyCurrentStructure(generator, parser)) row(idx) = UTF8String.fromBytes(output.toByteArray) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala index b6f3410bad690..1abd1ebedadb0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala @@ -23,6 +23,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils private[sql] object InferSchema { /** @@ -48,8 +49,10 @@ private[sql] object InferSchema { iter.map { row => try { val parser = factory.createParser(row) - parser.nextToken() - inferField(parser) + Utils.withResource(parser, () => { + parser.nextToken() + inferField(parser) + }) } catch { case _: JsonParseException => StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index c51140749c8e6..ab11046c399b9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.Utils private[sql] object JacksonParser { def apply( @@ -87,8 +88,7 @@ private[sql] object JacksonParser { case (_, StringType) => val writer = new ByteArrayOutputStream() val generator = factory.createGenerator(writer, JsonEncoding.UTF8) - generator.copyCurrentStructure(parser) - generator.close() + Utils.withResource(generator, () => generator.copyCurrentStructure(parser)) UTF8String.fromBytes(writer.toByteArray) case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) => @@ -246,22 +246,25 @@ private[sql] object JacksonParser { iter.flatMap { record => try { val parser = factory.createParser(record) - parser.nextToken() - - convertField(factory, parser, schema) match { - case null => failedRecord(record) - case row: InternalRow => row :: Nil - case 
array: ArrayData => - if (array.numElements() == 0) { - Nil - } else { - array.toArray[InternalRow](schema) - } - case _ => - sys.error( - s"Failed to parse record $record. Please make sure that each line of the file " + - "(or each string in the RDD) is a valid JSON object or an array of JSON objects.") - } + Utils.withResource(parser, () => { + parser.nextToken() + + convertField(factory, parser, schema) match { + case null => failedRecord(record) + case row: InternalRow => row :: Nil + case array: ArrayData => + if (array.numElements() == 0) { + Nil + } else { + array.toArray[InternalRow](schema) + } + case _ => + sys.error( + s"Failed to parse record $record. Please make sure that each line of the file " + + "(or each string in the RDD) is a valid JSON object or " + + "an array of JSON objects.") + } + }) } catch { case _: JsonProcessingException => failedRecord(record) From 779ec09a8474e725c7804e8fc0baabd4c105975b Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 16 Oct 2015 10:14:58 +0900 Subject: [PATCH 2/3] addressed comments --- .../scala/org/apache/spark/util/Utils.scala | 5 ++- .../expressions/jsonExpressions.scala | 37 ++++++++++--------- .../datasources/json/InferSchema.scala | 5 +-- .../datasources/json/JacksonParser.scala | 10 ++--- 4 files changed, 29 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 394f42ddf757e..22c05a2479422 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -2153,8 +2153,9 @@ private[spark] object Utils extends Logging { conf.getInt("spark.executor.instances", 0) == 0 } - def withResource[R <: Closeable, T](resource: R, f: () => T): T = { - try f.apply() finally resource.close() + def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = { + val resource = createResource + try f.apply(resource) finally resource.close() } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 8b445a0eefe73..034ed7a626dee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -135,20 +135,19 @@ case class GetJsonObject(json: Expression, path: Expression) if (parsed.isDefined) { try { - val parser = jsonFactory.createParser(jsonStr.getBytes) - Utils.withResource(parser, () => { + Utils.tryWithResource(jsonFactory.createParser(jsonStr.getBytes)) { parser => val output = new ByteArrayOutputStream() - val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8) - Utils.withResource(generator, () => { + val matched = Utils.tryWithResource( + jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { generator => parser.nextToken() - val matched = evaluatePath(parser, generator, RawStyle, parsed.get) - if (matched) { - UTF8String.fromBytes(output.toByteArray) - } else { - null - } - }) - }) + evaluatePath(parser, generator, RawStyle, parsed.get) + } + if (matched) { + UTF8String.fromBytes(output.toByteArray) + } else { + null + } + } } catch { case _: JsonProcessingException => null } @@ -257,7 +256,7 @@ case class GetJsonObject(json: Expression, path: Expression) val flattenGenerator = jsonFactory.createGenerator(buffer) var dirty = 0 - 
Utils.withResource(flattenGenerator, () => { + Utils.tryWithResource(jsonFactory.createGenerator(buffer)) { flattenGenerator => flattenGenerator.writeStartArray() while (p.nextToken() != END_ARRAY) { @@ -266,7 +265,7 @@ case class GetJsonObject(json: Expression, path: Expression) dirty += (if (evaluatePath(p, flattenGenerator, nextStyle, xs)) 1 else 0) } flattenGenerator.writeEndArray() - }) + } val buf = buffer.getBuffer if (dirty > 1) { @@ -376,8 +375,9 @@ case class JsonTuple(children: Seq[Expression]) } try { - val parser = jsonFactory.createParser(json.getBytes) - Utils.withResource(parser, () => parseRow(parser, input)) + Utils.tryWithResource(jsonFactory.createParser(json.getBytes)) { + parser => parseRow(parser, input) + } } catch { case _: JsonProcessingException => nullRow @@ -421,8 +421,9 @@ case class JsonTuple(children: Seq[Expression]) // write the output directly to UTF8 encoded byte array if (parser.nextToken() != JsonToken.VALUE_NULL) { - val generator = jsonFactory.createGenerator(output, JsonEncoding.UTF8) - Utils.withResource(generator, () => copyCurrentStructure(generator, parser)) + Utils.tryWithResource(jsonFactory.createGenerator(output, JsonEncoding.UTF8)) { + generator => copyCurrentStructure(generator, parser) + } row(idx) = UTF8String.fromBytes(output.toByteArray) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala index 1abd1ebedadb0..d0780028dacb1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/InferSchema.scala @@ -48,11 +48,10 @@ private[sql] object InferSchema { val factory = new JsonFactory() iter.map { row => try { - val parser = factory.createParser(row) - Utils.withResource(parser, () => { + Utils.tryWithResource(factory.createParser(row)) { parser => parser.nextToken() inferField(parser) - }) + } } catch { case _: JsonParseException => StructType(Seq(StructField(columnNameOfCorruptRecords, StringType))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala index ab11046c399b9..09b8a9e936a1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JacksonParser.scala @@ -87,8 +87,9 @@ private[sql] object JacksonParser { case (_, StringType) => val writer = new ByteArrayOutputStream() - val generator = factory.createGenerator(writer, JsonEncoding.UTF8) - Utils.withResource(generator, () => generator.copyCurrentStructure(parser)) + Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) { + generator => generator.copyCurrentStructure(parser) + } UTF8String.fromBytes(writer.toByteArray) case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT, FloatType) => @@ -245,8 +246,7 @@ private[sql] object JacksonParser { iter.flatMap { record => try { - val parser = factory.createParser(record) - Utils.withResource(parser, () => { + Utils.tryWithResource(factory.createParser(record)) { parser => parser.nextToken() convertField(factory, parser, schema) match { @@ -264,7 +264,7 @@ private[sql] object JacksonParser { "(or each string in the RDD) is a valid JSON object or " + "an array of JSON objects.") } - }) + } } catch { 
case _: JsonProcessingException => failedRecord(record) From 2971abcefc1400ae9d16b7080c724b6c2389dda2 Mon Sep 17 00:00:00 2001 From: "navis.ryu" Date: Fri, 16 Oct 2015 15:33:00 +0900 Subject: [PATCH 3/3] remove unnecessary generator --- .../apache/spark/sql/catalyst/expressions/jsonExpressions.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala index 034ed7a626dee..8c9853e628d2c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala @@ -253,7 +253,6 @@ case class GetJsonObject(json: Expression, path: Expression) // temporarily buffer child matches, the emitted json will need to be // modified slightly if there is only a single element written val buffer = new StringWriter() - val flattenGenerator = jsonFactory.createGenerator(buffer) var dirty = 0 Utils.tryWithResource(jsonFactory.createGenerator(buffer)) { flattenGenerator =>
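
For reference, the helper this series settles on in PATCH 2 is the classic loan pattern: the helper owns both creation and cleanup of a Closeable, lends it to a function, and guarantees close() in a finally block, so Jackson parsers and generators are flushed and their internal buffers recycled even when the body throws. The signature of tryWithResource below is taken from the patch; everything else (the object name, the main method, and the OutputStreamWriter demo) is an illustrative sketch, not code from Spark:

    import java.io.{ByteArrayOutputStream, Closeable, OutputStreamWriter}

    object LoanPatternSketch {
      // By-name createResource lets the helper, not the caller, evaluate the
      // resource expression; the curried f reads as a plain block at call sites.
      def tryWithResource[R <: Closeable, T](createResource: => R)(f: R => T): T = {
        val resource = createResource
        try f(resource) finally resource.close()
      }

      def main(args: Array[String]): Unit = {
        val out = new ByteArrayOutputStream()
        // The writer is closed (and therefore flushed) before out is read.
        // The same ordering concern is why PATCH 2 hoists the read of
        // output.toByteArray out of the generator block in GetJsonObject.
        tryWithResource(new OutputStreamWriter(out, "UTF-8")) { writer =>
          writer.write("closed even if this block throws")
        }
        println(out.toString("UTF-8"))
      }
    }

The by-name parameter is what allows call sites such as Utils.tryWithResource(factory.createParser(record)) { parser => ... } to drop the intermediate val that PATCH 1 still needed, which in turn is what makes PATCH 3's one-line deletion of the now-unused flattenGenerator possible.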