Skip to content

Commit

Permalink
test(api_connextion): add more test cases to queue event
Browse files Browse the repository at this point in the history
  • Loading branch information
Lee-W committed Feb 7, 2024
1 parent a9ae4c2 commit 32642c5
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 6 deletions.
4 changes: 2 additions & 2 deletions airflow/api_connexion/endpoints/dataset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ def _build_get_dataset_ddrqs_where_clause(uri: str, session: Session, before: st
def get_dataset_queue_events(
*, uri: str, before: str | None = None, session: Session = NEW_SESSION
) -> APIResponse:
"""Get queued Dataset events for a Dataset"""
"""Get queued Dataset events for a Dataset."""
where_clauses = _build_get_dataset_ddrqs_where_clause(uri=uri, session=session, before=before)
ddrqs = None
if where_clauses:
Expand All @@ -263,7 +263,7 @@ def get_dataset_queue_events(
def delete_dataset_queue_events(
*, uri: str, before: str | None = None, session: Session = NEW_SESSION
) -> APIResponse:
"""Delete queued Dataset events for a Dataset"""
"""Delete queued Dataset events for a Dataset."""
where_clauses = _build_get_dataset_ddrqs_where_clause(uri=uri, session=session, before=before)
if where_clauses:
delete_stmt = delete(DatasetDagRunQueue).where(*where_clauses)
Expand Down
112 changes: 108 additions & 4 deletions tests/api_connexion/endpoints/test_dataset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from __future__ import annotations

import urllib
from typing import Generator

import pytest
import time_machine

from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.models.dagrun import DagRun
Expand Down Expand Up @@ -611,7 +613,44 @@ def test_should_return_conf_max_if_req_max_above_conf(self, session):
assert len(response.json["dataset_events"]) == 150


class TestGetDagDatasetQueueEvent(TestDatasetEndpoint):
class TestQueueEventEndpoint(TestDatasetEndpoint):
@pytest.fixture
def time_freezer(self) -> Generator:
freezer = time_machine.travel(self.default_time, tick=False)
freezer.start()

yield

freezer.stop()

def _create_dataset_dag_run_queues(self, dag_id, dataset_id, session):
ddrq = DatasetDagRunQueue(target_dag_id=dag_id, dataset_id=dataset_id)
session.add(ddrq)
session.commit()
return ddrq


class TestGetDagDatasetQueueEvent(TestQueueEventEndpoint):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_200(self, session, create_dummy_dag):
dag, _ = create_dummy_dag()
dag_id = dag.dag_id
dataset_id = self._create_dataset(session).id
self._create_dataset_dag_run_queues(dag_id, dataset_id, session)
dataset_uri = "s3://bucket/key"

response = self.client.get(
f"/api/v1/dags/{dag_id}/datasets/eventQueue/{dataset_uri}",
environ_overrides={"REMOTE_USER": "test_queue_event"},
)

assert response.status_code == 200
assert response.json == {
"created_at": self.default_time,
"dataset_id": 1,
"target_dag_id": "dag",
}

def test_should_respond_404(self):
dag_id = "not_exists"
dataset_uri = "not_exists"
Expand Down Expand Up @@ -704,7 +743,31 @@ def test_should_raise_403_forbidden(self, session):
assert response.status_code == 403


class TestGetDagDatasetQueueEvents(TestDatasetEndpoint):
class TestGetDagDatasetQueueEvents(TestQueueEventEndpoint):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_200(self, session, create_dummy_dag, time_freezer):
dag, _ = create_dummy_dag()
dag_id = dag.dag_id
dataset_id = self._create_dataset(session).id
self._create_dataset_dag_run_queues(dag_id, dataset_id, session)

response = self.client.get(
f"/api/v1/dags/{dag_id}/datasets/eventQueue",
environ_overrides={"REMOTE_USER": "test_queue_event"},
)

assert response.status_code == 200
assert response.json == {
"dataset_dag_run_queues": [
{
"created_at": self.default_time,
"dataset_id": 1,
"target_dag_id": "dag",
}
],
"total_entries": 1,
}

def test_should_respond_404(self):
dag_id = "not_exists"

Expand Down Expand Up @@ -774,7 +837,32 @@ def test_should_raise_403_forbidden(self):
assert response.status_code == 403


class TestGetDatasetQueueEvents(TestDatasetEndpoint):
class TestGetDatasetQueueEvents(TestQueueEventEndpoint):
@pytest.mark.usefixtures("time_freezer")
def test_should_respond_200(self, session, create_dummy_dag):
dag, _ = create_dummy_dag()
dag_id = dag.dag_id
dataset_id = self._create_dataset(session).id
self._create_dataset_dag_run_queues(dag_id, dataset_id, session)
dataset_uri = "s3://bucket/key"

response = self.client.get(
f"/api/v1/datasets/eventQueue/{dataset_uri}",
environ_overrides={"REMOTE_USER": "test_queue_event"},
)

assert response.status_code == 200
assert response.json == {
"dataset_dag_run_queues": [
{
"created_at": self.default_time,
"dataset_id": 1,
"target_dag_id": "dag",
}
],
"total_entries": 1,
}

def test_should_respond_404(self):
dataset_uri = "not_exists"

Expand Down Expand Up @@ -809,7 +897,23 @@ def test_should_raise_403_forbidden(self):
assert response.status_code == 403


class TestDeleteDatasetQueueEvents(TestDatasetEndpoint):
class TestDeleteDatasetQueueEvents(TestQueueEventEndpoint):
def test_delete_should_respond_204(self, session, create_dummy_dag):
dag, _ = create_dummy_dag()
dag_id = dag.dag_id
dataset_id = self._create_dataset(session).id
self._create_dataset_dag_run_queues(dag_id, dataset_id, session)
dataset_uri = "s3://bucket/key"

response = self.client.delete(
f"/api/v1/datasets/eventQueue/{dataset_uri}",
environ_overrides={"REMOTE_USER": "test_queue_event"},
)

assert response.status_code == 204
conn = session.query(DatasetDagRunQueue).all()
assert len(conn) == 0

def test_should_respond_404(self):
dataset_uri = "not_exists"

Expand Down

0 comments on commit 32642c5

Please sign in to comment.