The goal of this notebook is to use LangChain and a GCP LLM (Gemini on Vertex AI) to monitor the cost of BigQuery (BQ) jobs. The idea is to query the INFORMATION_SCHEMA.JOBS view and ask the LLM for a daily list of the most expensive jobs. The second goal is to use the LLM to optimise the SQL code where possible.

First, we install the required libraries.

In [1]:
!pip install google-cloud-aiplatform google-cloud-bigquery langchain --upgrade --user

Collecting google-cloud-bigquery
  Downloading google_cloud_bigquery-3.22.0-py2.py3-none-any.whl.metadata (8.9 kB)
Downloading google_cloud_bigquery-3.22.0-py2.py3-none-any.whl (236 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m236.7/236.7 kB[0m [31m15.5 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: google-cloud-bigquery
Successfully installed google-cloud-bigquery-3.22.0


In [2]:
import google.cloud.bigquery as bq
import langchain
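from google.cloud import aiplatform
# Note: in LangChain 0.1.x these two import paths are deprecated (hence the
# deprecation warnings); the newer homes are langchain_google_vertexai (VertexAI)
# and langchain_community.document_loaders (BigQueryLoader).
from langchain.llms import VertexAI
from langchain.document_loaders import BigQueryLoader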
from langchain.prompts import PromptTemplate
from langchain.schema import format_document

# Print LangChain and Vertex AI versions
print(f"LangChain version: {langchain.__version__}")
print(f"Vertex AI SDK version: {aiplatform.__version__}")

LangChain version: 0.1.19
Vertex AI SDK version: 1.51.0


We will extract the information about the jobs from the INFORMATION_SCHEMA.JOBS view.

First, we define our foundation model.

In [3]:
llm = VertexAI(model_name="gemini-pro", temperature=0)

# Quick sanity check; invoke() replaces the deprecated llm(...) call style
llm.invoke("What are the INFORMATION SCHEMAS and how should I use them?")



'## INFORMATION SCHEMAS in SQL\n\nThe INFORMATION SCHEMAS are a standardized way to access metadata about your database. They are available in most SQL databases, including MySQL, PostgreSQL, and SQL Server. \n\n### What information do they contain?\n\nINFORMATION SCHEMAS contain a wealth of information about your database, including:\n\n* **Tables and views:** You can find information about the columns, data types, and constraints of each table and view.\n* **Stored procedures and functions:** You can see the definition and parameters of stored procedures and functions.\n* **Triggers:** You can learn about the events that trigger each trigger and the actions'

Now let's define our query and load the dataset with the BigQuery loader.

In [4]:
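# Define our query (a plain string: there are no placeholders, so no f-string is needed).
# total_cost_eur assumes a flat price of 6 EUR per TB (1e12 bytes) billed.
query = """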
SELECT 
  creation_time,
  user_email,
  job_id,
  query,
  total_bytes_billed,
  6*total_bytes_billed/1e12 as total_cost_eur
FROM
  `jit-training-dma-devops.region-EU.INFORMATION_SCHEMA.JOBS`
--WHERE user_email not like '%7728271103%'
order by creation_time;
"""

# Load the data: all columns go into page_content; creation_time and user_email also go into metadata
loader = BigQueryLoader(
    query,
    metadata_columns=["creation_time", "user_email"],
    page_content_columns=[
        "creation_time", "user_email", "job_id",
        "query", "total_bytes_billed", "total_cost_eur",
    ],
)
data = loader.load()

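The `total_cost_eur` column computes `6*total_bytes_billed/1e12`, i.e. it assumes a flat 6 EUR per decimal terabyte. BigQuery on-demand prices are quoted per TiB (2^40 bytes), so the same rate applied per TiB gives a figure roughly 9% lower. A minimal sketch that makes both the rate and the unit explicit; the 6.0 rate is the notebook's placeholder, not an official price, so check the price list for your region and currency.

```python
# Sketch: convert bytes billed into a cost with an explicit price and unit.
# The 6.0 rate mirrors the notebook's SQL; it is a placeholder, not an official price.

TB = 10**12  # decimal terabyte, the unit used by the notebook's SQL (1e12)
TIB = 2**40  # tebibyte, the unit BigQuery on-demand prices are quoted in


def job_cost(total_bytes_billed: int, price_per_unit: float, unit: int = TIB) -> float:
    """Cost of a job given its bytes billed and a price per `unit` bytes."""
    return total_bytes_billed * price_per_unit / unit


bytes_billed = 1_060_110_336  # first row of the LLM output table below
print(job_cost(bytes_billed, 6.0, unit=TB))   # same formula as the notebook's SQL
print(job_cost(bytes_billed, 6.0, unit=TIB))  # same rate applied per TiB
```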


In [5]:
data[1]

Document(page_content='creation_time: 2024-04-08 23:27:34.719000+00:00\nuser_email: alessandro.provenza@jakala.com\njob_id: bquxjob_47d33dda_18ec009d77b\nquery: \r\n  CREATE VIEW dev_ale.first_view AS\r\n  SELECT * FROM `jit-training-dma-devops.test_listing_ale_sub.test_pubsub`\ntotal_bytes_billed: 0\ntotal_cost_eur: 0.0', metadata={'creation_time': datetime.datetime(2024, 4, 8, 23, 27, 34, 719000, tzinfo=datetime.timezone.utc), 'user_email': 'alessandro.provenza@jakala.com'})
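The query above reads the whole job history of the region and sorts it by creation time, while the goal is a daily top-N by bytes billed. One option is to push that filter into SQL. The sketch below is hypothetical: the helper name, the 24-hour window, the `job_type = 'QUERY'` filter and the `LIMIT` are assumptions about what "daily" should mean. Since the JOBS views are partitioned by `creation_time`, the time filter should also reduce what this monitoring query itself scans. Values are interpolated with an f-string, so only pass trusted inputs.

```python
def build_daily_top_jobs_query(project: str, region: str = "region-EU",
                               hours: int = 24, limit: int = 20,
                               price_per_tb: float = 6.0) -> str:
    """Build SQL for the most expensive query jobs of the last `hours` hours.

    Keeps the notebook's cost assumption: price_per_tb per 1e12 bytes billed.
    """
    return f"""
SELECT
  creation_time,
  user_email,
  job_id,
  query,
  total_bytes_billed,
  {price_per_tb} * total_bytes_billed / 1e12 AS total_cost_eur
FROM
  `{project}.{region}.INFORMATION_SCHEMA.JOBS`
WHERE
  creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL {int(hours)} HOUR)
  AND job_type = 'QUERY'
ORDER BY total_bytes_billed DESC
LIMIT {int(limit)}
"""
```

For example, `build_daily_top_jobs_query("jit-training-dma-devops")` could be passed to `BigQueryLoader` in place of `query`.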

Next, we write the chain:
- step 1: define the LLM to use, gemini-pro in this case
- step 2: define the chain input, i.e. the query results stored in the `page_content` field of each document, joined into a single string
- step 3: pass that content to the LLM through a prompt template
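Step 2 turns the list of documents into one string: each document is rendered through the `"{page_content}"` template and the results are joined with a blank line. Without LangChain, the logic is roughly the following sketch; `Doc` is a hypothetical stand-in for LangChain's `Document`, used only to keep the example self-contained.

```python
from dataclasses import dataclass, field


@dataclass
class Doc:
    """Hypothetical stand-in for LangChain's Document (page_content + metadata)."""
    page_content: str
    metadata: dict = field(default_factory=dict)


def join_documents(docs: list[Doc], template: str = "{page_content}") -> str:
    """Render each document with `template` and separate them with a blank line."""
    return "\n\n".join(template.format(page_content=d.page_content) for d in docs)


docs = [
    Doc("job_id: a\ntotal_bytes_billed: 10"),
    Doc("job_id: b\ntotal_bytes_billed: 20"),
]
print(join_documents(docs))
```

Because the query has no date filter, every job in the view ends up in this single string, which may become too large for one prompt as the job history grows.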

In [19]:
# Re-define the LLM (gemini-pro) with a larger output budget for the table
llm = VertexAI(model_name="gemini-pro", max_output_tokens=4096)

# Define the chain
chain = (
    {
        "content": lambda docs: "\n\n".join(
            format_document(doc, PromptTemplate.from_template("{page_content}"))
            for doc in docs
        )
    }
    | PromptTemplate.from_template(
        """
        Give me a list of the 20 jobs that used the most bytes in the last day, and report their cost as well.
        Examine the query used by each job and propose an optimisation:\n\n{content}
        """
    )
    # | PromptTemplate.from_template(
    #     "Give me the amount of bytes and the total cost for each user:\n\n{content}"
    # )
    | llm
)

# Invoke the chain with the documents, then strip surrounding whitespace and code-fence backticks
result = chain.invoke(data).strip().strip("`")
print(result)

## Top 20 Jobs by Bytes Billed in the Last Day

Here's a list of the 20 jobs that used the most bytes in the last day, along with their cost and potential optimization suggestions:

| Job ID | User Email | Query | Bytes Billed | Cost (EUR) | Optimization Suggestion |
|---|---|---|---|---|---|
| bquxjob_4f8673ff_18f3495e68d | riccardo.rubini@jakala.com | `SELECT * FROM `jit-training-dma-devops.region-EU.INFORMATION_SCHEMA.JOBS` | 1060110336 | 0.006360662016 | Consider filtering the data to reduce the amount scanned. |
| bquxjob_79a43895_18f349eb711 | riccardo.rubini@jakala.com | `SELECT user_email, job_id, query, total_bytes_billed FROM `jit-training-dma-devops.region-EU.INFORMATION_SCHEMA.JOBS` | 281018368 | 0.001686110208 | Consider using a smaller subset of columns or filtering the data. |
| bquxjob_8cc49b4_18f349eeac7 | riccardo.rubini@jakala.com | `SELECT *, user_email, job_id, query, total_bytes_billed FROM `jit-training-dma-devops.region-EU.INFORMATION_SCHEMA.JOBS` | 1060110336 |
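The prompt asks for the jobs of "the last day", but the loaded data covers the full job history and the model has no reliable notion of the current date, so both the ranking and the cost arithmetic are left to the LLM (and the table is cut off by the output limit). A more deterministic option is to rank the jobs in Python and only ask the LLM for optimisation suggestions. A minimal sketch, assuming each row is a dict with the query's columns, e.g. `dict(row.items())` for each row of `bq.Client().query(query).result()`, with `creation_time` as a timezone-aware `datetime`:

```python
from datetime import datetime, timedelta, timezone


def top_jobs_last_day(rows, n=20, now=None, price_per_tb=6.0):
    """Return the n jobs with the most bytes billed in the 24 hours before `now`.

    `rows` is an iterable of dicts with at least `creation_time` (aware datetime)
    and `total_bytes_billed` (int or None). The cost keeps the notebook's
    price-per-1e12-bytes assumption.
    """
    now = now or datetime.now(timezone.utc)
    since = now - timedelta(days=1)
    recent = [r for r in rows if since <= r["creation_time"] <= now]
    # total_bytes_billed can be NULL for some jobs; rank those as 0 bytes
    recent.sort(key=lambda r: r["total_bytes_billed"] or 0, reverse=True)
    return [
        {**r, "total_cost_eur": price_per_tb * (r["total_bytes_billed"] or 0) / 1e12}
        for r in recent[:n]
    ]
```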

TODO: use function calling to return this data in a format suitable for loading into a pandas DataFrame.
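Since the chain already returns a Markdown table, a small parser is one alternative to function calling for getting rows that `pandas.DataFrame(rows)` accepts. A sketch, assuming one record per line and no literal `|` inside a cell (an SQL query using `||` concatenation would break it); rows with the wrong number of cells, like the truncated last row above, are skipped.

```python
def parse_markdown_table(text: str) -> list[dict]:
    """Parse pipe-delimited Markdown table lines in `text` into a list of dicts.

    The first table line is the header and the second the |---| separator.
    Assumes no cell contains a literal '|'. Rows whose cell count differs from
    the header (e.g. a row truncated by max_output_tokens) are skipped.
    """
    lines = [ln.strip() for ln in text.splitlines() if ln.strip().startswith("|")]
    if len(lines) < 2:
        return []

    def cells(line: str) -> list[str]:
        return [c.strip() for c in line.strip("|").split("|")]

    header = cells(lines[0])
    rows = []
    for line in lines[2:]:  # lines[1] is the |---|---| separator
        values = cells(line)
        if len(values) == len(header):
            rows.append(dict(zip(header, values)))
    return rows
```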

Below is a test implementation of the same pipeline, adding function calling to obtain a structured output.

The actual structured output is the following: | Job ID | User Email | Query | Total Bytes Billed | Total Cost (EUR) |

In [13]:
# Import the libraries needed to implement function calling
from vertexai.generative_models import (
    FunctionDeclaration,
    GenerativeModel,
    GenerationConfig,
    GenerationResponse,
    Tool,
)

from vertexai.preview.generative_models import ToolConfig

from proto.marshal.collections import repeated
from proto.marshal.collections import maps
import json

In [14]:
# @title Helper functions


def recurse_proto_repeated_composite(repeated_object):
    repeated_list = []
    for item in repeated_object:
        if isinstance(item, repeated.RepeatedComposite):
            item = recurse_proto_repeated_composite(item)
            repeated_list.append(item)
        elif isinstance(item, maps.MapComposite):
            item = recurse_proto_marshal_to_dict(item)
            repeated_list.append(item)
        else:
            repeated_list.append(item)

    return repeated_list


def recurse_proto_marshal_to_dict(marshal_object):
    new_dict = {}
    for k, v in marshal_object.items():
        if not v:
            continue
        elif isinstance(v, maps.MapComposite):
            v = recurse_proto_marshal_to_dict(v)
        elif isinstance(v, repeated.RepeatedComposite):
            v = recurse_proto_repeated_composite(v)
        new_dict[k] = v

    return new_dict


def get_text(response: GenerationResponse):
    """Returns the Text from the Generation Response object."""
    part = response.candidates[0].content.parts[0]
    try:
        text = part.text
    except Exception:  # the part has no text, e.g. it holds a function call
        text = None

    return text


def get_function_name(response: GenerationResponse):
    return response.candidates[0].content.parts[0].function_call.name


def get_function_args(response: GenerationResponse) -> dict:
    return recurse_proto_marshal_to_dict(
        response.candidates[0].content.parts[0].function_call.args
    )


def pprint(params):
    print(json.dumps(params, sort_keys=True, indent=2, separators=(",", ": ")))
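The two recursive helpers walk the proto-plus containers found in `function_call.args` and rebuild them as plain dicts and lists. The same walk can be written once against the `collections.abc` interfaces. This sketch assumes `MapComposite` and `RepeatedComposite` behave as a `Mapping` and a `Sequence` (worth checking against your installed `proto-plus` version); unlike `recurse_proto_marshal_to_dict`, it keeps falsy values such as `0` or `""` instead of dropping them.

```python
from collections.abc import Mapping, Sequence


def to_plain(obj):
    """Recursively convert mapping/sequence containers into dicts and lists."""
    if isinstance(obj, Mapping):
        return {k: to_plain(v) for k, v in obj.items()}
    # str and bytes are Sequences too, but must stay scalar values
    if isinstance(obj, Sequence) and not isinstance(obj, (str, bytes)):
        return [to_plain(v) for v in obj]
    return obj
```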

In [18]:
get_structured_output = FunctionDeclaration(
    name="get_structured_output",
    description="Format the input string in a structured way. Fields are separated by | and each row represents a different record.",
    parameters={
        "type": "object",
        "properties": {
            "job_id": {
                "type": "string",
                "description": "ID of the BigQuery job; it is the first field of each row",
            },
            "user_email": {
                "type": "string",
                "description": "email of the user who ran the job; it is the second field of each row",
            },
            "query": {
                "type": "string",
                "description": "the SQL query executed by the job",
            },
            "total_bytes_billed": {
                "type": "string",
                "description": "total number of bytes billed for the job",
            },
            "optimised_query": {
                "type": "string",
                "description": "proposed optimisation of the SQL query associated with the job",
            },
        },
    },
)

formatting_tool = Tool(
    function_declarations=[get_structured_output]
)
tool_config = ToolConfig(
    function_calling_config=ToolConfig.FunctionCallingConfig(
        mode=ToolConfig.FunctionCallingConfig.Mode.AUTO,  # The default model behavior. The model decides whether to predict a function call or a natural language response.
    )
)

In [19]:
model = GenerativeModel(
    "gemini-1.0-pro-001", generation_config=GenerationConfig(temperature=0)
)

In [22]:
# Ask the model to restructure the previous output according to the function declaration


prompt = f"""Extract job_id, user_email, query, total bytes billed and cost from the following string.
Each table row (one line, fields separated by |) is a separate record.

{result}
"""

response = model.generate_content(
    prompt,
    tools=[formatting_tool],
    tool_config=tool_config
)

params = get_function_args(response)
pprint(params)

{
  "query": "SELECT \n  creation_time,\n  user_email,\n  job_id,\n  query,\n  total_bytes_billed\nFROM\n  `jit-training-dma-devops.region-EU.INFORMATION_SCHEMA.JOBS`\norder by creation_time;"
}


In [24]:
print(params['query'])

SELECT 
  creation_time,
  user_email,
  job_id,
  query,
  total_bytes_billed
FROM
  `jit-training-dma-devops.region-EU.INFORMATION_SCHEMA.JOBS`
order by creation_time;
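The function call returned a single `query` field, most likely because `parameters` describes one flat object, so the model can fill in at most one record. To get every row back, the parameters need an array of objects. A sketch of such a schema as a plain dict that could be passed as `parameters=` to `FunctionDeclaration`; the `jobs` property and helper name are hypothetical, and nested `array`/`object` support should be checked against the Vertex AI function-calling documentation for the SDK version in use.

```python
# Hypothetical schema: one "jobs" array whose items mirror the table columns.
JOB_RECORD = {
    "type": "object",
    "properties": {
        "job_id": {"type": "string", "description": "BigQuery job ID"},
        "user_email": {"type": "string", "description": "Email of the user who ran the job"},
        "query": {"type": "string", "description": "SQL text of the job"},
        "total_bytes_billed": {"type": "integer", "description": "Bytes billed for the job"},
        "total_cost_eur": {"type": "number", "description": "Estimated cost in EUR"},
        "optimised_query": {"type": "string", "description": "Suggested optimisation of the query"},
    },
    "required": ["job_id", "total_bytes_billed"],
}

STRUCTURED_OUTPUT_PARAMETERS = {
    "type": "object",
    "properties": {
        "jobs": {
            "type": "array",
            "description": "One entry per row of the input table",
            "items": JOB_RECORD,
        }
    },
    "required": ["jobs"],
}


def records_from_args(args: dict) -> list[dict]:
    """Return the list of job records from the (already plain) function-call args."""
    return list(args.get("jobs", []))
```

If the model fills the array, `records_from_args(get_function_args(response))` should give a list of dicts that `pandas.DataFrame` can load directly.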
