Skip to content

Commit

Permalink
[SPARK-21094][PYTHON] Add popen_kwargs to launch_gateway
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Allow the caller to customize the py4j JVM subprocess pipes and buffers for programmatic capturing of its output.

https://issues.apache.org/jira/browse/SPARK-21094 has more detail about the use case.

## How was this patch tested?

Tested by running the pyspark unit tests locally.

Closes #18339 from parente/feature/SPARK-21094-popen-args.

Lead-authored-by: Peter Parente <parente@cs.unc.edu>
Co-authored-by: Peter Parente <peter.parente@maxpoint.com>
Signed-off-by: Holden Karau <holden@pigscanfly.ca>
  • Loading branch information
2 people authored and holdenk committed Feb 16, 2019
1 parent 28ced38 commit 3d6066e
Showing 1 changed file with 16 additions and 3 deletions.
19 changes: 16 additions & 3 deletions python/pyspark/java_gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,21 @@
from pyspark.util import _exception_message


def launch_gateway(conf=None):
def launch_gateway(conf=None, popen_kwargs=None):
"""
launch jvm gateway
:param conf: spark configuration passed to spark-submit
:param popen_kwargs: Dictionary of kwargs to pass to Popen when spawning
the py4j JVM. This is a developer feature intended for use in
customizing how pyspark interacts with the py4j JVM (e.g., capturing
stdout/stderr).
:return:
"""
if "PYSPARK_GATEWAY_PORT" in os.environ:
gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"])
gateway_secret = os.environ["PYSPARK_GATEWAY_SECRET"]
# Process already exists
proc = None
else:
SPARK_HOME = _find_spark_home()
# Launch the Py4j gateway using Spark's run command so that we pick up the
Expand Down Expand Up @@ -75,15 +81,20 @@ def launch_gateway(conf=None):
env["_PYSPARK_DRIVER_CONN_INFO_PATH"] = conn_info_file

# Launch the Java gateway.
popen_kwargs = {} if popen_kwargs is None else popen_kwargs
# We open a pipe to stdin so that the Java gateway can die when the pipe is broken
popen_kwargs['stdin'] = PIPE
# We always set the necessary environment variables.
popen_kwargs['env'] = env
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = Popen(command, stdin=PIPE, preexec_fn=preexec_func, env=env)
popen_kwargs['preexec_fn'] = preexec_func
proc = Popen(command, **popen_kwargs)
else:
# preexec_fn not supported on Windows
proc = Popen(command, stdin=PIPE, env=env)
proc = Popen(command, **popen_kwargs)

# Wait for the file to appear, or for the process to exit, whichever happens first.
while not proc.poll() and not os.path.isfile(conn_info_file):
Expand Down Expand Up @@ -118,6 +129,8 @@ def killChild():
gateway = JavaGateway(
gateway_parameters=GatewayParameters(port=gateway_port, auth_token=gateway_secret,
auto_convert=True))
# Store a reference to the Popen object for use by the caller (e.g., in reading stdout/stderr)
gateway.proc = proc

# Import the classes used by PySpark
java_import(gateway.jvm, "org.apache.spark.SparkConf")
Expand Down

0 comments on commit 3d6066e

Please sign in to comment.