Add AIP-to-user-stories skill for generating recipe playbooks from AIPs #65776

Merged
Lee-W merged 1 commit into apache:main from Dev-iL:2604/playbook_skill
Apr 30, 2026

Conversation

Dev-iL (Collaborator) commented Apr 24, 2026

Summary

Adds a Claude Code skill (/aip-user-stories) that generates recipe playbooks from Airflow Improvement Proposals (AIPs). The skill has two modes:

  • Post-implementation mode — given an AIP URL and one or more PR URLs, it discovers the actual implementation in the codebase and generates verified recipe playbooks with three tiers of code confidence (verified, adapted, unverified).
  • Pre-implementation mode — given only an AIP URL (no PRs), it generates speculative user stories with open design questions to help AIP authors validate their design before coding.

Changes

  • New skill: .claude/skills/aip-user-stories/SKILL.md — full skill definition with 6-phase workflow for both modes, including AIP parsing, PR discovery, codebase analysis, recipe proposal, generation, and assembly.
  • Playbook template: .claude/skills/aip-user-stories/references/playbook-template.md — output template with sections for prerequisites, overview, recipes (post mode), and user stories (pre mode).
  • .gitignore: Changed .claude/ exclusion to .claude/* with !.claude/skills/ so the skill files are tracked in git while other Claude Code artifacts remain ignored.
  • .github/CODEOWNERS: Added /.claude/skills/ entry with the same reviewers as other agentic instruction files.
  • .pre-commit-config.yaml:
    • Added --detect-license-in-X-top-lines 10 to the short-license hook so it finds the Apache header in files with YAML frontmatter (like SKILL.md).
    • Narrowed the exclude list for the short-license hook — code-review.instructions.md and GitHub skill files now carry their own license headers, so the broad excludes are no longer needed.
    • Added .claude/skills to blacken-docs exclude (Markdown code fences in skill files are illustrative, not standalone Python).
  • .github/instructions/code-review.instructions.md: Added the Apache License header (the file was previously excluded from the license check instead of carrying its own header).

Appendix

Example output for AIP-93 (pre mode)

AIP-93 Asset Watermarks — Playbook

Airflow version: Future 3.x (WIP — depends on AIP-103)
Required packages: apache-airflow (core); provider packages for specific data sources (e.g., apache-airflow-providers-amazon, apache-airflow-providers-postgres)

Prerequisites

  • Airflow 3.x with AIP-103 (Task State Management) merged and enabled.
  • A configured Task State backend (default: DbTaskStateBackend using the metadata database). Configurable via [task_state] backend in airflow.cfg.
  • For S3 examples: apache-airflow-providers-amazon.
  • For SQL examples: a database provider package (e.g., apache-airflow-providers-postgres).

Overview

Incremental processing is the most common pattern in data orchestration, yet Airflow has never offered first-class support for persisting state — watermarks, cursors, or checkpoints — across executions. DAG authors using traditional operators resort to XCom or Variables, both of which are poor fits: XCom is scoped to a single DAG run and cleared on retry, while Variables are global singletons with no asset awareness.

For event-driven scheduling via Asset Watchers (AIP-82), the problem is worse. BaseEventTrigger instances running in the Triggerer have no built-in mechanism to persist state between invocations. Attempts to store state inside trigger instances or use Variables from async trigger code have proven fragile and unreliable.

AIP-93 proposes making BaseEventTrigger Asset-aware and leveraging AIP-103's state management infrastructure to give triggers and watchers a clean, scoped interface for storing and retrieving watermarks. The core pattern is: on each run, read the last watermark, scan for changes since that watermark, update the watermark, and yield events. This AIP also introduces a decorator-based authoring experience for Asset watchers, lowering the barrier for the community to build and distribute event-driven integrations.

Relationship to AIP-103: AIP-103 provides the persistence layer — AssetScope, BaseTaskStateBackend, and the /state/asset/{asset_id}/{key} Execution API endpoints. AIP-93 builds on top of this to define the patterns and APIs for using that layer within Asset Watchers and Event Triggers.


User Stories

1. Persisting a Timestamp Watermark Across Trigger Runs

Goal: Store and retrieve a simple timestamp watermark so an Event Trigger can resume scanning from where it last left off, rather than re-processing everything from the beginning.

# PROPOSED API — not yet implemented
import asyncio
from datetime import datetime, timezone

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class IncrementalScanTrigger(BaseEventTrigger):
    """Trigger that scans a resource incrementally using a timestamp watermark."""

    def __init__(self, resource_id: str, poke_interval: float = 60.0):
        super().__init__()
        self.resource_id = resource_id
        self.poke_interval = poke_interval

    def serialize(self):
        return (
            "my_package.triggers.IncrementalScanTrigger",
            {"resource_id": self.resource_id, "poke_interval": self.poke_interval},
        )

    async def run(self):
        # Retrieve the last watermark from asset-scoped state (AIP-103)
        last_watermark = await self.asset_state.aget("last_scanned_at")

        while True:
            new_items = await self._scan_since(last_watermark)

            if new_items:
                now = datetime.now(timezone.utc).isoformat()
                await self.asset_state.aset("last_scanned_at", now)
                yield TriggerEvent({"status": "success", "items": new_items})
                return

            await asyncio.sleep(self.poke_interval)

    async def _scan_since(self, watermark: str | None) -> list:
        """Scan the resource for items newer than the watermark."""
        ...

This story illustrates the fundamental watermark pattern. The trigger reads a previously stored timestamp via self.asset_state.aget(), scans only for changes after that point, and writes the new watermark back via self.asset_state.aset(). The asset_state object is automatically scoped to the Asset this trigger is watching — no manual key namespacing needed.

Open Design Questions:

  • How does asset_state get injected into the trigger? Currently BaseEventTrigger (in airflow-core/src/airflow/triggers/base.py) has no asset_state attribute. Does the Triggerer inject it when the trigger is associated with an AssetWatcher, or does the trigger need to explicitly request it? What happens if the same trigger class is used outside of an AssetWatcher context?
  • What happens if the watermark write succeeds but the TriggerEvent is lost? If the Triggerer crashes between aset() and yield TriggerEvent(...), the watermark advances but no event is emitted. On restart, the data in that window is silently skipped. Should watermark updates be transactional with event submission?
  • Should watermark reads/writes be atomic with respect to concurrent trigger instances? If two Triggerer processes run the same watcher (e.g., during a Triggerer restart with overlap), both could read the same watermark and produce duplicate events. Does AIP-103 provide any concurrency control?
  • What is the serialization format for watermark values? AIP-103's backend interface uses str for values. Should AIP-93 define a convention (ISO 8601 for timestamps, JSON for structured values), or leave it to trigger authors?

2. Using Asset State as a General Key-Value Store in Triggers

Goal: Store arbitrary structured state — not just timestamps — across trigger invocations, such as cursor positions, page tokens, or sets of previously seen IDs.

# PROPOSED API — not yet implemented
import asyncio
import json

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class PaginatedAPITrigger(BaseEventTrigger):
    """Trigger that watches an API endpoint using cursor-based pagination."""

    def __init__(self, api_url: str, poke_interval: float = 120.0):
        super().__init__()
        self.api_url = api_url
        self.poke_interval = poke_interval

    def serialize(self):
        return (
            "my_package.triggers.PaginatedAPITrigger",
            {"api_url": self.api_url, "poke_interval": self.poke_interval},
        )

    async def run(self):
        # Retrieve multiple state keys
        cursor = await self.asset_state.aget("page_cursor")
        seen_ids_raw = await self.asset_state.aget("seen_ids")
        seen_ids = set(json.loads(seen_ids_raw)) if seen_ids_raw else set()

        while True:
            response = await self._fetch_page(cursor)
            new_items = [
                item for item in response["items"] if item["id"] not in seen_ids
            ]

            if new_items:
                seen_ids.update(item["id"] for item in new_items)
                await self.asset_state.aset(
                    "page_cursor", response.get("next_cursor", "")
                )
                await self.asset_state.aset(
                    "seen_ids", json.dumps(sorted(seen_ids))
                )
                yield TriggerEvent({"status": "success", "new_items": new_items})
                return

            cursor = response.get("next_cursor", cursor)
            await asyncio.sleep(self.poke_interval)

    async def _fetch_page(self, cursor: str | None) -> dict:
        """Fetch a page of results from the API."""
        ...

This story demonstrates using multiple state keys for different aspects of the same incremental process. The cursor tracks pagination position while the seen IDs set provides deduplication. Both are persisted and scoped to the same Asset.

Open Design Questions:

  • Is there a size limit on state values? The DbTaskStateBackend stores values as strings in the metadata DB. For a growing set of seen IDs, this could become arbitrarily large. Should AIP-93 recommend or enforce limits? Should large state values use a different backend (e.g., object storage)?
  • Should there be batch get/set operations? Reading page_cursor and seen_ids requires two separate round-trips through the Execution API. A get_many(keys) / set_many(mapping) API would reduce latency, especially for triggers managing multiple state keys.
  • How does state serialization interact with the backend? AIP-103 uses str values. Should triggers be responsible for JSON serialization, or should the SDK provide typed helpers (e.g., asset_state.get_json("seen_ids"), asset_state.set_json("seen_ids", data))? A sketch of such helpers follows this list.
  • What happens to accumulated state when a watcher is removed from an Asset? If a user removes an AssetWatcher from an Asset definition, is the associated state garbage-collected, orphaned, or retained for potential re-attachment?
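
To make the typed-helper question concrete, such helpers could be thin wrappers over the string-based aget/aset interface. The sketch below assumes the proposed AIP-103 asset_state object; JsonStateHelper, get_json, and set_json are hypothetical names, not part of any existing or proposed API.

# HYPOTHETICAL SKETCH: illustrates the typed-helper idea from the question above
import json
from typing import Any


class JsonStateHelper:
    """Wrap a string-valued asset_state with JSON (de)serialization."""

    def __init__(self, asset_state):
        self._state = asset_state  # the proposed AIP-103 asset_state object

    async def get_json(self, key: str, default: Any = None) -> Any:
        raw = await self._state.aget(key)
        return json.loads(raw) if raw is not None else default

    async def set_json(self, key: str, value: Any) -> None:
        # json.dumps fails fast on non-serializable values, surfacing
        # problems at write time rather than on the next read
        await self._state.aset(key, json.dumps(value))

With such a wrapper, story 2's seen-IDs bookkeeping collapses to await state.set_json("seen_ids", sorted(seen_ids)) and await state.get_json("seen_ids", default=[]).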

3. Watching an S3 Bucket for New Files

Goal: Incrementally detect new objects in an S3 bucket by persisting a timestamp watermark, avoiding a full bucket scan on every trigger invocation.

# PROPOSED API — not yet implemented
import asyncio
from datetime import datetime, timezone

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class S3NewObjectsTrigger(BaseEventTrigger):
    """Watch an S3 bucket/prefix for new objects using a timestamp watermark."""

    def __init__(
        self,
        bucket: str,
        prefix: str = "",
        aws_conn_id: str = "aws_default",
        poke_interval: float = 60.0,
    ):
        super().__init__()
        self.bucket = bucket
        self.prefix = prefix
        self.aws_conn_id = aws_conn_id
        self.poke_interval = poke_interval

    def serialize(self):
        return (
            "airflow.providers.amazon.triggers.s3.S3NewObjectsTrigger",
            {
                "bucket": self.bucket,
                "prefix": self.prefix,
                "aws_conn_id": self.aws_conn_id,
                "poke_interval": self.poke_interval,
            },
        )

    async def run(self):
        from airflow.providers.amazon.hooks.s3 import S3Hook

        hook = S3Hook(aws_conn_id=self.aws_conn_id)
        last_scanned = await self.asset_state.aget("last_scanned_at")

        while True:
            objects = await hook.list_keys_async(
                bucket_name=self.bucket,
                prefix=self.prefix,
                from_datetime=(
                    datetime.fromisoformat(last_scanned) if last_scanned else None
                ),
            )

            if objects:
                now = datetime.now(timezone.utc).isoformat()
                await self.asset_state.aset("last_scanned_at", now)
                yield TriggerEvent({
                    "status": "success",
                    "bucket": self.bucket,
                    "keys": objects,
                })
                return

            await asyncio.sleep(self.poke_interval)


# DAG file: using the trigger with an Asset and AssetWatcher
from airflow.sdk import DAG, Asset, AssetWatcher, task

s3_trigger = S3NewObjectsTrigger(bucket="my-data-lake", prefix="raw/events/")
data_lake = Asset(
    "s3://my-data-lake/raw/events/",
    watchers=[AssetWatcher(name="s3_new_objects", trigger=s3_trigger)],
)

with DAG(dag_id="process_new_events", schedule=[data_lake], catchup=False):

    @task
    def process_events(**context):
        triggering_event = context["triggering_event"]
        new_keys = triggering_event.payload["keys"]
        for key in new_keys:
            print(f"Processing: {key}")

    process_events()

This story shows the most common real-world use case for asset watermarks: monitoring cloud object storage. The trigger uses a timestamp watermark to avoid rescanning the entire bucket prefix on every poll. The DAG that schedules on this Asset receives the list of new keys via the TriggerEvent payload.

Open Design Questions:

  • Should the watermark be set before or after yielding the event? Setting it before (as shown) risks skipping data if the event is lost. Setting it after risks reprocessing data if the trigger restarts between yield and set. What transaction guarantees does AIP-103 provide here?
  • How does this interact with S3 eventual consistency? A newly written object might not appear in list_keys immediately. If the watermark advances past the object's LastModified timestamp before it becomes visible, the object is permanently skipped. Should triggers implement a configurable "lookback window" that overlaps the previous scan by a safety margin? A sketch of this idea follows the list.
  • What if the bucket has millions of objects modified in one interval? The trigger would yield a single TriggerEvent with a huge payload. Should there be a mechanism for batched events, or is pagination the trigger author's responsibility?
  • How does this integrate with catchup=True? If a DAG has been paused and accumulates many events, does each event create a separate DAG run, or are they coalesced? How does this interact with the watermark — does each DAG run see a different watermark window?
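
One way to frame the lookback-window question from the list above: scan from the stored watermark minus a safety margin, and rely on deduplication to absorb the overlap. A minimal sketch; the helper name and the lookback default are illustrative, not a proposed API.

# HYPOTHETICAL SKETCH: a lookback window for eventually consistent listings
from datetime import datetime, timedelta


def effective_scan_start(
    last_watermark: str | None, lookback: timedelta = timedelta(minutes=5)
) -> datetime | None:
    """Return the stored watermark minus a safety margin (None on first run)."""
    if last_watermark is None:
        return None
    return datetime.fromisoformat(last_watermark) - lookback

The trigger would then pass effective_scan_start(last_scanned) as from_datetime and drop keys it has already emitted (tracked in asset state, as in story 2), so the overlap does not produce duplicate events.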

4. Watching a SQL Table for New/Updated Rows

Goal: Detect new or modified rows in a relational database table by tracking a high-water mark on an updated_at column.

# PROPOSED API — not yet implemented
import asyncio
from datetime import datetime, timezone

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class SQLIncrementalTrigger(BaseEventTrigger):
    """Watch a SQL table for rows modified after the last watermark."""

    def __init__(
        self,
        conn_id: str,
        table: str,
        watermark_column: str = "updated_at",
        poke_interval: float = 60.0,
    ):
        super().__init__()
        self.conn_id = conn_id
        self.table = table
        self.watermark_column = watermark_column
        self.poke_interval = poke_interval

    def serialize(self):
        return (
            "my_package.triggers.SQLIncrementalTrigger",
            {
                "conn_id": self.conn_id,
                "table": self.table,
                "watermark_column": self.watermark_column,
                "poke_interval": self.poke_interval,
            },
        )

    async def run(self):
        from airflow.providers.common.sql.hooks.sql import DbApiHook

        hook = DbApiHook.get_hook(self.conn_id)
        last_watermark = await self.asset_state.aget("table_last_updated_at")

        while True:
            if last_watermark:
                query = (
                    f"SELECT * FROM {self.table} "
                    f"WHERE {self.watermark_column} > %s "
                    f"ORDER BY {self.watermark_column}"
                )
                records = hook.get_records(query, parameters=[last_watermark])
            else:
                query = (
                    f"SELECT * FROM {self.table} "
                    f"ORDER BY {self.watermark_column}"
                )
                records = hook.get_records(query)

            if records:
                max_watermark = max(row[-1] for row in records)
                await self.asset_state.aset(
                    "table_last_updated_at",
                    (
                        max_watermark.isoformat()
                        if isinstance(max_watermark, datetime)
                        else str(max_watermark)
                    ),
                )
                yield TriggerEvent({
                    "status": "success",
                    "table": self.table,
                    "row_count": len(records),
                })
                return

            await asyncio.sleep(self.poke_interval)


# DAG file
from airflow.sdk import DAG, Asset, AssetWatcher, task

pg_trigger = SQLIncrementalTrigger(
    conn_id="postgres_default",
    table="orders",
    watermark_column="updated_at",
)
orders_table = Asset(
    "postgres://warehouse/orders",
    watchers=[AssetWatcher(name="orders_watcher", trigger=pg_trigger)],
)

with DAG(dag_id="process_new_orders", schedule=[orders_table], catchup=False):

    @task
    def ingest_orders(**context):
        print("New rows detected in orders table")

    ingest_orders()

This story mirrors the S3 pattern but for relational databases. The watermark column (typically updated_at or an auto-incrementing ID) determines which rows are "new." The trigger reads the last watermark from asset state, queries for rows beyond it, and advances the watermark to the maximum value in the result set.

Open Design Questions:

  • SQL injection risk: The table name and column name are interpolated directly into the query string. Should AIP-93 provide safe query-building utilities, or is this the trigger author's responsibility? Should BaseEventTrigger offer parameterized query helpers?
  • What about deleted rows? A watermark on updated_at catches inserts and updates but misses deletes. Should AIP-93 address soft-delete patterns (e.g., an is_deleted flag) or is that out of scope?
  • First-run behavior: On the first invocation, last_watermark is None, causing a full table scan. For large tables this could be catastrophic. Should there be a convention for setting an initial watermark (e.g., "only look at rows from the last 24 hours")? A sketch of such a convention follows the list.
  • Clock skew between Airflow and the database: The watermark is derived from the max value in the result set (not from datetime.now()), which avoids clock skew. Should this pattern be documented as a best practice, or should AIP-93 enforce it?
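
To make the first-run question concrete, one possible convention is a bounded initial watermark instead of a full scan. A sketch; get_or_init_watermark and initial_lookback are illustrative names layered on the proposed asset_state interface.

# HYPOTHETICAL SKETCH: bounded first-run behavior instead of a full table scan
from datetime import datetime, timedelta, timezone


async def get_or_init_watermark(
    asset_state, key: str, initial_lookback: timedelta = timedelta(hours=24)
) -> str:
    """Read the watermark; on first run, initialize it to now minus a lookback."""
    watermark = await asset_state.aget(key)
    if watermark is None:
        watermark = (datetime.now(timezone.utc) - initial_lookback).isoformat()
        await asset_state.aset(key, watermark)
    return watermark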

5. Building a Custom Asset Watcher for Any Data Source

Goal: Apply the watermark pattern to a non-standard data source (e.g., a REST API, message queue, or custom service) using the same state management primitives.

# PROPOSED API — not yet implemented
import asyncio
import json
from datetime import datetime, timezone

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class WebhookLogTrigger(BaseEventTrigger):
    """Watch a webhook log API for new events since the last processed event ID."""

    def __init__(
        self, api_base_url: str, api_key_conn_id: str, poke_interval: float = 30.0
    ):
        super().__init__()
        self.api_base_url = api_base_url
        self.api_key_conn_id = api_key_conn_id
        self.poke_interval = poke_interval

    def serialize(self):
        return (
            "my_package.triggers.WebhookLogTrigger",
            {
                "api_base_url": self.api_base_url,
                "api_key_conn_id": self.api_key_conn_id,
                "poke_interval": self.poke_interval,
            },
        )

    async def run(self):
        import httpx

        last_event_id = await self.asset_state.aget("last_event_id")
        stats_raw = await self.asset_state.aget("processing_stats")
        stats = json.loads(stats_raw) if stats_raw else {"total_processed": 0}

        async with httpx.AsyncClient() as client:
            while True:
                params = {"after": last_event_id} if last_event_id else {}
                resp = await client.get(
                    f"{self.api_base_url}/events",
                    params=params,
                    headers=await self._get_auth_headers(),
                )
                resp.raise_for_status()
                events = resp.json()["data"]

                if events:
                    last_event_id = events[-1]["id"]
                    stats["total_processed"] += len(events)
                    stats["last_run"] = datetime.now(timezone.utc).isoformat()

                    await self.asset_state.aset("last_event_id", str(last_event_id))
                    await self.asset_state.aset(
                        "processing_stats", json.dumps(stats)
                    )

                    yield TriggerEvent({
                        "status": "success",
                        "event_count": len(events),
                        "event_ids": [e["id"] for e in events],
                    })
                    return

                await asyncio.sleep(self.poke_interval)

    async def _get_auth_headers(self) -> dict:
        """Retrieve API credentials from Airflow connection."""
        ...


# DAG file
from airflow.sdk import DAG, Asset, AssetWatcher, task

webhook_trigger = WebhookLogTrigger(
    api_base_url="https://api.example.com/v1",
    api_key_conn_id="example_api",
)
webhook_events = Asset(
    "https://api.example.com/v1/events",
    watchers=[AssetWatcher(name="webhook_log_watcher", trigger=webhook_trigger)],
)

with DAG(dag_id="process_webhook_events", schedule=[webhook_events], catchup=False):

    @task
    def handle_events(**context):
        event_ids = context["triggering_event"].payload["event_ids"]
        for event_id in event_ids:
            print(f"Processing webhook event: {event_id}")

    handle_events()

This story generalizes the watermark pattern beyond cloud storage and SQL. It uses an event ID as the watermark (rather than a timestamp), demonstrates multiple state keys for different concerns (cursor vs. statistics), and shows the pattern applied to an HTTP API.

Open Design Questions:

  • Should AIP-93 provide a base class or mixin that standardizes the watermark pattern? All stories so far follow the same structure: read watermark, scan, update watermark, yield event. A WatermarkTrigger base class could reduce boilerplate and enforce best practices. But would this over-constrain trigger authors who need different patterns? A sketch of such a base class follows the list.
  • How should triggers handle transient errors? If the API call fails mid-scan, the watermark hasn't advanced, so a retry will rescan from the same point — which is correct. But if the aset() call fails after a successful scan, the trigger might yield an event without persisting the watermark, causing duplicate processing on the next run. Should there be retry/rollback semantics?
  • Is there a discovery mechanism for available state keys? Can a trigger introspect what keys are stored for its asset (e.g., await self.asset_state.list_keys())? This would help with debugging and building generic monitoring tools.
  • How does this interact with trigger serialization? BaseEventTrigger.serialize() returns the classpath and kwargs. The watermark state is NOT part of kwargs (it's in the state backend). Is this separation clear enough, or will trigger authors accidentally try to store state in kwargs?
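
For the base-class question above, one possible shape is a template-method class that owns the read/scan/advance/yield cycle and leaves only the scan to subclasses. A sketch under the same proposed asset_state assumption; WatermarkTrigger and scan_since are hypothetical.

# HYPOTHETICAL SKETCH: a template-method base class for the watermark pattern
import asyncio
from abc import abstractmethod
from datetime import datetime, timezone

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class WatermarkTrigger(BaseEventTrigger):
    """Owns the read-watermark / scan / advance / yield loop."""

    watermark_key = "last_scanned_at"
    poke_interval = 60.0

    @abstractmethod
    async def scan_since(self, watermark: str | None) -> list:
        """Return items newer than the watermark: the only subclass hook."""

    async def run(self):
        watermark = await self.asset_state.aget(self.watermark_key)
        while True:
            items = await self.scan_since(watermark)
            if items:
                now = datetime.now(timezone.utc).isoformat()
                await self.asset_state.aset(self.watermark_key, now)
                yield TriggerEvent({"status": "success", "items": items})
                return
            await asyncio.sleep(self.poke_interval)

Stories 1, 3, 4, and 5 would each reduce to a scan_since implementation plus serialize(), at the cost of locking in the write-before-yield ordering questioned in story 3.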

6. Decorator-Based Asset Watcher Authoring

Goal: Define Asset watching logic using a simple decorator instead of writing a full BaseEventTrigger subclass, lowering the barrier to entry for building event-driven integrations.

# PROPOSED API — not yet implemented
from datetime import datetime, timezone

from airflow.sdk import Asset, asset


# Option A: Decorator on a function that becomes the watcher
@asset.watcher(poke_interval=60.0)
async def watch_s3_bucket(asset_state, *, bucket: str, prefix: str = ""):
    """Watch an S3 bucket for new files."""
    from airflow.providers.amazon.hooks.s3 import S3Hook

    hook = S3Hook()
    last_scanned = await asset_state.aget("last_scanned_at")

    objects = await hook.list_keys_async(
        bucket_name=bucket,
        prefix=prefix,
        from_datetime=(
            datetime.fromisoformat(last_scanned) if last_scanned else None
        ),
    )

    if objects:
        await asset_state.aset(
            "last_scanned_at", datetime.now(timezone.utc).isoformat()
        )
        return {"keys": objects}  # Becomes TriggerEvent payload

    return None  # No event — will retry after poke_interval


# Using the decorated watcher with an Asset
data_lake = Asset(
    "s3://my-data-lake/raw/",
    watchers=[watch_s3_bucket(bucket="my-data-lake", prefix="raw/")],
)


# Option B: Decorator that registers directly on the asset
my_asset = Asset("s3://my-data-lake/raw/")


@my_asset.watch(poke_interval=60.0)
async def on_new_files(asset_state):
    last_scanned = await asset_state.aget("last_scanned_at")
    # ... scan logic ...
    return {"keys": ["file1.parquet", "file2.parquet"]}

This story addresses one of the AIP's explicit goals: a more intuitive, decorator-based authoring experience. Instead of subclassing BaseEventTrigger, implementing serialize(), and managing the async generator protocol, the user writes a simple function that receives asset_state and returns a payload (or None to retry).

Open Design Questions:

  • How does the decorator create a serializable trigger? BaseEventTrigger.serialize() must return a classpath and kwargs for reconstruction by the Triggerer. A decorated function isn't a class. Does the decorator generate a synthetic class, or does it use a generic FunctionTrigger that stores the function reference and kwargs? A sketch of the FunctionTrigger option follows the list.
  • How are function arguments handled? In Option A, bucket and prefix are passed when attaching the watcher. These need to survive serialization. Should they be stored in the trigger kwargs, or in the asset state?
  • What about the async generator protocol? Current triggers use async def run(self) -> AsyncIterator[TriggerEvent] with yield. The decorator replaces this with a simpler return-value protocol. Can decorated watchers still use yield for streaming multiple events in one invocation?
  • Does the decorator approach support cleanup() and on_kill()? These lifecycle methods exist on BaseTrigger. If the decorated function is the entire trigger, how does the user define cleanup logic? Additional decorators (@watch_s3_bucket.on_kill)? Optional callback parameters?
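
The FunctionTrigger idea from the first question could look roughly like the sketch below; the class, its classpath, and the kwargs-passing contract are all hypothetical.

# HYPOTHETICAL SKETCH: a generic trigger wrapping a decorated watcher function
# by dotted path, so serialize() remains classpath + kwargs
import asyncio
from importlib import import_module

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class FunctionTrigger(BaseEventTrigger):
    def __init__(self, func_path: str, poke_interval: float = 60.0, **func_kwargs):
        super().__init__()
        self.func_path = func_path  # e.g. "my_package.watchers.watch_s3_bucket"
        self.poke_interval = poke_interval
        self.func_kwargs = func_kwargs

    def serialize(self):
        return (
            "my_package.triggers.FunctionTrigger",  # hypothetical classpath
            {
                "func_path": self.func_path,
                "poke_interval": self.poke_interval,
                **self.func_kwargs,
            },
        )

    async def run(self):
        # Re-import the decorated function from its dotted path, then poll it
        module_name, _, func_name = self.func_path.rpartition(".")
        func = getattr(import_module(module_name), func_name)
        while True:
            payload = await func(self.asset_state, **self.func_kwargs)
            if payload is not None:
                yield TriggerEvent(payload)
                return
            await asyncio.sleep(self.poke_interval)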

7. Asset-Aware Event Triggers

Goal: Make BaseEventTrigger aware of which Asset it is watching, so the trigger can automatically scope its state and behavior to the correct asset without manual configuration.

# PROPOSED API — not yet implemented
import asyncio
from datetime import datetime, timezone

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class GenericFileWatchTrigger(BaseEventTrigger):
    """A trigger that adapts its behavior based on the asset it's watching."""

    def __init__(self, poke_interval: float = 60.0):
        super().__init__()
        self.poke_interval = poke_interval

    def serialize(self):
        return (
            "my_package.triggers.GenericFileWatchTrigger",
            {"poke_interval": self.poke_interval},
        )

    async def run(self):
        # self.asset is injected by the Triggerer when the trigger is
        # associated with an AssetWatcher — contains asset name, uri, extra
        asset_uri = self.asset.uri
        scheme = asset_uri.split("://")[0]

        last_scanned = await self.asset_state.aget("last_scanned_at")

        while True:
            if scheme == "s3":
                new_items = await self._scan_s3(asset_uri, last_scanned)
            elif scheme == "gs":
                new_items = await self._scan_gcs(asset_uri, last_scanned)
            elif scheme.startswith("file"):
                new_items = await self._scan_local(asset_uri, last_scanned)
            else:
                raise ValueError(f"Unsupported asset scheme: {scheme}")

            if new_items:
                now = datetime.now(timezone.utc).isoformat()
                await self.asset_state.aset("last_scanned_at", now)
                yield TriggerEvent({
                    "status": "success",
                    "asset_uri": asset_uri,
                    "items": new_items,
                })
                return

            await asyncio.sleep(self.poke_interval)

    async def _scan_s3(self, uri: str, since: str | None) -> list:
        ...

    async def _scan_gcs(self, uri: str, since: str | None) -> list:
        ...

    async def _scan_local(self, uri: str, since: str | None) -> list:
        ...


# DAG file — same trigger class, different assets
from airflow.sdk import DAG, Asset, AssetWatcher, task

s3_data = Asset(
    "s3://bucket/raw/",
    watchers=[AssetWatcher(name="s3_watch", trigger=GenericFileWatchTrigger())],
)
gcs_data = Asset(
    "gs://bucket/raw/",
    watchers=[AssetWatcher(name="gcs_watch", trigger=GenericFileWatchTrigger())],
)

with DAG(
    dag_id="multi_cloud_ingest",
    schedule=[s3_data | gcs_data],
    catchup=False,
):

    @task
    def process(**context):
        ...

    process()

This story demonstrates the key architectural change AIP-93 proposes: making triggers aware of the asset they're monitoring. Currently, BaseEventTrigger (in airflow-core/src/airflow/triggers/base.py) has no knowledge of which asset it serves — the connection is external, via AssetWatcherModel. With asset-awareness, a single trigger class can adapt its behavior based on the asset URI, and asset_state is automatically scoped to the correct asset.

Open Design Questions:

  • What does self.asset expose? Is it a full Asset object (with name, uri, group, extra, watchers) or a lightweight reference? Loading the full object might require a database query. Should it be lazy-loaded?
  • What happens when a trigger is shared across multiple assets? The current AssetWatcherModel has a composite primary key (asset_id, trigger_id). If the same trigger instance is assigned to multiple assets, which self.asset does it see? Is a trigger always 1:1 with an asset in the watcher context?
  • How does this interact with trigger deduplication? BaseEventTrigger.hash() uses classpath and kwargs to identify unique triggers. Two watchers using GenericFileWatchTrigger() with the same poke_interval but different assets would hash to the same value. Should the asset identity be part of the trigger hash?
  • Should self.asset.extra be writable? Asset extra metadata is currently user-defined at DAG parse time. If a trigger could update it (e.g., adding a last_file_count metric), it would blur the line between asset metadata and asset state. Should extra remain read-only with all mutable state going through asset_state?

8. Partitioned Asset Watermarks

Goal: Track independent watermarks per partition of a partitioned Asset, so each partition's incremental processing state is isolated.

# PROPOSED API — not yet implemented
import asyncio
from datetime import datetime, timezone

from airflow.triggers.base import BaseEventTrigger, TriggerEvent


class PartitionedS3Trigger(BaseEventTrigger):
    """Watch an S3 prefix with date-partitioned subdirectories."""

    def __init__(
        self,
        bucket: str,
        base_prefix: str,
        partition_pattern: str = "dt={date}/",
        aws_conn_id: str = "aws_default",
        poke_interval: float = 60.0,
    ):
        super().__init__()
        self.bucket = bucket
        self.base_prefix = base_prefix
        self.partition_pattern = partition_pattern
        self.aws_conn_id = aws_conn_id
        self.poke_interval = poke_interval

    def serialize(self):
        return (
            "airflow.providers.amazon.triggers.s3.PartitionedS3Trigger",
            {
                "bucket": self.bucket,
                "base_prefix": self.base_prefix,
                "partition_pattern": self.partition_pattern,
                "aws_conn_id": self.aws_conn_id,
                "poke_interval": self.poke_interval,
            },
        )

    async def run(self):
        from airflow.providers.amazon.hooks.s3 import S3Hook

        hook = S3Hook(aws_conn_id=self.aws_conn_id)

        while True:
            partitions = await self._discover_partitions(hook)

            for partition_key in partitions:
                watermark = await self.asset_state.aget(
                    f"watermark:{partition_key}"
                )

                prefix = (
                    f"{self.base_prefix}"
                    f"{self.partition_pattern.format(date=partition_key)}"
                )
                new_objects = await hook.list_keys_async(
                    bucket_name=self.bucket,
                    prefix=prefix,
                    from_datetime=(
                        datetime.fromisoformat(watermark) if watermark else None
                    ),
                )

                if new_objects:
                    now = datetime.now(timezone.utc).isoformat()
                    await self.asset_state.aset(
                        f"watermark:{partition_key}", now
                    )

                    yield TriggerEvent({
                        "status": "success",
                        "partition_key": partition_key,
                        "keys": new_objects,
                    })
                    return

            await asyncio.sleep(self.poke_interval)

    async def _discover_partitions(self, hook) -> list[str]:
        """List partition subdirectories under the base prefix."""
        ...


# DAG file — partitioned asset with per-partition scheduling
from airflow.sdk import DAG, Asset, AssetWatcher, task
from airflow.sdk.definitions.timetables.assets import PartitionedAssetTimetable

partitioned_trigger = PartitionedS3Trigger(
    bucket="data-lake",
    base_prefix="events/",
    partition_pattern="dt={date}/",
)
events = Asset(
    "s3://data-lake/events/",
    watchers=[AssetWatcher(name="partitioned_s3", trigger=partitioned_trigger)],
)

with DAG(
    dag_id="process_partitioned_events",
    timetable=PartitionedAssetTimetable(
        asset_condition=events,
        partition_mapper_config={},
    ),
    catchup=False,
):

    @task
    def process_partition(**context):
        partition = context["dag_run"].partition_key
        event = context["triggering_event"]
        keys = event.payload["keys"]
        print(f"Processing {len(keys)} files for partition {partition}")

    process_partition()

This story combines AIP-93 watermarks with Airflow's existing asset partition infrastructure (PartitionedAssetTimetable in task-sdk/src/airflow/sdk/definitions/timetables/assets.py). Each date partition (e.g., dt=2025-01-15/) gets its own watermark stored under a namespaced key (watermark:2025-01-15). When new files land in a partition, only that partition's watermark advances, and the resulting DAG run is scoped to that partition.

Open Design Questions:

  • Should partition-scoped state use the AssetScope or a new PartitionScope? AIP-103 defines AssetScope(asset_id=str) but no partition-level scoping. The pattern above manually namespaces keys with watermark:{partition_key}. Should AIP-103 be extended with AssetPartitionScope(asset_id, partition_key) for first-class partition support? A sketch of that option follows the list.
  • How many partitions can a single trigger manage? If an S3 prefix has thousands of date partitions, the trigger would make thousands of state reads per poll cycle. Should there be a mechanism to prune old partitions from active scanning (e.g., only scan the last N days)?
  • How does TriggerEvent map to partitioned DAG runs? The current Trigger.submit_event() calls AssetManager.register_asset_change() without a partition key. How does the event's partition_key propagate from the trigger through to AssetEvent.partition_key and ultimately to AssetPartitionDagRun?
  • What happens when a new partition appears? If dt=2025-01-20/ is created for the first time, there's no watermark for it. The trigger would do a full scan of that partition. Is this the desired behavior, or should new partitions inherit a default watermark (e.g., "partition creation time")?
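
If partition scoping became first-class, the manual watermark:{partition_key} namespacing above could move behind a dedicated interface. A sketch; PartitionedState is a hypothetical name, and an AIP-103-level AssetPartitionScope(asset_id, partition_key) would serve the same role at the backend layer.

# HYPOTHETICAL SKETCH: first-class partition scoping instead of manual
# key prefixes like "watermark:2025-01-15"
class PartitionedState:
    """A view of asset_state narrowed to a single partition."""

    def __init__(self, asset_state, partition_key: str):
        self._state = asset_state
        self._prefix = f"partition:{partition_key}:"

    async def aget(self, key: str):
        return await self._state.aget(self._prefix + key)

    async def aset(self, key: str, value: str) -> None:
        await self._state.aset(self._prefix + key, value)

Inside the trigger above, state = PartitionedState(self.asset_state, partition_key) followed by await state.aget("watermark") would replace the manual f"watermark:{partition_key}" keys.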

Example output for AIP-76 (post mode)

AIP-76 Asset Partitions — Playbook

Airflow version: 3.2.0
Required packages: apache-airflow (core only — no extra providers needed)

Prerequisites

  • Airflow 3.2.0 or later.
  • Database migrated to include the partition schema (airflow db migrate).
  • All imports come from airflow.sdk. No provider packages are needed for basic partition functionality.

Overview

AIP-76 introduces asset partitions to Airflow, enabling data processing at finer granularity than a DAG's schedule. Before partitions, a DAG run represented a single unit of work for an entire dataset. With partitions, each DAG run can target a specific slice — an hour's worth of data, a region, or a combination of both — and downstream DAGs trigger only when matching partitions from all required upstream assets are available.

The implementation has two sides: producers emit partitioned asset events using CronPartitionTimetable, and consumers react to those events using PartitionedAssetTimetable. Between them, partition mappers transform partition keys so that upstream and downstream granularities can differ (e.g., hourly events roll up into daily partitions). The scheduler tracks partition key alignment across multiple upstream assets and only creates a downstream DAG run when all required partitions match.

Partitioned events are a separate scheduling path from regular asset-triggered DAGs. A PartitionedAssetTimetable consumer ignores non-partitioned events, and a regular AssetTriggeredTimetable consumer ignores partitioned events.


Recipes

1. Produce Hourly Partitioned Events with CronPartitionTimetable

Goal: Set up a producer DAG that emits asset events tagged with a partition key on every cron tick.

from airflow.sdk import DAG, Asset, CronPartitionTimetable, task

team_a_player_stats = Asset(
    uri="file://incoming/player-stats/team_a.csv",
    name="team_a_player_stats",
)

with DAG(
    dag_id="ingest_team_a_player_stats",
    schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
    tags=["player-stats", "ingestion"],
):

    @task(outlets=[team_a_player_stats])
    def ingest_team_a_stats():
        pass

    ingest_team_a_stats()

Verified — from airflow-core/src/airflow/example_dags/example_asset_partition.py

CronPartitionTimetable works like CronTriggerTimetable but additionally stamps each DAG run — and the asset events it emits — with a partition key derived from the run's scheduled time. The default key_format is "%Y-%m-%dT%H:%M:%S", so a run scheduled at 2026-03-10 09:00 UTC produces partition key "2026-03-10T09:00:00".

You can customize the key format and offset:

CronPartitionTimetable(
    "0 * * * *",
    timezone="UTC",
    key_format="%Y-%m-%d/%H",   # partition key: "2026-03-10/09"
    run_offset=-1,               # partition date is one cron interval before run date
)

2. Produce Partitioned Events with the @asset Decorator

Goal: Use the compact @asset decorator to define a single-task partitioned producer without an explicit DAG block.

from airflow.sdk import CronPartitionTimetable, asset


@asset(
    uri="file://incoming/player-stats/team_b.csv",
    schedule=CronPartitionTimetable("15 * * * *", timezone="UTC"),
    tags=["player-stats", "ingestion"],
)
def team_b_player_stats():
    pass

Verified — from airflow-core/src/airflow/example_dags/example_asset_partition.py

The @asset decorator creates both a DAG and a single task in one step. The resulting asset can be referenced by name or URI in downstream PartitionedAssetTimetable consumers, just like any other asset.


3. Consume a Single Partitioned Asset

Goal: Set up a downstream DAG that triggers when a partitioned event arrives from one upstream asset.

from airflow.sdk import Asset, PartitionedAssetTimetable, asset

combined_player_stats = Asset(uri="file://curated/player-stats/combined.csv", name="combined_player_stats")


@asset(
    uri="file://analytics/player-stats/computed-player-odds.csv",
    schedule=PartitionedAssetTimetable(assets=combined_player_stats),
    tags=["player-stats", "odds"],
)
def compute_player_odds():
    pass

Verified — from airflow-core/src/airflow/example_dags/example_asset_partition.py

When no default_partition_mapper is specified, IdentityMapper is used — the downstream partition key is the same as the upstream key. The consumer DAG gets one DAG run per unique partition key emitted by the upstream asset. Non-partitioned events from the same asset are ignored.


4. Consume Multiple Partitioned Assets with Aligned Keys

Goal: Trigger a downstream DAG only when all required upstream assets have emitted events with a matching partition key.

from airflow.sdk import DAG, Asset, PartitionedAssetTimetable, StartOfHourMapper, task

team_a_player_stats = Asset(uri="file://incoming/player-stats/team_a.csv", name="team_a_player_stats")
team_b_player_stats = Asset(uri="file://incoming/player-stats/team_b.csv", name="team_b_player_stats")
team_c_player_stats = Asset(uri="file://incoming/player-stats/team_c.csv", name="team_c_player_stats")

with DAG(
    dag_id="clean_and_combine_player_stats",
    schedule=PartitionedAssetTimetable(
        assets=team_a_player_stats & team_b_player_stats & team_c_player_stats,
        default_partition_mapper=StartOfHourMapper(),
    ),
    catchup=False,
    tags=["player-stats", "cleanup"],
):

    @task
    def combine_player_stats():
        pass

    combine_player_stats()

Verified — from airflow-core/src/airflow/example_dags/example_asset_partition.py

The & operator creates an AssetAll condition — all three assets must have a matching partition key before the downstream DAG triggers. The StartOfHourMapper normalizes each upstream key (e.g., "2026-03-10T09:15:00") to hour granularity ("2026-03-10T09"), so events from different producers that fall within the same hour are aligned.

If Team A emits "2026-03-10T09:00:00" and Team B emits "2026-03-10T09:15:00", StartOfHourMapper maps both to "2026-03-10T09". The downstream DAG won't run until Team C also emits an event that maps to the same key.


5. Chain Partitioned Assets Across Multiple Levels

Goal: Propagate partition keys through a multi-level pipeline: producer → consumer → consumer.

from airflow.sdk import (
    Asset,
    CronPartitionTimetable,
    PartitionedAssetTimetable,
    StartOfHourMapper,
    asset,
)

team_a = Asset(uri="file://incoming/player-stats/team_a.csv", name="team_a_player_stats")
combined = Asset(uri="file://curated/player-stats/combined.csv", name="combined_player_stats")


@asset(
    uri="file://incoming/player-stats/team_a.csv",
    schedule=CronPartitionTimetable("0 * * * *", timezone="UTC"),
)
def team_a_player_stats():
    pass


@asset(
    uri="file://curated/player-stats/combined.csv",
    schedule=PartitionedAssetTimetable(
        assets=team_a,
        default_partition_mapper=StartOfHourMapper(),
    ),
)
def combined_player_stats():
    pass


@asset(
    uri="file://analytics/player-stats/computed-player-odds.csv",
    schedule=PartitionedAssetTimetable(assets=combined),
)
def compute_player_odds():
    pass

Adapted from airflow-core/src/airflow/example_dags/example_asset_partition.py

Each level in the chain is an independent partition-aware DAG. team_a_player_stats produces events with full-timestamp keys. combined_player_stats maps those to hourly keys and emits new partition events. compute_player_odds picks up those hourly keys with the default IdentityMapper. The partition key flows through the chain, with each level optionally transforming the granularity.


6. Aggregate Hourly Partitions to Daily with Temporal Mappers

Goal: Roll up hourly upstream partition keys to daily granularity so the downstream DAG triggers once per day, after any hourly event within that day.

from airflow.sdk import DAG, Asset, PartitionedAssetTimetable, StartOfDayMapper, task

hourly_sales = Asset(uri="file://incoming/sales/hourly.csv", name="hourly_sales")

with DAG(
    dag_id="daily_sales_summary",
    schedule=PartitionedAssetTimetable(
        assets=hourly_sales,
        default_partition_mapper=StartOfDayMapper(),
    ),
    catchup=False,
):

    @task
    def summarize_sales(dag_run=None):
        print(f"Processing daily partition: {dag_run.partition_key}")

    summarize_sales()

Adapted from airflow-core/src/airflow/example_dags/example_asset_partition.py

StartOfDayMapper truncates timestamp-based keys to "%Y-%m-%d" format. An upstream event with key "2026-03-10T09:15:00" maps to downstream key "2026-03-10". The downstream DAG run is created when the first hourly event for that day arrives (since a single upstream asset is required). Requiring all 24 hourly partitions before triggering would need rollup support, which is not yet implemented (see "Not Yet Implemented" below).

Available temporal mappers and their output formats:

Mapper                 Default output format   Example output
StartOfHourMapper      %Y-%m-%dT%H             2026-03-10T09
StartOfDayMapper       %Y-%m-%d                2026-03-10
StartOfWeekMapper      %Y-%m-%d (W%V)          2026-03-09 (W11)
StartOfMonthMapper     %Y-%m                   2026-03
StartOfQuarterMapper   %Y-Q{quarter}           2026-Q1
StartOfYearMapper      %Y                      2026

All temporal mappers accept optional input_format and output_format parameters to override the defaults:

StartOfDayMapper(input_format="%Y/%m/%d %H:%M", output_format="%Y_%m_%d")

7. Map Composite Partition Keys with ProductMapper

Goal: Handle multi-dimensional partition keys (e.g., "region|timestamp") by applying a different mapper to each segment.

from airflow.sdk import (
    DAG,
    Asset,
    IdentityMapper,
    PartitionedAssetTimetable,
    ProductMapper,
    StartOfDayMapper,
    task,
)

regional_sales = Asset(uri="file://incoming/sales/regional.csv", name="regional_sales")

with DAG(
    dag_id="aggregate_regional_sales",
    schedule=PartitionedAssetTimetable(
        assets=regional_sales,
        default_partition_mapper=ProductMapper(IdentityMapper(), StartOfDayMapper()),
    ),
    catchup=False,
    tags=["sales", "aggregation"],
):

    @task
    def aggregate_sales(dag_run=None):
        print(dag_run.partition_key)

    aggregate_sales()

Verified — from airflow-core/src/airflow/example_dags/example_asset_partition.py

ProductMapper splits the incoming key by a delimiter ("|" by default), applies one mapper per segment, and rejoins the results. For input key "us|2026-03-10T09:00:00":

  1. Segment 0 ("us") → IdentityMapper → "us"
  2. Segment 1 ("2026-03-10T09:00:00") → StartOfDayMapper → "2026-03-10"
  3. Result: "us|2026-03-10"

ProductMapper requires at least two mappers (positional-only) and accepts a custom delimiter:

ProductMapper(IdentityMapper(), StartOfHourMapper(), delimiter="::")

8. Restrict Partition Keys with AllowedKeyMapper

Goal: Accept only specific partition keys (e.g., region codes) and reject all others.

from airflow.sdk import AllowedKeyMapper, Asset, PartitionedAssetTimetable, asset

region_raw_stats = Asset(uri="file://incoming/player-stats/by-region.csv", name="region_raw_stats")


@asset(
    uri="file://analytics/player-stats/regional-breakdown.csv",
    schedule=PartitionedAssetTimetable(
        assets=region_raw_stats,
        default_partition_mapper=AllowedKeyMapper(["us", "eu", "apac"]),
    ),
    tags=["player-stats", "regional"],
)
def regional_stats_breakdown():
    pass

Verified — from airflow-core/src/airflow/example_dags/example_asset_partition.py

AllowedKeyMapper validates that upstream partition keys belong to a fixed set. If the upstream emits "us", the downstream triggers with key "us". If it emits "latam" (not in the list), the event is silently ignored — no downstream DAG run is created for that partition.

This is useful for segment-based partitioning where partition keys are categorical values rather than timestamps.


9. Configure Per-Asset Mapper Overrides

Goal: Apply different partition mappers to different upstream assets in the same consumer DAG.

from airflow.sdk import (
    DAG,
    Asset,
    IdentityMapper,
    PartitionedAssetTimetable,
    StartOfDayMapper,
    task,
)

hourly_sales = Asset(uri="file://incoming/sales/hourly.csv", name="hourly_sales")
daily_targets = Asset(uri="file://incoming/sales/targets.csv", name="daily_targets")

with DAG(
    dag_id="join_sales_and_targets",
    schedule=PartitionedAssetTimetable(
        assets=hourly_sales & daily_targets,
        default_partition_mapper=StartOfDayMapper(),
        partition_mapper_config={
            daily_targets: IdentityMapper(),
        },
    ),
):

    @task
    def join_data():
        pass

    join_data()

Adapted from airflow-core/docs/authoring-and-scheduling/assets.rst

The default_partition_mapper applies to all upstream assets unless overridden in partition_mapper_config. Here, hourly_sales events are mapped with StartOfDayMapper (truncating hourly keys to daily), while daily_targets events are passed through unchanged with IdentityMapper.

The downstream DAG triggers only when both assets produce a matching partition key after their respective mappers are applied. If hourly_sales emits "2026-03-10T09:00:00" (mapped to "2026-03-10") and daily_targets emits "2026-03-10" (unchanged), the keys match and the DAG run is created.

You can also use Asset.ref() to reference assets by name or URI when the full Asset object isn't available:

partition_mapper_config={
    Asset.ref(name="daily_targets"): IdentityMapper(),
}

Caution with mismatched mappers: If mappers produce incompatible key formats, the downstream DAG will never trigger. For example, mapping one asset with StartOfYearMapper (output: "2026") and another with StartOfHourMapper (output: "2026-03-10T09") means the keys will never align.


10. Read partition_key Inside a Running Task

Goal: Access the resolved partition key at task execution time to drive partition-specific logic.

from airflow.sdk import DAG, Asset, PartitionedAssetTimetable, StartOfHourMapper, task

combined_player_stats = Asset(uri="file://curated/player-stats/combined.csv", name="combined_player_stats")

with DAG(
    dag_id="process_partition",
    schedule=PartitionedAssetTimetable(
        assets=combined_player_stats,
        default_partition_mapper=StartOfHourMapper(),
    ),
    catchup=False,
):

    @task
    def process(dag_run=None):
        partition_key = dag_run.partition_key
        print(f"Processing partition: {partition_key}")

    process()

Verified — from airflow-core/src/airflow/example_dags/example_asset_partition.py

The dag_run.partition_key attribute is available on the DagRun instance passed to every task. For partitioned DAG runs, it contains the resolved downstream partition key (after mapper transformation). For non-partitioned DAG runs, it is None.


11. Trigger a Partitioned DAG Run via REST API

Goal: Manually materialize a specific partition by triggering a DAG run with an explicit partition key.

curl -X POST "http://<airflow-host>/api/v2/dags/aggregate_regional_sales/dagRuns" \
  -H "Content-Type: application/json" \
  -d '{
    "logical_date": "2026-03-10T00:00:00Z",
    "partition_key": "us|2026-03-10T09:00:00"
  }'

Verified — from airflow-core/docs/authoring-and-scheduling/assets.rst

The REST API accepts partition_key in the request body when triggering a DAG run. The partition key is stored directly on the DAG run and is accessible via dag_run.partition_key in tasks. This bypasses the normal partition mapper pipeline — the key you provide is used as-is.

You can also trigger a partitioned DAG run from the Airflow UI via the "Trigger DAG" dialog, which includes a partition_key field.


Not Yet Implemented

The following AIP-76 features are proposed but not present in the current codebase:

  • PartitionAtRuntime — Dynamic partition keys determined during execution based on runtime data (e.g., querying a database for active customers). Tracked in #44146.
  • PartitionByInterval / PartitionBySequence / PartitionByProduct — The AIP proposes these as high-level partition definition classes on assets. The implementation instead uses CronPartitionTimetable for producing partitions and PartitionMapper subclasses for consuming them.
  • Custom PartitionKey subclasses — The AIP proposes rich, typed partition keys (e.g., a Position(x, y, z) class). The implementation uses plain string keys.
  • Rollup / many-to-one partition aggregation — The ability to require that all upstream partitions within a range (e.g., all 24 hourly partitions) are present before triggering a daily downstream run. Currently, the first matching partition key triggers the downstream DAG. A TODO in the scheduler tracks this.
  • Partition-aware backfill — The AIP describes explicit backfill for time-based partitions. Integration with the backfill system is not yet complete.
  • Partition skipping — Runtime decision to skip specific partitions.
  • Watermarking — Incremental load tracking via partition markers.

Manifest / ADR

Definition: AIP-to-User-Stories Playbook Skill

1. Intent & Context

  • Goal: Create a Claude Code skill (/aip-user-stories) that auto-detects its mode from inputs: (1) post-implementation (PR URLs provided) generates verified recipe/how-to guides, and (2) pre-implementation (no PR URLs) generates user stories with speculative code to help AIP authors validate their design.
  • Mental Model: AIPs describe design intent; PRs and merged code represent actual implementation. In post mode, the skill bridges this gap with verified recipes. In pre mode, no implementation exists — the skill helps AIP authors think through user stories and surface design questions. Mode is auto-detected: PRs present → post; no PRs → pre.
  • Interview: thorough
  • Medium: local

2. Approach

  • Architecture: Single skill directory at /home/USER/repositories/airflow/.claude/skills/aip-user-stories/ containing:

    • SKILL.md — main skill prompt with phase-based execution flow
    • references/playbook-template.md — output template for both modes (recipes for post mode, user stories for pre mode)

    Two modes, auto-detected from inputs:

    Invocation: /aip-user-stories <AIP-URL> [<PR-URL>...] [<file>...]

    • PR URLs present → post-implementation mode
    • No PR URLs → pre-implementation mode

    Post-implementation mode (PRs provided):

    1. Parse — extract AIP URL (or pasted content), PR URLs, optional example file paths. URLs identified by https:// prefix; everything else is a file path. (A classification sketch follows the Trade-offs list below.)
    2. Fetch — retrieve AIP content via WebFetch (fallback: ask user to paste), PR diffs/metadata via gh CLI, read local example/source/test files.
    3. Analyze — cross-reference AIP features against actual code, flag unimplemented features. When multiple PRs conflict, prefer the latest PR with code changes (docs-only PRs inform context but don't override).
    4. Propose — present recipe list (title + one-liner) grouped by concept for user approval. For unimplemented AIP features, ask user whether to include (all-placeholder code) or skip.
    5. Generate — for each approved recipe, verify code against codebase (source, example DAGs, tests), produce content.
    6. Assemble — combine overview + recipes into playbook, write to .claude/aip-{number}.md.

    Pre-implementation mode (no PRs):

    1. Parse — extract AIP URL (or pasted content). File paths ignored with a warning (no implementation to reference).
    2. Fetch — retrieve AIP content via WebFetch (fallback: ask user to paste).
    3. Analyze — extract proposed features, APIs, and use cases from the AIP. No code verification (nothing is implemented).
    4. Propose — present user story list (title + one-liner) grouped by concept for user approval.
    5. Generate — for each approved story, produce: user story with goal, speculative code (all marked # PROPOSED API — not yet implemented), and open design questions the AIP should address.
    6. Assemble — combine overview + user stories into document, write to .claude/aip-{number}.md.
  • Execution Order:

    • D1 (SKILL.md) → D2 (playbook template) → D3 (verify with AIP-76 example)
    • Rationale: SKILL.md defines process; template defines output format; AIP-76 validates the skill works.
  • Risk Areas:

    • [R-1] WebFetch fails on Confluence AIP pages | Detect: empty content. Mitigation: paste fallback.
    • [R-2] LLM fills code blocks with plausible but wrong API calls (post mode) | Detect: code uses classes/functions not in codebase. Mitigation: explicit verification instruction + placeholder fallback.
    • [R-3] Recipe list misses important use cases visible in tests but not in examples | Detect: test files show uncovered patterns. Mitigation: include tests as verification source.
    • [R-4] Pre mode generates user stories that don't match AIP intent | Detect: stories propose features the AIP doesn't describe. Mitigation: stories must trace back to specific AIP sections.
  • Trade-offs:

    • [T-1] Completeness vs accuracy → Prefer accuracy (post mode). When in doubt between adapted and unverified, prefer adapted if the individual components are each verified. Prefer completeness (pre mode — better to surface more stories for the author to evaluate).
    • [T-2] AIP fidelity vs implementation truth → Prefer implementation (post). Prefer AIP (pre — no implementation exists).
    • [T-3] Detailed placeholders vs brief placeholders → Prefer brief (TODO + See reference + ellipsis).
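
To make the invocation concrete, the two mode-selecting calls described under Architecture look like this (the Confluence URLs are illustrative; #63262 is the asset-partition PR used in Deliverable 3):

```
# Post mode: AIP plus at least one implementing PR
/aip-user-stories https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-76 https://github.com/apache/airflow/pull/63262

# Pre mode: AIP only
/aip-user-stories https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-93
```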

3. Global Invariants (The Constitution)

  • [INV-G1] The skill prompt must not contain ambiguous instructions, vague language, or implicit expectations. No prescriptive HOW (step-by-step tool instructions), no arbitrary limits, no weak language ("try to", "maybe"). No AI-typical vocabulary (delve, tapestry, landscape, leverage, harness, navigate, seamless, robust, transformative, etc.) in its own prose or output instructions. | Verify: prompt quality review + vocabulary check

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Review the skill prompt at /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md against these prompt quality criteria: clarity (no ambiguous instructions), no conflicts (no contradictory rules), structure (critical rules surfaced prominently), information density (every word earns its place), no anti-patterns (no prescriptive HOW, arbitrary limits, capability instructions, weak language like 'try to' or 'maybe'), invocation fit (trigger/caller/consumer match), domain context (Airflow terms captured), complexity fit (matches task complexity), edge case coverage (handles boundary inputs), output calibration (format/length/detail match use case), emotional tone (calm, direct, trusted advisor). Report any MEDIUM+ issues."
    verify:
      method: bash
      command: "grep -i -E '(delve|tapestry|landscape|leverage|harness|navigate|seamless|robust|transformative|meticulous|elevate|foster|empower|underpin|multifaceted|holistic|paradigm|synergy|ecosystem)' /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md /home/USER/repositories/airflow/.claude/skills/aip-user-stories/references/*.md 2>/dev/null && echo 'FAIL: AI slop vocabulary found' || echo 'PASS'"
  • [INV-G2] The skill directory must have SKILL.md + references/playbook-template.md. Template in companion file, not inlined. | Verify: directory structure check

    verify:
      method: bash
      command: "test -f /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md && test -f /home/USER/repositories/airflow/.claude/skills/aip-user-stories/references/playbook-template.md && echo 'PASS' || echo 'FAIL: missing SKILL.md or references/playbook-template.md'"
  • [INV-G3] SKILL.md must have a description field that follows the What + When + Triggers pattern (trigger specification, not human-readable summary). Must include trigger terms for both modes (pre and post). | Verify: description check

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md frontmatter. Check that the 'description' field follows the pattern: WHAT the skill does + WHEN to use it + trigger terms. It should cover both pre-implementation and post-implementation modes. It should NOT be a human-readable summary but a trigger specification. Report PASS or FAIL with explanation."
  • [INV-G4] In post mode, code blocks use three tiers: (1) Verified — pattern exists verbatim in codebase; clean code, no markers. (2) Adapted — combines verified components in a new way (e.g., swapping one mapper for another); clean code, brief note below the block: "Adapted from [source file]". (3) Unverified — no codebase evidence; uses placeholder: # TODO: Implement [description] / # See: [reference] / .... Verification sources: source code, example DAGs, and test files. The placeholder format is sketched after this list. | Verify: three-tier instruction

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify the post-implementation mode defines three code block tiers: (1) Verified — verbatim from codebase, no markers, (2) Adapted — combines verified components, noted below block, (3) Unverified — TODO placeholder. Verification sources must include source code, example DAGs, and test files. Report PASS or FAIL."
  • [INV-G5] In pre mode, ALL code blocks must be marked as speculative with # PROPOSED API — not yet implemented. No code should appear to be verified or production-ready. An example marking appears after this list. | Verify: pre mode code marking

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify the pre-implementation mode instructions require ALL code blocks to be marked as speculative/proposed, not verified. Look for explicit instruction about marking code as 'PROPOSED API' or similar. Report PASS or FAIL."
  • [INV-G6] In post mode, the skill must follow implementation truth, not AIP spec. When AIP proposes APIs that differ from implementation, the playbook follows the implementation. | Verify: source of truth instruction

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify the post-implementation mode contains an explicit instruction that the playbook follows actual implementation, NOT the AIP specification, when they diverge. Report PASS or FAIL."
  • [INV-G7] SKILL.md must include a Gotchas section with at least 3 specific, actionable failure modes grounded in this skill's domain. | Verify: gotchas check

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Find the Gotchas section. Verify it has at least 3 items, each specific to AIP/Airflow/code-verification domain and actionable. Report PASS or FAIL with the gotchas found."
  • [INV-G8] The playbook template (references/playbook-template.md) must include: prerequisites/version section, brief conceptual overview section, and recipe/story sections. Must NOT include API reference, migration guide, or troubleshooting sections. | Verify: template structure

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/references/playbook-template.md. Verify it includes sections for: (1) prerequisites/version requirements, (2) brief conceptual overview, (3) recipe sections (post mode) or user story sections (pre mode). Verify it does NOT include: API reference tables, migration guides, troubleshooting sections. Report PASS or FAIL."
  • [INV-G9] The skill must write output to .claude/aip-{number}.md where {number} is the AIP number extracted from the URL or provided by the user. | Verify: output path instruction

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify it instructs the LLM to write output to '.claude/aip-{number}.md' where {number} is extracted from the AIP URL (or asked from the user). Report PASS or FAIL."
  • [INV-G10] In pre mode, each user story must include open design questions probing: (a) API ergonomics — easy to use correctly, hard to misuse? (b) edge cases — unusual inputs or configurations? (c) compatibility — interaction with existing Airflow patterns (e.g., catchup, backfill, dynamic task mapping)? (d) implementation feasibility — constraints the AIP hasn't addressed? Generic questions like "what about error handling?" don't count. | Verify: design questions instruction

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify the pre-implementation mode instructions require each user story to include open design questions that probe specific areas (API ergonomics, edge cases, compatibility with existing Airflow patterns, implementation feasibility). Generic questions should not count. Report PASS or FAIL."

4. Process Guidance (Non-Verifiable)

  • [PG-1] Skill type: Document/Scaffolding skill — produces structured output from external inputs.
  • [PG-2] Emotional tone: Calm, direct, trusted advisor. No urgency language. Failure is normal.
  • [PG-3] Empty input behavior: If no arguments provided, print usage synopsis showing both modes and stop.
  • [PG-4] Phase markers: Use clear phase headings in the skill prompt so instructions for each phase don't rot mid-context.
  • [PG-5] No prescriptive HOW: State goals and constraints. Don't prescribe tool usage sequences.
  • [PG-6] Playbook prose quality: Each recipe should explain WHY this pattern works, not just WHAT the code does. No AI slop. If the user chose to skip unimplemented AIP features during the Propose phase, add a brief "Not Yet Implemented" note at the end listing them. Don't include this section if all features were covered.
  • [PG-7] AIP number extraction: Extract from URL path (e.g., "AIP-76"). If undeterminable, ask user. (Sketched in code after this list.)
  • [PG-8] No provenance markers: Output code blocks are clean. Verification is internal to generation.
  • [PG-9] File overwrite: If .claude/aip-{number}.md already exists, ask user before overwriting.
  • [PG-10] Minimum input (post mode): At least one PR URL required. AIP URL + PR URL(s) is minimum valid invocation. Example file paths optional.
  • [PG-11] Minimum input (pre mode): Only AIP URL (or pasted content) required. File paths ignored with a warning (no implementation to reference). When presenting usage, show both invocation patterns: URL-based and paste-based.
  • [PG-12] Pre mode code quality: Speculative code should follow AIP's proposed API as closely as possible, using the AIP's own code examples as the basis. Mark all code as proposed/unverified.
  • [PG-13] Mode auto-detection: Mode is determined by presence of PR URLs. If any https://github.com/.../pull/... URLs are in the arguments, use post mode. Otherwise, pre mode. No flags needed. (Sketched in code after this list.)
  • [PG-14] Recipe granularity: One recipe per distinct use case that the feature enables. A use case is a specific problem the user is solving, not an API class or configuration option. If two API classes serve the same use case, they belong in one recipe. If one API class serves multiple use cases, split into separate recipes.
  • [PG-15] Recipe sizing: Each recipe's code should be the minimal example that illustrates how the feature addresses the use case. No boilerplate, no unrelated setup. Explanation should cover WHY this pattern works, not line-by-line narration.
  • [PG-16] Terminology consistency: Use 'recipe' in post mode, 'user story' in pre mode, 'playbook' for the overall output, 'AIP' for the input. Modes are 'pre' and 'post' internally.
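
Since PG-7 and PG-13 are mechanical rules, they can be restated precisely. A minimal Python sketch of both, plus the INV-G9 output path; this restates the spec, it is not code that ships with the skill (the skill itself is a prompt):

```python
import re

# PG-13: any GitHub pull request URL among the arguments selects post mode.
PR_URL = re.compile(r"https://github\.com/[^/\s]+/[^/\s]+/pull/\d+")
# PG-7: the AIP number appears in the URL path, e.g. "AIP-76".
AIP_NUM = re.compile(r"AIP-(\d+)", re.IGNORECASE)


def detect_mode(args: list[str]) -> str:
    return "post" if any(PR_URL.search(arg) for arg in args) else "pre"


def output_path(aip_url: str) -> str:
    # INV-G9: write to .claude/aip-{number}.md; per PG-7, ask the user if undeterminable.
    match = AIP_NUM.search(aip_url)
    if match is None:
        raise ValueError("AIP number not determinable from the URL; ask the user")
    return f".claude/aip-{match.group(1)}.md"
```

On the post-mode invocation shown under Approach, detect_mode returns "post" and output_path yields .claude/aip-76.md.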

5. Known Assumptions

  • [ASM-1] The Airflow codebase is checked out locally and the skill runs from within the repo. | Default: yes (skill lives in .claude/skills/) | Impact if wrong: post mode code verification fails; pre mode unaffected.
  • [ASM-2] The gh CLI is available and authenticated for fetching PR data. | Default: yes | Impact if wrong: PR data cannot be fetched in post mode; pre mode unaffected.
  • [ASM-3] WebFetch works reliably on Confluence pages. | Default: uncertain (Confluence pages are public but JavaScript-heavy; WebFetch may return partial content) | Impact if wrong: user pastes content instead. Paste is a co-equal input path, not a degraded fallback.
  • [ASM-4] One AIP maps to a manageable number of recipes/stories (roughly 3-15). | Default: yes | Impact if wrong: large output; user can trim via the Propose phase.
  • [ASM-5] WebFetch is available as a built-in Claude Code tool. | Default: yes | Impact if wrong: AIP URL fetching fails; user must paste.

6. Deliverables (The Work)

Deliverable 1: SKILL.md — Main Skill Prompt

The skill file at /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md containing the skill prompt and both mode definitions. References the template in references/playbook-template.md.

Acceptance Criteria:

  • [AC-1.1] SKILL.md has valid YAML frontmatter with name: aip-user-stories and a trigger-pattern description covering both modes. | Verify: frontmatter check

    verify:
      method: bash
      command: "head -10 /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md | grep -q 'name: aip-user-stories' && echo 'PASS' || echo 'FAIL'"
  • [AC-1.2] SKILL.md defines argument parsing: <AIP-URL> [<PR-URL>...] [<file>...]. PR URLs auto-detected as GitHub pull request URLs. Mode auto-detected: PR URLs present → post, absent → pre. No arguments prints usage. | Verify: argument parsing

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify it defines argument parsing supporting: (1) AIP URL + optional PR URLs + optional file paths, (2) auto-detection of mode from presence of PR URLs, (3) URL detection by https:// prefix, (4) usage message when no arguments. Report PASS or FAIL."
  • [AC-1.3] SKILL.md defines the six execution phases for post mode (Parse, Fetch, Analyze, Propose, Generate, Assemble) with clear inputs/outputs. | Verify: post mode phases

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify it defines post-implementation mode execution phases covering: (1) argument parsing, (2) fetching AIP + PR data + local files, (3) analyzing features against implementation, (4) proposing recipe list for user approval, (5) generating verified recipes, (6) assembling and writing playbook. Report PASS or FAIL."
  • [AC-1.4] SKILL.md defines the six execution phases for pre mode (Parse, Fetch, Analyze, Propose, Generate, Assemble) with appropriate differences: no code verification, speculative code, design questions. | Verify: pre mode phases

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify it defines pre-implementation mode execution phases that: (1) don't require PR URLs, (2) skip code verification, (3) produce speculative code marked as proposed, (4) include open design questions per story, (5) propose user stories for approval. Report PASS or FAIL."
  • [AC-1.5] Propose phase presents items as a numbered list with title + one-liner, grouped by concept, and waits for user approval. | Verify: proposal format

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify the Propose phase (both modes) instructs: (1) numbered list with title + one-sentence description, (2) grouped by concept, (3) wait for user approval before generating. Report PASS or FAIL."
  • [AC-1.6] Handles unimplemented AIP features (post mode): asks user whether to include with placeholder code or skip. | Verify: unimplemented features

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify post mode handles AIP features not found in implementation by asking user to include or skip. Report PASS or FAIL."
  • [AC-1.7] Error handling: fail with clear message for inaccessible URLs. For AIP content, URL fetch and pasted content are equally valid input paths — present both options rather than trying URL first and falling back. | Verify: error handling

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify it handles inaccessible URLs with clear messages. For AIP content, verify that pasted content is treated as an equally valid input path (not just a fallback). Report PASS or FAIL."
  • [AC-1.8] Gotchas section with at least 3 specific, actionable failure modes. | Verify: gotchas quality

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Find the Gotchas section. Verify at least 3 items, each specific to AIP/Airflow domain and actionable. Report PASS or FAIL."
  • [AC-1.9] Version detection: post mode greps the PR diff for versionadded:: directives; if not found, asks the user. Pre mode asks the user for target version. | Verify: version detection

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify version detection: (1) post mode looks for versionadded directives in PR diff, falls back to asking user, (2) pre mode asks user for target version. Report PASS or FAIL."
  • [AC-1.10] Multiple PR handling (post mode): latest PR with code changes takes priority; docs-only PRs inform context. | Verify: multi-PR

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md. Verify post mode handles multiple PRs: latest code-change PR takes priority, docs-only PRs inform context. Report PASS or FAIL."
  • [AC-1.11] SKILL.md references the playbook template in references/playbook-template.md and instructs the LLM to follow it for output structure in both modes. | Verify: template reference

    verify:
      method: bash
      command: "grep -q 'playbook-template' /home/USER/repositories/airflow/.claude/skills/aip-user-stories/SKILL.md && echo 'PASS' || echo 'FAIL: no reference to playbook template'"

Deliverable 2: Playbook Template

Reference template at /home/USER/repositories/airflow/.claude/skills/aip-user-stories/references/playbook-template.md. A skeleton of the expected structure follows the acceptance criteria below.

Acceptance Criteria:

  • [AC-2.1] Template includes a prerequisites section with placeholders for Airflow version and required packages. | Verify: prerequisites section

    verify:
      method: bash
      command: "grep -i 'prerequisit' /home/USER/repositories/airflow/.claude/skills/aip-user-stories/references/playbook-template.md && echo 'PASS' || echo 'FAIL'"
  • [AC-2.2] Template includes a conceptual overview section (2-3 paragraphs from AIP motivation). | Verify: overview section

    verify:
      method: bash
      command: "grep -i 'overview' /home/USER/repositories/airflow/.claude/skills/aip-user-stories/references/playbook-template.md && echo 'PASS' || echo 'FAIL'"
  • [AC-2.3] Template defines recipe structure (post mode): title, goal, per-recipe prerequisites, code block(s), explanation. | Verify: recipe structure

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/references/playbook-template.md. Verify the recipe section includes: (1) title, (2) goal, (3) per-recipe prerequisites, (4) code block area, (5) explanation. Report PASS or FAIL."
  • [AC-2.4] Template defines user story structure (pre mode): title, goal, speculative code (marked as proposed), open design questions. | Verify: story structure

    verify:
      method: subagent
      agent: general-purpose
      model: inherit
      prompt: "Read /home/USER/repositories/airflow/.claude/skills/aip-user-stories/references/playbook-template.md. Verify it includes a pre-implementation mode section with: (1) user story title, (2) goal, (3) speculative code area with 'PROPOSED API' marking, (4) open design questions area. Report PASS or FAIL."
  • [AC-2.5] Template does NOT include API reference, migration guide, or troubleshooting sections. | Verify: scope check

    verify:
      method: bash
      command: "grep -i -E '(api reference|migration guide|troubleshoot)' /home/USER/repositories/airflow/.claude/skills/aip-user-stories/references/playbook-template.md && echo 'FAIL: out-of-scope sections found' || echo 'PASS'"

Deliverable 3: Validation with AIP-76

Manual review that the skill would produce coherent output for AIP-76 inputs.

Acceptance Criteria:

  • [AC-3.1] In post mode (inputs: AIP-76, the asset-partition PR #63262, and example_asset_partition.py), the skill would identify recipe candidates covering: CronPartitionTimetable, PartitionedAssetTimetable, temporal mappers, ProductMapper, AllowedKeyMapper. | Verify: manual review

    verify:
      method: manual
      prompt: "Review SKILL.md's Analyze phase and confirm it would identify these patterns from AIP-76 inputs."
  • [AC-3.2] In post mode, PartitionAtRuntime (from AIP-76) would be flagged as unimplemented and the user asked how to handle it. | Verify: manual review

    verify:
      method: manual
      prompt: "Review SKILL.md's unimplemented feature detection and confirm it applies to PartitionAtRuntime."
  • [AC-3.3] In pre mode (AIP-76 only, no PRs), the skill would generate user stories using AIP's proposed API (PartitionByInterval, PartitionBySequence, PartitionAtRuntime) with speculative code and design questions. | Verify: manual review

    verify:
      method: manual
      prompt: "Review SKILL.md's pre mode and confirm it would use AIP-76's proposed API in speculative code blocks."


Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

Generated-by: Claude Opus 4.6 following the guidelines


  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

@boring-cyborg boring-cyborg (bot) added the area:dev-tools and backport-to-v3-2-test labels Apr 24, 2026
@Dev-iL Dev-iL requested review from Lee-W and uranusjr April 24, 2026 12:16

@potiuk potiuk left a comment


No!


potiuk commented Apr 25, 2026

You need to rebase though :)


Dev-iL commented Apr 26, 2026

> You need to rebase though :)

c'est la vie 🤷‍♂️

@Dev-iL Dev-iL force-pushed the 2604/playbook_skill branch 4 times, most recently from 2c6dcf2 to 4f5ab98 on April 26, 2026 08:06
@Dev-iL Dev-iL force-pushed the 2604/playbook_skill branch from 4f5ab98 to 03b5705 on April 26, 2026 16:07

@Lee-W Lee-W left a comment


a few nits, but mostly good

@Dev-iL Dev-iL force-pushed the 2604/playbook_skill branch from 03b5705 to f33ec3d on April 27, 2026 09:12

Dev-iL commented Apr 27, 2026

Added an example for "pre mode" using AIP-93.

@jroachgolf84 @gyli - would appreciate your comments on the output's usefulness.


@jroachgolf84 jroachgolf84 left a comment


I like it, I think it makes sense!

@Dev-iL Dev-iL force-pushed the 2604/playbook_skill branch from f33ec3d to c35fd47 on April 29, 2026 18:51
Adds a Claude Code skill that generates verified recipe playbooks from Airflow Improvement Proposals (AIPs). Supports two modes: post-implementation (generates verified recipes from actual codebase code and PRs) and pre-implementation (generates speculative user stories to help AIP authors validate design before coding).

Supporting changes:
- Update .gitignore to track .claude/skills/ while ignoring other .claude/ content
- Add CODEOWNERS entry for .claude/skills/
- Widen license header detection window to cover files with frontmatter
- Narrow pre-commit license-check excludes (instructions and skill files
  now carry headers, so the broad excludes are no longer needed)
- Add Apache license header to code-review.instructions.md
- Exclude .claude/skills from blacken-docs (Markdown code fences are not
  standalone Python files)
@Dev-iL Dev-iL force-pushed the 2604/playbook_skill branch from c35fd47 to c033ed8 on April 29, 2026 20:27
@Lee-W Lee-W merged commit 0960ad2 into apache:main Apr 30, 2026
78 checks passed
@github-actions

Backport failed to create: v3-2-test.

Note: As of "Merging PRs targeted for Airflow 3.X", the committer who merges the PR is responsible for backporting PRs that are bug fixes (generally speaking) to the maintenance branches.

If in doubt, please ask in the #release-management Slack channel.

| Status | Branch | Result |
| --- | --- | --- |
| Failed | v3-2-test | Commit link |

You can attempt to backport this manually by running:

cherry_picker 0960ad2 v3-2-test

This should apply the commit to the v3-2-test branch and leave the commit in a conflicted state, marking the files that need manual conflict resolution.

After you have resolved the conflicts, you can continue the backport process by running:

cherry_picker --continue

If you don't have cherry-picker installed, see the installation guide.

@Dev-iL Dev-iL deleted the 2604/playbook_skill branch April 30, 2026 05:00
seruman pushed a commit to seruman/airflow that referenced this pull request Apr 30, 2026
potiuk added a commit that referenced this pull request May 2, 2026
…#66275)

The `insert-license` agentic-markdown hook on v3-2-test cannot detect
license headers placed below YAML frontmatter, so it prepends a second
license at the top of the file. That breakage also confuses markdownlint:
the `---` line of the frontmatter, after a blank line, gets parsed as a
setext-H2 underline, and every later `# ...` heading then fails MD003.

This started failing on v3-2-test once #66169 landed
`.github/skills/prepare-providers-documentation/SKILL.md`, which is not
on the existing per-file exclude list.

The same issue was solved on `main` in #65776 (commit 0960ad2) by
passing `--detect-license-in-X-top-lines '30'` to the SHORT_LICENSE
hook so it recognises an existing license inside the first 30 lines of
the file. Backport only the `.pre-commit-config.yaml` change from that
PR — not the new `aip-user-stories` skill that shipped alongside it —
and add the matching license header inside
`.github/instructions/code-review.instructions.md` (so it does not
regress once `.github/instructions/` is no longer blanket-excluded).
