Skip to content

Commit

Permalink
fixed to changes suggested by @squito
Browse files Browse the repository at this point in the history
  • Loading branch information
Joseph Batchik committed Jul 14, 2015
1 parent d421bf5 commit 6d1925c
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 27 deletions.
10 changes: 5 additions & 5 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@
<name>Spark Project Core</name>
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
Expand Down Expand Up @@ -403,6 +398,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.8.2.1</version>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
Expand Down
11 changes: 5 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -166,17 +166,16 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
* Use Kryo serialization and register the given set of Avro schemas so that the generic
* record serializer can decrease network IO
*/
def registerAvroSchema(schemas: Array[Schema]): SparkConf =
schemas.foldLeft(this) { (conf, schema) =>
conf.set(avroSchemaKey(SchemaNormalization.parsingFingerprint64(schema)), schema.toString)
}
def registerAvroSchemas(schemas: Schema*): SparkConf = schemas.foldLeft(this) { (conf, schema) =>
conf.set(avroSchemaKey(schema), schema.toString)
}

/** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
def getAvroSchema: Map[Long, String] = {
def getAvroSchema: Map[Long, String] =
getAll.filter { case (k, v) => k.startsWith(avroSchemaNamespace) }
.map { case (k, v) => (k.substring(avroSchemaNamespace.length).toLong, v) }
.toMap
}


/** Remove a parameter from the configuration */
def remove(key: String): SparkConf = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ import scala.collection.mutable

import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io._
import org.apache.avro.{Schema, SchemaNormalization}

object GenericAvroSerializer {
val avroSchemaNamespace = "avro.schema."
def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
/**
*
*/
private[spark] object GenericAvroSerializer {
final val avroSchemaNamespace = "avro.schema."
def avroSchemaKey(schema: Schema): String =
avroSchemaNamespace + SchemaNormalization.parsingFingerprint64(schema)
}

/**
Expand All @@ -41,7 +45,8 @@ object GenericAvroSerializer {
* Actions like parsing or compressing schemas are computationally expensive so the serializer
* caches all previously seen values as to reduce the amount of work needed to do.
*/
class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] {
private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
extends KSerializer[GenericRecord] {

/** Used to reduce the amount of effort to compress the schema */
private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
Expand All @@ -51,12 +56,10 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene
private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()

/** Fingerprinting is very expensive to this alleviates most of the work */
/** Fingerprinting is very expensive so this alleviates most of the work */
private val fingerprintCache = new mutable.HashMap[Schema, Long]()
private val schemaCache = new mutable.HashMap[Long, Schema]()

private def getSchema(fingerprint: Long): Option[String] = schemas.get(fingerprint)

/**
* Used to compress Schemas when they are being sent over the wire.
* The compression results are memoized to reduce the compression time since the
Expand Down Expand Up @@ -107,7 +110,7 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene
val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
SchemaNormalization.parsingFingerprint64(schema)
})
getSchema(fingerprint) match {
schemas.get(fingerprint) match {
case Some(_) => {
output.writeBoolean(true)
output.writeLong(fingerprint)
Expand All @@ -122,8 +125,8 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene
}

writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
.asInstanceOf[DatumWriter[R]]
.write(datum, encoder)
.asInstanceOf[DatumWriter[R]]
.write(datum, encoder)
encoder.flush()
}

Expand All @@ -136,7 +139,7 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene
if (input.readBoolean()) {
val fingerprint = input.readLong()
schemaCache.getOrElseUpdate(fingerprint, {
getSchema(fingerprint) match {
schemas.get(fingerprint) match {
case Some(s) => new Schema.Parser().parse(s)
case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint")
}
Expand All @@ -148,8 +151,8 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene
}
val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
.asInstanceOf[DatumReader[GenericRecord]]
.read(null.asInstanceOf[GenericRecord], decoder)
.asInstanceOf[DatumReader[GenericRecord]]
.read(null, decoder)
}

override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class KryoSerializer(conf: SparkConf)
private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
.split(',')
.filter(!_.isEmpty)
conf.getExecutorEnv

private val avroSchemas = conf.getAvroSchema

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
output.flush()
val normalLength = output.total - beginningNormalPosition

conf.registerAvroSchema(Array(schema))
conf.registerAvroSchemas(schema)
val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
val beginningFingerprintPosition = output.total()
genericSerFinger.serializeDatum(record, output)
Expand Down

0 comments on commit 6d1925c

Please sign in to comment.