From b7f964d119b5d0ea40896bb86b0110688d8330a8 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Wed, 28 Nov 2018 11:46:14 -0600 Subject: [PATCH 01/34] Added a configurable limit on the maximum length of a plan debug string. --- .../spark/sql/catalyst/trees/TreeNode.scala | 6 ++- .../sql/catalyst/util/SizeLimitedWriter.scala | 47 +++++++++++++++++++ .../spark/sql/catalyst/util/package.scala | 20 ++++++++ .../apache/spark/sql/internal/SQLConf.scala | 8 ++++ .../sql/catalyst/trees/TreeNodeSuite.scala | 17 +++++-- .../util/SizeLimitedWriterSuite.scala | 45 ++++++++++++++++++ 6 files changed, 137 insertions(+), 6 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 2e9f9f53e94ac..68b3162e4a24f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} -import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.catalyst.util.{truncatedString, withSizeLimitedWriter} import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel @@ -484,7 +484,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { writer: Writer, verbose: Boolean, addSuffix: Boolean): Unit = { - generateTreeString(0, Nil, writer, verbose, "", addSuffix) + withSizeLimitedWriter(writer) { w => + generateTreeString(0, Nil, w, verbose, "", addSuffix) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala new file mode 100644 index 0000000000000..c267a420b0bae --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.util + +import java.io.Writer + +class WriterSizeException(val attemptedSize: Long, val charLimit: Long) extends Exception( + s"Attempted to write $attemptedSize characters to a writer that is limited to $charLimit") + +/** + * This class is used to control the size of generated writers. 
Guarantees that the total number + * of characters written will be less than the specified size. + * + * Checks size before writing and throws a WriterSizeException if the total size would count the limit. + */ +class SizeLimitedWriter(underlying: Writer, charLimit: Long) extends Writer { + + var charsWritten: Long = 0 + + override def write(cbuf: Array[Char], off: Int, len: Int): Unit = { + val newLength = charsWritten + Math.min(cbuf.length - off, len) + if(newLength > charLimit) { + throw new WriterSizeException(newLength, charLimit) + } + charsWritten = newLength + underlying.write(cbuf, off, len) + } + + override def flush(): Unit = underlying.flush() + + override def close(): Unit = underlying.close() +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 277584b20dcd2..830d4028b4ef7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -202,6 +202,26 @@ package object util extends Logging { /** Shorthand for calling truncatedString() without start or end strings. */ def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "") + /** Whether we have warned about plan string truncation yet. */ + private val planSizeWarningPrinted = new AtomicBoolean(false) + + def withSizeLimitedWriter[T](writer: Writer)(f: (Writer) => T): Option[T] = { + try { + val limited = new SizeLimitedWriter(writer, SQLConf.get.maxPlanStringLength) + Some(f(limited)) + } + catch { + case e: WriterSizeException => + writer.write("...") + if (planSizeWarningPrinted.compareAndSet(false, true)) { + logWarning( + "Truncated the string representation of a plan since it was too long. This " + + s"behavior can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.") + } + None + } + } + /* FIX ME implicit class debugLogging(a: Any) { def debugLogging() { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7bcf21595ce5a..7a39ace7a882f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1610,6 +1610,12 @@ object SQLConf { """ "... N more fields" placeholder.""") .intConf .createWithDefault(25) + + val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.debug.maxPlanLength") + .doc("Maximum number of characters to output for a plan in debug output. If the plan is " + + "longer, it will end with a ... and further output will be truncated.") + .longConf + .createWithDefault(8192) } /** @@ -2030,6 +2036,8 @@ class SQLConf extends Serializable with Logging { def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS) + def maxPlanStringLength: Long = getConf(SQLConf.MAX_PLAN_STRING_LENGTH) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. 
*/ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index 64aa1ee39046d..055af7d2bbf09 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -21,21 +21,20 @@ import java.math.BigInteger import java.util.UUID import scala.collection.mutable.ArrayBuffer - import org.json4s.JsonAST._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods._ - import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.dsl.expressions.DslString import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin} -import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Union} +import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, NaturalJoin, UsingJoin} +import org.apache.spark.sql.catalyst.plans.logical.{Join, LeafNode, Union} import org.apache.spark.sql.catalyst.plans.physical.{IdentityBroadcastMode, RoundRobinPartitioning, SinglePartition} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel @@ -595,4 +594,14 @@ class TreeNodeSuite extends SparkFunSuite { val expected = Coalesce(Stream(Literal(1), Literal(3))) assert(result === expected) } + + test("toString() tree depth") { + val ds = (1 until 100).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) => + Add(Literal(x), treeNode) + } + + val planString = ds.treeString + assert(planString.endsWith("...")) + assert(planString.length <= SQLConf.get.maxPlanStringLength) + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala new file mode 100644 index 0000000000000..a0d0c044c126d --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.util + +import org.apache.commons.io.output.StringBuilderWriter + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.util.StringUtils._ + +class SizeLimitedWriterSuite extends SparkFunSuite { + + test("normal writer under limit") { + val writer = new StringBuilderWriter() + val limited = new SizeLimitedWriter(writer, 100) + limited.write("test") + limited.write("test") + + assert(writer.toString === "testtest") + } + + test("filter pattern") { + val writer = new StringBuilderWriter() + val limited = new SizeLimitedWriter(writer, 5) + assertThrows[WriterSizeException] { + limited.write("test") + limited.write("test") + } + assert(writer.toString === "test") + } +} From 3f4fe1f05ce467763caa906f9434e2f38e8d9fa9 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Wed, 28 Nov 2018 12:05:42 -0600 Subject: [PATCH 02/34] Removed unneeded imports --- .../org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index 055af7d2bbf09..ba506b486d3ed 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -31,8 +31,8 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.dsl.expressions.DslString import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, NaturalJoin, UsingJoin} -import org.apache.spark.sql.catalyst.plans.logical.{Join, LeafNode, Union} +import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin} +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Union} import org.apache.spark.sql.catalyst.plans.physical.{IdentityBroadcastMode, RoundRobinPartitioning, SinglePartition} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ From 30e4348b225a93b6a7a4ea2b7e8ce54dc57778c7 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Wed, 28 Nov 2018 12:09:48 -0600 Subject: [PATCH 03/34] Moved withSizeLimitedWriter to treeString function that uses StringWriter --- .../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 68b3162e4a24f..ba5b34b191b18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -473,7 +473,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def treeString(verbose: Boolean, addSuffix: Boolean = false): String = { val writer = new StringBuilderWriter() try { - treeString(writer, verbose, addSuffix) + withSizeLimitedWriter(writer) { w => + treeString(w, verbose, addSuffix) + } writer.toString } finally { writer.close() @@ -484,9 +486,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { writer: Writer, verbose: Boolean, addSuffix: Boolean): Unit = { - withSizeLimitedWriter(writer) { w => - generateTreeString(0, Nil, w, verbose, "", 
addSuffix)
-    }
+      generateTreeString(0, Nil, writer, verbose, "", addSuffix)
   }
 
   /**

From 5528ca12ed625ade3a4dc4f043da838b1005e826 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Wed, 28 Nov 2018 12:13:49 -0600
Subject: [PATCH 04/34] Fixed formatting

---
 .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index ba5b34b191b18..b650d3092dbf3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -486,7 +486,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       writer: Writer,
       verbose: Boolean,
       addSuffix: Boolean): Unit = {
-      generateTreeString(0, Nil, writer, verbose, "", addSuffix)
+    generateTreeString(0, Nil, writer, verbose, "", addSuffix)
   }
 
   /**

From 3171cf31ae8001e5c37b31668154bbf2b6b4411f Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Wed, 28 Nov 2018 17:10:35 -0600
Subject: [PATCH 05/34] Converted to Javadoc-style multiline comment

---
 .../sql/catalyst/util/SizeLimitedWriter.scala | 35 ++++++++++---------
 1 file changed, 18 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala
index c267a420b0bae..886d137c5df51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala
@@ -20,28 +20,29 @@ package org.apache.spark.sql.catalyst.util
 import java.io.Writer
 
 class WriterSizeException(val attemptedSize: Long, val charLimit: Long) extends Exception(
-  s"Attempted to write $attemptedSize characters to a writer that is limited to $charLimit")
+    s"Attempted to write $attemptedSize characters to a writer that is limited to $charLimit")
 
 /**
- * This class is used to control the size of generated writers. Guarantees that the total number
- * of characters written will be less than the specified size.
- *
- * Checks size before writing and throws a WriterSizeException if the total size would count the limit.
- */
+ * This class is used to control the size of generated writers. Guarantees that the total number
+ * of characters written will be less than the specified size.
+ *
+ * Checks size before writing and throws a WriterSizeException if the total size would count the
+ * limit.
+ */ class SizeLimitedWriter(underlying: Writer, charLimit: Long) extends Writer { - var charsWritten: Long = 0 + var charsWritten: Long = 0 - override def write(cbuf: Array[Char], off: Int, len: Int): Unit = { - val newLength = charsWritten + Math.min(cbuf.length - off, len) - if(newLength > charLimit) { - throw new WriterSizeException(newLength, charLimit) - } - charsWritten = newLength - underlying.write(cbuf, off, len) - } + override def write(cbuf: Array[Char], off: Int, len: Int): Unit = { + val newLength = charsWritten + Math.min(cbuf.length - off, len) + if (newLength > charLimit) { + throw new WriterSizeException(newLength, charLimit) + } + charsWritten = newLength + underlying.write(cbuf, off, len) + } - override def flush(): Unit = underlying.flush() + override def flush(): Unit = underlying.flush() - override def close(): Unit = underlying.close() + override def close(): Unit = underlying.close() } From 3ffdc6a13370be2f7cd03bea0e48f8e5ef62ccca Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Wed, 28 Nov 2018 17:39:00 -0600 Subject: [PATCH 06/34] Fixed scalastyle formatting of imports --- .../org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index ba506b486d3ed..c1fa03def250d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -21,10 +21,12 @@ import java.math.BigInteger import java.util.UUID import scala.collection.mutable.ArrayBuffer + import org.json4s.JsonAST._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods import org.json4s.jackson.JsonMethods._ + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ From 45a60fc7f9f4a0c04eae5ae10be68c5aba3dc3e1 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Wed, 28 Nov 2018 21:35:20 -0600 Subject: [PATCH 07/34] Have size limit cut off right at the correct number of characters. --- .../sql/catalyst/util/SizeLimitedWriter.scala | 16 ++++++++-------- .../apache/spark/sql/catalyst/util/package.scala | 3 ++- .../spark/sql/catalyst/trees/TreeNodeSuite.scala | 4 ++-- .../catalyst/util/SizeLimitedWriterSuite.scala | 4 ++-- 4 files changed, 14 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala index 886d137c5df51..143bb6712741d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala @@ -19,8 +19,8 @@ package org.apache.spark.sql.catalyst.util import java.io.Writer -class WriterSizeException(val attemptedSize: Long, val charLimit: Long) extends Exception( - s"Attempted to write $attemptedSize characters to a writer that is limited to $charLimit") +class WriterSizeException(val extraChars: Long, val charLimit: Long) extends Exception( + s"Writer reached limit of $charLimit characters. $extraChars extra characters ignored.") /** * This class is used to control the size of generated writers. 
Guarantees that the total number @@ -31,15 +31,15 @@ class WriterSizeException(val attemptedSize: Long, val charLimit: Long) extends */ class SizeLimitedWriter(underlying: Writer, charLimit: Long) extends Writer { - var charsWritten: Long = 0 + private var charsWritten: Long = 0 override def write(cbuf: Array[Char], off: Int, len: Int): Unit = { - val newLength = charsWritten + Math.min(cbuf.length - off, len) - if (newLength > charLimit) { - throw new WriterSizeException(newLength, charLimit) + val charsToWrite = Math.min(charLimit - charsWritten, len).toInt + underlying.write(cbuf, off, charsToWrite) + charsWritten += charsToWrite + if (charsToWrite < len) { + throw new WriterSizeException(len - charsToWrite, charLimit) } - charsWritten = newLength - underlying.write(cbuf, off, len) } override def flush(): Unit = underlying.flush() diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 830d4028b4ef7..a84cbabd7b099 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -207,7 +207,8 @@ package object util extends Logging { def withSizeLimitedWriter[T](writer: Writer)(f: (Writer) => T): Option[T] = { try { - val limited = new SizeLimitedWriter(writer, SQLConf.get.maxPlanStringLength) + // Subtract 3 from the string length to leave room for the "..." + val limited = new SizeLimitedWriter(writer, SQLConf.get.maxPlanStringLength - 3) Some(f(limited)) } catch { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index c1fa03def250d..3791dc21fdb2d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -597,13 +597,13 @@ class TreeNodeSuite extends SparkFunSuite { assert(result === expected) } - test("toString() tree depth") { + test("treeString limits plan length") { val ds = (1 until 100).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) => Add(Literal(x), treeNode) } val planString = ds.treeString assert(planString.endsWith("...")) - assert(planString.length <= SQLConf.get.maxPlanStringLength) + assert(planString.length === SQLConf.get.maxPlanStringLength) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala index a0d0c044c126d..8e07b885568a4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala @@ -33,13 +33,13 @@ class SizeLimitedWriterSuite extends SparkFunSuite { assert(writer.toString === "testtest") } - test("filter pattern") { + test("truncate at the limit") { val writer = new StringBuilderWriter() val limited = new SizeLimitedWriter(writer, 5) assertThrows[WriterSizeException] { limited.write("test") limited.write("test") } - assert(writer.toString === "test") + assert(writer.toString === "testt") } } From 9678799f3b203d667c2f4b27dcedd3591606a8cf Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Wed, 28 Nov 2018 21:49:56 -0600 Subject: [PATCH 08/34] Changed name to remove the 
debug prefix from the config parameter name

Added the limit to query execution.
---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 4 ++--
 .../scala/org/apache/spark/sql/execution/QueryExecution.scala | 4 +++-
 2 files changed, 5 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 7a39ace7a882f..12581c5f1ea87 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1611,8 +1611,8 @@ object SQLConf {
     .intConf
     .createWithDefault(25)
 
-  val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.debug.maxPlanLength")
-    .doc("Maximum number of characters to output for a plan in debug output. If the plan is " +
+  val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanLength")
+    .doc("Maximum number of characters to output for a plan string. If the plan is " +
       "longer, it will end with a ... and further output will be truncated.")
     .longConf
     .createWithDefault(8192)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index cfb5e43207b03..f2c18afad72d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.catalyst.util.withSizeLimitedWriter
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
@@ -202,7 +203,8 @@ class QueryExecution(
   }
 
   private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = {
-    try f(writer)
+    try
+      withSizeLimitedWriter(writer)(f)
     catch {
       case e: AnalysisException => writer.write(e.toString)
     }

From a5af8426779d22ba620e188e4411630db6d2e52f Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Wed, 28 Nov 2018 22:05:12 -0600
Subject: [PATCH 09/34] Fixed formatting error

---
 .../main/scala/org/apache/spark/sql/catalyst/util/package.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
index a84cbabd7b099..acc6b4248515f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala
@@ -207,7 +207,7 @@ package object util extends Logging {
 
   def withSizeLimitedWriter[T](writer: Writer)(f: (Writer) => T): Option[T] = {
     try {
-        // Subtract 3 from the string length to leave room for the "..."
+      // Subtract 3 from the string length to leave room for the "..."
val limited = new SizeLimitedWriter(writer, SQLConf.get.maxPlanStringLength - 3) Some(f(limited)) } From a4be985b5e7eeed43409ba0b0cda2a45c44e5110 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Mon, 3 Dec 2018 17:23:26 -0600 Subject: [PATCH 10/34] Changed length default to Long.MaxValue to turn off behavior unless it is specifically configured. --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 12581c5f1ea87..831cc39cdb78c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1615,7 +1615,7 @@ object SQLConf { .doc("Maximum number of characters to output for a plan string. If the plan is " + "longer, it will end with a ... and further output will be truncated.") .longConf - .createWithDefault(8192) + .createWithDefault(Long.MaxValue) } /** From 1b692a0444a1c0f1fc24a08241f24dd35e4c428b Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Tue, 4 Dec 2018 13:05:06 -0600 Subject: [PATCH 11/34] Added test case for not limiting plan length and tested with a default value. --- .../org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index 3791dc21fdb2d..c478d6fad35ef 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.dsl.expressions.DslString import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin} +import org.apache.spark.sql.catalyst.plans.{LeftOuter, NaturalJoin, SQLHelper} import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, Union} import org.apache.spark.sql.catalyst.plans.physical.{IdentityBroadcastMode, RoundRobinPartitioning, SinglePartition} import org.apache.spark.sql.internal.SQLConf @@ -82,7 +82,7 @@ case class SelfReferenceUDF( def apply(key: String): Boolean = config.contains(key) } -class TreeNodeSuite extends SparkFunSuite { +class TreeNodeSuite extends SparkFunSuite with SQLHelper { test("top node changed") { val after = Literal(1) transform { case Literal(1, _) => Literal(2) } assert(after === Literal(2)) From 22fe117656ea004757efaffd847f81dc01df8433 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Tue, 4 Dec 2018 17:03:30 -0600 Subject: [PATCH 12/34] Correctly added test case missed in the previous commit --- .../sql/catalyst/trees/TreeNodeSuite.scala | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala index c478d6fad35ef..a903b274b6382 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala 
@@ -597,13 +597,24 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper { assert(result === expected) } - test("treeString limits plan length") { - val ds = (1 until 100).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) => + test("treeString default doesn't limit plan length") { + val ds = (1 until 20).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) => Add(Literal(x), treeNode) } val planString = ds.treeString - assert(planString.endsWith("...")) - assert(planString.length === SQLConf.get.maxPlanStringLength) + assert(!planString.endsWith("...")) + } + + test("treeString limits plan length") { + withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "200") { + val ds = (1 until 20).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) => + Add(Literal(x), treeNode) + } + + val planString = ds.treeString + assert(planString.endsWith("...")) + assert(planString.length === SQLConf.get.maxPlanStringLength) + } } } From be3f265ecf9d154eeed98e79dc6bf7cac7297118 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Sat, 8 Dec 2018 11:20:43 -0600 Subject: [PATCH 13/34] Added more documentation of the plan length parameter. --- .../main/scala/org/apache/spark/sql/internal/SQLConf.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index ada3cd828457c..4979e25be9e0f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1613,7 +1613,10 @@ object SQLConf { val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanLength") .doc("Maximum number of characters to output for a plan string. If the plan is " + - "longer, it will end with a ... and further output will be truncated.") + "longer, it will end with \"...\" and further output will be truncated. The default " + + "setting always generates a full plan. Set this to a lower value such as 8192 if plan " + + "strings are taking up too much memory or are causing OutOfMemory errors in the driver or " + + "UI processes.") .longConf .createWithDefault(Long.MaxValue) From 855f5404c53eba51ed373fa9e7be4eaafd60bb30 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Sat, 8 Dec 2018 12:50:11 -0600 Subject: [PATCH 14/34] Removed tab --- .../src/main/scala/org/apache/spark/sql/internal/SQLConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 29adf872a79e4..437759e78f52d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1626,7 +1626,7 @@ object SQLConf { "longer, it will end with \"...\" and further output will be truncated. The default " + "setting always generates a full plan. 
Set this to a lower value such as 8192 if plan " + "strings are taking up too much memory or are causing OutOfMemory errors in the driver or " + - "UI processes.") + "UI processes.") .longConf .createWithDefault(Long.MaxValue) From 2eecbfac0a60dc5a49ef359ef748eaec940e244b Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Sun, 20 Jan 2019 20:05:02 -0600 Subject: [PATCH 15/34] Added plan size limits to StringConcat --- .../spark/sql/catalyst/plans/QueryPlan.scala | 4 +- .../spark/sql/catalyst/trees/TreeNode.scala | 9 ++-- .../sql/catalyst/util/SizeLimitedWriter.scala | 48 ------------------- .../spark/sql/catalyst/util/StringUtils.scala | 45 +++++++++++++---- .../spark/sql/catalyst/util/package.scala | 21 -------- .../apache/spark/sql/internal/SQLConf.scala | 13 +++-- .../util/SizeLimitedWriterSuite.scala | 45 ----------------- .../sql/catalyst/util/StringUtilsSuite.scala | 22 +++++++++ .../spark/sql/execution/QueryExecution.scala | 27 ++++------- .../sql/execution/WholeStageCodegenExec.scala | 8 ++-- 10 files changed, 86 insertions(+), 156 deletions(-) delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 8f5444ed8a5a7..a9fffc1aba41f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -308,10 +308,10 @@ object QueryPlan extends PredicateHelper { */ def append[T <: QueryPlan[T]]( plan: => QueryPlan[T], - append: String => Unit, + append: String => Boolean, verbose: Boolean, addSuffix: Boolean, - maxFields: Int = SQLConf.get.maxToStringFields): Unit = { + maxFields: Int = SQLConf.get.maxToStringFields): Boolean = { try { plan.treeString(append, verbose, addSuffix, maxFields) } catch { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index d214ebb309031..6e54855afa869 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -487,10 +487,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { } def treeString( - append: String => Unit, + append: String => Boolean, verbose: Boolean, addSuffix: Boolean, - maxFields: Int): Unit = { + maxFields: Int): Boolean = { generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields) } @@ -554,11 +554,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - append: String => Unit, + append: String => Boolean, verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { + maxFields: Int): Boolean = { if (depth > 0) { lastChildren.init.foreach { isLast => @@ -591,6 +591,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { children.last.generateTreeString( depth + 1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields) } + append("") } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala deleted file mode 100644 index 143bb6712741d..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriter.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.util - -import java.io.Writer - -class WriterSizeException(val extraChars: Long, val charLimit: Long) extends Exception( - s"Writer reached limit of $charLimit characters. $extraChars extra characters ignored.") - -/** - * This class is used to control the size of generated writers. Guarantees that the total number - * of characters written will be less than the specified size. - * - * Checks size before writing and throws a WriterSizeException if the total size would count the - * limit. - */ -class SizeLimitedWriter(underlying: Writer, charLimit: Long) extends Writer { - - private var charsWritten: Long = 0 - - override def write(cbuf: Array[Char], off: Int, len: Int): Unit = { - val charsToWrite = Math.min(charLimit - charsWritten, len).toInt - underlying.write(cbuf, off, charsToWrite) - charsWritten += charsToWrite - if (charsToWrite < len) { - throw new WriterSizeException(len - charsToWrite, charLimit) - } - } - - override def flush(): Unit = underlying.flush() - - override def close(): Unit = underlying.close() -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala index 643b83b1741ae..2dade278c3130 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala @@ -17,14 +17,17 @@ package org.apache.spark.sql.catalyst.util +import java.util.concurrent.atomic.AtomicBoolean import java.util.regex.{Pattern, PatternSyntaxException} import scala.collection.mutable.ArrayBuffer +import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.unsafe.types.UTF8String -object StringUtils { +object StringUtils extends Logging { /** * Validate and convert SQL 'like' pattern to a Java regular expression. @@ -92,21 +95,26 @@ object StringUtils { /** * Concatenation of sequence of strings to final string with cheap append method - * and one memory allocation for the final string. + * and one memory allocation for the final string. Can also bound the final size of + * the string. 
*/ - class StringConcat { + class StringConcat(val maxLength: Int = Integer.MAX_VALUE) { private val strings = new ArrayBuffer[String] private var length: Int = 0 + def atLimit: Boolean = length >= maxLength + /** * Appends a string and accumulates its length to allocate a string buffer for all - * appended strings once in the toString method. + * appended strings once in the toString method. Returns true if the string still + * has room for further appends before it hits its max limit. */ - def append(s: String): Unit = { - if (s != null) { + def append(s: String): Boolean = { + if (!atLimit && s != null) { strings.append(s) length += s.length } + return !atLimit } /** @@ -114,9 +122,30 @@ object StringUtils { * returns concatenated string. */ override def toString: String = { - val result = new java.lang.StringBuilder(length) - strings.foreach(result.append) + val finalLength = Math.min(length, maxLength) + val result = new java.lang.StringBuilder(finalLength) + strings.dropRight(1).foreach(result.append) + strings.lastOption.foreach { s => + val lastLength = Math.min(s.length, maxLength - result.length()) + result.append(s, 0, lastLength) + } result.toString } } + + /** Whether we have warned about plan string truncation yet. */ + private val planSizeWarningPrinted = new AtomicBoolean(false) + + /** A string concatenator for plan strings. Uses length from a configured value, and + * prints a warning the first time a plan is truncated. */ + class PlanStringConcat extends StringConcat(SQLConf.get.maxPlanStringLength) { + override def toString: String = { + if (atLimit && planSizeWarningPrinted.compareAndSet(false, true)) { + logWarning( + "Truncated the string representation of a plan since it was too long. This " + + s"behavior can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.") + } + super.toString + } + } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index a2c95d13cfc97..7f5860e12cfd5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -204,27 +204,6 @@ package object util extends Logging { truncatedString(seq, "", sep, "", maxFields) } - /** Whether we have warned about plan string truncation yet. */ - private val planSizeWarningPrinted = new AtomicBoolean(false) - - def withSizeLimitedWriter[T](writer: Writer)(f: (Writer) => T): Option[T] = { - try { - // Subtract 3 from the string length to leave room for the "..." - val limited = new SizeLimitedWriter(writer, SQLConf.get.maxPlanStringLength - 3) - Some(f(limited)) - } - catch { - case e: WriterSizeException => - writer.write("...") - if (planSizeWarningPrinted.compareAndSet(false, true)) { - logWarning( - "Truncated the string representation of a plan since it was too long. 
This " + - s"behavior can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.") - } - None - } - } - /* FIX ME implicit class debugLogging(a: Any) { def debugLogging() { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2bd9f9dc6d0cd..7e4acb556f3e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1627,12 +1627,11 @@ object SQLConf { val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanLength") .doc("Maximum number of characters to output for a plan string. If the plan is " + - "longer, it will end with \"...\" and further output will be truncated. The default " + - "setting always generates a full plan. Set this to a lower value such as 8192 if plan " + - "strings are taking up too much memory or are causing OutOfMemory errors in the driver or " + - "UI processes.") - .longConf - .createWithDefault(Long.MaxValue) + "longer, further output will be truncated. The default setting always generates a full " + + "plan. Set this to a lower value such as 8192 if plan strings are taking up too much " + + "memory or are causing OutOfMemory errors in the driver or UI processes.") + .intConf + .createWithDefault(Int.MaxValue) val SET_COMMAND_REJECTS_SPARK_CORE_CONFS = buildConf("spark.sql.legacy.setCommandRejectsSparkCoreConfs") @@ -2065,7 +2064,7 @@ class SQLConf extends Serializable with Logging { def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS) - def maxPlanStringLength: Long = getConf(SQLConf.MAX_PLAN_STRING_LENGTH) + def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH) def setCommandRejectsSparkCoreConfs: Boolean = getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala deleted file mode 100644 index 8e07b885568a4..0000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/SizeLimitedWriterSuite.scala +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.catalyst.util - -import org.apache.commons.io.output.StringBuilderWriter - -import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.catalyst.util.StringUtils._ - -class SizeLimitedWriterSuite extends SparkFunSuite { - - test("normal writer under limit") { - val writer = new StringBuilderWriter() - val limited = new SizeLimitedWriter(writer, 100) - limited.write("test") - limited.write("test") - - assert(writer.toString === "testtest") - } - - test("truncate at the limit") { - val writer = new StringBuilderWriter() - val limited = new SizeLimitedWriter(writer, 5) - assertThrows[WriterSizeException] { - limited.write("test") - limited.write("test") - } - assert(writer.toString === "testt") - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala index 616ec12032dbd..3a73bb3e8100b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala @@ -56,4 +56,26 @@ class StringUtilsSuite extends SparkFunSuite { assert(concat("1", "2") == "12") assert(concat("abc", "\n", "123") == "abc\n123") } + + test("string concatenation with limit") { + def concat(seq: String*): String = { + seq.foldLeft(new StringConcat(7))((acc, s) => {acc.append(s); acc}).toString + } + assert(concat("under") == "under") + assert(concat("under", "over", "extra") == "underov") + assert(concat("underover") == "underov") + assert(concat("under", "ov") == "underov") + } + + test("string concatenation return value") { + assert(new StringConcat(7).append("under") == true) + assert(new StringConcat(7).append("underover") == false) + assert(new StringConcat(7).append("underov") == false) + } + + test("string concatenation append after limit") { + val concat = new StringConcat(7) + concat.append("underover") + assert(concat.append("extra") == false) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index d7512349309bd..5c5362a791277 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.execution import java.io.{BufferedWriter, OutputStreamWriter} import org.apache.hadoop.fs.Path - import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} @@ -28,9 +27,8 @@ import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat +import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.catalyst.util.withSizeLimitedWriter import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.util.Utils @@ -106,22 +104,14 @@ class QueryExecution( ReuseSubquery(sparkSession.sessionState.conf)) def 
simpleString: String = withRedaction { - val concat = new StringConcat() + val concat = new PlanStringConcat() concat.append("== Physical Plan ==\n") QueryPlan.append(executedPlan, concat.append, verbose = false, addSuffix = false) concat.append("\n") concat.toString } -// private def writeOrError(writer: Writer)(f: Writer => Unit): Unit = { -// try -// withSizeLimitedWriter(writer)(f) -// catch { -// case e: AnalysisException => writer.write(e.toString) -// } -// } -// - private def writePlans(append: String => Unit, maxFields: Int): Unit = { + private def writePlans(append: String => Boolean, maxFields: Int): Boolean = { val (verbose, addSuffix) = (true, false) append("== Parsed Logical Plan ==\n") QueryPlan.append(logical, append, verbose, addSuffix, maxFields) @@ -142,13 +132,13 @@ class QueryExecution( } override def toString: String = withRedaction { - val concat = new StringConcat() + val concat = new PlanStringConcat() writePlans(concat.append, SQLConf.get.maxToStringFields) concat.toString } def stringWithStats: String = withRedaction { - val concat = new StringConcat() + val concat = new PlanStringConcat() val maxFields = SQLConf.get.maxToStringFields // trigger to compute stats for logical plans @@ -203,9 +193,12 @@ class QueryExecution( val filePath = new Path(path) val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf()) val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath))) - + val append = (s: String) => { + writer.write(s) + true + } try { - writePlans(writer.write, maxFields) + writePlans(append, maxFields) writer.write("\n== Whole Stage Codegen ==\n") org.apache.spark.sql.execution.debug.writeCodegen(writer.write, executedPlan) } finally { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 3b0a99669ccd0..b315828a872fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -493,11 +493,11 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod override def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - append: String => Unit, + append: String => Boolean, verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { + maxFields: Int): Boolean = { child.generateTreeString( depth, lastChildren, @@ -777,11 +777,11 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) override def generateTreeString( depth: Int, lastChildren: Seq[Boolean], - append: String => Unit, + append: String => Boolean, verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Unit = { + maxFields: Int): Boolean = { child.generateTreeString( depth, lastChildren, From 4082aa319b0d94911da7c17ec95ef673edab4307 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Sun, 20 Jan 2019 20:16:08 -0600 Subject: [PATCH 16/34] Scalastyle --- .../scala/org/apache/spark/sql/execution/QueryExecution.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 5c5362a791277..3564d51b3a015 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import java.io.{BufferedWriter, OutputStreamWriter} import org.apache.hadoop.fs.Path + import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker} From f9085e72c4464ec9ed2affa2d5c052a3c8b30628 Mon Sep 17 00:00:00 2001 From: Dave DeCaprio Date: Mon, 21 Jan 2019 15:11:25 -0600 Subject: [PATCH 17/34] Incorporated changes from code review --- .../spark/sql/catalyst/plans/QueryPlan.scala | 2 +- .../spark/sql/catalyst/trees/TreeNode.scala | 9 ++++----- .../spark/sql/catalyst/util/StringUtils.scala | 18 +++++++++++++----- .../apache/spark/sql/internal/SQLConf.scala | 5 ++++- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index a9fffc1aba41f..a692c72e1babf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -311,7 +311,7 @@ object QueryPlan extends PredicateHelper { append: String => Boolean, verbose: Boolean, addSuffix: Boolean, - maxFields: Int = SQLConf.get.maxToStringFields): Boolean = { + maxFields: Int = SQLConf.get.maxToStringFields): Unit = { try { plan.treeString(append, verbose, addSuffix, maxFields) } catch { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 6e54855afa869..419aeb687532d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} -import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat +import org.apache.spark.sql.catalyst.util.StringUtils.{PlanStringConcat, StringConcat} import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -480,7 +480,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { verbose: Boolean, addSuffix: Boolean = false, maxFields: Int = SQLConf.get.maxToStringFields): String = { - val concat = new StringConcat() + val concat = new PlanStringConcat() treeString(concat.append, verbose, addSuffix, maxFields) concat.toString @@ -490,7 +490,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { append: String => Boolean, verbose: Boolean, addSuffix: Boolean, - maxFields: Int): Boolean = { + maxFields: Int): Unit = { generateTreeString(0, Nil, append, verbose, "", addSuffix, maxFields) } @@ -558,7 +558,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { verbose: Boolean, prefix: String = "", addSuffix: Boolean = false, - maxFields: Int): Boolean = { + maxFields: Int): Unit = { if (depth > 0) { lastChildren.init.foreach { isLast => @@ -591,7 +591,6 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { children.last.generateTreeString( depth + 
1, lastChildren :+ true, append, verbose, prefix, addSuffix, maxFields) } - append("") } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala index 2dade278c3130..22849797ebb19 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala @@ -25,6 +25,7 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.types.UTF8String object StringUtils extends Logging { @@ -98,7 +99,7 @@ object StringUtils extends Logging { * and one memory allocation for the final string. Can also bound the final size of * the string. */ - class StringConcat(val maxLength: Int = Integer.MAX_VALUE) { + class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) { private val strings = new ArrayBuffer[String] private var length: Int = 0 @@ -124,10 +125,17 @@ object StringUtils extends Logging { override def toString: String = { val finalLength = Math.min(length, maxLength) val result = new java.lang.StringBuilder(finalLength) - strings.dropRight(1).foreach(result.append) - strings.lastOption.foreach { s => - val lastLength = Math.min(s.length, maxLength - result.length()) - result.append(s, 0, lastLength) + var ix = 0 + while(ix < strings.length) { + var s = strings(ix) + if(ix < strings.length - 1) { + result.append(s) + } + else { + val lastLength = Math.min(s.length, maxLength - result.length()) + result.append(s, 0, lastLength) + } + ix += 1 } result.toString } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 7e4acb556f3e1..1b22fa84dae58 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1631,7 +1631,10 @@ object SQLConf { "plan. Set this to a lower value such as 8192 if plan strings are taking up too much " + "memory or are causing OutOfMemory errors in the driver or UI processes.") .intConf - .createWithDefault(Int.MaxValue) + .checkValue(i => i >= 0 && i <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, "Invalid " + + "value for 'spark.sql.maxPlanLength'. 
From b3d43b7d1b2ac02053f9c51110eb2b8e353b732c Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Mon, 21 Jan 2019 15:13:26 -0600
Subject: [PATCH 18/34] Missed one error

---
 .../scala/org/apache/spark/sql/execution/QueryExecution.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 3564d51b3a015..9f458621fea61 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -112,7 +112,7 @@ class QueryExecution(
     concat.toString
   }
 
-  private def writePlans(append: String => Boolean, maxFields: Int): Boolean = {
+  private def writePlans(append: String => Unit, maxFields: Int): Unit = {
     val (verbose, addSuffix) = (true, false)
     append("== Parsed Logical Plan ==\n")
     QueryPlan.append(logical, append, verbose, addSuffix, maxFields)

From e470ab2f7753c7af9f2dc861deb56d3259271c66 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Mon, 21 Jan 2019 15:36:20 -0600
Subject: [PATCH 19/34] Cleaned up append function and added tracking of the
 full plan length.

---
 .../spark/sql/catalyst/util/StringUtils.scala | 35 +++++++++----------
 1 file changed, 16 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 22849797ebb19..26a6b619536c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -101,7 +101,11 @@ object StringUtils extends Logging {
    */
   class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
     private val strings = new ArrayBuffer[String]
-    private var length: Int = 0
+    protected var length: Int = 0
+
+    // This tracks the full length of all appended strings, i.e. how long the
+    // non-truncated string would be.
+    protected var appendedLength: Int = 0
 
     def atLimit: Boolean = length >= maxLength
 
@@ -111,9 +115,13 @@ object StringUtils extends Logging {
    * has room for further appends before it hits its max limit.
    */
   def append(s: String): Boolean = {
+    val sLen = s.length
+    appendedLength += sLen
     if (!atLimit && s != null) {
-      strings.append(s)
-      length += s.length
+      val available = maxLength - length
+      val stringToAppend = if (available >= sLen) s else s.substring(0, available)
+      strings.append(stringToAppend)
+      length += stringToAppend.length
     }
     return !atLimit
   }
 
   /**
    * The method finalizes the concatenation and
    * returns concatenated string.
    */
   override def toString: String = {
-    val finalLength = Math.min(length, maxLength)
-    val result = new java.lang.StringBuilder(finalLength)
-    var ix = 0
-    while(ix < strings.length) {
-      var s = strings(ix)
-      if(ix < strings.length - 1) {
-        result.append(s)
-      }
-      else {
-        val lastLength = Math.min(s.length, maxLength - result.length())
-        result.append(s, 0, lastLength)
-      }
-      ix += 1
-    }
+    val result = new java.lang.StringBuilder(length)
+    strings.foreach(result.append)
     result.toString
   }
 }
@@ -150,8 +146,9 @@ object StringUtils extends Logging {
   override def toString: String = {
     if (atLimit && planSizeWarningPrinted.compareAndSet(false, true)) {
       logWarning(
-        "Truncated the string representation of a plan since it was too long. This " +
-        s"behavior can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+        "Truncated the string representation of a plan since it was too long. The " +
+        s"plan had length ${appendedLength} and the maximum is ${length}. This behavior " +
+        s"can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
     }
     super.toString
   }

From 35bc1d5f5d7733ea055a02c6f64e9ba61d8af0e7 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Mon, 21 Jan 2019 15:46:54 -0600
Subject: [PATCH 20/34] Got rid of unneeded "appendedLength" field.

---
 .../org/apache/spark/sql/catalyst/util/StringUtils.scala | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 26a6b619536c1..315d14d7cca59 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -103,10 +103,6 @@ object StringUtils extends Logging {
     private val strings = new ArrayBuffer[String]
     protected var length: Int = 0
 
-    // This tracks the full length of all appended strings, i.e. how long the
-    // non-truncated string would be.
-    protected var appendedLength: Int = 0
-
     def atLimit: Boolean = length >= maxLength
 
     /**
@@ -116,13 +112,12 @@ object StringUtils extends Logging {
     def append(s: String): Boolean = {
       val sLen = s.length
-      appendedLength += sLen
       if (!atLimit && s != null) {
         val available = maxLength - length
         val stringToAppend = if (available >= sLen) s else s.substring(0, available)
         strings.append(stringToAppend)
-        length += stringToAppend.length
       }
+      length += sLen
       return !atLimit
     }
@@ -147,7 +142,7 @@ object StringUtils extends Logging {
     if (atLimit && planSizeWarningPrinted.compareAndSet(false, true)) {
       logWarning(
         "Truncated the string representation of a plan since it was too long. The " +
-        s"plan had length ${appendedLength} and the maximum is ${length}. This behavior " +
+        s"plan had length ${length} and the maximum is ${maxLength}. This behavior " +
         s"can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
     }
     super.toString
   }

From 5ec58c8443886d4d2e7b5bbadf86a8307f6a4dc3 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Mon, 21 Jan 2019 16:14:06 -0600
Subject: [PATCH 21/34] Fixed missed errors

---
 .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index b315828a872fa..9542f5ed4e808 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -497,7 +497,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
-      maxFields: Int): Boolean = {
+      maxFields: Int): Unit = {
     child.generateTreeString(
       depth,
       lastChildren,
@@ -781,7 +781,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
-      maxFields: Int): Boolean = {
+      maxFields: Int): Unit = {
     child.generateTreeString(
       depth,
       lastChildren,

From bdfaf288c96b084f5171d5495e01db3d1cf6591b Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Mon, 21 Jan 2019 22:52:26 -0600
Subject: [PATCH 22/34] Updated to handle nulls again. Updated test now that
 plans don't end with "..."

---
 .../spark/sql/catalyst/util/StringUtils.scala    | 16 +++++++++-------
 .../spark/sql/catalyst/trees/TreeNodeSuite.scala |  3 +--
 2 files changed, 10 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 315d14d7cca59..8f45565564796 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -111,13 +111,15 @@ object StringUtils extends Logging {
    * has room for further appends before it hits its max limit.
    */
   def append(s: String): Boolean = {
-    val sLen = s.length
-    if (!atLimit && s != null) {
-      val available = maxLength - length
-      val stringToAppend = if (available >= sLen) s else s.substring(0, available)
-      strings.append(stringToAppend)
-    }
-    length += sLen
+	if (s != null) {
+	  val sLen = s.length
+	  if (!atLimit) {
+	    val available = maxLength - length
+	    val stringToAppend = if (available >= sLen) s else s.substring(0, available)
+	    strings.append(stringToAppend)
+	  }
+	  length += sLen
+	}
     return !atLimit
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index a903b274b6382..f198691017843 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -603,7 +603,7 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
     }
 
     val planString = ds.treeString
-    assert(!planString.endsWith("..."))
+    assert(planString.length > 250)
   }
 
   test("treeString limits plan length") {
@@ -613,7 +613,6 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
     }
 
     val planString = ds.treeString
-    assert(planString.endsWith("..."))
     assert(planString.length === SQLConf.get.maxPlanStringLength)
   }
 }

From 0cfcb4ed911834eeb933a3ef4af336eaec7069ae Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Mon, 21 Jan 2019 23:46:54 -0600
Subject: [PATCH 23/34] Tabs to spaces

---
 .../spark/sql/catalyst/util/StringUtils.scala | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 8f45565564796..761f1487173c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -111,15 +111,15 @@ object StringUtils extends Logging {
    * has room for further appends before it hits its max limit.
    */
   def append(s: String): Boolean = {
-	if (s != null) {
-	  val sLen = s.length
-	  if (!atLimit) {
-	    val available = maxLength - length
-	    val stringToAppend = if (available >= sLen) s else s.substring(0, available)
-	    strings.append(stringToAppend)
-	  }
-	  length += sLen
-	}
+    if (s != null) {
+      val sLen = s.length
+      if (!atLimit) {
+        val available = maxLength - length
+        val stringToAppend = if (available >= sLen) s else s.substring(0, available)
+        strings.append(stringToAppend)
+      }
+      length += sLen
+    }
     return !atLimit
   }
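
As of patch 23 the append semantics are settled: nulls are ignored, appends past the limit are
cut to fit, and `length` keeps counting the full untruncated size. A small sketch of the
behavior (illustrative only; the expected values follow the StringUtilsSuite assertions that
the later patches in this series touch):

    val concat = new StringConcat(7)
    concat.append("under")  // fits entirely; 5 of 7 characters used
    concat.append("over")   // only "ov" is kept; the rest is dropped
    concat.append("extra")  // ignored entirely; the limit was already reached
    concat.append(null)     // a no-op after patch 22
    assert(concat.atLimit)
    assert(concat.toString == "underov")
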
From eb698889f906b3657b8fc2078e40ab5aa3585217 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Tue, 22 Jan 2019 07:46:49 -0600
Subject: [PATCH 24/34] Remove useless test.

---
 .../apache/spark/sql/catalyst/trees/TreeNodeSuite.scala | 9 ---------
 1 file changed, 9 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index f198691017843..4a28cb52c4fa4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -597,15 +597,6 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
     assert(result === expected)
   }
 
-  test("treeString default doesn't limit plan length") {
-    val ds = (1 until 20).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
-      Add(Literal(x), treeNode)
-    }
-
-    val planString = ds.treeString
-    assert(planString.length > 250)
-  }
-
   test("treeString limits plan length") {
     withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "200") {
       val ds = (1 until 20).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>

From daf02f2bbedec3ee8f0eeb0557cc85f850599f0b Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Fri, 8 Mar 2019 14:13:30 -0600
Subject: [PATCH 25/34] Addressed code review comments.

---
 .../spark/sql/catalyst/plans/QueryPlan.scala  |  2 +-
 .../spark/sql/catalyst/trees/TreeNode.scala   |  4 +--
 .../spark/sql/catalyst/util/StringUtils.scala | 29 ++++++++++++-------
 .../apache/spark/sql/internal/SQLConf.scala   |  6 ++--
 .../sql/catalyst/trees/TreeNodeSuite.scala    |  4 ++-
 .../sql/catalyst/util/StringUtilsSuite.scala  | 11 +++++--
 .../spark/sql/execution/QueryExecution.scala  |  2 +-
 7 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index a692c72e1babf..8f5444ed8a5a7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -308,7 +308,7 @@ object QueryPlan extends PredicateHelper {
    */
  def append[T <: QueryPlan[T]](
      plan: => QueryPlan[T],
-      append: String => Boolean,
+      append: String => Unit,
      verbose: Boolean,
      addSuffix: Boolean,
      maxFields: Int = SQLConf.get.maxToStringFields): Unit = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 419aeb687532d..72b1931e47cf2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -487,7 +487,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   }
 
   def treeString(
-      append: String => Boolean,
+      append: String => Unit,
       verbose: Boolean,
       addSuffix: Boolean,
       maxFields: Int): Unit = {
@@ -554,7 +554,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      append: String => Boolean,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 761f1487173c2..ee843ded7a890 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -100,7 +100,7 @@ object StringUtils extends Logging {
    * the string.
    */
   class StringConcat(val maxLength: Int = ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH) {
-    private val strings = new ArrayBuffer[String]
+    protected val strings = new ArrayBuffer[String]
     protected var length: Int = 0
 
     def atLimit: Boolean = length >= maxLength
@@ -110,7 +110,7 @@ object StringUtils extends Logging {
    * appended strings once in the toString method. Returns true if the string still
    * has room for further appends before it hits its max limit.
    */
-  def append(s: String): Boolean = {
+  def append(s: String): Unit = {
     if (s != null) {
       val sLen = s.length
       if (!atLimit) {
@@ -128,26 +128,33 @@ object StringUtils extends Logging {
    * returns concatenated string.
    */
   override def toString: String = {
-    val result = new java.lang.StringBuilder(length)
+    val finalLength = if (atLimit) maxLength else length
+    val result = new java.lang.StringBuilder(finalLength)
     strings.foreach(result.append)
     result.toString
   }
 }
 
-  /** Whether we have warned about plan string truncation yet. */
-  private val planSizeWarningPrinted = new AtomicBoolean(false)
-
-  /** A string concatenator for plan strings. Uses length from a configured value, and
-   * prints a warning the first time a plan is truncated. */
-  class PlanStringConcat extends StringConcat(SQLConf.get.maxPlanStringLength) {
+  /**
+   * A string concatenator for plan strings. Uses length from a configured value, and
+   * prints a warning the first time a plan is truncated.
+   */
+  class PlanStringConcat extends StringConcat(Math.max(0, SQLConf.get.maxPlanStringLength - 30)) {
     override def toString: String = {
-      if (atLimit && planSizeWarningPrinted.compareAndSet(false, true)) {
+      if (atLimit) {
         logWarning(
           "Truncated the string representation of a plan since it was too long. The " +
           s"plan had length ${length} and the maximum is ${maxLength}. This behavior " +
          s"can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
+        val truncateMsg = s"... ${length - maxLength} more characters"
+        val result = new java.lang.StringBuilder(maxLength + truncateMsg.length)
+        strings.foreach(result.append)
+        result.append(truncateMsg)
+        result.toString
+      }
+      else {
+        super.toString
       }
-      super.toString
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1b22fa84dae58..12df3161c9442 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1625,15 +1625,15 @@ object SQLConf {
     .intConf
     .createWithDefault(25)
 
-  val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanLength")
+  val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
     .doc("Maximum number of characters to output for a plan string. If the plan is " +
       "longer, further output will be truncated. The default setting always generates a full " +
       "plan. Set this to a lower value such as 8192 if plan strings are taking up too much " +
       "memory or are causing OutOfMemory errors in the driver or UI processes.")
     .intConf
     .checkValue(i => i >= 0 && i <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, "Invalid " +
-      "value for 'spark.sql.maxPlanLength'. Length must be a valid string length (nonnegative " +
-      "and shorter than the maximum size).")
+      "value for 'spark.sql.maxPlanStringLength'. Length must be a valid string length " +
+      "(nonnegative and shorter than the maximum size).")
     .createWithDefault(ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH)
 
   val SET_COMMAND_REJECTS_SPARK_CORE_CONFS =
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 4a28cb52c4fa4..cc21cc307b2a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -604,7 +604,9 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
       }
 
       val planString = ds.treeString
-      assert(planString.length === SQLConf.get.maxPlanStringLength)
+    logWarning("Plan string: " + planString)
+      assert(planString.endsWith(" more characters"))
+    assert(planString.length <= SQLConf.get.maxPlanStringLength)
     }
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 3a73bb3e8100b..878093ce50b7a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -68,9 +68,14 @@ class StringUtilsSuite extends SparkFunSuite {
   }
 
   test("string concatenation return value") {
-    assert(new StringConcat(7).append("under") == true)
-    assert(new StringConcat(7).append("underover") == false)
-    assert(new StringConcat(7).append("underov") == false)
+    def checkLimit(s: String): Boolean = {
+      val sc = new StringConcat(7)
+      sc.append(s)
+      sc.atLimit
+    }
+    assert(checkLimit("under"))
+    assert(checkLimit("underover") == false)
+    assert(checkLimit("underov") == false)
   }
 
   test("string concatenation append after limit") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 9f458621fea61..23550cbadb251 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -112,7 +112,7 @@ class QueryExecution(
     concat.toString
   }
 
-  private def writePlans(append: String => Boolean, maxFields: Int): Unit = {
+  private def writePlans(append: String => Unit, maxFields: Int): Unit = {
     val (verbose, addSuffix) = (true, false)
     append("== Parsed Logical Plan ==\n")
     QueryPlan.append(logical, append, verbose, addSuffix, maxFields)

From 7d89388f888ffb8f75b2ec4507057784070dcb2a Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Fri, 8 Mar 2019 14:15:28 -0600
Subject: [PATCH 26/34] Missed some Boolean->Unit updates

---
 .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index 9542f5ed4e808..3b0a99669ccd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -493,7 +493,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      append: String => Boolean,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,
@@ -777,7 +777,7 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
   override def generateTreeString(
       depth: Int,
       lastChildren: Seq[Boolean],
-      append: String => Boolean,
+      append: String => Unit,
       verbose: Boolean,
       prefix: String = "",
       addSuffix: Boolean = false,

From 4f56e48c93cae5dcb9f3a19b7b4523cd8d584d76 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Fri, 8 Mar 2019 14:19:53 -0600
Subject: [PATCH 27/34] Missed some Boolean->Unit updates

---
 .../scala/org/apache/spark/sql/execution/QueryExecution.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 23550cbadb251..0727cb9f3a0fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -196,7 +196,6 @@ class QueryExecution(
       val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath)))
       val append = (s: String) => {
         writer.write(s)
-        true
       }
       try {
         writePlans(append, maxFields)

From a090fbb876161f295f4d898accd0e5449445d183 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Fri, 8 Mar 2019 14:26:36 -0600
Subject: [PATCH 28/34] Code formatting

---
 .../org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index cc21cc307b2a1..a3edea6751da1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -604,9 +604,9 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
       }
 
       val planString = ds.treeString
-    logWarning("Plan string: " + planString)
+      logWarning("Plan string: " + planString)
       assert(planString.endsWith(" more characters"))
-    assert(planString.length <= SQLConf.get.maxPlanStringLength)
+      assert(planString.length <= SQLConf.get.maxPlanStringLength)
     }
   }
 }
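
At this point the end-to-end behavior the TreeNodeSuite pins down can be reproduced in a few
lines. A hedged sketch under the suite's own assumptions (it runs inside a class extending
SparkFunSuite with SQLHelper, so withSQLConf, Literal, Add, and Expression are in scope):

    withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "200") {
      // Build an expression tree wide enough to overflow 200 characters.
      val wide = (1 until 20).foldLeft(Literal("TestLiteral"): Expression) {
        case (treeNode, x) => Add(Literal(x), treeNode)
      }
      val planString = wide.treeString
      assert(planString.endsWith(" more characters"))
      assert(planString.length <= 200)
    }

The second assertion holds because PlanStringConcat reserves roughly 30 characters of the
configured budget up front, so even after the "... N more characters" suffix is appended the
final string stays within the limit.
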
From dcb4eb07f96b74980dc4e34e77fbe2d2e5e21b70 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Fri, 8 Mar 2019 16:09:44 -0600
Subject: [PATCH 29/34] Fixed Fatal Warning.

---
 .../scala/org/apache/spark/sql/catalyst/util/StringUtils.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index ee843ded7a890..bded70c58c153 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -120,7 +120,6 @@ object StringUtils extends Logging {
       }
       length += sLen
     }
-    return !atLimit
   }
 
   /**

From b4cb7bf1e646ba265b2952f6f8d77c654927bce1 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Sat, 9 Mar 2019 12:48:34 -0600
Subject: [PATCH 30/34] Fixed failing unit tests

---
 .../spark/sql/catalyst/util/StringUtilsSuite.scala | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 878093ce50b7a..d5deabf092eaf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -73,14 +73,15 @@ class StringUtilsSuite extends SparkFunSuite {
       sc.append(s)
       sc.atLimit
     }
-    assert(checkLimit("under"))
-    assert(checkLimit("underover") == false)
-    assert(checkLimit("underov") == false)
+    assert(checkLimit("under") == false)
+    assert(checkLimit("1234567"))
+    assert(checkLimit("1234567890"))
   }
 
   test("string concatenation append after limit") {
     val concat = new StringConcat(7)
-    concat.append("underover")
-    assert(concat.append("extra") == false)
+    concat.append("under")
+    concat.append("extra")
+    assert(concat.toString === "underex")
   }
 }

From 4fec590b45340332d65188781bbf71349edb248a Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Mon, 11 Mar 2019 14:12:29 -0500
Subject: [PATCH 31/34] Style/formatting issues from code review

---
 .../spark/sql/catalyst/util/StringUtils.scala |  3 +-
 .../sql/catalyst/util/StringUtilsSuite.scala  | 31 +++++++------------
 2 files changed, 13 insertions(+), 21 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index bded70c58c153..896ce108bb247 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -150,8 +150,7 @@ object StringUtils extends Logging {
         strings.foreach(result.append)
         result.append(truncateMsg)
         result.toString
-      }
-      else {
+      } else {
         super.toString
       }
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index d5deabf092eaf..852d075743e67 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -46,25 +46,25 @@ class StringUtilsSuite extends SparkFunSuite {
 
   test("string concatenation") {
     def concat(seq: String*): String = {
-      seq.foldLeft(new StringConcat())((acc, s) => {acc.append(s); acc}).toString
+      seq.foldLeft(new StringConcat()) { (acc, s) => acc.append(s); acc }.toString
     }
 
     assert(new StringConcat().toString == "")
-    assert(concat("") == "")
-    assert(concat(null) == "")
-    assert(concat("a") == "a")
-    assert(concat("1", "2") == "12")
-    assert(concat("abc", "\n", "123") == "abc\n123")
+    assert(concat("") === "")
+    assert(concat(null) === "")
+    assert(concat("a") === "a")
+    assert(concat("1", "2") === "12")
+    assert(concat("abc", "\n", "123") === "abc\n123")
   }
 
   test("string concatenation with limit") {
     def concat(seq: String*): String = {
-      seq.foldLeft(new StringConcat(7))((acc, s) => {acc.append(s); acc}).toString
+      seq.foldLeft(new StringConcat(7)) { (acc, s) => acc.append(s); acc }.toString
     }
-    assert(concat("under") == "under")
-    assert(concat("under", "over", "extra") == "underov")
-    assert(concat("underover") == "underov")
-    assert(concat("under", "ov") == "underov")
+    assert(concat("under") === "under")
+    assert(concat("under", "over", "extra") === "underov")
+    assert(concat("underover") === "underov")
+    assert(concat("under", "ov") === "underov")
   }
 
   test("string concatenation return value") {
@@ -73,15 +73,8 @@ class StringUtilsSuite extends SparkFunSuite {
       sc.append(s)
       sc.atLimit
     }
-    assert(checkLimit("under") == false)
+    assert(checkLimit("under") === false)
     assert(checkLimit("1234567"))
     assert(checkLimit("1234567890"))
   }
-
-  test("string concatenation append after limit") {
-    val concat = new StringConcat(7)
-    concat.append("under")
-    concat.append("extra")
-    assert(concat.toString === "underex")
-  }
 }

From db0db18795ea5740082eddbca4084ad31e6dda13 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Mon, 11 Mar 2019 14:14:22 -0500
Subject: [PATCH 32/34] Style/formatting issues from code review

---
 .../org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
index 852d075743e67..63d3831404d47 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/StringUtilsSuite.scala
@@ -73,7 +73,7 @@ class StringUtilsSuite extends SparkFunSuite {
       sc.append(s)
       sc.atLimit
     }
-    assert(checkLimit("under") === false)
+    assert(!checkLimit("under"))
     assert(checkLimit("1234567"))
     assert(checkLimit("1234567890"))
   }

From b5b30f31c0603653017cb7efc90c608a18aa0954 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Tue, 12 Mar 2019 20:48:45 -0500
Subject: [PATCH 33/34] Changed plan string length to bytesConf and gave a
 better message for 0 length plans.

---
 .../apache/spark/sql/catalyst/util/StringUtils.scala |  6 +++++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala |  6 +++---
 .../spark/sql/catalyst/trees/TreeNodeSuite.scala      | 11 +++++++++++
 3 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
index 896ce108bb247..6118d8c6fc282 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/StringUtils.scala
@@ -145,7 +145,11 @@ object StringUtils extends Logging {
         "Truncated the string representation of a plan since it was too long. The " +
         s"plan had length ${length} and the maximum is ${maxLength}. This behavior " +
         s"can be adjusted by setting '${SQLConf.MAX_PLAN_STRING_LENGTH.key}'.")
-      val truncateMsg = s"... ${length - maxLength} more characters"
+      val truncateMsg = if (maxLength == 0) {
+        s"Truncated plan of $length characters"
+      } else {
+        s"... ${length - maxLength} more characters"
+      }
       val result = new java.lang.StringBuilder(maxLength + truncateMsg.length)
       strings.foreach(result.append)
       result.append(truncateMsg)
       result.toString
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 12df3161c9442..32cfcb9147d81 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1628,9 +1628,9 @@ object SQLConf {
   val MAX_PLAN_STRING_LENGTH = buildConf("spark.sql.maxPlanStringLength")
     .doc("Maximum number of characters to output for a plan string. If the plan is " +
       "longer, further output will be truncated. The default setting always generates a full " +
-      "plan. Set this to a lower value such as 8192 if plan strings are taking up too much " +
+      "plan. Set this to a lower value such as 8k if plan strings are taking up too much " +
       "memory or are causing OutOfMemory errors in the driver or UI processes.")
-    .intConf
+    .bytesConf(ByteUnit.BYTE)
     .checkValue(i => i >= 0 && i <= ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH, "Invalid " +
       "value for 'spark.sql.maxPlanStringLength'. Length must be a valid string length " +
       "(nonnegative and shorter than the maximum size).")
@@ -2067,7 +2067,7 @@ class SQLConf extends Serializable with Logging {
 
   def maxToStringFields: Int = getConf(SQLConf.MAX_TO_STRING_FIELDS)
 
-  def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH)
+  def maxPlanStringLength: Int = getConf(SQLConf.MAX_PLAN_STRING_LENGTH).toInt
 
   def setCommandRejectsSparkCoreConfs: Boolean =
     getConf(SQLConf.SET_COMMAND_REJECTS_SPARK_CORE_CONFS)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index a3edea6751da1..379c9be066c7d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -609,4 +609,15 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
       assert(planString.length <= SQLConf.get.maxPlanStringLength)
     }
   }
+
+	test("treeString limit at zero") {
+	  withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "0") {
+	    val ds = (1 until 2).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+	      Add(Literal(x), treeNode)
+	    }
+
+	    val planString = ds.treeString
+	    assert(planString.startsWith("Truncated plan of"))
+	  }
+	}
 }

From e4afa26baf8580bf5f630d0332e4ab49b243b434 Mon Sep 17 00:00:00 2001
From: Dave DeCaprio
Date: Tue, 12 Mar 2019 20:49:53 -0500
Subject: [PATCH 34/34] Tabs to spaces

---
 .../sql/catalyst/trees/TreeNodeSuite.scala | 20 +++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
index 379c9be066c7d..6d4268716a7f1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/trees/TreeNodeSuite.scala
@@ -610,14 +610,14 @@ class TreeNodeSuite extends SparkFunSuite with SQLHelper {
     }
   }
 
-	test("treeString limit at zero") {
-	  withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "0") {
-	    val ds = (1 until 2).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
-	      Add(Literal(x), treeNode)
-	    }
-
-	    val planString = ds.treeString
-	    assert(planString.startsWith("Truncated plan of"))
-	  }
-	}
+  test("treeString limit at zero") {
+    withSQLConf(SQLConf.MAX_PLAN_STRING_LENGTH.key -> "0") {
+      val ds = (1 until 2).foldLeft(Literal("TestLiteral"): Expression) { case (treeNode, x) =>
+        Add(Literal(x), treeNode)
+      }
+
+      val planString = ds.treeString
+      assert(planString.startsWith("Truncated plan of"))
+    }
+  }
 }
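
For end users, the knob added by this series is set like any other SQL conf. A hedged usage
sketch (assumes a live SparkSession named spark; since patch 33 the value is parsed as a
byte-style size, so "8k" and "8192" are equivalent):

    // Cap plan strings at 8k characters; oversized plans end with
    // "... N more characters", and a limit of "0" collapses the whole
    // output to "Truncated plan of N characters".
    spark.conf.set("spark.sql.maxPlanStringLength", "8k")
    val qe = spark.range(0, 100000).selectExpr("id + 1 AS x").queryExecution
    println(qe.toString)
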
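Two small follow-ups the series appears to leave open (hedged; based only on the hunks above):
the scaladoc on StringConcat.append still says it "Returns true if the string still has room",
although patch 25 changed append to return Unit, and the TreeNodeSuite test still carries a
leftover debug line that a later revision could drop:

    -      logWarning("Plan string: " + planString)

Both would be one-line cleanups.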