In [None]:
#Importing relevant libraries
import requests
import pandas as pd
import pytz

In [None]:
# Configuration for the dbt Cloud API and the target PostgreSQL database.
# Each value is read from an environment variable so real secrets never have
# to be saved inside the notebook; the placeholder after the comma is used
# only when the variable is not set.
import os

# dbt Cloud account id and API token
account_id = os.environ.get("DBT_ACCOUNT_ID", "your_account_id")
API_TOKEN = os.environ.get("DBT_API_TOKEN", "your_api_key")

# Database connection details
username = os.environ.get("DBT_JOBS_DB_USERNAME", "your_username")
password = os.environ.get("DBT_JOBS_DB_PASSWORD", "your_password")
host = os.environ.get("DBT_JOBS_DB_HOST", "your_host")  # database host name or IP
port = os.environ.get("DBT_JOBS_DB_PORT", "your_port")  # PostgreSQL listens on 5432 by default
database = os.environ.get("DBT_JOBS_DB_NAME", "your_database")
schema = os.environ.get("DBT_JOBS_DB_SCHEMA", "your_schema")

In [None]:
# Jobs endpoint for the configured dbt Cloud account
BASE_URL = f'https://cloud.getdbt.com/api/v2/accounts/{account_id}/jobs/'

# Headers for authentication
headers = {
    "Authorization": f"Token {API_TOKEN}",
    "Content-Type": "application/json"
}

# Seconds to wait for the API before giving up; without a timeout a stalled
# connection would hang the notebook indefinitely.
REQUEST_TIMEOUT = 30

jobs_response = requests.get(BASE_URL, headers=headers, timeout=REQUEST_TIMEOUT)
# Fail with a clear HTTP error (e.g. 401 for a bad token) instead of a
# confusing lookup error on an unexpected response body.
jobs_response.raise_for_status()

# The pagination metadata carries the total number of jobs in the account
total_jobs = jobs_response.json()["extra"]["pagination"]["total_count"]

print("Total jobs for this dbt account are ", total_jobs)


In [None]:
# Page size and record window for the paginated jobs request
limit = 100  # records requested per API call
total_records = records_to_fetch = total_jobs  # fetch every job in the account

# Offset of the first record to request (0 when everything is fetched)
start_offset = total_records - records_to_fetch

# Accumulator for the job records returned by each page
last_records = []


In [None]:



# Fetch the jobs page by page (`limit` records per request) and collect them
# into a DataFrame.

# Start from an empty list so that re-running this cell does not append the
# same jobs a second time.
last_records = []

# Seconds to wait for each page before giving up
page_timeout = 30

offset = start_offset
while offset < total_records:
    # Make the API request with limit and offset
    params = {"limit": limit, "offset": offset}
    response = requests.get(BASE_URL, headers=headers, params=params, timeout=page_timeout)

    if response.status_code != 200:
        # Best-effort: report the failure and stop paging.
        # NOTE: `df` will then contain only the pages fetched so far.
        print(f"Failed to fetch data: {response.status_code}, {response.text}")
        break

    data = response.json()
    # Treat a missing or null "data" field as an empty page
    runs = data.get("data") or []
    if not runs:
        # An empty page means there is nothing further to fetch
        break
    last_records.extend(runs)

    # Move on to the next page
    offset += limit

# Convert the results to a pandas DataFrame
df = pd.DataFrame(last_records)




In [None]:
# Display the DataFrame built from the fetched job records
df

In [None]:
# Parse the timestamp columns and express them in IST (Asia/Kolkata).
dt_columns = ['created_at', 'updated_at', 'next_run']

# Define the timezone conversion
utc = pytz.utc
ist = pytz.timezone("Asia/Kolkata")

for column in dt_columns:
    if column not in df.columns:
        # Nothing to convert (for example when no jobs were fetched)
        continue
    try:
        # utc=True yields a tz-aware UTC column whether or not the input
        # strings carry an offset; values without an offset are read as UTC.
        parsed = pd.to_datetime(df[column], utc=True)
    except (ValueError, TypeError) as exc:
        # Leave the column untouched, but say so instead of failing silently
        print(f"Skipping column {column!r}: could not parse as datetime ({exc})")
        continue
    df[column] = parsed.dt.tz_convert(ist)

# Any other timezone-naive datetime column is assumed to hold UTC values
for column in df.select_dtypes(include=['datetime']).columns:
    df[column] = df[column].dt.tz_localize(utc).dt.tz_convert(ist)

In [None]:
from pandas import json_normalize

# Flatten every column that holds dicts into "<column>_<key>" columns and
# merge them back in place of the original column.
for column in list(df.columns):
    if not df[column].apply(lambda x: isinstance(x, dict)).any():
        continue

    # json_normalize cannot handle None/NaN entries, so rows without a dict
    # are replaced by an empty dict and come out as missing values.
    records = [value if isinstance(value, dict) else {} for value in df[column]]
    flat_df = json_normalize(records)

    # Align on df's own index so the join is row-for-row even when df does
    # not have a default 0..n-1 index.
    flat_df.index = df.index
    flat_df.columns = [f"{column}_{subcol}" for subcol in flat_df.columns]
    df = df.drop(columns=[column]).join(flat_df)


In [None]:
# Display the DataFrame after the dict columns have been flattened
df

In [None]:
from urllib.parse import quote

from sqlalchemy import create_engine


# Create an SQLAlchemy engine for the target PostgreSQL database.
# The username and password are percent-encoded because characters such as
# "@", ":" or "/" would otherwise be misread as URL delimiters.
engine = create_engine(
    f"postgresql+psycopg2://{quote(username, safe='')}:{quote(password, safe='')}"
    f"@{host}:{port}/{database}"
)

In [None]:
# Write the jobs table to the configured schema, replacing any existing
# 'dbt_jobs' table; the DataFrame index is not stored.
df.to_sql(
    'dbt_jobs',
    con=engine,
    schema=schema,
    if_exists='replace',
    index=False,
)