From ac0270aa60a3f98abfc534be49c759fbeb6c901d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 25 May 2018 17:24:23 -0700 Subject: [PATCH 1/7] Added --- .../scala/org/apache/spark/api/python/PythonRunner.scala | 7 +++++++ python/pyspark/taskcontext.py | 7 +++++++ python/pyspark/tests.py | 8 ++++++++ python/pyspark/worker.py | 6 ++++++ 4 files changed, 28 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index f075a7e0eb0b4..41eac10d9b267 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -183,6 +183,13 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( dataOut.writeInt(context.partitionId()) dataOut.writeInt(context.attemptNumber()) dataOut.writeLong(context.taskAttemptId()) + val localProps = context.asInstanceOf[TaskContextImpl].getLocalProperties.asScala + dataOut.writeInt(localProps.size) + localProps.foreach { case (k, v) => + PythonRDD.writeUTF(k, dataOut) + PythonRDD.writeUTF(v, dataOut) + } + // sparkFilesDir PythonRDD.writeUTF(SparkFiles.getRootDirectory(), dataOut) // Python includes (*.zip and *.egg files) diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py index e5218d9e75e78..5ac36ebba853b 100644 --- a/python/pyspark/taskcontext.py +++ b/python/pyspark/taskcontext.py @@ -34,6 +34,7 @@ class TaskContext(object): _partitionId = None _stageId = None _taskAttemptId = None + _localProperties = None def __new__(cls): """Even if users construct TaskContext instead of using get, give them the singleton.""" @@ -88,3 +89,9 @@ def taskAttemptId(self): TaskAttemptID. """ return self._taskAttemptId + + def getLocalProperty(self, key): + """ + Get a local property set upstream in the driver, or None if it is missing. + """ + return self._localProperties[key] diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 498d6b57e4353..6fd04978a1a58 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -543,6 +543,14 @@ def test_tc_on_driver(self): tc = TaskContext.get() self.assertTrue(tc is None) + def test_get_local_property(self): + """Verify that local properties set on the driver are available in TaskContext.""" + self.sc.setLocalProperty("testkey", "testvalue") + rdd = self.sc.parallelize(range(1), 1) + prop1 = rdd1.map(lambda x: TaskContext.get().getLocalProperty("testkey")).collect()[0] + self.assertEqual(prop1, "testkey") + prop2 = rdd1.map(lambda x: TaskContext.get().getLocalProperty("otherkey")).collect()[0] + self.assertTrue(prop2 is None) class RDDTests(ReusedPySparkTestCase): diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index 5d2e58bef6466..fbcb8af8bfb24 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -222,6 +222,12 @@ def main(infile, outfile): taskContext._partitionId = read_int(infile) taskContext._attemptNumber = read_int(infile) taskContext._taskAttemptId = read_long(infile) + taskContext._localProperties = dict() + for i in range(read_int(infile)): + k = utf8_deserializer.loads(infile) + v = utf8_deserializer.loads(infile) + taskContext._localProperties[k] = v + shuffle.MemoryBytesSpilled = 0 shuffle.DiskBytesSpilled = 0 _accumulatorRegistry.clear() From 52610e3a9e5868549da4910e36aec05714037922 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Fri, 25 May 2018 17:27:10 -0700 Subject: [PATCH 2/7] Fixed test --- python/pyspark/tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 6fd04978a1a58..2b55f4e3a8dbf 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -547,9 +547,9 @@ def test_get_local_property(self): """Verify that local properties set on the driver are available in TaskContext.""" self.sc.setLocalProperty("testkey", "testvalue") rdd = self.sc.parallelize(range(1), 1) - prop1 = rdd1.map(lambda x: TaskContext.get().getLocalProperty("testkey")).collect()[0] + prop1 = rdd.map(lambda x: TaskContext.get().getLocalProperty("testkey")).collect()[0] self.assertEqual(prop1, "testkey") - prop2 = rdd1.map(lambda x: TaskContext.get().getLocalProperty("otherkey")).collect()[0] + prop2 = rdd.map(lambda x: TaskContext.get().getLocalProperty("otherkey")).collect()[0] self.assertTrue(prop2 is None) class RDDTests(ReusedPySparkTestCase): From b9d8dd304ed3d172a2e44919103e9500893fc829 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 28 May 2018 15:54:46 -0700 Subject: [PATCH 3/7] Fixed lint --- python/pyspark/tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 2b55f4e3a8dbf..210cdd182bad7 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -552,6 +552,7 @@ def test_get_local_property(self): prop2 = rdd.map(lambda x: TaskContext.get().getLocalProperty("otherkey")).collect()[0] self.assertTrue(prop2 is None) + class RDDTests(ReusedPySparkTestCase): def test_range(self): From e197460cf56c42d164da5955eaa62d4f80cf033e Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 29 May 2018 12:41:34 -0700 Subject: [PATCH 4/7] fixed --- python/pyspark/tests.py | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 210cdd182bad7..3387673828901 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -545,13 +545,15 @@ def test_tc_on_driver(self): def test_get_local_property(self): """Verify that local properties set on the driver are available in TaskContext.""" - self.sc.setLocalProperty("testkey", "testvalue") - rdd = self.sc.parallelize(range(1), 1) - prop1 = rdd.map(lambda x: TaskContext.get().getLocalProperty("testkey")).collect()[0] - self.assertEqual(prop1, "testkey") - prop2 = rdd.map(lambda x: TaskContext.get().getLocalProperty("otherkey")).collect()[0] - self.assertTrue(prop2 is None) - + try: + self.sc.setLocalProperty("testkey", "testvalue") + rdd = self.sc.parallelize(range(1), 1) + prop1 = rdd.map(lambda x: TaskContext.get().getLocalProperty("testkey")).collect()[0] + self.assertEqual(prop1, "testvalue") + prop2 = rdd.map(lambda x: TaskContext.get().getLocalProperty("otherkey")).collect()[0] + self.assertTrue(prop2 is None) + finally: + self.sc.setLocalProperty("testkey", None) class RDDTests(ReusedPySparkTestCase): From 9d95c12a0ada0520f426723406a7d99aada2760d Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 29 May 2018 13:46:45 -0700 Subject: [PATCH 5/7] fixed style --- python/pyspark/tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 3387673828901..df3c3064f187b 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -555,6 +555,7 @@ def test_get_local_property(self): finally: self.sc.setLocalProperty("testkey", None) + class RDDTests(ReusedPySparkTestCase): def test_range(self): From 2ea9cbc80787f1417fa4502c3c2b9b89f46d0632 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 29 May 2018 15:42:33 -0700 Subject: [PATCH 6/7] Addressed comments --- python/pyspark/taskcontext.py | 2 +- python/pyspark/tests.py | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py index 5ac36ebba853b..282db010ea75d 100644 --- a/python/pyspark/taskcontext.py +++ b/python/pyspark/taskcontext.py @@ -94,4 +94,4 @@ def getLocalProperty(self, key): """ Get a local property set upstream in the driver, or None if it is missing. """ - return self._localProperties[key] + return self._localProperties.get(key, default=None) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index df3c3064f187b..baeb5ccabb58f 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -545,15 +545,17 @@ def test_tc_on_driver(self): def test_get_local_property(self): """Verify that local properties set on the driver are available in TaskContext.""" + key = "testkey" + value = "testvalue" + self.sc.setLocalProperty(key, value) try: - self.sc.setLocalProperty("testkey", "testvalue") rdd = self.sc.parallelize(range(1), 1) - prop1 = rdd.map(lambda x: TaskContext.get().getLocalProperty("testkey")).collect()[0] - self.assertEqual(prop1, "testvalue") + prop1 = rdd.map(lambda x: TaskContext.get().getLocalProperty(key)).collect()[0] + self.assertEqual(prop1, value) prop2 = rdd.map(lambda x: TaskContext.get().getLocalProperty("otherkey")).collect()[0] self.assertTrue(prop2 is None) finally: - self.sc.setLocalProperty("testkey", None) + self.sc.setLocalProperty(key, None) class RDDTests(ReusedPySparkTestCase): From 6fef2f1808c559dfcf73a7678e655ba4a9aff62a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 29 May 2018 23:28:00 -0700 Subject: [PATCH 7/7] Removed keyword --- python/pyspark/taskcontext.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py index 282db010ea75d..63ae1f30e17ca 100644 --- a/python/pyspark/taskcontext.py +++ b/python/pyspark/taskcontext.py @@ -94,4 +94,4 @@ def getLocalProperty(self, key): """ Get a local property set upstream in the driver, or None if it is missing. """ - return self._localProperties.get(key, default=None) + return self._localProperties.get(key, None)