AIRFLOW-3791: Dataflow - Support check status if pipeline spans on multiple jobs #4633

Merged: 28 commits, Jul 19, 2019
Changes from 15 commits
Commits (28)
0fbc4dc
AIRFLOW-3791: Dataflow
chaimt Jan 31, 2019
d576bf6
AIRFLOW-3791: Dataflow
chaimt Feb 3, 2019
fbfac70
Update airflow/contrib/hooks/gcp_dataflow_hook.py
chaimt Jun 23, 2019
0046685
Update airflow/contrib/hooks/gcp_dataflow_hook.py
chaimt Jun 23, 2019
3203570
Update gcp_dataflow_hook.py
chaimt Jun 23, 2019
3d8ad47
Update dataflow_operator.py
chaimt Jun 23, 2019
41231a9
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jun 23, 2019
c40d8f0
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jun 24, 2019
2f6b606
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jun 24, 2019
840bd8a
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jun 25, 2019
a34e8c2
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jun 25, 2019
cacd7ea
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jun 25, 2019
b175c21
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jun 25, 2019
f6516ec
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jun 26, 2019
5673ab6
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jun 27, 2019
280c40c
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jun 28, 2019
aca5540
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jul 14, 2019
093deb6
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jul 14, 2019
578b555
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jul 14, 2019
07a81fe
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jul 14, 2019
a0125a2
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jul 16, 2019
cf49fd4
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jul 16, 2019
7bda854
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jul 16, 2019
f7d1579
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jul 16, 2019
805d4dd
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jul 16, 2019
baaf8f7
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jul 16, 2019
6ff96c5
Merge branch 'master' of https://github.com/apache/airflow into AIRFL…
chaimt Jul 16, 2019
3343d83
Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow int…
chaimt Jul 16, 2019
200 changes: 137 additions & 63 deletions airflow/contrib/hooks/gcp_dataflow_hook.py
@@ -47,58 +47,131 @@ class DataflowJobStatus:
    JOB_STATE_FAILED = "JOB_STATE_FAILED"
    JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
    JOB_STATE_PENDING = "JOB_STATE_PENDING"
    FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
    SUCCEEDED_END_STATES = {JOB_STATE_DONE}
    END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES


class _DataflowJob(LoggingMixin):
    def __init__(self, dataflow, project_number, name, location, poll_sleep=10,
                 job_id=None, num_retries=None, multiple_jobs=None):
        self._dataflow = dataflow
        self._project_number = project_number
        self._job_name = name
        self._job_location = location
        self._multiple_jobs = multiple_jobs
        self._job_id = job_id
        self._num_retries = num_retries
        if self._num_retries is None:
            self._num_retries = 0
        self._poll_sleep = poll_sleep
        self._jobs = self._get_jobs()

    def is_job_running(self):
        """
        Helper method to check if the job is still running in Dataflow.

        :return: True if job is running.
        :rtype: bool
        """
        for job in self._jobs:
            if job['currentState'] not in DataflowJobStatus.END_STATES:
                return True
        return False

    def _get_job_id_from_name(self):
        """
        Helper method to get the list of jobs whose names start with the job name.

        :return: list of matching jobs, including their ids
        :rtype: list
        """
        jobs = self._dataflow.projects().locations().jobs().list(
            projectId=self._project_number,
            location=self._job_location
        ).execute(num_retries=self._num_retries)
        dataflow_jobs = []
        if jobs:
            for job in jobs['jobs']:
                if job['name'].startswith(self._job_name.lower()):
                    dataflow_jobs.append(job)
        if len(dataflow_jobs) == 1:
            self._job_id = dataflow_jobs[0]['id']
        return dataflow_jobs
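Because a pipeline that spans multiple jobs shares a common name prefix, the lookup above matches with startswith rather than an exact comparison. A minimal sketch of that filter against a hypothetical API response (illustration only, not part of this diff):

    # Hypothetical jobs().list() response; names share the pipeline's prefix.
    jobs = {'jobs': [
        {'id': 'a1', 'name': 'wordcount-part1'},
        {'id': 'b2', 'name': 'wordcount-part2'},
        {'id': 'c3', 'name': 'unrelated-job'},
    ]}
    job_name = 'wordcount'
    dataflow_jobs = [j for j in jobs['jobs']
                     if j['name'].startswith(job_name.lower())]
    assert [j['id'] for j in dataflow_jobs] == ['a1', 'b2']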

    def _get_jobs(self):
        """
        Helper method to get all jobs by name.

        :return: jobs
        :rtype: list
        """
        if not self._multiple_jobs and self._job_id:
            self._jobs = []
            self._jobs.append(self._dataflow.projects().locations().jobs().get(
                projectId=self._project_number,
                location=self._job_location,
                jobId=self._job_id).execute(num_retries=self._num_retries))
        elif self._job_name:
            self._jobs = self._get_job_id_from_name()
        else:
            raise Exception('Missing both dataflow job ID and name.')

        for job in self._jobs:
            if job and 'currentState' in job:
                self._job_state = job['currentState']
                self.log.info(
                    'Google Cloud DataFlow job %s is %s',
                    job['name'], job['currentState']
                )
            elif job:
                self.log.info(
                    'Google Cloud DataFlow with job_id %s has name %s',
                    self._job_id, job['name']
                )
            else:
                self.log.info(
                    'Google Cloud DataFlow job not available yet.'
                )

        return self._jobs

    # pylint: disable=too-many-nested-blocks
    def check_dataflow_job_state(self, job):
        """
        Helper method to check the state of all jobs in Dataflow for this task;
        raises an exception if a job has failed or was cancelled.

        :return: True if job is done.
        :rtype: bool
        :raise: Exception
        """
        if DataflowJobStatus.JOB_STATE_DONE == job['currentState']:
            # check that all jobs are done
            count_not_done = 0
            for inner_job in self._jobs:
                if inner_job and 'currentState' in inner_job:
                    if not DataflowJobStatus.JOB_STATE_DONE == inner_job['currentState']:
                        count_not_done += 1
            if count_not_done == 0:
                return True
        elif DataflowJobStatus.JOB_STATE_RUNNING == job['currentState'] and \
                DataflowJobStatus.JOB_TYPE_STREAMING == job['type']:
            return True
        elif DataflowJobStatus.JOB_STATE_FAILED == job['currentState']:
            raise Exception("Google Cloud Dataflow job {} has failed.".format(
                job['name']))
        elif DataflowJobStatus.JOB_STATE_CANCELLED == job['currentState']:
            raise Exception("Google Cloud Dataflow job {} was cancelled.".format(
                job['name']))
        elif job['currentState'] in {DataflowJobStatus.JOB_STATE_RUNNING,
                                     DataflowJobStatus.JOB_STATE_PENDING}:
            time.sleep(self._poll_sleep)
        else:
            self.log.debug(str(job))
            raise Exception(
                "Google Cloud Dataflow job {} was in an unknown state: {}".format(
                    job['name'], job['currentState']))
        return False
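The branch order above encodes the whole decision table: DONE counts only when every sibling job is done, a streaming job is considered successful once RUNNING, FAILED and CANCELLED raise, and RUNNING or PENDING batch jobs keep polling. A condensed, self-contained restatement of that logic (a simplified sketch with hypothetical job dicts and no sleeping, not the code under review):

    # Simplified restatement of check_dataflow_job_state, illustration only.
    DONE, RUNNING, PENDING = "JOB_STATE_DONE", "JOB_STATE_RUNNING", "JOB_STATE_PENDING"
    FAILED, CANCELLED = "JOB_STATE_FAILED", "JOB_STATE_CANCELLED"
    STREAMING = "JOB_TYPE_STREAMING"

    def classify(job, all_jobs):
        state = job['currentState']
        if state == DONE:
            # DONE only counts once every sibling job is done as well
            return all(j.get('currentState') == DONE for j in all_jobs)
        if state == RUNNING and job.get('type') == STREAMING:
            return True  # streaming jobs never reach DONE; RUNNING means healthy
        if state in (FAILED, CANCELLED):
            raise Exception("job {} ended in {}".format(job['name'], state))
        return False  # RUNNING/PENDING batch jobs: caller keeps polling

    jobs = [{'name': 'a', 'currentState': DONE},
            {'name': 'b', 'currentState': RUNNING}]
    assert classify(jobs[0], jobs) is False  # sibling 'b' is still running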

    def wait_for_done(self):
        """
@@ -109,37 +182,21 @@ def wait_for_done(self):
        :raise: Exception
        """
        while True:
            for job in self._jobs:
                if job and 'currentState' in job:
                    if self.check_dataflow_job_state(job):
                        return True
                else:
                    time.sleep(15)

            time.sleep(self._poll_sleep)
            self._jobs = self._get_jobs()

    def get(self):
        """
        Returns the Dataflow jobs.

        :return: list of jobs
        :rtype: list
        """
        return self._jobs


class _Dataflow(LoggingMixin):
@@ -183,14 +240,13 @@ def _extract_job(line):
        matched_job = job_id_pattern.search(line or '')
        if matched_job:
            return matched_job.group(1).decode()
        return None

    def wait_for_done(self):
        """
        Waits for Dataflow job to complete.

        :return: Job id.
        :rtype: str
        """
        reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
@@ -228,6 +284,7 @@ class DataFlowHook(GoogleCloudBaseHook):
    All the methods in the hook where project_id is used must be called with
    keyword arguments rather than positional.
    """

    def __init__(self,
                 gcp_conn_id='google_cloud_default',
                 delegate_to=None,
@@ -245,14 +302,13 @@ def get_conn(self):
            'dataflow', 'v1b3', http=http_authorized, cache_discovery=False)

    @GoogleCloudBaseHook._Decorators.provide_gcp_credential_file
    def _start_dataflow(self, variables, name, command_prefix, label_formatter,
                        multiple_jobs=False):
        variables = self._set_variables(variables)
        cmd = command_prefix + self._build_cmd(variables, label_formatter)
        job_id = _Dataflow(cmd).wait_for_done()
        _DataflowJob(self.get_conn(), variables['project'], name,
                     variables['region'], self.poll_sleep, job_id,
                     self.num_retries, multiple_jobs).wait_for_done()

    @staticmethod
    def _set_variables(variables):
@@ -262,29 +318,34 @@ def _set_variables(variables):
            variables['region'] = DEFAULT_DATAFLOW_LOCATION
        return variables

    def start_java_dataflow(self, job_name, variables, jar, job_class=None,
                            append_job_name=True, multiple_jobs=False):
        """
        Starts a Dataflow Java job.

        :param job_name: The name of the job.
        :type job_name: str
        :param variables: Variables passed to the job.
        :type variables: dict
        :param jar: Name of the jar for the job.
        :type jar: str
        :param job_class: Name of the java class for the job.
        :type job_class: str
        :param append_job_name: True if unique suffix has to be appended to job name.
        :type append_job_name: bool
        :param multiple_jobs: True if the pipeline may spawn multiple Dataflow jobs,
            all of which must be checked.
        :type multiple_jobs: bool
        """
        name = self._build_dataflow_job_name(job_name, append_job_name)
        variables['jobName'] = name

        def label_formatter(labels_dict):
            return ['--labels={}'.format(
                json.dumps(labels_dict).replace(' ', ''))]

        command_prefix = (["java", "-cp", jar, job_class] if job_class
                          else ["java", "-jar", jar])
        self._start_dataflow(variables, name, command_prefix, label_formatter,
                             multiple_jobs)
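A hedged usage sketch of the new flag from a caller's side (the connection id, project, region, and jar path are placeholder values, not taken from this PR):

    # Hypothetical call site; values are placeholders.
    from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook

    hook = DataFlowHook(gcp_conn_id='google_cloud_default')
    hook.start_java_dataflow(
        job_name='example-pipeline',
        variables={'project': 'my-project', 'region': 'us-central1'},
        jar='/tmp/example-pipeline.jar',
        multiple_jobs=True,  # wait on every job the pipeline spawns
    )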

    def start_template_dataflow(self, job_name, variables, parameters, dataflow_template,
                                append_job_name=True):
@@ -329,6 +390,7 @@ def start_python_dataflow(self, job_name, variables, dataflow, py_options,
        def label_formatter(labels_dict):
            return ['--labels={}={}'.format(key, value)
                    for key, value in labels_dict.items()]

        self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
                             label_formatter)

@@ -387,3 +449,15 @@ def _start_template_dataflow(self, name, variables, parameters,
        _DataflowJob(self.get_conn(), variables['project'], name, variables['region'],
                     self.poll_sleep, num_retries=self.num_retries).wait_for_done()
        return response

    def is_job_dataflow_running(self, name, variables):
        """
        Helper method to check if the job is still running in Dataflow.

        :return: True if job is running.
        :rtype: bool
        """
        variables = self._set_variables(variables)
        job = _DataflowJob(self.get_conn(), variables['project'], name,
                           variables['region'], self.poll_sleep)
        return job.is_job_running()
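One plausible way to use this helper (a sketch; the guard pattern below is illustrative and not part of this diff) is to avoid re-submitting a pipeline whose jobs are still active:

    # Illustration only: skip submission while a same-named pipeline is active.
    hook = DataFlowHook(gcp_conn_id='google_cloud_default')
    variables = {'project': 'my-project', 'region': 'us-central1'}
    if not hook.is_job_dataflow_running(name='example-pipeline',
                                        variables=variables):
        hook.start_java_dataflow(job_name='example-pipeline',
                                 variables=variables,
                                 jar='/tmp/example-pipeline.jar',
                                 multiple_jobs=True)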