Skip to content

Commit

Permalink
Add tests for dataset_expression
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 committed Mar 7, 2024
1 parent e00d9bc commit d19aa74
Show file tree
Hide file tree
Showing 3 changed files with 201 additions and 0 deletions.
107 changes: 107 additions & 0 deletions tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,36 @@ def _create_dag_model_for_details_endpoint(self, dag_id, session=None):
)
session.add(dag_model)

@provide_session
def _create_dag_model_for_details_endpoint_with_dataset_expression(self, dag_id, session=None):
    """Persist a ``DagModel`` whose schedule carries a nested dataset condition.

    The expression encodes: dag1 OR (dag2 AND dag3), in the serialized
    ``__type``/``__var`` form the API is expected to return verbatim.
    """
    nested_expression = {
        "__type": "dataset_any",
        "__var": [
            {"__type": "dataset", "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag1/output_1.txt"}},
            {
                "__type": "dataset_all",
                "__var": [
                    {
                        "__type": "dataset",
                        "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag2/output_1.txt"},
                    },
                    {
                        "__type": "dataset",
                        "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag3/output_3.txt"},
                    },
                ],
            },
        ],
    }
    session.add(
        DagModel(
            dag_id=dag_id,
            fileloc="/tmp/dag.py",
            schedule_interval="2 2 * * *",
            is_active=True,
            is_paused=False,
            dataset_expression=nested_expression,
        )
    )

@provide_session
def _create_deactivated_dag(self, session=None):
dag_model = DagModel(
Expand Down Expand Up @@ -356,6 +386,83 @@ def test_should_respond_200(self, url_safe_serializer):
}
assert response.json == expected

def test_should_respond_200_with_dataset_expression(self, url_safe_serializer):
    """The details endpoint must echo the stored dataset_expression unchanged."""
    self._create_dag_model_for_details_endpoint_with_dataset_expression(self.dag_id)
    file_token = url_safe_serializer.dumps("/tmp/dag.py")

    response = self.client.get(
        f"/api/v1/dags/{self.dag_id}/details", environ_overrides={"REMOTE_USER": "test"}
    )
    assert response.status_code == 200
    payload = response.json

    # Mirrors the expression stored on the model: dag1 OR (dag2 AND dag3).
    dataset_expression = {
        "__type": "dataset_any",
        "__var": [
            {"__type": "dataset", "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag1/output_1.txt"}},
            {
                "__type": "dataset_all",
                "__var": [
                    {
                        "__type": "dataset",
                        "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag2/output_1.txt"},
                    },
                    {
                        "__type": "dataset",
                        "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag3/output_3.txt"},
                    },
                ],
            },
        ],
    }
    expected = {
        "catchup": True,
        "concurrency": 16,
        "dag_id": "test_dag",
        "dag_run_timeout": None,
        "dataset_expression": dataset_expression,
        "default_view": None,
        "description": None,
        "doc_md": "details",
        "end_date": None,
        "fileloc": "/tmp/dag.py",
        "file_token": file_token,
        "has_import_errors": False,
        "has_task_concurrency_limits": True,
        "is_active": True,
        "is_paused": False,
        "is_paused_upon_creation": None,
        "is_subdag": False,
        "last_expired": None,
        # Server-generated timestamp; taken from the response rather than pinned.
        "last_parsed": payload["last_parsed"],
        "last_parsed_time": None,
        "last_pickled": None,
        "max_active_runs": 16,
        "max_active_tasks": 16,
        "next_dagrun": None,
        "next_dagrun_create_after": None,
        "next_dagrun_data_interval_end": None,
        "next_dagrun_data_interval_start": None,
        "orientation": "LR",
        "owners": [],
        "params": {
            "foo": {
                "__class": "airflow.models.param.Param",
                "description": None,
                "schema": {},
                "value": 1,
            }
        },
        "pickle_id": None,
        "render_template_as_native_obj": False,
        "root_dag_id": None,
        "schedule_interval": {"__type": "CronExpression", "value": "2 2 * * *"},
        "scheduler_lock": None,
        "start_date": "2020-06-15T00:00:00+00:00",
        "tags": [],
        "template_searchpath": None,
        "timetable_description": None,
        "timezone": UTC_JSON_REPR,
    }
    assert payload == expected

def test_should_response_200_with_doc_md_none(self, url_safe_serializer):
current_file_token = url_safe_serializer.dumps("/tmp/dag.py")
self._create_dag_model_for_details_endpoint(self.dag2_id)
Expand Down
57 changes: 57 additions & 0 deletions tests/api_connexion/schemas/test_dag_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
DAGDetailSchema,
DAGSchema,
)
from airflow.datasets import Dataset
from airflow.models import DagModel, DagTag
from airflow.models.dag import DAG

Expand Down Expand Up @@ -197,3 +198,59 @@ def test_serialize_test_dag_detail_schema(url_safe_serializer):
obj = schema.dump(dag)
expected.update({"last_parsed": obj["last_parsed"]})
assert obj == expected


@pytest.mark.db_test
def test_serialize_test_dag_with_dataset_schedule_detail_schema(url_safe_serializer):
    """DAGDetailSchema serializes a dataset-scheduled DAG with the 'Dataset' sentinel."""
    schedule = Dataset(uri="s3://bucket/obj1") & Dataset(uri="s3://bucket/obj2")
    dag = DAG(
        dag_id="test_dag",
        start_date=datetime(2020, 6, 19),
        doc_md="docs",
        orientation="LR",
        default_view="duration",
        params={"foo": 1},
        schedule=schedule,
        tags=["example1", "example2"],
    )

    expected = {
        "catchup": True,
        "concurrency": 16,
        "max_active_tasks": 16,
        "dag_id": "test_dag",
        "dag_run_timeout": None,
        "default_view": "duration",
        "description": None,
        "doc_md": "docs",
        "fileloc": __file__,
        "file_token": url_safe_serializer.dumps(__file__),
        "is_active": None,
        "is_paused": None,
        "is_subdag": False,
        "orientation": "LR",
        "owners": [],
        "params": {
            "foo": {
                "__class": "airflow.models.param.Param",
                "value": 1,
                "description": None,
                "schema": {},
            }
        },
        # Dataset-driven schedules surface as the literal string "Dataset".
        "schedule_interval": {"__type": "CronExpression", "value": "Dataset"},
        "start_date": "2020-06-19T00:00:00+00:00",
        "tags": [{"name": "example1"}, {"name": "example2"}],
        "template_searchpath": None,
        "timezone": UTC_JSON_REPR,
        "max_active_runs": 16,
        "pickle_id": None,
        "end_date": None,
        "is_paused_upon_creation": None,
        "render_template_as_native_obj": False,
    }

    dumped = DAGDetailSchema().dump(dag)
    # last_parsed is set at serialization time; copy it over before comparing.
    expected["last_parsed"] = dumped["last_parsed"]
    assert dumped == expected
37 changes: 37 additions & 0 deletions tests/models/test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -2878,6 +2878,43 @@ def test_dags_needing_dagruns_dataset_triggered_dag_info_queued_times(self, sess
assert first_queued_time == DEFAULT_DATE
assert last_queued_time == DEFAULT_DATE + timedelta(hours=1)

def test_dataset_expression(self, session):
    """A dataset_expression dict round-trips unchanged through the DagModel column.

    Uses flush()+expire() instead of commit(): the original commit() persisted
    the row permanently, which made the trailing rollback() a no-op and leaked
    test data into the shared database. flush() sends the INSERT so the row is
    queryable inside the open transaction, and expire() forces the follow-up
    query to reload the column from the database (exercising the JSON
    serialization round-trip) rather than returning the cached Python dict
    from the session's identity map.
    """
    dataset_expr = {
        "__type": "dataset_any",
        "__var": [
            {"__type": "dataset", "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag1/output_1.txt"}},
            {
                "__type": "dataset_all",
                "__var": [
                    {
                        "__type": "dataset",
                        "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag2/output_1.txt"},
                    },
                    {
                        "__type": "dataset",
                        "__var": {"extra": {"hi": "bye"}, "uri": "s3://dag3/output_3.txt"},
                    },
                ],
            },
        ],
    }
    dag_id = "test_dag_dataset_expression"
    orm_dag = DagModel(
        dag_id=dag_id,
        dataset_expression=dataset_expr,
        is_active=True,
        is_paused=False,
        next_dagrun=timezone.utcnow(),
        next_dagrun_create_after=timezone.utcnow() + timedelta(days=1),
    )
    session.add(orm_dag)
    session.flush()
    session.expire(orm_dag)

    retrieved_dag = session.query(DagModel).filter(DagModel.dag_id == dag_id).one()
    assert retrieved_dag.dataset_expression == dataset_expr

    # Undo the INSERT; do not close() a fixture-managed session.
    session.rollback()


class TestQueries:
def setup_method(self) -> None:
Expand Down

0 comments on commit d19aa74

Please sign in to comment.