hotfix: Customized pickler does not work in cluster
davies committed Oct 16, 2014
1 parent 293a0b5 commit 0f02050
Showing 14 changed files with 102 additions and 34 deletions.
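Why this hotfix is needed: before this change, the custom Pyrolite picklers and constructors were registered only in the driver JVM (see the python/pyspark/context.py hunk below, which removes the SerDeUtil.initialize() and SerDe.initialize() calls made at SparkContext startup), so executors in cluster mode could not pickle or unpickle array.array and the MLlib types. The fix makes registration idempotent and calls initialize() inside the mapPartitions closures, so every executor JVM registers the picklers on first use. A minimal sketch of that pattern, assuming only Pyrolite (net.razorvine.pickle) on the classpath; the object name is illustrative, not Spark's:

```scala
import net.razorvine.pickle.Unpickler
import net.razorvine.pickle.objects.ArrayConstructor

// Idempotent, thread-safe registration that is cheap to call from every task.
object CustomUnpickling {
  private var initialized = false

  def initialize(): Unit = synchronized {
    if (!initialized) {
      // make array.array unpicklable in this JVM
      Unpickler.registerConstructor("array", "array", new ArrayConstructor())
      initialized = true
    }
  }
}
```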
core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -746,6 +746,7 @@ private[spark] object PythonRDD extends Logging {
def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
pyRDD.rdd.mapPartitions { iter =>
val unpickle = new Unpickler
SerDeUtil.initialize()
iter.flatMap { row =>
unpickle.loads(row) match {
// in case objects are pickled in batch mode
@@ -785,7 +786,7 @@ private[spark] object PythonRDD extends Logging {
}.toJavaRDD()
}

private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
private val pickle = new Pickler()
private var batch = 1
private val buffer = new mutable.ArrayBuffer[Any]
@@ -822,6 +823,7 @@ private[spark] object PythonRDD extends Logging {
*/
def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
pyRDD.rdd.mapPartitions { iter =>
SerDeUtil.initialize()
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
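The AutoBatchedPickler widened to private[spark] above is reused by the new SerDe.javaToPython in PythonMLLibAPI.scala further down. A simplified, self-contained sketch of the auto-batching idea it implements, with illustrative grow/shrink thresholds rather than Spark's exact values:

```scala
import scala.collection.mutable
import net.razorvine.pickle.Pickler

// Pickles items from `iter` in batches, adapting the batch size to the
// observed output size: grow while chunks stay small, shrink when they get big.
class SimpleAutoBatchedPickler(iter: Iterator[Any], targetBytes: Int = 64 * 1024)
  extends Iterator[Array[Byte]] {

  private val pickle = new Pickler()
  private var batch = 1
  private val buffer = new mutable.ArrayBuffer[Any]

  override def hasNext: Boolean = iter.hasNext

  override def next(): Array[Byte] = {
    while (iter.hasNext && buffer.length < batch) {
      buffer += iter.next()
    }
    val bytes = pickle.dumps(buffer.toArray)
    if (bytes.length < targetBytes / 2) {
      batch *= 2   // output is small: batch more aggressively
    } else if (bytes.length > targetBytes && batch > 1) {
      batch /= 2   // output is large: back off
    }
    buffer.clear()
    bytes
  }
}
```

Each call to next() yields one pickled batch, which keeps per-call pickling overhead low for many small objects while bounding the size of each serialized chunk for large ones.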
14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.RDD

/** Utilities for serialization / deserialization between Python and Java, using Pickle. */
private[python] object SerDeUtil extends Logging {
private[spark] object SerDeUtil extends Logging {
// Unpickle array.array generated by Python 2.6
class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
// /* Description of types */
@@ -76,9 +76,18 @@ private[python] object SerDeUtil extends Logging {
}
}

private var initialized = false
// This should be called before trying to unpickle array.array from Python
// In cluster mode, this should be put in the closure
def initialize() = {
Unpickler.registerConstructor("array", "array", new ArrayConstructor())
synchronized {
if (!initialized) {
Unpickler.registerConstructor("array", "array", new ArrayConstructor())
initialized = true
}
}
}
initialize()

private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
val pickle = new Pickler
@@ -143,6 +152,7 @@ private[python] object SerDeUtil extends Logging {
obj.asInstanceOf[Array[_]].length == 2
}
pyRDD.mapPartitions { iter =>
initialize()
val unpickle = new Unpickler
val unpickled =
if (batchSerialized) {
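Why SerDeUtil.initialize() now also appears at the top of each mapPartitions closure: the closure body runs in the executor JVM, so registration happens in the same JVM that performs the unpickling, and the initialized flag makes repeated calls from later tasks a no-op. A runnable sketch of the same idea, assuming a local master and the CustomUnpickling object from the sketch above:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ClosureInitSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("closure-init").setMaster("local[2]"))
    val doubled = sc.parallelize(1 to 4, 2).mapPartitions { iter =>
      // runs once per task, in the executor JVM; at most one registration per JVM
      CustomUnpickling.initialize()
      iter.map(_ * 2)
    }.collect()
    println(doubled.mkString(","))
    sc.stop()
  }
}
```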
mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -18,15 +18,19 @@
package org.apache.spark.mllib.api.python

import java.io.OutputStream
import java.util.{ArrayList => JArrayList}

import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.existentials
import scala.reflect.ClassTag

import net.razorvine.pickle._

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
import org.apache.spark.mllib.feature.Word2Vec
@@ -506,7 +510,7 @@ class PythonMLLibAPI extends Serializable {
/**
* SerDe utility functions for PythonMLLibAPI.
*/
private[spark] object SerDe extends Serializable {
object SerDe extends Serializable {

val PYSPARK_PACKAGE = "pyspark.mllib"

@@ -639,13 +643,24 @@ private[spark] object SerDe extends Serializable {
}
}

var initialized = false
// This should be called before trying to serialize any of the classes above
// In cluster mode, this should be put in the closure
def initialize(): Unit = {
new DenseVectorPickler().register()
new DenseMatrixPickler().register()
new SparseVectorPickler().register()
new LabeledPointPickler().register()
new RatingPickler().register()
SerDeUtil.initialize()
synchronized {
if (!initialized) {
new DenseVectorPickler().register()
new DenseMatrixPickler().register()
new SparseVectorPickler().register()
new LabeledPointPickler().register()
new RatingPickler().register()
initialized = true
}
}
}
// will not be called in the executor automatically
initialize()

def dumps(obj: AnyRef): Array[Byte] = {
new Pickler().dumps(obj)
@@ -659,4 +674,33 @@
def asTupleRDD(rdd: RDD[Array[Any]]): RDD[(Int, Int)] = {
rdd.map(x => (x(0).asInstanceOf[Int], x(1).asInstanceOf[Int]))
}

/**
* Convert an RDD of Java objects to an RDD of serialized Python objects that is usable by
* PySpark.
*/
def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
jRDD.rdd.mapPartitions { iter =>
initialize() // make sure it is called in the executor
new PythonRDD.AutoBatchedPickler(iter)
}
}

/**
* Convert an RDD of serialized Python objects to an RDD of objects that is usable by PySpark.
*/
def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
pyRDD.rdd.mapPartitions { iter =>
initialize() // make sure it is called in the executor
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
if (batched) {
obj.asInstanceOf[JArrayList[_]]
} else {
Seq(obj)
}
}
}.toJavaRDD()
}
}
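SerDe.initialize() above guards the registration of the MLlib picklers (DenseVectorPickler and friends) with the same flag-plus-synchronized pattern, and the new javaToPython / pythonToJava helpers call it inside the closure so it also runs on executors. A toy illustration of what such a registration buys, assuming only Pyrolite on the classpath; Point, PointPickler, and PointSerDeDemo are hypothetical names, and the pickler simply encodes a Point as a plain (x, y) tuple:

```scala
import java.io.OutputStream
import net.razorvine.pickle.{IObjectPickler, Pickler, Unpickler}

case class Point(x: Double, y: Double)

// Pickles a Point as a Python tuple (x, y) so the Python side needs no custom class.
class PointPickler extends IObjectPickler {
  def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
    val p = obj.asInstanceOf[Point]
    pickler.save(Array[Any](p.x, p.y))
  }
}

object PointSerDeDemo {
  def main(args: Array[String]): Unit = {
    // Without this registration, new Pickler().dumps(Point(...)) fails in this JVM --
    // which is exactly what executors hit before this hotfix.
    Pickler.registerCustomPickler(classOf[Point], new PointPickler)
    val bytes = new Pickler().dumps(Point(1.0, 2.0))
    val back = new Unpickler().loads(bytes).asInstanceOf[Array[AnyRef]]
    println(back.mkString(", ")) // 1.0, 2.0
  }
}
```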
2 changes: 0 additions & 2 deletions python/pyspark/context.py
@@ -215,8 +215,6 @@ def _ensure_initialized(cls, instance=None, gateway=None):
SparkContext._gateway = gateway or launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
SparkContext._jvm.SerDeUtil.initialize()
SparkContext._jvm.SerDe.initialize()

if instance:
if (SparkContext._active_spark_context and
4 changes: 2 additions & 2 deletions python/pyspark/mllib/classification.py
@@ -21,7 +21,7 @@
from numpy import array

from pyspark import SparkContext, PickleSerializer
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper


@@ -244,7 +244,7 @@ def train(cls, data, lambda_=1.0):
:param lambda_: The smoothing parameter
"""
sc = data.context
jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(data._to_java_object_rdd(), lambda_)
jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(_to_java_object_rdd(data), lambda_)
labels, pi, theta = PickleSerializer().loads(str(sc._jvm.SerDe.dumps(jlist)))
return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))

4 changes: 2 additions & 2 deletions python/pyspark/mllib/clustering.py
@@ -17,7 +17,7 @@

from pyspark import SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd

__all__ = ['KMeansModel', 'KMeans']

@@ -85,7 +85,7 @@ def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"
# cache serialized data to avoid object overhead in the JVM
cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache()
model = sc._jvm.PythonMLLibAPI().trainKMeansModel(
cached._to_java_object_rdd(), k, maxIterations, runs, initializationMode)
_to_java_object_rdd(cached), k, maxIterations, runs, initializationMode)
bytes = sc._jvm.SerDe.dumps(model.clusterCenters())
centers = ser.loads(str(bytes))
return KMeansModel([c.toArray() for c in centers])
5 changes: 2 additions & 3 deletions python/pyspark/mllib/feature.py
@@ -19,8 +19,7 @@
Python package for feature in MLlib.
"""
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer

from pyspark.mllib.linalg import _convert_to_vector
from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd

__all__ = ['Word2Vec', 'Word2VecModel']

@@ -176,7 +175,7 @@ def fit(self, data):
seed = self.seed

model = sc._jvm.PythonMLLibAPI().trainWord2Vec(
data._to_java_object_rdd(), vectorSize,
_to_java_object_rdd(data), vectorSize,
learningRate, numPartitions, numIterations, seed)
return Word2VecModel(sc, model)

13 changes: 13 additions & 0 deletions python/pyspark/mllib/linalg.py
@@ -29,6 +29,8 @@

import numpy as np

from pyspark.serializers import AutoBatchedSerializer, PickleSerializer

__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']


@@ -50,6 +52,17 @@ def fast_pickle_array(ar):
_have_scipy = False


# this will call the MLlib version of pythonToJava()
def _to_java_object_rdd(rdd):
""" Return an JavaRDD of Object by unpickling
It will convert each Python object into Java object by Pyrolite, whenever the
RDD is serialized in batch or not.
"""
rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True)


def _convert_to_vector(l):
if isinstance(l, Vector):
return l
2 changes: 1 addition & 1 deletion python/pyspark/mllib/random.py
@@ -32,7 +32,7 @@ def serialize(f):
@wraps(f)
def func(sc, *a, **kw):
jrdd = f(sc, *a, **kw)
return RDD(sc._jvm.PythonRDD.javaToPython(jrdd), sc,
return RDD(sc._jvm.SerDe.javaToPython(jrdd), sc,
BatchedSerializer(PickleSerializer(), 1024))
return func

7 changes: 4 additions & 3 deletions python/pyspark/mllib/recommendation.py
@@ -18,6 +18,7 @@
from pyspark import SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.rdd import RDD
from pyspark.mllib.linalg import _to_java_object_rdd

__all__ = ['MatrixFactorizationModel', 'ALS']

@@ -77,9 +78,9 @@ def predictAll(self, user_product):
first = tuple(map(int, first))
assert all(type(x) is int for x in first), "user and product in user_product should be int"
sc = self._context
tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
tuplerdd = sc._jvm.SerDe.asTupleRDD(_to_java_object_rdd(user_product).rdd())
jresult = self._java_model.predict(tuplerdd).toJavaRDD()
return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
return RDD(sc._jvm.SerDe.javaToPython(jresult), sc,
AutoBatchedSerializer(PickleSerializer()))


@@ -97,7 +98,7 @@ def _prepare(cls, ratings):
# serialize them with AutoBatchedSerializer before caching to reduce the
# object overhead in the JVM
cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
return cached._to_java_object_rdd()
return _to_java_object_rdd(cached)

@classmethod
def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
4 changes: 2 additions & 2 deletions python/pyspark/mllib/regression.py
@@ -19,8 +19,8 @@
from numpy import array

from pyspark import SparkContext
from pyspark.mllib.linalg import SparseVector, _convert_to_vector
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd

__all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel',
'LinearRegressionWithSGD', 'LassoWithSGD', 'RidgeRegressionWithSGD']
@@ -131,7 +131,7 @@ def _regression_train_wrapper(sc, train_func, modelClass, data, initial_weights)
# use AutoBatchedSerializer before caching to reduce the memory
# overhead in the JVM
cached = data._reserialize(AutoBatchedSerializer(ser)).cache()
ans = train_func(cached._to_java_object_rdd(), initial_bytes)
ans = train_func(_to_java_object_rdd(cached), initial_bytes)
assert len(ans) == 2, "JVM call result had unexpected length"
weights = ser.loads(str(ans[0]))
return modelClass(weights, ans[1])
7 changes: 4 additions & 3 deletions python/pyspark/mllib/stat.py
@@ -22,6 +22,7 @@
from functools import wraps

from pyspark import PickleSerializer
from pyspark.mllib.linalg import _to_java_object_rdd


__all__ = ['MultivariateStatisticalSummary', 'Statistics']
@@ -106,7 +107,7 @@ def colStats(rdd):
array([ 2., 0., 0., -2.])
"""
sc = rdd.ctx
jrdd = rdd._to_java_object_rdd()
jrdd = _to_java_object_rdd(rdd)
cStats = sc._jvm.PythonMLLibAPI().colStats(jrdd)
return MultivariateStatisticalSummary(sc, cStats)

@@ -162,14 +163,14 @@ def corr(x, y=None, method=None):
if type(y) == str:
raise TypeError("Use 'method=' to specify method name.")

jx = x._to_java_object_rdd()
jx = _to_java_object_rdd(x)
if not y:
resultMat = sc._jvm.PythonMLLibAPI().corr(jx, method)
bytes = sc._jvm.SerDe.dumps(resultMat)
ser = PickleSerializer()
return ser.loads(str(bytes)).toArray()
else:
jy = y._to_java_object_rdd()
jy = _to_java_object_rdd(y)
return sc._jvm.PythonMLLibAPI().corr(jx, jy, method)


8 changes: 4 additions & 4 deletions python/pyspark/mllib/tree.py
@@ -19,7 +19,7 @@

from pyspark import SparkContext, RDD
from pyspark.serializers import BatchedSerializer, PickleSerializer
from pyspark.mllib.linalg import Vector, _convert_to_vector
from pyspark.mllib.linalg import Vector, _convert_to_vector, _to_java_object_rdd
from pyspark.mllib.regression import LabeledPoint

__all__ = ['DecisionTreeModel', 'DecisionTree']
@@ -61,8 +61,8 @@ def predict(self, x):
return self._sc.parallelize([])
if not isinstance(first[0], Vector):
x = x.map(_convert_to_vector)
jPred = self._java_model.predict(x._to_java_object_rdd()).toJavaRDD()
jpyrdd = self._sc._jvm.PythonRDD.javaToPython(jPred)
jPred = self._java_model.predict(_to_java_object_rdd(x)).toJavaRDD()
jpyrdd = self._sc._jvm.SerDe.javaToPython(jPred)
return RDD(jpyrdd, self._sc, BatchedSerializer(ser, 1024))

else:
@@ -104,7 +104,7 @@ def _train(data, type, numClasses, categoricalFeaturesInfo,
first = data.first()
assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
sc = data.context
jrdd = data._to_java_object_rdd()
jrdd = _to_java_object_rdd(data)
cfiMap = MapConverter().convert(categoricalFeaturesInfo,
sc._gateway._gateway_client)
model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(
6 changes: 3 additions & 3 deletions python/pyspark/mllib/util.py
@@ -19,7 +19,7 @@
import warnings

from pyspark.rdd import RDD
from pyspark.serializers import BatchedSerializer, PickleSerializer
from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint

@@ -174,8 +174,8 @@ def loadLabeledPoints(sc, path, minPartitions=None):
"""
minPartitions = minPartitions or min(sc.defaultParallelism, 2)
jrdd = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
jpyrdd = sc._jvm.PythonRDD.javaToPython(jrdd)
return RDD(jpyrdd, sc, BatchedSerializer(PickleSerializer()))
jpyrdd = sc._jvm.SerDe.javaToPython(jrdd)
return RDD(jpyrdd, sc, AutoBatchedSerializer(PickleSerializer()))


def _test():
