[AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …#7585
[AIRFLOW-6944] Allow AWS DataSync to "catch up" when Task is already …#7585baolsen wants to merge 1 commit intoapache:masterfrom baolsen:aws_datasync_task_statuses_20200226
Conversation
Codecov Report
@@ Coverage Diff @@
## master #7585 +/- ##
==========================================
- Coverage 86.85% 86.52% -0.33%
==========================================
Files 896 896
Lines 42622 42657 +35
==========================================
- Hits 37021 36911 -110
- Misses 5601 5746 +145
Continue to review full report at Codecov.
|
|
Hi @feluelle / @nuclearpinguin , would appreciate a review :) One thing I am aware of - currently there are 2 wait loops:
What is your recommendation to standardise these? |
| def _execute_datasync_task(self): | ||
| """Create and monitor an AWSDataSync TaskExecution for a Task.""" | ||
| hook = self.get_hook() | ||
| def _wait_get_status_before_start( |
There was a problem hiding this comment.
| def _wait_get_status_before_start( | |
| def _wait_for_status( |
I would say that we should not wait for task to be able to start. If something has to happen before the task we should use sensor or wait for the completion in the upstream operator. For example in case of |
| self.log.info( | ||
| 'The Task will be started because its status is in %s.', | ||
| self.TASK_STATUS_START) | ||
| # Start the DataSync Task |
There was a problem hiding this comment.
| # Start the DataSync Task |
| raise AirflowException( | ||
| 'Task cannot be started because its status is in %s.' | ||
| % self.TASK_STATUS_FAIL | ||
| ) |
There was a problem hiding this comment.
| raise AirflowException( | |
| 'Task cannot be started because its status is in %s.' | |
| % self.TASK_STATUS_FAIL | |
| ) | |
| raise AirflowException( | |
| f'Task cannot be started because its status is in {self.TASK_STATUS_FAIL}.' | |
| ) |
Please use f-strings :)
| % self.TASK_STATUS_FAIL | ||
| ) | ||
| else: | ||
| raise AirflowException('Unexpected task status %s.' % self.task_status) |
There was a problem hiding this comment.
| raise AirflowException('Unexpected task status %s.' % self.task_status) | |
| raise AirflowException(f'Unexpected task status {self.task_status}') |
| hook = self.get_hook() | ||
| task_status = hook.get_task_description(self.task_arn)['Status'] | ||
| iteration = 0 | ||
| while task_status in self.TASK_STATUS_WAIT_BEFORE_START: | ||
| self.log.info( | ||
| 'Task status is %s.' | ||
| ' Waiting for it to not be %s.' | ||
| ' Iteration %s/%s.', | ||
| task_status, | ||
| self.TASK_STATUS_WAIT_BEFORE_START, | ||
| iteration, | ||
| max_iterations) | ||
| time.sleep(self.wait_interval_seconds) | ||
| task_status = hook.get_task_description(self.task_arn)['Status'] | ||
| iteration = iteration + 1 | ||
| if iteration >= max_iterations: | ||
| break | ||
|
|
||
| return task_status |
There was a problem hiding this comment.
| hook = self.get_hook() | |
| task_status = hook.get_task_description(self.task_arn)['Status'] | |
| iteration = 0 | |
| while task_status in self.TASK_STATUS_WAIT_BEFORE_START: | |
| self.log.info( | |
| 'Task status is %s.' | |
| ' Waiting for it to not be %s.' | |
| ' Iteration %s/%s.', | |
| task_status, | |
| self.TASK_STATUS_WAIT_BEFORE_START, | |
| iteration, | |
| max_iterations) | |
| time.sleep(self.wait_interval_seconds) | |
| task_status = hook.get_task_description(self.task_arn)['Status'] | |
| iteration = iteration + 1 | |
| if iteration >= max_iterations: | |
| break | |
| return task_status | |
| hook = self.get_hook() | |
| for iteration in range(max_iterations): | |
| task_status = hook.get_task_description(self.task_arn)['Status'] | |
| self.log.info( | |
| 'Task status is %s.' | |
| ' Waiting for it to not be %s.' | |
| ' Iteration %s/%s.', | |
| task_status, | |
| self.TASK_STATUS_WAIT_BEFORE_START, | |
| iteration, | |
| max_iterations) | |
| if task_status not in self.TASK_STATUS_WAIT_BEFORE_START: | |
| break | |
| time.sleep(self.wait_interval_seconds) | |
| return task_status |
WDYT?
| hook = self.get_hook() | ||
| def _wait_get_status_before_start( | ||
| self, | ||
| max_iterations=12 * 180): # wait_interval_seconds*12*180=180 minutes by default |
There was a problem hiding this comment.
| max_iterations=12 * 180): # wait_interval_seconds*12*180=180 minutes by default | |
| max_iterations : int = 12 * 180) -> str: |
You already have that comment in the docs below. Even better it would be to add :param and add the comment there.
| The Task can be started when its Status is not in TASK_STATUS_WAIT_BEFORE_START | ||
| Uses wait_interval_seconds (which is also used while waiting for TaskExecution) | ||
| So, max_iterations=12*180 gives 180 minutes wait by default. | ||
| """ |
There was a problem hiding this comment.
Please also add :returns: here.
I think both options are useful to have. To #7410 (Tableau Integration) I added a |
|
@baolsen gentle ping :) |
|
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions. |
…running
Issue link: AIRFLOW-6944
Make sure to mark the boxes below before creating PR: [x]
[AIRFLOW-NNNN]. AIRFLOW-NNNN = JIRA ID** For document-only changes commit message can start with
[AIRFLOW-XXXX].In case of fundamental code change, Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in UPDATING.md.
Read the Pull Request Guidelines for more information.