Mayuresh16/dagsmith

DagSmith

Smith production-ready Airflow DAGs from YAML — schema-validated, registry-driven, GCP-ready.

Build: CI, Nightly
Quality: Coverage ≥80%, Ruff, mypy strict
Stack: Python 3.13+, Airflow ≥3.2, Pydantic v2
Version: 0.1.0
License: Apache 2.0

Overview

DagSmith is a code-generation framework that compiles structured YAML pipeline definitions into fully typed, production-ready Apache Airflow DAG files. Instead of writing repetitive Python boilerplate for each DAG, you declare your pipeline in YAML and DagSmith handles the rest: imports, operator instantiation, dependency wiring, and code formatting.

┌──────────────┐     ┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  YAML Spec   │────▶│   Validate   │────▶│   Generate   │────▶│  Format      │
│  (author)    │     │  (Pydantic)  │     │  (.py DAG)   │     │  (ruff)      │
└──────────────┘     └──────────────┘     └──────────────┘     └──────────────┘

Key Features

| Feature | Description |
|---|---|
| Author-time validation | Pydantic schemas catch bad config before code generation, not at Airflow deploy time |
| Pluggable registry | Add new operators/sensors to a YAML config file with zero Python code changes |
| GCP-native | First-class BigQuery and GCS operator support with automatic FinOps label injection |
| Clean output | Generated DAGs are human-readable, ruff-formatted Python you can review and version-control |
| 16+ built-in operators | BigQuery, GCS, Python, Bash, Branching, Sensors, Triggers, TaskGroups, each with full validation |
| Generic plugin system | Register any Airflow operator/sensor in YAML and use it immediately |
| Variable substitution | ${VAR__NAME__VAR} expansion across all YAML sections before validation |
| Full CLI toolkit | generate, validate, list, resolve with colorized output and CI-friendly exit codes |

Quick Start

Prerequisites

| Tool | Version | Purpose |
|---|---|---|
| Python | ≥ 3.13 | Runtime |
| uv | latest | Package manager & script runner |
| ruff | latest | Linter & formatter (post-processing) |

Install

# Install uv
# Linux / macOS:
curl -LsSf https://astral.sh/uv/install.sh | sh
# Windows (PowerShell):
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex"

# Install ruff
# Linux / macOS:
curl -LsSf https://astral.sh/ruff/install.sh | sh
# Windows (PowerShell):
powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/ruff/install.ps1 | iex"

# Install project dependencies
uv sync --group dev

Generate your first DAG

# Generate all example DAGs
dagsmith generate examples/

# Generate a single DAG
dagsmith generate examples/01_simple_bq_pipeline.yaml

# Validate without generating
dagsmith validate examples/ --strict

# List registered operators
dagsmith list

# Preview variable-expanded YAML
dagsmith resolve examples/01_simple_bq_pipeline.yaml

CLI Reference

dagsmith generate

Render YAML specs into .py DAG files.

dagsmith generate TARGETS [options]

| Flag | Description |
|---|---|
| TARGETS | One or more YAML file paths or directories (required) |
| -p, --pattern REGEX | Filter YAML files by filename regex |
| -o, --output-dir DIR | Output directory (default: ./dags/) |
| --dry-run | Validate and render without writing files |
| -x, --fail-fast | Stop on first failure |
| --no-format | Skip ruff post-processing |

dagsmith validate

Validate YAML specs without generating code. Ideal for CI gates.

dagsmith validate TARGETS [options]
| Flag | Description |
|---|---|
| TARGETS | One or more YAML file paths or directories (required) |
| -p, --pattern REGEX | Filter YAML files by filename regex |
| --strict | Treat warnings (missing metadata, zero retries, isolated tasks) as errors |
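
The three --strict checks can be illustrated with a minimal sketch over an already-parsed spec dict. The `lint_spec` and `exit_code` helpers, the message wording, and the 0/1 exit codes are hypothetical; the sketch also assumes a task without `retries` falls back to `default_args` and then to Airflow's default of 0.

```python
import re
from typing import Any

# Pulls task/group IDs out of dependency strings such as "[a, b] >> c".
TASK_REF = re.compile(r"[\w.-]+")


def lint_spec(spec: dict[str, Any]) -> list[str]:
    """Return warnings for missing metadata, zero retries, and isolated tasks."""
    warnings: list[str] = []
    if not spec.get("metadata"):
        warnings.append("metadata section is missing or empty")
    default_retries = spec.get("default_args", {}).get("retries", 0)
    tasks = spec.get("tasks", [])
    referenced = {ref for dep in spec.get("dependencies", []) for ref in TASK_REF.findall(dep)}
    for task in tasks:
        node_id = task.get("task_id") or task.get("group_id")
        # Task-level retries override default_args; groups have no retries of their own.
        if "task_id" in task and task.get("retries", default_retries) == 0:
            warnings.append(f"{node_id}: zero retries")
        if len(tasks) > 1 and node_id not in referenced:
            warnings.append(f"{node_id}: isolated (no dependencies)")
    return warnings


def exit_code(warnings: list[str], errors: list[str], strict: bool) -> int:
    """0 = pass, 1 = fail; with --strict any warning also fails the run."""
    return 1 if errors or (strict and warnings) else 0
```

Returning a non-zero code only when strict mode sees warnings is what lets the same command act as a soft check locally and a hard gate in CI.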

dagsmith list

Display all registered operators, sensors, and utilities.

dagsmith list [options]
| Flag | Description |
|---|---|
| --origin | Filter by section: standard, third_party, custom |
| --type | Filter by class type: operator, sensor, util, model |

dagsmith resolve

Expand ${VAR__...__VAR} references and output the fully resolved YAML.

dagsmith resolve TARGETS [options]
| Flag | Description |
|---|---|
| TARGETS | One or more YAML file paths or directories (required) |
| -p, --pattern REGEX | Filter YAML files by filename regex |
| -o, --output FILE | Write resolved YAML to a file instead of stdout |
| -x, --fail-fast | Stop on first variable expansion error |

Global flags: -v, --verbose (debug output), -q, --quiet (warnings/errors only)

YAML Spec Format

Structure

variables:          # Optional — ${VAR} substitution
configurations:     # Optional — reusable config values
metadata:           # Required — documentation metadata
dag:                # Required — airflow.DAG() constructor
gcp:                # Required — GCP connection defaults
default_args:       # Optional — applied to every task
user_defined_macros: # Optional — Jinja macros
tasks:              # Optional — operator/sensor/group specs
dependencies:       # Optional — task execution order

Full Example

variables:
  VAR__PROJECT_ID__VAR: "my-gcp-project-001"

configurations:
  base_path: "/home/airflow/gcs/dags/${VAR__PROJECT_ID__VAR}/"

metadata:
  title: "Daily Account Activity Load"
  owner: "data-team@example.com"
  email: "data-team@example.com"
  version: "1.0.0"
  jira: "DE-101"
  developer_name: "daily_load"

dag:
  dag_id: "sequential_bq"
  description: "Load daily account activity into BigQuery."
  schedule: "0 6 * * *"
  start_date: "2026-01-02 12:13:14"
  timezone: "America/New_York"
  catchup: false
  max_active_runs: 1
  dagrun_timeout: 7200
  is_paused_upon_creation: true
  tags:
    - "warehouse:bigquery"
    - "module:daily_load"

gcp:
  project_id: "${VAR__PROJECT_ID__VAR}"
  location: "us-east4"

default_args:
  owner: "airflow"
  retries: 1
  retry_delay: 60
  email: [ "data-team@example.com" ]
  email_on_failure: true

tasks:
  - task_id: "stage_data"
    operator: BigQueryInsertJobOperator
    sql: "sql/stage_acct_activity.sql"
    params:
      project_id: "${VAR__PROJECT_ID__VAR}"
      src_dataset: "warehouse_tables"

  - task_id: "transform_data"
    operator: BigQueryInsertJobOperator
    sql: "sql/transform_acct_activity.sql"

  - task_id: "load_final"
    operator: BigQueryInsertJobOperator
    sql: "sql/load_acct_activity.sql"
    retries: 3

dependencies:
  - "stage_data >> transform_data >> load_final"
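
The integer values in the example (dagrun_timeout: 7200, retry_delay: 60) read as seconds. One plausible way a generator could turn the dag section into DAG() keyword arguments is sketched below. It is an illustration under stated assumptions, not DagSmith's code_generator.py: the `render_dag_kwargs` helper, rendering seconds as `timedelta(seconds=...)`, and folding start_date plus timezone into a `pendulum.datetime(...)` call are all hypothetical choices.

```python
from datetime import datetime
from typing import Any


def render_timedelta(seconds: int) -> str:
    """Render an integer number of seconds as timedelta source code."""
    return f"timedelta(seconds={seconds})"


def render_start_date(value: str, tz: str) -> str:
    """Render 'YYYY-MM-DD HH:MM:SS' plus an IANA zone as a pendulum.datetime call."""
    dt = datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    return (f"pendulum.datetime({dt.year}, {dt.month}, {dt.day}, "
            f"{dt.hour}, {dt.minute}, {dt.second}, tz={tz!r})")


def render_dag_kwargs(dag: dict[str, Any]) -> list[str]:
    """Turn the YAML dag section into indented `key=value,` source lines."""
    lines: list[str] = []
    for key, value in dag.items():
        if key == "timezone":
            continue  # folded into start_date below
        if key == "start_date":
            rendered = render_start_date(value, dag.get("timezone", "UTC"))
        elif key == "dagrun_timeout":
            rendered = render_timedelta(value)
        else:
            rendered = repr(value)  # strings, bools, ints, lists as Python literals
        lines.append(f"    {key}={rendered},")
    return lines
```

Emitting source strings (rather than building live DAG objects) keeps the output reviewable, which matches the "clean output" goal; formatting is left to ruff.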

More YAML Patterns

Task Groups

tasks:
  - operator: TaskGroup
    group_id: "staging"
    tooltip: "Stage source tables"
    tasks:
      - task_id: "stage_orders"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_orders.sql"
      - task_id: "stage_customers"
        operator: BigQueryInsertJobOperator
        sql: "sql/stage_customers.sql"
    dependencies:
      - "stage_orders >> stage_customers"

dependencies:
  - "start >> staging >> aggregation"
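
Inside a group, dependencies use the short task IDs, while the top level refers to the group by its group_id. In the generated DAG, Airflow's TaskGroup (with its default prefix_group_id=True) prefixes each child's task ID with the group ID. The sketch below, with a hypothetical `flatten_task_ids` helper, computes the fully qualified IDs for nested group specs.

```python
from typing import Any


def flatten_task_ids(tasks: list[dict[str, Any]], prefix: str = "") -> list[str]:
    """List fully qualified task IDs as Airflow builds them with prefix_group_id=True."""
    ids: list[str] = []
    for task in tasks:
        if task.get("operator") == "TaskGroup":
            # Children of a group are prefixed with "<group_id>."; nesting stacks prefixes.
            ids.extend(flatten_task_ids(task.get("tasks", []), f"{prefix}{task['group_id']}."))
        else:
            ids.append(f"{prefix}{task['task_id']}")
    return ids
```

For the staging example above this yields `staging.stage_orders` and `staging.stage_customers`, which are the IDs you would see in the Airflow UI and logs.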

Sensors

- task_id: "wait_for_upstream"
  operator: ExternalTaskSensor
  external_dag_id: "upstream_pipeline"
  external_task_id: "final_step"
  mode: "reschedule"
  poke_interval: 300
  timeout: 21600
  allowed_states: [ "success" ]
  execution_delta: 3600
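
The timing fields in this sensor can be checked with a little arithmetic. ExternalTaskSensor looks for the external run whose logical date equals this run's logical date minus execution_delta, so 3600 seconds means "the upstream run one hour earlier". With poke_interval: 300 and timeout: 21600 (6 hours), the sensor gets roughly 72 chances before timing out. The helper names below are illustrative only.

```python
from datetime import datetime, timedelta, timezone


def upstream_logical_date(logical_date: datetime, execution_delta_seconds: int) -> datetime:
    """Logical date of the external run that ExternalTaskSensor waits on."""
    return logical_date - timedelta(seconds=execution_delta_seconds)


def approx_pokes(timeout_seconds: int, poke_interval_seconds: int) -> int:
    """Approximate number of poke intervals that fit in the timeout (ignores scheduler delay)."""
    return timeout_seconds // poke_interval_seconds
```

In reschedule mode the worker slot is released between pokes, which is why a long timeout like this one is practical.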

Python Callable

- task_id: "validate_params"
  operator: PythonOperator
  python_callable: "callables.validators.validate_params"
  op_kwargs:
    env: "{{ params.env }}"
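
The project layout lists callables.py as a "dotted-path -> (module, fn, alias) resolver". A minimal sketch of that idea follows; the `CallableRef` type and the aliasing scheme (last module segment plus function name, to avoid import clashes) are assumptions, not DagSmith's actual naming.

```python
from typing import NamedTuple


class CallableRef(NamedTuple):
    module: str
    function: str
    alias: str


def resolve_callable(dotted_path: str) -> CallableRef:
    """Split 'pkg.module.func' into its parts plus a hypothetical collision-safe alias."""
    module, sep, function = dotted_path.rpartition(".")
    if not sep or not module or not function:
        raise ValueError(f"expected 'package.module.function', got {dotted_path!r}")
    alias = f"{module.rsplit('.', 1)[-1]}_{function}"
    return CallableRef(module, function, alias)


def import_statement(ref: CallableRef) -> str:
    """Import line the generated DAG would need for python_callable."""
    return f"from {ref.module} import {ref.function} as {ref.alias}"
```

With the spec above, `resolve_callable("callables.validators.validate_params")` gives module `callables.validators`, function `validate_params`, and (under this assumed scheme) alias `validators_validate_params`.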

Generic Plugin Operator

# No Python code needed — register in airflow_registry.yaml
- task_id: "notify_slack"
  operator: SlackWebhookOperator
  slack_webhook_conn_id: "slack_default"
  message: "Daily load completed for {{ ds }}"
  channel: "#data-alerts"

Dependency Syntax

dependencies:
  - "task_a >> task_b >> task_c"         # sequential
  - "[task_x, task_y] >> task_z"         # fan-in
  - "task_z >> [task_a, task_b]"         # fan-out
  - "task_c << [task_a, task_b]"         # fan-in (reverse)
  - "group_a >> group_b"                 # task group references
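
The dependency strings above can be turned into explicit (upstream, downstream) edges with a small parser. This is a sketch of the idea behind dependencies.py, not its implementation; the function names are hypothetical, and a list on both sides is expanded to every pairing here, which may differ from DagSmith's behavior.

```python
import re

# Split on >> or << (whitespace optional) and keep the operator token.
TOKEN = re.compile(r"\s*(>>|<<)\s*")


def parse_operand(text: str) -> list[str]:
    """Turn 'task_a' or '[task_a, task_b]' into a list of IDs."""
    text = text.strip()
    if text.startswith("[") and text.endswith("]"):
        return [item.strip() for item in text[1:-1].split(",") if item.strip()]
    return [text]


def parse_dependency(expr: str) -> list[tuple[str, str]]:
    """Return (upstream, downstream) edges for one dependency string."""
    parts = TOKEN.split(expr.strip())
    operands = [parse_operand(p) for p in parts[0::2]]
    operators = parts[1::2]
    edges: list[tuple[str, str]] = []
    for left, op, right in zip(operands, operators, operands[1:]):
        # ">>" points left-to-right; "<<" points right-to-left.
        upstream, downstream = (left, right) if op == ">>" else (right, left)
        edges.extend((u, d) for u in upstream for d in downstream)
    return edges
```

Normalizing every form to edges makes fan-in written with `>>` and fan-in written with `<<` produce the same result.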

Variables & Substitution

variables:
  VAR__PROJECT_ID__VAR: "my-gcp-project-001"
  VAR__DATASET__VAR: "warehouse_tables"

gcp:
  project_id: "${VAR__PROJECT_ID__VAR}"

Naming rules: variable names must be ALL_UPPERCASE, begin with VAR__, and end with __VAR (for example, VAR__PROJECT_ID__VAR).
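
The naming rule and the ${...} expansion can be expressed with two regular expressions. The sketch below is a hypothetical approximation of what loader.py does, not its code: the `resolve_spec` helper, dropping the variables section from the output, and raising `KeyError` for undefined names are assumptions.

```python
import re
from typing import Any

# Assumed naming rule: ALL_UPPERCASE, starts with VAR__, ends with __VAR.
VAR_NAME = re.compile(r"^VAR__[A-Z0-9_]+__VAR$")
# Matches ${VAR__...__VAR} references inside string values.
VAR_REF = re.compile(r"\$\{(VAR__[A-Z0-9_]+__VAR)\}")


def expand_vars(node: Any, variables: dict[str, str]) -> Any:
    """Recursively substitute ${VAR__...__VAR} references in strings, lists, and dicts."""
    if isinstance(node, str):
        def substitute(match: re.Match[str]) -> str:
            name = match.group(1)
            if name not in variables:
                raise KeyError(f"undefined variable: {name}")
            return variables[name]

        return VAR_REF.sub(substitute, node)
    if isinstance(node, list):
        return [expand_vars(item, variables) for item in node]
    if isinstance(node, dict):
        return {key: expand_vars(value, variables) for key, value in node.items()}
    return node  # ints, bools, and None pass through unchanged


def resolve_spec(spec: dict[str, Any]) -> dict[str, Any]:
    """Check variable names, then expand every other section (hypothetical helper)."""
    variables: dict[str, str] = spec.get("variables") or {}
    bad = [name for name in variables if not VAR_NAME.match(name)]
    if bad:
        raise ValueError(f"invalid variable names: {bad}")
    return {key: expand_vars(value, variables) for key, value in spec.items() if key != "variables"}
```

Because only the ${VAR__...__VAR} form matches, Jinja templates such as {{ params.env }} or {{ ds }} pass through untouched for Airflow to render later.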

Field Aliases

| Canonical | Alias | Section |
|---|---|---|
| retry_delay | retry_delay_seconds | default_args, task-level |
| sla | sla_seconds | default_args |
| schedule | schedule_interval | dag |
| gcp_conn_id | google_cloud_conn_id | gcp |
| execution_delta | execution_delta_seconds | ExternalTaskSensor |
| execution_date | logical_date | TriggerDagRunOperator |
| poke_interval | poll_interval | GCSObjectsWithPrefixExistenceSensor |
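
In Pydantic v2, aliases like these are commonly declared with `Field(validation_alias=AliasChoices(...))`; whether DagSmith uses that mechanism is not stated here. The stdlib sketch below shows the same effect for a few rows of the table above. The `canonicalize` helper and the choice to reject specs that set both a name and its alias are assumptions.

```python
from typing import Any

# Alias -> canonical name per section (a subset of the table above).
ALIASES: dict[str, dict[str, str]] = {
    "default_args": {"retry_delay_seconds": "retry_delay", "sla_seconds": "sla"},
    "dag": {"schedule_interval": "schedule"},
    "gcp": {"google_cloud_conn_id": "gcp_conn_id"},
}


def canonicalize(section: str, fields: dict[str, Any]) -> dict[str, Any]:
    """Rename aliased keys to their canonical names, rejecting duplicates."""
    aliases = ALIASES.get(section, {})
    result: dict[str, Any] = {}
    for key, value in fields.items():
        canonical = aliases.get(key, key)
        if canonical in result:
            raise ValueError(f"{section}: {canonical!r} was given more than once (directly or via alias)")
        result[canonical] = value
    return result
```

Normalizing names before validation means the rest of the pipeline only ever sees the canonical field names.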

Supported Operators & Sensors

Built-in operators have dedicated Pydantic models with full field-level validation.

Standard (Airflow core)

| Operator | Description |
|---|---|
| PythonOperator | Run a Python callable |
| BranchPythonOperator | Branch based on callable return value |
| BashOperator | Execute a bash command |
| EmptyOperator | No-op placeholder / pipeline marker |
| TriggerDagRunOperator | Trigger another DAG |
| ExternalTaskSensor | Wait for a task in another DAG |

BigQuery

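| Operator | Description |
|---|---|
| BigQueryInsertJobOperator | Run SQL via the BigQuery Jobs API |
| BigQueryCheckOperator | Assert that every value in the first row of a SQL query result is truthy |
| BigQueryValueCheckOperator | Assert that a SQL query result matches an expected value (optionally within a tolerance) |
| BigQueryTableExistenceSensor | Wait for a table to exist |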

GCS (Google Cloud Storage)

| Operator | Description |
|---|---|
| GCSToBigQueryOperator | Load GCS files into BigQuery |
| GCSToGCSOperator | Copy/move objects between GCS buckets |
| GCSDeleteObjectsOperator | Delete objects from a GCS bucket |
| GCSObjectsWithPrefixExistenceSensor | Wait for objects with a prefix to exist |

Generic Plugin

Any operator/sensor registered in src/dagsmith/configs/airflow_registry.yaml works immediately — no Python code changes.

Generic Plugin System

# 1. Register in src/dagsmith/configs/airflow_registry.yaml (or DAGSMITH_EXTRA_REGISTRY)
airflow_class_registry:
  custom:
    SlackWebhookOperator:
      module: airflow.providers.slack.operators.slack_webhook
      class: SlackWebhookOperator
      type: operator

# 2. Use in any YAML spec
tasks:
  - task_id: "notify"
    operator: SlackWebhookOperator
    slack_webhook_conn_id: "slack_default"
    message: "Pipeline complete!"
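
Each registry entry carries exactly what a generated DAG needs to import the class. The sketch below assumes the YAML has already been parsed into a dict (YAML loading is omitted) and uses hypothetical `find_entry` and `import_line` helpers; it is not the actual get_import_line in registry/core.py.

```python
from typing import TypedDict

# One registry entry; "class" is a Python keyword, so the functional TypedDict form is used.
RegistryEntry = TypedDict("RegistryEntry", {"module": str, "class": str, "type": str})

# In-memory stand-in for the parsed airflow_class_registry mapping shown above.
REGISTRY: dict[str, dict[str, RegistryEntry]] = {
    "custom": {
        "SlackWebhookOperator": {
            "module": "airflow.providers.slack.operators.slack_webhook",
            "class": "SlackWebhookOperator",
            "type": "operator",
        },
    },
}


def find_entry(registry: dict[str, dict[str, RegistryEntry]], name: str) -> RegistryEntry:
    """Search every section (standard, third_party, custom) for an operator name."""
    for section in registry.values():
        if name in section:
            return section[name]
    raise KeyError(f"{name!r} is not in the registry; add it before using it in a spec")


def import_line(entry: RegistryEntry) -> str:
    """Build the import statement the generated DAG file needs."""
    return f"from {entry['module']} import {entry['class']}"
```

An unregistered operator name fails at generation time with a clear message instead of producing a DAG file with a broken import.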

Use the DAGSMITH_EXTRA_REGISTRY environment variable to maintain a separate registry file without modifying the bundled config:

export DAGSMITH_EXTRA_REGISTRY=/path/to/my_registry.yaml
dagsmith generate specs/

| | Built-in operators | Generic operators | Generic sensors |
|---|---|---|---|
| Field validation | Full Pydantic schema | None (runtime errors) | Sensor fields only |
| Registry entry | Not required | Required | Required |
| Python changes | None | None | None |

FinOps Labels

Every BigQueryInsertJobOperator task automatically receives the FinOps labels defined in src/dagsmith/configs/airflow_registry.yaml:

  • dag_id: {{ dag.dag_id }}
  • task_id: {{ task.task_id }}
  • execution_date: {{ ds_nodash }}
  • instance_name: {{ var.value.composer_env_name | default('composer') }}
  • run_id: cleaned, lowercased dag_run.run_id
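
Cleaning matters because BigQuery label values may contain only lowercase letters, digits, underscores, and dashes, and are limited to 63 characters, while run IDs look like scheduled__2026-01-02T12:13:14+00:00. The sketch below is a Python equivalent of that cleaning, not the Jinja DagSmith actually renders. The `finops_labels` helper is hypothetical, and cleaning every value (not just run_id) is an added assumption, motivated by task IDs inside task groups containing dots.

```python
import re

# BigQuery label values: lowercase letters, digits, underscores, dashes; max 63 chars.
MAX_LABEL_LENGTH = 63
DISALLOWED = re.compile(r"[^a-z0-9_-]")


def clean_label_value(raw: str) -> str:
    """Lowercase, replace each disallowed character with '_', and truncate to 63 chars."""
    return DISALLOWED.sub("_", raw.lower())[:MAX_LABEL_LENGTH]


def finops_labels(dag_id: str, task_id: str, ds_nodash: str, run_id: str,
                  instance_name: str = "composer") -> dict[str, str]:
    """Hypothetical Python equivalent of the rendered label set for one task run."""
    raw = {
        "dag_id": dag_id,
        "task_id": task_id,
        "execution_date": ds_nodash,
        "instance_name": instance_name,
        "run_id": run_id,
    }
    return {key: clean_label_value(value) for key, value in raw.items()}
```

For instance, a scheduled run ID becomes scheduled__2026-01-02t12_13_14_00_00, which BigQuery accepts as a label value.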

Project Layout

dagsmith/
  examples/                      # 18 sample YAML DAG specs
  references/
    reference_template.yaml      # fully documented YAML template
  docs/                          # interactive HTML documentation (GitHub Pages)
  src/
    dagsmith/                    # top-level package
      __init__.py                # version
      __main__.py                # python -m dagsmith
      callables.py               # dotted-path -> (module, fn, alias) resolver
      code_generator.py          # renders YamlDagSpec -> .py string
      constants.py               # color constants, TriggerRule enum
      cron.py                    # cron expression humanizer
      dependencies.py            # >> / << dependency string parser
      loader.py                  # YAML loading + ${VAR} expansion + validation
      utils.py                   # py_repr, safe_var, humanize_readable_time
      configs/
        airflow_registry.yaml    # operator/sensor registry + FinOps labels
        custom_airflow_registry.yaml  # custom/third-party operator overrides
      cli/                       # CLI subpackage (mirrors Airflow's cli/)
        __init__.py              # main() entry point + re-exports
        cli_parser.py            # argparse setup
        cli_formatter.py         # colorized log formatting
        utils.py                 # file resolution + ruff post-processing
        commands/
          __init__.py
          generate.py            # generate sub-command
          validate.py            # validate sub-command
          list_cmd.py            # list sub-command
          resolve.py             # resolve sub-command
      registry/
        __init__.py              # public API exports
        core.py                  # loads airflow_registry.yaml, get_import_line
        models.py                # RegistryEntry, RegistryConfig Pydantic models
      schemas/
        __init__.py              # YamlDagSpec root model, discriminated unions
        base.py                  # BaseTaskSpec, BaseSensorOperatorSpec, DagSpec
        generic.py               # GenericOperatorSpec, GenericSensorSpec
        shared_renderers.py      # render_common_fields, render_bigquery_common_fields
        bigquery/                # BQ operator/sensor specs + renderers
        gcs/                     # GCS operator/sensor specs + renderers
        standard/                # PythonOperator, BashOperator, etc.
  tests/                         # mirrors src/dagsmith/ layout
  pyproject.toml                 # deps, ruff, mypy, pytest config
  Dockerfile                     # lightweight runtime image (python:3.13-slim + uv)

Documentation

Full interactive documentation is available under docs/:

| Page | Description |
|---|---|
| Overview | Features, how it works, operator table |
| Quick Start | Installation and first DAG |
| CLI Reference | All commands, flags, and examples |
| YAML Spec Format | Every section and field documented |
| Operators & Sensors | All 16 operators with field details |
| Callables | Callbacks, python_callable, placement guide |
| Best Practices | FinOps, aliases, architecture, tips |
| Full YAML Template | Copy-paste reference + generated output examples |

GitHub Pages: to host the documentation site, enable Pages with the source set to the /docs folder.

Development

Toolchain

| Tool | Purpose | Config |
|---|---|---|
| uv | Package manager + script runner | pyproject.toml |
| ruff | Lint + format (line-length: 110) | [tool.ruff] |
| mypy | Strict type checking (Python 3.13) | [tool.mypy] |
| pytest | Testing (coverage ≥ 80%) | [tool.pytest] |
| yamllint | YAML linting (line-length: 110) | yamllint-config.yml |
| pre-commit | Git hooks (lint, format, license headers) | .pre-commit-config.yaml |

Commands

uv sync --group dev             # install all deps (runtime + dev)
uv run pytest                   # run tests (coverage threshold: 80%)
uv run pytest -x                # fail fast
uv run pytest -n auto           # parallel execution
uv run ruff check --fix .       # lint + auto-fix
uv run ruff format .            # format
uv run mypy .                   # type-check

Adding a new operator

  1. Zero-code path: register the class in src/dagsmith/configs/airflow_registry.yaml and use it immediately as a generic plugin.
  2. Full validation path: create a spec class and renderer in src/dagsmith/schemas/<category>/, add the spec to the discriminated unions in src/dagsmith/schemas/__init__.py, add renderer dispatch in src/dagsmith/code_generator.py, register the class in src/dagsmith/configs/airflow_registry.yaml, and add tests.
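
The difference between the two paths comes down to dispatch: an operator with a dedicated renderer gets field checks, and everything else falls back to a pass-through generic renderer. DagSmith does this with Pydantic discriminated unions plus dispatch in code_generator.py; the sketch below swaps in a plain decorator registry to show the idea. The `renderer` decorator, `RENDERERS` table, and the BashOperator check are hypothetical.

```python
from collections.abc import Callable
from typing import Any

Renderer = Callable[[dict[str, Any]], str]
RENDERERS: dict[str, Renderer] = {}


def renderer(operator: str) -> Callable[[Renderer], Renderer]:
    """Decorator that registers a dedicated renderer for one operator name."""
    def register(fn: Renderer) -> Renderer:
        RENDERERS[operator] = fn
        return fn
    return register


def render_generic(task: dict[str, Any]) -> str:
    """Fallback: pass every YAML key except 'operator' through unvalidated."""
    kwargs = ", ".join(f"{k}={v!r}" for k, v in task.items() if k != "operator")
    return f"{task['operator']}({kwargs})"


@renderer("BashOperator")
def render_bash(task: dict[str, Any]) -> str:
    """Dedicated renderer: checks its required field before emitting code."""
    if not task.get("bash_command"):
        raise ValueError(f"{task['task_id']}: bash_command is required")
    return f"BashOperator(task_id={task['task_id']!r}, bash_command={task['bash_command']!r})"


def render_task(task: dict[str, Any]) -> str:
    """Use the dedicated renderer when one exists, otherwise the generic one."""
    return RENDERERS.get(task["operator"], render_generic)(task)
```

This mirrors the trade-off in the comparison table: generic operators need no code but defer errors to runtime, while dedicated renderers catch them at generation time.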

References & Examples

  • Reference Template — Fully documented YAML template covering every supported section and field with inline comments, defaults, aliases, and usage notes.
  • Examples — 18 numbered sample YAML specs demonstrating specific patterns: simple pipelines, fan-out/fan-in, task groups, sensors, triggers, Python callables, GCS operations, generic plugins, branching, nested groups, macros, and task-level callbacks.
  • Interactive Docs — Browsable HTML documentation with syntax-highlighted examples and copy-to-clipboard.

Contributing

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/my-feature)
  3. Run the full test suite (uv run pytest)
  4. Ensure linting and type checking pass (uv run ruff check . && uv run mypy .)
  5. Submit a pull request

License

Licensed under the Apache License 2.0.

Copyright 2026 DagSmith Contributors (Mayuresh Kedari)

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

About

A YAML-driven Apache Airflow DAG generator with schema validation, operator/sensor registry, and BigQuery/GCP support.
