Skip to content

Commit

Permalink
adopted the latest compression way of python command
Browse files Browse the repository at this point in the history
  • Loading branch information
giwa committed Sep 20, 2014
1 parent 58e41ff commit e80647e
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
import operator

from pyspark.serializers import NoOpSerializer,\
BatchedSerializer, CloudPickleSerializer, pack_long
BatchedSerializer, CloudPickleSerializer, pack_long,\
CompressedSerializer
from pyspark.rdd import _JavaStackTrace
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
Expand Down Expand Up @@ -463,7 +464,8 @@ def _jdstream(self):
serializer = self.ctx.serializer

command = (self.func, self._prev_jrdd_deserializer, serializer)
pickled_command = CloudPickleSerializer().dumps(command)
ser = CompressedSerializer(CloudPickleSerializer())
pickled_command = ser.dumps(command)
broadcast_vars = ListConverter().convert(
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
self.ctx._gateway._gateway_client)
Expand All @@ -472,12 +474,13 @@ def _jdstream(self):
env = MapConverter().convert(self.ctx.environment,
self.ctx._gateway._gateway_client)
includes = ListConverter().convert(self.ctx._python_includes,
self.ctx._gateway._gateway_client)
self.ctx._gateway._gateway_client)
python_dstream = self.ctx._jvm.PythonDStream(self._prev_jdstream.dstream(),
bytearray(pickled_command),
env, includes, self.preservesPartitioning,
self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
class_tag)
bytearray(pickled_command),
env, includes, self.preservesPartitioning,
self.ctx.pythonExec,
broadcast_vars, self.ctx._javaAccumulator,
class_tag)
self._jdstream_val = python_dstream.asJavaDStream()
return self._jdstream_val

Expand Down

0 comments on commit e80647e

Please sign in to comment.