From cdee210181c59abfab273fb17ed0b37318c0780f Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 5 Apr 2016 16:46:55 -0700 Subject: [PATCH 1/2] fix unpersist of Broadcast in Python --- python/pyspark/broadcast.py | 11 +++++++++++ python/pyspark/tests.py | 15 +++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 663c9abe0881e..36e6328f60017 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -104,6 +104,17 @@ def unpersist(self, blocking=False): if self._jbroadcast is None: raise Exception("Broadcast can only be unpersisted in driver") self._jbroadcast.unpersist(blocking) + + def destroy(self): + """ + Destroy all data and metadata related to this broadcast variable. + Use this with caution; once a broadcast variable has been destroyed, + it cannot be used again. This method blocks until destroy has + completed. + """ + if self._jbroadcast is None: + raise Exception("Broadcast can only be destroyed in driver") + self._jbroadcast.destroy() os.unlink(self._path) def __reduce__(self): diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 40fccb8c0090c..15c87e22f98b0 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -694,6 +694,21 @@ def test_large_broadcast(self): m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum() self.assertEqual(N, m) + def test_unpersist(self): + N = 1000 + data = [[float(i) for i in range(300)] for i in range(N)] + bdata = self.sc.broadcast(data) # 3MB + bdata.unpersist() + m = self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum() + self.assertEqual(N, m) + bdata.destroy() + try: + self.sc.parallelize(range(1), 1).map(lambda x: len(bdata.value)).sum() + except Exception as e: + pass + else: + raise Exception("job should fail after destroy the broadcast") + def test_multiple_broadcasts(self): N = 1 << 21 b1 = self.sc.broadcast(set(range(N))) # multiple blocks in JVM From e0408f3d026ea59e4337fafabda16e178b3a7317 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 6 Apr 2016 10:11:12 -0700 Subject: [PATCH 2/2] update docstring --- python/pyspark/broadcast.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index 36e6328f60017..a0b819220e6d3 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -99,7 +99,11 @@ def value(self): def unpersist(self, blocking=False): """ - Delete cached copies of this broadcast on the executors. + Delete cached copies of this broadcast on the executors. If the + broadcast is used after this is called, it will need to be + re-sent to each executor. + + :param blocking: Whether to block until unpersisting has completed """ if self._jbroadcast is None: raise Exception("Broadcast can only be unpersisted in driver")