implemented @squito suggestion to use a codec that is already in spark
Joseph Batchik committed Jul 14, 2015
1 parent 6d1925c commit 0f5471a
Showing 4 changed files with 16 additions and 27 deletions.
core/src/main/scala/org/apache/spark/SparkConf.scala (2 changes: 1 addition & 1 deletion)
@@ -22,7 +22,7 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
 
-import org.apache.avro.{Schema, SchemaNormalization}
+import org.apache.avro.Schema
 
 import org.apache.spark.serializer.GenericAvroSerializer.{avroSchemaNamespace, avroSchemaKey}
 import org.apache.spark.serializer.KryoSerializer
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala (39 changes: 13 additions & 26 deletions)
@@ -17,9 +17,12 @@

 package org.apache.spark.serializer
 
-import java.io.ByteArrayOutputStream
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer
-import java.util.zip.{Inflater, Deflater}
 
+import org.apache.commons.io.IOUtils
+import org.apache.spark.io.CompressionCodec
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+
 import scala.collection.mutable
 
@@ -66,18 +69,11 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
    * same schema is compressed many times over
    */
   def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
-    val deflater = new Deflater(Deflater.BEST_COMPRESSION)
-    val schemaBytes = schema.toString.getBytes("UTF-8")
-    deflater.setInput(schemaBytes)
-    deflater.finish()
-    val buffer = Array.ofDim[Byte](schemaBytes.length)
-    val outputStream = new ByteArrayOutputStream(schemaBytes.length)
-    while(!deflater.finished()) {
-      val count = deflater.deflate(buffer)
-      outputStream.write(buffer, 0, count)
-    }
-    outputStream.close()
-    outputStream.toByteArray
+    val bos = new ByteArrayOutputStream()
+    val out = new SnappyOutputStream(bos)
+    out.write(schema.toString.getBytes("UTF-8"))
+    out.close()
+    bos.toByteArray
   })


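Note: as a minimal standalone sketch (not part of this commit), the new compress path amounts to framing the schema's JSON form through snappy-java's stream API. The object and method names here (CompressSketch, compressSchema) are illustrative only; it assumes snappy-java and Avro on the classpath.

import java.io.ByteArrayOutputStream

import org.apache.avro.{Schema, SchemaBuilder}
import org.xerial.snappy.SnappyOutputStream

object CompressSketch {
  // Snappy-frame the schema's JSON text; SnappyOutputStream writes a stream
  // format that SnappyInputStream can read back.
  def compressSchema(schema: Schema): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new SnappyOutputStream(bos)
    out.write(schema.toString.getBytes("UTF-8")) // schema.toString emits the JSON form
    out.close()                                  // close() flushes the final Snappy block
    bos.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val schema = SchemaBuilder.record("test").fields().requiredString("data").endRecord()
    println(s"compressed schema to ${compressSchema(schema).length} bytes")
  }
}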
@@ -86,18 +82,9 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
    * seen values so to limit the number of times that decompression has to be done.
    */
   def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
-    val inflater = new Inflater()
-    val bytes = schemaBytes.array()
-    inflater.setInput(bytes)
-    val outputStream = new ByteArrayOutputStream(bytes.length)
-    val tmpBuffer = Array.ofDim[Byte](1024)
-    while (!inflater.finished()) {
-      val count = inflater.inflate(tmpBuffer)
-      outputStream.write(tmpBuffer, 0, count)
-    }
-    inflater.end()
-    outputStream.close()
-    new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8"))
+    val bis = new ByteArrayInputStream(schemaBytes.array())
+    val bytes = IOUtils.toByteArray(new SnappyInputStream(bis))
+    new Schema.Parser().parse(new String(bytes, "UTF-8"))
   })

/**
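Note: the matching decompress path, again as a standalone sketch under the same assumptions (snappy-java, commons-io, and Avro on the classpath); DecompressSketch and decompressSchema are illustrative names. Like the diff, it reads the buffer via schemaBytes.array(), which presumes a heap-backed ByteBuffer whose backing array is exactly the compressed payload.

import java.io.ByteArrayInputStream
import java.nio.ByteBuffer

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.commons.io.IOUtils
import org.xerial.snappy.SnappyInputStream

object DecompressSketch {
  // Drain the Snappy stream to raw bytes, then re-parse the schema JSON.
  def decompressSchema(schemaBytes: ByteBuffer): Schema = {
    val bis = new ByteArrayInputStream(schemaBytes.array())
    val bytes = IOUtils.toByteArray(new SnappyInputStream(bis))
    new Schema.Parser().parse(new String(bytes, "UTF-8"))
  }

  def main(args: Array[String]): Unit = {
    val schema = SchemaBuilder.record("test").fields().requiredString("data").endRecord()
    // Round trip through the compress sketch above; Avro Schema equality is structural.
    val compressed = CompressSketch.compressSchema(schema)
    assert(schema == decompressSchema(ByteBuffer.wrap(compressed)))
  }
}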
core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala (1 change: 1 addition & 0 deletions)
@@ -34,6 +34,7 @@ import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap}
 import org.apache.spark._
 import org.apache.spark.api.python.PythonBroadcast
 import org.apache.spark.broadcast.HttpBroadcast
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock}
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus}
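Note: the new org.apache.spark.io.CompressionCodec import is presumably what the commit title's "codec that is already in spark" refers to. As a hedged sketch (not code from this commit): Spark's built-in SnappyCompressionCodec wraps the same snappy-java streams used in the diff, and the codec family is selectable via spark.io.compression.codec.

import java.io.ByteArrayOutputStream

import org.apache.spark.SparkConf
import org.apache.spark.io.SnappyCompressionCodec

object CodecSketch {
  def main(args: Array[String]): Unit = {
    // Spark's own Snappy codec; compressedOutputStream returns a snappy-java
    // SnappyOutputStream under the hood, so the wire format matches the diff.
    val codec = new SnappyCompressionCodec(new SparkConf())
    val bos = new ByteArrayOutputStream()
    val out = codec.compressedOutputStream(bos)
    out.write("example payload".getBytes("UTF-8"))
    out.close()
    println(s"codec produced ${bos.size()} bytes")
  }
}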
core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala (1 change: 1 addition & 0 deletions)
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer
 import com.esotericsoftware.kryo.io.{Output, Input}
 import org.apache.avro.generic.GenericData.Record
 import org.apache.avro.{SchemaBuilder, Schema}
+import org.apache.spark.io.CompressionCodec
 import org.apache.spark.{SparkFunSuite, SharedSparkContext}
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
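Note: a hypothetical round-trip test in this suite's style (not part of the commit), assuming the constructor shown in the hunk headers above, GenericAvroSerializer(schemas: Map[Long, String]), and the suite's existing SchemaBuilder and ByteBuffer imports:

  test("schema compress and decompress round trip") {
    val schema = SchemaBuilder.record("test").fields().requiredString("data").endRecord()
    val ser = new GenericAvroSerializer(Map.empty[Long, String])
    // compress returns Array[Byte]; decompress takes a ByteBuffer, per the diff above
    assert(schema === ser.decompress(ByteBuffer.wrap(ser.compress(schema))))
  }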
