Unify DAG creation/database cleaning fixtures for testing #3361
Thanks for taking this on Kenneth! I think there are a few changes I'd like to see with this. A pytest fixture that exists in the highest level conftest.py file and also has autouse=True will be used in every test that's run. Most of our tests don't even interact with the Airflow database, so this becomes quite excessive and unnecessary. Additionally, we don't need the extra fixtures for conditionally cleaning task instances and pools if we opt to 1) automatically define a DAG ID/pool name for each test and 2) always clean them up by selecting records that match those names (whether they exist or not doesn't matter).
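As a rough illustration of point 2, deleting by a known name is safe to run unconditionally, because a delete that matches no rows is simply a no-op. The sketch below uses the standard library's sqlite3 as a stand-in for the Airflow database; the `task_instance` table, the `clean` helper, and the DAG IDs are hypothetical names chosen for this example only.

```python
import sqlite3

# Hypothetical stand-in table; the real fixture deletes Airflow's
# DagRun, TaskInstance, and Pool records through SQLAlchemy.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE task_instance (dag_id TEXT, task_id TEXT)")
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?)",
    [
        ("tests__dags__test_a_dag", "check"),
        ("tests__dags__test_b_dag", "check"),
    ],
)


def clean(conn: sqlite3.Connection, dag_id: str) -> int:
    """Delete every row for dag_id and return how many rows were removed."""
    cursor = conn.execute(
        "DELETE FROM task_instance WHERE dag_id = ?", (dag_id,)
    )
    conn.commit()
    return cursor.rowcount


# The first call removes the matching row; the second finds nothing and
# still succeeds, so no "does this test use task instances?" flag is needed.
first = clean(conn, "tests__dags__test_a_dag")
second = clean(conn, "tests__dags__test_a_dag")
remaining = conn.execute("SELECT COUNT(*) FROM task_instance").fetchone()[0]
```

Rows belonging to the other DAG ID are untouched, which is why matching on a per-module name keeps the cleanup from interfering with other tests.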
I think one way to do this is to create some helper fixtures that generate a unique DAG ID and pool based on the test module. (I say module here instead of test name because we have the test multi-processing binned by module, see here and docs.) Here's an example of what that might look like:
# in conftest.py
import pytest
from airflow.models import DagRun, Pool, TaskInstance
from airflow.utils.session import create_session


def _normalize_test_module_name(request) -> str:
    # Extract the test module name
    name = request.module.__name__
    # Replace periods with two underscores
    return name.replace(".", "__")


@pytest.fixture
def get_test_dag_id(request):
    return f"{_normalize_test_module_name(request)}_dag"


@pytest.fixture
def get_test_pool(request):
    return f"{_normalize_test_module_name(request)}_pool"


@pytest.fixture
def clean_db(get_test_dag_id, get_test_pool):
    with create_session() as session:
        # synchronize_session='fetch' required here to refresh models
        # https://stackoverflow.com/a/51222378 CC BY-SA 4.0
        session.query(DagRun).filter(DagRun.dag_id == get_test_dag_id).delete()
        session.query(TaskInstance).filter(
            TaskInstance.dag_id.startswith(get_test_dag_id)
        ).delete(synchronize_session="fetch")
        # Match on the pool name column; a bare `id` is the Python builtin
        # and would never equal the pool name.
        session.query(Pool).filter(Pool.pool == get_test_pool).delete()
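To see what names the helpers above would produce, here is a small standalone sketch that mimics pytest's `request` object with `SimpleNamespace`. The `fake_request` helper and the module paths are assumptions made for illustration; the actual value of `request.module.__name__` depends on how pytest imports the test modules.

```python
from types import SimpleNamespace


def normalize_test_module_name(request) -> str:
    # Same transformation as the conftest helper: dots become "__".
    return request.module.__name__.replace(".", "__")


def fake_request(module_name: str) -> SimpleNamespace:
    # Hypothetical stand-in for pytest's `request` fixture, exposing only
    # the `module.__name__` attribute that the helper reads.
    return SimpleNamespace(module=SimpleNamespace(__name__=module_name))


# Hypothetical module paths for two different test modules.
ingestion = fake_request("tests.dags.common.test_ingestion_server")
sensor = fake_request("tests.dags.common.test_sensors")

ingestion_dag_id = f"{normalize_test_module_name(ingestion)}_dag"
ingestion_pool = f"{normalize_test_module_name(ingestion)}_pool"
sensor_dag_id = f"{normalize_test_module_name(sensor)}_dag"
```

Every test in one module shares the same DAG ID and pool name, while different modules get different ones. That matches the constraint mentioned above: tests are distributed across processes by module, so names only need to be unique per module.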
Tests that require a clean database, test DAG, test pool, etc., could then request them as fixtures in their function signature. For example, the test in test_ingestion_server.py could be changed to:
@pytest.fixture()
def index_readiness_dag(get_test_dag_id, clean_db):
    # Create a DAG that just has an index_readiness_check task
    with DAG(dag_id=get_test_dag_id, schedule=None, start_date=TEST_START_DATE) as dag:
        ingestion_server.index_readiness_check(
            media_type="image", index_suffix="my_test_suffix", timeout=timedelta(days=1)
        )
    return dag
(note the usage of get_test_dag_id and clean_db when creating the fixture)
This might require a few more changes to the affected tests, but I think it's much better than adding the clean_db step to every test!
Full diff I had locally for this test if you're interested
diff --git a/catalog/tests/conftest.py b/catalog/tests/conftest.py
index b5b0f0600..4cb33508b 100644
--- a/catalog/tests/conftest.py
+++ b/catalog/tests/conftest.py
@@ -28,35 +28,30 @@ def pytest_addoption(parser):
 mark_extended = pytest.mark.skipif("not config.getoption('extended')")
-@pytest.fixture()
-def get_test_dag_id():
-    return ""
+def _normalize_test_module_name(request) -> str:
+    # Extract the test name
+    name = request.module.__name__
+    # Replace periods with two underscores
+    return name.replace(".", "__")
-@pytest.fixture()
-def get_test_pool():
-    return ""
+@pytest.fixture
+def get_test_dag_id(request):
+    return f"{_normalize_test_module_name(request)}_dag"
-@pytest.fixture()
-def isTaskInstance():
-    return False
+@pytest.fixture
+def get_test_pool(request):
+    return f"{_normalize_test_module_name(request)}_pool"
-@pytest.fixture()
-def isPool():
-    return False
-
-
-@pytest.fixture(autouse=True)
-def clean_db(get_test_dag_id, get_test_pool, isTaskInstance, isPool):
+@pytest.fixture
+def clean_db(get_test_dag_id, get_test_pool):
     with create_session() as session:
         # synchronize_session='fetch' required here to refresh models
         # https://stackoverflow.com/a/51222378 CC BY-SA 4.0
         session.query(DagRun).filter(DagRun.dag_id == get_test_dag_id).delete()
-        if isTaskInstance:
-            session.query(TaskInstance).filter(
-                TaskInstance.dag_id == get_test_dag_id
-            ).delete(synchronize_session="fetch")
-        if isPool:
-            session.query(Pool).filter(id == get_test_pool).delete()
+        session.query(TaskInstance).filter(
+            TaskInstance.dag_id == get_test_dag_id
+        ).delete(synchronize_session="fetch")
+        session.query(Pool).filter(id == get_test_pool).delete()
diff --git a/catalog/tests/dags/common/test_ingestion_server.py b/catalog/tests/dags/common/test_ingestion_server.py
index 88c2e3b0d..f2ee0a0f0 100644
--- a/catalog/tests/dags/common/test_ingestion_server.py
+++ b/catalog/tests/dags/common/test_ingestion_server.py
@@ -13,23 +13,12 @@ from common import ingestion_server
 TEST_START_DATE = datetime(2022, 2, 1, 0, 0, 0)
-TEST_DAG_ID = "api_healthcheck_test_dag"
 @pytest.fixture()
-def get_test_dag_id():
-    return TEST_DAG_ID
-
-
-@pytest.fixture()
-def isTaskInstance():
-    return True
-
-
-@pytest.fixture()
-def index_readiness_dag():
+def index_readiness_dag(get_test_dag_id, clean_db):
     # Create a DAG that just has an index_readiness_check task
-    with DAG(dag_id=TEST_DAG_ID, schedule=None, start_date=TEST_START_DATE) as dag:
+    with DAG(dag_id=get_test_dag_id, schedule=None, start_date=TEST_START_DATE) as dag:
         ingestion_server.index_readiness_check(
             media_type="image", index_suffix="my_test_suffix", timeout=timedelta(days=1)
         )
@AetherUnbound unittest.TestCase only allows autouse fixtures (link). In [...]
Thanks so much for reworking this, and sorry for the delay on review!
That's frustrating about the unittest.TestCase scenario 😭 I'll make an issue for converting all our catalog tests from unittest; I don't think there's a specific reason they need to be in that format (but that's not the purview of this particular PR!). Do you mind adding a "TODO" comment right above the test pool/test prefix constants in test_single_run_external_dags_sensor.py noting that they can be removed and the fixtures can be used instead once this test is converted to pytest?
Also, I apologize for suggesting one thing then requesting another, but I'm realizing the get_* prefix doesn't really fit our pytest fixtures standard. Would you mind renaming get_test_dag_id to sample_dag_id_fixture and get_test_pool to sample_pool_fixture?
Based on the low urgency of this PR, the following reviewers are being gently reminded to review this PR: @obulat. Excluding weekend days, this PR was ready for review 11 day(s) ago. PRs labelled with low urgency are expected to be reviewed within 5 weekday(s). @ngken0995, if this PR is not ready for a review, please draft it to prevent reviewers from getting further unnecessary pings.
(Drafting so our auto-ping ignores this for now, please feel free to undraft again once changes are made Kenneth!)
31e217d to 719f879
Fantastic, everything looks great! Thanks Kenneth!
This looks great, thanks @ngken0995!
Fixes
Fixes #2622 by @AetherUnbound
Description
There will be one clean_db() in the conftest.py file because tests from multiple test modules in the directory can access the fixture function. Pool and TaskInstance from airflow.models will have an if condition because not all tests run a filter and deletion for them. There is an override method for fixtures on a test module level (link). There are default values for get_test_dag_id, get_test_pool, isPool, and isTaskInstance in conftest.py. On each test involving clean_db(), there should be a get_test_dag_id() fixture and options to run the isPool fixture and isTaskInstance fixture by returning True.
Testing Instructions
just down -v
just catalog/up
just catalog/init
just catalog/test