Add DataflowStartFlexTemplateOperator #8550
Conversation
@jaketf Can I ask you for a review? I know that you are also interested in the Dataflow integration. Will this solve your clients' problems fully? If they use it, they will no longer have to use KubernetesPodOperator.
/cc: @azurezyq
@@ -93,15 +93,30 @@ def inner_wrapper(self: "DataflowHook", *args, **kwargs) -> RT:
class DataflowJobStatus:
    """
    Helper class with Dataflow job statuses.

    Reference: https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job.JobState
This file is also modified in #8553. I am assuming these are the same changes, so I am skipping this file in the review.
I replied in #8553 (comment) and made the change in 92858eb.
def on_kill(self) -> None:
    self.log.info("On kill.")
    if self.job_id:
        self.hook.cancel_job(job_id=self.job_id, project_id=self.project_id)
Do you need to call this if the job is no longer running?
Good point. I will change it
I changed it here 1eac772
Improved it here 4b78b27
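The guarded cancellation discussed in this thread can be sketched like so. This is an illustrative extraction, not the code from commit 4b78b27: `should_cancel` and the state set here are hypothetical names used to show the idea of cancelling only when the job is still active.

```python
# Terminal Dataflow job states after which a cancel request is pointless.
# (State names follow the Dataflow JobState reference; the helper itself
# is a hypothetical sketch of the on_kill guard, not the merged code.)
TERMINAL_STATES = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}

def should_cancel(job_id, current_state):
    """Return True when on_kill should issue a cancel request."""
    return bool(job_id) and current_state not in TERMINAL_STATES
```

In `on_kill`, the operator would call `cancel_job` only when `should_cancel` is true, avoiding a spurious API call for jobs that already finished.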
f.write(
    textwrap.dedent(
        """\
        steps:
What is being built in this cloud image pipeline?
I am building an image from your repository.
https://github.com/GoogleCloudPlatform/java-docs-samples/tree/954553c/dataflow/flex-templates/streaming_beam_sql
Ack. Does it require this many containers in the image?
I am not very familiar with this. Is this a VM image with containers?
I removed some build steps.
aa29389
To build this Docker image I used Google Cloud Build. Cloud Build provisions a new virtual machine for each build; a build consists of many steps, and each step is described by a Docker image. The result of the build is a new Docker image.
I personally would not worry about the large number of images, because the virtual machine is deleted after the build completes. Nothing runs on the local machine.
@@ -15,9 +15,20 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
This file reads more like an end-to-end test of various Dataflow system features. Should it be a separate PR?
We always try to write system tests and integrations simultaneously to ensure the best integration reliability. This way, every person on my team can easily test every integration.
    schedule_interval=None,  # Override to match your needs
) as dag_flex_template:
    start_flex_template = DataflowStartFlexTemplateOperator(
        task_id="start_flex_template_java",
Maybe drop "java" from the name? (Given the template structure, the language used to write it matters less.)
I renamed it to start_flex_template_streaming_beam_sql in d788bab.
"""
Starts flex templates with the Dataflow pipeline.

:param body: The request body
nit: use a more descriptive name, launch_flex_template_parameters.
If we are not going to construct this body here and document the parameters, then this should include a link to the API spec for this model.
Unfortunately, it looks like we mention this endpoint in the flex templates docs but not in the REST API docs.
I've opened an internal docs bug on this.
In the meantime, I'm pretty sure this body is just the same model as the "old" templates endpoint uses, LaunchTemplateParameters.
@aaltay can you confirm?
It looks like LaunchTemplateParameters is missing the container_spec_gcs_path key, which is necessary for this endpoint.
Why is this key snake_case in the API while all the others (e.g. jobName) are camelCase anyway?
I see that documentation of the request body is available now:
https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.locations.flexTemplates/launch#request-body
Thanks for taking care of it @jaketf. I added links to it in the hook and operator.
Regarding naming: I think snake_case is the standard for Python, while camelCase is used when required by external APIs (usually within dictionaries). I am not sure I understand your question correctly. Could you elaborate, please?
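For reference, a minimal request body for the projects.locations.flexTemplates/launch endpoint might look like the dict below. Field names follow the REST documentation linked above; all values are illustrative placeholders, not taken from this PR.

```python
# Example body for flexTemplates/launch (field names per the REST docs;
# bucket, subscription, and table values are made-up placeholders).
launch_body = {
    "launchParameter": {
        "jobName": "streaming-beam-sql",
        # GCS path of the template spec produced when the template was built
        "containerSpecGcsPath": "gs://my-bucket/templates/streaming-beam-sql.json",
        # Pipeline-specific parameters defined by the template itself
        "parameters": {
            "inputSubscription": "my-subscription",
            "outputTable": "my-project:my-dataset.my-table",
        },
    }
}
```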
"""
Starts flex templates with the Dataflow pipeline.

:param body: The request body
see comment on same param in hook.
get_method = (
    self.mock_dataflow.projects.return_value.
    locations.return_value.
    jobs.return_value.
    get
)
get_method.return_value.execute.return_value = job
get_method.return_value.execute.side_effect = [
    {"id": TEST_JOB_ID, "name": TEST_JOB_NAME, "currentState": DataflowJobStatus.JOB_STATE_RUNNING},
Note from the other PR #8553: see how JobStatus.JOB_STATE is a redundant stutter.
Especially if we allow the user to control failed states, we should remove the stutter.
Created a separate issue for that: #11205.
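The mocking pattern in the test excerpt above relies on `side_effect` accepting a sequence: each successive call to `execute()` returns the next item, simulating the job moving through states. A self-contained sketch (the job id and states here are placeholders, not the test's constants):

```python
from unittest import mock

# side_effect with a list makes consecutive execute() calls return
# successive job snapshots, mimicking polling a Dataflow job over time.
get_method = mock.MagicMock()
get_method.return_value.execute.side_effect = [
    {"id": "job-1", "currentState": "JOB_STATE_RUNNING"},
    {"id": "job-1", "currentState": "JOB_STATE_DONE"},
]

first_poll = get_method().execute()
second_poll = get_method().execute()
```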
    location=location,
    poll_sleep=self.poll_sleep,
    num_retries=self.num_retries)
jobs_controller.wait_for_done()
@mik-laj I was reflecting on this in light of the Data Fusion operator issue.
This "start" naming is confusing.
This method (and DataflowStartFlexTemplateOperator) is called "start" flex template, but it appears that you are waiting for the job to complete.
The existing Dataflow operators do not have this "start" word, and I think the user expectation is that they poll the job to completion. Otherwise you can't do much useful downstream in the DAG without having some sensor that waits on this job's completion.
If we want to support both blocking and non-blocking, I'd suggest a wait_for_done kwarg that defaults to True (the expected behavior based on similar operators). This might mean that we need a new method in the controller, wait_for_running, that blocks until the pipeline enters the RUNNING state.
What do you think?
The same applies to #8553.
I am trying to imitate the naming conventions we currently have in this integration.
DataflowTemplatedJobStartOperator
uses the verb "Start" to describe identical operations.
I plan to add blocking and non-blocking modes to all operators in a separate PR, because this requires the development of sensors. For now, I only wanted to focus on one issue.
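The wait_for_done proposal above could look roughly like this. It is a hedged sketch, not the merged operator API: `run_flex_template`, `start_job`, and `poll_state` are hypothetical names used only to illustrate choosing which state to block on.

```python
# Hypothetical sketch of the proposed blocking/non-blocking behaviour.
# start_job() launches the job; poll_state(job) returns its current
# Dataflow state; wait_for_done=True polls to completion (the expected
# default), while False returns as soon as the job is RUNNING.
def run_flex_template(start_job, poll_state, wait_for_done=True):
    job = start_job()
    target = "JOB_STATE_DONE" if wait_for_done else "JOB_STATE_RUNNING"
    while poll_state(job) != target:
        pass  # the real controller would sleep poll_sleep seconds here
    return job
```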
What's the current status? Have you made any progress? I want this patch. Can I help with anything? @mik-laj
Hello.
@mik-laj Any update? I created pull request PolideaInternal#952 to fix the conflict and some of the points discussed here. Can you take a look and see if it's helpful?
f.write(
    textwrap.dedent(
        """\
        steps:
Have you considered using a standard dict and json.dumps to generate the file content? You don't use any YAML-specific syntax, and JSON is a subset of YAML. I believe that a dict is easier to maintain long term than a hard-coded text block.
I think it was used for the sake of simplicity. It is easier to take a look at the multiline string, which represents quite a simple YAML file. Am I right @mik-laj?
Is it easier to read a YAML dictionary inserted as text (with no formatting in most IDEs) than a Python dictionary with correct editor formatting?
You are right, I changed my mind. I think the dictionary looks better, especially since Cloud Build accepts JSON config files.
cloud_build_config = {
'steps': [
{'name': 'gcr.io/cloud-builders/git', 'args': ['clone', "$_EXAMPLE_REPO", "repo_dir"]},
{
'name': 'gcr.io/cloud-builders/git',
'args': ['checkout', '$_EXAMPLE_COMMIT'],
'dir': 'repo_dir',
},
{
'name': 'maven',
'args': ['mvn', 'clean', 'package'],
'dir': 'repo_dir/$_EXAMPLE_SUBDIR',
},
{
'name': 'gcr.io/cloud-builders/docker',
'args': ['build', '-t', '$_TEMPLATE_IMAGE', '.'],
'dir': 'repo_dir/$_EXAMPLE_SUBDIR',
},
],
'images': ['$_TEMPLATE_IMAGE'],
}
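Serializing a dict like the one above with json.dumps is straightforward, and since JSON is a subset of YAML the result is a valid Cloud Build config. A trimmed-down sketch (a single build step is kept for illustration):

```python
import json

# A reduced version of the build config above, serialized with json.dumps;
# Cloud Build accepts the resulting JSON just like a YAML config.
cloud_build_config = {
    "steps": [
        {"name": "gcr.io/cloud-builders/git", "args": ["clone", "$_EXAMPLE_REPO", "repo_dir"]},
    ],
    "images": ["$_TEMPLATE_IMAGE"],
}

config_text = json.dumps(cloud_build_config, indent=2)
```

The serialized text can then be written to the temp file in place of the dedented YAML block.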
Force-pushed from ba5cc30 to 98d6c85.
Could you rebase?
Force-pushed from af0da63 to a9ba302.
@ad-m I rebased on the latest master and squashed the commits.
@mik-laj could you remove the dont-merge label, please?
"""
if not self._jobs:
    raise ValueError("The _jobs should be set")
while True:
Should we consider implementing a timeout?
We have a timeout. See: https://airflow.readthedocs.io/en/latest/_api/airflow/models/index.html#airflow.models.BaseOperator -> execution_timeout
PTAL: 144b63f
Since the cancel method is executed only by the operator (on_kill), I didn't allow the user to configure this timeout, because I don't think it is worth adding complexity for the user and in the code itself. It is added to prevent _wait_for_states from hanging forever when execution_timeout is not set, and to provide a more meaningful log message if the timeout eventually occurs.
I proposed a default timeout of 5 minutes. @aaltay @kamilwu what do you think about this value?
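The timeout described here can be sketched as a deadline-bounded polling loop. This is an illustrative sketch, not the code from 144b63f: `wait_for_states` and `get_state` are hypothetical names, and the default mirrors the 5-minute proposal.

```python
import time

# Hedged sketch: a monotonic deadline bounds the state-polling loop so a
# cancel that never completes fails loudly instead of hanging forever.
def wait_for_states(get_state, expected_states, timeout=5 * 60, poll_sleep=0):
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        state = get_state()
        if state in expected_states:
            return state
        time.sleep(poll_sleep)
    raise TimeoutError(f"Job did not reach one of {expected_states} within {timeout}s")
```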
I think this is reasonable.
Side note: cancel usually completes quickly. On the other hand, drain is the safer way to stop streaming pipelines, and that can take a long time depending on the state of the pipeline.
If we use this only in on_kill, should we even bother with waiting? I would assume that a successful API call should be enough for us.
@mik-laj what do you think about removing the waiting? I lean towards @turbaszek's opinion.
Since it is not strictly related to this change, and I don't want to block this PR, I deleted this part of the logic and created a separate PR for handling waiting and timeout in job cancel: #11501 (draft, because I must add tests).
LGTM
Hey @TobKed. Can you please rebase this one onto the latest master? We fixed (hopefully) a problem with the queues of jobs for GitHub Actions, and I think when you rebase, it should run much faster (more info on the devlist shortly).
Force-pushed from c57cd16 to ea04ead.
Rebased on the latest master.
Thanks @TobKed!
Thank you too, @TobKed, for developing the Airflow project!
Make sure to mark the boxes below before creating PR: [x]
- In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
- In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
- In case of backwards incompatible changes please leave a note in UPDATING.md.

Read the Pull Request Guidelines for more information.