In [None]:
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Module 04: BigQuery Data Engineering Agent for Reporting Mart generation

This notebook performs the preparation needed to build the reporting mart from the code generated by the BigQuery Data Engineering Agent.

Programmatic invocation of the Data Engineering Agent is not currently supported, so we do the preparatory work here and then switch to the UI to generate the reporting mart code.

## 1. Foundations

### 1.1. Installs

In [None]:
!pip install google-cloud-dataplex==2.11.0  -q
!pip install google-cloud-dataform==0.6.2 -q

### 1.2. Variable initialization & imports

In [None]:
PROJECT_ID_LIST=!gcloud config list --format "value(core.project)" 2>/dev/null
PROJECT_ID=PROJECT_ID_LIST[0]
LOCATION="us-central1"
OLTP_DATASET_ID="rscw_oltp_stg_ds"
DWH_DATASET_ID="rscw_dwh_ds"
OLTP_METADATA_DATASET_ID="rscw_oltp_metadata_ds"
DWH_METADATA_DATASET_ID="rscw_dwh_metadata_ds"
DATAFORM_REPO="rscw-df-repo"
DATAFORM_WORKSPACE="rscw-df-ws"
OLTP_DATASET_RESOURCE_URI = f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{OLTP_DATASET_ID}"
DWH_DATASET_RESOURCE_URI = f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DWH_DATASET_ID}"
BASE_URL_FOR_DATAPLEX_SCAN="https://dataplex.googleapis.com/v1"
MAX_POLLS=250
POLLING_INTERVAL_SECONDS=5

In [None]:
import re
import time
import json
import os
import io
import base64
import yaml
import pandas as pd
import pandas_gbq
import requests
import datetime
import decimal
import google.auth
import google.auth.transport.requests
import pyarrow as pa
import pyarrow.parquet as pq
import psycopg2
import google.genai
import git
import traceback
import subprocess
import shutil
import looker_sdk
import configparser

from typing import List, Dict, Any, Optional
from google.cloud import bigquery
from google.cloud import dataplex_v1
from google.cloud import dataform_v1beta1
from google.api_core import exceptions, operation
from google.cloud.exceptions import NotFound
from google.cloud.exceptions import Conflict
from google.protobuf.timestamp_pb2 import Timestamp
from google.genai.types import CreateBatchJobConfig, JobState
from google.cloud import storage
from google.cloud import secretmanager
from google.adk.agents import Agent
from google.adk.tools import ToolContext

## 2. Util functions

### 2.1. General functions

In [None]:
def get_auth_token():
    """Return an OAuth2 access token for the Application Default Credentials.

    The credentials are refreshed before the token is read, so the returned
    value is populated.
    """
    credentials, _project = google.auth.default()
    refresh_request = google.auth.transport.requests.Request()
    credentials.refresh(refresh_request)
    return credentials.token

def sanitize_string_with_hyphens(input_string):
    """
    Lowercase ``input_string`` and replace every character outside ``[a-z0-9]``
    with a hyphen.

    Each offending character becomes its own hyphen (runs are not collapsed),
    so the output has the same length as the lowercased input.
    """
    def _keep_or_hyphen(ch):
        # Plain code-point range checks, equivalent to the class [a-z0-9].
        return ch if ('a' <= ch <= 'z' or '0' <= ch <= '9') else '-'

    return ''.join(_keep_or_hyphen(ch) for ch in input_string.lower())


### 2.2. BQ utils

In [None]:
def truncate_bigquery_table(bq_table_uri: str):
    """
    Remove every row from a BigQuery table using a ``TRUNCATE TABLE`` statement.

    Args:
        bq_table_uri: Table id (e.g. ``project.dataset.table``); it is wrapped
            in backticks before being placed into the SQL text.

    Returns:
        A status string: a success message, a not-found message, or a generic
        error message. Exceptions are caught and never propagated.
    """
    try:
        bq_client = bigquery.Client()
        statement = f"TRUNCATE TABLE `{bq_table_uri}`"
        print(f"Executing query: {statement}")
        # .result() blocks until the DML job has finished.
        bq_client.query(statement).result()
    except exceptions.NotFound:
        return f"Error: The table '{bq_table_uri}' was not found."
    except Exception as exc:
        return f"An unexpected error occurred: {exc}"
    return f"Successfully truncated table {bq_table_uri}"

def read_bigquery_table(
    project_id: str,
    dataset_id: str,
    table_id: str
) -> pd.DataFrame:
    """
    Download every row of ``project_id.dataset_id.table_id`` into a DataFrame.

    A BigQuery client bound to ``project_id`` is created with the environment's
    default credentials, and the rows are fetched with
    ``list_rows(...).to_dataframe()``.

    Returns:
        The table contents as a pandas DataFrame. If anything raises while
        reading, the error is printed and an empty DataFrame is returned.
    """
    bq_client = bigquery.Client(project=project_id)
    full_table_id = f"{project_id}.{dataset_id}.{table_id}"

    try:
        print(f"Reading all rows from table: {full_table_id}...")
        frame = bq_client.list_rows(full_table_id).to_dataframe()
        print(f"Successfully read {len(frame)} rows.")
        return frame
    except Exception as exc:
        print(f"An error occurred: {exc}")
        # Callers get an empty frame rather than an exception.
        return pd.DataFrame()

def write_dict_to_bigquery(
    bq_table_uri: str,
    data_to_insert: dict,
    delete_conditions: Optional[List[Dict[str, Any]]] = None,
    dataset_location: str = "us-central1",
):
    """
    Append one row to a BigQuery table, creating or widening the table as needed.

    Steps, in order:
      1. Create the dataset (in ``dataset_location``) if it does not exist.
      2. Create the table if it does not exist, with one STRING column per key
         of ``data_to_insert``. If it exists, add any missing keys as new
         STRING columns.
      3. If ``delete_conditions`` is given and the table is named ``tables`` or
         ``columns``, delete the rows matching ALL conditions (AND-ed equality
         filters), so the new row replaces them.
      4. Stream the row in with ``insert_rows_json``; every value is stringified.

    Args:
        bq_table_uri: Fully qualified table id, ``project.dataset.table``.
        data_to_insert: Mapping of column name to value for the single row.
        delete_conditions: Optional list of ``{"column": ..., "value": ...}``
            dicts. Entries with a missing column or a ``None`` value are
            ignored. Values are compared as strings and passed as query
            parameters, so quotes inside values are safe.
        dataset_location: Location used only when the dataset must be created.

    Returns:
        A string describing success or failure; exceptions are not raised.
    """
    if not data_to_insert:
        return "No data provided to write to BigQuery."

    # Validate the URI before calling the API so a malformed id gets a clear message.
    uri_parts = bq_table_uri.split('.') if isinstance(bq_table_uri, str) else []
    if len(uri_parts) != 3 or not all(uri_parts):
        return f"Invalid table URI '{bq_table_uri}': expected 'project.dataset.table'."
    project_id, dataset_id, table_id = uri_parts
    table_ref_str = f"{project_id}.{dataset_id}.{table_id}"

    try:
        client = bigquery.Client()
        # Build references from the parsed URI. client.dataset() would silently
        # use the client's default project instead of the one in the URI.
        dataset_ref = bigquery.DatasetReference(project_id, dataset_id)
        table_ref = dataset_ref.table(table_id)

        # Create the dataset if it doesn't exist.
        try:
            client.get_dataset(dataset_ref)
        except exceptions.NotFound:
            print(f"Dataset {dataset_id} not found. Creating it in {dataset_location}.")
            dataset = bigquery.Dataset(dataset_ref)
            dataset.location = dataset_location
            client.create_dataset(dataset, exists_ok=True)

        new_schema_fields = [bigquery.SchemaField(key, "STRING") for key in data_to_insert]

        # Create the table, or append any columns it is missing.
        try:
            table = client.get_table(table_ref)
            existing_names = {field.name for field in table.schema}
            missing_fields = [f for f in new_schema_fields if f.name not in existing_names]
            # Only issue a schema update when there is actually something to add.
            if missing_fields:
                table.schema = list(table.schema) + missing_fields
                client.update_table(table, ["schema"])
        except exceptions.NotFound:
            print(f"Table {table_id} not found. Creating it.")
            table = bigquery.Table(table_ref_str, schema=new_schema_fields)
            client.create_table(table)

        # Replace-semantics are applied only to the metadata tables named below.
        # NOTE(review): rows written by insert_rows_json sit in the streaming buffer
        # for a while and BigQuery rejects DELETEs touching them. A quick follow-up
        # call that matches such rows will fail and report it via the return string.
        if delete_conditions and table_id in ("tables", "columns"):
            where_clauses = []
            query_params = []
            for index, condition in enumerate(delete_conditions):
                column = condition.get('column')
                value = condition.get('value')
                if not column or value is None:
                    continue
                # Column names cannot be parameterized; allow only plain identifiers.
                if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", str(column)):
                    raise ValueError(f"Invalid column name in delete_conditions: {column!r}")
                param_name = f"p{index}"
                where_clauses.append(f"`{column}` = @{param_name}")
                query_params.append(bigquery.ScalarQueryParameter(param_name, "STRING", str(value)))

            if where_clauses:
                delete_query = f"DELETE FROM `{table_ref_str}` WHERE " + " AND ".join(where_clauses)
                job_config = bigquery.QueryJobConfig(query_parameters=query_params)
                client.query(delete_query, job_config=job_config).result()  # wait for completion

        # Insert the new row (all values stringified to match the STRING schema).
        row_to_insert = {key: str(value) for key, value in data_to_insert.items()}
        errors = client.insert_rows_json(table_ref_str, [row_to_insert])
        if errors:
            return f"Failed to insert rows: {errors}"
        return f"Successfully wrote data to {table_ref_str}"

    except Exception as e:
        return f"An unexpected error occurred: {e}"

def update_bigquery_metadata(
    project_id: str,
    dataset_id: str,
    new_description: str,
    table_id: str = None,
    column_name: str = None
):
    """
    Set the description of a BigQuery dataset, table, or top-level column.

    The target depends on which optional arguments are supplied:
      * ``table_id`` and ``column_name`` -> that column of that table
      * only ``table_id``               -> the table itself
      * no ``table_id``                 -> the dataset

    Every outcome (success, unknown column, missing resource, other errors)
    is reported with ``print``. The function always returns ``None``.
    """
    try:
        bq_client = bigquery.Client(project=project_id)
        dataset_ref = bq_client.dataset(dataset_id)

        if not table_id:
            dataset = bq_client.get_dataset(dataset_ref)
            dataset.description = new_description
            bq_client.update_dataset(dataset, ["description"])
            print(f"Successfully updated description for dataset: {project_id}.{dataset_id}")
            return

        table = bq_client.get_table(dataset_ref.table(table_id))

        if not column_name:
            table.description = new_description
            bq_client.update_table(table, ["description"])
            print(f"Successfully updated description for table: {project_id}.{dataset_id}.{table_id}")
            return

        if column_name not in {field.name for field in table.schema}:
            print(f"Error: Column '{column_name}' not found in table '{table_id}'.")
            return

        rebuilt_schema = []
        for field in table.schema:
            if field.name == column_name:
                # SchemaField is immutable: round-trip through its API form to set the description.
                api_repr = field.to_api_repr()
                api_repr['description'] = new_description
                field = bigquery.SchemaField.from_api_repr(api_repr)
            rebuilt_schema.append(field)

        table.schema = rebuilt_schema
        bq_client.update_table(table, ["schema"])
        print(f"Successfully updated description for column: {column_name} in table {project_id}.{dataset_id}.{table_id}")

    except NotFound as e:
        print(f"Error: Resource not found. Please check your IDs. Details: {e}")
    except Exception as e:
        print(f"An error occurred: {e}")

def get_dataset_tables(dataset_id):
    """
    Return the result of ``Client.list_tables(dataset_id)``: an iterable of the
    tables in that dataset.

    The BigQuery client is bound to the notebook-level ``PROJECT_ID``.
    """
    bq_client = bigquery.Client(project=str(PROJECT_ID))
    return bq_client.list_tables(dataset_id)

### 2.3. Dataform utils

In [None]:

def delete_dataform_workspace(token: str, project_id: str, location: str, repository_id: str, workspace_id: str):
    """
    Delete a Dataform workspace through the v1beta1 REST API.

    A 404 response is reported as "not found". Any other HTTP error status is
    printed together with the response body. Nothing is returned, and HTTP
    errors are not re-raised.
    """
    workspace_url = (
        "https://dataform.googleapis.com/v1beta1/"
        f"projects/{project_id}/locations/{location}/"
        f"repositories/{repository_id}/workspaces/{workspace_id}"
    )
    auth_headers = {"Authorization": f"Bearer {token}"}

    print(f"Attempting to delete workspace '{workspace_id}'...")
    try:
        requests.delete(workspace_url, headers=auth_headers).raise_for_status()
    except requests.exceptions.HTTPError as err:
        if err.response.status_code != 404:
            print(f"Failed to delete workspace: {err.response.text}")
        else:
            print(f"Workspace '{workspace_id}' not found.")
        return
    print(f"Workspace '{workspace_id}' deleted successfully.")

def create_dataform_repository_and_workspace(token, project_id, location, repository_id, workspace_id):
    """
    Ensure a Dataform repository exists, then create a workspace inside it.

    The repository is looked up with a GET and created if the lookup returns
    404. Once the repository is known to exist, the workspace is created with
    a POST; a 409 (already exists) is reported rather than treated as a
    failure. All outcomes are printed. Nothing is returned, and HTTP errors
    are not raised.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    repositories_url = f"https://dataform.googleapis.com/v1beta1/projects/{project_id}/locations/{location}/repositories"
    repo_url = f"{repositories_url}/{repository_id}"

    lookup = requests.get(repo_url, headers=headers)
    if lookup.status_code == 200:
        print(f"Repository '{repository_id}' already exists.")
    elif lookup.status_code == 404:
        print(f"Repository '{repository_id}' not found. Creating it...")
        created = requests.post(f"{repositories_url}?repositoryId={repository_id}", headers=headers)
        if created.status_code != 200:
            print(f"Failed to create repository: {created.text}")
            return
        print(f"Repository '{repository_id}' created successfully.")
    else:
        # Any other status (auth, permission, server error) stops here.
        print(f"Error checking repository: {lookup.text}")
        return

    # The repository now exists (pre-existing or just created): add the workspace.
    ws_response = requests.post(f"{repo_url}/workspaces?workspaceId={workspace_id}", headers=headers)
    if ws_response.status_code == 200:
        print(f"Workspace '{workspace_id}' created successfully.")
    elif ws_response.status_code == 409:
        print(f"Workspace '{workspace_id}' already exists.")
    else:
        print(f"Failed to create workspace: {ws_response.text}")



def initialize_dataform_workspace(token, project_id, location, repository_id, workspace_id):
    """
    Bootstrap a Dataform workspace in three steps: write ``workflow_settings.yaml``,
    write ``.gitignore``, then install npm packages.

    Each step is a REST call against the v1beta1 workspace resource. If a file
    write does not return HTTP 200, the failure is printed and the remaining
    steps are skipped. The npm install outcome is only printed. Returns ``None``.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    base_url = (
        "https://dataform.googleapis.com/v1beta1/"
        f"projects/{project_id}/locations/{location}/repositories/{repository_id}/workspaces/{workspace_id}"
    )
    write_file_url = base_url + ":writeFile"

    settings_yaml = yaml.dump({
        'defaultProject': project_id,
        'defaultLocation': location,
        'defaultDataset': 'dataform',
        'defaultAssertionDataset': 'dataform_assertions',
        'dataformCoreVersion': '3.0.16',
    })

    files_to_write = (
        ("workflow_settings.yaml", settings_yaml),
        (".gitignore", "node_modules/"),
    )
    for file_path, text in files_to_write:
        # writeFile expects the file contents as a base64-encoded string.
        payload = {
            "path": file_path,
            "contents": base64.b64encode(text.encode('utf-8')).decode('utf-8'),
        }
        reply = requests.post(write_file_url, headers=headers, json=payload)
        if reply.status_code != 200:
            print(f"Failed to write {file_path}: {reply.text}")
            return
        print(f"Successfully wrote {file_path}.")

    print("Installing npm packages...")
    npm_reply = requests.post(f"{base_url}:installNpmPackages", headers=headers)
    if npm_reply.status_code == 200:
        print("NPM packages installed successfully.")
    else:
        print(f"Failed to install NPM packages: {npm_reply.text}")

# NOTE: get_dataform_workspace_contents is defined once, further down in this
# cell (after list_all_dataform_files). An identical copy previously lived here,
# but Python bound the name to the later definition, so this copy was dead code.
# It was removed so the two cannot drift apart.

def execute_dataform_pipeline(
    token: str,
    project_id: str,
    location: str,
    repository_id: str,
    workspace_id: str,
    service_account_email: str,
    target_table_name: str = "",
    poll_interval_seconds: float = 2.0,
    max_polls: Optional[int] = None,
) -> Optional[dict]:
    """
    Compile a Dataform workspace, start a workflow invocation, and poll it until it finishes.

    Args:
        token: OAuth2 bearer token for the Dataform API.
        project_id, location, repository_id, workspace_id: Coordinates of the
            workspace to compile.
        service_account_email: Service account the invocation runs as.
        target_table_name: If non-empty, only actions tagged with this value are
            executed (sent as ``invocationConfig.includedTags``).
        poll_interval_seconds: Delay between status checks (default 2 seconds).
        max_polls: Maximum number of status checks before giving up. ``None``
            (the default) keeps polling until a terminal state is reached.

    Returns:
        The final workflow invocation JSON (state SUCCEEDED, FAILED or CANCELLED),
        or ``None`` in any of these cases:
          * the compile request fails or the compilation reports errors;
          * the invocation cannot be started;
          * a status check fails;
          * ``max_polls`` is exhausted.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    repo_api_url = (
        f"https://dataform.googleapis.com/v1/projects/{project_id}/"
        f"locations/{location}/repositories/{repository_id}"
    )
    workspace_resource_name = (
        f"projects/{project_id}/locations/{location}/"
        f"repositories/{repository_id}/workspaces/{workspace_id}"
    )

    # --- Step 1: Compile the workspace ---
    print("Compiling Dataform workspace...")
    try:
        response = requests.post(
            f"{repo_api_url}/compilationResults",
            headers=headers,
            json={"workspace": workspace_resource_name},
        )
        response.raise_for_status()
        compilation_result = response.json()
    except requests.exceptions.HTTPError as e:
        print(f"API Error during compilation: {e}\nResponse body: {e.response.text}")
        return None

    compilation_result_name = compilation_result.get("name")

    # The compile request can succeed (HTTP 200) while the project fails to
    # compile. Those problems arrive in compilationErrors, not as an HTTP error.
    compilation_errors = compilation_result.get("compilationErrors") or []
    if compilation_errors:
        print(f"Compilation reported {len(compilation_errors)} error(s); not executing the pipeline:")
        for error in compilation_errors:
            print(f"  - {error.get('path', '<unknown path>')}: {error.get('message', error)}")
        return None
    print(f"Successfully compiled. Result name: {compilation_result_name}")

    # --- Step 2: Execute the compiled result ---
    print("Executing the pipeline...")
    invocation_config = {"serviceAccount": service_account_email}
    # Truthiness check (not len()) so None is treated as "no tag filter".
    if target_table_name:
        invocation_config["includedTags"] = [target_table_name]
        print(f"Executing only actions tagged with: {target_table_name}")

    try:
        response = requests.post(
            f"{repo_api_url}/workflowInvocations",
            headers=headers,
            json={
                "compilationResult": compilation_result_name,
                "invocationConfig": invocation_config,
            },
        )
        response.raise_for_status()
        invocation_name = response.json().get('name')
        print(f"Successfully started workflow invocation: {invocation_name}")
    except requests.exceptions.HTTPError as e:
        print(f"API Error during execution: {e}\nResponse body: {e.response.text}")
        return None

    # --- Step 3: Wait for the execution to complete ---
    print("\nWaiting for execution to complete...")
    status_endpoint = f"https://dataform.googleapis.com/v1/{invocation_name}"
    terminal_states = ("SUCCEEDED", "FAILED", "CANCELLED")

    polls_done = 0
    while max_polls is None or polls_done < max_polls:
        polls_done += 1
        try:
            status_response = requests.get(status_endpoint, headers=headers)
            status_response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            print(f"API Error while checking status: {e}\nResponse body: {e.response.text}")
            return None

        invocation_details = status_response.json()
        current_state = invocation_details.get("state")
        print(f"Current state: {current_state}")

        if current_state in terminal_states:
            print(f"Execution finished with state: {current_state}")
            if current_state == "SUCCEEDED":
                print("Pipeline executed successfully.")
            else:
                print("Pipeline execution did not succeed.")
            return invocation_details

        time.sleep(poll_interval_seconds)

    print(f"Gave up after {max_polls} status checks; invocation {invocation_name} may still be running.")
    return None


def _compiled_action_tags(action: dict) -> list:
    """Collect the tags of a CompilationResultAction from its action-type sub-object(s)."""
    tags = []
    # Tags live on the typed config (relation / operations / assertion / ...),
    # not at the top level of the action.
    for kind in ("relation", "operations", "assertion", "notebook", "dataPreparation"):
        tags.extend((action.get(kind) or {}).get("tags", []))
    # Also accept a top-level list, in case the API surfaces one.
    tags.extend(action.get("tags", []))
    return tags


def get_executed_files_from_invocation(token: str, invocation_details: dict) -> list[str]:
    """
    Return the file paths of the compiled actions selected by a workflow invocation.

    The actions of the invocation's compilation result are fetched with the
    ``compilationResults.query`` method, following ``nextPageToken`` pages.
    The CompilationResult resource returned by a plain GET does not contain
    the action list. An action is kept when the invocation has no
    ``includedTags`` filter, or when any of its tags is in that filter.
    NOTE(review): ``includedTargets`` and transitive-dependency options are
    not taken into account.

    Args:
        token: OAuth2 bearer token for the Dataform API.
        invocation_details: Workflow invocation JSON, e.g. as returned by
            ``execute_dataform_pipeline``.

    Returns:
        File paths in the order the API lists the actions, without duplicates.
        Returns an empty list if the invocation has no compilation result or
        the query fails. Progress is printed for debugging.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    compilation_result_name = invocation_details.get("compilationResult")
    if not compilation_result_name:
        print("No compilationResult found in invocation details.")
        return []

    query_url = f"https://dataform.googleapis.com/v1/{compilation_result_name}:query"
    actions = []
    page_token = None
    try:
        print(f"Querying compiled actions from: {query_url}")
        while True:
            params = {"pageToken": page_token} if page_token else {}
            response = requests.get(query_url, headers=headers, params=params)
            response.raise_for_status()
            payload = response.json()
            actions.extend(payload.get("compilationResultActions", []))
            page_token = payload.get("nextPageToken")
            if not page_token:
                break
    except requests.exceptions.HTTPError as e:
        print(f"API Error while querying compilation result actions: {e}\nResponse body: {e.response.text}")
        return []

    if not actions:
        print("Warning: the compilation result contains no actions.")
        return []

    included_tags = set((invocation_details.get("invocationConfig") or {}).get("includedTags", []))
    print(f"Execution was filtered by the following tags: {sorted(included_tags)}")

    executed_files = []
    seen = set()
    print("\n--- Analyzing Compiled Actions ---")
    for action in actions:
        file_path = action.get("filePath")
        action_tags = _compiled_action_tags(action)
        print(f"Found action for file: {file_path or 'N/A'} with tags: {action_tags}")

        if not file_path or file_path in seen:
            continue
        if included_tags and included_tags.isdisjoint(action_tags):
            continue
        print(f"  -> Match found! Adding '{file_path}' to the list of executed files.")
        seen.add(file_path)
        executed_files.append(file_path)
    print("--------------------------------\n")

    return executed_files

def execute_pipeline_code(target_tag_value: str) -> dict:
    """
    Run the notebook's Dataform pipeline (optionally only the ``reports`` tag) and report the outcome.

    Uses the notebook-level ``PROJECT_ID``, ``LOCATION``, ``DATAFORM_REPO``,
    ``DATAFORM_WORKSPACE`` and ``SERVICE_ACCOUNT_FOR_DATAFORM`` settings.
    NOTE(review): ``SERVICE_ACCOUNT_FOR_DATAFORM`` must be defined before this
    is called; the visible configuration cell does not set it.

    Args:
        target_tag_value: ``"reports"`` to execute only actions tagged
            ``reports`` (files listed from ``definitions/reports``), or ``""``
            to execute everything (files listed from ``definitions``).

    Returns:
        A dict with three keys:
          * ``status``: ``"success"`` only when the invocation finished in
            state ``SUCCEEDED``; otherwise ``"error"``.
          * ``Response``: the invocation JSON, or ``None``.
          * ``Details``: the files in the targeted definitions folder. This is
            a folder listing, not a per-action execution log, and it is empty
            when the pipeline could not be run.

    Raises:
        ValueError: If ``target_tag_value`` is not one of the supported values.
    """
    valid_target_tag_values = ["reports", ""]
    if target_tag_value not in valid_target_tag_values:
        raise ValueError(
            f"Invalid target tag value '{target_tag_value}' provided. "
            f"Please use one of {valid_target_tag_values}."
        )

    print("--- Tool: Executing the Dataform pipeline... ---")

    token = get_auth_token()
    final_invocation_details = execute_dataform_pipeline(
        token,
        PROJECT_ID,
        LOCATION,
        DATAFORM_REPO,
        DATAFORM_WORKSPACE,
        SERVICE_ACCOUNT_FOR_DATAFORM,
        target_tag_value
    )

    # Bound before the branch so the return below works even if the pipeline never ran.
    executed_files = []
    if final_invocation_details:
        print("\n--- Fetching executed files ---")

        dataform_directory = "definitions/reports" if target_tag_value == "reports" else "definitions"
        executed_files = list_all_dataform_files(token, dataform_directory)

        if executed_files:
            print("The following files were executed:")
            for file_path in executed_files:
                print(f"- {file_path}")
        else:
            print("Could not retrieve the list of executed files.")

        print("\n--- Final Invocation Details ---")
        print(json.dumps(final_invocation_details, indent=2))
        print("------------------------------")

    succeeded = bool(final_invocation_details) and final_invocation_details.get("state") == "SUCCEEDED"
    return {
        "status": "success" if succeeded else "error",
        "Response": final_invocation_details,
        "Details": executed_files,
    }

def explain_pipeline_code() -> Dict[str, Any]:
    """
    Return the contents of, and a short heuristic explanation for, every file
    under ``definitions/`` in the notebook's Dataform workspace.

    Intended as an agent tool. Uses the notebook-level ``PROJECT_ID``,
    ``LOCATION``, ``DATAFORM_REPO`` and ``DATAFORM_WORKSPACE`` settings.

    Returns:
        On success: ``{"status": "success", "Details": [...]}``, with one
        ``{"file_path", "content", "explanation"}`` dict per file.
        Otherwise: ``{"status": "error", "error_message": ..., "Details": []}``,
        when the files could not be listed or no files were found.
    """
    print("--- Tool: Getting explanations for the files in the workspace... ---")

    token = get_auth_token()

    workspace_contents = get_dataform_workspace_contents(
        token,
        PROJECT_ID,
        LOCATION,
        DATAFORM_REPO,
        DATAFORM_WORKSPACE
    )

    if workspace_contents:
        return {"status": "success", "Details": workspace_contents}

    # None means the listing failed; an empty list means there was nothing to read.
    if workspace_contents is None:
        error_message = "An error occurred while reading the Dataform workspace contents."
    else:
        error_message = "The Dataform workspace contains no readable files under 'definitions/'."
    print(f"Could not retrieve the workspace contents. {error_message}")
    return {"status": "error", "error_message": error_message, "Details": []}

def list_all_dataform_files(
    token: str,
    dataform_dir: str = "definitions",
    *,
    project_id: Optional[str] = None,
    location: Optional[str] = None,
    repository_id: Optional[str] = None,
    workspace_id: Optional[str] = None,
) -> List[str]:
    """
    Recursively list every file path under ``dataform_dir`` in a Dataform workspace.

    Directories are walked breadth-first with the v1 ``queryDirectoryContents``
    method, and each directory's result pages are followed via ``nextPageToken``.
    Each workspace coordinate that is not given falls back to the matching
    notebook-level setting (``PROJECT_ID``, ``LOCATION``, ``DATAFORM_REPO``,
    ``DATAFORM_WORKSPACE``).

    Returns:
        Sorted list of file paths as reported by the API. A directory that
        returns 404 is skipped with a warning, so a missing root yields ``[]``.

    Raises:
        requests.exceptions.HTTPError: For any other error response.
    """
    from collections import deque

    project_id = PROJECT_ID if project_id is None else project_id
    location = LOCATION if location is None else location
    repository_id = DATAFORM_REPO if repository_id is None else repository_id
    workspace_id = DATAFORM_WORKSPACE if workspace_id is None else workspace_id

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    query_endpoint = (
        f"https://dataform.googleapis.com/v1/projects/{project_id}/"
        f"locations/{location}/repositories/{repository_id}/workspaces/{workspace_id}:queryDirectoryContents"
    )

    all_files = set()
    known_dirs = {dataform_dir}
    dirs_to_query = deque([dataform_dir])

    while dirs_to_query:
        path_to_query = dirs_to_query.popleft()
        page_token = None

        while True:
            params = {"path": path_to_query}
            if page_token:
                params["pageToken"] = page_token
            response = requests.get(query_endpoint, headers=headers, params=params)

            if response.status_code == 404:
                print(f"Warning: Directory not found at path: '{path_to_query}'. Skipping.")
                break
            response.raise_for_status()

            payload = response.json()
            for entry in payload.get("directoryEntries", []):
                if "directory" in entry:
                    sub_dir = entry['directory']
                    if sub_dir not in known_dirs:
                        known_dirs.add(sub_dir)
                        dirs_to_query.append(sub_dir)
                elif "file" in entry:
                    all_files.add(entry['file'])

            page_token = payload.get("nextPageToken")
            if not page_token:
                break

    return sorted(all_files)

def _explain_dataform_file(file_path: str, content: str) -> str:
    """Build a one-line, regex-based description of a Dataform file (only ``.sqlx`` files are analysed)."""
    if not file_path.endswith(".sqlx"):
        return "Could not determine the purpose of this file."
    type_match = re.search(r'type:\s*["\'](\w+)["\']', content)
    action_type = type_match.group(1) if type_match else "action"
    # Single-argument ref("name") calls only; keep first-seen order and drop repeats.
    refs = list(dict.fromkeys(re.findall(r"ref\(['\"](\w+)['\"]\)", content)))
    if refs:
        return f"This file defines a new {action_type} that depends on the following source(s): {', '.join(refs)}."
    return f"This file defines a new {action_type}."


def _list_workspace_file_paths(workspace_api_url: str, headers: dict, root_dir: str) -> list:
    """Walk ``root_dir`` breadth-first via queryDirectoryContents and return sorted file paths.

    Directories that return 404 are skipped with a warning; other HTTP errors are raised.
    """
    from collections import deque

    files, seen_dirs, pending = set(), {root_dir}, deque([root_dir])
    while pending:
        directory = pending.popleft()
        page_token = None
        while True:
            params = {"path": directory}
            if page_token:
                params["pageToken"] = page_token
            response = requests.get(f"{workspace_api_url}:queryDirectoryContents", headers=headers, params=params)
            if response.status_code == 404:
                print(f"Warning: Directory not found at path: '{directory}'. Skipping.")
                break
            response.raise_for_status()
            payload = response.json()
            for entry in payload.get("directoryEntries", []):
                if "directory" in entry:
                    if entry["directory"] not in seen_dirs:
                        seen_dirs.add(entry["directory"])
                        pending.append(entry["directory"])
                elif "file" in entry:
                    files.add(entry["file"])
            page_token = payload.get("nextPageToken")
            if not page_token:
                break
    return sorted(files)


def get_dataform_workspace_contents(
    token: str,
    project_id: str,
    location: str,
    repository_id: str,
    workspace_id: str,
    dataform_dir: str = "definitions",
):
    """
    Read every file under ``dataform_dir`` in a Dataform workspace and describe it.

    Each file goes through four steps:
      1. It is listed recursively from the workspace named by the arguments,
         so listing and reading always target the same workspace.
      2. It is read with the v1 ``readFile`` method.
      3. Its contents are base64-decoded as UTF-8.
      4. It gets a short heuristic explanation. For ``.sqlx`` files this is
         the ``type:`` from the config block plus any single-argument
         ``ref("...")`` dependencies.

    Returns:
        A list of ``{"file_path", "content", "explanation"}`` dicts, one per
        file that could be read; unreadable files are reported and skipped.
        Returns ``None`` if the directory listing itself fails.
    """
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    workspace_api_url = (
        f"https://dataform.googleapis.com/v1/projects/{project_id}/"
        f"locations/{location}/repositories/{repository_id}/workspaces/{workspace_id}"
    )

    try:
        print(f"Querying Dataform workspace files in '{dataform_dir}/' folder...")
        all_files = _list_workspace_file_paths(workspace_api_url, headers, dataform_dir)
    except requests.exceptions.RequestException as e:
        # Connection errors have no response object, so guard the body lookup.
        body = e.response.text if e.response is not None else "<no response>"
        print(f"API Error while listing files: {e}\nResponse body: {body}")
        return None

    file_contents = []
    for file_path in all_files:
        try:
            response = requests.get(f"{workspace_api_url}:readFile", headers=headers, params={"path": file_path})
            response.raise_for_status()
            encoded_content = response.json().get("fileContents", "")
            decoded_content = base64.b64decode(encoded_content).decode("utf-8")
        except requests.exceptions.RequestException as e:
            print(f"API Error reading file '{file_path}': {e}")
            continue
        except Exception as e:  # e.g. invalid base64 or non-UTF-8 file contents
            print(f"An error occurred processing file '{file_path}': {e}")
            continue

        file_contents.append({
            "file_path": file_path,
            "content": decoded_content,
            "explanation": _explain_dataform_file(file_path, decoded_content),
        })

    return file_contents

def delete_dataform_folder(
    token: str,
    project_id: str,
    location: str,
    repository_id: str,
    workspace_id: str,
    folder: str
):
    """
    Deletes a directory from a Dataform workspace, but only if it exists.

    Existence is checked first with the v1beta1 ``queryDirectoryContents``
    endpoint; a 404 there means there is nothing to delete.

    Args:
        token: OAuth bearer token for the Dataform REST API.
        project_id: Google Cloud project that owns the repository.
        location: Dataform region (e.g. "us-central1").
        repository_id: Dataform repository ID.
        workspace_id: Dataform workspace ID inside the repository.
        folder: Workspace-relative path of the directory to remove.

    Returns:
        The JSON body of the ``removeDirectory`` response on successful
        deletion; None when the directory does not exist or when the existence
        check failed with an HTTP error (that error is printed, not raised).

    Raises:
        ValueError: if ``folder`` is None or empty/whitespace.
        requests.exceptions.HTTPError: if the deletion request itself fails.
    """

    if folder is None:
        raise ValueError("The 'folder' parameter cannot be None.")
    # An empty path would address the workspace root; refuse it rather than
    # risk a destructive call on the whole workspace.
    if not folder.strip():
        raise ValueError("The 'folder' parameter cannot be empty.")

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }
    workspace_url = (
        f"https://dataform.googleapis.com/v1beta1/projects/{project_id}/"
        f"locations/{location}/repositories/{repository_id}/workspaces/{workspace_id}"
    )

    # --- Step 1: Check if the directory exists before trying to delete it ---
    # queryDirectoryContents returns a 404 if the path does not exist.
    print(f"Checking for existence of directory: '{folder}'...")
    try:
        check_response = requests.get(
            f"{workspace_url}:queryDirectoryContents", headers=headers, params={"path": folder}
        )

        # A 404 status code means the directory was not found.
        if check_response.status_code == 404:
            print(f"Directory '{folder}' not found. Nothing to delete.")
            return None

        # Raise an exception for any other HTTP errors during the check.
        check_response.raise_for_status()

    except requests.exceptions.HTTPError as e:
        print(f"An error occurred while checking for the directory: {e}")
        return None

    # --- Step 2: The check succeeded, so the directory exists; delete it ---
    print(f"Directory '{folder}' found. Proceeding with deletion...")
    try:
        response = requests.post(
            f"{workspace_url}:removeDirectory", headers=headers, json={"path": folder}
        )
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        print(f"An error occurred during deletion: {e}")
        # Re-raise so the caller can handle a failed deletion.
        raise

    print(f"Successfully deleted directory: '{folder}'")
    return response.json()

def create_dataform_instructions_file(
    token: str,
    project_id: str,
    location: str,
    repository_id: str,
    workspace_id: str,
    filename: str,
    content: str
):
    """
    Writes an instruction file for the Data Engineering Agent into a workspace.

    The file is written with the Dataform v1beta1 ``writeFile`` endpoint to
    ``.gdeagent/instructions/<filename>``.

    Args:
        token: OAuth bearer token for the Dataform REST API.
        project_id: Google Cloud project that owns the repository.
        location: Dataform region.
        repository_id: Dataform repository ID.
        workspace_id: Dataform workspace ID.
        filename: Name of the instruction file (no directory components).
        content: Text content of the file; encoded as UTF-8.

    Returns:
        The JSON response from the Dataform API.

    Raises:
        requests.exceptions.HTTPError: if the API call fails.
    """
    workspace_path = (
        f"projects/{project_id}/locations/{location}/"
        f"repositories/{repository_id}/workspaces/{workspace_id}"
    )
    write_url = f"https://dataform.googleapis.com/v1beta1/{workspace_path}:writeFile"

    payload = {
        # Instruction files must sit directly in .gdeagent/instructions/
        # (no subdirectories are allowed).
        "path": f".gdeagent/instructions/{filename}",
        # writeFile expects the file contents base64-encoded.
        "contents": base64.b64encode(content.encode("utf-8")).decode("utf-8"),
    }
    request_headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    reply = requests.post(write_url, headers=request_headers, json=payload)
    reply.raise_for_status()
    return reply.json()

### 2.4. Dataplex utils


In [None]:
def get_operation_success_status(token, operation_name) -> bool:
    """
    Fetches a Dataplex long-running operation once and reports its outcome.

    This performs a single GET; it does not poll or wait.

    Args:
        token: OAuth bearer token for the Dataplex REST API.
        operation_name: Operation resource name, appended to
            BASE_URL_FOR_DATAPLEX_SCAN.

    Returns:
        True if the operation is done without an error, False if it is done
        with an error, and None (falsy) while it is still in progress.

    Raises:
        requests.exceptions.HTTPError: if the GET request fails.
    """
    reply = requests.get(
        f"{BASE_URL_FOR_DATAPLEX_SCAN}/{operation_name}",
        headers={"Authorization": f"Bearer {token}"},
    )
    reply.raise_for_status()
    op = reply.json()

    if not op.get("done"):
        # Still running: signal "not finished yet" with a falsy None.
        return None
    return not op.get("error")

def generate_patch_label_request_body(scan_type, scan_id, project_id=None, location=None):
    """
    Builds the BigQuery PATCH body that labels a source table/dataset with a scan.

    The labels tie programmatically created Dataplex scans to the console UI.

    Args:
        scan_type (str): One of DATA_PROFILE_SCAN, DATA_DOCUMENTATION_SCAN or
            DATA_KNOWLEDGE_ENGINE_SCAN.
        scan_id (str): ID of the Dataplex data scan.
        project_id (str, optional): Project recorded in the labels. Defaults to
            the notebook's PROJECT_ID.
        location (str, optional): Location recorded in the labels. Defaults to
            the notebook's LOCATION.

    Returns:
        dict: ``{"labels": {...}}`` with the published scan, project and
        location labels.

    Raises:
        ValueError: if ``scan_type`` is not one of the supported values.
    """
    scan_stubs = {
        "DATA_PROFILE_SCAN": "dp",
        "DATA_DOCUMENTATION_SCAN": "data-documentation",
        # Dataset-level (knowledge engine) scans use the same label prefix as
        # table documentation scans.
        "DATA_KNOWLEDGE_ENGINE_SCAN": "data-documentation",
    }
    scan_stub = scan_stubs.get(scan_type)
    if scan_stub is None:
        # Without a stub the keys would degrade to "dataplex--published-*".
        raise ValueError(
            f"Unsupported scan type '{scan_type}'. Expected one of {list(scan_stubs)}."
        )

    project_id = PROJECT_ID if project_id is None else project_id
    location = LOCATION if location is None else location

    prefix = f"dataplex-{scan_stub}-published"
    return {
        "labels": {
            f"{prefix}-scan": f"{scan_id}",
            f"{prefix}-project": f"{project_id}",
            f"{prefix}-location": f"{location}",
        }
    }

def patch_source_table_with_labels(token, dataset, scan_type, scan_nm, source_table_nm=None):
    """
    Patches the source BigQuery table or dataset with labels to correlate with the data scans.

    If a table name is provided, the table is patched; otherwise the dataset is.

    Args:
        token (str): OAuth bearer token for the BigQuery REST API.
        dataset (str): The ID of the dataset (in PROJECT_ID).
        scan_type (str): The type of the scan (see generate_patch_label_request_body).
        scan_nm (str): The name of the scan.
        source_table_nm (str, optional): The name of the source table. Defaults to None.

    Returns:
        dict | None: The patched resource as returned by BigQuery on success;
        None if the request failed (the error is printed, not raised).
    """

    dataset_endpoint = f"https://bigquery.googleapis.com/bigquery/v2/projects/{PROJECT_ID}/datasets/{dataset}"
    api_endpoint = f"{dataset_endpoint}/tables/{source_table_nm}" if source_table_nm else dataset_endpoint

    patch_request_body = generate_patch_label_request_body(scan_type, scan_nm)

    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }

    # Apply the patch; failures are reported but deliberately not raised so a
    # labelling problem does not abort scan creation.
    try:
        response = requests.patch(api_endpoint, headers=headers, json=patch_request_body)
        response.raise_for_status()  # Raise an exception for HTTP errors
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred: {http_err}")
        print(f"Response Body: {http_err.response.text}")
        return None
    except requests.exceptions.RequestException as req_err:
        print(f"An error occurred during the API call: {req_err}")
        return None

    return response.json()

def create_table_data_documentation_scan_job(token, project_id, location, dataset_resource, data_scan_id, table_id, delete_existing=False):
    """
    Initiates the creation of a Dataplex DATA_DOCUMENTATION scan over one BigQuery table.

    If delete_existing is True, a DELETE is first issued for the scan (a 404 is
    ignored). The deletion is a long-running operation and is not awaited.

    Args:
        token (str): OAuth bearer token for the Dataplex REST API.
        project_id (str): Project in which the scan is created.
        location (str): Dataplex location of the scan.
        dataset_resource (str): Full resource URI of the dataset containing the table.
        data_scan_id (str): ID (and display name) of the scan.
        table_id (str): Table inside the dataset to document.
        delete_existing (bool): Delete an existing scan with this ID first.

    Returns:
        str | None: Name of the long-running create operation, or None when
        the scan already exists (HTTP 409).

    Raises:
        requests.exceptions.HTTPError: for delete errors other than 404 and
        create errors other than 409.
    """

    scans_url = f"{BASE_URL_FOR_DATAPLEX_SCAN}/projects/{project_id}/locations/{location}/dataScans"
    create_url = f"{scans_url}?dataScanId={data_scan_id}"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    if delete_existing:
        print(f"Attempting to delete existing data documentation scan '{data_scan_id}'...")
        try:
            # DELETE must target the scan resource itself, not the collection
            # URL (with ?dataScanId=) used for creation.
            delete_response = requests.delete(f"{scans_url}/{data_scan_id}", headers=headers)
            delete_response.raise_for_status()
            print(f"Successfully initiated deletion for scan '{data_scan_id}'.")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print(f"Scan '{data_scan_id}' not found, no need to delete.")
            else:
                raise
    payload = {
        "displayName": data_scan_id,
        "data": {"resource": f"{dataset_resource}/tables/{table_id}"},
        "executionSpec": {"trigger": {"onDemand": {}}},
        "type": "DATA_DOCUMENTATION",
        "dataDocumentationSpec": {}
    }
    print(f"Attempting to create data documentation scan {data_scan_id}...")
    try:
        response = requests.post(create_url, headers=headers, data=json.dumps(payload))
        response.raise_for_status()
        operation_name = response.json().get("name")
        print(f"Scan creation initiated. Operation: {operation_name}")
        return operation_name
    except requests.exceptions.HTTPError as e:
        # A 409 conflict means the scan already exists, which is acceptable.
        if e.response.status_code == 409:
            print(f"Scan '{data_scan_id}' already exists. Skipping creation and proceeding.")
            return None
        else:
            raise

def create_dataset_documentation_scan_job(token, project_id, location, dataset_resource, scan_id, delete_existing=False):
    """
    Initiates the creation of a dataset-level Dataplex DATA_DOCUMENTATION scan
    (the "knowledge" scan in this notebook).

    If delete_existing is True, a DELETE is first issued for the scan (a 404 is
    ignored). The deletion is a long-running operation and is not awaited.

    Args:
        token (str): OAuth bearer token for the Dataplex REST API.
        project_id (str): Project in which the scan is created.
        location (str): Dataplex location of the scan.
        dataset_resource (str): Full resource URI of the dataset to document.
        scan_id (str): ID (and display name) of the scan.
        delete_existing (bool): Delete an existing scan with this ID first.

    Returns:
        str | None: Name of the long-running create operation, or None when
        the scan already exists (HTTP 409).

    Raises:
        requests.exceptions.HTTPError: for delete errors other than 404 and
        create errors other than 409.
    """

    scans_url = f"{BASE_URL_FOR_DATAPLEX_SCAN}/projects/{project_id}/locations/{location}/dataScans"
    create_url = f"{scans_url}?dataScanId={scan_id}"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    if delete_existing:
        print(f"Attempting to delete existing data scan '{scan_id}'...")
        try:
            # DELETE must target the scan resource itself, not the collection
            # URL (with ?dataScanId=) used for creation.
            delete_response = requests.delete(f"{scans_url}/{scan_id}", headers=headers)
            delete_response.raise_for_status()
            print(f"Successfully initiated deletion for scan '{scan_id}'.")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                print(f"Scan '{scan_id}' not found, no need to delete.")
            else:
                # Re-raise other HTTP errors during deletion.
                raise
    payload = {
        "displayName": scan_id,
        "type": "DATA_DOCUMENTATION",
        "data": {"resource": dataset_resource},
        "dataDocumentationSpec": {},
        "executionSpec": {"trigger": {"onDemand": {}}}
    }
    print(f"Attempting to create data scan '{scan_id}'...")
    try:
        response = requests.post(create_url, headers=headers, data=json.dumps(payload))
        response.raise_for_status()
        operation_name = response.json().get("name")
        print(f"Scan creation initiated. Operation: {operation_name}")
        return operation_name
    except requests.exceptions.HTTPError as e:
        # A 409 conflict means the scan already exists, which is acceptable.
        if e.response.status_code == 409:
            print(f"Scan '{scan_id}' already exists. Skipping creation and proceeding.")
            return None
        else:
            # For any other HTTP error, re-raise the exception.
            raise

def delete_data_scan(token, project_id, location, scan_id):
    """
    Requests deletion of a Dataplex data scan.

    The DELETE starts a long-running operation that is not awaited here.

    Args:
        token: OAuth bearer token for the Dataplex REST API.
        project_id: Project that owns the scan.
        location: Dataplex location of the scan.
        scan_id: ID of the scan to delete.

    Returns:
        bool: True if deletion was initiated or the scan did not exist (404);
        False on any other HTTP or request error (errors are printed).
    """
    scan_url = f"{BASE_URL_FOR_DATAPLEX_SCAN}/projects/{project_id}/locations/{location}/dataScans/{scan_id}"

    print(f"Attempting to delete data scan '{scan_id}'...")
    try:
        requests.delete(scan_url, headers={"Authorization": f"Bearer {token}"}).raise_for_status()
    except requests.exceptions.HTTPError as err:
        if err.response.status_code != 404:
            print(f"An HTTP error occurred: {err}")
            print(f"Response content: {err.response.text}")
            return False
        # A missing scan already satisfies the goal of this call.
        print(f"Scan '{scan_id}' not found. No action needed.")
        return True
    except requests.exceptions.RequestException as err:
        print(f"An unexpected error occurred: {err}")
        return False

    print(f"Successfully initiated deletion for scan '{scan_id}'.")
    return True

def create_scan_jobs(dataset_type: str, scan_type) -> dict:
    """
    Creates the Data Insights (Dataplex data documentation) scan(s) for a
    dataset and waits until their creation operations finish.

    The dataset can be the operational dataset ('OLTP') or the star schema
    dataset, also known as the data warehouse dataset ('OLAP').

    - 'DATASET_DOCUMENTATION_SCAN' creates one dataset-level scan.
    - 'TABLE_DOCUMENTATION_SCAN' creates one scan per table returned by
      get_dataset_tables.

    Any existing scan with the same ID is deleted first, and the source
    dataset/table is labelled so the scan is linked in the UI.

    Returns:
        dict: ``{"status": "success", "Response": ..., "Details": ...}`` once
        every create operation has finished without error; otherwise
        ``{"status": "error", "error": ...}`` if an operation failed, polling
        timed out after MAX_POLLS rounds, or an exception was raised.

    Raises:
        ValueError: if dataset_type or scan_type is not a valid value.
    """

    # Validate the 'dataset_type' parameter to ensure it's a valid choice.
    valid_dataset_type = ["OLTP", "OLAP"]
    if dataset_type not in valid_dataset_type:
        raise ValueError(f"Invalid dataset type '{dataset_type}' provided. Please use one of {valid_dataset_type}.")

    # Validate the 'scan_type' parameter to ensure it's a valid choice.
    valid_scan_types = ["DATASET_DOCUMENTATION_SCAN", "TABLE_DOCUMENTATION_SCAN"]
    if scan_type not in valid_scan_types:
        raise ValueError(f"Invalid scan type '{scan_type}' provided. Please use one of {valid_scan_types}.")

    print(f"--- Tool: Starting Data Insights scans for the {dataset_type} dataset... ---")

    token = get_auth_token()

    # Set the correct constants based on the dataset parameter.
    if dataset_type == "OLTP":
        dataset_resource = OLTP_DATASET_RESOURCE_URI
        dataset_id = OLTP_DATASET_ID
    else:  # 'OLAP', guaranteed by the validation above
        dataset_resource = DWH_DATASET_RESOURCE_URI
        dataset_id = DWH_DATASET_ID
    knowledge_scan_id = sanitize_string_with_hyphens(dataset_id + "-dataset-documentation-scan")

    try:
        scan_job = {}
        details = {}

        if scan_type == "DATASET_DOCUMENTATION_SCAN":
            # Create the dataset-level (knowledge) scan.
            delete_data_scan(token, PROJECT_ID, LOCATION, knowledge_scan_id)
            op_name = create_dataset_documentation_scan_job(token, PROJECT_ID, LOCATION, dataset_resource, knowledge_scan_id)
            patch_source_table_with_labels(token, dataset_id, "DATA_KNOWLEDGE_ENGINE_SCAN", knowledge_scan_id)
            scan_job[knowledge_scan_id] = op_name
            details[knowledge_scan_id] = f"Created the Data Insights scan called {knowledge_scan_id}."
        else:
            # Create one documentation scan per table in the dataset.
            for table in get_dataset_tables(dataset_id):
                data_scan_id = sanitize_string_with_hyphens(f"{table.table_id}-table-documentation-scan")
                delete_data_scan(token, PROJECT_ID, LOCATION, data_scan_id)
                op_name = create_table_data_documentation_scan_job(token, PROJECT_ID, LOCATION, dataset_resource, data_scan_id, table.table_id)
                patch_source_table_with_labels(token, dataset_id, "DATA_DOCUMENTATION_SCAN", data_scan_id, table.table_id)
                scan_job[data_scan_id] = op_name
                details[data_scan_id] = f"Created the Data Insights scan for table {table.table_id} called {data_scan_id}."

        # A None operation name means the create call returned HTTP 409 (the
        # scan already exists), which the create helpers treat as acceptable;
        # there is no operation to wait for.
        # NOTE(review): the preceding delete is not awaited, so a 409 could come
        # from a scan that is still being deleted — confirm if this matters.
        pending_ops = {scan_id: op for scan_id, op in scan_job.items() if op is not None}

        # Poll all outstanding operations once per round, sleeping between rounds.
        for _ in range(MAX_POLLS):
            for scan_id, op_name in list(pending_ops.items()):
                outcome = get_operation_success_status(token, op_name)
                if outcome:
                    del pending_ops[scan_id]
                elif outcome is False:
                    # get_operation_success_status returns False only for a
                    # finished operation that reported an error (None = running).
                    return {"status": "error",
                            "error": f"Creation of the Data Insights scan '{scan_id}' failed."}
            if not pending_ops:
                break
            time.sleep(POLLING_INTERVAL_SECONDS)

        if pending_ops:
            return {"status": "error",
                    "error": f"Timed out waiting for the creation of the Data Insights scan(s): {', '.join(sorted(pending_ops))}"}

        return {"status": "success",
                "Response": f"Successfully created the Data Insight scan(s) for the {dataset_type} dataset.",
                "Details": details
                }

    except Exception as e:
        error_message = f"An error occurred while generating the metadata: {e}"
        # Return a dictionary with an error status for ADK
        return {"status": "error", "error": error_message}


def run_scan_job(token, scan_id):
    """
    Triggers an on-demand run of a Dataplex data scan.

    Args:
        token: OAuth bearer token for the Dataplex REST API.
        scan_id: ID of the scan (in PROJECT_ID / LOCATION) to run.

    Returns:
        The name of the started scan job, or None if the response has none.

    Raises:
        requests.exceptions.HTTPError: if the ``:run`` request fails.
    """
    run_url = (
        f"{BASE_URL_FOR_DATAPLEX_SCAN}/projects/{PROJECT_ID}/locations/{LOCATION}"
        f"/dataScans/{scan_id}:run"
    )
    auth_headers = {"Authorization": f"Bearer {token}"}

    print(f"Starting scan job for '{scan_id}'...")
    reply = requests.post(run_url, headers=auth_headers, data={})
    reply.raise_for_status()

    job_info = reply.json().get("job", {})
    job_name = job_info.get("name")
    print(f"Scan job started. Job name: {job_name}")
    return job_name


def run_scan_jobs(dataset_type: str, scan_type: str) -> dict:
    """
    Starts the execution of the Data Insights scans for a specified dataset.

    The dataset can be the operational dataset ('OLTP') or the star schema
    dataset, also known as the data warehouse dataset ('OLAP').

    - 'DATASET_DOCUMENTATION_SCAN' runs the single dataset-level scan.
    - 'TABLE_DOCUMENTATION_SCAN' runs one scan per table returned by
      get_dataset_tables.

    Jobs are only started here, not awaited.

    Returns:
        dict: ``{"status": "success", "Response": ..., "Details": ...}`` once
        every job was started, or ``{"status": "error", "error": ...}`` if
        any step raised.

    Raises:
        ValueError: if dataset_type or scan_type is not a valid value.
    """

    # Validate the 'dataset_type' parameter to ensure it's a valid choice.
    valid_dataset_type = ["OLTP", "OLAP"]
    if dataset_type not in valid_dataset_type:
        raise ValueError(f"Invalid dataset type '{dataset_type}' provided. Please use one of {valid_dataset_type}.")

    # Validate the 'scan_type' parameter to ensure it's a valid choice.
    valid_scan_types = ["DATASET_DOCUMENTATION_SCAN", "TABLE_DOCUMENTATION_SCAN"]
    if scan_type not in valid_scan_types:
        raise ValueError(f"Invalid scan type '{scan_type}' provided. Please use one of {valid_scan_types}.")

    print(f"--- Tool: Starting Data Insights scans for the {dataset_type} dataset... ---")

    token = get_auth_token()
    details = {}

    # Pick the dataset for the requested type.
    if dataset_type == "OLTP":
        dataset_id = OLTP_DATASET_ID
    else:  # 'OLAP', guaranteed by the validation above
        dataset_id = DWH_DATASET_ID
    knowledge_scan_id = sanitize_string_with_hyphens(dataset_id + "-dataset-documentation-scan")

    try:
        if scan_type == "DATASET_DOCUMENTATION_SCAN":
            # Run the dataset-level (knowledge) scan.
            run_scan_job(token, knowledge_scan_id)
            details[knowledge_scan_id] = f"Started Data Insights job {knowledge_scan_id} for dataset {dataset_id}"
        else:
            # Run one documentation scan per table.
            for table in get_dataset_tables(dataset_id):
                data_scan_id = sanitize_string_with_hyphens(f"{table.table_id}-table-documentation-scan")
                run_scan_job(token, data_scan_id)
                details[data_scan_id] = f"Started the Data Insights job {data_scan_id} for table {table.table_id}"

        return {"status": "success",
                "Response": "Successfully scheduled the runs for all the Data Insights scans.",
                "Details": details
                }

    except Exception as e:
        error_message = f"An error occurred during metadata generation: {e}"
        # Return a dictionary with an error status for ADK
        return {"status": "error", "error": error_message}


def get_latest_scan_jobs_run_status(dataset_type: str) -> dict:
    """
    Checks the execution status of the Data Insights scans for a dataset.

    The dataset can be the operational dataset ('OLTP') or the star schema
    dataset, also known as the data warehouse dataset ('OLAP'). Both the
    dataset-level scan and one table-level scan per table (from
    get_dataset_tables) are checked. Scans with no job, or whose jobs could
    not be fetched, appear as None in "Details" and are not counted.

    Returns:
        dict whose "status" is:
          - "error"   if any latest job FAILED or was CANCELLED / is CANCELING,
          - "pending" if any latest job is RUNNING or PENDING,
          - "success" otherwise;
        together with "final", a "Response" (or "error") message, "Details"
        and "Overview" (both the per-scan job details). Any exception yields
        ``{"status": "error", "error": ...}``.

    Raises:
        ValueError: if dataset_type is not 'OLTP' or 'OLAP'.
    """
    from collections import Counter

    # Validate the 'dataset' parameter to ensure it's a valid choice.
    valid_datasets = ["OLTP", "OLAP"]
    if dataset_type not in valid_datasets:
        raise ValueError(f"Invalid dataset '{dataset_type}' provided. Please use one of {valid_datasets}.")

    print(f"--- Tool: Getting the Data Insight scans status for the {dataset_type} dataset... ---")

    token = get_auth_token()

    # Pick the dataset for the requested type.
    if dataset_type == "OLTP":
        dataset_id = OLTP_DATASET_ID
    else:  # 'OLAP', guaranteed by the validation above
        dataset_id = DWH_DATASET_ID
    knowledge_scan_id = sanitize_string_with_hyphens(dataset_id + "-dataset-documentation-scan")

    try:
        scan_job_details = {}

        # Latest job of the dataset-level (knowledge) scan.
        scan_job_details[knowledge_scan_id] = get_latest_dataplex_scan_job_details(token, knowledge_scan_id)

        # Latest job of each table-level documentation scan.
        for table in get_dataset_tables(dataset_id):
            data_scan_id = sanitize_string_with_hyphens(f"{table.table_id}-table-documentation-scan")
            scan_job_details[data_scan_id] = get_latest_dataplex_scan_job_details(token, data_scan_id)

        print(scan_job_details)

        # Count states in one pass; scans without job details are skipped.
        state_counts = Counter(job['state'] for job in scan_job_details.values() if job)
        running_jobs = state_counts['RUNNING']
        succeeded_jobs = state_counts['SUCCEEDED']
        failed_jobs = state_counts['FAILED']
        # Cancelled (or cancelling) jobs never complete successfully, so they
        # must not fall through to a "success" status.
        cancelled_jobs = state_counts['CANCELLED'] + state_counts['CANCELING']
        pending_jobs = state_counts['PENDING']

        overall_status = (
            f"{running_jobs} running jobs, {succeeded_jobs} jobs succeeded, {failed_jobs} jobs failed, "
            f"{cancelled_jobs} jobs cancelled, and {pending_jobs} jobs are pending out of {len(scan_job_details)} total jobs"
        )

        if failed_jobs + cancelled_jobs > 0:
            return {"status": "error", "final": "yes", "error": f"{overall_status}. Please run the Data Insights job creation and execution steps again.", "Details": scan_job_details, "Overview": scan_job_details}
        elif pending_jobs + running_jobs > 0:
            return {"status": "pending", "final": "no", "Response": f"{overall_status}. Please check again in a few moments...", "Details": scan_job_details, "Overview": scan_job_details}
        else:
            return {"status": "success", "final": "yes", "Response": f"{overall_status}.", "Details": scan_job_details, "Overview": scan_job_details}

    except Exception as e:
        error_message = f"An error occurred during metadata generation: {e}"
        return {"status": "error", "error": error_message}

def get_latest_dataplex_scan_job_details(token: str, scan_id: str) -> dict:
    """
    Summarises the most recent job of a Dataplex data scan.

    Lists the scan's jobs, picks the one with the greatest ``createTime`` and
    reports its state plus elapsed time (start to end, or start to now when the
    job has no end time yet).

    Args:
        token: OAuth bearer token for the Dataplex REST API.
        scan_id: ID of the scan (in PROJECT_ID / LOCATION).

    Returns:
        ``{"state": ..., "elapsed_time": "<h>h <m>m <s>s"}`` for the latest
        job, or None when the scan has no jobs or the request fails (errors
        are printed, not raised).
    """
    jobs_url = f"{BASE_URL_FOR_DATAPLEX_SCAN}/projects/{PROJECT_ID}/locations/{LOCATION}/dataScans/{scan_id}/jobs"
    auth_headers = {"Authorization": f"Bearer {token}"}

    def to_utc_datetime(raw):
        # strptime's %f accepts at most 6 fractional digits, so longer
        # (e.g. nanosecond) fractions are truncated before parsing.
        if '.' not in raw:
            return datetime.datetime.strptime(raw.replace('Z', ''), "%Y-%m-%dT%H:%M:%S").replace(tzinfo=datetime.timezone.utc)
        whole, fraction = raw.split('.', 1)
        fraction = fraction.replace('Z', '')[:6]
        return datetime.datetime.strptime(f"{whole}.{fraction}", "%Y-%m-%dT%H:%M:%S.%f").replace(tzinfo=datetime.timezone.utc)

    try:
        reply = requests.get(jobs_url, headers=auth_headers)
        reply.raise_for_status()
        job_list = reply.json().get("dataScanJobs", [])
        if not job_list:
            return None

        # max() returns the first of equal keys, matching a stable descending sort.
        newest = max(job_list, key=lambda job: job.get('createTime', ''))
        started_at = newest.get("startTime")
        finished_at = newest.get("endTime")

        elapsed_time_str = "Not started"
        if started_at:
            try:
                begin = to_utc_datetime(started_at)
                finish = to_utc_datetime(finished_at) if finished_at else datetime.datetime.now(datetime.timezone.utc)
                elapsed_seconds = int((finish - begin).total_seconds())
                hours, leftover = divmod(elapsed_seconds, 3600)
                minutes, seconds = divmod(leftover, 60)
                elapsed_time_str = f"{hours}h {minutes}m {seconds}s"
            except Exception as e:
                elapsed_time_str = f"Could not calculate elapsed time: {e}"

        return {"state": newest.get("state"), "elapsed_time": elapsed_time_str}

    except requests.exceptions.HTTPError as e:
        print(f"An HTTP error occurred: {e}")
        return None
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return None

In [None]:
def describe_dataset(description_df: pd.DataFrame) -> str:
    """
    Renders the dataset-level description as readable text.

    Args:
        description_df: DataFrame whose first ``dataset_description`` value is used.

    Returns:
        "Dataset Overview:" followed by a newline and the description, or a
        fallback message when the column is missing or the frame is empty.
    """
    try:
        overview = description_df["dataset_description"].iloc[0]
    except (IndexError, KeyError):
        return "No dataset description was found."
    return f"Dataset Overview:\n{overview}"

def describe_relationships(relationships_df: pd.DataFrame) -> List[str]:
    """
    Turns each row of the relationships DataFrame into a join sentence.

    Args:
        relationships_df: DataFrame with ``table_1``, ``table_1_column``,
            ``table_2`` and ``table_2_column`` columns.

    Returns:
        One sentence per row, in row order.
    """
    return [
        f"Table '{rel['table_1']}' connects to table '{rel['table_2']}' by joining "
        f"'{rel['table_1']}.{rel['table_1_column']}' with '{rel['table_2']}.{rel['table_2_column']}'."
        for _, rel in relationships_df.iterrows()
    ]

def describe_tables(tables_df: pd.DataFrame) -> List[str]:
    """
    Turns each row of the tables DataFrame into a one-line table summary.

    Args:
        tables_df: DataFrame with ``name`` and ``description`` columns.

    Returns:
        ``"Table '<name>': <description>"`` for every row, in row order.
    """
    return [
        f"Table '{tbl['name']}': {tbl['description']}"
        for _, tbl in tables_df.iterrows()
    ]

def describe_columns(columns_df: pd.DataFrame) -> str:
    """
    Builds a text block listing each table's columns and their descriptions.

    Args:
        columns_df: DataFrame with ``table_name``, ``column_name`` and
            ``column_description`` columns.

    Returns:
        A header line followed by one section per table (in groupby order),
        each listing ``- <column>: <description>`` lines.
    """
    parts = ["Column Details for Each Table:\n"]
    for table_name, table_cols in columns_df.groupby("table_name"):
        parts.append(f"\n--- Table: {table_name} ---\n")
        parts.extend(
            f"- {col['column_name']}: {col['column_description']}\n"
            for _, col in table_cols.iterrows()
        )
    # Joining once avoids repeated string concatenation in the loop.
    return "".join(parts)


def get_scan_results(token, scan_id):
    """
    Fetches a Dataplex data scan with ``view=FULL``.

    Callers read result fields such as ``dataDocumentationResult`` from the
    returned JSON.

    Args:
        token: OAuth bearer token for the Dataplex REST API.
        scan_id: ID of the scan (in PROJECT_ID / LOCATION).

    Returns:
        dict: The parsed JSON scan resource.

    Raises:
        requests.exceptions.HTTPError: if the GET request fails.
    """
    print("Fetching scan results...")

    scan_url = (
        f"{BASE_URL_FOR_DATAPLEX_SCAN}/projects/{PROJECT_ID}/locations/{LOCATION}"
        f"/dataScans/{scan_id}?view=FULL"
    )
    reply = requests.get(scan_url, headers={"Authorization": f"Bearer {token}"})
    reply.raise_for_status()
    print("Successfully fetched scan results.")
    return reply.json()


def _dataplex_table_name(table_fqn, default="N/A"):
    """Return the table id (last '/'-separated segment) of a table resource name, or `default` if empty."""
    # Names presumably look like "//bigquery.googleapis.com/projects/P/datasets/D/tables/T".
    # Taking the last segment yields the same value as the former fixed index [8] for that
    # shape, without raising IndexError on a shorter or missing name.
    if not table_fqn:
        return default
    return table_fqn.rstrip("/").rsplit("/", 1)[-1]


def _dataplex_relationship_row(relationship):
    """Flatten one `schemaRelationships` entry into a row for the dataset_table_relationships table."""
    join_type = (relationship.get("type") or "Unknown Type").replace("SCHEMA_JOIN", "JOIN")
    # `or {}` / `or ["N/A"]` keep missing or null sides from raising; "N/A" marks unknown values.
    left = relationship.get("leftSchemaPaths") or {}
    right = relationship.get("rightSchemaPaths") or {}
    left_paths = left.get("paths") or ["N/A"]
    right_paths = right.get("paths") or ["N/A"]
    return {
        "table_1": _dataplex_table_name(left.get("tableFqn")),
        "table_1_column": left_paths[0],
        "table_2": _dataplex_table_name(right.get("tableFqn")),
        "table_2_column": right_paths[0],
        "join_type": join_type,
    }


def persist_dataplex_scan_output_to_bq_tables(dataset_type: str) -> dict:
    """
    Saves to BigQuery the Data Insights dataset documentation scan results for a
    specified dataset: the operational dataset ('OLTP') or the star schema
    dataset, also known as the data warehouse dataset ('OLAP').

    The following tables in the matching metadata dataset are truncated and
    rewritten: dataset_description, dataset_table_relationships,
    table_descriptions and table_column_descriptions. The dataset description
    is also written to the BigQuery dataset's own metadata.

    Args:
        dataset_type: 'OLTP' or 'OLAP'.

    Returns:
        On success, {"status": "success", "Response": ..., "Details": ...}, where
        Details holds the raw scan sections read ("Description", "Relationships",
        "tableResults") and "tableColumnResults", the column fields of all tables.
        On any failure after validation, {"status": "error", "error": <message>}.

    Raises:
        ValueError: if dataset_type is not 'OLTP' or 'OLAP'.
    """
    valid_dataset_types = ["OLTP", "OLAP"]
    if dataset_type not in valid_dataset_types:
        raise ValueError(f"Invalid dataset '{dataset_type}' provided. Please use one of {valid_dataset_types}.")

    print(f"--- Tool: Starting saving the Data Insights scans for the {dataset_type} dataset... ---")

    token = get_auth_token()

    if dataset_type == "OLTP":
        dataset_id = OLTP_DATASET_ID
        metadata_dataset_id = OLTP_METADATA_DATASET_ID
    else:  # 'OLAP' (the data warehouse), guaranteed by the validation above
        dataset_id = DWH_DATASET_ID
        metadata_dataset_id = DWH_METADATA_DATASET_ID
    knowledge_scan_id = sanitize_string_with_hyphens(dataset_id + "-dataset-documentation-scan")

    description_table = f"{PROJECT_ID}.{metadata_dataset_id}.dataset_description"
    relationships_table = f"{PROJECT_ID}.{metadata_dataset_id}.dataset_table_relationships"
    tables_table = f"{PROJECT_ID}.{metadata_dataset_id}.table_descriptions"
    columns_table = f"{PROJECT_ID}.{metadata_dataset_id}.table_column_descriptions"

    try:
        details = {}

        results = get_scan_results(token, knowledge_scan_id)
        # `or {}` guards against keys that are present but explicitly null.
        dataset_result = (results.get("dataDocumentationResult") or {}).get("datasetResult") or {}

        # Part 1: Dataset description
        dataset_description = dataset_result.get("overview")
        details["Description"] = dataset_description
        if dataset_description is not None:
            truncate_bigquery_table(description_table)
            write_dict_to_bigquery(description_table, {"dataset_description": dataset_description})
            update_bigquery_metadata(PROJECT_ID, dataset_id, dataset_description)

        # Part 2: Table relationships (skipped entirely when the scan reports none)
        schema_relationships = dataset_result.get("schemaRelationships")
        details["Relationships"] = schema_relationships
        if schema_relationships is not None:
            truncate_bigquery_table(relationships_table)
            for relationship in schema_relationships:
                write_dict_to_bigquery(relationships_table, _dataplex_relationship_row(relationship))

        # Part 3: Tables
        table_results = dataset_result.get("tableResults", {})
        details["tableResults"] = table_results
        if table_results is not None:
            truncate_bigquery_table(tables_table)
            truncate_bigquery_table(columns_table)

        # Accumulate columns of every table (previously only the last table's survived).
        details["tableColumnResults"] = []
        for table_result in table_results or []:
            table_nm = _dataplex_table_name(table_result.get("name", ""))
            write_dict_to_bigquery(tables_table, {
                "name": table_nm,
                "description": table_result.get("overview", ""),
            })

            # Part 4: Table columns and their descriptions
            column_fields = (table_result.get("schema") or {}).get("fields") or []
            details["tableColumnResults"].extend(column_fields)
            for field in column_fields:
                write_dict_to_bigquery(columns_table, {
                    "table_name": table_nm,
                    "column_name": field.get("name"),
                    "column_description": field.get("description"),
                })

    except Exception as e:
        error_message = f"An error occurred during metadata generation: {e}"
        # Return a dictionary with an error status for ADK
        return {"status": "error", "error": error_message}

    return {"status": "success",
            "Response": f"Successfully saved the Data scans for the {dataset_type} dataset.",
            "Details": details
            }


## 3. Reporting utils

In [None]:
# Fully qualified resource name of the Dataform workspace.
PIPELINE_ID = f"projects/{PROJECT_ID}/locations/{LOCATION}/repositories/{DATAFORM_REPO}/workspaces/{DATAFORM_WORKSPACE}"

# Instruction files for the Data Engineering Agent. The content below is read verbatim by
# the agent, so it must be unambiguous.
REPORTING_NAMING_CONVENTIONS_FILE = "03-reporting-naming-conventions.md"
REPORTING_NAMING_CONVENTIONS_CONTENT = """
Naming conventions for the reporting dataset:

* Tables:

    - Report tables: rpt_[report_name]

Always use ${ref()} to declare dependencies between nodes in Dataform, e.g. ${ref("referenced_sqlx_file")}.
Always create a date dimension table that contains the year, month and date, and link it to all the fact tables.
Always include the string 'reports' and the source table names as tags in the config block of the .sqlx definition file for each report.
"""


In [None]:
def set_stage_for_report_generation():
    """
    Prepare the Dataform workspace so the Data Engineering Agent can generate the reporting mart.

    1. Persists the latest Data Insights scan results for the star schema ('OLAP') dataset
       to BigQuery tables in DWH_METADATA_DATASET_ID.
    2. Reads those tables back and renders them into the agent grounding text.
    3. Writes the naming conventions instructions file to the Dataform workspace.
    4. Writes the agent grounding instructions file to the Dataform workspace.

    Returns:
        {"status": "success", "Response": ...} when every step completes; otherwise an
        {"status": "error", "error": <message>} dict. Step 1's own error dict is returned
        unchanged so grounding is never built from stale or partially written metadata.
    """
    # Persist the latest Dataplex scan data to a separate dataset for use for agentic grounding
    persist_result = persist_dataplex_scan_output_to_bq_tables("OLAP")
    if persist_result.get("status") != "success":
        return persist_result
    print("1. Persisted Dataplex scan information to BQ dataset")

    agent_grounding_instruction_file = "04-agent-grounding.md"
    separator = "------------------------------------"

    try:
        dwh_description_df = read_bigquery_table(PROJECT_ID, DWH_METADATA_DATASET_ID, "dataset_description")
        dwh_relationships_df = read_bigquery_table(PROJECT_ID, DWH_METADATA_DATASET_ID, "dataset_table_relationships")
        dwh_tables_df = read_bigquery_table(PROJECT_ID, DWH_METADATA_DATASET_ID, "table_descriptions")
        dwh_columns_df = read_bigquery_table(PROJECT_ID, DWH_METADATA_DATASET_ID, "table_column_descriptions")

        # Headers name the star schema dataset the agent will build reports on (DWH_DATASET_ID),
        # not the metadata dataset these descriptions were read from.
        agent_grounding_content = "\n".join([
            describe_dataset(dwh_description_df),
            separator,
            f"Details for each table in the star schema dataset {DWH_DATASET_ID}:",
            "\n".join(describe_tables(dwh_tables_df)),
            separator,
            f"Details on the relationships between tables in the star schema dataset {DWH_DATASET_ID}:",
            "\n".join(describe_relationships(dwh_relationships_df)),
            separator,
            f"Details on each column of the tables in the star schema dataset {DWH_DATASET_ID}:",
            describe_columns(dwh_columns_df),
            separator,
        ])
        print("2. Gathered agent grounding content")

        # Fetch the token just before the Dataform calls; the persistence step above can be slow.
        token = get_auth_token()

        create_dataform_instructions_file(
            token,
            PROJECT_ID,
            LOCATION,
            DATAFORM_REPO,
            DATAFORM_WORKSPACE,
            filename=REPORTING_NAMING_CONVENTIONS_FILE,
            content=REPORTING_NAMING_CONVENTIONS_CONTENT
        )
        print(f"3. Successfully created the instruction file: {REPORTING_NAMING_CONVENTIONS_FILE}")

        create_dataform_instructions_file(
            token,
            PROJECT_ID,
            LOCATION,
            DATAFORM_REPO,
            DATAFORM_WORKSPACE,
            filename=agent_grounding_instruction_file,
            content=agent_grounding_content
        )
        print(f"4. Successfully created the instruction file: {agent_grounding_instruction_file}")

    except Exception as e:
        error_message = f"An error occurred while preparing the report generation instruction files: {e}"
        # Return a dictionary with an error status for ADK
        return {"status": "error", "error": error_message}

    return {"status": "success",
            "Response": "Successfully completed all the pre-work.",
            }

## 4. Generate Dataplex Scans for agentic grounding for the Data Warehouse tables

### 4.1. Create & run table documentation scans to completion

#### 4.1.1. Create table documentation scan for all tables in the data warehouse dataset

In [None]:
# Create the table documentation scans for the tables of the OLAP (data warehouse) dataset.
create_scan_jobs("OLAP", "TABLE_DOCUMENTATION_SCAN")

#### 4.1.2. Execute the table documentation scan jobs for all the tables in the Data Warehouse dataset

In [None]:
# Execute the table documentation scan jobs for the OLAP (data warehouse) dataset.
run_scan_jobs("OLAP", "TABLE_DOCUMENTATION_SCAN")

#### 4.1.3. Wait for completion of all the table documentation scan jobs


Re-run the following cell until every job reports the status "SUCCEEDED"; only then proceed to the next cell.

In [None]:
# Show the status of the latest scan job runs for the OLAP dataset; re-run until all are SUCCEEDED.
get_latest_scan_jobs_run_status("OLAP")

### 4.2. Create and run dataset documentation scans (formerly called knowledge engine scans)

#### 4.2.1. Create the scan

In [None]:
# Create the dataset documentation (formerly "knowledge engine") scan for the OLAP dataset.
create_scan_jobs("OLAP", "DATASET_DOCUMENTATION_SCAN")

#### 4.2.2. Run the scan

In [None]:
# Run the dataset documentation scan; its results are what set_stage_for_report_generation() persists.
run_scan_jobs("OLAP", "DATASET_DOCUMENTATION_SCAN")

#### 4.2.3. Wait for its completion

Re-run the following cell until every job reports the status "SUCCEEDED"; only then proceed to the next cell.

In [None]:
# Show the status of the latest scan job runs for the OLAP dataset; re-run until all are SUCCEEDED.
get_latest_scan_jobs_run_status("OLAP")

## 5. Complete the pre-work needed for the Data Engineering Agent to generate the reporting mart

In [None]:
# Persist the OLAP scan results to BigQuery and write both instruction files to the Dataform workspace.
set_stage_for_report_generation()

--- Tool: Starting saving the Data Insights scans for the OLAP dataset... ---
Fetching scan results...
Successfully fetched scan results.
Executing query: TRUNCATE TABLE `data-insights-quickstart.rscw_dwh_metadata_ds.dataset_description`
Successfully updated description for dataset: data-insights-quickstart.rscw_dwh_ds
Executing query: TRUNCATE TABLE `data-insights-quickstart.rscw_dwh_metadata_ds.dataset_table_relationships`
Executing query: TRUNCATE TABLE `data-insights-quickstart.rscw_dwh_metadata_ds.table_descriptions`
Executing query: TRUNCATE TABLE `data-insights-quickstart.rscw_dwh_metadata_ds.table_column_descriptions`
1. Persisted Dataplex scan information to BQ dataset
Reading all rows from table: data-insights-quickstart.rscw_dwh_metadata_ds.dataset_description...
Successfully read 1 rows.
Reading all rows from table: data-insights-quickstart.rscw_dwh_metadata_ds.dataset_table_relationships...
Successfully read 16 rows.
Reading all rows from table: data-insights-quickstart.rs

{'status': 'success', 'Response': 'Successfully completed all the pre-work.'}

## This concludes the pre-work needed for reporting mart code generation. Proceed to the user manual for further instructions.