
AIRFLOW-3791: Dataflow - Support checking status when a pipeline spans multiple jobs (#4633)

* AIRFLOW-3791: Dataflow
Support checking whether a job is already running before starting a Java job.
When Dataflow creates more than one job, we need to track all of the jobs for status.

* Update airflow/contrib/hooks/gcp_dataflow_hook.py

Co-Authored-By: Fokko Driesprong <fokko@driesprong.frl>

* Update gcp_dataflow_hook.py

* Update dataflow_operator.py

* Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow into AIRFLOW-3791_Dataflow
change default for check if running

* Merge branch 'AIRFLOW-3791_Dataflow' of github.com:chaimt/airflow into AIRFLOW-3791_Dataflow
merge redundant code of _get_job_id_from_name

chaimt authored and Fokko committed Jul 19, 2019
1 parent 30defe1 commit 1598b0ae8abc9918a55905dd7bbbc041f6c6692f
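
The diff below threads a new multiple_jobs flag from DataFlowHook down into _DataflowJob, which then tracks every Dataflow job the pipeline spawns rather than a single job id. A minimal sketch of the new call surface, assuming a configured google_cloud_default connection; the project, region, job name, and jar path are placeholders:

from airflow.contrib.hooks.gcp_dataflow_hook import DataFlowHook

hook = DataFlowHook()  # uses the google_cloud_default connection
variables = {'project': 'my-project', 'region': 'europe-west1'}  # placeholders

# Launch a Java pipeline and track every job it creates until all are done.
hook.start_java_dataflow(
    job_name='my-pipeline',       # placeholder name
    variables=variables,
    jar='/path/to/pipeline.jar',  # placeholder jar
    multiple_jobs=True,
)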
@@ -47,58 +47,131 @@ class DataflowJobStatus:
JOB_STATE_FAILED = "JOB_STATE_FAILED"
JOB_STATE_CANCELLED = "JOB_STATE_CANCELLED"
JOB_STATE_PENDING = "JOB_STATE_PENDING"
FAILED_END_STATES = {JOB_STATE_FAILED, JOB_STATE_CANCELLED}
SUCCEEDED_END_STATES = {JOB_STATE_DONE}
END_STATES = SUCCEEDED_END_STATES | FAILED_END_STATES


class _DataflowJob(LoggingMixin):
def __init__(self, dataflow, project_number, name, location, poll_sleep=10,
job_id=None, num_retries=None):
job_id=None, num_retries=None, multiple_jobs=None):
self._dataflow = dataflow
self._project_number = project_number
self._job_name = name
self._job_location = location
self._multiple_jobs = multiple_jobs
self._job_id = job_id
self._num_retries = num_retries
self._job = self._get_job()
if self._num_retries is None:
self._num_retries = 0
self._poll_sleep = poll_sleep
self._jobs = self._get_jobs()

def _get_job_id_from_name(self):
jobs = self._dataflow.projects().locations().jobs().list(
projectId=self._project_number,
location=self._job_location
).execute(num_retries=self._num_retries)
for job in jobs['jobs']:
if job['name'].lower() == self._job_name.lower():
self._job_id = job['id']
return job
return None
def is_job_running(self):
"""
Helper method to check whether the job is still running in Dataflow
:return: True if job is running.
:rtype: bool
"""
for job in self._jobs:
if job['currentState'] not in DataflowJobStatus.END_STATES:
return True
return False

def _get_job(self):
if self._job_id:
job = self._dataflow.projects().locations().jobs().get(
# pylint: disable=too-many-nested-blocks
def _get_dataflow_jobs(self):
"""
Helper method to get the list of jobs that start with the job name or id
:return: list of jobs including their ids
:rtype: list
"""
if not self._multiple_jobs and self._job_id:
return self._dataflow.projects().locations().jobs().get(
projectId=self._project_number,
location=self._job_location,
jobId=self._job_id).execute(num_retries=self._num_retries)
elif self._job_name:
job = self._get_job_id_from_name()
jobs = self._dataflow.projects().locations().jobs().list(
projectId=self._project_number,
location=self._job_location
).execute(num_retries=self._num_retries)
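# Fan-out case: collect every job whose name starts with the requested name.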
dataflow_jobs = []
if jobs:
for job in jobs['jobs']:
if job['name'].lower().startswith(self._job_name.lower()):
dataflow_jobs.append(job)
if len(dataflow_jobs) == 1:
self._job_id = dataflow_jobs[0]['id']
return dataflow_jobs
else:
raise Exception('Missing both dataflow job ID and name.')

if job and 'currentState' in job:
self.log.info(
'Google Cloud DataFlow job %s is %s',
job['name'], job['currentState']
)
elif job:
self.log.info(
'Google Cloud DataFlow with job_id %s has name %s',
self._job_id, job['name']
)
else:
self.log.info(
'Google Cloud DataFlow job not available yet..'
)
def _get_jobs(self):
"""
Helper method to get all jobs by name and log their current state
:return: list of jobs
:rtype: list
"""
self._jobs = self._get_dataflow_jobs()

for job in self._jobs:
if job and 'currentState' in job:
self._job_state = job['currentState']
self.log.info(
'Google Cloud DataFlow job %s is %s',
job['name'], job['currentState']
)
elif job:
self.log.info(
'Google Cloud DataFlow with job_id %s has name %s',
self._job_id, job['name']
)
else:
self.log.info(
'Google Cloud DataFlow job not available yet...'
)

return job
return self._jobs

# pylint: disable=too-many-nested-blocks
def check_dataflow_job_state(self, job):
"""
Helper method to check the state of a single job among all of the jobs
spawned by this task; raises an exception if the job has failed
:return: True if job is done.
:rtype: bool
:raise: Exception
"""
if DataflowJobStatus.JOB_STATE_DONE == job['currentState']:
# check all jobs are done
count_not_done = 0
for inner_job in self._jobs:
if inner_job and 'currentState' in inner_job:
if inner_job['currentState'] != DataflowJobStatus.JOB_STATE_DONE:
count_not_done += 1
if count_not_done == 0:
return True
elif DataflowJobStatus.JOB_STATE_FAILED == job['currentState']:
raise Exception("Google Cloud Dataflow job {} has failed.".format(
job['name']))
elif DataflowJobStatus.JOB_STATE_CANCELLED == job['currentState']:
raise Exception("Google Cloud Dataflow job {} was cancelled.".format(
job['name']))
elif DataflowJobStatus.JOB_STATE_RUNNING == job['currentState'] and \
DataflowJobStatus.JOB_TYPE_STREAMING == job['type']:
return True
elif job['currentState'] in {DataflowJobStatus.JOB_STATE_RUNNING,
DataflowJobStatus.JOB_STATE_PENDING}:
time.sleep(self._poll_sleep)
else:
self.log.debug(str(job))
raise Exception(
"Google Cloud Dataflow job {} was unknown state: {}".format(
job['name'], job['currentState']))
return False

def wait_for_done(self):
"""
@@ -109,37 +182,21 @@ def wait_for_done(self):
:raise: Exception
"""
while True:
if self._job and 'currentState' in self._job:
if self._job['currentState'] == DataflowJobStatus.JOB_STATE_DONE:
return True
elif self._job['currentState'] == DataflowJobStatus.JOB_STATE_RUNNING and \
self._job['type'] == DataflowJobStatus.JOB_TYPE_STREAMING:
return True
elif self._job['currentState'] == DataflowJobStatus.JOB_STATE_FAILED:
raise Exception("Google Cloud Dataflow job {} has failed.".format(
self._job['name']))
elif self._job['currentState'] == DataflowJobStatus.JOB_STATE_CANCELLED:
raise Exception("Google Cloud Dataflow job {} was cancelled.".format(
self._job['name']))
elif self._job['currentState'] == DataflowJobStatus.JOB_STATE_RUNNING:
time.sleep(self._poll_sleep)
elif self._job['currentState'] == DataflowJobStatus.JOB_STATE_PENDING:
time.sleep(15)
for job in self._jobs:
if job and 'currentState' in job:
if self.check_dataflow_job_state(job):
return True
else:
self.log.debug(str(self._job))
raise Exception(
"Google Cloud Dataflow job {} was unknown state: {}".format(
self._job['name'], self._job['currentState']))
else:
time.sleep(15)

self._job = self._get_job()
time.sleep(self._poll_sleep)
self._jobs = self._get_jobs()

def get(self):
"""
Returns the Dataflow jobs.
:return: list of jobs
:rtype: list
"""
return self._job
return self._jobs
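
Taken together, _get_jobs and check_dataflow_job_state implement a small state machine over the whole job list. A self-contained sketch of that completion logic (plain Python, no API calls; the state names are copied from DataflowJobStatus above):

FAILED_END_STATES = {'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED'}

def all_jobs_done(jobs):
    # Fail fast on any failed or cancelled job; succeed only when
    # every tracked job has reached JOB_STATE_DONE.
    for job in jobs:
        if job['currentState'] in FAILED_END_STATES:
            raise Exception('Dataflow job {} ended in {}'.format(
                job['name'], job['currentState']))
    return all(job['currentState'] == 'JOB_STATE_DONE' for job in jobs)

# One job finished, one still running: the task keeps polling.
jobs = [
    {'name': 'pipeline-a', 'currentState': 'JOB_STATE_DONE'},
    {'name': 'pipeline-b', 'currentState': 'JOB_STATE_RUNNING'},
]
print(all_jobs_done(jobs))  # False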


class _Dataflow(LoggingMixin):
@@ -183,14 +240,13 @@ def _extract_job(line):
matched_job = job_id_pattern.search(line or '')
if matched_job:
return matched_job.group(1).decode()

return None

def wait_for_done(self):
"""
Waits for Dataflow job to complete.
:return: Job id.
:return: Job id
:rtype: str
"""
reads = [self._proc.stderr.fileno(), self._proc.stdout.fileno()]
@@ -228,6 +284,7 @@ class DataFlowHook(GoogleCloudBaseHook):
All the methods in the hook where project_id is used must be called with
keyword arguments rather than positional.
"""

def __init__(self,
gcp_conn_id='google_cloud_default',
delegate_to=None,
@@ -245,14 +302,13 @@ def get_conn(self):
'dataflow', 'v1b3', http=http_authorized, cache_discovery=False)

@GoogleCloudBaseHook._Decorators.provide_gcp_credential_file
def _start_dataflow(self, variables, name, command_prefix, label_formatter):
def _start_dataflow(self, variables, name, command_prefix, label_formatter, multiple_jobs=False):
variables = self._set_variables(variables)
cmd = command_prefix + self._build_cmd(variables, label_formatter)
job_id = _Dataflow(cmd).wait_for_done()
_DataflowJob(self.get_conn(), variables['project'], name,
variables['region'],
self.poll_sleep, job_id,
self.num_retries).wait_for_done()
variables['region'], self.poll_sleep, job_id, self.num_retries, multiple_jobs) \
.wait_for_done()

@staticmethod
def _set_variables(variables):
@@ -262,29 +318,34 @@ def _set_variables(variables):
variables['region'] = DEFAULT_DATAFLOW_LOCATION
return variables

def start_java_dataflow(self, job_name, variables, dataflow, job_class=None,
append_job_name=True):
def start_java_dataflow(self, job_name, variables, jar, job_class=None,
append_job_name=True, multiple_jobs=False):
"""
Starts a Dataflow Java job.
:param job_name: The name of the job.
:type job_name: str
:param variables: Variables passed to the job.
:type variables: dict
:param jar: Name of the jar for the job
:type jar: str
:param job_class: Name of the java class for the job.
:type job_class: str
:param append_job_name: True if unique suffix has to be appended to job name.
:type append_job_name: bool
:param multiple_jobs: True if the pipeline may create multiple jobs in Dataflow, so all of them must be tracked for status
:type multiple_jobs: bool
"""
name = self._build_dataflow_job_name(job_name, append_job_name)
variables['jobName'] = name

def label_formatter(labels_dict):
return ['--labels={}'.format(
json.dumps(labels_dict).replace(' ', ''))]
command_prefix = (["java", "-cp", dataflow, job_class] if job_class
else ["java", "-jar", dataflow])
self._start_dataflow(variables, name, command_prefix, label_formatter)

command_prefix = (["java", "-cp", jar, job_class] if job_class
else ["java", "-jar", jar])
self._start_dataflow(variables, name, command_prefix, label_formatter, multiple_jobs)

def start_template_dataflow(self, job_name, variables, parameters, dataflow_template,
append_job_name=True):
@@ -329,6 +390,7 @@ def start_python_dataflow(self, job_name, variables, dataflow, py_options,
def label_formatter(labels_dict):
return ['--labels={}={}'.format(key, value)
for key, value in labels_dict.items()]

self._start_dataflow(variables, name, ["python2"] + py_options + [dataflow],
label_formatter)

@@ -387,3 +449,15 @@ def _start_template_dataflow(self, name, variables, parameters,
_DataflowJob(self.get_conn(), variables['project'], name, variables['region'],
self.poll_sleep, num_retries=self.num_retries).wait_for_done()
return response

def is_job_dataflow_running(self, name, variables):
"""
Helper method to check whether the job is still running in Dataflow
:param name: The name of the job.
:type name: str
:param variables: Variables passed to the job.
:type variables: dict
:return: True if the job is running.
:rtype: bool
"""
variables = self._set_variables(variables)
job = _DataflowJob(self.get_conn(), variables['project'], name,
variables['region'], self.poll_sleep)
return job.is_job_running()
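
The new helper makes a pre-flight check possible before launching a pipeline. A minimal sketch, assuming a configured google_cloud_default connection; the project, region, and job name are placeholders:

hook = DataFlowHook()
variables = {'project': 'my-project', 'region': 'europe-west1'}  # placeholders
if hook.is_job_dataflow_running(name='my-pipeline', variables=variables):
    print('Job already running; skipping a duplicate launch.')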
