Skip to content

Commit

Permalink
Add map index to task run
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed Mar 13, 2024
1 parent 6d4e7b6 commit 552f44c
Showing 1 changed file with 4 additions and 3 deletions.
7 changes: 4 additions & 3 deletions observatory_platform/sandbox/sandbox_environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,11 @@ def add_connection(self, conn: Connection):
self.session.add(conn)
self.session.commit()

def run_task(self, task_id: str) -> TaskInstance:
def run_task(self, task_id: str, map_index: int = -1) -> TaskInstance:
"""Run an Airflow task.
:param task_id: the Airflow task identifier.
:param map_index: the map index if the task is a daynamic task
:return: None.
"""

Expand All @@ -289,9 +290,9 @@ def run_task(self, task_id: str) -> TaskInstance:
dag = self.dag_run.dag
run_id = self.dag_run.run_id
task = dag.get_task(task_id=task_id)
ti = TaskInstance(task, run_id=run_id)
ti = TaskInstance(task, run_id=run_id, map_index=map_index)
ti.refresh_from_db()
ti.run(ignore_ti_state=True)
ti.run(ignore_ti_state=True, ignore_all_deps=True)

return ti

Expand Down

0 comments on commit 552f44c

Please sign in to comment.