Conversation

@ConeyLiu
Contributor

@ConeyLiu ConeyLiu commented Oct 24, 2019

What changes were proposed in this pull request?

Add support for TaskContext.get() in a barrier task on the Python side; this makes it easier to migrate legacy user code to barrier execution mode.

Why are the changes needed?

In Spark Core, TaskContext is a singleton. A task context instance (either a TaskContext or a BarrierTaskContext) is set before the task function starts and unset to None after it finishes, so TaskContext.get() returns whichever context is active. On the Python side, however, a BarrierTaskContext can currently only be obtained via BarrierTaskContext.get(); calling TaskContext.get() in a barrier stage returns None.

This is useful when people switch from normal code to barrier code, since only a small update is then needed.

Does this PR introduce any user-facing change?

Yes.
Previously:

def func(iterator):
    task_context = TaskContext.get()  # this could be None
    barrier_task_context = BarrierTaskContext.get() # get the BarrierTaskContext instance
    ...

rdd.barrier().mapPartitions(func)

Proposed:

def func(iterator):
    task_context = TaskContext.get()  # this now returns the same BarrierTaskContext instance as barrier_task_context
    barrier_task_context = BarrierTaskContext.get() # get the BarrierTaskContext instance
    ...

rdd.barrier().mapPartitions(func)
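The proposed behavior can be illustrated with a minimal standalone sketch. The classes below are simplified stand-ins for the real pyspark.TaskContext / BarrierTaskContext internals (assumed for illustration, not the actual worker code):

```python
class TaskContext(object):
    _taskContext = None  # singleton slot, installed by the worker

    @classmethod
    def _setTaskContext(cls, ctx):
        cls._taskContext = ctx

    @classmethod
    def get(cls):
        # Returns whichever context the worker installed; in a barrier
        # stage this is a BarrierTaskContext instance instead of None.
        return cls._taskContext


class BarrierTaskContext(TaskContext):
    @classmethod
    def get(cls):
        if not isinstance(cls._taskContext, BarrierTaskContext):
            raise Exception("It is not in a barrier stage")
        return cls._taskContext


# Simulate the worker installing a barrier context before the task function:
ctx = BarrierTaskContext()
TaskContext._setTaskContext(ctx)

assert TaskContext.get() is ctx          # unified: no longer None
assert BarrierTaskContext.get() is ctx   # the same instance

# ...and unsetting it after the task function ends:
TaskContext._setTaskContext(None)
assert TaskContext.get() is None
```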

How was this patch tested?

New unit tests.

@ConeyLiu
Contributor Author

Hi, @cloud-fan @HyukjinKwon, could you help review this? Thanks a lot.

@jiangxb1987
Contributor

This is useful when people switch from normal code to barrier code

Do you have any use cases where people want to reuse their production code and migrate it to barrier execution mode?

@ConeyLiu
Contributor Author

ConeyLiu commented Oct 24, 2019 via email

@BryanCutler
Member

ok to test

@SparkQA

SparkQA commented Oct 24, 2019

Test build #112617 has finished for PR 26239 at commit c8fe7cc.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 25, 2019

Test build #112638 has finished for PR 26239 at commit e3e5872.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 25, 2019

Test build #112656 has finished for PR 26239 at commit a55fb6b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


# reset task context to None
TaskContext._setTaskContext(None)
BarrierTaskContext._setTaskContext(None)
Member

@HyukjinKwon HyukjinKwon Oct 28, 2019

Hm, what happens if it fails with exceptions in the middle of execution in this worker?

Contributor

Is it really needed? We always set the global TaskContext and never reset it previously.

Contributor Author

@ConeyLiu ConeyLiu Oct 29, 2019

Hm, what happens if it fails with exceptions in the middle of execution in this worker?

If an exception occurs, the worker will be closed with sys.exit(-1).

Is it really needed? We always set the global TaskContext and never reset it previously.

Previously:

rdd = ...
barriered = rdd.barrier().mapPartitions(...)

barriered.mapPartitions(...)  # here the BarrierTaskContext would still exist

This code is just a guard; it shouldn't add extra overhead or change any behavior.
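The scenario above can be sketched with simplified stand-in classes (assumed for illustration, not the real pyspark internals): without the reset, a later task on the same worker would see the stale BarrierTaskContext.

```python
class TaskContext:
    _taskContext = None  # singleton slot shared across tasks on this worker

    @classmethod
    def _setTaskContext(cls, ctx):
        cls._taskContext = ctx

    @classmethod
    def get(cls):
        return cls._taskContext


class BarrierTaskContext(TaskContext):
    pass


# A barrier task runs on the worker and installs its context:
TaskContext._setTaskContext(BarrierTaskContext())

# The worker is later reused for a normal (non-barrier) task. Without
# the reset guard, the stale BarrierTaskContext leaks into it:
leaked = TaskContext.get()
assert isinstance(leaked, BarrierTaskContext)

# The guard clears the context between tasks:
TaskContext._setTaskContext(None)
assert TaskContext.get() is None
```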

Member

This code is just a guard; it shouldn't add extra overhead or change any behavior.

I guess that's only when the worker is reused. Can you clarify it with comments here?

Contributor Author

thanks for reviewing. updated.

@HyukjinKwon
Member

Otherwise this makes sense to me.

@SparkQA

SparkQA commented Oct 29, 2019

Test build #112818 has finished for PR 26239 at commit dc89212.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987 jiangxb1987 changed the title [SPARK-29582][PYSPARK] Unify the behavior of pyspark.TaskContext with spark core [SPARK-29582][PYSPARK] Support TaskContext.get() in a barrier task from Python side Oct 29, 2019
@jiangxb1987
Contributor

Please also update the previous and proposed behaviors in the PR description, like:

Previously:
rdd.barrier().mapPartitions {.....}
>> ......
Proposed:
rdd.barrier().mapPartitions {.....}
>> ......

A RuntimeError will raise if it is not in a barrier stage.
"""
if not isinstance(cls._taskContext, BarrierTaskContext):
    raise RuntimeError('''It is not in a barrier stage''')
Contributor

This is not supported behavior, so please raise Exception instead of RuntimeError

Member

Yeah, I think Exception is consistent at least.

Contributor Author

thanks for reviewing. updated

@SparkQA

SparkQA commented Oct 30, 2019

Test build #112925 has finished for PR 26239 at commit 04a2b0e.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 30, 2019

Test build #112927 has finished for PR 26239 at commit 384e1d2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

LGTM

running tasks.

.. note:: Must be called on the worker, not the driver. Returns None if not initialized.
A RuntimeError will raise if it is not in a barrier stage.
Member

Suggested change:
- A RuntimeError will raise if it is not in a barrier stage.
+ An exception will raise if it is not in a barrier stage.

@SparkQA

SparkQA commented Oct 31, 2019

Test build #112986 has finished for PR 26239 at commit 7d0f0d2.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 31, 2019

Test build #112989 has finished for PR 26239 at commit a3d14c8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

@ConeyLiu
Contributor Author

thanks all!

@ConeyLiu ConeyLiu deleted the barrier_task_context branch October 31, 2019 04:52