Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up webserver endpoints adding to audit log #37580

Merged
merged 5 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1837,8 +1837,8 @@ webserver:
The audit logs in the db will not be affected by this parameter.
version_added: 2.3.0
type: string
example: ~
default: "gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data"
example: "cli_task_run,running,success"
default: ~
audit_view_included_events:
description: |
Comma separated string of view events to include in dag audit view.
Expand Down
43 changes: 6 additions & 37 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -1325,8 +1325,7 @@ def legacy_code(self):

@expose("/dags/<string:dag_id>/code")
@auth.has_access_dag("GET", DagAccessEntity.CODE)
@provide_session
def code(self, dag_id, session: Session = NEW_SESSION):
def code(self, dag_id):
"""Dag Code."""
kwargs = {
**sanitize_args(request.args),
Expand All @@ -1348,7 +1347,6 @@ def dag_details(self, dag_id):

@expose("/rendered-templates")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@action_logging
@provide_session
def rendered_templates(self, session):
"""Get rendered Dag."""
Expand All @@ -1362,7 +1360,7 @@ def rendered_templates(self, session):

logging.info("Retrieving rendered templates.")
dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
dag_run = dag.get_dagrun(execution_date=dttm, session=session)
dag_run = dag.get_dagrun(execution_date=dttm)
raw_task = dag.get_task(task_id).prepare_for_execution()

no_dagrun = False
Expand Down Expand Up @@ -1467,7 +1465,6 @@ def rendered_templates(self, session):

@expose("/rendered-k8s")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@action_logging
@provide_session
def rendered_k8s(self, *, session: Session = NEW_SESSION):
"""Get rendered k8s yaml."""
Expand Down Expand Up @@ -1533,7 +1530,6 @@ def rendered_k8s(self, *, session: Session = NEW_SESSION):
@expose("/get_logs_with_metadata")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@auth.has_access_dag("GET", DagAccessEntity.TASK_LOGS)
@action_logging
@provide_session
def get_logs_with_metadata(self, session: Session = NEW_SESSION):
"""Retrieve logs including metadata."""
Expand Down Expand Up @@ -1614,7 +1610,6 @@ def get_logs_with_metadata(self, session: Session = NEW_SESSION):

@expose("/log")
@auth.has_access_dag("GET", DagAccessEntity.TASK_LOGS)
@action_logging
@provide_session
def log(self, session: Session = NEW_SESSION):
"""Retrieve log."""
Expand Down Expand Up @@ -1659,7 +1654,6 @@ def log(self, session: Session = NEW_SESSION):

@expose("/redirect_to_external_log")
@auth.has_access_dag("GET", DagAccessEntity.TASK_LOGS)
@action_logging
@provide_session
def redirect_to_external_log(self, session: Session = NEW_SESSION):
"""Redirects to external log."""
Expand Down Expand Up @@ -1691,7 +1685,6 @@ def redirect_to_external_log(self, session: Session = NEW_SESSION):

@expose("/task")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@action_logging
@provide_session
def task(self, session: Session = NEW_SESSION):
"""Retrieve task."""
Expand Down Expand Up @@ -1817,7 +1810,6 @@ def include_task_attrs(attr_name):

@expose("/xcom")
@auth.has_access_dag("GET", DagAccessEntity.XCOM)
@action_logging
@provide_session
def xcom(self, session: Session = NEW_SESSION):
"""Retrieve XCOM."""
Expand Down Expand Up @@ -2346,6 +2338,7 @@ def dagrun_clear(self, *, session: Session = NEW_SESSION):

@expose("/blocked", methods=["POST"])
@auth.has_access_dag("GET", DagAccessEntity.RUN)
@action_logging
@provide_session
def blocked(self, session: Session = NEW_SESSION):
"""Mark Dag Blocked."""
Expand Down Expand Up @@ -2491,7 +2484,7 @@ def dagrun_queued(self):

@expose("/dagrun_details")
def dagrun_details(self):
"""Redirect to the GRID DAGRun page. This is avoids breaking links."""
"""Redirect to the Grid DagRun page. This is avoids breaking links."""
dag_id = request.args.get("dag_id")
run_id = request.args.get("run_id")
return redirect(url_for("Airflow.grid", dag_id=dag_id, dag_run_id=run_id))
Expand Down Expand Up @@ -2762,24 +2755,19 @@ def success(self):
)

@expose("/dags/<string:dag_id>")
@gzipped
@action_logging
def dag(self, dag_id):
"""Redirect to default DAG view."""
kwargs = {**sanitize_args(request.args), "dag_id": dag_id}
return redirect(url_for("Airflow.grid", **kwargs))

@expose("/tree")
@gzipped
@action_logging
def legacy_tree(self):
"""Redirect to the replacement - grid view. Kept for backwards compatibility."""
return redirect(url_for("Airflow.grid", **sanitize_args(request.args)))

@expose("/dags/<string:dag_id>/grid")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@gzipped
@action_logging
@provide_session
def grid(self, dag_id: str, session: Session = NEW_SESSION):
"""Get Dag's grid view."""
Expand Down Expand Up @@ -2838,16 +2826,13 @@ def grid(self, dag_id: str, session: Session = NEW_SESSION):
)

@expose("/calendar")
@gzipped
@action_logging
def legacy_calendar(self):
"""Redirect from url param."""
return redirect(url_for("Airflow.calendar", **sanitize_args(request.args)))

@expose("/dags/<string:dag_id>/calendar")
@auth.has_access_dag("GET", DagAccessEntity.RUN)
@gzipped
@action_logging
@provide_session
def calendar(self, dag_id: str, session: Session = NEW_SESSION):
"""Get DAG runs as calendar."""
Expand Down Expand Up @@ -2953,15 +2938,12 @@ def calendar(self, dag_id: str, session: Session = NEW_SESSION):
)

@expose("/graph")
@gzipped
@action_logging
def legacy_graph(self):
"""Redirect from url param."""
return redirect(url_for("Airflow.graph", **sanitize_args(request.args)))

@expose("/dags/<string:dag_id>/graph")
@gzipped
@action_logging
@provide_session
def graph(self, dag_id: str, session: Session = NEW_SESSION):
"""Redirect to the replacement - grid + graph. Kept for backwards compatibility."""
Expand All @@ -2984,14 +2966,12 @@ def graph(self, dag_id: str, session: Session = NEW_SESSION):
return redirect(url_for("Airflow.grid", **kwargs))

@expose("/duration")
@action_logging
def legacy_duration(self):
"""Redirect from url param."""
return redirect(url_for("Airflow.duration", **sanitize_args(request.args)))

@expose("/dags/<string:dag_id>/duration")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@action_logging
@provide_session
def duration(self, dag_id: str, session: Session = NEW_SESSION):
"""Get Dag as duration graph."""
Expand Down Expand Up @@ -3137,14 +3117,12 @@ def grouping_key(ti: TaskInstance):
)

@expose("/tries")
@action_logging
def legacy_tries(self):
"""Redirect from url param."""
return redirect(url_for("Airflow.tries", **sanitize_args(request.args)))

@expose("/dags/<string:dag_id>/tries")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@action_logging
@provide_session
def tries(self, dag_id: str, session: Session = NEW_SESSION):
"""Show all tries."""
Expand Down Expand Up @@ -3220,14 +3198,12 @@ def tries(self, dag_id: str, session: Session = NEW_SESSION):
)

@expose("/landing_times")
@action_logging
def legacy_landing_times(self):
"""Redirect from url param."""
return redirect(url_for("Airflow.landing_times", **sanitize_args(request.args)))

@expose("/dags/<string:dag_id>/landing-times")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@action_logging
@provide_session
def landing_times(self, dag_id: str, session: Session = NEW_SESSION):
"""Show landing times."""
Expand Down Expand Up @@ -3326,14 +3302,12 @@ def paused(self):
return "OK"

@expose("/gantt")
@action_logging
def legacy_gantt(self):
"""Redirect from url param."""
return redirect(url_for("Airflow.gantt", **sanitize_args(request.args)))

@expose("/dags/<string:dag_id>/gantt")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@action_logging
@provide_session
def gantt(self, dag_id: str, session: Session = NEW_SESSION):
"""Redirect to the replacement - grid + gantt. Kept for backwards compatibility."""
Expand All @@ -3349,7 +3323,6 @@ def gantt(self, dag_id: str, session: Session = NEW_SESSION):

@expose("/extra_links")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@action_logging
@provide_session
def extra_links(self, *, session: Session = NEW_SESSION):
"""
Expand Down Expand Up @@ -3406,12 +3379,10 @@ def extra_links(self, *, session: Session = NEW_SESSION):
@expose("/object/graph_data")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@gzipped
@action_logging
@provide_session
def graph_data(self, session: Session = NEW_SESSION):
def graph_data(self):
"""Get Graph Data."""
dag_id = request.args.get("dag_id")
dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
dag = get_airflow_app().dag_bag.get_dag(dag_id)
root = request.args.get("root")
if root:
filter_upstream = request.args.get("filter_upstream") == "true"
Expand All @@ -3435,7 +3406,6 @@ def graph_data(self, session: Session = NEW_SESSION):

@expose("/object/task_instances")
@auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
@action_logging
def task_instances(self):
"""Show task instances."""
dag_id = request.args.get("dag_id")
Expand Down Expand Up @@ -5798,7 +5768,6 @@ class DagDependenciesView(AirflowBaseView):
@expose("/dag-dependencies")
@auth.has_access_dag("GET", DagAccessEntity.DEPENDENCIES)
@gzipped
@action_logging
def list(self):
"""Display DAG dependencies."""
title = "DAG Dependencies"
Expand Down
36 changes: 6 additions & 30 deletions tests/www/views/test_views_decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
# under the License.
from __future__ import annotations

import urllib.parse

import pytest

from airflow.models import DagBag, Variable
Expand Down Expand Up @@ -92,39 +90,17 @@ def clean_db():
clear_db_variables()


def test_action_logging_get(session, admin_client):
url = (
f"dags/example_bash_operator/grid?"
f"execution_date={urllib.parse.quote_plus(str(EXAMPLE_DAG_DEFAULT_DATE))}"
)
resp = admin_client.get(url, follow_redirects=True)
check_content_in_response("success", resp)
def test_action_logging_robots(session, admin_client):
url = "/robots.txt"
admin_client.get(url, follow_redirects=True)

# In mysql backend, this commit() is needed to write down the logs
session.commit()
_check_last_log(
session,
dag_id="example_bash_operator",
event="grid",
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
)


def test_action_logging_get_legacy_view(session, admin_client):
url = (
f"tree?dag_id=example_bash_operator&"
f"execution_date={urllib.parse.quote_plus(str(EXAMPLE_DAG_DEFAULT_DATE))}"
)
resp = admin_client.get(url, follow_redirects=True)
check_content_in_response("success", resp)

# In mysql backend, this commit() is needed to write down the logs
session.commit()
_check_last_log(
session,
dag_id="example_bash_operator",
event="legacy_tree",
execution_date=EXAMPLE_DAG_DEFAULT_DATE,
event="robots",
dag_id=None,
execution_date=None,
)


Expand Down