From 0f5471ae7c2a95c38152640053e38b14154125dd Mon Sep 17 00:00:00 2001
From: Joseph Batchik
Date: Tue, 14 Jul 2015 12:52:07 -0700
Subject: [PATCH] implemented @squito suggestion to use a codec that is already in spark

---
 .../scala/org/apache/spark/SparkConf.scala    |  2 +-
 .../serializer/GenericAvroSerializer.scala    | 39 +++++++------------
 .../spark/serializer/KryoSerializer.scala     |  1 +
 .../GenericAvroSerializerSuite.scala          |  1 +
 4 files changed, 16 insertions(+), 27 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 0d7a1378bc9a1..a928d0998dff6 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
 
-import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.Schema
 
 import org.apache.spark.serializer.GenericAvroSerializer.{avroSchemaNamespace, avroSchemaKey}
 import org.apache.spark.serializer.KryoSerializer
diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index 6a4a5c2f2395f..58697178cee0c 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.serializer
 
-import java.io.ByteArrayOutputStream
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer
-import java.util.zip.{Inflater, Deflater}
+
+import org.apache.commons.io.IOUtils
+import org.apache.spark.io.CompressionCodec
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
 
 import scala.collection.mutable
 
@@ -66,18 +69,11 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
    * same schema is compressed many times over
    */
   def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
-    val deflater = new Deflater(Deflater.BEST_COMPRESSION)
-    val schemaBytes = schema.toString.getBytes("UTF-8")
-    deflater.setInput(schemaBytes)
-    deflater.finish()
-    val buffer = Array.ofDim[Byte](schemaBytes.length)
-    val outputStream = new ByteArrayOutputStream(schemaBytes.length)
-    while(!deflater.finished()) {
-      val count = deflater.deflate(buffer)
-      outputStream.write(buffer, 0, count)
-    }
-    outputStream.close()
-    outputStream.toByteArray
+    val bos = new ByteArrayOutputStream()
+    val out = new SnappyOutputStream(bos)
+    out.write(schema.toString.getBytes("UTF-8"))
+    out.close()
+    bos.toByteArray
   })
 
 
@@ -86,18 +82,9 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
    * seen values so to limit the number of times that decompression has to be done.
    */
   def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
-    val inflater = new Inflater()
-    val bytes = schemaBytes.array()
-    inflater.setInput(bytes)
-    val outputStream = new ByteArrayOutputStream(bytes.length)
-    val tmpBuffer = Array.ofDim[Byte](1024)
-    while (!inflater.finished()) {
-      val count = inflater.inflate(tmpBuffer)
-      outputStream.write(tmpBuffer, 0, count)
-    }
-    inflater.end()
-    outputStream.close()
-    new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8"))
+    val bis = new ByteArrayInputStream(schemaBytes.array())
+    val bytes = IOUtils.toByteArray(new SnappyInputStream(bis))
+    new Schema.Parser().parse(new String(bytes, "UTF-8"))
   })
 
 /**
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 0ae3d5ce30ae8..06271d9c590dd 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -34,6 +34,7 @@ import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, Roaring
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
 import org.apache.spark.broadcast.HttpBroadcast
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
index cfc1baf930ddc..00eab599ddeef 100644
--- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import com.esotericsoftware.kryo.io.{Output, Input}
 import org.apache.avro.generic.GenericData.Record
 import org.apache.avro.{SchemaBuilder, Schema}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.{SparkFunSuite, SharedSparkContext}
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
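
For reference, a minimal standalone sketch of the Snappy round-trip this patch introduces, assuming snappy-java and commons-io are on the classpath (both ship with Spark core). The object name and the schema JSON literal are illustrative stand-ins, not part of the patch:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.commons.io.IOUtils
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

// Hypothetical demo object, not part of the patch.
object SnappySchemaRoundTrip {
  def main(args: Array[String]): Unit = {
    // Stands in for schema.toString in GenericAvroSerializer.compress.
    val schemaJson =
      """{"type":"record","name":"test","fields":[{"name":"data","type":"string"}]}"""

    // Compress: same shape as the new compress() body above.
    val bos = new ByteArrayOutputStream()
    val out = new SnappyOutputStream(bos)
    out.write(schemaJson.getBytes("UTF-8"))
    out.close()
    val compressed = bos.toByteArray

    // Decompress: same shape as the new decompress() body above.
    val bis = new ByteArrayInputStream(compressed)
    val bytes = IOUtils.toByteArray(new SnappyInputStream(bis))
    assert(new String(bytes, "UTF-8") == schemaJson)
  }
}

Beyond reusing a codec Spark already bundles, streaming through SnappyOutputStream/SnappyInputStream with IOUtils.toByteArray also removes the hand-rolled Deflater/Inflater buffer loops from the previous version.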