From d16e0f3e22287a7f3779ed24239d84179602e30a Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 6 Jun 2016 17:56:06 -0700
Subject: [PATCH 1/9] truncate strings

---
 .../scala/org/apache/spark/util/Utils.scala   | 46 +++++++++++++++++++
 .../org/apache/spark/util/UtilsSuite.scala    |  7 +++
 .../sql/catalyst/expressions/Expression.scala |  4 +-
 .../spark/sql/catalyst/trees/TreeNode.scala   |  6 +--
 .../apache/spark/sql/types/StructType.scala   | 16 ++-----
 .../spark/sql/execution/ExistingRDD.scala     | 10 ++--
 .../spark/sql/execution/QueryExecution.scala  |  5 +-
 .../aggregate/HashAggregateExec.scala         |  7 +--
 .../aggregate/SortAggregateExec.scala         |  7 +--
 .../datasources/LogicalRelation.scala         |  3 +-
 .../apache/spark/sql/execution/limit.scala    |  5 +-
 .../execution/streaming/StreamExecution.scala |  3 +-
 .../sql/execution/streaming/memory.scala      |  3 +-
 13 files changed, 89 insertions(+), 33 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1a9dbcae8c083..a9a9294dc59fb 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -26,6 +26,7 @@ import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
 import javax.net.ssl.HttpsURLConnection
 
 import scala.annotation.tailrec
@@ -78,6 +79,51 @@ private[spark] object Utils extends Logging {
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
+  /**
+   * The performance overhead of creating and logging strings for wide schemas can be large. To
+   * limit the impact, we bound the number of fields to include by default. This can be overridden
+   * by setting the 'spark.debug.maxToStringFields' conf in SparkEnv.
+   */
+  val DEFAULT_MAX_TO_STRING_FIELDS = 25
+
+  private def maxNumToStringFields = {
+    if (SparkEnv.get != null) {
+      SparkEnv.get.conf.getInt("spark.debug.maxToStringFields", DEFAULT_MAX_TO_STRING_FIELDS)
+    } else {
+      DEFAULT_MAX_TO_STRING_FIELDS
+    }
+  }
+
+  /** Whether we have warned about plan string truncation yet. */
+  private val truncationWarningPrinted = new AtomicBoolean(false)
+
+  /**
+   * Format a sequence with semantics similar to calling .mkString(). Any elements beyond
+   * maxNumToStringFields will be dropped and replaced by a "... N more fields" placeholder.
+   *
+   * @return the trimmed and formatted string.
+   */
+  def truncatedString[T](
+      seq: Seq[T],
+      start: String,
+      sep: String,
+      end: String,
+      maxNumFields: Int = maxNumToStringFields): String = {
+    if (seq.length > maxNumFields) {
+      if (truncationWarningPrinted.compareAndSet(false, true)) {
+        logWarning(
+          "Truncated the string representation of a plan since it was too large. This " +
+            "behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.")
+      }
+      seq.take(maxNumFields - 1).mkString(
+        start, sep, sep + "... " + (seq.length - maxNumFields + 1) + " more fields" + end)
+    } else {
+      seq.mkString(start, sep, end)
+    }
+  }
+
+  /** Shorthand for calling truncatedString() without start or end strings. */
+  def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "")
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 66987498669d4..8dd1f3f2e2435 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -40,6 +40,13 @@ import org.apache.spark.network.util.ByteUnit
 
 class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
 
+  test("truncatedString") {
+    assert(Utils.truncatedString(Nil, "[", ", ", "]", 2) == "[]")
+    assert(Utils.truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]")
+    assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]")
+    assert(Utils.truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3")
+  }
+
   test("timeConversion") {
     // Test -1
     assert(Utils.timeStringAsSeconds("-1") === -1)
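The test above pins down the truncation semantics; for readers skimming the diff, they can be sketched standalone. This is an illustrative re-implementation with no Spark dependencies — `TruncationDemo` and `truncated` are made-up names, not code from this patch:

    // Minimal sketch of the truncatedString behavior added above.
    object TruncationDemo {
      def truncated[T](seq: Seq[T], start: String, sep: String, end: String, max: Int): String = {
        if (seq.length > max) {
          // Keep max - 1 elements so the "... N more fields" placeholder takes the last slot.
          seq.take(max - 1).mkString(
            start, sep, sep + "... " + (seq.length - max + 1) + " more fields" + end)
        } else {
          seq.mkString(start, sep, end)
        }
      }
      def main(args: Array[String]): Unit = {
        println(truncated(Seq(1, 2), "[", ", ", "]", 2))    // [1, 2]
        println(truncated(Seq(1, 2, 3), "[", ", ", "]", 2)) // [1, ... 2 more fields]
      }
    }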
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
index 2ec46216e1cdb..02d811aafb4bd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 ////////////////////////////////////////////////////////////////////////////////////////////////////
 // This file defines the basic expression abstract classes in Catalyst.
@@ -192,7 +193,8 @@ abstract class Expression extends TreeNode[Expression] {
 
   override def simpleString: String = toString
 
-  override def toString: String = prettyName + flatArguments.mkString("(", ", ", ")")
+  override def toString: String = prettyName + Utils.truncatedString(
+    flatArguments.toSeq, "(", ", ", ")")
 
   /**
    * Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]],
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 22d82c61080c0..a983679c682ac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -448,10 +448,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
     case tn: TreeNode[_] => tn.simpleString :: Nil
     case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil
     case iter: Iterable[_] if iter.isEmpty => Nil
-    case seq: Seq[_] => seq.mkString("[", ", ", "]") :: Nil
-    case set: Set[_] => set.mkString("{", ", ", "}") :: Nil
+    case seq: Seq[_] => Utils.truncatedString(seq, "[", ", ", "]") :: Nil
+    case set: Set[_] => Utils.truncatedString(set.toSeq, "{", ", ", "}") :: Nil
     case array: Array[_] if array.isEmpty => Nil
-    case array: Array[_] => array.mkString("[", ", ", "]") :: Nil
+    case array: Array[_] => Utils.truncatedString(array, "[", ", ", "]") :: Nil
     case null => Nil
     case None => Nil
     case Some(null) => Nil
" + (fields.length - 2) + " more fields") - } - } - builder.append(">").toString() + Utils.truncatedString(fieldTypes, "struct<", ",", ">", maxNumberFields) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index b8b392608de1b..9ab98fd124a34 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.util.Utils object RDDConversions { def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = { @@ -123,7 +124,7 @@ private[sql] case class RDDScanExec( } override def simpleString: String = { - s"Scan $nodeName${output.mkString("[", ",", "]")}" + s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}" } } @@ -186,7 +187,8 @@ private[sql] case class RowDataSourceScanExec( key + ": " + StringUtils.abbreviate(value, 100) } - s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}" + s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}" + + s"${Utils.truncatedString(metadataEntries, " ", ", ", "")}" } override def inputRDDs(): Seq[RDD[InternalRow]] = { @@ -239,8 +241,8 @@ private[sql] case class BatchedDataSourceScanExec( val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield { key + ": " + StringUtils.abbreviate(value, 100) } - val metadataStr = metadataEntries.mkString(" ", ", ", "") - s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr" + val metadataStr = Utils.truncatedString(metadataEntries, " ", ", ", "") + s"Batched$nodeName${Utils.truncatedString(output, "[", ",", "]")}$metadataStr" } override def inputRDDs(): Seq[RDD[InternalRow]] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 330459c11ea98..ed78e941c7148 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCom import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _} +import org.apache.spark.util.Utils /** * The primary workflow for executing relational queries using Spark. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 330459c11ea98..ed78e941c7148 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCom
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
+import org.apache.spark.util.Utils
 
 /**
  * The primary workflow for executing relational queries using Spark. Designed to allow easy
@@ -206,8 +207,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
   }
 
   override def toString: String = {
-    def output =
-      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
+    def output = Utils.truncatedString(
+      analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")
 
     val analyzedPlan =
       Seq(stringOrError(output), stringOrError(analyzed)).filter(_.nonEmpty).mkString("\n")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index f270ca07554f5..0c9f0a0509d06 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.types.{DecimalType, StringType, StructType}
 import org.apache.spark.unsafe.KVIterator
+import org.apache.spark.util.Utils
 
 /**
  * Hash-based aggregate operator that can also fallback to sorting when data exceeds memory size.
@@ -769,9 +770,9 @@ case class HashAggregateExec(
 
     testFallbackStartsAt match {
       case None =>
-        val keyString = groupingExpressions.mkString("[", ",", "]")
-        val functionString = allAggregateExpressions.mkString("[", ",", "]")
-        val outputString = output.mkString("[", ",", "]")
+        val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]")
+        val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]")
+        val outputString = Utils.truncatedString(output, "[", ",", "]")
         s"HashAggregate(key=$keyString, functions=$functionString, output=$outputString)"
       case Some(fallbackStartsAt) =>
         s"HashAggregateWithControlledFallback $groupingExpressions " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index 9e48ff8d707bd..2bf82a1bce3e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, UnspecifiedDistribution}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.util.Utils
 
 /**
  * Sort-based aggregate operator.
@@ -106,9 +107,9 @@ case class SortAggregateExec(
 
   override def simpleString: String = {
     val allAggregateExpressions = aggregateExpressions
-    val keyString = groupingExpressions.mkString("[", ",", "]")
-    val functionString = allAggregateExpressions.mkString("[", ",", "]")
-    val outputString = output.mkString("[", ",", "]")
+    val keyString = Utils.truncatedString(groupingExpressions, "[", ",", "]")
+    val functionString = Utils.truncatedString(allAggregateExpressions, "[", ",", "]")
+    val outputString = Utils.truncatedString(output, "[", ",", "]")
     s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)"
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 0e0748ff32df3..a418d02983eb9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -21,6 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.sources.BaseRelation
+import org.apache.spark.util.Utils
 
 /**
  * Used to link a [[BaseRelation]] in to a logical query plan.
@@ -82,5 +83,5 @@ case class LogicalRelation(
       expectedOutputAttributes,
       metastoreTableIdentifier).asInstanceOf[this.type]
 
-  override def simpleString: String = s"Relation[${output.mkString(",")}] $relation"
+  override def simpleString: String = s"Relation[${Utils.truncatedString(output, ",")}] $relation"
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index b71f3335c99e5..781c016095427 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, LazilyGeneratedOrdering}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.exchange.ShuffleExchange
+import org.apache.spark.util.Utils
 
 
 /**
@@ -159,8 +160,8 @@ case class TakeOrderedAndProjectExec(
   override def outputOrdering: Seq[SortOrder] = sortOrder
 
   override def simpleString: String = {
-    val orderByString = sortOrder.mkString("[", ",", "]")
-    val outputString = output.mkString("[", ",", "]")
+    val orderByString = Utils.truncatedString(sortOrder, "[", ",", "]")
+    val outputString = Utils.truncatedString(output, "[", ",", "]")
 
     s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)"
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 16d38a2f7db56..b00177d42d14f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -332,7 +332,8 @@ class StreamExecution(
         newData.get(source).map { data =>
           val newPlan = data.logicalPlan
           assert(output.size == newPlan.output.size,
-            s"Invalid batch: ${output.mkString(",")} != ${newPlan.output.mkString(",")}")
+            s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
+              s"${Utils.truncatedString(newPlan.output, ",")}")
           replacements ++= output.zip(newPlan.output)
           newPlan
         }.getOrElse {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 4496f41615a4d..77fd043ef7219 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.Utils
 
 object MemoryStream {
   protected val currentBlockId = new AtomicInteger(0)
@@ -81,7 +82,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     }
   }
 
-  override def toString: String = s"MemoryStream[${output.mkString(",")}]"
+  override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
 
   override def getOffset: Option[Offset] = synchronized {
    if (batches.isEmpty) {
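One practical note before the benchmark commit below: if the default bound hides detail needed while debugging, the configuration key this patch introduces can raise it. A spark-shell style sketch — only the key name and default come from the patch, the value 100 is arbitrary:

    import org.apache.spark.SparkConf

    // Read via SparkEnv.get.conf when plan strings are built;
    // the default is DEFAULT_MAX_TO_STRING_FIELDS = 25.
    val conf = new SparkConf()
      .set("spark.debug.maxToStringFields", "100")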
From f4f4368d3550b864c6286ce04770990b41c6741c Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 6 Jun 2016 18:37:13 -0700
Subject: [PATCH 2/9] Mon Jun 6 18:37:13 PDT 2016

---
 .../benchmark/WideSchemaBenchmark.scala       | 49 +++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
index b4811fe27a513..06466e629b3f2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/WideSchemaBenchmark.scala
@@ -155,6 +155,55 @@ many column field r/w: Best/Avg Time(ms) Rate(M/s) Per Ro
 */
   }
 
+  ignore("wide shallowly nested struct field read and write") {
+    val benchmark = new Benchmark(
+      "wide shallowly nested struct field r/w", scaleFactor)
+    for (width <- widthsToTest) {
+      val numRows = scaleFactor / width
+      var datum: String = "{"
+      for (i <- 1 to width) {
+        if (i == 1) {
+          datum += s""""value_$i": 1"""
+        } else {
+          datum += s""", "value_$i": 1"""
+        }
+      }
+      datum += "}"
+      datum = s"""{"a": {"b": {"c": $datum, "d": $datum}, "e": $datum}}"""
+      val df = sparkSession.read.json(sparkSession.range(numRows).map(_ => datum).rdd).cache()
+      df.count()  // force caching
+      addCases(benchmark, df, s"$width wide x $numRows rows", "a.b.c.value_1")
+    }
+    benchmark.run()
+
+/*
+Java HotSpot(TM) 64-Bit Server VM 1.7.0_80-b15 on Linux 4.2.0-36-generic
+Intel(R) Xeon(R) CPU E5-1650 v3 @ 3.50GHz
+wide shallowly nested struct field r/w:  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+1 wide x 100000 rows (read in-mem)             100 /  125          1.0         997.7       1.0X
+1 wide x 100000 rows (write in-mem)            130 /  147          0.8        1302.9       0.8X
+1 wide x 100000 rows (read parquet)            195 /  228          0.5        1951.4       0.5X
+1 wide x 100000 rows (write parquet)           248 /  259          0.4        2479.7       0.4X
+10 wide x 10000 rows (read in-mem)              76 /   89          1.3         757.2       1.3X
+10 wide x 10000 rows (write in-mem)             90 /  116          1.1         900.0       1.1X
+10 wide x 10000 rows (read parquet)             90 /  135          1.1         903.9       1.1X
+10 wide x 10000 rows (write parquet)           222 /  240          0.4        2222.8       0.4X
+100 wide x 1000 rows (read in-mem)              71 /   91          1.4         710.8       1.4X
+100 wide x 1000 rows (write in-mem)            252 /  324          0.4        2522.4       0.4X
+100 wide x 1000 rows (read parquet)            310 /  329          0.3        3095.9       0.3X
+100 wide x 1000 rows (write parquet)           253 /  267          0.4        2525.7       0.4X
+1000 wide x 100 rows (read in-mem)             144 /  160          0.7        1439.5       0.7X
+1000 wide x 100 rows (write in-mem)           2055 / 2326          0.0       20553.9       0.0X
+1000 wide x 100 rows (read parquet)            750 /  925          0.1        7496.8       0.1X
+1000 wide x 100 rows (write parquet)           235 /  317          0.4        2347.5       0.4X
+2500 wide x 40 rows (read in-mem)              219 /  227          0.5        2190.9       0.5X
+2500 wide x 40 rows (write in-mem)            5177 / 5423          0.0       51773.2       0.0X
+2500 wide x 40 rows (read parquet)            1642 / 1714          0.1       16417.7       0.1X
+2500 wide x 40 rows (write parquet)            357 /  381          0.3        3568.2       0.3X
+*/
+  }
+
   ignore("wide struct field read and write") {
     val benchmark = new Benchmark("wide struct field r/w", scaleFactor)
     for (width <- widthsToTest) {
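The datum the benchmark builds is a flat object of `width` integer fields wrapped three levels deep. A small sketch outside the suite makes the generated shape visible — width fixed at 2, with `inner` and `datum` as local names for illustration:

    // Reproduce the benchmark's nested-JSON construction for width = 2.
    val width = 2
    val inner = (1 to width).map(i => s""""value_$i": 1""").mkString("{", ", ", "}")
    val datum = s"""{"a": {"b": {"c": $inner, "d": $inner}, "e": $inner}}"""
    println(datum)
    // {"a": {"b": {"c": {"value_1": 1, "value_2": 1}, "d": {"value_1": 1, "value_2": 1}}, "e": {"value_1": 1, "value_2": 1}}}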
From 17f98d76aec40bc7c6b8c46925d4013f9bccd639 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Mon, 6 Jun 2016 18:43:24 -0700
Subject: [PATCH 3/9] Mon Jun 6 18:43:24 PDT 2016

---
 core/src/main/scala/org/apache/spark/util/Utils.scala      | 5 +++--
 core/src/test/scala/org/apache/spark/util/UtilsSuite.scala | 1 +
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a9a9294dc59fb..f9d05409e1c3d 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -115,8 +115,9 @@ private[spark] object Utils extends Logging {
           "Truncated the string representation of a plan since it was too large. This " +
             "behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.")
       }
-      seq.take(maxNumFields - 1).mkString(
-        start, sep, sep + "... " + (seq.length - maxNumFields + 1) + " more fields" + end)
+      val numFields = math.max(0, maxNumFields - 1)
+      seq.take(numFields).mkString(
+        start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
     } else {
       seq.mkString(start, sep, end)
     }
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 8dd1f3f2e2435..a5363f0bfd600 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -44,6 +44,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(Utils.truncatedString(Nil, "[", ", ", "]", 2) == "[]")
     assert(Utils.truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]")
     assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]")
+    assert(Utils.truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]")
     assert(Utils.truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3")
   }
 
From 68a97dc5c0c62f13eca374190a02a46f9f050e15 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Tue, 7 Jun 2016 23:01:29 -0700
Subject: [PATCH 4/9] Fix test

---
 .../src/main/scala/org/apache/spark/sql/types/StructType.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index 6406f96c1ee13..bc9542caf47c1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -295,7 +295,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
 
   override def simpleString: String = {
     val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
-    Utils.truncatedString(fieldTypes, "struct<", ",", ">")
+    Utils.truncatedString(fieldTypes, "struct<", ", ", ">")
   }
 
   override def sql: String = {
From 61162a9c560976a4b4bfa7c564c68314c525ddde Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Tue, 7 Jun 2016 23:02:16 -0700
Subject: [PATCH 5/9] oops, really fix it

---
 .../src/main/scala/org/apache/spark/sql/types/StructType.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index bc9542caf47c1..d3a45d018b55c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -308,7 +308,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
     val fieldTypes = fields.take(maxNumberFields).map {
       case f => s"${f.name}: ${f.dataType.simpleString(maxNumberFields)}"
     }
-    Utils.truncatedString(fieldTypes, "struct<", ",", ">", maxNumberFields)
+    Utils.truncatedString(fieldTypes, "struct<", ", ", ">", maxNumberFields)
   }
 
   /**
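The guard added in PATCH 3 above is worth spelling out: with a non-positive maxNumFields, `take(maxNumFields - 1)` takes nothing while the old arithmetic over-counted the remainder (for a 3-element sequence and maxNumFields = -5 it would claim "9 more fields"). A standalone sketch of the fixed logic — an illustrative re-implementation mirroring the patched code, not the patch itself:

    def truncatedFixed[T](seq: Seq[T], start: String, sep: String, end: String, max: Int): String = {
      if (seq.length > max) {
        // Clamp so a negative or zero budget keeps zero elements but counts correctly.
        val numFields = math.max(0, max - 1)
        seq.take(numFields).mkString(
          start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end)
      } else {
        seq.mkString(start, sep, end)
      }
    }
    println(truncatedFixed(Seq(1, 2, 3), "[", ", ", "]", -5)) // [, ... 3 more fields]
    println(truncatedFixed(Seq(1, 2, 3), "[", ", ", "]", 2))  // [1, ... 2 more fields]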
From c52c44df2552c81e46a36c1a449d11e08ea10bb5 Mon Sep 17 00:00:00 2001
From: Eric Liang
Date: Tue, 7 Jun 2016 23:05:59 -0700
Subject: [PATCH 6/9] make spacing consistent as well

---
 .../apache/spark/sql/types/StructType.scala   | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index d3a45d018b55c..3bf64ccad0c2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -294,7 +294,7 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
   override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
 
   override def simpleString: String = {
-    val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
+    val fieldTypes = fields.map(field => s"${field.name}: ${field.dataType.simpleString}")
     Utils.truncatedString(fieldTypes, "struct<", ", ", ">")
   }
 
@@ -304,11 +304,17 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
   }
 
   private[sql] override def simpleString(maxNumberFields: Int): String = {
-    val builder = new StringBuilder
-    val fieldTypes = fields.take(maxNumberFields).map {
-      case f => s"${f.name}: ${f.dataType.simpleString(maxNumberFields)}"
-    }
-    Utils.truncatedString(fieldTypes, "struct<", ", ", ">", maxNumberFields)
+    val fieldTypes = fields.map(field => s"${field.name}: ${field.dataType.simpleString(maxNumberFields)}")
+    builder.append("struct<") + Utils.truncatedString(fieldTypes, "struct<", ",", ">", maxNumberFields)
+    builder.append(fieldTypes.mkString(", "))
+    if (fields.length > 2) {
+      if (fields.length - fieldTypes.length == 1) {
+        builder.append(" ... 1 more field")
+      } else {
+        builder.append(" ... " + (fields.length - 2) + " more fields")
+      }
+    }
+    builder.append(">").toString()
   }
 
   /**
1 more field") } else { From 1d5574a96a10824e3a75d935d662d43c0454ae36 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Tue, 7 Jun 2016 23:08:53 -0700 Subject: [PATCH 8/9] ugh github --- .../apache/spark/sql/types/StructType.scala | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index c0ad29c28bd3e..e73edb59f42f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -304,19 +304,19 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru } private[sql] override def simpleString(maxNumberFields: Int): String = { - val builder = new StringBuilder - val fieldTypes = fields.take(maxNumberFields).map { + builder.append("struct<") + Utils.truncatedString(fieldTypes, "struct<", ",", ">", maxNumberFields) - case f => s"${f.name}: ${f.dataType.simpleString(maxNumberFields)}" + builder.append(fieldTypes.mkString(", ")) + val builder = new StringBuilder + val fieldTypes = fields.take(maxNumberFields).map { + case f => s"${f.name}: ${f.dataType.simpleString(maxNumberFields)}" } builder.append("struct<") - builder.append(fieldTypes.mkString(", ")) + builder.append(fieldTypes.mkString(", ")) if (fields.length > 2) { - if (fields.length - fieldTypes.length == 1) { - builder.append(" ... 1 more field") - } else { - builder.append(" ... " + (fields.length - 2) + " more fields") - } - } + if (fields.length - fieldTypes.length == 1) { + builder.append(" ... 1 more field") + } else { + builder.append(" ... " + (fields.length - 2) + " more fields") + } + } builder.append(">").toString() } From 66aac3b5e4b36c75f9cde7edd8659475b0a1d305 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Wed, 8 Jun 2016 11:35:23 -0700 Subject: [PATCH 9/9] Wed Jun 8 11:35:23 PDT 2016 --- .../main/scala/org/apache/spark/sql/types/StructType.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index e73edb59f42f6..436512ff69335 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -294,8 +294,8 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum override def simpleString: String = { - val fieldTypes = fields.map(field => s"${field.name}: ${field.dataType.simpleString}") - Utils.truncatedString(fieldTypes, "struct<", ", ", ">") + val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}") + Utils.truncatedString(fieldTypes, "struct<", ",", ">") } override def sql: String = {