Skip to content

Commit

Permalink
Add subdir parameter to dags reserialize command (#26170)
Browse files Browse the repository at this point in the history
  • Loading branch information
mik-laj committed Sep 6, 2022
1 parent f878854 commit 4be5616
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 3 deletions.
5 changes: 4 additions & 1 deletion airflow/cli/cli_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -1189,7 +1189,10 @@ class GroupCommand(NamedTuple):
"version of Airflow that you are running."
),
func=lazy_load_command('airflow.cli.commands.dag_command.dag_reserialize'),
args=(ARG_CLEAR_ONLY,),
args=(
ARG_CLEAR_ONLY,
ARG_SUBDIR,
),
),
)
TASKS_COMMANDS = (
Expand Down
3 changes: 1 addition & 2 deletions airflow/cli/commands/dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,5 @@ def dag_reserialize(args, session: Session = NEW_SESSION):
session.query(SerializedDagModel).delete(synchronize_session=False)

if not args.clear_only:
dagbag = DagBag()
dagbag.collect_dags(only_if_updated=False, safe_mode=False)
dagbag = DagBag(process_subdir(args.subdir))
dagbag.sync_to_db(session=session)
24 changes: 24 additions & 0 deletions tests/cli/commands/test_dag_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,30 @@ def test_reserialize(self):
serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_reserialize) >= 40 # Serialized DAGs back

def test_reserialize_should_support_subdir_argument(self):
# Run clear of serialized dags
dag_command.dag_reserialize(self.parser.parse_args(['dags', 'reserialize', "--clear-only"]))

# Assert no serialized Dags
with create_session() as session:
serialized_dags_after_clear = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_clear) == 0

# Serialize manually
dag_path = self.dagbag.dags['example_bash_operator'].fileloc
# Set default value of include_examples parameter to false
dagbag_default = list(DagBag.__init__.__defaults__)
dagbag_default[1] = False
with mock.patch(
'airflow.cli.commands.dag_command.DagBag.__init__.__defaults__', tuple(dagbag_default)
):
dag_command.dag_reserialize(self.parser.parse_args(['dags', 'reserialize', '--subdir', dag_path]))

# Check serialized DAG are back
with create_session() as session:
serialized_dags_after_reserialize = session.query(SerializedDagModel).all()
assert len(serialized_dags_after_reserialize) == 1 # Serialized DAG back

@mock.patch("airflow.cli.commands.dag_command.DAG.run")
def test_backfill(self, mock_run):
dag_command.dag_backfill(
Expand Down
1 change: 1 addition & 0 deletions tests/cli/commands/test_task_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ def setup_class(cls):
clear_db_runs()

cls.dag = cls.dagbag.get_dag(cls.dag_id)
cls.dagbag.sync_to_db()
cls.dag_run = cls.dag.create_dagrun(
state=State.NONE, run_id=cls.run_id, run_type=DagRunType.MANUAL, execution_date=DEFAULT_DATE
)
Expand Down

0 comments on commit 4be5616

Please sign in to comment.