<a href="https://colab.research.google.com/github/anouardbt/dbt-cloud-api-demos/blob/main/dbt_Cloud%20Logs%20to%20GCS%20and%20BigQuery.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Introduction
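This notebook demonstrates how to use the dbt Cloud REST API (v2) to leverage job-run metadata. It fetches the artifacts of a dbt Cloud run and uploads each one to Google Cloud Storage. Artifacts that parse as JSON objects are also loaded into BigQuery.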

Disclaimer:
This is free and unencumbered software released into the public domain.

Anyone is free to copy, modify, publish, use, compile, sell, or
distribute this software, either in source code form or as a compiled
binary, for any purpose, commercial or non-commercial, and by any
means.

In jurisdictions that recognize copyright laws, the author or authors
of this software dedicate any and all copyright interest in the
software to the public domain. We make this dedication for the benefit
of the public at large and to the detriment of our heirs and
successors. We intend this dedication to be an overt act of
relinquishment in perpetuity of all present and future rights to this
software under copyright law.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL THE AUTHORS BE LIABLE FOR ANY CLAIM, DAMAGES OR
OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
OTHER DEALINGS IN THE SOFTWARE.

For more information, please refer to <https://unlicense.org>

In [1]:
!pip install google-auth-oauthlib google-cloud-logging

In [None]:
!pip install google-cloud-storage

In [None]:
!pip install google-cloud-storage google-cloud-bigquery

In [3]:
import json
import os
import re
import sys
import traceback
from datetime import datetime, timezone

import requests
from google.auth import default
from google.cloud import bigquery
from google.cloud import logging
#from google.cloud.logging_v2.resource import Resource
from google.cloud import storage
from google.colab import auth


In [5]:
# Interactive Google sign-in for this Colab runtime, so that `default()` below can
# resolve credentials. `creds` is passed explicitly to the GCS and BigQuery clients.
# `project` is returned by `default()` but is not used in the cells shown here;
# the notebook uses GCP_PROJECT_ID instead.
auth.authenticate_user()
creds, project = default()

In [6]:
# dbt Cloud API configuration.
# Account-specific values, and above all the API token, can be supplied through
# environment variables so they never have to be typed into (and saved with)
# the notebook. The literals below are only fallbacks for the demo.
DBT_CLOUD_ACCOUNT_ID = os.environ.get("DBT_CLOUD_ACCOUNT_ID", "70403103916174")  # Replace with your actual account ID
DBT_CLOUD_API_KEY = os.environ.get("DBT_CLOUD_API_KEY", "Insert Token Here")  # Prefer the env var; never commit a real token
DBT_CLOUD_API_BASE_URL = os.environ.get("DBT_CLOUD_API_BASE_URL", 'https://c1.us1.dbt.com/api/v2')  # Replace with your account's access URL

# GCP configuration
GCP_PROJECT_ID = os.environ.get("GCP_PROJECT_ID", 'sales-demo-project-314714')  # Replace with your GCP project ID
GCS_BUCKET_NAME = "dbt-cloud-logging"
BIGQUERY_DATASET = "ani_experiments"  # Replace with your BigQuery dataset name
BIGQUERY_TABLE_PREFIX = "dbt_artifacts_"  # Prefix prepended to every BigQuery table name

In [7]:
# Shared Google Cloud clients used by upload_to_gcs / upload_to_bigquery.
# Both are bound to GCP_PROJECT_ID and authenticate with the Colab `creds`.
storage_client, bq_client = (
    storage.Client(project=GCP_PROJECT_ID, credentials=creds),
    bigquery.Client(project=GCP_PROJECT_ID, credentials=creds),
)

In [8]:
def get_dbt_cloud_artifacts(run_id, timeout=30):
    """List the artifacts produced by a dbt Cloud run.

    Calls ``GET {DBT_CLOUD_API_BASE_URL}/accounts/{account}/runs/{run_id}/artifacts/``
    and returns the decoded JSON body.

    Args:
        run_id: dbt Cloud run ID.
        timeout: Seconds to wait for the server before giving up (passed to
            ``requests.get``).

    Returns:
        The parsed JSON response. ``process_run_artifacts`` expects a dict holding
        the artifact list under ``data`` (or ``results``).

    Raises:
        requests.exceptions.HTTPError: the API answered with a non-2xx status.
        requests.exceptions.RequestException: connection failure or timeout.
    """
    url = f"{DBT_CLOUD_API_BASE_URL}/accounts/{DBT_CLOUD_ACCOUNT_ID}/runs/{run_id}/artifacts/"
    headers = {
        'Authorization': f'Token {DBT_CLOUD_API_KEY}',
        'Content-Type': 'application/json'
    }
    # Without a timeout, requests can block forever on a stalled connection.
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()

In [9]:
def get_artifact_content(run_id, path, timeout=60):
    """Fetch one artifact of a dbt Cloud run, decoded as JSON when possible.

    Args:
        run_id: dbt Cloud run ID.
        path: Artifact path as listed by ``get_dbt_cloud_artifacts``
            (e.g. ``catalog.json`` or ``compiled/.../model.sql``).
        timeout: Seconds to wait for the server before giving up (passed to
            ``requests.get``).

    Returns:
        The parsed JSON value if the body is valid JSON, otherwise the raw response
        text (e.g. compiled SQL). Note that a text file whose content happens to be
        valid JSON is also returned parsed.

    Raises:
        requests.exceptions.HTTPError: the API answered with a non-2xx status.
        requests.exceptions.RequestException: connection failure or timeout.
    """
    url = f"{DBT_CLOUD_API_BASE_URL}/accounts/{DBT_CLOUD_ACCOUNT_ID}/runs/{run_id}/artifacts/{path}"
    headers = {
        'Authorization': f'Token {DBT_CLOUD_API_KEY}',
        'Content-Type': 'application/json'
    }
    # Artifacts such as manifest.json can be large; still, never wait indefinitely.
    response = requests.get(url, headers=headers, timeout=timeout)
    response.raise_for_status()

    try:
        return response.json()
    except ValueError:
        # Not JSON (SQL, YAML, logs, ...): hand back the text unchanged.
        return response.text

In [10]:
def upload_to_gcs(content, gcs_path):
    """Upload one artifact to Google Cloud Storage and return its ``gs://`` URI.

    Text content (``str``, e.g. compiled SQL) is stored verbatim as UTF-8 text.
    Any other value (parsed JSON) is serialized with ``json.dumps`` and stored as
    ``application/json``. The blob is never made public, so it keeps the
    bucket's default access.

    Args:
        content: Artifact content as returned by ``get_artifact_content``.
        gcs_path: Object name inside ``GCS_BUCKET_NAME``.

    Returns:
        ``gs://{GCS_BUCKET_NAME}/{gcs_path}``.
    """
    bucket = storage_client.bucket(GCS_BUCKET_NAME)
    blob = bucket.blob(gcs_path)

    if isinstance(content, str):
        # json.dumps on a str would wrap the file in quotes and escape every newline,
        # so the stored .sql/.yml object would no longer be the original file.
        payload, content_type = content, "text/plain; charset=utf-8"
    else:
        payload, content_type = json.dumps(content), "application/json"
    blob.upload_from_string(payload, content_type=content_type)

    print(f"Artifact uploaded to GCS: {blob.name}")
    return f"gs://{GCS_BUCKET_NAME}/{gcs_path}"

In [11]:
def upload_to_bigquery(content, artifact_path, run_id):
    """Append one JSON artifact as a single row to a per-artifact BigQuery table.

    The target table is ``{GCP_PROJECT_ID}.{BIGQUERY_DATASET}.{BIGQUERY_TABLE_PREFIX}<name>``.
    ``<name>`` is ``artifact_path`` with every character other than ASCII
    letters, digits and ``_`` replaced by ``_``. The row has three columns:
      - ``json_data`` (JSON): the whole artifact,
      - ``timestamp`` (TIMESTAMP, UTC load time): the day-partitioning column,
      - ``run_id`` (INTEGER).

    Args:
        content: Parsed JSON artifact (``process_run_artifacts`` passes dicts only).
        artifact_path: Artifact path, used to derive the table name.
        run_id: dbt Cloud run ID stored alongside the artifact.

    Raises:
        Whatever the BigQuery client raises if the load job fails.
    """
    # Collapse '/', '.', '-', etc. to '_'. Paths the previous '/'+'.' replacement
    # handled (e.g. catalog.json -> catalog_json) map to the same table as before.
    sanitized_table_name = re.sub(r"[^0-9A-Za-z_]", "_", artifact_path)
    full_table_name = f"{BIGQUERY_TABLE_PREFIX}{sanitized_table_name}"
    table_id = f"{GCP_PROJECT_ID}.{BIGQUERY_DATASET}.{full_table_name}"

    # Timezone-aware UTC load time. datetime.utcnow() is deprecated (Python 3.12+)
    # and returns a naive datetime.
    timestamp = datetime.now(timezone.utc).isoformat()

    schema = [
        bigquery.SchemaField("json_data", "JSON"),
        bigquery.SchemaField("timestamp", "TIMESTAMP"),
        bigquery.SchemaField("run_id", "INTEGER"),
    ]

    job_config = bigquery.LoadJobConfig(
        schema=schema,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,  # Append to the existing table
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="timestamp",  # Partition by load time
        ),
    )

    row = {"json_data": content, "timestamp": timestamp, "run_id": run_id}

    load_job = bq_client.load_table_from_json([row], table_id, job_config=job_config)
    load_job.result()  # Block until the load finishes; raises on failure

    # Report the real destination (prefixed, fully qualified), not the bare sanitized name.
    print(f"Uploaded nested JSON artifact {artifact_path} with run_id {run_id} to BigQuery table {table_id}")

In [12]:
def _extract_artifact_path(artifact):
    """Return the path of one artifact-list entry, or None (after printing why) if unusable."""
    # Entries may be bare path strings or dicts carrying a 'path' key.
    if isinstance(artifact, str):
        return artifact
    if isinstance(artifact, dict):
        artifact_path = artifact.get('path')
        if not artifact_path:
            print(f"No 'path' key in artifact: {artifact}")
            return None
        return artifact_path
    print(f"Unexpected artifact format. Expected a string or dict, got: {type(artifact)}")
    return None


def process_run_artifacts(run_id):
    """Archive every artifact of a dbt Cloud run to GCS, and JSON-object artifacts to BigQuery.

    For each artifact listed by ``get_dbt_cloud_artifacts``:
      1. its content is fetched with ``get_artifact_content``;
      2. it is uploaded to ``gs://{GCS_BUCKET_NAME}/dbt_artifacts/{run_id}/<path with '/' -> '_'>``;
      3. if the content parsed as a dict, it is also appended to BigQuery.

    This is best-effort. Problems are printed rather than raised, and a failure
    on one artifact (fetch or upload) does not stop the remaining artifacts from
    being processed.

    Args:
        run_id: dbt Cloud run ID.
    """
    try:
        response = get_dbt_cloud_artifacts(run_id)

        if not isinstance(response, dict):
            print(f"Unexpected response format. Expected a dict, got: {type(response)}")
            return

        artifacts = response.get('data', response.get('results', None))

        if not isinstance(artifacts, list):
            print(f"Artifacts are not in a list format. Got: {type(artifacts)}")
            return

        for artifact in artifacts:
            artifact_path = _extract_artifact_path(artifact)
            if artifact_path is None:
                continue

            try:
                artifact_content = get_artifact_content(run_id, artifact_path)
            except requests.exceptions.RequestException as e:
                print(f"Error fetching artifact content for path {artifact_path}: {e}")
                continue

            gcs_path = f"dbt_artifacts/{run_id}/{artifact_path.replace('/', '_')}"
            try:
                upload_to_gcs(artifact_content, gcs_path)
                if isinstance(artifact_content, dict):
                    upload_to_bigquery(artifact_content, artifact_path, run_id)
                else:
                    print(f"Non-JSON artifact {artifact_path} saved to GCS but not uploaded to BigQuery.")
            except Exception as e:
                # One failed upload (e.g. a GCS/BigQuery API error) should not abort the whole run.
                print(f"Error uploading artifact {artifact_path}: {e}")

    except requests.exceptions.RequestException as e:
        print(f"Error fetching artifacts: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
        print(f"Error details: {traceback.format_exc()}")


In [13]:
def main(run_id=70403121082874):
    """Archive the artifacts of one dbt Cloud run (see ``process_run_artifacts``).

    Args:
        run_id: dbt Cloud run ID to process. The default is the demo run used in
            this notebook. Replace it, or call e.g. ``main(123456)``.
    """
    process_run_artifacts(run_id)


if __name__ == "__main__":
    main()

Artifact uploaded to GCS: dbt_artifacts/70403121082874/catalog.json
Uploaded nested JSON artifact catalog.json with run_id 70403121082874 to BigQuery table catalog_json
Artifact uploaded to GCS: dbt_artifacts/70403121082874/compiled_analytics_analyses_generate_model_yaml.sql
Non-JSON artifact compiled/analytics/analyses/generate_model_yaml.sql saved to GCS but not uploaded to BigQuery.
Artifact uploaded to GCS: dbt_artifacts/70403121082874/compiled_analytics_analyses_generate_source_yaml.sql
Non-JSON artifact compiled/analytics/analyses/generate_source_yaml.sql saved to GCS but not uploaded to BigQuery.
Artifact uploaded to GCS: dbt_artifacts/70403121082874/compiled_analytics_analyses_select_from_orders_example.sql
Non-JSON artifact compiled/analytics/analyses/select_from_orders_example.sql saved to GCS but not uploaded to BigQuery.
Artifact uploaded to GCS: dbt_artifacts/70403121082874/compiled_analytics_dbt_project.yml_hooks_analytics-on-run-end-0.sql
Non-JSON artifact compiled/analy