diff --git a/core/pom.xml b/core/pom.xml
index 7969cb2454a30..56c3f95720494 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -34,11 +34,6 @@
   <name>Spark Project Core</name>
   <url>http://spark.apache.org/</url>
   <dependencies>
-    <dependency>
-      <groupId>net.sf.py4j</groupId>
-      <artifactId>py4j</artifactId>
-      <version>0.8.2.1</version>
-    </dependency>
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro-mapred</artifactId>
@@ -403,6 +398,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>net.sf.py4j</groupId>
+      <artifactId>py4j</artifactId>
+      <version>0.8.2.1</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index a08d843dbc8da..0d7a1378bc9a1 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -166,17 +166,16 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
    * Use Kryo serialization and register the given set of Avro schemas so that the generic
    * record serializer can decrease network IO
    */
-  def registerAvroSchema(schemas: Array[Schema]): SparkConf =
-    schemas.foldLeft(this) { (conf, schema) =>
-      conf.set(avroSchemaKey(SchemaNormalization.parsingFingerprint64(schema)), schema.toString)
-    }
+  def registerAvroSchemas(schemas: Schema*): SparkConf = schemas.foldLeft(this) { (conf, schema) =>
+    conf.set(avroSchemaKey(schema), schema.toString)
+  }
 
   /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
-  def getAvroSchema: Map[Long, String] = {
+  def getAvroSchema: Map[Long, String] =
     getAll.filter { case (k, v) => k.startsWith(avroSchemaNamespace) }
       .map { case (k, v) => (k.substring(avroSchemaNamespace.length).toLong, v) }
       .toMap
-  }
+
 
   /** Remove a parameter from the configuration */
   def remove(key: String): SparkConf = {
diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index 3071b65f9c70c..6a4a5c2f2395f 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -25,13 +25,17 @@ import scala.collection.mutable
 
 import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import org.apache.avro.{Schema, SchemaNormalization}
 import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.avro.io._
-import org.apache.avro.{Schema, SchemaNormalization}
 
-object GenericAvroSerializer {
-  val avroSchemaNamespace = "avro.schema."
-  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
+/**
+ * Derives the SparkConf keys under which registered Avro schemas are stored.
+ */
+private[spark] object GenericAvroSerializer {
+  final val avroSchemaNamespace = "avro.schema."
+  def avroSchemaKey(schema: Schema): String =
+    avroSchemaNamespace + SchemaNormalization.parsingFingerprint64(schema)
 }
 
 /**
@@ -41,7 +45,8 @@ object GenericAvroSerializer {
  * Actions like parsing or compressing schemas are computationally expensive so the serializer
  * caches all previously seen values as to reduce the amount of work needed to do.
  */
-class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] {
+private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
+  extends KSerializer[GenericRecord] {
 
   /** Used to reduce the amount of effort to compress the schema */
   private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
@@ -51,12 +56,10 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene
   private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
   private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()
 
-  /** Fingerprinting is very expensive to this alleviates most of the work */
+  /** Fingerprinting is very expensive so this alleviates most of the work */
  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
  private val schemaCache = new mutable.HashMap[Long, Schema]()
 
-  private def getSchema(fingerprint: Long): Option[String] = schemas.get(fingerprint)
-
   /**
    * Used to compress Schemas when they are being sent over the wire.
    * The compression results are memoized to reduce the compression time since the
@@ -107,7 +110,7 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene
     val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
       SchemaNormalization.parsingFingerprint64(schema)
     })
-    getSchema(fingerprint) match {
+    schemas.get(fingerprint) match {
       case Some(_) => {
         output.writeBoolean(true)
         output.writeLong(fingerprint)
@@ -122,8 +125,8 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene
     }
 
     writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
-        .asInstanceOf[DatumWriter[R]]
-        .write(datum, encoder)
+      .asInstanceOf[DatumWriter[R]]
+      .write(datum, encoder)
     encoder.flush()
   }
 
@@ -136,7 +139,7 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene
     if (input.readBoolean()) {
       val fingerprint = input.readLong()
       schemaCache.getOrElseUpdate(fingerprint, {
-        getSchema(fingerprint) match {
+        schemas.get(fingerprint) match {
           case Some(s) => new Schema.Parser().parse(s)
           case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint")
         }
@@ -148,8 +151,8 @@ class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[Gene
     }
     val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
     readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
-        .asInstanceOf[DatumReader[GenericRecord]]
-        .read(null.asInstanceOf[GenericRecord], decoder)
+      .asInstanceOf[DatumReader[GenericRecord]]
+      .read(null, decoder)
   }
 
   override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index a1b7deece3995..0ae3d5ce30ae8 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -74,7 +74,6 @@ class KryoSerializer(conf: SparkConf)
   private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
     .split(',')
     .filter(!_.isEmpty)
-  conf.getExecutorEnv
 
   private val avroSchemas = conf.getAvroSchema
 
diff --git a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
index ffb05a9c79c66..cfc1baf930ddc 100644
--- a/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/GenericAvroSerializerSuite.scala
@@ -63,7 +63,7 @@ class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
     output.flush()
     val normalLength = output.total - beginningNormalPosition
 
-    conf.registerAvroSchema(Array(schema))
+    conf.registerAvroSchemas(schema)
     val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
     val beginningFingerprintPosition = output.total()
     genericSerFinger.serializeDatum(record, output)
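
For reviewers, here is a minimal usage sketch of the renamed API (the example schema, object name, and assertion below are illustrative only, not part of the patch). registerAvroSchemas now accepts varargs instead of an Array, and getAvroSchema returns the registered schemas keyed by their 64-bit parsing fingerprint:

import org.apache.avro.{Schema, SchemaNormalization}
import org.apache.spark.SparkConf

// Illustrative only: demonstrates the varargs registerAvroSchemas API changed above.
object RegisterAvroSchemasExample {
  def main(args: Array[String]): Unit = {
    // Any parsed Avro Schema works; this record schema is a made-up example.
    val schema: Schema = new Schema.Parser().parse(
      """{"type": "record", "name": "User", "fields": [
        |  {"name": "name", "type": "string"},
        |  {"name": "age", "type": "int"}
        |]}""".stripMargin)

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerAvroSchemas(schema) // before this patch: registerAvroSchema(Array(schema))

    // Each schema is stored under "avro.schema.<fingerprint>"; getAvroSchema
    // rebuilds the fingerprint -> schema-JSON map that KryoSerializer hands
    // to GenericAvroSerializer.
    val fingerprint = SchemaNormalization.parsingFingerprint64(schema)
    assert(conf.getAvroSchema == Map(fingerprint -> schema.toString))
  }
}

With the schema registered up front, serializeDatum writes only a boolean flag and the 8-byte fingerprint rather than the full compressed schema, which is why the test above expects the fingerprint-based output to be shorter than the schema-based one.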