Skip to content

Commit

Permalink
adding run_id column to log table
Browse files Browse the repository at this point in the history
  • Loading branch information
SamWheating committed Mar 1, 2024
1 parent 6ec7386 commit ebbc05f
Show file tree
Hide file tree
Showing 15 changed files with 1,134 additions and 985 deletions.
4 changes: 4 additions & 0 deletions airflow/api_connexion/endpoints/event_log_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def get_event_logs(
*,
dag_id: str | None = None,
task_id: str | None = None,
run_id: str | None = None,
owner: str | None = None,
event: str | None = None,
excluded_events: str | None = None,
Expand All @@ -74,6 +75,7 @@ def get_event_logs(
"when",
"dag_id",
"task_id",
"run_id",
"event",
"execution_date",
"owner",
Expand All @@ -86,6 +88,8 @@ def get_event_logs(
query = query.where(Log.dag_id == dag_id)
if task_id:
query = query.where(Log.task_id == task_id)
if run_id:
query = query.where(Log.run_id == run_id)
if owner:
query = query.where(Log.owner == owner)
if event:
Expand Down
16 changes: 15 additions & 1 deletion airflow/api_connexion/openapi/v1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,7 @@ paths:
- $ref: "#/components/parameters/OrderBy"
- $ref: "#/components/parameters/FilterDAGID"
- $ref: "#/components/parameters/FilterTaskID"
- $ref: "#/components/parameters/FilterRunID"
- $ref: "#/components/parameters/Event"
- $ref: "#/components/parameters/Owner"
- $ref: "#/components/parameters/Before"
Expand Down Expand Up @@ -3242,7 +3243,12 @@ components:
readOnly: true
nullable: true
task_id:
description: The DAG ID
description: The Task ID
type: string
readOnly: true
nullable: true
run_id:
description: The DAG Run ID
type: string
readOnly: true
nullable: true
Expand Down Expand Up @@ -5504,6 +5510,14 @@ components:
required: false
description: Returns objects matched by the Task ID.

FilterRunID:
in: query
name: run_id
schema:
type: string
required: false
description: Returns objects matched by the Run ID.

# Other parameters
FileToken:
in: path
Expand Down
1 change: 1 addition & 0 deletions airflow/api_connexion/schemas/event_log_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Meta:
dttm = auto_field(data_key="when", dump_only=True)
dag_id = auto_field(dump_only=True)
task_id = auto_field(dump_only=True)
run_id = auto_field(dump_only=True)
event = auto_field(dump_only=True)
execution_date = auto_field(dump_only=True)
owner = auto_field(dump_only=True)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""Add run_id to (Audit) log table
Revision ID: d75389605139
Revises: 1fd565369930
Create Date: 2024-02-29 17:50:03.759967
"""

import sqlalchemy as sa
from alembic import op


# revision identifiers, used by Alembic.
revision = 'd75389605139'
down_revision = '1fd565369930'
branch_labels = None
depends_on = None
airflow_version = '2.9.0'

from airflow.migrations.db_types import StringID

def upgrade():
"""Apply Add run_id to Log."""

# Note: we could repopulate the run_id of old runs via a join with DagRun on date + dag_id,
# But this would incur a potentially heavy migration for non-essential changes.
# Instead, we've chosen to only populate this column from 2.9.0 onwards.
with op.batch_alter_table("log") as batch_op:
batch_op.add_column(sa.Column("run_id", StringID(), nullable=True))

def downgrade():
"""Unapply Add run_id to Log."""
with op.batch_alter_table("log") as batch_op:
batch_op.drop_column("run_id")
4 changes: 4 additions & 0 deletions airflow/models/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class Log(Base):
map_index = Column(Integer)
event = Column(String(30))
execution_date = Column(UtcDateTime)
run_id = Column(StringID())
owner = Column(String(500))
owner_display_name = Column(String(500))
extra = Column(Text)
Expand All @@ -57,6 +58,7 @@ def __init__(self, event, task_instance=None, owner=None, owner_display_name=Non
self.dag_id = task_instance.dag_id
self.task_id = task_instance.task_id
self.execution_date = task_instance.execution_date
self.run_id = task_instance.run_id
self.map_index = task_instance.map_index
if getattr(task_instance, "task", None):
task_owner = task_instance.task.owner
Expand All @@ -67,6 +69,8 @@ def __init__(self, event, task_instance=None, owner=None, owner_display_name=Non
self.dag_id = kwargs["dag_id"]
if kwargs.get("execution_date"):
self.execution_date = kwargs["execution_date"]
if kwargs.get("run_id"):
self.run_id = kwargs["run_id"]
if "map_index" in kwargs:
self.map_index = kwargs["map_index"]

Expand Down
15 changes: 14 additions & 1 deletion airflow/www/static/js/api/useEventLogs.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import { useAutoRefresh } from "src/context/autorefresh";
export default function useEventLogs({
dagId,
taskId,
runId,
limit,
offset,
orderBy,
Expand All @@ -36,7 +37,18 @@ export default function useEventLogs({
}: API.GetEventLogsVariables) {
const { isRefreshOn } = useAutoRefresh();
return useQuery(
["eventLogs", dagId, taskId, limit, offset, orderBy, after, before, owner],
[
"eventLogs",
dagId,
taskId,
runId,
limit,
offset,
orderBy,
after,
before,
owner,
],
() => {
const eventsLogUrl = getMetaValue("event_logs_api");
const orderParam = orderBy ? { order_by: orderBy } : {};
Expand All @@ -46,6 +58,7 @@ export default function useEventLogs({
limit,
...{ dag_id: dagId },
...{ task_id: taskId },
...{ run_id: runId },
...orderParam,
after,
before,
Expand Down
19 changes: 17 additions & 2 deletions airflow/www/static/js/dag/details/AuditLog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ const AuditLog = ({ taskId, run }: Props) => {
const { data, isLoading } = useEventLogs({
dagId,
taskId,
runId: run?.runId || undefined,
before: run?.lastSchedulingDecision || undefined,
after: run?.queuedAt || undefined,
orderBy,
Expand All @@ -77,6 +78,10 @@ const AuditLog = ({ taskId, run }: Props) => {
Header: "Task ID",
accessor: "taskId",
};
const runId = {
Header: "Run ID",
accessor: "runId",
};
const rest = [
{
Header: "Event",
Expand All @@ -92,8 +97,13 @@ const AuditLog = ({ taskId, run }: Props) => {
Cell: CodeCell,
},
];
return !taskId ? [when, task, ...rest] : [when, ...rest];
}, [taskId]);
return [
when,
...(!run ? [runId] : []),
...(!taskId ? [task] : []),
...rest,
];
}, [taskId, run]);

const memoData = useMemo(() => data?.eventLogs, [data?.eventLogs]);
const memoSort = useMemo(() => sortBy, [sortBy]);
Expand Down Expand Up @@ -141,6 +151,11 @@ const AuditLog = ({ taskId, run }: Props) => {
</FormHelperText>
)}
</FormControl>
<FormControl>
<FormLabel>Filter by Run ID</FormLabel>
<Input placeholder={run?.runId} isDisabled />
<FormHelperText />
</FormControl>
<FormControl>
<FormLabel>Filter by Task ID</FormLabel>
<Input placeholder={taskId} isDisabled />
Expand Down
8 changes: 7 additions & 1 deletion airflow/www/static/js/types/api-generated.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1198,8 +1198,10 @@ export interface components {
when?: string;
/** @description The DAG ID */
dag_id?: string | null;
/** @description The DAG ID */
/** @description The Task ID */
task_id?: string | null;
/** @description The DAG Run ID */
run_id?: string | null;
/** @description A key describing the type of event. */
event?: string;
/**
Expand Down Expand Up @@ -2538,6 +2540,8 @@ export interface components {
FilterDAGID: string;
/** @description Returns objects matched by the Task ID. */
FilterTaskID: string;
/** @description Returns objects matched by the Run ID. */
FilterRunID: string;
/**
* @description The key containing the encrypted path to the file. Encryption and decryption take place only on
* the server. This prevents the client from reading an non-DAG file. This also ensures API
Expand Down Expand Up @@ -3491,6 +3495,8 @@ export interface operations {
dag_id?: components["parameters"]["FilterDAGID"];
/** Returns objects matched by the Task ID. */
task_id?: components["parameters"]["FilterTaskID"];
/** Returns objects matched by the Run ID. */
run_id?: components["parameters"]["FilterRunID"];
/** The name of event log. */
event?: components["parameters"]["Event"];
/** The owner's name of event log. */
Expand Down
3 changes: 3 additions & 0 deletions airflow/www/templates/airflow/dag_audit_log.html
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ <h4 style="display: block; padding-top: 10px; padding-bottom: 4px">
<th>{{ sortable_column("Task ID", "task_id") }}</th>
<th>{{ sortable_column("Event", "event") }}</th>
<th>{{ sortable_column("Logical Date", "execution_date") }}</th>
<th>{{ sortable_column("Run ID", "run_id") }}</th>
<th>Owner
<span class="material-icons text-muted js-tooltip" aria-hidden="true" data-original-title="This is the user who triggered the event.">info</span>
</th>
Expand All @@ -86,6 +87,8 @@ <h4 style="display: block; padding-top: 10px; padding-bottom: 4px">
<td>{{ log.event if log.event else None }}</td>
<!-- Execution Date -->
<td>{{ log.execution_date if log.execution_date else None }}</td>
<!-- Dagrun ID -->
<td>{{ log.run_id if log.run_id else None }}</td>
<!-- By User -->
<td>{{ log.owner if log.owner else None }}</td>
<!-- Details -->
Expand Down
2 changes: 2 additions & 0 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -5240,6 +5240,7 @@ class LogModelView(AirflowModelView):
"dttm",
"dag_id",
"task_id",
"run_id",
"event",
"execution_date",
"owner",
Expand All @@ -5250,6 +5251,7 @@ class LogModelView(AirflowModelView):
"dttm",
"dag_id",
"task_id",
"run_id",
"event",
"execution_date",
"owner",
Expand Down
2 changes: 1 addition & 1 deletion docs/apache-airflow/img/airflow_erd.sha256
Original file line number Diff line number Diff line change
@@ -1 +1 @@
8cf665c41c065c9368adf2e96450e8cc111dc0653bfabdee977fd6e4964f5646
7b5e83ee2a39b641fb1ff91be8582347e19d7f9c2c4aa6fed48097920c89f92f

0 comments on commit ebbc05f

Please sign in to comment.