
[SPARK-19872][PYTHON] Use the correct deserializer for RDD construction for coalesce/repartition #17282

Closed

Conversation

HyukjinKwon (Member) commented Mar 13, 2017

What changes were proposed in this pull request?

This PR proposes to use the correct deserializer, `BatchedSerializer`, for RDD construction in coalesce/repartition when the shuffle is enabled. Currently, the original `UTF8Deserializer` is passed as-is instead of the `BatchedSerializer` from the reserialized copy.

with the file, `text.txt` below:

```
a
b

d
e
f
g
h
i
j
k
l

```

- Before

```python
>>> sc.textFile('text.txt').repartition(1).collect()
```

```
UTF8Deserializer(True)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/rdd.py", line 811, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File ".../spark/python/pyspark/serializers.py", line 549, in load_stream
    yield self.loads(stream)
  File ".../spark/python/pyspark/serializers.py", line 544, in loads
    return s.decode("utf-8") if self.use_unicode else s
  File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte
```

- After

```python
>>> sc.textFile('text.txt').repartition(1).collect()
```

```
[u'a', u'b', u'', u'd', u'e', u'f', u'g', u'h', u'i', u'j', u'k', u'l', u'']
```
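The error above can be reproduced with the standard library alone, which shows why the wrong deserializer fails in exactly this way: `BatchedSerializer(PickleSerializer())` writes pickled batches, and pickle protocol 2 frames begin with the byte `0x80`, which is never a valid UTF-8 start byte. A minimal stdlib-only sketch (not PySpark code itself):

```python
import pickle

# A batch of strings roughly as BatchedSerializer(PickleSerializer())
# would write it: one pickled list per batch.
batch = pickle.dumps(["a", "b", ""], protocol=2)

# Pickle protocol 2 frames start with the opcode byte 0x80 -- the very
# byte the traceback above complains about.
print(batch[:1])  # b'\x80'

# Reading the same bytes back with the wrong deserializer (plain UTF-8
# decoding, as UTF8Deserializer does) fails exactly as in the report.
try:
    batch.decode("utf-8")
except UnicodeDecodeError as e:
    print(e.reason)  # 'invalid start byte'
```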

How was this patch tested?

Unit test in `python/pyspark/tests.py`.

HyukjinKwon (Member, Author)

cc @davies, could you see if it makes sense?

@HyukjinKwon HyukjinKwon changed the title [SPARK-19872][PYTHON] Only reseralize BatchedSerializers when repartitioning [SPARK-19872][PYTHON] Only reseralize with BatchedSerializer when repartitioning for skewed partition handling Mar 13, 2017
SparkQA commented Mar 13, 2017

Test build #74462 has finished for PR 17282 at commit 30688db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya (Member) commented Mar 15, 2017

The root cause is that `coalesce` uses the wrong `jrdd_deserializer` when constructing the new RDD.

The correct fix looks like:

```diff
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index a5e6e2b..291c1ca 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2072,10 +2072,12 @@ class RDD(object):
             batchSize = min(10, self.ctx._batchSize or 1024)
             ser = BatchedSerializer(PickleSerializer(), batchSize)
             selfCopy = self._reserialize(ser)
+            jrdd_deserializer = selfCopy._jrdd_deserializer
             jrdd = selfCopy._jrdd.coalesce(numPartitions, shuffle)
         else:
+            jrdd_deserializer = self._jrdd_deserializer
             jrdd = self._jrdd.coalesce(numPartitions, shuffle)
-        return RDD(jrdd, self.ctx, self._jrdd_deserializer)
+        return RDD(jrdd, self.ctx, jrdd_deserializer)
```
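The shape of this fix can be illustrated with a toy model. This is plain Python, not PySpark; every name below is illustrative, not Spark's API. The point it demonstrates: the RDD returned from the shuffle path must carry the deserializer that matches the bytes it actually wraps.

```python
import pickle

def utf8_decode(blob):
    """Stand-in for UTF8Deserializer: decode bytes as UTF-8 lines."""
    return blob.decode("utf-8").splitlines()

def batched_decode(blob):
    """Stand-in for BatchedSerializer's read side: unpickle a batch."""
    return pickle.loads(blob)

class ToyRDD:
    """Toy stand-in (not PySpark) for an RDD over serialized bytes."""

    def __init__(self, blob, deserializer):
        self.blob = blob                  # the Java-side bytes in real PySpark
        self.deserializer = deserializer  # how those bytes are read back

    def coalesce(self, numPartitions, shuffle=False):
        if shuffle:
            # The shuffle path reserializes the data in batched-pickle form,
            # so the resulting RDD must carry the matching deserializer.
            # Passing along self.deserializer here was the pre-fix bug.
            reserialized = pickle.dumps(self.deserializer(self.blob))
            return ToyRDD(reserialized, batched_decode)
        return ToyRDD(self.blob, self.deserializer)

    def collect(self):
        return self.deserializer(self.blob)

rdd = ToyRDD("a\nb\nc".encode("utf-8"), utf8_decode)
print(rdd.coalesce(1, shuffle=True).collect())  # ['a', 'b', 'c']
```

With the bug, the shuffled RDD would keep `utf8_decode` while holding pickled bytes, and `collect` would raise the same `UnicodeDecodeError` shown in the report.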

HyukjinKwon (Member, Author)

@viirya, you are right; I overlooked that. Thanks for correcting this.

@HyukjinKwon HyukjinKwon changed the title [SPARK-19872][PYTHON] Only reseralize with BatchedSerializer when repartitioning for skewed partition handling [SPARK-19872][PYTHON] Use the correct deserializer for RDD construction for coalesce/repartition Mar 15, 2017
SparkQA commented Mar 15, 2017

Test build #74592 has finished for PR 17282 at commit 925cd2e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

viirya (Member) commented Mar 15, 2017

LGTM

HyukjinKwon (Member, Author)

@viirya, thank you for taking a look.

davies (Contributor) commented Mar 15, 2017

lgtm, merging into master, and 2.1 branch.

asfgit pushed a commit that referenced this pull request Mar 15, 2017
…ion for coalesce/repartition

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17282 from HyukjinKwon/SPARK-19872.

(cherry picked from commit 7387126)
Signed-off-by: Davies Liu <davies.liu@gmail.com>
@asfgit asfgit closed this in 7387126 Mar 15, 2017
HyukjinKwon (Member, Author)

Thank you both @davies and @viirya

@HyukjinKwon HyukjinKwon deleted the SPARK-19872 branch January 2, 2018 03:43
Majdouline-Meddad commented Feb 9, 2018

My code is:

```python
sc.binaryFiles('hdfs://localhost:9000/user/majdouline/Training').repartition(90).collect()
```

and I got this error:

```
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/worker.py", line 174, in main
    process()
  File "/usr/local/spark/python/pyspark/worker.py", line 169, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/spark/python/pyspark/serializers.py", line 268, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/usr/local/spark/python/pyspark/serializers.py", line 328, in _load_stream_without_unbatching
    for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
  File "/usr/local/spark/python/pyspark/serializers.py", line 529, in load_stream
    yield self.loads(stream)
  File "/usr/local/spark/python/pyspark/serializers.py", line 524, in loads
    return s.decode("utf-8") if self.use_unicode else s
  File "/grid/hadoop/yarn/local/usercache/rsrpsinr/appcache/application_1506405147397_0015/container_1506405147397_0015_01_000005/Python2SparkDl/lib/python2.7/encodings/utf_8.py", line 16, in decode
    return codecs.utf_8_decode(input, errors, True)
UnicodeDecodeError: 'utf8' codec can't decode byte 0x80 in position 0: invalid start byte
```

I changed rdd.py and serializers.py (version 2.1.0 to 2.0.2), but I got the same error. Can you please help me fix this?
