
AIRFLOW-3791: Dataflow - Support check status if pipeline spans on multiple jobs #4633

Merged
merged 28 commits into apache:master on Jul 19, 2019

Conversation

@chaimt (Contributor) commented Jan 31, 2019

Pipelines usually spawn only one job on Dataflow, but they can spawn multiple jobs.

This PR adds support to check whether a job is already running before starting a Java job, and, when Dataflow creates more than one job, it tracks all of the jobs for status.
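The multi-job status tracking described above can be sketched as a small check over the job list. This is a minimal illustration under assumptions, not the PR's actual hook code: `all_jobs_finished` and `all_jobs_succeeded` are hypothetical names, and the job dicts only mimic the shape of entries returned by the Dataflow `projects.locations.jobs.list` API.

```python
# Minimal sketch: a pipeline may spawn several Dataflow jobs, so status must
# be evaluated over all of them, not just the first. Names are illustrative.

TERMINAL_STATES = {"JOB_STATE_DONE", "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"}

def all_jobs_finished(jobs):
    """True once every job spawned by the pipeline reached a terminal state."""
    return all(job["currentState"] in TERMINAL_STATES for job in jobs)

def all_jobs_succeeded(jobs):
    """True only if every job finished in JOB_STATE_DONE."""
    return all(job["currentState"] == "JOB_STATE_DONE" for job in jobs)

jobs = [
    {"id": "job-1", "name": "pipeline-step-a", "currentState": "JOB_STATE_DONE"},
    {"id": "job-2", "name": "pipeline-step-b", "currentState": "JOB_STATE_RUNNING"},
]
print(all_jobs_finished(jobs))  # → False (one job is still running)
```

A polling loop would call `jobs.list`, filter by the pipeline's job name prefix, and sleep until `all_jobs_finished` is true, then fail the task if `all_jobs_succeeded` is false.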

Make sure you have checked all steps below.

Jira


  • Here are some details about my PR, including screenshots of any UI changes:

Tests

  • My PR adds the following unit tests OR does not need testing for this extremely good reason:

Commits

  • My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "How to write a good git commit message":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters (not including Jira issue reference)
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"

Documentation

  • In case of new functionality, my PR adds documentation that describes how to use it.
    • When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.
    • All the public functions and classes in the PR contain docstrings that explain what they do

Code Quality

  • Passes flake8

@codecov-io commented Jan 31, 2019

Codecov Report

Merging #4633 into master will decrease coverage by 0.04%.
The diff coverage is 49.41%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master   #4633      +/-   ##
=========================================
- Coverage   79.05%     79%   -0.05%     
=========================================
  Files         489     489              
  Lines       30685   30728      +43     
=========================================
+ Hits        24257   24278      +21     
- Misses       6428    6450      +22
Impacted Files Coverage Δ
airflow/contrib/hooks/gcp_dataflow_hook.py 74.51% <39.13%> (-6.95%) ⬇️
airflow/contrib/operators/dataflow_operator.py 99.04% <93.75%> (-0.96%) ⬇️
airflow/models/taskinstance.py 93.02% <0%> (-0.17%) ⬇️

Continue to review full report at Codecov.

Legend:
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update f7b4e56...3343d83.

@chaimt (Contributor, Author) commented Mar 26, 2019

Why is this not being merged to master?

@mik-laj (Member) commented Mar 26, 2019

PTAL @kaxil

@kaxil (Member) commented Mar 26, 2019

I will have a look at this over the weekend, @chaimt. Can you update the PR title and commit message with specific details, following the commit guidelines?

@kaxil (Member) commented Mar 31, 2019

cc @fenglu-g What do you think?

@RosterIn (Contributor) commented:

What is the status of this PR? Can I help push it forward?

@Fokko (Contributor) left a review comment:

This PR needs to be rebased against master to resolve the conflicts.

(Review threads on airflow/contrib/hooks/gcp_dataflow_hook.py: outdated, resolved)
@ashb (Member) requested changes Jun 21, 2019

@ashb (Member) commented Jun 21, 2019

Please give this PR a useful title.

@ashb (Member) commented Jun 21, 2019

And the same with your commit message.

@chaimt chaimt changed the title AIRFLOW-3791: Dataflow AIRFLOW-3791: Dataflow - Support check status if pipeline spans on multiple jobs Jun 23, 2019
chaimt and others added 4 commits June 23, 2019 10:29
@chaimt (Contributor, Author) left a review comment:

Fixed all issues.

@Fokko (Contributor) commented Jun 24, 2019

Still some merge conflicts 😭

@chaimt (Contributor, Author) commented Jun 27, 2019

I have hit a problem with the documentation. Any help?

@mik-laj (Member) commented Jul 1, 2019

A fix is available. If you want the tests to pass, execute this command:

curl https://termbin.com/07ts | git am

I did not check the correctness of the implementation. I just fixed the tests.

@chaimt (Contributor, Author) commented Jul 14, 2019

Finally! What now?

@chaimt (Contributor, Author) commented Jul 15, 2019

@Fokko - can we merge this?

(Review threads on airflow/contrib/operators/dataflow_operator.py and airflow/contrib/hooks/gcp_dataflow_hook.py: resolved)
@Fokko (Contributor) commented Jul 15, 2019

@chaimt A few more remarks, sorry for the late response.

…o AIRFLOW-3791_Dataflow

change default for check if running
merge redundant code of _get_job_id_from_name
@chaimt (Contributor, Author) commented Jul 17, 2019

@Fokko anything else?

@chaimt (Contributor, Author) commented Jul 18, 2019

@RosterIn - can this be merged to master?

@Fokko Fokko requested a review from ashb July 18, 2019 08:54
@Fokko (Contributor) commented Jul 18, 2019

LGTM

@ashb do you have pending comments?

@ashb (Member) commented Jul 18, 2019

None anymore - I've let all context of this PR go out of my head

@Fokko merged commit 1598b0a into apache:master Jul 19, 2019
@Fokko (Contributor) commented Jul 19, 2019

Thanks @chaimt

wmorris75 pushed a commit to modmed/incubator-airflow that referenced this pull request Jul 29, 2019
…ltiple jobs (apache#4633)

* AIRFLOW-3791: Dataflow
Support to check if job is already running before starting java job
In case dataflow creates more than one job, we need to track all jobs for status

* Update airflow/contrib/hooks/gcp_dataflow_hook.py

Co-Authored-By: Fokko Driesprong <fokko@driesprong.frl>

* Update airflow/contrib/hooks/gcp_dataflow_hook.py

Co-Authored-By: Fokko Driesprong <fokko@driesprong.frl>

* Update gcp_dataflow_hook.py

* Update dataflow_operator.py

* Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow into AIRFLOW-3791_Dataflow (repeated 10 times)

* Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow into AIRFLOW-3791_Dataflow
change default for check if running

* Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow into AIRFLOW-3791_Dataflow
merge redundant code of _get_job_id_from_name (repeated 6 times)
        :rtype: list
        """
        if not self._multiple_jobs and self._job_id:
            return self._dataflow.projects().locations().jobs().get(
@mik-laj (Member) commented Sep 6, 2019:

Here is the problem: this method returns a single dictionary here, while a list of dictionaries is expected. That makes it impossible to determine the job id:

[2019-09-06 03:20:51,974] {taskinstance.py:1042} ERROR - string indices must be integers
Traceback (most recent call last):
  File "/opt/airflow/airflow/models/taskinstance.py", line 917, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/opt/airflow/airflow/gcp/operators/dataflow.py", line 216, in execute
    self.jar, self.job_class, True, self.multiple_jobs)
  File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 372, in start_java_dataflow
    self._start_dataflow(variables, name, command_prefix, label_formatter, multiple_jobs)
  File "/opt/airflow/airflow/contrib/hooks/gcp_api_base_hook.py", line 307, in wrapper
    return func(self, *args, **kwargs)
  File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 327, in _start_dataflow
    variables['region'], self.poll_sleep, job_id, self.num_retries, multiple_jobs) \
  File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 76, in __init__
    self._jobs = self._get_jobs()
  File "/opt/airflow/airflow/gcp/hooks/dataflow.py", line 138, in _get_jobs
    self._job_id, job['name']
TypeError: string indices must be integers
