In [7]:
# dbt Cloud API Integration Notebook
# ================================
# This notebook demonstrates how to work with both the dbt Cloud Admin API
# and the Discovery API for metadata exploration

import os
import requests
import json
import pandas as pd
from dotenv import load_dotenv
import matplotlib.pyplot as plt
import networkx as nx

# Populate the process environment from a local .env file (when one exists),
# then read the dbt Cloud settings used by every cell below.
load_dotenv()

# Credentials and identifiers -- each is None when the variable is not set.
DBT_CLOUD_TOKEN = os.environ.get("DBT_CLOUD_TOKEN")
DBT_CLOUD_ACCOUNT_ID = os.environ.get("DBT_CLOUD_ACCOUNT_ID")
DBT_ENVIRONMENT_ID = os.environ.get("DBT_ENVIRONMENT_ID")

# GraphQL endpoint of the Discovery (metadata) API; can be overridden through
# the DISCOVERY_API_URL variable.
DISCOVERY_API_URL = os.environ.get(
    "DISCOVERY_API_URL",
    "https://metadata.cloud.getdbt.com/graphql",
)

In [29]:
def run_graphql_query(query, variables=None, timeout=30):
    """Run a GraphQL query against the dbt Cloud Discovery API.

    Posts ``query`` and ``variables`` to ``DISCOVERY_API_URL`` with a bearer
    token built from ``DBT_CLOUD_TOKEN``.

    Args:
        query (str): GraphQL query document.
        variables (dict | None): Query variables; ``None`` sends an empty mapping.
        timeout (float): Seconds to wait for the HTTP response. Without a timeout
            ``requests`` can block indefinitely on a stalled connection.

    Returns:
        dict | None: The decoded JSON payload. Returns ``None`` in these cases:
        the request fails, the status is not 200, the body is not valid JSON, or
        the payload carries no usable ``data`` (e.g. ``"data": null`` next to
        ``errors``). GraphQL ``errors`` are printed. If partial ``data`` is also
        present, the payload is still returned.
    """
    request_body = {
        "query": query,
        "variables": variables if variables is not None else {},
    }

    try:
        response = requests.post(
            DISCOVERY_API_URL,
            headers={"Content-Type": "application/json", "Authorization": f"Bearer {DBT_CLOUD_TOKEN}"},
            json=request_body,
            timeout=timeout,
        )
    except requests.RequestException as e:
        print(f"Error: {str(e)}")
        return None

    if response.status_code != 200:
        print(f"Error: {response.status_code} - {response.text}")
        return None

    try:
        result = response.json()
    except ValueError as e:  # covers json.JSONDecodeError and requests' JSONDecodeError
        print(f"Error: {str(e)}")
        return None

    # GraphQL servers usually report query problems with HTTP 200 plus an
    # "errors" list; surface them instead of failing silently downstream.
    if isinstance(result, dict) and result.get("errors"):
        print(f"GraphQL Error: {json.dumps(result['errors'], indent=2)}")

    # Callers chain .get() on result["data"]; a null/missing data payload would
    # make them raise AttributeError, so report it as a failed query instead.
    if not isinstance(result, dict) or result.get("data") is None:
        return None

    return result

In [26]:
# Headers for the request: JSON body plus bearer-token authentication.
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {DBT_CLOUD_TOKEN}"
}

# Sample GraphQL query: the first `$first` applied models in the environment,
# with their identifying and warehouse-location fields.
query = """
query ($environmentId: BigInt!, $first: Int!) {
  environment(id: $environmentId) {
    applied {
      models(first: $first) {
        edges {
          node {
            name
            description
            resourceType
            uniqueId
            database
            schema
            alias
          }
        }
      }
    }
  }
}
"""

# Variables for the query
variables = {
    # Passed as the raw env-var string; later cells convert with int().
    # NOTE(review): the recorded output of this cell shows the API accepted it.
    "environmentId": DBT_ENVIRONMENT_ID,
    "first": 5  # Limit to 5 models
}

# Create the request body
request_body = {
    "query": query,
    "variables": variables
}

# Connectivity smoke test: send the sample request built above.
def test_discovery_api(timeout=30):
    """Smoke-test connectivity to the dbt Cloud Discovery API.

    Posts the module-level ``request_body`` (the sample models query) with the
    module-level ``headers`` to ``DISCOVERY_API_URL``. Prints the name and
    unique ID of each model returned.

    Args:
        timeout (float): Seconds to wait for the HTTP response. Without a timeout
            ``requests`` can block indefinitely.

    Returns:
        dict | None: The decoded JSON response on success. ``None`` on a non-200
        status, a GraphQL error payload, a network failure, or an undecodable body.
    """
    try:
        response = requests.post(
            DISCOVERY_API_URL,
            headers=headers,
            json=request_body,
            timeout=timeout,
        )

        if response.status_code != 200:
            print(f"❌ Failed to query Discovery API: {response.status_code}")
            print(f"Response: {response.text}")
            return None

        result = response.json()

        # GraphQL reports query problems in an "errors" array, typically with HTTP 200.
        if "errors" in result:
            print(f"GraphQL Error: {json.dumps(result['errors'], indent=2)}")
            return None

        # Any level of the payload may be JSON null. `or {}` handles that;
        # a .get() default only covers missing keys.
        data = result.get("data") or {}
        environment = data.get("environment") or {}
        applied = environment.get("applied") or {}
        models = (applied.get("models") or {}).get("edges") or []

        print("✅ Successfully queried Discovery API")
        print(f"Found {len(models)} models")

        # Print model names as a sample
        for edge in models:
            model = edge.get("node") or {}
            print(f"  - {model.get('name')} ({model.get('uniqueId')})")

        return result

    except (requests.RequestException, ValueError) as e:
        # Network/HTTP failures and undecodable JSON bodies.
        print(f"❌ Error: {str(e)}")
        return None

# Execute the test
if __name__ == "__main__":
    test_discovery_api()

✅ Successfully queried Discovery API
Found 5 models
  - dim_dbt__current_models (model.dbt_artifacts.dim_dbt__current_models)
  - dim_dbt__exposures (model.dbt_artifacts.dim_dbt__exposures)
  - dim_dbt__models (model.dbt_artifacts.dim_dbt__models)
  - dim_dbt__seeds (model.dbt_artifacts.dim_dbt__seeds)
  - dim_dbt__snapshots (model.dbt_artifacts.dim_dbt__snapshots)


In [None]:
def get_test_results(limit=100):
    """
    Retrieve test results from dbt Cloud Discovery API

    Fetches up to ``limit`` tests from the applied state of ``DBT_ENVIRONMENT_ID``
    via ``run_graphql_query``. Each test's parents and last-run execution info
    are flattened into one row. Prints a summary of status counts, tests per
    parent, coverage, and failed tests.

    Args:
        limit (int): Maximum number of test results to retrieve

    Returns:
        DataFrame: One row per test with columns ``test_name``, ``column_name``,
        ``parent_models``, ``parent_types``, ``status``, ``error_message``,
        ``last_run_at`` and ``execution_time_seconds``. Empty when the query
        fails or returns no tests.

    Raises:
        TypeError / ValueError: from ``int(DBT_ENVIRONMENT_ID)`` when the
        environment ID is unset or not numeric.
    """
    print("🔍 Retrieving dbt test results...")

    query = """
    query ($environmentId: BigInt!, $first: Int!) {
      environment(id: $environmentId) {
        applied {
          tests(first: $first) {
            edges {
              node {
                name
                columnName
                parents {
                  name
                  resourceType
                }
                executionInfo {
                  lastRunStatus
                  lastRunError
                  executeCompletedAt
                  executionTime
                }
              }
            }
          }
        }
      }
    }
    """

    variables = {
        "environmentId": int(DBT_ENVIRONMENT_ID),
        "first": limit
    }

    # dbt test runs report pass / fail / warn / error / skipped (models use
    # "success"). Treat both failure flavours as failed.
    # NOTE(review): confirm against the Discovery API lastRunStatus values.
    passing_statuses = {"pass", "success"}
    failing_statuses = {"fail", "error"}

    result = run_graphql_query(query, variables)

    if not result or "data" not in result:
        print("❌ Failed to retrieve test results")
        return pd.DataFrame()

    # Any level may be JSON null; `or {}` covers that (a .get default only
    # covers missing keys).
    data = result.get("data") or {}
    environment = data.get("environment") or {}
    applied = environment.get("applied") or {}
    edges = (applied.get("tests") or {}).get("edges") or []

    if not edges:
        print("⚠️ No test results found")
        return pd.DataFrame()

    test_results = []
    # Individual parent names across all tests. A multi-parent test counts once
    # per parent instead of as a single joined "a, b" string.
    all_parent_names = []
    for edge in edges:
        node = edge.get("node") or {}

        # Parent model/source information (skip null names so join() can't fail)
        parents = node.get("parents") or []
        parent_names = [p.get("name") for p in parents if p and p.get("name")]
        parent_types = [p.get("resourceType") for p in parents if p and p.get("resourceType")]
        all_parent_names.extend(parent_names)

        exec_info = node.get("executionInfo") or {}

        test_results.append({
            "test_name": node.get("name"),
            "column_name": node.get("columnName"),
            "parent_models": ", ".join(parent_names) if parent_names else None,
            "parent_types": ", ".join(parent_types) if parent_types else None,
            "status": exec_info.get("lastRunStatus"),
            "error_message": exec_info.get("lastRunError"),
            "last_run_at": exec_info.get("executeCompletedAt"),
            "execution_time_seconds": exec_info.get("executionTime")
        })

    df = pd.DataFrame(test_results)

    print(f"\n✅ Retrieved {len(test_results)} test results")

    if not df.empty:
        # Count by status
        status_counts = df["status"].value_counts()
        print("\n📊 Test Status Summary:")
        for status, count in status_counts.items():
            if status in passing_statuses:
                status_symbol = "✅"
            elif status in failing_statuses:
                status_symbol = "❌"
            else:
                status_symbol = "⚠️"
            print(f"{status_symbol} {status}: {count} tests")

        # Count tests per parent node. Parents can include sources as well as
        # models (see parent_types).
        parent_series = pd.Series(all_parent_names, dtype="object")
        model_test_counts = parent_series.value_counts().head(10)
        print("\n🔎 Top 10 Models by Test Count:")
        for model, count in model_test_counts.items():
            print(f"  • {model}: {count} tests")

        # Coverage: distinct parents that have at least one test
        total_models = parent_series.nunique()
        print("\n📏 Test Coverage:")
        print(f"  • Total models with tests: {total_models}")

        # Show failed tests if any
        failed_tests = df[df["status"].isin(failing_statuses)]
        if not failed_tests.empty:
            print(f"\n❌ Failed Tests ({len(failed_tests)}):")
            for _, test in failed_tests.iterrows():
                print(f"  • {test['test_name']} on {test['parent_models']}")
                error_msg = test["error_message"]
                if isinstance(error_msg, str) and error_msg:
                    # Truncate long error messages
                    if len(error_msg) > 100:
                        error_msg = error_msg[:97] + "..."
                    print(f"    Error: {error_msg}")

    return df

# Execute the function to get test results
import builtins

test_results_df = get_test_results(limit=100)

# Display test results DataFrame (optional).
# IPython provides `display` as a builtin rather than a notebook global, so a
# plain `'display' in globals()` check misses it. Look in both places and fall
# back to plain text outside IPython.
if not test_results_df.empty:
    _display = globals().get("display") or getattr(builtins, "display", None)
    if _display is not None:
        _display(test_results_df)
    else:
        print(test_results_df.head())

In [34]:
def get_source_freshness(limit=100, database="production"):
    """
    Retrieve source freshness information from dbt Cloud Discovery API

    Fetches up to ``limit`` sources with ``freshnessChecked: true`` from the
    applied state of ``DBT_ENVIRONMENT_ID``, filtered by ``database``. Each
    source's freshness result and warn/error criteria are flattened into one
    row. Prints a status summary and lists sources in ``warn``/``error``.

    NOTE(review): the recorded run of this cell found no sources with the
    default ``database="production"``. Check that the value matches your
    warehouse database name.

    Args:
        limit (int): Maximum number of sources to retrieve
        database (str): Database name to filter sources by

    Returns:
        DataFrame: Source freshness data, one row per source. Empty when the
        query fails or returns no sources.

    Raises:
        TypeError / ValueError: from ``int(DBT_ENVIRONMENT_ID)`` when the
        environment ID is unset or not numeric.
    """
    print("🔍 Retrieving source freshness information...")

    query = """
    query ($environmentId: BigInt!, $first: Int!, $database: String) {
      environment(id: $environmentId) {
        applied {
          sources(
            first: $first
            filter: { freshnessChecked: true, database: $database }
          ) {
            edges {
              node {
                sourceName
                name
                identifier
                loader
                freshness {
                  freshnessStatus
                  freshnessChecked
                  maxLoadedAt
                  maxLoadedAtTimeAgoInS
                  snapshottedAt
                  criteria {
                    errorAfter {
                      count
                      period
                    }
                    warnAfter {
                      count
                      period
                    }
                  }
                }
              }
            }
          }
        }
      }
    }
    """

    variables = {
        "environmentId": int(DBT_ENVIRONMENT_ID),
        "first": limit,
        "database": database
    }

    result = run_graphql_query(query, variables)

    if not result or "data" not in result:
        print("❌ Failed to retrieve source freshness data")
        return pd.DataFrame()

    # Any level may be JSON null; `or {}` covers that (a .get default only
    # covers missing keys).
    data = result.get("data") or {}
    environment = data.get("environment") or {}
    applied = environment.get("applied") or {}
    edges = (applied.get("sources") or {}).get("edges") or []

    if not edges:
        print("⚠️ No source freshness data found")
        return pd.DataFrame()

    freshness_data = []
    for edge in edges:
        node = edge.get("node") or {}
        freshness = node.get("freshness") or {}

        # A source may define only one of warn_after / error_after, so either
        # side (or the whole criteria object) can come back null.
        criteria = freshness.get("criteria") or {}
        error_after = criteria.get("errorAfter") or {}
        warn_after = criteria.get("warnAfter") or {}

        freshness_data.append({
            "source_name": node.get("sourceName"),
            "table_name": node.get("name"),
            "identifier": node.get("identifier"),
            "loader": node.get("loader"),
            "freshness_status": freshness.get("freshnessStatus"),
            "last_loaded_at": freshness.get("maxLoadedAt"),
            "time_since_load_seconds": freshness.get("maxLoadedAtTimeAgoInS"),
            "snapshotted_at": freshness.get("snapshottedAt"),
            "warn_after_count": warn_after.get("count"),
            "warn_after_period": warn_after.get("period"),
            "error_after_count": error_after.get("count"),
            "error_after_period": error_after.get("period")
        })

    df = pd.DataFrame(freshness_data)

    print(f"\n✅ Retrieved freshness data for {len(freshness_data)} sources")

    if not df.empty:
        # Count by status
        status_counts = df["freshness_status"].value_counts()
        print("\n📊 Freshness Status Summary:")
        for status, count in status_counts.items():
            status_symbol = "✅" if status == "pass" else "⚠️" if status == "warn" else "❌"
            print(f"{status_symbol} {status}: {count} sources")

        # Show stale sources if any
        stale_sources = df[df["freshness_status"].isin(["warn", "error"])]
        if not stale_sources.empty:
            print(f"\n⚠️ Stale Sources ({len(stale_sources)}):")
            for _, source in stale_sources.iterrows():
                seconds = source["time_since_load_seconds"]
                # pandas stores missing numbers as NaN, which is truthy, and a
                # genuine 0 is falsy -- so test for presence explicitly.
                hours_ago = round(seconds / 3600, 1) if pd.notna(seconds) else "N/A"
                print(f"  • {source['source_name']}.{source['table_name']}: {source['freshness_status']} - Last loaded {hours_ago} hours ago")

    return df

# Execute the function to get source freshness information
import builtins

source_freshness_df = get_source_freshness(limit=50)

# Display source freshness DataFrame.
# IPython provides `display` as a builtin rather than a notebook global, so a
# plain `'display' in globals()` check misses it. Look in both places and fall
# back to a plain-text preview outside IPython.
if not source_freshness_df.empty:
    _display = globals().get("display") or getattr(builtins, "display", None)
    if _display is not None:
        _display(source_freshness_df)
    else:
        print(source_freshness_df.head())

🔍 Retrieving source freshness information...
⚠️ No source freshness data found
