diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 474c03fdd1575..4161792976c7b 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -169,9 +169,10 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
    * record serializer can decrease network IO
    */
   def registerAvroSchemas(schemas: Schema*): SparkConf = {
-    schemas.foldLeft(this) { (conf, schema) =>
-      conf.set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
+    for (schema <- schemas) {
+      set(avroNamespace + SchemaNormalization.parsingFingerprint64(schema), schema.toString)
     }
+    this
   }
 
   /** Gets all the avro schemas in the configuration used in the generic Avro record serializer */
diff --git a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
index 3105bc7fbf386..b8e8fa3be8768 100644
--- a/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/GenericAvroSerializer.scala
@@ -29,7 +29,7 @@ import org.apache.avro.generic.{GenericData, GenericRecord}
 import org.apache.avro.io._
 import org.apache.commons.io.IOUtils
 
-import org.apache.spark.SparkEnv
+import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.io.CompressionCodec
 
 /**
@@ -38,6 +38,9 @@ import org.apache.spark.io.CompressionCodec
  * schema, as to reduce network IO.
  * Actions like parsing or compressing schemas are computationally expensive so the serializer
  * caches all previously seen values as to reduce the amount of work needed to do.
+ * @param schemas a map where the keys are unique IDs for Avro schemas and the values are the
+ *                string representations of the Avro schemas, used to decrease the amount of data
+ *                that needs to be serialized.
  */
 private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
   extends KSerializer[GenericRecord] {
@@ -118,7 +121,12 @@ private[serializer] class GenericAvroSerializer(schemas: Map[Long, String])
         schemaCache.getOrElseUpdate(fingerprint, {
           schemas.get(fingerprint) match {
             case Some(s) => new Schema.Parser().parse(s)
-            case None => throw new RuntimeException(s"Unknown fingerprint: $fingerprint")
+            case None =>
+              throw new SparkException(
+                s"""Error attempting to read avro data --
+                   |encountered an unknown fingerprint: $fingerprint, not sure what schema to use.
+                   |This could happen if you registered additional schemas after starting your
+                   |Spark context.""".stripMargin)
           }
         })
       } else {
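
For context, a minimal sketch of how the two changed pieces fit together from the user's side. The schema, app name, and field below are hypothetical, invented for illustration; the point is that schemas are registered on the SparkConf before the SparkContext is created, which is exactly the failure mode the new SparkException message calls out.

    import org.apache.avro.Schema
    import org.apache.spark.SparkConf

    // Hypothetical Avro schema, used only for illustration.
    val userSchema = new Schema.Parser().parse(
      """{"type": "record", "name": "User", "fields": [
        |  {"name": "name", "type": "string"}
        |]}""".stripMargin)

    val conf = new SparkConf()
      .setAppName("avro-fingerprint-example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Registering up front lets GenericAvroSerializer send only the 64-bit
      // schema fingerprint over the wire instead of the full schema string.
      .registerAvroSchemas(userSchema)

Note that the chaining above relies on registerAvroSchemas returning the SparkConf, which the `this` added in the first hunk preserves. Schemas registered after the context has started are not shipped to executors, so a reader that encounters their fingerprint hits the new error path above.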