diff --git a/dbt-admin/README.md b/dbt-admin/README.md new file mode 100644 index 0000000..d59aec1 --- /dev/null +++ b/dbt-admin/README.md @@ -0,0 +1,26 @@ +# DBT Admin UI + +A lightweight FastAPI app to manage multiple dbt Core projects in a mesh environment. + +## Features +- Per-project dbt-core version via isolated venvs +- Project registry with root path, profiles dir, and extra adapter packages +- Install/Update dbt for a project +- Evaluate project (dbt parse) and read `manifest.json` for stats +- Simple job history with logs + +## Quickstart + +```bash +cd /workspace/dbt-admin +python3 -m venv .venv +source .venv/bin/activate +pip install -r requirements.txt +uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload +``` + +Open http://localhost:8000 and create your first project. + +Notes: +- Use absolute paths for `root_path` and `profiles_dir`. +- Add adapter packages (e.g., `dbt-postgres==1.8.1`) in Extra Packages. \ No newline at end of file diff --git a/dbt-admin/app/__init__.py b/dbt-admin/app/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dbt-admin/app/__pycache__/__init__.cpython-313.pyc b/dbt-admin/app/__pycache__/__init__.cpython-313.pyc new file mode 100644 index 0000000..5f26490 Binary files /dev/null and b/dbt-admin/app/__pycache__/__init__.cpython-313.pyc differ diff --git a/dbt-admin/app/__pycache__/db.cpython-313.pyc b/dbt-admin/app/__pycache__/db.cpython-313.pyc new file mode 100644 index 0000000..8b15485 Binary files /dev/null and b/dbt-admin/app/__pycache__/db.cpython-313.pyc differ diff --git a/dbt-admin/app/__pycache__/main.cpython-313.pyc b/dbt-admin/app/__pycache__/main.cpython-313.pyc new file mode 100644 index 0000000..e45e10f Binary files /dev/null and b/dbt-admin/app/__pycache__/main.cpython-313.pyc differ diff --git a/dbt-admin/app/__pycache__/models.cpython-313.pyc b/dbt-admin/app/__pycache__/models.cpython-313.pyc new file mode 100644 index 0000000..a9ec570 Binary files /dev/null and 
b/dbt-admin/app/__pycache__/models.cpython-313.pyc differ diff --git a/dbt-admin/app/__pycache__/services.cpython-313.pyc b/dbt-admin/app/__pycache__/services.cpython-313.pyc new file mode 100644 index 0000000..308bdef Binary files /dev/null and b/dbt-admin/app/__pycache__/services.cpython-313.pyc differ diff --git a/dbt-admin/app/db.py b/dbt-admin/app/db.py new file mode 100644 index 0000000..5983d02 --- /dev/null +++ b/dbt-admin/app/db.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +from contextlib import contextmanager +from pathlib import Path +from typing import Iterator + +from sqlmodel import SQLModel, create_engine, Session + +DATA_DIR = Path(__file__).resolve().parent.parent / "data" +DATA_DIR.mkdir(parents=True, exist_ok=True) +DB_PATH = DATA_DIR / "app.sqlite3" + +engine = create_engine(f"sqlite:///{DB_PATH}", echo=False, connect_args={"check_same_thread": False}) + + +def init_db() -> None: + from .models import Project, Job # noqa: F401 # ensure models are imported for table creation + SQLModel.metadata.create_all(engine) + + +@contextmanager +def get_session() -> Iterator[Session]: + session = Session(engine) + try: + yield session + session.commit() + except Exception: + session.rollback() + raise + finally: + session.close() \ No newline at end of file diff --git a/dbt-admin/app/main.py b/dbt-admin/app/main.py new file mode 100644 index 0000000..35676c7 --- /dev/null +++ b/dbt-admin/app/main.py @@ -0,0 +1,186 @@ +from __future__ import annotations + +import threading +from datetime import datetime, timedelta +from pathlib import Path +from typing import Optional +from types import SimpleNamespace + +from fastapi import FastAPI, Request, Form, HTTPException +from fastapi.responses import HTMLResponse, RedirectResponse, FileResponse +from fastapi.staticfiles import StaticFiles +from fastapi.templating import Jinja2Templates + +from .db import init_db, get_session +from .models import Job, JobStatus, JobType, Project +from .services import 
compute_stats_from_manifest, create_job, run_job + +BASE_DIR = Path(__file__).resolve().parent +TEMPLATES_DIR = BASE_DIR / "templates" +STATIC_DIR = BASE_DIR / "static" + +app = FastAPI(title="DBT Admin UI") +app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static") +templates = Jinja2Templates(directory=str(TEMPLATES_DIR)) + + +@app.on_event("startup") +def on_startup() -> None: + init_db() + + +@app.get("/", response_class=HTMLResponse) +def home(request: Request): # type: ignore[no-untyped-def] + with get_session() as session: + projects = session.query(Project).order_by(Project.created_at.desc()).all() + return templates.TemplateResponse("home.html", {"request": request, "projects": projects}) + + +@app.get("/projects/new", response_class=HTMLResponse) +def new_project(request: Request): # type: ignore[no-untyped-def] + return templates.TemplateResponse("new_project.html", {"request": request}) + + +@app.post("/projects") +def create_project( # type: ignore[no-untyped-def] + name: str = Form(...), + root_path: str = Form(...), + dbt_version: str = Form(...), + profiles_dir: Optional[str] = Form(None), + extra_packages: Optional[str] = Form(None), +): + project = Project( + name=name, + root_path=root_path, + dbt_version=dbt_version, + profiles_dir=profiles_dir, + extra_packages=extra_packages, + updated_at=datetime.utcnow(), + ) + with get_session() as session: + session.add(project) + session.flush() + session.refresh(project) + return RedirectResponse(url=f"/projects/{project.id}", status_code=303) + + +@app.get("/projects/{project_id}", response_class=HTMLResponse) +def show_project(request: Request, project_id: int): # type: ignore[no-untyped-def] + with get_session() as session: + project = session.get(Project, project_id) + if not project: + raise HTTPException(status_code=404, detail="Project not found") + jobs = session.query(Job).filter(Job.project_id == project_id).order_by(Job.created_at.desc()).limit(20).all() + stats = 
# --- dbt-admin/app/main.py (part 2: project update/delete and job launch) ---
@app.post("/projects/{project_id}/update")
def update_project(  # type: ignore[no-untyped-def]
    project_id: int,
    name: str = Form(...),
    root_path: str = Form(...),
    dbt_version: str = Form(...),
    profiles_dir: Optional[str] = Form(None),
    extra_packages: Optional[str] = Form(None),
):
    """Overwrite a project's editable fields and redirect to its page.

    Raises:
        HTTPException: 404 when no project has ``project_id``.
    """
    with get_session() as session:
        project = session.get(Project, project_id)
        if not project:
            raise HTTPException(status_code=404, detail="Project not found")
        project.name = name
        project.root_path = root_path
        project.dbt_version = dbt_version
        project.profiles_dir = profiles_dir
        project.extra_packages = extra_packages
        project.updated_at = datetime.utcnow()
        session.add(project)
    return RedirectResponse(url=f"/projects/{project_id}", status_code=303)


@app.post("/projects/{project_id}/delete")
def delete_project(project_id: int):  # type: ignore[no-untyped-def]
    """Delete a project together with its job rows, then redirect home.

    Raises:
        HTTPException: 404 when no project has ``project_id``.
    """
    with get_session() as session:
        project = session.get(Project, project_id)
        if not project:
            raise HTTPException(status_code=404, detail="Project not found")
        # Job.project_id references project.id; remove the jobs first so no
        # rows are left pointing at a project that no longer exists.
        session.query(Job).filter(Job.project_id == project_id).delete()
        session.delete(project)
    return RedirectResponse(url="/", status_code=303)


def _launch_job(project_id: int, job_type: JobType):  # type: ignore[no-untyped-def]
    """Create a job for the project, run it on a daemon thread, and redirect.

    Raises:
        HTTPException: 404 when no project has ``project_id``.
    """
    with get_session() as session:
        project = session.get(Project, project_id)
        if not project:
            raise HTTPException(status_code=404, detail="Project not found")
        job = create_job(project, job_type)
        # The worker thread outlives this session. Detach the fully loaded
        # project before the commit so the commit cannot expire its
        # attributes and the thread can still read them.
        session.expunge(project)
    thread = threading.Thread(target=run_job, args=(job, project), daemon=True)
    thread.start()
    return RedirectResponse(url=f"/projects/{project_id}", status_code=303)


@app.post("/projects/{project_id}/install")
def install_project_env(project_id: int):  # type: ignore[no-untyped-def]
    """Start a background INSTALL job for the project."""
    return _launch_job(project_id, JobType.INSTALL)


@app.post("/projects/{project_id}/evaluate")
def evaluate_project(project_id: int):  # type: ignore[no-untyped-def]
    """Start a background EVALUATE job for the project."""
    return _launch_job(project_id, JobType.EVALUATE)
# --- dbt-admin/app/main.py (part 3: job logs and mock preview routes) ---
@app.get("/jobs/{job_id}/logs")
def get_job_logs(job_id: int):  # type: ignore[no-untyped-def]
    """Return a job's log file as plain text.

    Raises:
        HTTPException: 404 when the job does not exist, has no ``log_path``,
            or the file at ``log_path`` is missing.
    """
    with get_session() as session:
        job = session.get(Job, job_id)
        if not job or not job.log_path:
            raise HTTPException(status_code=404, detail="Logs not found")
        # Read the path while the session is open; only the plain Path
        # object is used after the block ends.
        log_path = Path(job.log_path)
    if not log_path.exists():
        raise HTTPException(status_code=404, detail="Logs not found")
    return FileResponse(path=str(log_path), media_type="text/plain")


# Preview routes with mock data
@app.get("/preview", response_class=HTMLResponse)
def preview_home(request: Request):  # type: ignore[no-untyped-def]
    """Render the home page with two hard-coded sample projects."""
    projects = [
        SimpleNamespace(id=1, name="Marketing Warehouse", dbt_version="1.8.6", root_path="/srv/dbt/marketing"),
        SimpleNamespace(id=2, name="Finance Lakehouse", dbt_version="1.7.13", root_path="/srv/dbt/finance"),
    ]
    return templates.TemplateResponse("home.html", {"request": request, "projects": projects})


@app.get("/preview/projects/{project_id}", response_class=HTMLResponse)
def preview_project(request: Request, project_id: int):  # type: ignore[no-untyped-def]
    """Render the project detail page with hard-coded sample data.

    ``project_id == 1`` shows the "Marketing Warehouse" sample; any other id
    shows the "Finance Lakehouse" sample. No database access takes place.
    """
    is_marketing = project_id == 1
    project = SimpleNamespace(
        id=project_id,
        name="Marketing Warehouse" if is_marketing else "Finance Lakehouse",
        root_path="/srv/dbt/marketing" if is_marketing else "/srv/dbt/finance",
        dbt_version="1.8.6" if is_marketing else "1.7.13",
        profiles_dir="/home/app/.dbt",
        extra_packages="dbt-postgres==1.8.6, dbt-redshift==1.8.6",
    )
    now = datetime.utcnow()

    def mock_job(job_id, job_type, status, started_ago, duration):  # type: ignore[no-untyped-def]
        """Build one sample job that started ``started_ago`` before now."""
        started_at = now - started_ago
        # The finish time is derived from the start time so a finished job
        # can never appear to end before it began; a running job has none.
        finished_at = started_at + duration if duration is not None else None
        return SimpleNamespace(
            id=job_id,
            project_id=project_id,
            job_type=job_type,
            status=status,
            started_at=started_at,
            finished_at=finished_at,
            log_path=None,
        )

    jobs = [
        mock_job(301, "install", "success", timedelta(hours=6), timedelta(minutes=2)),
        mock_job(302, "evaluate", "success", timedelta(hours=5), timedelta(minutes=1)),
        mock_job(303, "evaluate", "failed", timedelta(hours=3), timedelta(minutes=1)),
        mock_job(304, "evaluate", "running", timedelta(minutes=10), None),
    ]
    stats = {
        "models": 128,
        "tests": 420,
        "snapshots": 6,
        "analyses": 4,
        "seeds": 12,
        "sources": 23,
        "macros": 37,
        "exposures": 5,
        "metrics": 9,
        "packages": 3,
    }
    return templates.TemplateResponse(
        "project_detail.html",
        {"request": request, "project": project, "jobs": jobs, "stats": stats},
    )
dbt-postgres==1.8.1") + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) + + +class Job(SQLModel, table=True): + id: Optional[int] = Field(default=None, primary_key=True) + project_id: int = Field(foreign_key="project.id") + job_type: JobType + status: JobStatus = Field(default=JobStatus.PENDING) + created_at: datetime = Field(default_factory=datetime.utcnow) + started_at: Optional[datetime] = None + finished_at: Optional[datetime] = None + log_path: Optional[str] = None + error_message: Optional[str] = None \ No newline at end of file diff --git a/dbt-admin/app/services.py b/dbt-admin/app/services.py new file mode 100644 index 0000000..11ac822 --- /dev/null +++ b/dbt-admin/app/services.py @@ -0,0 +1,201 @@ +from __future__ import annotations + +import json +import os +import shlex +import subprocess +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +from .models import Job, JobStatus, JobType, Project +from .db import DATA_DIR, get_session + +ENVS_DIR = DATA_DIR / "envs" +ENVS_DIR.mkdir(parents=True, exist_ok=True) +JOBS_LOGS_DIR = DATA_DIR / "job_logs" +JOBS_LOGS_DIR.mkdir(parents=True, exist_ok=True) + + +@dataclass +class CommandResult: + returncode: int + stdout: str + stderr: str + + +def get_project_env_dir(project_id: int) -> Path: + return ENVS_DIR / f"project_{project_id}" + + +def get_env_python_bin(env_dir: Path) -> Path: + # Linux path + python_bin = env_dir / "bin" / "python" + return python_bin + + +def get_env_pip_bin(env_dir: Path) -> Path: + pip_bin = env_dir / "bin" / "pip" + return pip_bin + + +def _create_env_with_virtualenv(env_dir: Path) -> bool: + try: + proc = subprocess.run(["python3", "-m", "virtualenv", str(env_dir)], capture_output=True, text=True) + return proc.returncode == 0 + except Exception: + return False + + +def _create_env_with_venv(env_dir: Path) -> bool: + 
try: + proc = subprocess.run(["python3", "-m", "venv", str(env_dir)], capture_output=True, text=True) + return proc.returncode == 0 + except Exception: + return False + + +def ensure_env(project: Project) -> Tuple[Path, Path, Path]: + env_dir = get_project_env_dir(project.id) # type: ignore[arg-type] + env_dir.mkdir(parents=True, exist_ok=True) + + python_bin = get_env_python_bin(env_dir) + pip_bin = get_env_pip_bin(env_dir) + + if not python_bin.exists(): + created = _create_env_with_virtualenv(env_dir) + if not created: + created = _create_env_with_venv(env_dir) + if not created: + raise RuntimeError("Failed to create a virtual environment. Ensure 'virtualenv' or 'venv' is available.") + + # Ensure pip is up to date (best-effort) + subprocess.run([str(pip_bin), "install", "--upgrade", "pip", "setuptools", "wheel"], check=False) + + return env_dir, python_bin, pip_bin + + +def install_dbt_for_project(project: Project) -> CommandResult: + env_dir, _python_bin, pip_bin = ensure_env(project) + + packages: List[str] = [f"dbt-core=={project.dbt_version}"] + if project.extra_packages: + packages.extend([pkg.strip() for pkg in project.extra_packages.split(",") if pkg.strip()]) + + proc = subprocess.run([str(pip_bin), "install", *packages], capture_output=True, text=True) + return CommandResult(returncode=proc.returncode, stdout=proc.stdout, stderr=proc.stderr) + + +def run_dbt_parse(project: Project) -> CommandResult: + env_dir = get_project_env_dir(project.id) # type: ignore[arg-type] + dbt_bin = env_dir / "bin" / "dbt" + # Fallback: Some envs may only have module entrypoint + if not dbt_bin.exists(): + cmd = [str(get_env_python_bin(env_dir)), "-m", "dbt", "parse", "--project-dir", project.root_path] + else: + cmd = [str(dbt_bin), "parse", "--project-dir", project.root_path] + + if project.profiles_dir: + cmd += ["--profiles-dir", project.profiles_dir] + + proc = subprocess.run(cmd, capture_output=True, text=True, cwd=project.root_path) + return 
# --- dbt-admin/app/services.py (part 2: manifest stats and job execution) ---
# Maps a manifest node's "resource_type" to the stats counter it increments.
_NODE_STAT_KEYS = {
    "model": "models",
    "test": "tests",
    "snapshot": "snapshots",
    "analysis": "analyses",
    "seed": "seeds",
    "source": "sources",
}


def compute_stats_from_manifest(project: Project) -> Dict[str, int]:
    """Count the resources listed in ``<root_path>/target/manifest.json``.

    Returns:
        A dict with the keys ``models``, ``tests``, ``snapshots``,
        ``analyses``, ``seeds``, ``sources``, ``macros``, ``exposures``,
        ``metrics`` and ``packages``. An empty dict is returned when the
        manifest is missing, unreadable, not valid JSON, or not a JSON object.
    """
    manifest_path = Path(project.root_path) / "target" / "manifest.json"
    if not manifest_path.exists():
        return {}

    try:
        with open(manifest_path, "r", encoding="utf-8") as f:
            manifest = json.load(f)
    except (OSError, ValueError):
        # A half-written or corrupt manifest is treated as "no stats yet"
        # instead of failing the caller. json.JSONDecodeError is a ValueError.
        return {}
    if not isinstance(manifest, dict):
        return {}

    stats = {
        "models": 0,
        "tests": 0,
        "snapshots": 0,
        "analyses": 0,
        "seeds": 0,
        # dbt keeps sources in their own top-level "sources" mapping, not
        # under "nodes", so they have to be counted from there.
        "sources": len(manifest.get("sources") or {}),
        "macros": len(manifest.get("macros") or {}),
        "exposures": len(manifest.get("exposures") or {}),
        "metrics": len(manifest.get("metrics") or {}),
        "packages": len((manifest.get("metadata") or {}).get("dependencies") or []),
    }

    for node in (manifest.get("nodes") or {}).values():
        stat_key = _NODE_STAT_KEYS.get(node.get("resource_type"))
        if stat_key is not None:
            stats[stat_key] += 1

    return stats


def create_job(project: Project, job_type: JobType) -> Job:
    """Insert a PENDING job for the project and return it.

    The job's ``log_path`` points to a new file name under ``JOBS_LOGS_DIR``.
    The returned object is detached from any session but fully loaded, so its
    attributes (including ``id``) can be read freely, also from another
    thread.
    """
    # Microseconds (%f) are part of the timestamp so that two jobs of the
    # same type started within the same second do not share a log file.
    stamp = datetime.utcnow().strftime("%Y%m%d%H%M%S%f")
    log_file = JOBS_LOGS_DIR / f"job_{project.id}_{job_type.value}_{stamp}.log"
    job = Job(project_id=project.id, job_type=job_type, status=JobStatus.PENDING, log_path=str(log_file))
    with get_session() as session:
        session.add(job)
        session.flush()  # emits the INSERT so the primary key is assigned
        session.refresh(job)
        # Detach before the commit: the INSERT is already part of the
        # transaction, and a detached instance is not expired by the commit.
        session.expunge(job)
    return job


def _save_job_fields(job: Job, job_id: int, **fields: object) -> None:
    """Set ``fields`` on the in-memory ``job`` and on its database row."""
    for attr, value in fields.items():
        setattr(job, attr, value)
    with get_session() as session:
        row = session.get(Job, job_id)
        if row is None:
            # The job row no longer exists; nothing to update.
            return
        for attr, value in fields.items():
            setattr(row, attr, value)
        session.add(row)


def run_job(job: Job, project: Project) -> None:
    """Execute a job and record its progress, result and log output.

    The job is marked RUNNING, then the install or ``dbt parse`` command is
    run and its stdout/stderr are appended to the job's log file. Finally the
    job is marked SUCCESS or FAILED with ``finished_at`` set. Any exception
    from the command is logged and stored as the job's ``error_message``.
    """
    # Read everything needed from the job once, up front. Afterwards the ORM
    # object is only written to, never read, so this function does not depend
    # on the state the instance is left in by later commits.
    job_id = job.id
    job_type = job.job_type
    log_path = job.log_path

    def write_log(text: str) -> None:
        if log_path:
            with open(log_path, "a", encoding="utf-8") as lf:
                lf.write(text)

    _save_job_fields(job, job_id, started_at=datetime.utcnow(), status=JobStatus.RUNNING)

    try:
        if job_type == JobType.INSTALL:
            write_log(f"Installing packages for project {project.name}...\n")
            res = install_dbt_for_project(project)
        elif job_type == JobType.EVALUATE:
            write_log(f"Running dbt parse for project {project.name}...\n")
            res = run_dbt_parse(project)
        else:
            write_log("Unknown job type\n")
            res = None

        if res is None:
            success = False
        else:
            write_log(res.stdout)
            if res.stderr:
                write_log("\n[stderr]\n" + res.stderr)
            success = res.returncode == 0

        outcome = {
            "status": JobStatus.SUCCESS if success else JobStatus.FAILED,
            "finished_at": datetime.utcnow(),
        }
        if not success:
            outcome["error_message"] = "Job failed. See logs."
    except Exception as exc:  # noqa: BLE001
        try:
            write_log(f"Exception: {exc}\n")
        except OSError:
            # The log file itself is unusable; the failure is still recorded
            # on the job row below.
            pass
        outcome = {
            "status": JobStatus.FAILED,
            "finished_at": datetime.utcnow(),
            "error_message": str(exc),
        }

    _save_job_fields(job, job_id, **outcome)
+ + +| ID | +Type | +Status | +Started | +Finished | +Logs | +
|---|---|---|---|---|---|
| {{ job.id }} | +{{ job.job_type }} | ++ {% if job.status == 'success' %} + {{ job.status }} + {% elif job.status == 'failed' %} + {{ job.status }} + {% elif job.status == 'running' %} + {{ job.status }} + {% else %} + {{ job.status }} + {% endif %} + | +{{ job.started_at or '' }} | +{{ job.finished_at or '' }} | ++ {% if job.log_path %} + View + {% endif %} + | +