
Support @task.pyspark_submit #40633

Open
softyoungha wants to merge 12 commits into main from feature/add-pyspark_submit-decorator
Conversation

@softyoungha (Contributor) commented Jul 6, 2024:

closes: #40566

Since this decorator executes a Spark job via the python_callable, I named it @task.pyspark_submit.
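A minimal usage sketch (illustrative only; the parameter names mirror the SparkSubmitOperator arguments quoted later in this thread, and the injected spark/sc variables follow this PR's design):

from airflow.decorators import task


@task.pyspark_submit(conn_id="spark_default", executor_memory="2g")
def my_spark_job(value: int, spark=None, sc=None):
    # spark and sc are injected inside the spark-submit process by the generated driver script
    print(spark, sc, value)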



@softyoungha changed the title from "add pyspark_submit decorator" to "Support @task.pyspark_submit" on Jul 6, 2024
@softyoungha (Contributor, Author) left a comment:

I will publish the PR after adding the tests.

Comment on lines +734 to +780
conf: dict[str, Any] | None = None,
conn_id: str = "spark_default",
files: str | None = None,
py_files: str | None = None,
archives: str | None = None,
driver_class_path: str | None = None,
jars: str | None = None,
java_class: str | None = None,
packages: str | None = None,
exclude_packages: str | None = None,
repositories: str | None = None,
total_executor_cores: int | None = None,
executor_cores: int | None = None,
executor_memory: str | None = None,
driver_memory: str | None = None,
keytab: str | None = None,
principal: str | None = None,
proxy_user: str | None = None,
name: str = "arrow-spark",
num_executors: int | None = None,
status_poll_interval: int = 1,
env_vars: dict[str, Any] | None = None,
verbose: bool = False,
spark_binary: str | None = None,
properties_file: str | None = None,
yarn_queue: str | None = None,
deploy_mode: str | None = None,
use_krb5ccache: bool = False,
@softyoungha (Contributor, Author) commented:

Retrieve all arguments of SparkSubmitOperator except for application and application_args.

:param application: The application that submitted as a job, either jar or py file. (templated)
:param conf: Arbitrary Spark configuration properties (templated)
:param conn_id: The :ref:`spark connection id <howto/connection:spark-submit>` as configured
in Airflow administration. When an invalid connection_id is supplied, it will default to yarn.
:param files: Upload additional files to the executor running the job, separated by a
comma. Files will be placed in the working directory of each executor.
For example, serialized objects. (templated)
:param py_files: Additional python files used by the job, can be .zip, .egg or .py. (templated)
:param jars: Submit additional jars to upload and place them in executor classpath. (templated)
:param driver_class_path: Additional, driver-specific, classpath settings. (templated)
:param java_class: the main class of the Java application
:param packages: Comma-separated list of maven coordinates of jars to include on the
driver and executor classpaths. (templated)
:param exclude_packages: Comma-separated list of maven coordinates of jars to exclude
while resolving the dependencies provided in 'packages' (templated)
:param repositories: Comma-separated list of additional remote repositories to search
for the maven coordinates given with 'packages'
:param total_executor_cores: (Standalone & Mesos only) Total cores for all executors
(Default: all the available cores on the worker)
:param executor_cores: (Standalone & YARN only) Number of cores per executor (Default: 2)
:param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
:param driver_memory: Memory allocated to the driver (e.g. 1000M, 2G) (Default: 1G)
:param keytab: Full path to the file that contains the keytab (templated)
:param principal: The name of the kerberos principal used for keytab (templated)
:param proxy_user: User to impersonate when submitting the application (templated)
:param name: Name of the job (default airflow-spark). (templated)
:param num_executors: Number of executors to launch
:param status_poll_interval: Seconds to wait between polls of driver status in cluster
mode (Default: 1)
:param application_args: Arguments for the application being submitted (templated)
:param env_vars: Environment variables for spark-submit. It supports yarn and k8s mode too. (templated)
:param verbose: Whether to pass the verbose flag to spark-submit process for debugging
:param spark_binary: The command to use for spark submit.
Some distros may use spark2-submit or spark3-submit.
(will overwrite any spark_binary defined in the connection's extra JSON)
:param properties_file: Path to a file from which to load extra properties. If not
specified, this will look for conf/spark-defaults.conf.
:param yarn_queue: The name of the YARN queue to which the application is submitted.
(will overwrite any yarn queue defined in the connection's extra JSON)
:param deploy_mode: Whether to deploy your driver on the worker nodes (cluster) or locally as a client.
(will overwrite any deployment mode defined in the connection's extra JSON)
:param use_krb5ccache: if True, configure spark to use ticket cache instead of relying
on keytab for Kerberos login
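As a hedged aside (not the PR's implementation), the pass-through idea can be checked dynamically: everything listed above is a SparkSubmitOperator constructor argument, minus the two names the decorator reserves for the generated script.

import inspect

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

# All constructor parameters of SparkSubmitOperator, minus self/**kwargs and the two
# names reserved by the decorator for the generated script.
operator_params = set(inspect.signature(SparkSubmitOperator.__init__).parameters) - {"self", "kwargs"}
decorator_params = sorted(operator_params - {"application", "application_args"})
print(decorator_params)  # conf, conn_id, files, py_files, archives, driver_class_path, ...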

Comment on lines +83 to +104
if kwargs.get("application"):
if not conf.getboolean("operators", "ALLOW_ILLEGAL_ARGUMENTS"):
raise AirflowException(
"Invalid argument 'application' were passed to `@task.pyspark_submit`."
)
warnings.warn(
"Invalid argument 'application' were passed to @task.pyspark_submit.",
UserWarning,
stacklevel=2,
)
if kwargs.get("application_args"):
if not conf.getboolean("operators", "ALLOW_ILLEGAL_ARGUMENTS"):
raise AirflowException(
"Invalid argument 'application_args' were passed to `@task.pyspark_submit`."
)
warnings.warn(
"Invalid argument 'application_args' were passed to `@task.pyspark_submit`.",
UserWarning,
stacklevel=2,
)
@softyoungha (Contributor, Author) commented:

I wanted to include it in the kwargs-related exceptions of BaseOperator,
but I haven't yet found a way to exclude it from the reserved kwargs of the SparkSubmitOperator.
Any ideas?

kwargs.pop("_airflow_mapped_validation_only", None)
if kwargs:
    if not conf.getboolean("operators", "ALLOW_ILLEGAL_ARGUMENTS"):
        raise AirflowException(
            f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
            f"Invalid arguments were:\n**kwargs: {kwargs}",
        )
    warnings.warn(
        f"Invalid arguments were passed to {self.__class__.__name__} (task_id: {task_id}). "
        "Support for passing such arguments will be dropped in future. "
        f"Invalid arguments were:\n**kwargs: {kwargs}",
        category=RemovedInAirflow3Warning,
        stacklevel=3,
    )
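One possible workaround, sketched here under the assumption that the decorator can intercept its kwargs before handing them to SparkSubmitOperator (this is not the PR's code): reject the reserved names with a targeted error so the generic BaseOperator **kwargs check above stays untouched.

from airflow.exceptions import AirflowException

_RESERVED_BY_DECORATOR = ("application", "application_args")


def reject_reserved_kwargs(kwargs: dict) -> None:
    # These two are generated by the decorator itself, so any user-supplied value is an error.
    illegal = [name for name in _RESERVED_BY_DECORATOR if name in kwargs]
    if illegal:
        raise AirflowException(
            f"Arguments {illegal} are generated by @task.pyspark_submit and cannot be passed explicitly."
        )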

Comment on lines 127 to 129
with TemporaryDirectory() as tmp_dir:
    script_filename = os.path.join(tmp_dir, "script.py")
    input_filename = os.path.join(tmp_dir, "SCRIPT__GENERATED__AIRFLOW.IN")
@softyoungha (Contributor, Author) commented:

This pattern follows _DockerDecoratedOperator.

def execute(self, context: Context):
    with TemporaryDirectory(prefix="venv") as tmp_dir:
        input_filename = os.path.join(tmp_dir, "script.in")
        script_filename = os.path.join(tmp_dir, "script.py")
        with open(input_filename, "wb") as file:
            if self.op_args or self.op_kwargs:
                self.pickling_library.dump({"args": self.op_args, "kwargs": self.op_kwargs}, file)
        py_source = self.get_python_source()
        write_python_script(
            jinja_context={
                "op_args": self.op_args,
                "op_kwargs": self.op_kwargs,
                "pickling_library": self.pickling_library.__name__,
                "python_callable": self.python_callable.__name__,
                "python_callable_source": py_source,
                "expect_airflow": self.expect_airflow,
                "string_args_global": False,
            },
            filename=script_filename,
        )

Comment on lines 131 to 172
if self.op_args or self.op_kwargs:
    with open(input_filename, "wb") as file:
        self.pickling_library.dump({"args": self.op_args, "kwargs": self.op_kwargs}, file)
    files = (self.files or "").split(",")
    self.files = ",".join(files + [input_filename])

parameters = inspect.signature(self.python_callable).parameters
use_spark_context = use_spark_session = False
if "sc" in parameters:
    use_spark_context = True
if "spark" in parameters:
    use_spark_session = True

py_source = dedent(
    f"""\
    from pyspark import SparkFiles
    from pyspark.sql import SparkSession

    # Script
    {{python_callable_source}}

    if {bool(self.op_args or self.op_kwargs)}:
        with open(SparkFiles.get("{os.path.basename(input_filename)}"), 'rb') as file:
            arg_dict = {self.pickling_library.__name__}.load(file)
    else:
        arg_dict = {{default_arg_dict}}

    if {use_spark_session}:
        arg_dict["kwargs"]["spark"] = SparkSession.builder.getOrCreate()
    if {use_spark_context}:
        spark = arg_dict.get("spark") or SparkSession.builder.getOrCreate()
        arg_dict["kwargs"]["sc"] = spark.sparkContext

    # Call
    {self.python_callable.__name__}(*arg_dict["args"], **arg_dict["kwargs"])

    # Exit
    exit(0)
    """
).format(
    python_callable_source=self.get_python_source(), default_arg_dict='{"args": [], "kwargs": {}}'
)
@softyoungha (Contributor, Author) commented:

To read the input file from a Spark job submitted with spark-submit, the file needs to be passed using the --files option of spark-submit. However, the actual path of files passed via --files is only determined at runtime (somewhere under /tmp).

In the current write_python_script function, which uses a Jinja2 template file, sys.argv[2] is hardcoded. This makes it seemingly impossible to pass op_args and op_kwargs through the Jinja2 context.

Therefore, I crafted the execution statement directly and forced termination with exit(0).

{% if op_args or op_kwargs %}
with open(sys.argv[1], "rb") as file:
    arg_dict = {{ pickling_library }}.load(file)
{% else %}
arg_dict = {"args": [], "kwargs": {}}
{% endif %}
{% if string_args_global | default(true) -%}
# Read string args
with open(sys.argv[3], "r") as file:
    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))
{% endif %}
try:
    res = {{ python_callable }}(*arg_dict["args"], **arg_dict["kwargs"])
except Exception as e:
    with open(sys.argv[4], "w") as file:
        file.write(str(e))
    raise
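For context, a minimal sketch of how the generated driver script can locate a file shipped via --files (assuming pickle as the pickling library and the input filename used in this PR): SparkFiles.get() resolves the basename to wherever Spark staged the file at runtime, so no absolute path or sys.argv plumbing is needed.

import pickle

from pyspark import SparkFiles
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The file was shipped with --files; SparkFiles.get() returns its staged location.
with open(SparkFiles.get("SCRIPT__GENERATED__AIRFLOW.IN"), "rb") as f:
    arg_dict = pickle.load(f)

print(arg_dict["args"], arg_dict["kwargs"])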

@softyoungha (Contributor, Author) left a comment:

I considered various ways to support XCom with TaskFlow the way @task.pyspark does, but it's not safe to assume that the Airflow package will be installed on an external cluster such as YARN.
I think SparkSubmitOperator was not designed with XCom in mind, as noted in a comment on Stack Overflow.

So I'm not sure whether this feature aligns with the direction Airflow is aiming for, which makes it difficult to determine whether merging this PR is appropriate.

Here is the task execution code:

Test code
from airflow.decorators import task
from pyspark import SparkContext
from airflow.models import Connection
from airflow.utils import db

db.merge_conn(
    Connection(
        conn_id="spark_local",
        conn_type="spark",
        host="local",
        extra="",
    )
)


@task.pyspark_submit(
    conn_id="spark_local",
    conf={
        "spark.app.name": "MySparkApplication",
    },
    executor_memory="2g",
)
def sample_task(a: int, b: str, sc: "SparkContext", spark: "SparkSession"):
    print("spark:", spark)
    print("sc:", sc)
    print(a, b)


t = sample_task("this is Test", 12345)
t.operator.execute({})

Should this code also be added to the test cases?
The current tests do not include an actual run of spark-submit.
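Reusing t from the snippet above, one possible shape for such a test that avoids launching spark-submit is to patch SparkSubmitHook.submit (this assumes the generated script path is passed positionally as the application, which matches the spark-submit command in the logs below):

from unittest import mock

with mock.patch(
    "airflow.providers.apache.spark.hooks.spark_submit.SparkSubmitHook.submit"
) as mock_submit:
    t.operator.execute({})

# The decorator should submit the generated driver script rather than a user-provided file.
submitted_application = mock_submit.call_args.args[0]
assert submitted_application.endswith("script.py")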

Output logs
[2024-07-08T04:03:22.817+0900] {base.py:84} INFO - Using connection ID 'spark_local' for task execution.
[2024-07-08T04:03:22.817+0900] {spark_submit.py:402} INFO - Spark-Submit cmd: spark-submit --master local --conf spark.app.name=MySparkApplication --files /tmp/tmpm1um6ti9/SCRIPT__GENERATED__AIRFLOW.IN --executor-memory 2g --name arrow-spark /tmp/tmpm1um6ti9/script.py
[2024-07-08T04:03:24.404+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:24 WARN Utils: Your hostname, {HOSTNAME} resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
[2024-07-08T04:03:24.406+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
[2024-07-08T04:03:24.736+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[2024-07-08T04:03:25.280+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SparkContext: Running Spark version 3.5.1
[2024-07-08T04:03:25.281+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SparkContext: OS info Linux, 5.15.153.1-microsoft-standard-WSL2, amd64
[2024-07-08T04:03:25.281+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SparkContext: Java version 1.8.0_402
[2024-07-08T04:03:25.300+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO ResourceUtils: ==============================================================
[2024-07-08T04:03:25.301+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO ResourceUtils: No custom resources configured for spark.driver.
[2024-07-08T04:03:25.301+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO ResourceUtils: ==============================================================
[2024-07-08T04:03:25.301+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SparkContext: Submitted application: arrow-spark
[2024-07-08T04:03:25.318+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO ResourceProfile: Default ResourceProfile created, executor resources: Map(cores -> name: cores, amount: 1, script: , vendor: , memory -> name: memory, amount: 1024, script: , vendor: , offHeap -> name: offHeap, amount: 0, script: , vendor: ), task resources: Map(cpus -> name: cpus, amount: 1.0)
[2024-07-08T04:03:25.324+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO ResourceProfile: Limiting resource is cpu
[2024-07-08T04:03:25.325+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO ResourceProfileManager: Added ResourceProfile id: 0
[2024-07-08T04:03:25.366+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SecurityManager: Changing view acls to: ...
[2024-07-08T04:03:25.366+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SecurityManager: Changing modify acls to: ...
[2024-07-08T04:03:25.366+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SecurityManager: Changing view acls groups to:
[2024-07-08T04:03:25.366+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SecurityManager: Changing modify acls groups to:
[2024-07-08T04:03:25.366+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: ...; groups with view permissions: EMPTY; users with modify permissions: ...; groups with modify permissions: EMPTY
[2024-07-08T04:03:25.530+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO Utils: Successfully started service 'sparkDriver' on port 46335.
[2024-07-08T04:03:25.573+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SparkEnv: Registering MapOutputTracker
[2024-07-08T04:03:25.599+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SparkEnv: Registering BlockManagerMaster
[2024-07-08T04:03:25.613+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
[2024-07-08T04:03:25.614+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
[2024-07-08T04:03:25.616+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SparkEnv: Registering BlockManagerMasterHeartbeat
[2024-07-08T04:03:25.636+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-4b100128-7392-4391-9e8b-246cd1553e69
[2024-07-08T04:03:25.646+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO MemoryStore: MemoryStore started with capacity 366.3 MiB
[2024-07-08T04:03:25.655+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SparkEnv: Registering OutputCommitCoordinator
[2024-07-08T04:03:25.772+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO JettyUtils: Start Jetty 0.0.0.0:4040 for SparkUI
[2024-07-08T04:03:25.817+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO Utils: Successfully started service 'SparkUI' on port 4040.
[2024-07-08T04:03:25.840+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO SparkContext: Added file file:///tmp/tmpm1um6ti9/SCRIPT__GENERATED__AIRFLOW.IN at file:///tmp/tmpm1um6ti9/SCRIPT__GENERATED__AIRFLOW.IN with timestamp 1720379005275
[2024-07-08T04:03:25.841+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO Utils: Copying /tmp/tmpm1um6ti9/SCRIPT__GENERATED__AIRFLOW.IN to /tmp/spark-9b28ce8b-5c3d-45da-8eea-97c9263b9fcb/userFiles-3f1aae95-8377-4cdc-9f34-104f907a5be2/SCRIPT__GENERATED__AIRFLOW.IN
[2024-07-08T04:03:25.893+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO Executor: Starting executor ID driver on host 10.255.255.254
[2024-07-08T04:03:25.893+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO Executor: OS info Linux, 5.15.153.1-microsoft-standard-WSL2, amd64
[2024-07-08T04:03:25.893+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO Executor: Java version 1.8.0_402
[2024-07-08T04:03:25.898+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO Executor: Starting executor with user classpath (userClassPathFirst = false): ''
[2024-07-08T04:03:25.898+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO Executor: Created or updated repl class loader org.apache.spark.util.MutableURLClassLoader@78dd0225 for default.
[2024-07-08T04:03:25.906+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO Executor: Fetching file:///tmp/tmpm1um6ti9/SCRIPT__GENERATED__AIRFLOW.IN with timestamp 1720379005275
[2024-07-08T04:03:25.920+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO Utils: /tmp/tmpm1um6ti9/SCRIPT__GENERATED__AIRFLOW.IN has been previously copied to /tmp/spark-9b28ce8b-5c3d-45da-8eea-97c9263b9fcb/userFiles-3f1aae95-8377-4cdc-9f34-104f907a5be2/SCRIPT__GENERATED__AIRFLOW.IN
[2024-07-08T04:03:25.929+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 45419.
[2024-07-08T04:03:25.929+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO NettyBlockTransferService: Server created on 10.255.255.254:45419
[2024-07-08T04:03:25.930+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
[2024-07-08T04:03:25.934+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.255.255.254, 45419, None)
[2024-07-08T04:03:25.936+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO BlockManagerMasterEndpoint: Registering block manager 10.255.255.254:45419 with 366.3 MiB RAM, BlockManagerId(driver, 10.255.255.254, 45419, None)   
[2024-07-08T04:03:25.937+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.255.255.254, 45419, None)
[2024-07-08T04:03:25.938+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:25 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.255.255.254, 45419, None)
[2024-07-08T04:03:26.212+0900] {spark_submit.py:573} INFO - spark: <pyspark.sql.session.SparkSession object at 0x7f89034e6b00>      <------ print
[2024-07-08T04:03:26.212+0900] {spark_submit.py:573} INFO - sc: <SparkContext master=local appName=arrow-spark>
[2024-07-08T04:03:26.213+0900] {spark_submit.py:573} INFO - this is Test 12345
[2024-07-08T04:03:26.232+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO SparkContext: Invoking stop() from shutdown hook
[2024-07-08T04:03:26.232+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO SparkContext: SparkContext is stopping with exitCode 0.
[2024-07-08T04:03:26.239+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO SparkUI: Stopped Spark web UI at http://10.255.255.254:4040
[2024-07-08T04:03:26.244+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
[2024-07-08T04:03:26.250+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO MemoryStore: MemoryStore cleared
[2024-07-08T04:03:26.250+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO BlockManager: BlockManager stopped
[2024-07-08T04:03:26.253+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO BlockManagerMaster: BlockManagerMaster stopped
[2024-07-08T04:03:26.254+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
[2024-07-08T04:03:26.257+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO SparkContext: Successfully stopped SparkContext
[2024-07-08T04:03:26.257+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO ShutdownHookManager: Shutdown hook called
[2024-07-08T04:03:26.257+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO ShutdownHookManager: Deleting directory /tmp/spark-9b28ce8b-5c3d-45da-8eea-97c9263b9fcb
[2024-07-08T04:03:26.258+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO ShutdownHookManager: Deleting directory /tmp/spark-9b28ce8b-5c3d-45da-8eea-97c9263b9fcb/pyspark-17620b82-dc94-40c7-8432-d508a710abe0
[2024-07-08T04:03:26.259+0900] {spark_submit.py:573} INFO - 24/07/08 04:03:26 INFO ShutdownHookManager: Deleting directory /tmp/spark-28c195a3-314b-4364-9311-5b0003c0e9d2

@softyoungha marked this pull request as ready for review on July 7, 2024 19:17
@softyoungha force-pushed the feature/add-pyspark_submit-decorator branch from 7077c04 to 4423572 on July 8, 2024 17:53
"variable that will be automatically set within the Python process of the Spark "
"job submitted via spark-submit."
)
warnings.warn(
@eladkal (Contributor) commented:

What is the motivation for this warning?
If the input is invalid, shouldn't we raise an exception?

@softyoungha (Contributor, Author) replied on Jul 12, 2024:

@eladkal
Oh, I made a mistake: I wrote `if key in kwargs` instead of `if key in op_kwargs`.

The part you mentioned was intended to handle cases like the following:

@task.pyspark_submit(
    task_id="my-task",
    ...,
)
def pyspark_job(sc: SparkContext, spark: SparkSession):
    ...

# wrong argument case, but it works
pyspark_job(sc=1, spark=3)
# sc/spark will be injected by SparkContext/SparkSession internally during spark-submit

# args
if {bool(self.op_args or self.op_kwargs)}:
    SparkSession.builder.getOrCreate()
    with open(SparkFiles.get("{INPUT_FILENAME}"), "rb") as file:
        arg_dict = {self.pickling_library.__name__}.load(file)
else:
    arg_dict = {{default_arg_dict}}
if {use_spark_session}:
    arg_dict["kwargs"]["spark"] = SparkSession.builder.getOrCreate()
if {use_spark_context}:
    spark = arg_dict.get("spark") or SparkSession.builder.getOrCreate()
    arg_dict["kwargs"]["sc"] = spark.sparkContext

In get_pyspark_source, op_args and op_kwargs are read first, and then spark and sc are injected. I thought it was necessary to inform the user that even if they pass different values for sc and spark, those will still be replaced internally by the SparkSession and SparkContext.
conf.getboolean('operators', 'ALLOW_ILLEGAL_ARGUMENTS') determines whether to raise a warning or an exception. Raising an exception, as you suggested, doesn't seem bad either.
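As a concrete sketch of the behaviour being discussed (the message text mirrors the diff hunk quoted earlier; the exact hook point inside the decorator is an assumption):

import warnings

from airflow.configuration import conf
from airflow.exceptions import AirflowException


def _warn_on_injected_kwargs(op_kwargs: dict) -> None:
    # sc/spark are always overwritten inside the submitted script, so a user-supplied
    # value is either rejected or warned about, depending on ALLOW_ILLEGAL_ARGUMENTS.
    for key in ("sc", "spark"):
        if key in op_kwargs:
            msg = (
                f"Invalid key '{key}' in op_kwargs: it is a reserved variable that will be "
                "automatically set within the Python process of the Spark job submitted via spark-submit."
            )
            if not conf.getboolean("operators", "ALLOW_ILLEGAL_ARGUMENTS"):
                raise AirflowException(msg)
            warnings.warn(msg, UserWarning, stacklevel=2)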

Comment on lines 40 to 57
# Script
{{ python_callable_source }}

{% if use_arguments %}
SparkSession.builder.getOrCreate()
with open(SparkFiles.get("{{ input_filename }}"), "rb") as file:
    arg_dict = {{ pickling_library }}.load(file)
{% else %}
arg_dict = {"args": [], "kwargs": {}}
{% endif %}

{% if use_spark_session %}
arg_dict["kwargs"]["spark_session"] = SparkSession.builder.getOrCreate()
{% endif %}

{% if use_spark_context %}
arg_dict["kwargs"]["spark_context"] = SparkSession.builder.getOrCreate().sparkContext
{% endif %}
@softyoungha (Contributor, Author) commented:

I have modified the code that built the script via string manipulation to use a Jinja2 template instead.
I will update and refactor the tests accordingly.
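For illustration, a minimal rendering sketch with a trimmed copy of the template above (in the PR the template lives in a .jinja2 file; the trimmed string here is only for demonstration):

import inspect

import jinja2

# Trimmed copy of the template shown above, reduced to the spark_session branch.
TEMPLATE_SOURCE = """\
# Script
{{ python_callable_source }}

arg_dict = {"args": [], "kwargs": {}}
{% if use_spark_session %}
arg_dict["kwargs"]["spark_session"] = SparkSession.builder.getOrCreate()
{% endif %}
"""


def my_job(spark_session=None):
    print(spark_session)


script = (
    jinja2.Environment(undefined=jinja2.StrictUndefined)
    .from_string(TEMPLATE_SOURCE)
    .render(python_callable_source=inspect.getsource(my_job), use_spark_session=True)
)
print(script)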

@softyoungha force-pushed the feature/add-pyspark_submit-decorator branch from 3623498 to 410e8c8 on July 15, 2024 16:06
@softyoungha requested a review from eladkal on July 15, 2024 16:07
@softyoungha force-pushed the feature/add-pyspark_submit-decorator branch 2 times, most recently from e2e7459 to f3b3a81 on July 22, 2024 17:47
@softyoungha force-pushed the feature/add-pyspark_submit-decorator branch 2 times, most recently from a5fd55f to 7b66de8 on August 1, 2024 11:34
@softyoungha force-pushed the feature/add-pyspark_submit-decorator branch from 7b66de8 to f85360c on August 9, 2024 17:29
Successfully merging this pull request may close these issues:

Support @task.spark_submit or @task.pyspark_submit