Skip to content

Commit

Permalink
create internal wrapper around KryoPool to implement reset functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
patrickbrownsync committed Nov 7, 2018
1 parent 3c00321 commit 3bfc4eb
Showing 1 changed file with 23 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import scala.util.control.NonFatal
import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput}
import com.esotericsoftware.kryo.pool.{KryoFactory, KryoPool}
import com.esotericsoftware.kryo.pool.{KryoCallback, KryoFactory, KryoPool}
import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}
import org.apache.avro.generic.{GenericData, GenericRecord}
Expand Down Expand Up @@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf)
}
}

@transient
var pool: KryoPool = getPool
private class PoolWrapper extends KryoPool {
private var pool: KryoPool = getPool

override def borrow(): Kryo = pool.borrow()

override def release(kryo: Kryo): Unit = pool.release(kryo)

override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback)

def reset(): Unit = {
pool = getPool
}

private def getPool: KryoPool = {
new KryoPool.Builder(factory).softReferences.build
private def getPool: KryoPool = {
new KryoPool.Builder(factory).softReferences.build
}
}

@transient
private lazy val internalPool = new PoolWrapper

def pool: KryoPool = internalPool

def newKryo(): Kryo = {
val instantiator = new EmptyScalaKryoInstantiator
val kryo = instantiator.newKryo()
Expand Down Expand Up @@ -231,8 +247,8 @@ class KryoSerializer(conf: SparkConf)
}

override def setDefaultClassLoader(classLoader: ClassLoader): Serializer = {
defaultClassLoader = Some(classLoader)
pool = getPool
super.setDefaultClassLoader(classLoader)
internalPool.reset()
this
}

Expand Down

0 comments on commit 3bfc4eb

Please sign in to comment.