Skip to content

Commit

Permalink
Use UTF8String.
Browse files Browse the repository at this point in the history
  • Loading branch information
yhuai committed Apr 15, 2015
1 parent 60a1487 commit 791b96a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.shuffle.sort.SortShuffleManager
-import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner, SparkConf}
+import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.serializer.Serializer
import org.apache.spark.sql.{SQLContext, Row}
Expand Down Expand Up @@ -79,7 +79,7 @@ case class Exchange(
}
}

-private lazy val sparkConf = child.sqlContext.sparkContext.getConf
+@transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf

def serializer(
keySchema: Array[DataType],
Expand All @@ -92,10 +92,10 @@ case class Exchange(
SparkSqlSerializer2.support(valueSchema)

val serializer = if (useSqlSerializer2) {
-logInfo("Use ShuffleSerializer")
+logInfo("Use SparkSqlSerializer2.")
new SparkSqlSerializer2(keySchema, valueSchema)
} else {
-logInfo("Use SparkSqlSerializer")
+logInfo("Use SparkSqlSerializer.")
new SparkSqlSerializer(sparkConf)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,9 @@ private[sql] object SparkSqlSerializer2 {
out.writeByte(NULL)
} else {
out.writeByte(NOT_NULL)
-// TODO: Update it once the string improvement is in.
-out.writeUTF(row.getString(i))
+val bytes = row.getAs[UTF8String](i).getBytes
+out.writeInt(bytes.length)
+out.write(bytes)
}

case BinaryType =>
Expand Down Expand Up @@ -386,8 +387,11 @@ private[sql] object SparkSqlSerializer2 {
if (in.readByte() == NULL) {
mutableRow.setNullAt(i)
} else {
-// TODO: Update it once the string improvement is in.
-mutableRow.setString(i, in.readUTF())
+// TODO: reuse the byte array in the UTF8String.
+val length = in.readInt()
+val bytes = new Array[Byte](length)
+in.readFully(bytes)
+mutableRow.update(i, UTF8String(bytes))
}

case BinaryType =>
Expand Down

0 comments on commit 791b96a

Please sign in to comment.