From 791b96a555f4a18e061cca636ae08ba16f6cc30f Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Wed, 15 Apr 2015 16:48:25 -0700 Subject: [PATCH] Use UTF8String. --- .../org/apache/spark/sql/execution/Exchange.scala | 8 ++++---- .../spark/sql/execution/SparkSqlSerializer2.scala | 12 ++++++++---- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala index 355239c6d67b9..0eb5fab263546 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf} +import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner} import org.apache.spark.rdd.{RDD, ShuffledRDD} import org.apache.spark.serializer.Serializer import org.apache.spark.sql.{SQLContext, Row} @@ -79,7 +79,7 @@ case class Exchange( } } - private lazy val sparkConf = child.sqlContext.sparkContext.getConf + @transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf def serializer( keySchema: Array[DataType], @@ -92,10 +92,10 @@ case class Exchange( SparkSqlSerializer2.support(valueSchema) val serializer = if (useSqlSerializer2) { - logInfo("Use ShuffleSerializer") + logInfo("Use SparkSqlSerializer2.") new SparkSqlSerializer2(keySchema, valueSchema) } else { - logInfo("Use SparkSqlSerializer") + logInfo("Use SparkSqlSerializer.") new SparkSqlSerializer(sparkConf) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala index fbccaa3dfd39c..fdfc8417a3ec6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer2.scala @@ -263,8 +263,9 @@ private[sql] object SparkSqlSerializer2 { out.writeByte(NULL) } else { out.writeByte(NOT_NULL) - // TODO: Update it once the string improvement is in. - out.writeUTF(row.getString(i)) + val bytes = row.getAs[UTF8String](i).getBytes + out.writeInt(bytes.length) + out.write(bytes) } case BinaryType => @@ -386,8 +387,11 @@ private[sql] object SparkSqlSerializer2 { if (in.readByte() == NULL) { mutableRow.setNullAt(i) } else { - // TODO: Update it once the string improvement is in. - mutableRow.setString(i, in.readUTF()) + // TODO: reuse the byte array in the UTF8String. + val length = in.readInt() + val bytes = new Array[Byte](length) + in.readFully(bytes) + mutableRow.update(i, UTF8String(bytes)) } case BinaryType =>