In [None]:
# Databricks notebook: Download artifacts for latest run of a given task of THIS job
# ---------------------------------------------------------------------------------

# PARAMETERS
# ----------
# Each entry below becomes a text widget created with "" as its default value:
#   TASK_KEY    - task_key (task name) within this job
#   SOURCE_PATH - subfolder inside artifacts, empty for root
#   DEST_PATH   - local/DBFS dest on this cluster
for _widget_name in ("TASK_KEY", "SOURCE_PATH", "DEST_PATH"):
    dbutils.widgets.text(_widget_name, "")

In [None]:
# Read the three widget values in declaration order, trimming surrounding
# whitespace from each one.
TASK_KEY, SOURCE_PATH, DEST_PATH = (
    dbutils.widgets.get(_name).strip()
    for _name in ("TASK_KEY", "SOURCE_PATH", "DEST_PATH")
)

In [None]:
# Import Databricks SDK
from databricks.sdk import WorkspaceClient
import json

In [None]:
# 1. Read the notebook execution context
# --------------------------------------
# The context is exposed as a JSON string; decode it so that its attributes
# can be looked up by name.
ctx_json_str = dbutils.notebook.entry_point.getDbutils().notebook().getContext().safeToJson()
ctx_dict = json.loads(ctx_json_str)
print(ctx_dict)

# 2. Get the parent run id from the context attributes
# NOTE: despite its name, JOB_ID holds the "multitaskParentRunId" attribute.
# It is a run id and is passed to `jobs.get_run(run_id=...)` further down.
JOB_ID = (ctx_dict.get("attributes") or {}).get("multitaskParentRunId")

# Fail here with an actionable message instead of letting a later
# `int(JOB_ID)` blow up with an opaque TypeError.
if JOB_ID is None:
    raise RuntimeError(
        "No 'multitaskParentRunId' attribute in the notebook context; "
        "this notebook must run as a task of a multi-task job."
    )

print(f"Parent job run id (JOB_ID) = {JOB_ID}")

In [None]:
# Fetch the current bindings reported by the notebook entry point and show them.
notebook_entry_point = dbutils.notebook.entry_point
all_args = notebook_entry_point.getCurrentBindings()

print(all_args)

In [None]:
# Build the SDK client with no explicit arguments, so it resolves credentials
# on its own (job's identity: service principal / compute identity).
w = WorkspaceClient()

# Look up the run identified by JOB_ID (Jobs API 2.1)
parent_run_id = int(JOB_ID)
run = w.jobs.get_run(run_id=parent_run_id)

# `run.tasks` may be empty/None; normalise that case to an empty list
tasks = run.tasks or []

# Dump every task as indented JSON (equivalent to `jq .tasks`)
task_dicts = [task.as_dict() for task in tasks]
print(json.dumps(task_dicts, indent=2))

In [None]:
# Pick the task whose run output will be inspected. The TASK_KEY widget
# selects the task; when it is left empty, fall back to "dbt", the key that
# was previously hard-coded here.
target_task_key = TASK_KEY or "dbt"

dbt_run_id = next(
    (t.run_id for t in tasks if t.task_key == target_task_key),
    None,  # fallback if not found
)

# Stop early with the list of known task keys rather than handing
# `run_id=None` to the Jobs API in a later cell.
if dbt_run_id is None:
    known_task_keys = sorted(str(t.task_key) for t in tasks)
    raise ValueError(
        f"Task '{target_task_key}' not found in the parent run; "
        f"available task keys: {known_task_keys}"
    )

print(dbt_run_id)

In [None]:
# Get the details of the task: fetch the run output for `dbt_run_id`
# and print it for inspection.
dbt_task_out = w.jobs.get_run_output(run_id=dbt_run_id)
print(dbt_task_out)

In [None]:
# The artifacts link is only read when the run output carries a dbt section;
# otherwise the URL stays None.
dbt_output = dbt_task_out.dbt_output
dbt_artifact_url = dbt_output.artifacts_link if dbt_output is not None else None

print("DBT_ARTIFACT_URL:", dbt_artifact_url)

In [None]:
local_path = "/tmp/artifact.tar.gz"

# Directory that receives the downloaded archive and the extracted files.
# The DEST_PATH widget overrides it; when the widget is empty, the previously
# hard-coded volume is used so existing job configurations behave the same.
DEFAULT_VOLUME_PATH = "/Volumes/workspace/default/dbt_artifacts"
volume_path = DEST_PATH or DEFAULT_VOLUME_PATH

In [None]:
# Authentication smoke test: asking for the current user should work
# whenever the client's credentials are valid.
me = w.current_user.me()
workspace_identity = me.user_name or me.display_name
print("Workspace user:", workspace_identity)

In [None]:
# Ask the SDK for ready-made auth headers instead of reading `w.config.token`.
# That attribute is only populated for personal-access-token auth; with any
# other auth type it is None and the header would be the literal string
# "Bearer None".
# NOTE(review): if the artifacts link is a pre-signed storage URL, it may not
# need (or may even reject) an Authorization header - confirm before relying on it.
headers = dict(w.config.authenticate())

In [None]:
import requests
import os

In [None]:
# 3. Ensure the destination directory exists within the Volume.
# `volume_path` is itself the directory the archive is written into, so it is
# created directly; taking its dirname would only create the parent directory.
os.makedirs(volume_path, exist_ok=True)

print(f"Streaming artifact directly to Volume: {volume_path}...")

In [None]:
# Full path of the downloaded archive: a fixed file name inside `volume_path`.
dest_file = os.path.join(volume_path, "artifact.tar.gz")

In [None]:
# (connect, read) timeouts in seconds, so a stalled connection cannot hang the
# task indefinitely. The read timeout applies per socket read, not to the
# whole transfer.
DOWNLOAD_TIMEOUT = (10, 300)

try:
    # 4. Stream the download to avoid loading large files into RAM
    with requests.get(
        dbt_artifact_url,
        headers=headers,
        stream=True,
        timeout=DOWNLOAD_TIMEOUT,
    ) as r:
        r.raise_for_status()
        with open(dest_file, 'wb') as f:
            for chunk in r.iter_content(chunk_size=1024 * 1024):  # 1MB chunks
                if chunk:
                    f.write(chunk)
except Exception as e:
    print(f"Failed to store artifact in Volume: {e}")
    # Remove whatever is at dest_file: it is either a truncated download or an
    # archive left over from an earlier run, and later cells must not read it.
    try:
        if os.path.exists(dest_file):
            os.remove(dest_file)
    except OSError as cleanup_error:
        print(f"Could not remove incomplete archive {dest_file}: {cleanup_error}")
    # Re-raise so the job task is marked as failed instead of continuing.
    raise
else:
    print("Transfer successful!")

In [None]:
import os
import shutil
import tarfile

print(f"Opening archive: {dest_file}")
try:
    # mode "r:*" lets tarfile auto-detect gzip/bzip2/xz/plain tar
    with tarfile.open(dest_file, mode="r:*") as tar_ref:
        # 1. Find index.html (could be in a subfolder). Compare the base name
        #    so that e.g. "old_index.html" does not match, and only accept
        #    regular files because only those can be read with extractfile().
        target_member = next(
            (
                m for m in tar_ref.getmembers()
                if m.isfile() and os.path.basename(m.name) == "index.html"
            ),
            None,
        )

        if target_member:
            print(f"Found it! Extracting {target_member.name}...")

            # 2. Copy only that one file straight to its final location.
            #    The destination path is built here and never from the
            #    archive's member path, so a crafted member name cannot write
            #    outside volume_path and no nested directory is left behind.
            new_path = os.path.join(volume_path, "index.html")
            os.makedirs(volume_path, exist_ok=True)
            with tar_ref.extractfile(target_member) as src, open(new_path, "wb") as dst:
                shutil.copyfileobj(src, dst)

            # 3. Report the flattened location when the member was nested
            #    (e.g. target/index.html)
            if target_member.name != "index.html":
                print(f"Cleaned up path. File is now at: {new_path}")
        else:
            print("Error: Could not find index.html in the tar archive.")
except FileNotFoundError:
    print("Error: The tar file wasn't found. Check your download step.")
except tarfile.ReadError:
    print("Error: The file is not a valid tar/tar.gz archive.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")

In [None]:
import os
import shutil
import tarfile


def _copy_member(tar_ref, member, dest_path):
    """Stream one regular-file archive member to `dest_path`.

    The destination is chosen by the caller and never derived from the
    member's own path, so a crafted member name cannot write outside the
    intended directory and no intermediate directories are created.
    """
    with tar_ref.extractfile(member) as src, open(dest_path, "wb") as dst:
        shutil.copyfileobj(src, dst)


print(f"Opening archive: {dest_file}")
try:
    # mode "r:*" lets tarfile auto-detect gzip/bzip2/xz/plain tar
    with tarfile.open(dest_file, mode="r:*") as tar_ref:
        # Only regular files can be read back with extractfile()
        file_members = [m for m in tar_ref.getmembers() if m.isfile()]

        # 1. Find index.html (docs); compare the base name so that names such
        #    as "old_index.html" are not picked up by accident
        index_member = next(
            (m for m in file_members if os.path.basename(m.name) == "index.html"),
            None
        )

        # 2. Find dbt log files (usually logs/*.log)
        log_members = [
            m for m in file_members
            if m.name.startswith("logs/") and m.name.endswith(".log")
        ]

        # --- Handle index.html ---
        if index_member:
            print(f"Found index.html at {index_member.name}. Extracting...")
            new_index_path = os.path.join(volume_path, "index.html")
            os.makedirs(volume_path, exist_ok=True)
            _copy_member(tar_ref, index_member, new_index_path)

            # Report the flattened location when the member was nested
            if index_member.name != "index.html":
                print(f"Docs index is now at: {new_index_path}")
        else:
            print("Warning: Could not find index.html in the tar archive.")

        # --- Handle dbt logs ---
        if log_members:
            logs_target_dir = os.path.join(volume_path, "logs")
            os.makedirs(logs_target_dir, exist_ok=True)

            for m in log_members:
                print(f"Extracting log file: {m.name}")
                # m.name is like "logs/dbt.log" or "logs/<timestamp>.log";
                # every log lands directly in logs_target_dir under its base name
                new_log_path = os.path.join(logs_target_dir, os.path.basename(m.name))
                _copy_member(tar_ref, m, new_log_path)

            print(f"Extracted {len(log_members)} log file(s) into {logs_target_dir}")
        else:
            print("Warning: No dbt log files (logs/*.log) found in the tar archive.")

except FileNotFoundError:
    print("Error: The tar file wasn't found. Check your download step.")
except tarfile.ReadError:
    print("Error: The file is not a valid tar/tar.gz archive.")
except Exception as e:
    print(f"An unexpected error occurred: {e}")