diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 4001ecab5ea00..833a85469e377 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -314,12 +314,16 @@ def pickleFile(self, name, minPartitions=None): return RDD(self._jsc.objectFile(name, minPartitions), self, BatchedSerializer(PickleSerializer())) - def textFile(self, name, minPartitions=None): + def textFile(self, name, minPartitions=None, use_unicode=True): """ Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an RDD of Strings. + If use_unicode is False, the strings will be kept as `str` (encoding + as `utf-8`), which is faster and smaller than unicode. (Added in + Spark 1.2) + >>> path = os.path.join(tempdir, "sample-text.txt") >>> with open(path, "w") as testFile: ... testFile.write("Hello world!") @@ -329,9 +333,9 @@ def textFile(self, name, minPartitions=None): """ minPartitions = minPartitions or min(self.defaultParallelism, 2) return RDD(self._jsc.textFile(name, minPartitions), self, - UTF8Deserializer()) + UTF8Deserializer(use_unicode)) - def wholeTextFiles(self, path, minPartitions=None): + def wholeTextFiles(self, path, minPartitions=None, use_unicode=True): """ Read a directory of text files from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system @@ -339,6 +343,10 @@ def wholeTextFiles(self, path, minPartitions=None): key-value pair, where the key is the path of each file, the value is the content of each file. + If use_unicode is False, the strings will be kept as `str` (encoding + as `utf-8`), which is faster and smaller than unicode. (Added in + Spark 1.2) + For example, if you have the following files:: hdfs://a-hdfs-path/part-00000 @@ -369,7 +377,7 @@ def wholeTextFiles(self, path, minPartitions=None): """ minPartitions = minPartitions or self.defaultMinPartitions return RDD(self._jsc.wholeTextFiles(path, minPartitions), self, - PairDeserializer(UTF8Deserializer(), UTF8Deserializer())) + PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode))) def _dictToJavaMap(self, d): jm = self._jvm.java.util.HashMap() diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index df90cafb245bf..e99fb926b5212 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -409,18 +409,22 @@ class UTF8Deserializer(Serializer): Deserializes streams written by String.getBytes. """ + def __init__(self, use_unicode=False): + self.use_unicode = use_unicode + def loads(self, stream): length = read_int(stream) - return stream.read(length).decode('utf8') + s = stream.read(length) + return s.decode("utf-8") if self.use_unicode else s def load_stream(self, stream): - while True: - try: + try: + while True: yield self.loads(stream) - except struct.error: - return - except EOFError: - return + except struct.error: + return + except EOFError: + return def read_long(stream):