[SPARK-746][CORE] Added Avro Serialization to Kryo #7004
@@ -398,6 +398,40 @@
      <artifactId>py4j</artifactId>
      <version>0.8.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${avro.version}</version>
      <scope>${hadoop.deps.scope}</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-mapred</artifactId>
      <version>${avro.version}</version>

[Review comment] Similarly to the previous one, this one should only contain groupId / artifactId (everything else is inherited).
[Review comment] The one thing you have to add here, though, is: …
[Author reply] Thanks for pointing that out, just fixed that.

      <scope>${hadoop.deps.scope}</scope>
      <exclusions>
        <exclusion>
          <groupId>io.netty</groupId>
          <artifactId>netty</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>jetty</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>jetty-util</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.mortbay.jetty</groupId>
          <artifactId>servlet-api</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.velocity</groupId>
          <artifactId>velocity</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -18,11 +18,13 @@
package org.apache.spark

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._
import scala.collection.mutable.LinkedHashSet

import org.apache.avro.{Schema, SchemaNormalization}

import org.apache.spark.serializer.GenericAvroSerializer.{avroSchemaNamespace, avroSchemaKey}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils

@@ -160,6 +162,21 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
    set("spark.serializer", classOf[KryoSerializer].getName)
    this
  }

[Review comment] add a newline

  /**
   * Use Kryo serialization and register the given set of Avro schemas so that the generic
   * record serializer can decrease network IO
   */
  def registerAvroSchema(schemas: Array[Schema]): SparkConf =

[Review comment] how about using varargs here? then you could call using any of: … and maybe rename the method to plural.

    schemas.foldLeft(this) { (conf, schema) =>

[Review comment] Why do you need a fold here? This seems confusing. Why not just loop over the schemas and call …
[Author reply] Got it, changed.

      conf.set(avroSchemaKey(SchemaNormalization.parsingFingerprint64(schema)), schema.toString)
    }
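The reviewer's varargs-plus-loop suggestion can be sketched standalone like this. Note the hedging: `MiniConf` is a hypothetical stand-in for SparkConf's key/value store, and the plural name `registerAvroSchemas` is an assumption (the reviewer only says "rename to plural"); this is not the PR's final API. Requires Avro on the classpath.

```scala
import scala.collection.mutable

import org.apache.avro.{Schema, SchemaBuilder, SchemaNormalization}

object VarargsSketch {
  // Hypothetical stand-in for SparkConf's key/value store, just to show
  // the shape of the varargs API the reviewer suggests.
  class MiniConf {
    val settings = mutable.LinkedHashMap.empty[String, String]
    def set(key: String, value: String): MiniConf = { settings(key) = value; this }

    // Varargs instead of Array, plain loop instead of foldLeft.
    def registerAvroSchemas(schemas: Schema*): MiniConf = {
      schemas.foreach { schema =>
        set("avro.schema." + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
      }
      this
    }
  }

  def main(args: Array[String]): Unit = {
    val schema = SchemaBuilder.record("testRecord").fields().requiredString("data").endRecord()
    val conf = new MiniConf
    // With varargs the caller can pass one schema, several, or a Seq with `: _*`.
    conf.registerAvroSchemas(schema)
    assert(conf.settings.size == 1)
    assert(conf.settings.keys.head.startsWith("avro.schema."))
    println("ok")
  }
}
```

With varargs, the call sites `conf.registerAvroSchemas(s1)`, `conf.registerAvroSchemas(s1, s2)`, and `conf.registerAvroSchemas(seq: _*)` all work without wrapping in an `Array`.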
  /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
  def getAvroSchema: Map[Long, String] = {

[Review comment] What do the keys and values of this map denote?
[Author reply] The keys are longs, which represent a unique ID of the schema, and the values are the string representation of the schema.

    getAll.filter { case (k, v) => k.startsWith(avroSchemaNamespace) }
      .map { case (k, v) => (k.substring(avroSchemaNamespace.length).toLong, v) }
      .toMap
  }
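As the author's reply explains, the keys are schema fingerprints and the values are the schema JSON. The filter/map above can be exercised standalone with a plain `Map` in place of `getAll`; the config entries below are invented sample data, not real Spark configuration:

```scala
object SchemaMapSketch {
  val avroSchemaNamespace = "avro.schema."

  // Invented sample entries standing in for SparkConf.getAll;
  // "spark.master" shows that non-namespace keys are filtered out.
  val all = Map(
    avroSchemaNamespace + "1234" -> """{"type":"string"}""",
    "spark.master" -> "local"
  )

  // Same filter/map pipeline as getAvroSchema: keep only namespaced keys,
  // then strip the prefix and parse the fingerprint back into a Long.
  val schemas: Map[Long, String] =
    all.filter { case (k, _) => k.startsWith(avroSchemaNamespace) }
      .map { case (k, v) => (k.substring(avroSchemaNamespace.length).toLong, v) }

  def main(args: Array[String]): Unit = {
    assert(schemas == Map(1234L -> """{"type":"string"}"""))
    println("ok")
  }
}
```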
  /** Remove a parameter from the configuration */
  def remove(key: String): SparkConf = {
@@ -0,0 +1,161 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.serializer

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.util.zip.{Inflater, Deflater}

import scala.collection.mutable

import com.esotericsoftware.kryo.{Kryo, Serializer => KSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import org.apache.avro.generic.{GenericData, GenericRecord}
import org.apache.avro.io._
import org.apache.avro.{Schema, SchemaNormalization}

[Review comment] super nit: class imports go before more deeply nested packages (it's not just alphabetic), so it should be: … I recommend using the plugin (not IntelliJ's builtin ordering) as described on the wiki: https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide#SparkCodeStyleGuide-Imports

object GenericAvroSerializer {

  val avroSchemaNamespace = "avro.schema."
  def avroSchemaKey(fingerprint: Long): String = avroSchemaNamespace + fingerprint
}

/**
 * Custom serializer used for generic Avro records. If the user registers the schemas
 * ahead of time, then the schema's fingerprint will be sent with each message instead of the actual
 * schema, as to reduce network IO.
 * Actions like parsing or compressing schemas are computationally expensive so the serializer
 * caches all previously seen values as to reduce the amount of work needed to do.
 */
class GenericAvroSerializer(schemas: Map[Long, String]) extends KSerializer[GenericRecord] {

  /** Used to reduce the amount of effort to compress the schema */
  private val compressCache = new mutable.HashMap[Schema, Array[Byte]]()
  private val decompressCache = new mutable.HashMap[ByteBuffer, Schema]()

  /** Reuses the same datum reader/writer since the same schema will be used many times */
  private val writerCache = new mutable.HashMap[Schema, DatumWriter[_]]()
  private val readerCache = new mutable.HashMap[Schema, DatumReader[_]]()

  /** Fingerprinting is very expensive to this alleviates most of the work */

[Review comment] typo: so

  private val fingerprintCache = new mutable.HashMap[Schema, Long]()
  private val schemaCache = new mutable.HashMap[Long, Schema]()

  private def getSchema(fingerprint: Long): Option[String] = schemas.get(fingerprint)

[Review comment] this method seems kinda pointless, same thing to inline everywhere

  /**
   * Used to compress Schemas when they are being sent over the wire.
   * The compression results are memoized to reduce the compression time since the
   * same schema is compressed many times over
   */
  def compress(schema: Schema): Array[Byte] = compressCache.getOrElseUpdate(schema, {
    val deflater = new Deflater(Deflater.BEST_COMPRESSION)
    val schemaBytes = schema.toString.getBytes("UTF-8")
    deflater.setInput(schemaBytes)
    deflater.finish()
    val buffer = Array.ofDim[Byte](schemaBytes.length)
    val outputStream = new ByteArrayOutputStream(schemaBytes.length)
    while(!deflater.finished()) {
      val count = deflater.deflate(buffer)
      outputStream.write(buffer, 0, count)
    }
    outputStream.close()
    outputStream.toByteArray
  })

[Review comment] nit: delete extra newline

  /**
   * Decompresses the schema into the actual in-memory object. Keeps an internal cache of already
   * seen values so to limit the number of times that decompression has to be done.
   */
  def decompress(schemaBytes: ByteBuffer): Schema = decompressCache.getOrElseUpdate(schemaBytes, {
    val inflater = new Inflater()
    val bytes = schemaBytes.array()
    inflater.setInput(bytes)
    val outputStream = new ByteArrayOutputStream(bytes.length)
    val tmpBuffer = Array.ofDim[Byte](1024)
    while (!inflater.finished()) {
      val count = inflater.inflate(tmpBuffer)
      outputStream.write(tmpBuffer, 0, count)
    }
    inflater.end()
    outputStream.close()
    new Schema.Parser().parse(new String(outputStream.toByteArray, "UTF-8"))
  })
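The compress/decompress pair above is plain DEFLATE over the schema's JSON text. Stripped of the caches and Avro types, the same round trip can be run standalone with only `java.util.zip`; the JSON string below is a sample standing in for `schema.toString`:

```scala
import java.io.ByteArrayOutputStream
import java.util.zip.{Deflater, Inflater}

object DeflateSketch {
  // DEFLATE the UTF-8 bytes of a string, draining the deflater in chunks
  // (mirrors the serializer's compress, but with a fixed 1024-byte buffer).
  def deflate(text: String): Array[Byte] = {
    val input = text.getBytes("UTF-8")
    val deflater = new Deflater(Deflater.BEST_COMPRESSION)
    deflater.setInput(input)
    deflater.finish()
    val out = new ByteArrayOutputStream(input.length)
    val buffer = new Array[Byte](1024)
    while (!deflater.finished()) {
      val n = deflater.deflate(buffer)
      out.write(buffer, 0, n)
    }
    deflater.end()
    out.toByteArray
  }

  // Inverse operation, mirroring the serializer's decompress.
  def inflate(bytes: Array[Byte]): String = {
    val inflater = new Inflater()
    inflater.setInput(bytes)
    val out = new ByteArrayOutputStream(bytes.length)
    val buffer = new Array[Byte](1024)
    while (!inflater.finished()) {
      val n = inflater.inflate(buffer)
      out.write(buffer, 0, n)
    }
    inflater.end()
    out.toString("UTF-8")
  }

  def main(args: Array[String]): Unit = {
    // Sample schema JSON, standing in for schema.toString.
    val json =
      """{"type":"record","name":"testRecord","fields":[{"name":"data","type":"string"}]}"""
    assert(inflate(deflate(json)) == json)
    println("ok")
  }
}
```

One difference worth noting: the PR's `compress` sizes its scratch buffer as `schemaBytes.length`, while this sketch uses a fixed chunk size; both work because the drain loop keeps calling `deflate` until `finished()`.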
  /**
   * Serializes a record to the given output stream. It caches a lot of the internal data as
   * to not redo work
   */
  def serializeDatum[R <: GenericRecord](datum: R, output: KryoOutput): Unit = {
    val encoder = EncoderFactory.get.binaryEncoder(output, null)
    val schema = datum.getSchema
    val fingerprint = fingerprintCache.getOrElseUpdate(schema, {
      SchemaNormalization.parsingFingerprint64(schema)
    })
    getSchema(fingerprint) match {
      case Some(_) => {
        output.writeBoolean(true)
        output.writeLong(fingerprint)
      }
      case None => {
        output.writeBoolean(false)
        val compressedSchema = compress(schema)
        output.writeInt(compressedSchema.length)
        output.writeBytes(compressedSchema)
      }
    }

    writerCache.getOrElseUpdate(schema, GenericData.get.createDatumWriter(schema))
      .asInstanceOf[DatumWriter[R]]
      .write(datum, encoder)

[Review comment] nit: just indent these lines 2 extra spaces, don't vertical align

    encoder.flush()
  }
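Reading the writes above, the on-the-wire framing appears to be the following. This layout is inferred from the code, not stated anywhere in the PR:

```
registered schema:    [flag = true][fingerprint: long][Avro binary-encoded datum]
unregistered schema:  [flag = false][length: int][DEFLATE-compressed schema JSON][Avro binary-encoded datum]
```

The boolean flag is what lets `deserializeDatum` decide whether to look the schema up by fingerprint or to decompress an inline copy.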
  /**
   * Deserializes generic records into their in-memory form. There is internal
   * state to keep a cache of already seen schemas and datum readers.
   */
  def deserializeDatum(input: KryoInput): GenericRecord = {
    val schema = {
      if (input.readBoolean()) {
        val fingerprint = input.readLong()
        schemaCache.getOrElseUpdate(fingerprint, {
          getSchema(fingerprint) match {
            case Some(s) => new Schema.Parser().parse(s)
            case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint")

[Review comment] I know it'll be apparent in the stack trace, but what do you think about making the message just a bit more clear, e.g. "Error attempting to read avro data -- encountered an unknown fingerprint: $fingerprint, not sure what schema to use. This could happen if you registered additional schemas after starting your spark context." (I think that explanation is plausible anyway ...)
[Review comment] Why …
[Author reply] I will change it to throw a …

          }
        })
      } else {
        val length = input.readInt()
        decompress(ByteBuffer.wrap(input.readBytes(length)))
      }
    }
    val decoder = DecoderFactory.get.directBinaryDecoder(input, null)
    readerCache.getOrElseUpdate(schema, GenericData.get.createDatumReader(schema))
      .asInstanceOf[DatumReader[GenericRecord]]
      .read(null.asInstanceOf[GenericRecord], decoder)

[Review comment] same here on indentation
[Author reply] I think the …

  }

  override def write(kryo: Kryo, output: KryoOutput, datum: GenericRecord): Unit =
    serializeDatum(datum, output)

  override def read(kryo: Kryo, input: KryoInput, datumClass: Class[GenericRecord]): GenericRecord =
    deserializeDatum(input)
}
@@ -21,6 +21,8 @@ import java.io.{EOFException, IOException, InputStream, OutputStream}
import java.nio.ByteBuffer
import javax.annotation.Nullable

import org.apache.avro.generic.{GenericData, GenericRecord}

[Review comment] nit: order

import scala.reflect.ClassTag

[Review comment] nit: import order, should be below scala imports

import com.esotericsoftware.kryo.{Kryo, KryoException}

@@ -72,6 +74,9 @@ class KryoSerializer(conf: SparkConf)
  private val classesToRegister = conf.get("spark.kryo.classesToRegister", "")
    .split(',')
    .filter(!_.isEmpty)
  conf.getExecutorEnv

[Review comment] looks like a stray line

  private val avroSchemas = conf.getAvroSchema

  def newKryoOutput(): KryoOutput = new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))

@@ -99,6 +104,9 @@ class KryoSerializer(conf: SparkConf)
    kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
    kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

    kryo.register(classOf[GenericRecord], new GenericAvroSerializer(avroSchemas))
    kryo.register(classOf[GenericData.Record], new GenericAvroSerializer(avroSchemas))

    try {
      // Use the default classloader when calling the user registrator.
      Thread.currentThread.setContextClassLoader(classLoader)
@@ -0,0 +1,83 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.serializer

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, OutputStream}
import java.nio.ByteBuffer

import com.esotericsoftware.kryo.io.{Output, Input}
import org.apache.avro.generic.GenericData.Record
import org.apache.avro.{SchemaBuilder, Schema}
import org.apache.spark.{SparkFunSuite, SharedSparkContext}

[Review comment] nit: empty line between other imports and spark imports

class GenericAvroSerializerSuite extends SparkFunSuite with SharedSparkContext {
  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  val schema : Schema = SchemaBuilder
    .record("testRecord").fields()
    .requiredString("data")
    .endRecord()
  val record = new Record(schema)
  record.put("data", "test data")

  test("schema compression and decompression") {
    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
    assert(schema === genericSer.decompress(ByteBuffer.wrap(genericSer.compress(schema))))
  }

  test("record serialization and deserialization") {
    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)

    val outputStream = new ByteArrayOutputStream()
    val output = new Output(outputStream)
    genericSer.serializeDatum(record, output)
    output.flush()
    output.close()

    val input = new Input(new ByteArrayInputStream(outputStream.toByteArray))
    assert(genericSer.deserializeDatum(input) === record)
  }

  test("uses schema fingerprint to decrease message size") {
    val genericSerFull = new GenericAvroSerializer(conf.getAvroSchema)

    val output = new Output(new ByteArrayOutputStream())

    val beginningNormalPosition = output.total()
    genericSerFull.serializeDatum(record, output)
    output.flush()
    val normalLength = output.total - beginningNormalPosition

    conf.registerAvroSchema(Array(schema))

[Review comment] if you switch to varargs this could just be …

    val genericSerFinger = new GenericAvroSerializer(conf.getAvroSchema)
    val beginningFingerprintPosition = output.total()
    genericSerFinger.serializeDatum(record, output)
    val fingerprintLength = output.total - beginningFingerprintPosition

    assert(fingerprintLength < normalLength)
  }

  test("caches previously seen schemas") {
    val genericSer = new GenericAvroSerializer(conf.getAvroSchema)
    val compressedSchema = genericSer.compress(schema)
    val decompressedSchema = genericSer.decompress(ByteBuffer.wrap(compressedSchema))

    assert(compressedSchema.eq(genericSer.compress(schema)))
    assert(decompressedSchema.eq(genericSer.decompress(ByteBuffer.wrap(compressedSchema))))
  }
}
[Review comment] isn't this included already?
[Review comment] avro and avro-mapred are already in the dependencyManagement of the parent POM, so nothing more than groupId, artifactId, and perhaps scope should be included here. In particular, version should not generally be specified when it is already in the dependencyManagement. This is a frequent error in the Spark POMs.
[Review comment] +1 to Mark's comments.