Skip to content

Commit

Permalink
updated the composer dag (#229)
Browse files Browse the repository at this point in the history
* updated the composer dag

* added error branch into the composer dag
  • Loading branch information
shourya116 committed Mar 10, 2023
1 parent 1916bfc commit cab50cd
Showing 1 changed file with 46 additions and 5 deletions.
Expand Up @@ -95,7 +95,7 @@ def get_session_headers() -> dict:
"""
This method is to get the session and headers object for authenticating the api requests using credentials.
Args:
Returns: tuple
Returns: dict
"""
# getting the credentials and project details for gcp project
credentials, your_project_id = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
Expand Down Expand Up @@ -156,6 +156,22 @@ def _get_dataplex_job_state() -> str:
print(f"CloudDQ task status is {task_status}")
return task_status

def _get_dataplex_task() -> str:
"""
This method will return the status for the task.
Args:
Returns: str
"""
headers = get_session_headers()
res = requests.get(
f"{DATAPLEX_ENDPOINT}/v1/projects/{DATAPLEX_PROJECT_ID}/locations/{DATAPLEX_REGION}/lakes/{DATAPLEX_LAKE_ID}/tasks/{DATAPLEX_TASK_ID}",
headers=headers)
if res.status_code == 404:
return "task_not_exist"
elif res.status_code == 200:
return "task_exist"
else:
return "ERROR"

with models.DAG(
'clouddq_airflow_example',
Expand All @@ -171,8 +187,30 @@ def _get_dataplex_job_state() -> str:
dag=dag,
)

# this will check for the existing dataplex task
get_dataplex_task = BranchPythonOperator(
task_id="get_dataplex_task",
python_callable=_get_dataplex_task,
provide_context=True
)

dataplex_task_exists = BashOperator(
task_id="task_exist",
bash_command="echo 'Task Already Exists'",
dag=dag,
)
dataplex_task_not_exists = BashOperator(
task_id="task_not_exist",
bash_command="echo 'Task not Present'",
dag=dag,
)
dataplex_task_error = BashOperator(
task_id="ERROR",
bash_command="echo 'Error in fetching dataplex task details'",
dag=dag,
)

# this will delete the existing dataplex task with the given task_id
# for first run this should be commented.
delete_dataplex_task = DataplexDeleteTaskOperator(
project_id=DATAPLEX_PROJECT_ID,
region=DATAPLEX_REGION,
Expand All @@ -189,6 +227,7 @@ def _get_dataplex_job_state() -> str:
body=EXAMPLE_TASK_BODY,
dataplex_task_id=DATAPLEX_TASK_ID,
task_id="create_dataplex_task",
trigger_rule="none_failed_min_one_success",
)

# this will get the status of dataplex task job
Expand All @@ -210,8 +249,10 @@ def _get_dataplex_job_state() -> str:
dag=dag,
)

start_op >> delete_dataplex_task # for first run this need to be commented
delete_dataplex_task >> create_dataplex_task # for first run this need to be commented
# start_op>>create_dataplex_task # this need to be uncommented for first run
start_op >> get_dataplex_task
get_dataplex_task >> [dataplex_task_exists, dataplex_task_not_exists, dataplex_task_error]
dataplex_task_exists >> delete_dataplex_task
delete_dataplex_task >> create_dataplex_task
dataplex_task_not_exists >> create_dataplex_task
create_dataplex_task >> dataplex_task_state
dataplex_task_state >> [dataplex_task_success, dataplex_task_failed]

0 comments on commit cab50cd

Please sign in to comment.