
[SPARK-26658][PySpark] : Call pickle.dump with protocol version 3 for Python 3… #23577

Closed
wants to merge 1 commit

Conversation

pgandhi999

@pgandhi999 pgandhi999 commented Jan 17, 2019

… to fix the serialization issue with large objects

When a PySpark job using Python 3 tries to serialize large objects, it throws a pickle error when serializing a global variable object and an overflow error when broadcasting. See the corresponding JIRA https://issues.apache.org/jira/browse/SPARK-26658 for more details.

What changes were proposed in this pull request?

Fixed the issue by updating the pickle.dump call in the code to use protocol version 3 for Python 3.
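For illustration, here is a minimal sketch of the kind of version-dependent protocol selection the fix applies (the variable names are illustrative, not the exact Spark code). Protocol 3 is Python 3's native protocol and handles bytes objects and large payloads better than protocol 2, the highest protocol Python 2 understands:

```python
import sys
import pickle

# Pick the pickle protocol by interpreter version: protocol 3 for
# Python 3, protocol 2 for Python 2 (its highest supported protocol).
if sys.version_info[0] >= 3:
    protocol = 3
else:
    protocol = 2

# bytes round-trip cleanly under protocol 3
data = {"key": b"\x00" * 16}
blob = pickle.dumps(data, protocol=protocol)
restored = pickle.loads(blob)
```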

How was this patch tested?

Ran manual tests before and after the fix.

Steps To Reproduce:

  • To reproduce the above issue, I am using the word2vec model trained on the Google News dataset downloaded from https://drive.google.com/file/d/0B7XkCwpI5KDYNlNUTTlSS21pQmM/edit?usp=sharing

  • Use Python 3.x with the gensim module installed (or ship the module zip file using --py-files).

  • Launch pyspark with the following command:
    bin/pyspark --master yarn --py-files additionalPythonModules.zip --conf spark.driver.memory=16g --conf spark.executor.memory=16g --conf spark.driver.memoryOverhead=16g --conf spark.executor.memoryOverhead=16g --conf spark.executor.pyspark.memory=16g

  • Run the following commands in the PySpark shell. For the sake of reproducing the issue, I have simply pasted the relevant parts of the code here:

```python
# SparkSession available as 'spark'.
import sys

import gensim

score_threshold = 0.65
synonym_limit = 3
model = gensim.models.KeyedVectors.load_word2vec_format(
    'hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', binary=True)

def isPhrase(word):
    if word.find('_') != -1:
        return 1
    return 0

def process_word(line):
    word = "test"
    positiveWords = []
    positiveWords.append(word)
    try:
        results = model.most_similar(positive=positiveWords)
        synonym_vec = []
        for i in range(len(results)):
            result = results[i]
            if result[1] > score_threshold:
                synonym = result[0]
                synonym = synonym.lower()
                if isPhrase(synonym) == 0 and word != synonym:
                    synonym_vec.append(synonym)
                if len(synonym_vec) > synonym_limit:
                    break
        if len(synonym_vec) > 0:
            # print(word + "\t" + ",".join(synonym_vec))
            return (word, ",".join(synonym_vec))
    except KeyError:
        sys.stderr.write("key error: " + word + "\n")

if __name__ == "__main__":
    rdd = sc.parallelize(["test1", "test2", "test3"])
    rdd2 = rdd.map(process_word)
    rdd2.count()
```

  • To reproduce the issue with broadcast, simply run the code below in the PySpark shell:

```python
# SparkSession available as 'spark'.
import gensim

model = sc.broadcast(gensim.models.KeyedVectors.load_word2vec_format(
    'hdfs://home/pgandhi/GoogleNews-vectors-negative300.bin', binary=True))
```

@pgandhi999 pgandhi999 changed the title [SPARK-26658] : Call pickle.dump with protocol version 3 for Python 3… [SPARK-26658][PySpark] : Call pickle.dump with protocol version 3 for Python 3… Jan 17, 2019
@pgandhi999
Author

ok to test.

@redsanket

redsanket commented Jan 17, 2019

LGTM. It might be nice to copy the how-to-reproduce section over here as well, just a thought, so it's all in one place.

@pgandhi999
Author

Makes sense @redsanket , thank you.

```python
else:
    import pickle
    protocol = 3
```


Guess we could use pickle.DEFAULT_PROTOCOL and pickle.DEFAULT_PROTOCOL - 1 instead. Just a thought, not sure it matters.
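For reference, a small sketch of what that suggestion involves. Note that pickle.DEFAULT_PROTOCOL only exists on Python 3, and its value is version-dependent (3 on Python 3.0-3.7, 4 from Python 3.8 onward), so hard-coding 3 and using the default are not always equivalent:

```python
import pickle

# pickle.DEFAULT_PROTOCOL is the protocol dumps() uses when none is given;
# it depends on the interpreter version, unlike the hard-coded 3 in this PR.
payload = [1, 2, 3]

fixed = pickle.dumps(payload, protocol=3)                          # as in this PR
default = pickle.dumps(payload, protocol=pickle.DEFAULT_PROTOCOL)  # the suggestion

# Both round-trip the same data, though the wire bytes may differ by version.
assert pickle.loads(fixed) == pickle.loads(default) == payload
```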

Author


The same code exists in serializers.py; I have simply tried to maintain consistency with it.

@SparkQA

SparkQA commented Jan 17, 2019

Test build #101381 has finished for PR 23577 at commit 3204525.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

It duplicates #20691

@HyukjinKwon
Member

Closing this due to no feedback.
