[SPARK-3047] [PySpark] add an option to use str in textFileRDD
str is much more efficient than unicode (in both CPU and memory), so it's better to use str in textFileRDD. To keep compatibility, unicode is used by default. (Maybe change it in the future.)

use_unicode=True:

daviesliudm:~/work/spark$ time python wc.py
(u'./universe/spark/sql/core/target/java/org/apache/spark/sql/execution/ExplainCommand$.java', 7776)

real	2m8.298s
user	0m0.185s
sys	0m0.064s

use_unicode=False:

daviesliudm:~/work/spark$ time python wc.py
('./universe/spark/sql/core/target/java/org/apache/spark/sql/execution/ExplainCommand$.java', 7776)

real	1m26.402s
user	0m0.182s
sys	0m0.062s

Wall-clock time drops from 2m8.3s to 1m26.4s, roughly a 32% improvement!
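The wc.py benchmark script is not included in this commit; a minimal sketch of what it might look like (the input path and the per-file counting logic are assumptions):

# wc.py -- hypothetical reconstruction of the benchmark script (not part of
# this commit); the input path and counting logic are assumptions.
from pyspark import SparkContext

sc = SparkContext("local", "wc")

# Read every file under the tree as (path, content) pairs; flip use_unicode
# to compare str vs unicode performance.
files = sc.wholeTextFiles("./universe/spark", use_unicode=False)

# Count words per file, then print the file with the largest count.
counts = files.mapValues(lambda content: len(content.split()))
print(counts.reduce(lambda a, b: a if a[1] >= b[1] else b))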

Author: Davies Liu <davies.liu@gmail.com>

Closes #1951 from davies/unicode and squashes the following commits:

8352d57 [Davies Liu] update version number
a286f2f [Davies Liu] rollback loads()
85246e5 [Davies Liu] add docs for use_unicode
a0295e1 [Davies Liu] add an option to use str in textFile()
davies authored and JoshRosen committed Sep 11, 2014
1 parent ed1980f commit 1ef656e
Showing 2 changed files with 23 additions and 11 deletions.
16 changes: 12 additions & 4 deletions python/pyspark/context.py
@@ -331,12 +331,16 @@ def pickleFile(self, name, minPartitions=None):
         return RDD(self._jsc.objectFile(name, minPartitions), self,
                    BatchedSerializer(PickleSerializer()))
 
-    def textFile(self, name, minPartitions=None):
+    def textFile(self, name, minPartitions=None, use_unicode=True):
         """
         Read a text file from HDFS, a local file system (available on all
         nodes), or any Hadoop-supported file system URI, and return it as an
         RDD of Strings.
 
+        If use_unicode is False, the strings will be kept as `str` (encoding
+        as `utf-8`), which is faster and smaller than unicode. (Added in
+        Spark 1.2)
+
         >>> path = os.path.join(tempdir, "sample-text.txt")
         >>> with open(path, "w") as testFile:
         ...    testFile.write("Hello world!")
@@ -346,16 +350,20 @@ def textFile(self, name, minPartitions=None):
         """
         minPartitions = minPartitions or min(self.defaultParallelism, 2)
         return RDD(self._jsc.textFile(name, minPartitions), self,
-                   UTF8Deserializer())
+                   UTF8Deserializer(use_unicode))
 
-    def wholeTextFiles(self, path, minPartitions=None):
+    def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
         """
         Read a directory of text files from HDFS, a local file system
         (available on all nodes), or any Hadoop-supported file system
         URI. Each file is read as a single record and returned in a
         key-value pair, where the key is the path of each file, the
         value is the content of each file.
 
+        If use_unicode is False, the strings will be kept as `str` (encoding
+        as `utf-8`), which is faster and smaller than unicode. (Added in
+        Spark 1.2)
+
         For example, if you have the following files::
 
           hdfs://a-hdfs-path/part-00000
@@ -386,7 +394,7 @@ def wholeTextFiles(self, path, minPartitions=None):
         """
         minPartitions = minPartitions or self.defaultMinPartitions
         return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
-                   PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
+                   PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))
 
     def _dictToJavaMap(self, d):
         jm = self._jvm.java.util.HashMap()
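The new keyword is backwards compatible; a short usage sketch (the file paths below are placeholders):

from pyspark import SparkContext

sc = SparkContext("local", "use_unicode_demo")

# Default behaviour is unchanged: elements are unicode objects.
lines = sc.textFile("/tmp/sample-text.txt")

# Opting out keeps elements as utf-8 encoded str, saving CPU and memory.
raw_lines = sc.textFile("/tmp/sample-text.txt", use_unicode=False)

# wholeTextFiles applies the option to both keys (paths) and values (contents).
raw_files = sc.wholeTextFiles("/tmp/some-dir", use_unicode=False)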
18 changes: 11 additions & 7 deletions python/pyspark/serializers.py
@@ -429,18 +429,22 @@ class UTF8Deserializer(Serializer):
     Deserializes streams written by String.getBytes.
     """
 
+    def __init__(self, use_unicode=False):
+        self.use_unicode = use_unicode
+
     def loads(self, stream):
         length = read_int(stream)
-        return stream.read(length).decode('utf8')
+        s = stream.read(length)
+        return s.decode("utf-8") if self.use_unicode else s
 
     def load_stream(self, stream):
-        while True:
-            try:
+        try:
+            while True:
                 yield self.loads(stream)
-            except struct.error:
-                return
-            except EOFError:
-                return
+        except struct.error:
+            return
+        except EOFError:
+            return
 
 
 def read_long(stream):
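For reference, a minimal standalone sketch of the framing UTF8Deserializer consumes: a 4-byte big-endian length followed by utf-8 bytes, matching read_int in this file. The demo harness is an illustration, not Spark code:

import io
import struct

# Write one length-prefixed utf-8 record, as the JVM side produces via
# String.getBytes plus a big-endian int length (cf. read_int in serializers.py).
buf = io.BytesIO()
data = u"Hello world!".encode("utf-8")
buf.write(struct.pack("!i", len(data)))
buf.write(data)
buf.seek(0)

# Read it back the way UTF8Deserializer.loads does.
length = struct.unpack("!i", buf.read(4))[0]
s = buf.read(length)
print(s)                  # raw utf-8 str, what use_unicode=False returns
print(s.decode("utf-8"))  # unicode object, what use_unicode=True returns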
