### <font color='#4285f4'>Overview</font>

Overview: Shows how you can call the Data Engineering agent via the REST API to correct a broken pipeline

Process Flow:
1. Creates both good and bad data
2. Runs a data quality scan
3. Gets the broken rules / data
4. Creates a prompt
5. Sends the prompt to the Data Engineering Agent
6. Runs the workflow/pipeline


Cost:
* Approximate cost: Less than $1

Author:
* Adam Paternostro

In [None]:
# Architecture Diagram
from IPython.display import Image

# Render the architecture diagram inline, 1200 px wide (shown because it is the cell's last expression).
architecture_diagram_url = "https://storage.googleapis.com/data-analytics-golden-demo/colab-diagrams/autonomous-pipeline-repair.png"
Image(url=architecture_diagram_url, width=1200)

### <font color='#4285f4'>Video Walkthrough</font>

[Video](https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Demo-Agent-Self-Healing-Pipeline.mp4)

In [None]:
from IPython.display import HTML

# Embedded HTML5 player for the walkthrough video; displayed because HTML(...) is the cell's last expression.
video_player_html = """
<video width="800" height="600" controls>
  <source src="https://storage.googleapis.com/data-analytics-golden-demo/colab-videos/Demo-Agent-Self-Healing-Pipeline.mp4" type="video/mp4">
  Your browser does not support the video tag.
</video>
"""
HTML(video_player_html)

### <font color='#4285f4'>License</font>

```
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

### <font color='#4285f4'>Pip installs</font>

In [None]:
# PIP Installs (if necessary)
# To install a package, uncomment the line below and replace REPLACE-ME with the package name.
# Using sys.executable makes pip install into the Python interpreter running this kernel.
import sys

# !{sys.executable} -m pip install REPLACE-ME

### <font color='#4285f4'>Initialize</font>

In [None]:
from PIL import Image
from IPython.display import HTML
import IPython.display
import google.auth
import requests
import json
import uuid
import base64
import os
import cv2
import random
import time
import datetime
import base64
import random

import logging
from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception

In [None]:
# Set these (run this cell to verify the output)
# NOTE(review): the "${...}" values look like deployment-time template placeholders;
# confirm they were substituted before running the notebook.

bigquery_location = "${bigquery_non_multi_region}"
dataplex_region = "${dataplex_region}"   # region used in the Dataplex dataScans URLs
dataform_region = "${dataform_region}"   # region used in the Dataform repository URLs
repository_name = "agentic-beans-repo"   # Dataform repository holding the pipeline workspaces
workspace_name_original = "telemetry-coffee-machine-original"   # workspace compiled and run for the "working" pipeline
workspace_name_auto = "telemetry-coffee-machine-auto"   # presumably the workspace the agent repairs; not used in the cells shown here
location = "${location}" # for Gemini
gcp_account_name = "${gcp_account_name}"

# Root logger; the helper functions below write their diagnostic messages to it.
logger = logging.getLogger()

# Get the current date and time
now = datetime.datetime.now()

# Format the date and time as desired
formatted_date = now.strftime("%Y-%m-%d-%H-%M")

# Get some values using gcloud
# project_id comes from the notebook environment; the `!(...)` line is IPython shell capture,
# which returns the command's output lines as a list.
project_id = os.environ["GOOGLE_CLOUD_PROJECT"]
user = !(gcloud auth list --filter=status:ACTIVE --format="value(account)")

# Exactly one active gcloud account is expected; anything else is treated as misconfiguration.
if len(user) != 1:
  raise RuntimeError(f"user is not set: {user}")
user = user[0]

print(f"project_id = {project_id}")
print(f"user = {user}")

### <font color='#4285f4'>Helper Methods</font>

#### rest_api_helper
Calls the Google Cloud REST API using the current user's credentials.

In [None]:
def rest_api_helper(url: str, http_verb: str, request_body: str) -> str:
  """Calls the Google Cloud REST API passing in the current users credentials"""

  import google.auth.transport.requests
  import requests
  import google.auth
  import json

  # Get an access token based upon the current user
  creds, project = google.auth.default()
  auth_req = google.auth.transport.requests.Request()
  creds.refresh(auth_req)
  access_token=creds.token

  headers = {
    "Content-Type" : "application/json",
    "Authorization" : "Bearer " + access_token
  }

  if http_verb == "GET":
    response = requests.get(url, headers=headers)
  elif http_verb == "POST":
    response = requests.post(url, json=request_body, headers=headers)
  elif http_verb == "PUT":
    response = requests.put(url, json=request_body, headers=headers)
  elif http_verb == "PATCH":
    response = requests.patch(url, json=request_body, headers=headers)
  elif http_verb == "DELETE":
    response = requests.delete(url, headers=headers)
  else:
    raise RuntimeError(f"Unknown HTTP verb: {http_verb}")

  if response.status_code == 200:
    return json.loads(response.content)
    #image_data = json.loads(response.content)["predictions"][0]["bytesBase64Encoded"]
  else:
    error = f"Error rest_api_helper -> ' Status: '{response.status_code}' Text: '{response.text}'"
    raise RuntimeError(error)

#### RetryCondition (for retrying LLM calls)

In [None]:
def RetryCondition(error):
  """Tell tenacity whether a failed LLM call should be retried.

  Always prints the error text. Returns True (after printing "Retrying...") when the text
  contains one of the transient-failure markers below, otherwise False.
  """
  message = str(error)
  print(message)

  # Substrings identifying failures worth retrying. Add more markers here as needed.
  transient_markers = (
      "RESOURCE_EXHAUSTED",
      "No content in candidate",
  )

  should_retry = any(marker in message for marker in transient_markers)
  if should_retry:
    print("Retrying...")
  return should_retry

#### Gemini LLM

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def GeminiLLM(prompt, model = "gemini-2.0-flash", response_schema = None,
                 temperature = 1, topP = 1, topK = 32):
  """Send a text prompt to a Gemini model on Vertex AI and return the first candidate's text.

  Args:
    prompt: The user prompt text.
    model: Vertex AI publisher model id.
    response_schema: Optional JSON schema, sent as generation_config.responseSchema.
    temperature: Sampling temperature; negative values are clamped to 0.
    topP: Nucleus-sampling parameter.
    topK: Only sent when model == "gemini-2.0-flash".

  Returns:
    The first candidate's text with ```json / ``` fences and all newlines removed.

  Raises:
    RuntimeError: on a non-200 response, unparseable JSON, or when the response has no
      candidate text. The @retry decorator retries errors whose text RetryCondition accepts.
  """
  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models

  # `import google.auth` does not necessarily load the transport submodule, so import it
  # explicitly rather than relying on rest_api_helper having run first.
  import google.auth.transport.requests

  if temperature < 0:
    temperature = 0

  creds, _ = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to obtain the access token
  creds.refresh(auth_req)
  access_token = creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent"

  generation_config = {
    "temperature": temperature,
    "topP": topP,
    "maxOutputTokens": 8192,
    "candidateCount": 1,
    "responseMimeType": "application/json",
  }

  # Add in the response schema when it is provided
  if response_schema is not None:
    generation_config["responseSchema"] = response_schema

  # topK is only sent for gemini-2.0-flash
  if model == "gemini-2.0-flash":
    generation_config["topK"] = topK

  payload = {
    "contents": {
      "role": "user",
      "parts": {
          "text": prompt
      },
    },
    "generation_config": {
      **generation_config
    },
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code != 200:
    raise RuntimeError(f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'")

  try:
    json_response = json.loads(response.content)
  except Exception as error:
    raise RuntimeError(f"An error occurred parsing the JSON: {error}") from error

  # Walk candidates[0].content.parts[0].text, failing at the first missing level.
  # ("No content in candidate" is one of the messages RetryCondition retries.)
  candidates = json_response.get("candidates") or []
  if not candidates:
    raise RuntimeError(f"No candidates: {response.content}")
  candidate = candidates[0]
  if "content" not in candidate:
    raise RuntimeError(f"No content in candidate: {response.content}")
  parts = candidate["content"].get("parts") or []
  if not parts:
    raise RuntimeError(f"No parts in content: {response.content}")
  if "text" not in parts[0]:
    raise RuntimeError(f"No text in part: {response.content}")
  llm_response = parts[0]["text"]

  # Remove Markdown JSON fences and newlines that typically wrap a JSON reply
  llm_response = llm_response.replace("```json","")
  llm_response = llm_response.replace("```","")
  llm_response = llm_response.replace("\n","")

  return llm_response

In [None]:
@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))
def GeminiLLM_VerifyImage(prompt, imageBase64, model = "gemini-2.0-flash", response_schema = None,
                 temperature = 1, topP = 1, topK = 32):
  """Send a text prompt plus one image to a Gemini model on Vertex AI and return the reply text.

  Args:
    prompt: The user prompt text.
    imageBase64: Base64-encoded image data, sent as inlineData with mimeType "image/png".
    model: Vertex AI publisher model id.
    response_schema: Optional JSON schema, sent as generation_config.responseSchema.
    temperature: Sampling temperature; negative values are clamped to 0.
    topP: Nucleus-sampling parameter.
    topK: Only sent when model == "gemini-2.0-flash".

  Returns:
    The first candidate's text with ```json / ``` fences and all newlines removed.

  Raises:
    RuntimeError: on a non-200 response, unparseable JSON, or when the response has no
      candidate text. The @retry decorator retries errors whose text RetryCondition accepts.
  """
  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models

  # `import google.auth` does not necessarily load the transport submodule, so import it
  # explicitly rather than relying on rest_api_helper having run first.
  import google.auth.transport.requests

  if temperature < 0:
    temperature = 0

  creds, _ = google.auth.default()
  auth_req = google.auth.transport.requests.Request() # required to obtain the access token
  creds.refresh(auth_req)
  access_token = creds.token

  headers = {
      "Content-Type" : "application/json",
      "Authorization" : "Bearer " + access_token
  }

  # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference
  url = f"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent"

  generation_config = {
    "temperature": temperature,
    "topP": topP,
    "maxOutputTokens": 8192,
    "candidateCount": 1,
    "responseMimeType": "application/json",
  }

  # Add in the response schema when it is provided
  if response_schema is not None:
    generation_config["responseSchema"] = response_schema

  # topK is only sent for gemini-2.0-flash
  if model == "gemini-2.0-flash":
    generation_config["topK"] = topK

  payload = {
    "contents": {
      "role": "user",
      "parts": [
          { "text": prompt },
          { "inlineData": {  "mimeType": "image/png", "data": f"{imageBase64}" } }
        ]
    },
    "generation_config": {
      **generation_config
    },
    "safety_settings": {
      "category": "HARM_CATEGORY_SEXUALLY_EXPLICIT",
      "threshold": "BLOCK_LOW_AND_ABOVE"
    }
  }

  response = requests.post(url, json=payload, headers=headers)

  if response.status_code != 200:
    raise RuntimeError(f"Error with prompt:'{prompt}'  Status:'{response.status_code}' Text:'{response.text}'")

  try:
    json_response = json.loads(response.content)
  except Exception as error:
    raise RuntimeError(f"An error occurred parsing the JSON: {error}") from error

  # Walk candidates[0].content.parts[0].text, failing at the first missing level.
  # ("No content in candidate" is one of the messages RetryCondition retries.)
  candidates = json_response.get("candidates") or []
  if not candidates:
    raise RuntimeError(f"No candidates: {response.content}")
  candidate = candidates[0]
  if "content" not in candidate:
    raise RuntimeError(f"No content in candidate: {response.content}")
  parts = candidate["content"].get("parts") or []
  if not parts:
    raise RuntimeError(f"No parts in content: {response.content}")
  if "text" not in parts[0]:
    raise RuntimeError(f"No text in part: {response.content}")
  llm_response = parts[0]["text"]

  # Remove Markdown JSON fences and newlines that typically wrap a JSON reply
  llm_response = llm_response.replace("```json","")
  llm_response = llm_response.replace("```","")
  llm_response = llm_response.replace("\n","")

  return llm_response

#### Helper Functions

In [None]:
def RunQuery(sql):
  import time
  from google.cloud import bigquery
  client = bigquery.Client()

  if (sql.startswith("SELECT") or sql.startswith("WITH")):
      df_result = client.query(sql).to_dataframe()
      return df_result
  else:
    job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)
    query_job = client.query(sql, job_config=job_config)

    # Check on the progress by getting the job's updated state.
    query_job = client.get_job(
        query_job.job_id, location=query_job.location
    )
    print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    while query_job.state != "DONE":
      time.sleep(2)
      query_job = client.get_job(
          query_job.job_id, location=query_job.location
          )
      print("Job {} is currently in state {} with error result of {}".format(query_job.job_id, query_job.state, query_job.error_result))

    if query_job.error_result == None:
      return True
    else:
      raise Exception(query_job.error_result)

### <font color='#4285f4'>Data Quality Scan - Helper Methods</font>

In [None]:
def getStateDataQualityScan(project_id, dataplex_region, data_quality_scan_job_name):
  """Return the current "state" field of a Dataplex data scan job.

  This makes one GET request and does not wait. project_id and dataplex_region are not used:
  data_quality_scan_job_name is already the job's full resource name, as returned by
  startDataQualityScan.
  """
  job = rest_api_helper(f"https://dataplex.googleapis.com/v1/{data_quality_scan_job_name}", "GET", None)
  return job["state"]


In [None]:
def startDataQualityScan(project_id, dataplex_region, data_quality_scan_name):
  """Trigger a run of an existing Dataplex data quality scan and return the new job's resource name.

  This only starts the run; it does not wait for completion (poll getStateDataQualityScan for that).
  https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.dataScans/run
  """
  print("Running Data Quality Scan")

  scan_path = f"projects/{project_id}/locations/{dataplex_region}/dataScans/{data_quality_scan_name}"
  run_response = rest_api_helper(f"https://dataplex.googleapis.com/v1/{scan_path}:run", "POST", {})

  job = run_response["job"]
  job_name, job_state = job["name"], job["state"]
  print(f"Document Data Scan Run created: {job_name} - State: {job_state}")

  return job_name


In [None]:
def updateBigQueryTableDataplexLabels(project_id, dataplex_region, dataplex_asset_type, dataplex_asset_scan_name, bigquery_dataset_name, bigquery_table_name):
  """PATCH Dataplex "published scan" labels onto a BigQuery table so users can see the scan in the Console.

  dataplex_asset_type picks the label family:
    DATA-PROFILE-SCAN  -> dataplex-dp-published-{project,location,scan}
    DATA-INSIGHTS-SCAN -> dataplex-data-documentation-{project,location,scan}
    DATA-QUALITY-SCAN  -> dataplex-dq-published-{project,location,scan}
  Any other value raises Exception before the table is patched. The API response is printed.
  https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/patch
  """
  print("Patching BigQuery Dataplex Labels")

  # (project label, location label, scan label) for each supported asset type
  label_keys_by_type = {
    "DATA-PROFILE-SCAN": ("dataplex-dp-published-project", "dataplex-dp-published-location", "dataplex-dp-published-scan"),
    "DATA-INSIGHTS-SCAN": ("dataplex-data-documentation-project", "dataplex-data-documentation-location", "dataplex-data-documentation-scan"),
    "DATA-QUALITY-SCAN": ("dataplex-dq-published-project", "dataplex-dq-published-location", "dataplex-dq-published-scan"),
  }
  if dataplex_asset_type not in label_keys_by_type:
    raise Exception(f"Unknown dataplex_asset_type of {dataplex_asset_type}")

  project_key, location_key, scan_key = label_keys_by_type[dataplex_asset_type]
  patch_body = {
    "labels": {
      project_key: project_id,
      location_key: dataplex_region,
      scan_key: dataplex_asset_scan_name,
    }
  }

  table_url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project_id}/datasets/{bigquery_dataset_name}/tables/{bigquery_table_name}"
  print(rest_api_helper(table_url, "PATCH", patch_body))

### <font color='#4285f4'>Dataform - Helper Methods</font>

In [None]:
def compile_and_run_dataform_workflow(repository_name: str, workspace_name: str) -> dict:
    """
    Compiles a Dataform repository from a workspace and then runs the resulting workflow.

    This function performs two sequential operations:
    1. It creates a compilation result from the specified workspace. If the API reports
       compilation errors, it stops and returns a "failed" result listing them.
    2. It starts a workflow invocation from that compilation result, running as the
       bigquery-pipeline-sa service account of the current project.

    Uses the notebook globals project_id and dataform_region. Errors are caught, logged and
    reported through the returned dict rather than raised.

    Args:
        repository_name (str): The ID of the Dataform repository to compile and run.
        workspace_name (str): The ID of the workspace containing the code to be compiled.

    Returns:
        dict:
        {
            "status": "success" or "failed",
            "tool_name": "compile_and_run_dataform_workflow",
            "query": None,
            "messages": ["List of messages during processing"],
            "workflow_invocation_id": "last path segment of the invocation name (success only)",
            "results": { ... API response from the workflow invocation ... }  # None on failure
        }
    """
    messages = []

    try:
        # --- Step 1: Compile the repository from the workspace ---
        messages.append(f"Step 1: Compiling repository '{repository_name}' from workspace '{workspace_name}'.")

        compile_url = f"https://dataform.googleapis.com/v1/projects/{project_id}/locations/{dataform_region}/repositories/{repository_name}/compilationResults"

        workspace_full_path = f"projects/{project_id}/locations/{dataform_region}/repositories/{repository_name}/workspaces/{workspace_name}"

        compile_request_body = {
            "workspace": workspace_full_path
        }

        compile_result = rest_api_helper(compile_url, "POST", compile_request_body)
        compilation_result_name = compile_result.get("name")

        if not compilation_result_name:
            raise Exception("Failed to get compilation result name from the compilation API response.")

        # Don't start a workflow from a compilation that reported errors; hand them back instead.
        compilation_errors = compile_result.get("compilationErrors")
        if compilation_errors:
            raise Exception(f"Compilation of workspace '{workspace_name}' reported errors: {compilation_errors}")

        messages.append(f"Successfully compiled. Compilation result name: {compilation_result_name}")

        # --- Step 2: Run the workflow using the compilation result ---
        messages.append(f"Step 2: Starting workflow execution for compilation '{compilation_result_name}'.")

        invoke_url = f"https://dataform.googleapis.com/v1/projects/{project_id}/locations/{dataform_region}/repositories/{repository_name}/workflowInvocations"

        invoke_request_body = {
            "compilationResult": compilation_result_name,
            "invocationConfig": {
                "serviceAccount": f"bigquery-pipeline-sa@{project_id}.iam.gserviceaccount.com"
            }
        }

        invoke_result = rest_api_helper(invoke_url, "POST", invoke_request_body)

        messages.append("Successfully initiated workflow invocation.")

        return {
            "status": "success",
            "tool_name": "compile_and_run_dataform_workflow",
            "query": None,
            "messages": messages,
            # The invocation name ends with its ID, which get_worflow_invocation_status expects.
            "workflow_invocation_id": invoke_result["name"].rsplit('/', 1)[-1],
            "results": invoke_result
        }

    except Exception as e:
        error_message = f"An error occurred during the compile and run process: {e}"
        messages.append(error_message)
        # Logged at error level so the failure is visible with the default logging configuration.
        logger.error(error_message)
        return {
            "status": "failed",
            "tool_name": "compile_and_run_dataform_workflow",
            "query": None,
            "messages": messages,
            "results": None
        }

In [None]:
def get_worflow_invocation_status(repository_name: str, workflow_invocation_id: str) -> dict:
    """
    Fetches the current state of a Dataform workflow invocation (a single GET, no waiting).

    Uses the notebook globals project_id and dataform_region.

    Args:
        repository_name (str): The ID of the Dataform repository that owns the invocation.
        workflow_invocation_id (str): The ID of the workflow invocation, i.e. the last path
            segment of its resource name (compile_and_run_dataform_workflow returns it as
            "workflow_invocation_id").

    Returns:
        dict: A dictionary containing the status and the API result.
        A 404 from the API is reported as status "success" with results {"state": "NOT_FOUND"};
        any other error gives status "failed" with results None.
        {
            "status": "success" or "failed",
            "tool_name": "get_worflow_invocation_status",
            "query": None,
            "messages": ["List of messages during processing"],
            "results": {
                "name": "projects/{project-id}/locations/us-central1/repositories/adam-agent-10-workflow/workflowInvocations/1752598992-06e003bc-aad3-477f-b761-02629a4d554f",
                "compilationResult": "projects/{project-number}/locations/us-central1/repositories/adam-agent-10-workflow/compilationResults/d4a2fa7c-c546-428a-814a-b8eece65a559",
                "state": "SUCCEEDED",
                "invocationTiming": {
                    "startTime": "2025-07-15T17:03:12.313196Z",
                    "endTime": "2025-07-15T17:03:17.650637343Z"
                },
                "resolvedCompilationResult": "projects/{project-number}/locations/us-central1/repositories/adam-agent-10-workflow/compilationResults/d4a2fa7c-c546-428a-814a-b8eece65a559",
                "internalMetadata": "{\"db_metadata_insert_time\":\"2025-07-15T17:03:12.321373Z\",\"quota_server_enabled\":true,\"service_account\":\"service-{project-number}@gcp-sa-dataform.iam.gserviceaccount.com\"}"
                }
        }
    """
    messages = []

    # Resource URL of the single workflow invocation to look up.
    url = f"https://dataform.googleapis.com/v1/projects/{project_id}/locations/{dataform_region}/repositories/{repository_name}/workflowInvocations/{workflow_invocation_id}"
    logger.debug(url)

    try:
        messages.append(f"Checkin on workflow invoation status with workflow_invocation_id: '{workflow_invocation_id}'.")
        json_result = rest_api_helper(url, "GET", None)
        logger.debug(json_result)

        return {
            "status": "success",
            "tool_name": "get_worflow_invocation_status",
            "query": None,
            "messages": messages,
            "results": json_result
        }

    except Exception as e:
        logger.debug(e)
        # rest_api_helper formats HTTP errors as "... Status: '<code>' Text: ...". Match that exact
        # fragment: a bare '404' substring can also appear in IDs or resource names inside the
        # response text of an unrelated error (e.g. a 403).
        if "Status: '404'" in str(e):
            messages.append(f"Workflow Invocation not found for '{workflow_invocation_id}'. This is an expected outcome.")
            return {
                "status": "success",
                "tool_name": "get_worflow_invocation_status",
                "query": None,
                "messages": messages,
                "results": { "state" : "NOT_FOUND" }
            }
        else:
            # Handle all other errors as failures
            error_message = f"An unexpected error occurred while checking the workflow invocation status: {e}"
            messages.append(error_message)
            return {
                "status": "failed",
                "tool_name": "get_worflow_invocation_status",
                "query": None,
                "messages": messages,
                "results": None
            }



### <font color='#4285f4'>MAIN CODE - Using Data Quality to repair a Broken Pipeline</font>

#### <font color='Green'>Show the Pipeline **Working**</font>

#### Clean and populate the staging load table

In [None]:
%%bigquery

-- Empty the staging load table so the next INSERT starts from a clean slate.
TRUNCATE TABLE `agentic_beans_raw_staging_load.telemetry_coffee_machine`;

In [None]:
%%bigquery

-- Load a "good" batch: copy the rows with the latest telemetry_timestamp from the raw table into the
-- staging load table, casting every column to STRING. Each row gets a new UUID, and the batch gets a
-- timestamp-based telemetry_load_id; telemetry_timestamp is overwritten with the current time.
INSERT INTO `agentic_beans_raw_staging_load.telemetry_coffee_machine`
(telemetry_coffee_machine_id, telemetry_load_id, machine_id, truck_id, telemetry_timestamp, boiler_temperature_celsius, brew_pressure_bar, water_flow_rate_ml_per_sec, grinder_motor_rpm, grinder_motor_torque_nm, water_reservoir_level_percent, bean_hopper_level_grams, total_brew_cycles_counter, last_error_code, last_error_description, power_consumption_watts, cleaning_cycle_status)
SELECT CAST(GENERATE_UUID() AS STRING) AS telemetry_coffee_machine_id,
       FORMAT_TIMESTAMP('%Y%m%d%H%M%S', CURRENT_TIMESTAMP()) || '_machine_batch' telemetry_load_id,
       CAST(machine_id AS STRING) AS machine_id,
       CAST(truck_id AS STRING) AS truck_id,
       FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S %Z', CURRENT_TIMESTAMP()) AS telemetry_timestamp,
       CAST(boiler_temperature_celsius AS STRING) AS boiler_temperature_celsius,
       CAST(brew_pressure_bar AS STRING) AS brew_pressure_bar,
       CAST(water_flow_rate_ml_per_sec AS STRING) AS water_flow_rate_ml_per_sec,
       CAST(grinder_motor_rpm AS STRING) AS grinder_motor_rpm,
       CAST(grinder_motor_torque_nm AS STRING) AS grinder_motor_torque_nm,
       CAST(water_reservoir_level_percent AS STRING) AS water_reservoir_level_percent,
       CAST(bean_hopper_level_grams AS STRING) AS bean_hopper_level_grams,
       CAST(total_brew_cycles_counter AS STRING) AS total_brew_cycles_counter,
       CAST(last_error_code AS STRING) AS last_error_code,
       CAST(last_error_description AS STRING) AS last_error_description,
       CAST(power_consumption_watts AS STRING) AS power_consumption_watts,
       CAST(cleaning_cycle_status AS STRING) AS cleaning_cycle_status

 FROM `agentic_beans_raw.telemetry_coffee_machine` AS telemetry_coffee_machine
-- Only the most recent snapshot of readings is loaded.
WHERE telemetry_timestamp = (SELECT max(telemetry_timestamp) FROM `agentic_beans_raw.telemetry_coffee_machine`);

#### Run the Data Quality Job

In [None]:
# Run the Dataplex data quality scan over the staging load table and wait for it to finish.
data_quality_scan_name = "telemetry-coffee-machine-staging-load-dq"
data_quality_scan_job_name = startDataQualityScan(project_id, dataplex_region, data_quality_scan_name)
print(f"Data Quality Scan Started: {data_quality_scan_job_name}")

# Give the job a head start, then poll every 10 seconds while it is still in flight.
time.sleep(10)
data_quality_scan_state = getStateDataQualityScan(project_id, dataplex_region, data_quality_scan_job_name)
while data_quality_scan_state in ("PENDING", "STATE_UNSPECIFIED", "RUNNING", "CANCELING"):
  print(f"Data Quality Scan State: {data_quality_scan_state}")
  time.sleep(10)
  data_quality_scan_state = getStateDataQualityScan(project_id, dataplex_region, data_quality_scan_job_name)

# Any terminal state other than SUCCEEDED is reported as a failure.
if data_quality_scan_state != "SUCCEEDED":
  print(f"Data Quality Scan Failed with status of: {data_quality_scan_state}")
else:
  print(f"Data Quality Scan Completed Successfully")

#### Check the Data Quality Job Results

In [None]:
%%bigquery

-- Job id of the most recent run of the staging-load data quality scan (latest job_start_time wins).
SELECT data_quality_job_id
  FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
 WHERE data_quality_scan.data_scan_id = 'telemetry-coffee-machine-staging-load-dq'
   AND job_start_time IS NOT NULL
 ORDER BY job_start_time DESC
 LIMIT 1;

In [None]:
%%bigquery

-- Count the failed rule results in the most recent run of the staging-load data quality scan.
-- The scan id must match exactly in both places: a mistyped id makes the subquery return NULL,
-- and the count would then silently be 0.
SELECT COUNT(*) AS FailedCount
FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
WHERE data_quality_job_id       = (SELECT data_quality_job_id
                                     FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
                                    WHERE data_quality_scan.data_scan_id = 'telemetry-coffee-machine-staging-load-dq'
                                      AND job_start_time = (SELECT MAX(job_start_time)
                                                              FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
                                                             WHERE data_quality_scan.data_scan_id = 'telemetry-coffee-machine-staging-load-dq')
                                    LIMIT 1)
AND rule_passed = FALSE;

#### Run the Dataform Pipeline

In [None]:
# Compile the original (known-good) workspace and start a Dataform workflow invocation from it.
workflow_invocation_original = compile_and_run_dataform_workflow(repository_name, workspace_name_original)

In [None]:
# Display the dict returned by compile_and_run_dataform_workflow (status, messages, API result).
workflow_invocation_original

In [None]:
# Wait for the original workflow run to finish and display its final invocation resource.
# compile_and_run_dataform_workflow returns results=None on failure, so check the status first
# instead of failing below with an opaque TypeError.
if workflow_invocation_original["status"] != "success":
  raise RuntimeError(f"Dataform compile/run failed: {workflow_invocation_original['messages']}")

workflow_invocation_original_id = workflow_invocation_original["results"]["name"].split('/')[-1]

# The run normally takes about 20 seconds. Poll every 5 seconds (up to ~2.5 minutes) instead of
# sleeping a fixed 30 seconds and possibly reading a still-RUNNING state.
for _attempt in range(30):
  time.sleep(5)
  workflow_invocation_original_status = get_worflow_invocation_status(repository_name, workflow_invocation_original_id)["results"]
  if workflow_invocation_original_status is None or \
     workflow_invocation_original_status.get("state") not in ("RUNNING", "CANCELING", "STATE_UNSPECIFIED"):
    break

workflow_invocation_original_status

#### <font color='red'>Show the Pipeline **BROKEN**</font>

#### Clean and populate the staging load table

In [None]:
%%bigquery

-- Empty the staging load table again before loading the deliberately broken batch.
TRUNCATE TABLE `agentic_beans_raw_staging_load.telemetry_coffee_machine`;

Break the pipeline:
- Append "° Celsius" to some of the boiler temperature values (boiler_temperature_celsius)
- Append a "%" sign to some of the water reservoir values (water_reservoir_level_percent)
- Scale some of the water reservoir values down by a factor of 100, so 34.82 becomes 0.3482 (water_reservoir_level_percent)


In [None]:
%%bigquery

-- Load a deliberately "broken" batch: the same latest-timestamp rows as the good load, but with
-- simulated firmware updates on some trucks' coffee machines that change the value formats.
INSERT INTO `agentic_beans_raw_staging_load.telemetry_coffee_machine`
(telemetry_coffee_machine_id, telemetry_load_id, machine_id, truck_id, telemetry_timestamp, boiler_temperature_celsius, brew_pressure_bar, water_flow_rate_ml_per_sec, grinder_motor_rpm, grinder_motor_torque_nm, water_reservoir_level_percent, bean_hopper_level_grams, total_brew_cycles_counter, last_error_code, last_error_description, power_consumption_watts, cleaning_cycle_status)
SELECT CAST(GENERATE_UUID() AS STRING) AS telemetry_coffee_machine_id,
       FORMAT_TIMESTAMP('%Y%m%d%H%M%S', CURRENT_TIMESTAMP()) || '_machine_batch' telemetry_load_id,
       CAST(machine_id AS STRING) AS machine_id,
       CAST(truck_id AS STRING) AS truck_id,
       FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S %Z', CURRENT_TIMESTAMP()) AS telemetry_timestamp,

       -- Simulated firmware update on the 'La Marzocco KB90 4-Group' machines:
       -- the boiler temperature now carries a "° Celsius" suffix.
       CASE WHEN truck_id IN (1,6,11,16) -- trucks with a 'La Marzocco KB90 4-Group' machine
            THEN CAST(boiler_temperature_celsius AS STRING) || '° Celsius'
            ELSE CAST(boiler_temperature_celsius AS STRING)
        END AS boiler_temperature_celsius,

       CAST(brew_pressure_bar AS STRING) AS brew_pressure_bar,
       CAST(water_flow_rate_ml_per_sec AS STRING) AS water_flow_rate_ml_per_sec,
       CAST(grinder_motor_rpm AS STRING) AS grinder_motor_rpm,
       CAST(grinder_motor_torque_nm AS STRING) AS grinder_motor_torque_nm,

       -- Simulated firmware updates on the 'La Marzocco KB90 4-Group' and 'Thermoplan Black&White3 CTS' machines:
       -- the La Marzocco KB90 4-Group now sends percents as fractions (.4867 instead of 48.67);
       -- the Thermoplan Black&White3 CTS now appends a percent sign.
       CASE WHEN truck_id IN (1,6,11,16) -- trucks with a 'La Marzocco KB90 4-Group' machine
            THEN CAST(water_reservoir_level_percent / 100 AS STRING)
            WHEN truck_id IN (5,10,15,20) -- trucks with a 'Thermoplan Black&White3 CTS' machine
            THEN CAST(water_reservoir_level_percent AS STRING) || '%'
            ELSE CAST(water_reservoir_level_percent AS STRING)
        END AS water_reservoir_level_percent,

       CAST(bean_hopper_level_grams AS STRING) AS bean_hopper_level_grams,
       CAST(total_brew_cycles_counter AS STRING) AS total_brew_cycles_counter,
       CAST(last_error_code AS STRING) AS last_error_code,
       CAST(last_error_description AS STRING) AS last_error_description,
       CAST(power_consumption_watts AS STRING) AS power_consumption_watts,
       CAST(cleaning_cycle_status AS STRING) AS cleaning_cycle_status

 FROM `agentic_beans_raw.telemetry_coffee_machine` AS telemetry_coffee_machine

-- Only the most recent snapshot of readings is loaded.
WHERE telemetry_timestamp = (SELECT max(telemetry_timestamp) FROM `agentic_beans_raw.telemetry_coffee_machine`);

#### Run the Data Quality Job

In [None]:
# Start the Dataplex data quality scan for the staging load, then poll its state every
# 10 seconds until it leaves the states this cell treats as "still in progress".
data_quality_scan_name = "telemetry-coffee-machine-staging-load-dq"
data_quality_scan_job_name = startDataQualityScan(project_id, dataplex_region, data_quality_scan_name)
print(f"Data Quality Scan Started: {data_quality_scan_job_name}")

while True:
  time.sleep(10)
  data_quality_scan_state = getStateDataQualityScan(project_id, dataplex_region, data_quality_scan_job_name)
  if data_quality_scan_state not in ("PENDING", "STATE_UNSPECIFIED", "RUNNING", "CANCELING"):
    break
  print(f"Data Quality Scan State: {data_quality_scan_state}")

# Any terminal state other than SUCCEEDED is reported as a failure.
if data_quality_scan_state != "SUCCEEDED":
  print(f"Data Quality Scan Failed with status of: {data_quality_scan_state}")
else:
  print("Data Quality Scan Completed Successfully")

#### Check the Data Quality Job Results

In [None]:
%%bigquery

-- Most recent job id of the 'telemetry-coffee-machine-staging-load-dq' data quality scan.
-- NULL start times are excluded so this matches a MAX(job_start_time) lookup exactly.
SELECT data_quality_job_id
  FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
 WHERE data_quality_scan.data_scan_id = 'telemetry-coffee-machine-staging-load-dq'
   AND job_start_time IS NOT NULL
 ORDER BY job_start_time DESC
 LIMIT 1;

In [None]:
%%bigquery

-- Number of rules that failed (rule_passed = FALSE) in the most recent job of the
-- 'telemetry-coffee-machine-staging-load-dq' data quality scan.
SELECT COUNT(*) AS FailedCount
FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
WHERE data_quality_job_id = (SELECT data_quality_job_id
                               FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
                              WHERE data_quality_scan.data_scan_id = 'telemetry-coffee-machine-staging-load-dq'
                                AND job_start_time = (SELECT MAX(job_start_time)
                                                        FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
                                                       WHERE data_quality_scan.data_scan_id = 'telemetry-coffee-machine-staging-load-dq')
                              LIMIT 1)
AND rule_passed = FALSE;

In [None]:
%%bigquery

-- Show the rules that failed in the most recent job of the staging-load data quality scan.
-- latest_job picks the job with the greatest (non-NULL) job_start_time for this scan.
WITH latest_job AS (
  SELECT data_quality_job_id
    FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
   WHERE data_quality_scan.data_scan_id = 'telemetry-coffee-machine-staging-load-dq'
     AND job_start_time IS NOT NULL
   ORDER BY job_start_time DESC
   LIMIT 1
)
SELECT data_source.dataset_id, data_source.table_id, rule_column, rule_parameters, rule_failed_records_query
  FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
 WHERE data_quality_job_id = (SELECT data_quality_job_id FROM latest_job)
   AND rule_passed = FALSE;

In [None]:
%%bigquery failed_row_df

-- Load the failed rules of the most recent staging-load DQ job into the failed_row_df DataFrame.
-- latest_job picks the job with the greatest (non-NULL) job_start_time for this scan.
WITH latest_job AS (
  SELECT data_quality_job_id
    FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
   WHERE data_quality_scan.data_scan_id = 'telemetry-coffee-machine-staging-load-dq'
     AND job_start_time IS NOT NULL
   ORDER BY job_start_time DESC
   LIMIT 1
)
SELECT data_source.dataset_id, data_source.table_id, rule_column, rule_parameters, rule_failed_records_query
  FROM `agentic_beans_raw_staging_load.telemetry_coffee_machine_data_quality`
 WHERE data_quality_job_id = (SELECT data_quality_job_id FROM latest_job)
   AND rule_passed = FALSE;

#### Run the Dataform Pipeline: It should fail

In [None]:
# Compile and run the ORIGINAL (unmodified) Dataform workspace; per the heading above, this run is expected to fail.
workflow_invocation_original = compile_and_run_dataform_workflow(repository_name, workspace_name_original)

In [None]:
# Display the value returned by compile_and_run_dataform_workflow for the original workspace.
workflow_invocation_original

In [None]:
# The invocation id is the last '/'-separated segment of the returned invocation "name".
workflow_invocation_original_id = workflow_invocation_original["results"]["name"].rsplit('/', 1)[-1]

# Fixed wait instead of polling: the pipeline should only take about 20 seconds to complete.
time.sleep(30)
get_worflow_invocation_status(repository_name, workflow_invocation_original_id)["results"]

#### Generate the prompt to send to the Data Engineering Agent

In [None]:
def trim_trailing_zeros_string(s_number):
    """
    Trims insignificant trailing zeros from the fractional part of a number.

    BigQuery can return 10.030000000 for 10.03 and we do not need the trailing zeros which
    will confuse the RegEx.

    Only plain decimals of the form "<integer part>.<digits>" are trimmed, e.g.
    "10.030000000" -> "10.03", "100.0" -> "100", "0.000" -> "0".
    Everything else is returned unchanged. That includes integers ("100", "9000"),
    exponent forms ("1e-10") and non-numeric text ("93.2° Celsius", "48.67%"), whose
    trailing zeros are significant.

    Args:
      s_number: the value to trim; non-string values are converted with str() first.

    Returns:
      The trimmed string, or the input's string form when there is nothing to trim.
    """
    if not isinstance(s_number, str):
        s_number = str(s_number)  # Convert to string if not already

    integer_part, dot, fractional_part = s_number.partition(".")
    # Without a decimal point, or when the part after it is not purely ASCII digits
    # (exponent, units, percent sign), stripping '0' characters would change the value.
    if not dot or fractional_part.strip("0123456789"):
        return s_number

    fractional_part = fractional_part.rstrip("0")
    # "100.000" -> "100": drop the dangling decimal point when nothing significant remains.
    return f"{integer_part}.{fractional_part}" if fractional_part else integer_part

In [None]:
# Meta-prompt sent to Gemini. For every failed rule, the column name, its RegEx and sample
# valid / invalid values are appended below; Gemini turns them into short instructions
# for the Data Engineering Agent.
data_engineering_agent_meta_prompt = """I have the below column(s) that have failed a data quality check.
For each column:
- Generate a short and direct prompt for a data engineering agent to correct the ETL process.
  - The prompt should be specific, starting with "For {column-name}, please adjust the column by doing {fill me in}".
  - We cannot change the regular expression; we need to transform the data to meet the regular expression.
  - If the data cannot be corrected, please let me know.

"""


def values_to_xml(tag_name, values_df, column_name):
  """
  Render one DataFrame column as a tagged list of values for the prompt:

    <tag_name>
    <value>...</value>
    </tag_name>

  Each value goes through trim_trailing_zeros_string so padded decimals such as
  10.030000000 appear as 10.03. The returned string ends with a newline.
  """
  lines = [f"<{tag_name}>"]
  lines.extend(f"<value>{trim_trailing_zeros_string(value)}</value>" for value in values_df[column_name])
  lines.append(f"</{tag_name}>")
  return "\n".join(lines) + "\n"


column_prompts = ""

if failed_row_df.empty:
  print("No failed data quality rules were found; the prompt will not contain any column details.")

# The outer loop variable is failed_rule (not `row`) so it is not shadowed by any inner loop.
for _, failed_rule in failed_row_df.iterrows():
  dataset_id = failed_rule["dataset_id"]
  table_id = failed_rule["table_id"]
  rule_column = failed_rule["rule_column"]
  rule_parameters = failed_rule["rule_parameters"]
  rule_failed_records_query = failed_rule["rule_failed_records_query"]

  print(f"dataset_id: {dataset_id}")
  print(f"table_id: {table_id}")
  print(f"rule_column: {rule_column}")
  print(f"rule_parameters: {rule_parameters}")
  print(f"rule_failed_records_query: {rule_failed_records_query}")
  print()

  # Invalid values: run the rule_failed_records_query (but replace SELECT * with field name)
  sql = rule_failed_records_query.replace("SELECT *", f"SELECT {rule_column}")
  invalid_values_df = RunQuery(sql)
  bad_values_xml = values_to_xml(f"invalid-values-for-{rule_column}", invalid_values_df, rule_column)

  # Valid examples: up to 10 values of the same column from the "raw" table, taken from the
  # load that contains the most recent telemetry_timestamp.
  sql = f"""SELECT {rule_column}
             FROM `agentic_beans_raw.telemetry_coffee_machine`
           WHERE telemetry_load_id = (SELECT telemetry_load_id
                                         FROM `agentic_beans_raw.telemetry_coffee_machine`
                                       WHERE telemetry_timestamp = (SELECT MAX(telemetry_timestamp)
                                                                     FROM `agentic_beans_raw.telemetry_coffee_machine`)
                                       LIMIT 1)
           LIMIT 10;"""
  valid_values_df = RunQuery(sql)
  valid_values_xml = values_to_xml(f"valid-values-for-{rule_column}", valid_values_df, rule_column)

  column_prompts += f"""<column-{rule_column}>
Column Name: {rule_column}
RegEx: {rule_parameters}

{valid_values_xml}

{bad_values_xml}

</column-{rule_column}>
"""

data_engineering_agent_meta_prompt += column_prompts

print(f"data_engineering_agent_meta_prompt: {data_engineering_agent_meta_prompt}")

In [None]:
#### Run Gemini
# Schema passed to GeminiLLM for structured output: an object whose "generated_prompts"
# array holds one {"column_prompt": "<text>"} entry per failed column.
response_schema = {
  "type": "object",
  "required": ["generated_prompts"],
  "properties": {
    "generated_prompts": {
      "type": "array",
      "items": {
        "type": "object",
        "required": ["column_prompt"],
        "properties": {
          "column_prompt": {"type": "string"},
        },
      },
    },
  },
}

generated_prompt_response = GeminiLLM(data_engineering_agent_meta_prompt, response_schema=response_schema)
generated_prompt_dict = json.loads(generated_prompt_response)

# One generated instruction per line, each terminated by a newline.
data_engineering_agent_prompt = "".join(
  generated["column_prompt"] + "\n" for generated in generated_prompt_dict["generated_prompts"]
)

# Sample Output:
# For boiler_temperature_celsius, please adjust the column by doing removing the '° Celsius' suffix from the values.
# For water_reservoir_level_percent, please adjust the column by doing multiply the values by 100 if the value is between 0 and 1, and remove the '%' suffix from the values.
print(f"{data_engineering_agent_prompt}")

#### Run the Data Engineering Agent

In [None]:
# Display the per-column instructions that will be sent to the Data Engineering Agent.
data_engineering_agent_prompt

In [None]:
# NOTE: This requires ALLOWLISTING
# Please DO NOT take a hard dependency on this API, it will be changing in the future and is just for testing.
# Contact us for more information.
#
# Send the generated instructions to the Data Engineering Agent. The agent edits the
# Dataform workspace named by pipeline_id (the "auto" workspace).

url = f"https://geminidataanalytics.googleapis.com/v1alpha1/projects/{project_id}/locations/global:run"

request_body = dict(
  parent=f"projects/{project_id}/locations/global",
  pipeline_id=(f"projects/{project_id}/locations/{dataform_region}"
               f"/repositories/{repository_name}/workspaces/{workspace_name_auto}"),
  messages=[{"user_message": {"text": data_engineering_agent_prompt}}],
)

print(f"request_body: {request_body}")

data_eng_agent_response = ""
try:
  data_eng_agent_response = rest_api_helper(url, "POST", request_body)
except Exception as e:
  # Best-effort on purpose: the error text becomes the "response" that is later handed to llm_as_a_judge.
  data_eng_agent_response = f"An error occurred while performing the data engineering task: {e}"

print(f"data_eng_agent_response: {data_eng_agent_response}")

In [None]:
def llm_as_a_judge(original_prompt: str, response_from_processing: str) -> bool:
    """
    Ask Gemini whether a processing request appears to have executed successfully.

    Args:
      original_prompt: the instruction that was sent for processing.
      response_from_processing: what the processing step returned (may be an error message).

    Returns:
      The "processing_status" field of Gemini's JSON answer, coerced to bool.
    """
    judge_prompt = f"""Respond back with True if you think the below process request executed successfully or False if it looks like it failed.

    Processing Request: {original_prompt}

    Response from Processing: {response_from_processing}
    """
    verdict_schema = {
        "type": "object",
        "properties": {"processing_status": {"type": "boolean"}},
        "required": ["processing_status"]
    }
    verdict = json.loads(GeminiLLM(judge_prompt, response_schema=verdict_schema))
    return bool(verdict["processing_status"])

In [None]:
# LLM as a Judge: Determine if our changes were successful
llm_as_a_judge_result = llm_as_a_judge(data_engineering_agent_prompt, data_eng_agent_response)

# llm_as_a_judge returns a bool, so test it directly rather than comparing with `== True`.
if llm_as_a_judge_result:
  print("The Data Engineering Agent successfully completed the pipeline correction.")
else:
  print("It appears the data engineering agent did not complete our desired results.")

In [None]:
# Commit the Data Engineering Agent's edits in the "auto" workspace to the Dataform repo.
#
# NOTE: Once you commit, you will need to "rollback" by copying the original code:
#       take definitions/agentic_beans_raw/telemetry_coffee_machine.sqlx from the
#       "telemetry-coffee-machine-original" workspace, paste it into
#       "telemetry-coffee-machine-auto", and commit that change.

url = (
  f"https://dataform.googleapis.com/v1/projects/{project_id}/locations/{dataform_region}"
  f"/repositories/{repository_name}/workspaces/{workspace_name_auto}:commit"
)

request_body = {
  "author": {"name": "Admin User", "emailAddress": gcp_account_name},
  "commitMessage": "Data Eng Agent Automation",
}

print(f"request_body: {request_body}")

dataform_commit_response = rest_api_helper(url, "POST", request_body)

print(f"dataform_commit_response: {dataform_commit_response}")

#### Run the UPDATED Dataform Pipeline

In [None]:
# This will run the "auto pipeline" which was updated by the data engineering agent
# (the workspace named by workspace_name_auto, whose changes were committed in the previous cell).

workflow_invocation_auto = compile_and_run_dataform_workflow(repository_name, workspace_name_auto)

In [None]:
# Display the value returned by compile_and_run_dataform_workflow for the auto workspace.
workflow_invocation_auto

In [None]:
# The invocation id is the last '/'-separated segment of the returned invocation "name".
workflow_invocation_auto_id = workflow_invocation_auto["results"]["name"].rsplit('/', 1)[-1]

# Fixed wait instead of polling: the pipeline should only take about 20 seconds to complete.
time.sleep(30)
get_worflow_invocation_status(repository_name, workflow_invocation_auto_id)["results"]