Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -409,12 +409,19 @@ def trigger_dag_run(
dag: DAG = dag_bag.get_dag(dag_id)
params = body.validate_context(dag)

conf = params["conf"]
# Get the logged in username and add it to conf as 'triggered_by'
username = "unknown"
if hasattr(user, "get_name"):
username = user.get_name()
conf["triggered_by"] = username

dag_run = dag.create_dagrun(
run_id=params["run_id"],
logical_date=params["logical_date"],
data_interval=params["data_interval"],
run_after=params["run_after"],
conf=params["conf"],
conf=conf,
run_type=DagRunType.MANUAL,
triggered_by=DagRunTriggeredByType.REST_API,
state=DagRunState.QUEUED,
Expand Down
5 changes: 4 additions & 1 deletion airflow-core/src/airflow/models/dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,10 @@ def __init__(
self.logical_date = logical_date
self.run_after = run_after
self.start_date = start_date
self.conf = conf or {}
params = conf or {}
if run_type != DagRunType.MANUAL:
params["triggered_by"] = "scheduler"
self.conf = params
if state is not None:
self.state = state
if queued_at is NOTSET:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1307,7 +1307,7 @@ def test_should_respond_200(
)
expected_response_json = {
"bundle_version": None,
"conf": {},
"conf": {"triggered_by": "test"},
"dag_display_name": DAG1_DISPLAY_NAME,
"dag_id": DAG1_ID,
"dag_run_id": expected_dag_run_id,
Expand Down Expand Up @@ -1513,7 +1513,7 @@ def test_should_response_409_for_duplicate_logical_date(self, test_client):
"run_type": "manual",
"state": "queued",
"triggered_by": "rest_api",
"conf": {},
"conf": {"triggered_by": "test"},
"note": note,
}

Expand Down Expand Up @@ -1599,6 +1599,61 @@ def test_should_respond_200_with_null_logical_date(self, test_client):
"run_type": "manual",
"state": "queued",
"triggered_by": "rest_api",
"conf": {},
"conf": {"triggered_by": "test"},
"note": None,
}

@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_triggered_by_in_conf(self, test_client, session):
"""Test that triggered_by is added to conf with the correct username."""
now = timezone.utcnow().isoformat()
response = test_client.post(
f"/dags/{DAG1_ID}/dagRuns",
json={"logical_date": now},
)
assert response.status_code == 200

# Get the created DAG run from the database
dag_run = session.query(DagRun).filter_by(dag_id=DAG1_ID, run_id=response.json()["dag_run_id"]).one()

# Verify that triggered_by is in conf and has the correct value
assert "triggered_by" in dag_run.conf
assert dag_run.conf["triggered_by"] == "test" # Default test user name

@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_triggered_by_with_custom_conf(self, test_client, session):
"""Test that triggered_by is added to conf while preserving custom conf values."""
now = timezone.utcnow().isoformat()
custom_conf = {"custom_key": "custom_value"}
response = test_client.post(
f"/dags/{DAG1_ID}/dagRuns",
json={"logical_date": now, "conf": custom_conf},
)
assert response.status_code == 200

# Get the created DAG run from the database
dag_run = session.query(DagRun).filter_by(dag_id=DAG1_ID, run_id=response.json()["dag_run_id"]).one()

# Verify that both triggered_by and custom conf values are present
assert "triggered_by" in dag_run.conf
assert dag_run.conf["triggered_by"] == "test"
assert "custom_key" in dag_run.conf
assert dag_run.conf["custom_key"] == "custom_value"

@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_triggered_by_overrides_existing(self, test_client, session):
"""Test that triggered_by overrides any existing triggered_by in conf."""
now = timezone.utcnow().isoformat()
custom_conf = {"triggered_by": "custom_user"}
response = test_client.post(
f"/dags/{DAG1_ID}/dagRuns",
json={"logical_date": now, "conf": custom_conf},
)
assert response.status_code == 200

# Get the created DAG run from the database
dag_run = session.query(DagRun).filter_by(dag_id=DAG1_ID, run_id=response.json()["dag_run_id"]).one()

# Verify that triggered_by is overridden with the actual user
assert "triggered_by" in dag_run.conf
assert dag_run.conf["triggered_by"] == "test" # Should be overridden with actual user
40 changes: 40 additions & 0 deletions airflow-core/tests/unit/models/test_dagrun.py
Original file line number Diff line number Diff line change
Expand Up @@ -1244,6 +1244,46 @@ def test_dag_run_version_number(self, dag_maker, session):
# the latest task instance dag_version
assert dag_run.version_number == dag_v.version_number

def test_triggered_by_in_conf(self, dag_maker, session):
"""Test that triggered_by parameter is added to conf for non-manual runs"""
dag = DAG(dag_id="test_dags", schedule=datetime.timedelta(days=1), start_date=DEFAULT_DATE)

# Test for scheduled run type
scheduled_run = dag.create_dagrun(
run_id="test_triggered_by_in_conf_1",
run_after=None,
triggered_by=DagRunTriggeredByType.TEST,
run_type=DagRunType.SCHEDULED,
state=DagRunState.RUNNING,
start_date=timezone.utcnow(),
session=session,
)
assert scheduled_run.conf.get("triggered_by") == "scheduler"

# Test for backfill run type
backfill_run = dag.create_dagrun(
run_id="test_triggered_by_in_conf_2",
run_after=None,
triggered_by=DagRunTriggeredByType.TEST,
run_type=DagRunType.BACKFILL_JOB,
state=DagRunState.RUNNING,
start_date=timezone.utcnow(),
session=session,
)
assert backfill_run.conf.get("triggered_by") == "scheduler"

# Test for manual run type - should not have triggered_by
manual_run = dag.create_dagrun(
run_id="test_triggered_by_in_conf_3",
run_after=None,
triggered_by=DagRunTriggeredByType.TEST,
run_type=DagRunType.MANUAL,
state=DagRunState.RUNNING,
start_date=timezone.utcnow(),
session=session,
)
assert "triggered_by" not in manual_run.conf


@pytest.mark.parametrize(
("run_type", "expected_tis"),
Expand Down