Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions examples/basic_demo/project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ models_dir: models
docs:
# Adjust `dag_dir` to change where `fft dag --html` writes documentation (docs/Technical_Overview.md#documentation).
dag_dir: site/dag
include_rendered_sql: true
models:
users_clean.ff:
description: "Normalizes CRM users and extracts email_domain."
columns:
email_domain:
description: "Lowercased domain extracted from email."
mart_users_by_domain.ff:
description: "Aggregates signup counts per email domain."

# Project-level variables accessible via {{ var('key') }} inside models.
# Example:
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ dependencies = [
"pydantic-settings>=2.4",
"python-dotenv>=1.0",
"httpx>=0.28.1",
"sqlparse>=0.5.5",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -153,7 +154,7 @@ known-third-party = ["duckdb", "pandas"]
combine-as-imports = true

[tool.ruff.lint.pylint]
max-args = 15
max-args = 18
max-returns = 15
max-branches = 20
max-statements = 60
Expand Down
85 changes: 85 additions & 0 deletions src/fastflowtransform/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,3 +348,88 @@ def load_last_run_durations(project_dir: Path) -> dict[str, float]:
if isinstance(name, str) and isinstance(dur_ms, (int, float)):
out[name] = float(dur_ms) / 1000.0
return out


# ---------- TEST RESULTS ----------


@dataclass
class TestResult:
kind: str
table: str # display label (may include arrows for relationships)
relation: str | None # machine-join key (best-effort; usually the tested table)
column: str | None
ok: bool
severity: str # "error" | "warn"
duration_ms: int
msg: str | None = None
param_str: str = ""
example_sql: str | None = None


def write_test_results(
    project_dir: Path,
    *,
    started_at: str,
    finished_at: str,
    results: list[TestResult],
) -> Path:
    """
    Persist the outcomes of a test run as ``test_results.json``.

    The file is placed in the directory returned by ``_target_dir`` for the
    project. Its payload carries a metadata header, the run's start and finish
    timestamps exactly as passed in, and one dict per entry of ``results``.

    Returns the path of the written file.
    """
    target = _target_dir(Path(project_dir)) / "test_results.json"

    payload = {
        "metadata": {"tool": "fastflowtransform", "generated_at": _iso_now()},
        "test_started_at": started_at,
        "test_finished_at": finished_at,
        "results": list(map(asdict, results)),
    }
    _json_dump(target, payload)
    return target


# ---------- UNIT TEST RESULTS ----------


@dataclass
class UTestResult:
model: str
case: str
status: str # "pass" | "fail" | "error" | "skip"
duration_ms: int
cache_hit: bool = False

message: str | None = None
target_relation: str | None = None
spec_path: str = ""


def write_utest_results(
    project_dir: Path,
    *,
    started_at: str,
    finished_at: str,
    failures: int,
    results: list[UTestResult],
    engine: str | None = None,
) -> Path:
    """
    Persist the outcomes of a unit-test run as ``utest_results.json``.

    The file is placed in the directory returned by ``_target_dir`` for the
    project. Its payload carries a metadata header, the run's start and finish
    timestamps exactly as passed in, the engine name (empty string when none
    is given), the failure count (0 when falsy) and one dict per entry of
    ``results``.

    Returns the path of the written file.
    """
    target = _target_dir(Path(project_dir)) / "utest_results.json"

    payload = {
        "metadata": {"tool": "fastflowtransform", "generated_at": _iso_now()},
        "utest_started_at": started_at,
        "utest_finished_at": finished_at,
        "engine": engine if engine else "",
        "failures": int(failures) if failures else 0,
        "results": list(map(asdict, results)),
    }
    _json_dump(target, payload)
    return target
4 changes: 3 additions & 1 deletion src/fastflowtransform/cli/docs_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# fastflowtransform/cli/docs_utils.py
from __future__ import annotations

import inspect
import re
from datetime import UTC, datetime
from pathlib import Path
Expand Down Expand Up @@ -99,7 +100,8 @@ def _build_docs_manifest(
lineage_map = lineage_mod.infer_sql_lineage(rendered, alias_map)
elif n.kind == "python":
func = REGISTRY.py_funcs[n.name]
lineage_map = lineage_mod.infer_py_lineage(func, getattr(n, "requires", None), None)
src = inspect.getsource(func)
lineage_map = lineage_mod.infer_py_lineage(src, getattr(n, "requires", None))
except Exception:
lineage_map = {}

Expand Down
37 changes: 37 additions & 0 deletions src/fastflowtransform/cli/test_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
import re
import time
from collections.abc import Callable, Iterable, Mapping
from contextlib import suppress
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any

import typer

from fastflowtransform.artifacts import TestResult, write_test_results
from fastflowtransform.cli.bootstrap import _prepare_context, configure_executor_contracts
from fastflowtransform.cli.options import (
EngineOpt,
Expand Down Expand Up @@ -49,6 +52,7 @@ class DQResult:
severity: Severity = "error"
param_str: str = ""
example_sql: str | None = None
relation: str | None = None


_REF_CALL_RE = re.compile(r"^ref\(\s*(['\"])([^'\"]+)\1\s*\)$")
Expand Down Expand Up @@ -351,6 +355,10 @@ def _run_dq_tests(executor: BaseExecutor, tests: Iterable[Any]) -> list[DQResult
table_for_exec,
) = _prepare_test(raw_test, executor)

relation: str | None = None
if isinstance(table_for_exec, str) and table_for_exec.strip():
relation = table_for_exec.strip()

t0 = time.perf_counter()

runner: Runner | None = TESTS.get(kind)
Expand All @@ -373,6 +381,7 @@ def _run_dq_tests(executor: BaseExecutor, tests: Iterable[Any]) -> list[DQResult
severity=severity,
param_str=param_str,
example_sql=None,
relation=relation,
)
)
continue
Expand All @@ -392,6 +401,7 @@ def _run_dq_tests(executor: BaseExecutor, tests: Iterable[Any]) -> list[DQResult
severity=severity,
param_str=param_str,
example_sql=example,
relation=relation,
)
)

Expand Down Expand Up @@ -494,9 +504,36 @@ def test(
typer.secho("No tests configured.", fg="bright_black")
raise typer.Exit(code=0)

started_at = datetime.now(UTC).isoformat(timespec="seconds")

results = _run_dq_tests(execu, tests)
_print_summary(results)

finished_at = datetime.now(UTC).isoformat(timespec="seconds")

# Persist for docs (best-effort; never fail the command because of artifact IO)
with suppress(Exception):
write_test_results(
ctx.project,
started_at=started_at,
finished_at=finished_at,
results=[
TestResult(
kind=r.kind,
table=r.table,
relation=r.relation,
column=r.column,
ok=bool(r.ok),
severity=str(r.severity),
duration_ms=int(r.ms),
msg=r.msg,
param_str=r.param_str,
example_sql=r.example_sql,
)
for r in results
],
)

# Exit code: count only ERROR fails
failed = sum((not r.ok) and (r.severity != "warn") for r in results)
raise typer.Exit(code=2 if failed > 0 else 0)
Expand Down
37 changes: 37 additions & 0 deletions src/fastflowtransform/cli/utest_cmd.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
# fastflowtransform/cli/utest_cmd.py
from __future__ import annotations

from contextlib import suppress
from datetime import UTC, datetime

import typer

from fastflowtransform.artifacts import UTestResult, write_utest_results
from fastflowtransform.cli.bootstrap import _prepare_context
from fastflowtransform.cli.options import (
CaseOpt,
Expand Down Expand Up @@ -38,14 +43,46 @@ def utest(
echo("ℹ️ No unit tests found (tests/unit/*.yml).") # noqa: RUF001
raise typer.Exit(0)

started_at = datetime.now(UTC).isoformat(timespec="seconds")
collected: list[dict] = []

failures = run_unit_specs(
specs,
ex,
ctx.jinja_env,
only_case=case,
cache_mode=getattr(cache, "value", str(cache)) if cache is not None else "off",
reuse_meta=bool(reuse_meta),
results_out=collected,
)
finished_at = datetime.now(UTC).isoformat(timespec="seconds")

# Write artifact for docs (best-effort; never block exit)
with suppress(Exception):
write_utest_results(
ctx.project,
started_at=started_at,
finished_at=finished_at,
failures=failures,
engine=getattr(ex, "engine_name", None),
results=[
UTestResult(
model=str(r.get("model") or ""),
case=str(r.get("case") or ""),
status=str(r.get("status") or ""),
duration_ms=int(r.get("duration_ms") or 0),
cache_hit=bool(r.get("cache_hit")),
message=(str(r.get("message")) if r.get("message") else None),
target_relation=(
str(r.get("target_relation")) if r.get("target_relation") else None
),
spec_path=str(r.get("spec_path") or ""),
)
for r in collected
if (r.get("model") and r.get("case"))
],
)

raise typer.Exit(code=2 if failures > 0 else 0)


Expand Down
26 changes: 26 additions & 0 deletions src/fastflowtransform/config/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,24 @@ class SeedsBlock(BaseModel):
# ---------------------------------------------------------------------------


class DocsColumnConfig(BaseModel):
    """Documentation entry for a single column of a model."""

    # Unknown keys are accepted rather than rejected.
    model_config = ConfigDict(extra="allow")

    # Free-text description of the column; optional.
    description: str | None = Field(default=None)


class DocsModelConfig(BaseModel):
    """Documentation entry for a single model, including its columns."""

    # Unknown keys are accepted so the docs section stays flexible.
    model_config = ConfigDict(extra="allow")

    # Free-text description of the model; optional.
    description: str | None = Field(default=None)
    # Per-column docs keyed by column name; empty when none are configured.
    columns: dict[str, DocsColumnConfig] = Field(default_factory=dict)


class DocsConfig(BaseModel):
"""
Optional documentation-related configuration.
Expand All @@ -140,11 +158,19 @@ class DocsConfig(BaseModel):

docs:
dag_dir: "site/dag"
include_rendered_sql: true
models:
users:
description: "Raw users table"
columns:
id:
description: "Primary key"
"""

model_config = ConfigDict(extra="forbid")

dag_dir: str | None = None
include_rendered_sql: bool = Field(default=False)
models: dict[str, DocsModelConfig] = Field(default_factory=dict)


# ---------------------------------------------------------------------------
Expand Down
Loading