diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index a928d0998dff6..474c03fdd1575 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -22,9 +22,8 @@ import java.util.concurrent.ConcurrentHashMap
 import scala.collection.JavaConverters._
 import scala.collection.mutable.LinkedHashSet
 
-import org.apache.avro.Schema
+import org.apache.avro.{SchemaNormalization, Schema}
 
-import org.apache.spark.serializer.GenericAvroSerializer.{avroSchemaNamespace, avroSchemaKey}
 import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.Utils
 
@@ -162,20 +161,25 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     set("spark.serializer", classOf[KryoSerializer].getName)
     this
   }
+
+  private final val avroNamespace = "avro.schema."
+
   /**
    * Use Kryo serialization and register the given set of Avro schemas so that the generic
    * record serializer can decrease network IO
    */
-  def registerAvroSchemas(schemas: Schema*): SparkConf = schemas.foldLeft(this) { (conf, schema) =>
-    conf.set(avroSchemaKey(schema), schema.toString)
+  def registerAvroSchemas(schemas: Schema*): SparkConf = {
+    schemas.foldLeft(this) { (conf, schema) =>
+      conf.set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
+    }
   }
 
   /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
-  def getAvroSchema: Map[Long, String] =
-    getAll.filter { case (k, v) => k.startsWith(avroSchemaNamespace) }
-      .map { case (k, v) => (k.substring(avroSchemaNamespace.length).toLong, v) }
-      .toMap
-
+  def getAvroSchema: Map[Long, String] = {
+    getAll.filter { case (k, v) => k.startsWith(avroNamespace) }
+      .map { case (k, v) => (k.substring(avroNamespace.length).toLong, v) }
+      .toMap
+  }
 
   /** Remove a parameter from the configuration */
   def remove(key: String): SparkConf = {
diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index 58697178cee0c..853b1b53ddfc3 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -20,26 +20,17 @@ package org.apache.spark.serializer
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer
 
-import org.apache.commons.io.IOUtils
-import org.apache.spark.io.CompressionCodec
-import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
-
 import scala.collection.mutable
 
+import org.apache.commons.io.IOUtils
 import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+
 import org.apache.avro.{Schema, SchemaNormalization}
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.avro.io._
 
-/**
- *
- */
-private[spark] object GenericAvroSerializer {
-  final val avroSchemaNamespace = "avro.schema."
-  def avroSchemaKey(schema: Schema): String =
-    avroSchemaNamespace + SchemaNormalization.parsingFingerprint64(schema)
-}
 
 /**
  * Custom serializer used for generic Avro records. If the user registers the schemas
@@ -98,17 +89,14 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
         SchemaNormalization.parsingFingerprint64(schema)
       })
     schemas.get(fingerprint) match {
-      case Some(_) => {
+      case Some(_) =>
         output.writeBoolean(true)
         output.writeLong(fingerprint)
-      }
-      case None => {
+      case None =>
         output.writeBoolean(false)
         val compressedSchema = compress(schema)
         output.writeInt(compressedSchema.length)
         output.writeBytes(compressedSchema)
-
-      }
     }
 
     writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
@@ -148,4 +136,3 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
   override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord =
     deserializeDatum(input)
 }
-
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
index 00eab599ddeef..bc9f3708ed69d 100644
--- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -17,13 +17,13 @@
 
 package org.apache.spark.serializer
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 import java.nio.ByteBuffer
 
 import com.esotericsoftware.kryo.io.{Output, Input}
-import org.apache.avro.generic.GenericData.Record
 import org.apache.avro.{SchemaBuilder, Schema}
-import org.apache.spark.io.CompressionCodec
+import org.apache.avro.generic.GenericData.Record
+
 import org.apache.spark.{SparkFunSuite, SharedSparkContext}
 
 class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
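For reviewers trying out the new SparkConf API, here is a minimal usage sketch (not part of the patch; the User schema and object name are illustrative). It shows that registerAvroSchemas stores each schema under an "avro.schema.<fingerprint>" key and that getAvroSchema recovers the fingerprint-to-schema map the Kryo serializer consumes:

import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.spark.SparkConf

object AvroRegistrationSketch {
  def main(args: Array[String]): Unit = {
    // An illustrative record schema; any Avro schema works here.
    val schema = new Schema.Parser().parse(
      """{"type":"record","name":"User","fields":[{"name":"name","type":"string"}]}""")

    // registerAvroSchemas keys the schema by its 64-bit parsing fingerprint
    // under the "avro.schema." namespace added to SparkConf in this patch.
    val conf = new SparkConf().registerAvroSchemas(schema)

    // getAvroSchema inverts that: config keys back to fingerprint -> schema string.
    val registered: Map[Long, String] = conf.getAvroSchema
    val fingerprint = SchemaNormalization.parsingFingerprint64(schema)
    assert(registered(fingerprint) == schema.toString)
  }
}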
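The write path in the second GenericAvroSerializer hunk sends just the 8-byte fingerprint when the schema was pre-registered, and a Snappy-compressed copy of the schema JSON otherwise. Below is a hypothetical free-standing version of that compression step, written against the same SnappyOutputStream import the patch keeps; the real compress method lives inside GenericAvroSerializer and additionally memoizes its results in a cache:

import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.xerial.snappy.SnappyOutputStream

object SchemaCompressionSketch {
  // Hypothetical equivalent of the serializer's compress(): Snappy-compress
  // the schema's JSON form for the unregistered-schema branch of the match.
  def compressSchema(schema: Schema): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val snappy = new SnappyOutputStream(bytes)
    snappy.write(schema.toString.getBytes("UTF-8"))
    snappy.close() // flush Snappy's internal buffer before reading the result
    bytes.toByteArray
  }
}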