Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -930,6 +930,13 @@ def check_and_change_state_before_execution(
self.log.info("Executing %s on %s", self.task, self.execution_date)
return True

def _date_or_empty(self, attr):
if hasattr(self, attr):
date = getattr(self, attr)
if date:
return date.strftime('%Y%m%dT%H%M%S')
return ''

@provide_session
@Sentry.enrich_errors
def _run_raw_task(
Expand Down Expand Up @@ -1014,7 +1021,6 @@ def signal_handler(signum, frame):

# If a timeout is specified for the task, make it fail
# if it goes beyond
result = None
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was an unused argument, so I decided to remove it

if task_copy.execution_timeout:
try:
with timeout(int(
Expand Down Expand Up @@ -1057,15 +1063,10 @@ def signal_handler(signum, frame):
'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
self.dag_id,
self.task_id,
self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'execution_date') and self.execution_date else '',
self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'start_date') and self.start_date else '',
self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'end_date') and self.end_date else '')
self._date_or_empty('execution_date'),
self._date_or_empty('start_date'),
self._date_or_empty('end_date'),
)
except AirflowRescheduleException as reschedule_exception:
self.refresh_from_db()
self._handle_reschedule(actual_start_date, reschedule_exception, test_mode, context)
Expand Down Expand Up @@ -1104,15 +1105,10 @@ def signal_handler(signum, frame):
'dag_id=%s, task_id=%s, execution_date=%s, start_date=%s, end_date=%s',
self.dag_id,
self.task_id,
self.execution_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'execution_date') and self.execution_date else '',
self.start_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'start_date') and self.start_date else '',
self.end_date.strftime('%Y%m%dT%H%M%S') if hasattr(
self,
'end_date') and self.end_date else '')
self._date_or_empty('execution_date'),
self._date_or_empty('start_date'),
self._date_or_empty('end_date')
)
self.set_duration()
if not test_mode:
session.add(Log(self.state, self))
Expand Down