Skip to content

Commit

Permalink
[SPARK-19872] [PYTHON] Use the correct deserializer for RDD construct…
Browse files Browse the repository at this point in the history
…ion for coalesce/repartition

## What changes were proposed in this pull request?

This PR proposes to use the correct deserializer, `BatchedSerializer` for RDD construction for coalesce/repartition when the shuffle is enabled. Currently, it is passing `UTF8Deserializer` as is not `BatchedSerializer` from the copied one.

with the file, `text.txt` below:

```
a
b

d
e
f
g
h
i
j
k
l

```

- Before

```python
>>> sc.textFile('text.txt').repartition(1).collect()
```

```
UTF8Deserializer(True)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/rdd.py", line 811, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File ".../spark/python/pyspark/serializers.py", line 549, in load_stream
    yield self.loads(stream)
  File ".../spark/python/pyspark/serializers.py", line 544, in loads
    return s.decode("utf-8") if self.use_unicode else s
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte
```

- After

```python
>>> sc.textFile('text.txt').repartition(1).collect()
```

```
[u'a', u'b', u'', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l', u'']
```

## How was this patch tested?

Unit test in `python/pyspark/tests.py`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17282 from HyukjinKwon/SPARK-19872.
  • Loading branch information
HyukjinKwon authored and davies committed Mar 15, 2017
1 parent 9ff85be commit 7387126
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 1 deletion.
4 changes: 3 additions & 1 deletion python/pyspark/rdd.py
Original file line number Diff line number Diff line change
Expand Up @@ -2072,10 +2072,12 @@ def coalesce(self, numPartitions, shuffle=False):
batchSize = min(10, self.ctx._batchSize or 1024)
ser = BatchedSerializer(PickleSerializer(), batchSize)
selfCopy = self._reserialize(ser)
jrdd_deserializer = selfCopy._jrdd_deserializer
jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
else:
jrdd_deserializer = self._jrdd_deserializer
jrdd = self._jrdd.coalesce(numPartitions, shuffle)
return RDD(jrdd, self.ctx, self._jrdd_deserializer)
return RDD(jrdd, self.ctx, jrdd_deserializer)

def zip(self, other):
"""
Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,12 @@ def test_repartition_no_skewed(self):
zeros = len([x for x in l if x == 0])
self.assertTrue(zeros == 0)

def test_repartition_on_textfile(self):
path = os.path.join(SPARK_HOME, "python/test_support/hello/hello.txt")
rdd = self.sc.textFile(path)
result = rdd.repartition(1).collect()
self.assertEqual(u"Hello World!", result[0])

def test_distinct(self):
rdd = self.sc.parallelize((1, 2, 3)*10, 10)
self.assertEqual(rdd.getNumPartitions(), 10)
Expand Down

0 comments on commit 7387126

Please sign in to comment.