Add ability to run streaming Job for BeamRunPythonPipelineOperator in non deferrable mode (#36108)
MaksYermak committed Dec 18, 2023
1 parent cbe8245 commit d793fda
Showing 8 changed files with 237 additions and 15 deletions.
14 changes: 12 additions & 2 deletions airflow/providers/apache/beam/hooks/beam.py
@@ -90,6 +90,7 @@ def process_fd(
fd,
log: logging.Logger,
process_line_callback: Callable[[str], None] | None = None,
check_job_status_callback: Callable[[], bool | None] | None = None,
):
"""
Print output to logs.
@@ -111,13 +112,16 @@ def process_fd(
if process_line_callback:
process_line_callback(line)
func_log(line.rstrip("\n"))
if check_job_status_callback and check_job_status_callback():
return


def run_beam_command(
cmd: list[str],
log: logging.Logger,
process_line_callback: Callable[[str], None] | None = None,
working_directory: str | None = None,
check_job_status_callback: Callable[[], bool | None] | None = None,
) -> None:
"""
Run pipeline command in subprocess.
@@ -149,14 +153,16 @@ def run_beam_command(
continue

for readable_fd in readable_fds:
process_fd(proc, readable_fd, log, process_line_callback)
process_fd(proc, readable_fd, log, process_line_callback, check_job_status_callback)
if check_job_status_callback and check_job_status_callback():
return

if proc.poll() is not None:
break

# Corner case: check if more output was created between the last read and the process termination
for readable_fd in reads:
process_fd(proc, readable_fd, log, process_line_callback)
process_fd(proc, readable_fd, log, process_line_callback, check_job_status_callback)

log.info("Process exited with return code: %s", proc.returncode)

@@ -187,6 +193,7 @@ def _start_pipeline(
command_prefix: list[str],
process_line_callback: Callable[[str], None] | None = None,
working_directory: str | None = None,
check_job_status_callback: Callable[[], bool | None] | None = None,
) -> None:
cmd = [*command_prefix, f"--runner={self.runner}"]
if variables:
@@ -196,6 +203,7 @@ def _start_pipeline(
process_line_callback=process_line_callback,
working_directory=working_directory,
log=self.log,
check_job_status_callback=check_job_status_callback,
)

def start_python_pipeline(
@@ -207,6 +215,7 @@ def start_python_pipeline(
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
process_line_callback: Callable[[str], None] | None = None,
check_job_status_callback: Callable[[], bool | None] | None = None,
):
"""
Start Apache Beam python pipeline.
@@ -279,6 +288,7 @@ def start_python_pipeline(
variables=variables,
command_prefix=command_prefix,
process_line_callback=process_line_callback,
check_job_status_callback=check_job_status_callback,
)

def start_java_pipeline(
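For illustration, a minimal sketch of how a caller of the hook could wire the new callback so that log streaming stops once Dataflow reports the job as running (streaming) or done (batch). The connection ID, region, project, and the log-parsing helper are illustrative placeholders, not part of this change:

    from __future__ import annotations

    from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType
    from airflow.providers.google.cloud.hooks.dataflow import DataflowHook

    beam_hook = BeamHook(runner=BeamRunnerType.DataflowRunner)
    dataflow_hook = DataflowHook(gcp_conn_id="google_cloud_default")
    current_job_id: str | None = None


    def capture_job_id(line: str) -> None:
        # Hypothetical log parser; the Beam operators use Dataflow's own
        # job-name/ID matching callback instead.
        global current_job_id
        if "Submitted job:" in line:
            current_job_id = line.rsplit(" ", 1)[-1]


    def job_is_running_or_done() -> bool | None:
        # Return None until the job ID is known, so run_beam_command keeps
        # streaming the subprocess logs; afterwards delegate to the Dataflow API.
        if current_job_id is None:
            return None
        return dataflow_hook.is_job_done(
            location="us-central1",  # assumed region
            project_id="my-gcp-project",  # assumed project
            job_id=current_job_id,
        )


    beam_hook.start_python_pipeline(
        variables={"job_name": "example-streaming-job", "streaming": True},
        py_file="gs://my-bucket/pipelines/streaming_wordcount.py",
        py_options=[],
        process_line_callback=capture_job_id,
        check_job_status_callback=job_is_running_or_done,
    )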
48 changes: 42 additions & 6 deletions airflow/providers/apache/beam/operators/beam.py
@@ -67,15 +67,16 @@ def _set_dataflow(
self,
pipeline_options: dict,
job_name_variable_key: str | None = None,
) -> tuple[str, dict, Callable[[str], None]]:
) -> tuple[str, dict, Callable[[str], None], Callable[[], bool | None]]:
self.dataflow_hook = self.__set_dataflow_hook()
self.dataflow_config.project_id = self.dataflow_config.project_id or self.dataflow_hook.project_id
dataflow_job_name = self.__get_dataflow_job_name()
pipeline_options = self.__get_dataflow_pipeline_options(
pipeline_options, dataflow_job_name, job_name_variable_key
)
process_line_callback = self.__get_dataflow_process_callback()
return dataflow_job_name, pipeline_options, process_line_callback
check_job_status_callback = self.__check_dataflow_job_status_callback()
return dataflow_job_name, pipeline_options, process_line_callback, check_job_status_callback

def __set_dataflow_hook(self) -> DataflowHook:
self.dataflow_hook = DataflowHook(
@@ -123,6 +124,19 @@ def set_current_dataflow_job_id(job_id):
on_new_job_id_callback=set_current_dataflow_job_id
)

def __check_dataflow_job_status_callback(self) -> Callable[[], bool | None]:
def check_dataflow_job_status() -> bool | None:
if self.dataflow_job_id and self.dataflow_hook:
return self.dataflow_hook.is_job_done(
location=self.dataflow_config.location,
project_id=self.dataflow_config.project_id,
job_id=self.dataflow_job_id,
)
else:
return None

return check_dataflow_job_status


class BeamBasePipelineOperator(BaseOperator, BeamDataflowMixin, ABC):
"""
@@ -184,14 +198,20 @@ def _init_pipeline_options(
self,
format_pipeline_options: bool = False,
job_name_variable_key: str | None = None,
) -> tuple[bool, str | None, dict, Callable[[str], None] | None]:
) -> tuple[bool, str | None, dict, Callable[[str], None] | None, Callable[[], bool | None] | None]:
self.beam_hook = BeamHook(runner=self.runner)
pipeline_options = self.default_pipeline_options.copy()
process_line_callback: Callable[[str], None] | None = None
check_job_status_callback: Callable[[], bool | None] | None = None
is_dataflow = self.runner.lower() == BeamRunnerType.DataflowRunner.lower()
dataflow_job_name: str | None = None
if is_dataflow:
dataflow_job_name, pipeline_options, process_line_callback = self._set_dataflow(
(
dataflow_job_name,
pipeline_options,
process_line_callback,
check_job_status_callback,
) = self._set_dataflow(
pipeline_options=pipeline_options,
job_name_variable_key=job_name_variable_key,
)
@@ -203,9 +223,21 @@ def _init_pipeline_options(
snake_case_pipeline_options = {
convert_camel_to_snake(key): pipeline_options[key] for key in pipeline_options
}
return is_dataflow, dataflow_job_name, snake_case_pipeline_options, process_line_callback
return (
is_dataflow,
dataflow_job_name,
snake_case_pipeline_options,
process_line_callback,
check_job_status_callback,
)

return is_dataflow, dataflow_job_name, pipeline_options, process_line_callback
return (
is_dataflow,
dataflow_job_name,
pipeline_options,
process_line_callback,
check_job_status_callback,
)


class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
@@ -297,6 +329,7 @@ def execute(self, context: Context):
self.dataflow_job_name,
self.snake_case_pipeline_options,
self.process_line_callback,
self.check_job_status_callback,
) = self._init_pipeline_options(format_pipeline_options=True, job_name_variable_key="job_name")
if not self.beam_hook:
raise AirflowException("Beam hook is not defined.")
@@ -329,6 +362,7 @@ def execute_sync(self, context: Context):
py_requirements=self.py_requirements,
py_system_site_packages=self.py_system_site_packages,
process_line_callback=self.process_line_callback,
check_job_status_callback=self.check_job_status_callback,
)
DataflowJobLink.persist(
self,
@@ -495,6 +529,7 @@ def execute(self, context: Context):
dataflow_job_name,
pipeline_options,
process_line_callback,
_,
) = self._init_pipeline_options()

if not self.beam_hook:
@@ -668,6 +703,7 @@ def execute(self, context: Context):
dataflow_job_name,
snake_case_pipeline_options,
process_line_callback,
_,
) = self._init_pipeline_options(format_pipeline_options=True, job_name_variable_key="job_name")

if not self.beam_hook:
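At the DAG level, launching a streaming job through the updated operator in non-deferrable mode could look roughly like the sketch below (bucket, topic, project, and region names are placeholders):

    from __future__ import annotations

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
    from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
    from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

    with DAG("example_streaming_beam_dataflow", start_date=datetime(2023, 12, 1), schedule=None):
        start_streaming_python_job = BeamRunPythonPipelineOperator(
            task_id="start_streaming_python_job",
            runner=BeamRunnerType.DataflowRunner,
            py_file="gs://my-bucket/pipelines/streaming_wordcount.py",
            pipeline_options={
                "streaming": True,  # mark the pipeline as a streaming job
                "input_topic": "projects/my-gcp-project/topics/my-topic",  # assumed unbounded source
            },
            py_requirements=["apache-beam[gcp]"],
            py_interpreter="python3",
            py_system_site_packages=False,
            dataflow_config=DataflowConfiguration(
                job_name="example-streaming-job",
                project_id="my-gcp-project",
                location="us-central1",
            ),
            deferrable=False,  # synchronous mode; is_job_done() unblocks the task once the job is running
        )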
18 changes: 18 additions & 0 deletions airflow/providers/google/cloud/hooks/dataflow.py
@@ -1203,6 +1203,24 @@ def wait_for_done(
)
job_controller.wait_for_done()

@GoogleBaseHook.fallback_to_default_project_id
def is_job_done(self, location: str, project_id: str, job_id: str) -> bool:
"""
Check that the Dataflow job is started (for a streaming job) or finished (for a batch job).

:param location: location where the job is running
:param project_id: Google Cloud project ID in which the job is running
:param job_id: Dataflow job ID
"""
job_controller = _DataflowJobsController(
dataflow=self.get_conn(),
project_number=project_id,
location=location,
)
job = job_controller.fetch_job_by_id(job_id)

return job_controller._check_dataflow_job_state(job)


class AsyncDataflowHook(GoogleBaseAsyncHook):
"""Async hook class for dataflow service."""
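A brief sketch of calling the new hook method directly (connection ID, region, project, and job ID are placeholders); the Beam operators above reach it through their check_job_status_callback closure:

    from airflow.providers.google.cloud.hooks.dataflow import DataflowHook

    hook = DataflowHook(gcp_conn_id="google_cloud_default")

    # True once a batch job has finished or a streaming job has reached the running
    # state; False while it is still starting; an exception is raised if the job failed.
    done = hook.is_job_done(
        location="us-central1",
        project_id="my-gcp-project",
        job_id="2023-12-18_00_00_00-1234567890123456789",
    )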
6 changes: 6 additions & 0 deletions airflow/providers/google/cloud/operators/dataflow.py
@@ -1286,6 +1286,12 @@ class DataflowStopJobOperator(GoogleCloudBaseOperator):
:param stop_timeout: wait time in seconds for successful job canceling/draining
"""

template_fields = [
"job_id",
"project_id",
"impersonation_chain",
]

def __init__(
self,
job_name_prefix: str | None = None,
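With job_id, project_id, and impersonation_chain now templated, the job to stop can be resolved at runtime, for example from XCom. The upstream task ID and XCom structure below are assumptions:

    from airflow.providers.google.cloud.operators.dataflow import DataflowStopJobOperator

    stop_streaming_python_job = DataflowStopJobOperator(
        task_id="stop_streaming_python_job",
        # Assumes the upstream task pushed a dict with a "dataflow_job_id" key to XCom.
        job_id="{{ task_instance.xcom_pull(task_ids='start_streaming_python_job')['dataflow_job_id'] }}",
        project_id="my-gcp-project",
        location="us-central1",
    )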
18 changes: 12 additions & 6 deletions docs/apache-airflow-providers-google/operators/cloud/dataflow.rst
@@ -65,17 +65,17 @@ Starting Non-templated pipeline

To create a new pipeline using the source file (JAR in Java or Python file) use
the create job operators. The source file can be located on GCS or on the local filesystem.
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`
:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`
or
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`
:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`

.. _howto/operator:DataflowCreateJavaJobOperator:

Java SDK pipelines
""""""""""""""""""

For Java pipelines the ``jar`` argument must be specified for
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`
:class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`
as it contains the pipeline to be executed on Dataflow. The JAR can be available on GCS, where Airflow
is able to download it, or on the local filesystem (provide the absolute path to it).

@@ -101,7 +101,7 @@ Python SDK pipelines
""""""""""""""""""""

The ``py_file`` argument must be specified for
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`
:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`
as it contains the pipeline to be executed on Dataflow. The Python file can be available on GCS, where Airflow
is able to download it, or on the local filesystem (provide the absolute path to it).

@@ -129,8 +129,8 @@ Dataflow has multiple options of executing pipelines. It can be done in the foll
batch asynchronously (fire and forget), batch blocking (wait until completion), or streaming (run indefinitely).
In Airflow it is best practice to use asynchronous batch pipelines or streams and use sensors to listen for the expected job state.

By default :class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreateJavaJobOperator`,
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowCreatePythonJobOperator`,
By default :class:`~airflow.providers.apache.beam.operators.beam.BeamRunJavaPipelineOperator`,
:class:`~airflow.providers.apache.beam.operators.beam.BeamRunPythonPipelineOperator`,
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowTemplatedJobStartOperator` and
:class:`~airflow.providers.google.cloud.operators.dataflow.DataflowStartFlexTemplateOperator`
have the argument ``wait_until_finished`` set to ``None``, which causes different behaviour depending on the type of pipeline:
@@ -175,6 +175,12 @@ Streaming execution
To execute a streaming Dataflow job, ensure the streaming option is set (for Python) or read from an unbounded data
source, such as Pub/Sub, in your pipeline (for Java).

.. exampleinclude:: /../../tests/system/providers/google/cloud/dataflow/example_dataflow_streaming_python.py
:language: python
:dedent: 4
:start-after: [START howto_operator_start_streaming_python_job]
:end-before: [END howto_operator_start_streaming_python_job]

Setting the argument ``drain_pipeline`` to ``True`` allows you to stop a streaming job by draining it
instead of canceling it when the task instance is killed.
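For example, a minimal sketch of such a configuration (project, region, and file path are placeholders):

.. code-block:: python

    BeamRunPythonPipelineOperator(
        task_id="start_streaming_python_job",
        runner=BeamRunnerType.DataflowRunner,
        py_file="gs://my-bucket/pipelines/streaming_wordcount.py",
        pipeline_options={"streaming": True},
        dataflow_config=DataflowConfiguration(
            job_name="example-streaming-job",
            project_id="my-gcp-project",
            location="us-central1",
            drain_pipeline=True,  # drain instead of cancel when the task is stopped
        ),
    )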

