In [None]:
import requests
import pandas as pd
import pytz

# Set API key
auth_token = "get_your_api_key" ##get your api key

# Define query_one with pagination
query_one = """
query GetLatestModelMetadata($environmentId: BigInt!, $first: Int!, $after: String) {
  environment(id: $environmentId) {
    applied {
      models(first: $first, after: $after) {
        edges {
          node {
            name
            uniqueId
            schema
            database
            materializedType
            executionInfo {
              lastSuccessRunId
              executionTime
              executeStartedAt
            }
          }
        }
        pageInfo {
          endCursor
          hasNextPage
        }
      }
    }
  }
}
"""

# Query the API
def query_discovery_api(auth_token, gql_query, variables):
    response = requests.post(
        'https://metadata.cloud.getdbt.com/graphql',
        headers={
            "Authorization": f"Bearer {auth_token}",
            "Content-Type": "application/json"
        },
        json={"query": gql_query, "variables": variables}
    )
    try:
        response_data = response.json()
    except ValueError:
        print(f"Invalid JSON response: {response.text}")
        return None

    # Handle GraphQL errors
    if 'errors' in response_data:
        print("GraphQL Errors:", response_data["errors"])
        return None

    return response_data.get('data', None)

# Variables for query_one
variables_query_one = {
    "environmentId": 'your_environment_id',  # Replace with your environment ID
    "first": 500,
    "after": None  # Start with no cursor for the first page
}

# Fetch all models with pagination
all_models = []
while True:
    data = query_discovery_api(auth_token, query_one, variables_query_one)
    
    if not data or not data.get("environment"):
        print("No metadata found or error occurred.")
        break

    models_data = data["environment"]["applied"]["models"]
    all_models.extend([edge['node'] for edge in models_data['edges']])

    # Check if there's another page
    page_info = models_data["pageInfo"]
    if page_info["hasNextPage"]:
        variables_query_one["after"] = page_info["endCursor"]
    else:
        break

# Convert all models to a dataframe
models_df = pd.DataFrame(all_models)

# Unnest the executionInfo column
if 'executionInfo' in models_df:
    models_df = pd.concat([models_df.drop(['executionInfo'], axis=1), models_df['executionInfo'].apply(pd.Series)], axis=1)

# Sort the models by execution time
models_df_sorted = models_df.sort_values('executionTime', ascending=False)




In [None]:
models_df

In [None]:
models_df['executeStartedAt'] = pd.to_datetime(models_df['executeStartedAt'])


In [None]:
# Convert from UTC to IST
from pytz import timezone
ist = timezone('Asia/Kolkata')

dt_columns = ['executeStartedAt']

for col in dt_columns:
    models_df[col] = models_df[col].dt.tz_convert(ist)


In [None]:
models_df

In [None]:
from sqlalchemy import create_engine

# Database connection details
username = "######"
password = "######"
host = "######"  # Or your database host
port = "######"  # Default PostgreSQL port
database = "######"

# Create an SQLAlchemy engine
engine = create_engine(f"postgresql+psycopg2://{username}:{password}@{host}:{port}/{database}")


In [None]:
models_df.to_sql('dbt_models', con=engine, schema='analytics', if_exists='replace', index=False)