diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 28261a4520a2..3aa5ed9efcbb 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -281,8 +281,16 @@ class KryoSerializationStream( private[this] var kryo: Kryo = serInstance.borrowKryo() + private[this] var kryoCorrupted: Boolean = false + override def writeObject[T: ClassTag](t: T): SerializationStream = { - kryo.writeClassAndObject(output, t) + try { + kryo.writeClassAndObject(output, t) + } catch { + case e: NegativeArraySizeException => + kryoCorrupted = true + throw e + } this } @@ -298,7 +306,9 @@ class KryoSerializationStream( try { output.close() } finally { - serInstance.releaseKryo(kryo) + if (!kryoCorrupted) { + serInstance.releaseKryo(kryo) + } kryo = null output = null } @@ -445,6 +455,7 @@ private[spark] class KryoSerializerInstance( override def serialize[T: ClassTag](t: T): ByteBuffer = { output.clear() val kryo = borrowKryo() + var kryoCorrupted = false try { kryo.writeClassAndObject(output, t) } catch { @@ -455,8 +466,13 @@ private[spark] class KryoSerializerInstance( "exceptionMsg" -> e.getMessage, "bufferSizeConfKey" -> KRYO_SERIALIZER_MAX_BUFFER_SIZE.key), cause = e) + case e: NegativeArraySizeException => + kryoCorrupted = true + throw e } finally { - releaseKryo(kryo) + if (!kryoCorrupted) { + releaseKryo(kryo) + } } ByteBuffer.wrap(output.toBytes) }