Commit
implemented @squito suggestion
Joseph Batchik committed Jul 15, 2015
1 parent 0f5471a commit c5fe794
Showing 3 changed files with 21 additions and 30 deletions.
22 changes: 13 additions & 9 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -22,9 +22,8 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet

-import org.apache.avro.Schema
+import org.apache.avro.{SchemaNormalization, Schema}

-import org.apache.spark.serializer.GenericAvroSerializer.{avroSchemaNamespace, avroSchemaKey}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils

@@ -162,20 +161,25 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     set("spark.serializer", classOf[KryoSerializer].getName)
     this
   }

+  private final val avroNamespace = "avro.schema."
+
   /**
    * Use Kryo serialization and register the given set of Avro schemas so that the generic
    * record serializer can decrease network IO
    */
-  def registerAvroSchemas(schemas: Schema*): SparkConf = schemas.foldLeft(this) { (conf, schema) =>
-    conf.set(avroSchemaKey(schema), schema.toString)
+  def registerAvroSchemas(schemas: Schema*): SparkConf = {
+    schemas.foldLeft(this) { (conf, schema) =>
+      conf.set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
+    }
   }

   /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
-  def getAvroSchema: Map[Long, String] =
-    getAll.filter { case (k, v) => k.startsWith(avroSchemaNamespace) }
-      .map { case (k, v) => (k.substring(avroSchemaNamespace.length).toLong, v) }
-      .toMap
+  def getAvroSchema: Map[Long, String] = {
+    getAll.filter { case (k, v) => k.startsWith(avroNamespace) }
+      .map { case (k, v) => (k.substring(avroNamespace.length).toLong, v) }
+      .toMap
+  }

   /** Remove a parameter from the configuration */
   def remove(key: String): SparkConf = {
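For context on the two methods this hunk touches, here is a minimal usage sketch (not part of this commit; the schema, app name, and object name are illustrative):

```scala
import org.apache.avro.SchemaBuilder

import org.apache.spark.SparkConf

object AvroSchemaRegistrationExample {
  def main(args: Array[String]): Unit = {
    // A simple record schema with one string field.
    val schema = SchemaBuilder
      .record("TestRecord").namespace("org.apache.spark")
      .fields()
      .requiredString("data")
      .endRecord()

    // registerAvroSchemas stores the schema JSON under
    // "avro.schema.<parsingFingerprint64>", so executors can refer to it
    // by its 8-byte fingerprint instead of shipping the schema per record.
    val conf = new SparkConf()
      .setAppName("avro-schema-registration")
      .registerAvroSchemas(schema)

    // The fingerprint -> schema-JSON map handed to GenericAvroSerializer.
    val registered: Map[Long, String] = conf.getAvroSchema
    registered.keys.foreach(fp => println(s"registered fingerprint: $fp"))
  }
}
```

Keying the conf entries on `SchemaNormalization.parsingFingerprint64` also keeps the key stable across textually different but parsing-equivalent schema definitions.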
core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -20,26 +20,17 @@ package org.apache.spark.serializer
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer

-import org.apache.commons.io.IOUtils
 import org.apache.spark.io.CompressionCodec
-import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

 import scala.collection.mutable

+import org.apache.commons.io.IOUtils
 import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}

 import org.apache.avro.{Schema, SchemaNormalization}
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.avro.io._

-/**
- *
- */
-private[spark] object GenericAvroSerializer {
-  final val avroSchemaNamespace = "avro.schema."
-  def avroSchemaKey(schema: Schema): String =
-    avroSchemaNamespace + SchemaNormalization.parsingFingerprint64(schema)
-}
-
 /**
  * Custom serializer used for generic Avro records. If the user registers the schemas
@@ -98,17 +89,14 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
       SchemaNormalization.parsingFingerprint64(schema)
     })
     schemas.get(fingerprint) match {
-      case Some(_) => {
+      case Some(_) =>
         output.writeBoolean(true)
         output.writeLong(fingerprint)
-      }
-      case None => {
+      case None =>
         output.writeBoolean(false)
         val compressedSchema = compress(schema)
         output.writeInt(compressedSchema.length)
         output.writeBytes(compressedSchema)
-
-      }
     }

     writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
@@ -148,4 +136,3 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
   override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord =
     deserializeDatum(input)
 }
-
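The middle hunk above is the serializer's schema-framing decision: write an 8-byte fingerprint when the schema was pre-registered, otherwise ship the full compressed schema inline. A standalone sketch of that framing, with hypothetical names, a plain `DataOutputStream` standing in for Kryo's `Output`, and UTF-8 bytes standing in for the snappy-compressed payload:

```scala
import java.io.DataOutputStream

import org.apache.avro.{Schema, SchemaNormalization}

object SchemaFraming {
  // Mirrors the framing above: a boolean flag tells the reader whether the
  // next bytes are a 64-bit fingerprint (schema was pre-registered on both
  // sides) or a length-prefixed schema payload that travels with the data.
  def writeSchemaHeader(
      schema: Schema,
      registered: Map[Long, String],
      out: DataOutputStream): Unit = {
    val fingerprint = SchemaNormalization.parsingFingerprint64(schema)
    if (registered.contains(fingerprint)) {
      out.writeBoolean(true) // reader resolves the schema from its own map
      out.writeLong(fingerprint)
    } else {
      // compress() in the real serializer snappy-compresses the schema JSON;
      // plain UTF-8 bytes keep this sketch dependency-free.
      val payload = schema.toString.getBytes("UTF-8")
      out.writeBoolean(false)
      out.writeInt(payload.length)
      out.write(payload)
    }
  }
}
```

The one-byte flag is cheap per datum, and when both driver and executors registered the same schemas, readers never have to decompress or re-parse schema JSON at all.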
core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -17,13 +17,13 @@

 package org.apache.spark.serializer

-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer

 import com.esotericsoftware.kryo.io.{Output, Input}
-import org.apache.avro.generic.GenericData.Record
 import org.apache.avro.{SchemaBuilder, Schema}
 import org.apache.spark.io.CompressionCodec
+import org.apache.avro.generic.GenericData.Record

 import org.apache.spark.{SparkFunSuite, SharedSparkContext}

 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
