In [None]:
from google.cloud import bigquery
from vertexai.generative_models import GenerativeModel, GenerationConfig
import warnings

# Prerequisites: pip install google-cloud-bigquery-storage db-dtypes

# Hide the one warning emitted when authenticating with end-user credentials
# that have no quota project. `message` is matched as a regular expression
# against the start of the warning text, so unrelated warnings stay visible.
warnings.filterwarnings(
    "ignore", 
    message="Your application has authenticated using end user credentials from Google Cloud SDK without a quota project."
)

In [None]:
def generate_response(prompt, model_name="gemini-2.0-flash", temperature=0.4, top_k=30, top_p=0.3):
    """Send ``prompt`` to a Vertex AI generative model and return the reply text.

    A fresh ``GenerativeModel`` is constructed on every call with the given
    sampling settings.

    Args:
        prompt: Content passed unchanged to ``generate_content``.
        model_name: Name of the model to instantiate.
        temperature: Sampling temperature forwarded to ``GenerationConfig``.
        top_k: Top-k value forwarded to ``GenerationConfig``.
        top_p: Top-p value forwarded to ``GenerationConfig``.

    Returns:
        The ``content.text`` of the first candidate in the response.
    """
    sampling = GenerationConfig(temperature=temperature, top_k=top_k, top_p=top_p)
    model = GenerativeModel(model_name=model_name, generation_config=sampling)
    reply = model.generate_content(prompt)
    # Only the first candidate is ever used.
    first_candidate = reply.candidates[0]
    return first_candidate.content.text

In [None]:
# Setup
#
# Notebook-wide configuration. The "placeholder" values must be replaced
# before running; the later cells read these names as globals.

# Project passed to the BigQuery client.
project_name = "placeholder"
bigquery_client = bigquery.Client(project=project_name)
# `product` selects the dataset, `app_id` selects the table inside it.
product = "placeholder"
app_id = "placeholder"
# Fully qualified dataset: the project part is hard-coded here and is
# independent of `project_name` above.
dataset_id = f"""snowplow-cto-office.snowplow_{product}"""

In [None]:
# Fully qualified name of the table every generated query runs against.
full_responses_table_name = f"""{dataset_id}.{app_id}_session_chunks"""

# Read the name, data type and description of each column of that table from
# the dataset's INFORMATION_SCHEMA.
query_headers = f"""SELECT
        column_name AS field_name,
        data_type AS field_type,
        description AS field_description,
    FROM
        `{dataset_id}.INFORMATION_SCHEMA.COLUMN_FIELD_PATHS`
    WHERE
        table_name = '{app_id}_session_chunks';
    """

query_job = bigquery_client.query(query_headers)
table = query_job.result()

# Materialise the iterator once so it can be traversed three times below.
rows = list(table)

# Lookup tables keyed by column name (row[0]); these are module globals read
# by field_selection().
# NOTE(review): COLUMN_FIELD_PATHS appears to return one row per field path,
# so nested RECORD columns would share a column_name and only the last row
# would survive in these dicts -- confirm against the table's schema.
all_fields_type = {row[0]: row[1] for row in rows}
all_fields_desc = {row[0]: row[2] for row in rows}
# Maps each name to itself; only used to render the names in the LLM prompt.
all_fields_names = {row[0]: row[0] for row in rows}

In [None]:
def extract_keywords(prompt, assumptions = None):
    """Ask the LLM for the most important keywords in ``prompt``.

    Args:
        prompt: The user's question.
        assumptions: Optional clarifications about the prompt; rendered into
            the instructions as-is (``None`` is rendered literally).

    Returns:
        The raw model reply. The instructions request the form
        ``<[keyword1, keyword2, ...]>`` but the reply is not parsed here.
    """
    instructions = f"""

    **Prompt:** 
    {prompt}

    **Assumptions about prompt:** 
    {assumptions}

    **Guidelines:** 
    - Extract the most important keywords from the prompt. 
    - The keywords will be given to a LLM together with the prompt so that it can better understand the prompt and what need to be answered.
    - Choose relevant words in the prompt that need to be considered rather that conjuring up new ones. Do not include redundant keywords.
    - Make use of assumptions if they exist. The assumptions are correct and are meant to clarify any ambiguous word or concept. You can think of them as addition to the prompt.

    ** Output format:** 
    <[keyword1, keyword2, ...]>
    
    """

    # The keyword was previously misspelled `op_p`, which made every call fail
    # with "unexpected keyword argument".
    keywords = generate_response(instructions, temperature=0.2, top_k=20, top_p=0.5)

    return keywords

In [None]:
def field_selection(prompt, keywords, assumptions = None):
    """Ask the LLM which schema fields are needed to answer ``prompt``.

    The schema shown to the model comes from the module-level
    ``all_fields_names``, ``all_fields_type`` and ``all_fields_desc`` dicts.

    Args:
        prompt: The user's question.
        keywords: Keywords previously extracted from the prompt.
        assumptions: Optional clarifications about the prompt.

    Returns:
        A pair ``(fields_type, fields_desc)``:
        ``fields_type`` maps field name -> ``{'data_type': ...}`` and
        ``fields_desc`` maps field name -> ``{'description': use case}``.
        Reply lines that do not name a known schema field are ignored, so both
        dicts are empty when the model answers ``False, [explanation]``.
    """
    instructions = f"""

    **Prompt:**
    - {prompt}

    **Assumptions about prompt:**
    - {assumptions}

    **Table schema:**
    - Name of fields: {all_fields_names}
    - Data type of values in the fields: {all_fields_type}
    - Field descriptions: {all_fields_desc}

    **Instructions:**
    1. Identify and list all relevant fields from the table schema that are useful for answering the prompt.
    2. Use these keywords to find the relevant fields: {keywords}.
    3. Do not include any extraneous symbols, such as JSON formatting, brackets, or custom fields.
    4. For each selected field, explain its relevance and how it helps to answer the prompt.
    5. Make use of assumptions if they exist. The assumptions are correct and is incorporated into the prompt.

    **Important:**
    - If **all necessary fields** to answer the prompt are present in the schema, proceed to list the fields and their corresponding use cases: field_name, [use case]
    - If the prompt mentions a clearly defined concept or event that is missing AND cannot be calculated or derived from the available data, return only: `False, [explanation]`
    - Do **not** make assumptions or workarounds unless explicitly allowed.
    - ABSOLUTELY DO NOT USE ANY CUSTOM FIELDS. CUSTOM FIELDS ARE NOT ALLOWED.

    Output format:
    field_name1, use case
    field_name2, use case
    ...

    Do **not** hallucinate or infer information.

    """

    fields_response = generate_response(instructions, temperature=0.05, top_k=30, top_p=0.3)

    fields_type = {}
    fields_desc = {}

    for line in fields_response.splitlines():
        # Expected shape is "field_name, use case"; split on the first comma only
        # because the use case itself may contain commas.
        name, comma, use_case = line.partition(",")
        if not comma:
            continue

        # Tolerate list bullets / Markdown emphasis the model may add around the name.
        name = name.strip().strip("`*- ").strip()

        # Skip anything that is not a real column: the "False, [explanation]"
        # reply, prose sentences containing a comma, or invented fields.
        # Indexing the schema with such a name used to raise KeyError.
        if name not in all_fields_type:
            continue

        fields_type[name] = {'data_type': all_fields_type[name]}
        fields_desc[name] = {'description': use_case.strip()}

    return fields_type, fields_desc

In [None]:
def expected(prompt, fields_type, fields_desc, assumptions = None):
    """Have the LLM describe what a good answer to ``prompt`` should look like.

    Args:
        prompt: The user's question.
        fields_type: Selected fields with their data types.
        fields_desc: Selected fields with their use-case descriptions.
        assumptions: Optional clarifications about the prompt.

    Returns:
        The raw model reply (requested form: ``Expected result: ...``).
    """
    request_text = f"""
    ### **Task:**   
    You are retrieving data from BigQuery to answer user queries. Given a prompt, state the expected result that the LLM should provide.  

    ### **Prompt:**   
    {prompt} 

    ### **Assumptions:**
    {assumptions}

    ### **Available table schema:**
    - Fields description: {fields_desc}
    - Fields type: {fields_type}

    ### **Instructions:**  
    - Use available data information from the fields to create the best possible expected result for the prompt.
    - Only state what the expected result should be with no explanations about it.
    - The expected result should reflect what the LLM should output after querying BigQuery. In essence, how the format and structure of the data should look like. 
    - Let the output be in concise sentences.
    - Make use of the assumptions when creating the expected result. However, only take the most reasonable parts that are relevant.

    **Output format:**  
    Expected result: <Your response here>
    """

    return generate_response(request_text, temperature=0.2, top_k=20, top_p=0.5)

In [None]:
def create_sql_code(prompt, fields_type, fields_desc, strategy, keywords, expected_result, assumptions = None):
    """Ask the LLM to write a BigQuery SQL query that implements ``strategy``.

    The target table is the module-level ``full_responses_table_name``.

    Args:
        prompt: The user's question.
        fields_type: Selected fields with their data types.
        fields_desc: Selected fields with their use-case descriptions.
        strategy: Step-by-step plan the query must follow.
        keywords: Keywords extracted from the prompt.
        expected_result: Description of the desired result shape.
        assumptions: Optional clarifications about the prompt.

    Returns:
        The generated SQL with Markdown code fences removed and surrounding
        whitespace stripped.
    """
    # Opened with exactly three quotes: a fourth quote used to leak a stray
    # `"` character into the start of the prompt.
    instructions = f"""
    **Prompt:** {prompt}

    **Table schema:** 
    - Data types: {fields_type} 
    - Descriptions: {fields_desc} 
    - Table: {full_responses_table_name}

    **Strategy:**
    - {strategy} 

    **Keywords:** 
    - {keywords}

    **Expected result:** 
    - {expected_result}

    **Assumptions about prompt:** 
    - {assumptions}

    **Constraints:** 
    - The query is sent to BigQuery. It needs to follow BigQuery syntax.
    - Fully follow the strategy
    - Fully create a query that aligns with the expected result
    - Keep query costs as low as possible

    ### **Task:**  
    - Generate the best possible BigQuery SQL query that:  
    1. Follows the strategy step-by-step without deviation. Treat the strategy as a strict blueprint; implement each step exactly as described.
    2. Exclude all Null Values from everywhere (`IS NOT NULL`)

    **Query Guidelines:**
    - Select only necessary fields; avoid extra data.
    - Use accurate column names; no transformations.
    - Prefer partition filtering over `EXTRACT()`. Use `DATE_TRUNC(date_column, DATE_PART)` correctly (e.g., `DATE_TRUNC(session_started_date, WEEK)`), without string literals.
    - Use approximate counts where possible.
    - Consider clustering for better filtering.
    - **Exclude NULL values** (`IS NOT NULL`).
    - Make use of the assumptions. The assumptions are correct and is incorporated into the prompt.
    - Order and keep the data user-friendly and answers the prompt.
    - Be careful of LIMIT expects an integer literal or parameter
    - The query should only give the user ONE table of result.

    Use standard SQL. The query will be sent to **BigQuery**, so DO NOT INCLUDE EXPLANATIONS. Make sure the query is VALID. Do not hallucinate.
    """

    code = generate_response(instructions, temperature=0.1, top_k=20, top_p=0.2)

    # str.replace returns a new string; the result was previously discarded,
    # so fenced replies were sent to BigQuery with the ``` markers intact.
    code = code.replace("```sql", "").replace("```", "").strip()

    return code

In [None]:
def verify(prompt, fields_type, fields_desc, query, expected_result, assumptions, strategy):
    """Let the LLM review ``query`` and collect the reasons it rejects it.

    The model is asked two questions (alignment with prompt/expected result,
    and BigQuery syntax validity) and told to answer each on its own line as
    ``Yes/No. explanation``.

    Args:
        prompt: The user's question.
        fields_type: Selected fields with their data types.
        fields_desc: Selected fields with their use-case descriptions.
        query: SQL text under review.
        expected_result: Description of the desired result shape.
        assumptions: Accepted for signature parity; not included in the
            instructions sent to the model.
        strategy: Plan the query was generated from.

    Returns:
        A list with the explanation of every line whose text before the first
        ``.`` is exactly ``No``. An empty list means no objection was parsed.
    """
    instructions = f"""
    ### **Prompt:** 
    {prompt}

    ### **Strategy for creating SQL code:** 
    {strategy}

    ### **SQL Query:** 
    {query}

    ### **Expected result:** 
    {expected_result}

    ### **Table schema:** 
    - Fields descriptions: {fields_desc}
    - Fields type: {fields_type}

    ### **Task:**  
    - Does this query align with the combination of the prompt, the expected result?
    - Will the syntax work for BigQuery?

    ### **Guidelines:**  
    - Assume the query is incorrect unless there is strong evidence it is correct.  
    - If answering "Yes," to a task, explicitly explain why.  
    - If answering "No," to a task, identify the issue and explain why it does not meet the requirement.  
    - Look at the query and see if the code makes sense and produces reasonable result.
    - Answer the task questions indepnedently from each other.
    - Follow the output format.
    
    ### **Output format:**  
    Yes/No. explanation
    Yes/No. explanation
    """

    verdict_text = generate_response(instructions, temperature=0.1, top_k=30, top_p=0.3)

    objections = []
    for raw_line in verdict_text.split("\n"):
        # Split at the first period: "<Yes|No>" before it, explanation after it.
        answer, period, explanation = raw_line.partition(".")
        if not period:
            continue
        # Exact, case-sensitive match; any other answer text counts as "not a No".
        if answer.strip() == "No":
            objections.append(explanation.strip())

    return objections



In [None]:
def correct_sql_error(prompt, keywords, expected_result, sql_code, error, fields_type, fields_desc, strategy):
    """Ask the LLM to repair ``sql_code`` given the error it produced.

    The table name shown to the model is the module-level
    ``full_responses_table_name``.

    Args:
        prompt: The user's question.
        keywords: Keywords extracted from the prompt.
        expected_result: Description of the desired result shape.
        sql_code: The failing SQL text.
        error: The error raised for ``sql_code``; rendered with ``str()``
            formatting into the instructions.
        fields_type: Selected fields with their data types.
        fields_desc: Selected fields with their use-case descriptions.
        strategy: Plan the query was generated from.

    Returns:
        The model's corrected SQL with Markdown code fences removed and
        surrounding whitespace stripped.
    """
    repair_request = f"""
    **Prompt:**
    - {prompt}

    **Important keywords:**
    - {keywords}

    **Strategy for creating SQL code:**
    - {strategy}

    **Expected result:**
    - {expected_result}
    
    ### **Original SQL Query (Faulty):**
    {sql_code}

    ### **Error Message:** 
    {error}

    ### **Table Details:** 
    - Table name: {full_responses_table_name}
    - Field data types: {fields_type}
    - Field descriptions: {fields_desc}

    ### **Task:** 
    - Review the query and update the query to solve the error while adhering to the prompt, keywords, strategy, and expected result:
    
    ### **Guidelines:** 
    - Look at the error and identify where in the code the error exists.
    - Look at the numbers [xx, xx]
    - Resolve the issue by identifying and changing the code appropriately.

    ### **Output:**  
    Please provide only the corrected SQL query. Ensure the corrected query is syntactically and logically sound, and adheres to the provided guidelines and constraints.
    """

    raw_fix = generate_response(repair_request, temperature=0.1, top_k=40, top_p=0.3)

    # Drop the opening ```sql fence first, then any remaining bare ``` fence.
    unfenced = raw_fix.replace("```sql", "")
    unfenced = unfenced.replace("```", "")
    return unfenced.strip()

In [None]:
def result_interpretation(prompt, result, code, assumptions):
    """Ask the LLM for a user-facing interpretation of the query result.

    Args:
        prompt: The user's question.
        result: The data to interpret; rendered into the instructions with
            its string representation.
        code: The SQL that produced ``result``.
        assumptions: Clarifications agreed on for the prompt.

    Returns:
        The raw model reply.
    """
    briefing = f"""
    ### **Prompt:**
    {prompt}  

    ### **SQL code:**   
    {code}  

    ### **Data:**   
    {result}  

    ### **Assumptions:**   
    {assumptions}  

    ### **Task:**  
    Analyze the provided dataset and generate a meaningful interpretation that answers the given prompt in a user-friendly manner.  

    ### **Steps:**  
    1. **Answer the question directly.** Provide a clear, data-driven response.  
    2. **Extract key insights.** Identify trends, anomalies, or correlations that are not immediately obvious.  
    3. **Use external knowledge when relevant.** If applicable, explain insights using common patterns or domain knowledge.  
    4. **Focus on the overall pattern** rather than unnecessary details unless a specific outlier is crucial.  
    5. **State relevant assumptions.** Only include assumptions that impact the interpretation of result (avoid discussing column names, data types, or table structures).  

    ### **Guidelines:**  
    - **Be concise.** Prioritize clarity over lengthy explanations.  
    - **Use a structured format.** Keep responses readable with bullet points and section headings.  
    - **Avoid redundant details.** Don’t restate information that is already evident from the dataset.  
    - **Ignore missing table schema details.** Focus on strategy, definitions, and calculations instead. Do not focus on BigQuery table assumptions.
    - **Assistant with a friendly tone.** Speak to the user as if this is the data you have to asnwer the prompt. Make objective statements and discuss uncertainties when possible.  
    - **BigQuery.** The data is extracted from BigQuery with (U.S weekday numbering convention e.g. Sunday is 1 and Monday is 2).
    - **Use the code for interpretation** The SQL code that extracts the data contains information which explains certain assumptions and why the data looks the way it does. Make use of the code when making assumptions.

    ### **Expected Output Format:**  
    1. **Best Answer to the Prompt** (direct response)  
    2. **Key Insights & Trends** (summary of findings)  
    3. **Relevant Assumptions** (only those impacting interpretation of the result)  
    """

    return generate_response(briefing, temperature=0.3, top_k=40, top_p=0.6)

In [None]:
# Build the agent used by the run cells below. DataQueryAgent is defined in a
# later cell of this notebook, so that cell has to be executed before this one.
aiAgent = DataQueryAgent(project_name, bigquery_client, generate_response)

In [None]:
# Scratch list of example prompts. Only the LAST uncommented assignment takes
# effect; earlier uncommented ones are overwritten before the agent runs.
prompt = "Where are our users from?"
#prompt = "Which companies have the highest engagement?"
#prompt = "Which customers have the most sessions?"
#prompt = "Based on the bigger customers; what days and times is the best for performing updates, maintenance?"
#prompt = "At what times and days are people online the least?"
#prompt = "How do system outages impact user behavior? "
#prompt = "How did the user behavior change between 30th december 2024, 31th december 2024, 1th january 2025 and 2nd january 2025? "
#prompt = "Do performance issues correlate with a drop in activity?"
#prompt = "What combinations of browser and os are the worst according to performance?"
#prompt = "Based on user behavior, what will the peak activity hours be tomorrow?"
#prompt = "How does app performance affect user engagement and retention?"
#prompt = "Which devices and OS versions are experiencing the worst performance?"
prompt = "Vilka OS system och enheter är sämst enligt performance?"
#prompt = "Can you see a pattern if you compare Network Latency, Page Load Fails, Page Loading Time and Device and browsers?"
#prompt = "Can you calculate the monthly stickiness for the products in 2024?"
#prompt = "Can you calculate the WAU/MAU stickiness for the products for all weeks in 2024?"
#prompt = "Are there any insights in the correlation between the size of cutsomers and have huge page load times?"
#prompt = "What are the network latency & slow load times observed during hours when users are most active"
#prompt = "How much did the total session time increase or decrease in percentage and number between each month in 2025 compared to the months in 2024? Can you divide into user roles"
#prompt = "For all Windows users, is Edge or Chrome better according to performance? Look for browsers that contain the name either edge or chrome. Compare all versions."
#prompt = "What devices are mostly used? and what dimensions are they?"
#prompt = "Traffic Patterns & Peak Usage"
#prompt = "System Performance & Browser Issues"
#prompt = "Extract top 10 customers with longest session times in 2024, show the number of unique users and total sessions as well"
prompt = "When are our users the least active? Give top' 10 combinations of days and times. Segment it into 2 hours so for example 0-2am, 2-4am and so on"
#prompt = "Make a table of recurring feedback themes that occur at specific times of the year or at specific times of the month showing theme recurring interval (monthly, yearly, quarterly) and typical time in that interval."

outcome = aiAgent.process_prompt(prompt)
if isinstance(outcome, str):
    # process_prompt returns a plain failure message (not a pair) when it gives
    # up; unpacking that string into two names used to raise ValueError.
    print(outcome)
else:
    data, interpretation = outcome
    print(data)
    #print(data.to_string())
    print("----------------------------------------------------------------------------------")
    print(interpretation)

In [None]:
# Second scratch cell. Only the last uncommented `prompt` assignment is used;
# the first one below is immediately overwritten.
prompt = "Which customers have the most sessions?"
prompt = "Can you calculate the monthly stickiness for the customer with the highest stickiness in each module in 2024?"
#prompt = "Can you calculate the monthly stickiness for each of the module in 2024?"
#prompt = "How much did the total session time increase or decrease in percentage and number between each month in 2025 compared to the months in 2024? Can you divide into user roles"
#prompt = "Based on the bigger customers; what days and times is the best for performing updates, maintenance?"
#prompt = "When are our users the least active? Give top' 10 combinations of days and times. Segment it into 2 hours so for example 0-2am, 1-3am and so on"
#prompt = "Extract top 10 customers with longest session times in 2024, show the number of unique users and total sessions as well"
#prompt = "How much did the total session time increase or decrease in percentage and number between each month in 2025 compared to the months in 2024? Can you divide into user roles"
#prompt = "Which customers are the most sticky for which parts of the product?"
#prompt = "Which customers are the most sticky for each of the module?"
#prompt = "Whats the customer lifetime value of my product?"
#prompt = "Which customers have the longest average session length amongst its leaders?"
#prompt = "Are there Traffic Patterns that we should be extra awere of?"
#prompt = "42"
# The result is printed as returned: either a (data, interpretation) pair or a
# failure message string, so no unpacking is attempted here.
response = aiAgent.process_prompt(prompt)
print(response)

In [None]:
class DataQueryAgent:
    """Drives the prompt -> assumptions -> SQL -> BigQuery -> interpretation flow.

    The agent relies on the module-level helpers ``extract_keywords``,
    ``field_selection``, ``expected``, ``create_sql_code``, ``verify``,
    ``correct_sql_error`` and ``result_interpretation``, and on the global
    ``full_responses_table_name``. Its own LLM calls go through the
    ``generate_response_fn`` supplied at construction time.
    """

    # Upper bound for both the generate/verify loop and the dry-run/repair loop.
    MAX_ATTEMPTS = 3

    def __init__(self, project_id: str, bigquery_client, generate_response_fn):
        """Store the collaborators.

        Args:
            project_id: Project identifier (stored; not read by any method here).
            bigquery_client: Client used to dry-run and execute queries.
            generate_response_fn: Callable ``(instructions, **sampling) -> str``.
        """
        self.client = bigquery_client
        self.project_id = project_id
        self.generate_response = generate_response_fn

    def process_prompt(self, user_input):
        """Answer ``user_input`` end to end.

        Blocks on ``input()`` while the user confirms the assumptions.

        Returns:
            ``(result_df, interpretation)`` on success, or a plain message
            string when SQL generation or validation gives up.
        """
        keywords = extract_keywords(user_input)
        fields_type, fields_desc = field_selection(user_input, keywords)
        assumptions = self.make_assumptions(user_input, fields_type, fields_desc, keywords)
        assumptions = self.refine_assumptions(user_input, assumptions)

        # Re-evaluate query parameters now that the assumptions are confirmed.
        keywords = extract_keywords(user_input, assumptions)
        fields_type, fields_desc = field_selection(user_input, keywords, assumptions)
        expected_result = expected(user_input, fields_type, fields_desc, assumptions)

        # Generate a strategy, generate code, and verify SQL query.
        sql_code, strategy = self.generate_valid_sql(
            user_input, fields_type, fields_desc, keywords, expected_result, assumptions
        )
        if not sql_code:
            return f"Failed to generate a reasonable SQL query after {self.MAX_ATTEMPTS} attempts."

        # Dry-run, execute, and repair if needed. `user_input` is passed here;
        # the code used to read the notebook-global `prompt` by mistake.
        executed_sql, result_df = self._run_with_repair(
            user_input, keywords, expected_result, sql_code, fields_type, fields_desc, strategy
        )
        if result_df is None:
            return f"Failed to generate a valid SQL query after {self.MAX_ATTEMPTS} attempts."

        # Interpret using the SQL that actually ran, which may be a repaired
        # version of `sql_code`.
        result_df = self._clean_result(result_df)
        interpretation = result_interpretation(user_input, result_df, executed_sql, assumptions)

        return result_df, interpretation

    @staticmethod
    def _clean_result(result_df):
        """Replace ``""``/``"None"`` cells with ``"Missing Data"`` and drop rows with nulls."""

        def _fill(value):
            # Only strings can match; the isinstance guard also keeps unhashable
            # cells (lists, dicts, arrays) from raising TypeError on the set lookup.
            if isinstance(value, str) and value in {"", "None"}:
                return "Missing Data"
            return value

        # DataFrame.map only exists in newer pandas; applymap is the older name.
        # Checked on the type so a column called "map" cannot shadow the method.
        elementwise = result_df.map if hasattr(type(result_df), "map") else result_df.applymap
        return elementwise(_fill).dropna()

    def refine_assumptions(self, user_input, assumptions):
        """Loop until the user accepts the assumptions; return the accepted text.

        An empty reply, ``yes`` or ``y`` (any case, surrounding whitespace
        ignored) accepts. Anything else is treated as a clarification and used
        to rewrite the assumptions before asking again.
        """
        while True:
            user_clarifications = input(f"Are the following assumptions correct?\n\n{assumptions}")
            if user_clarifications.strip().lower() in {"", "yes", "y"}:
                print(assumptions)
                print("----------------------------------------------------------------------------------")
                return assumptions
            assumptions = self.interpret_clarification(user_input, assumptions, user_clarifications)

    def generate_valid_sql(self, user_input, fields_type, fields_desc, keywords, expected_result, assumptions):
        """Generate strategy + SQL until the LLM reviewer raises no objection.

        Returns:
            ``(sql_code, strategy)`` for the first attempt that passes
            ``verify``; ``(None, None)`` if every attempt is rejected.
        """
        for _ in range(self.MAX_ATTEMPTS):
            strategy = self.create_strategy(user_input, fields_type, fields_desc, keywords, expected_result, assumptions)
            sql_code = create_sql_code(user_input, fields_type, fields_desc, strategy, keywords, expected_result, assumptions)
            validation_errors = verify(user_input, fields_type, fields_desc, sql_code, expected_result, assumptions, strategy)
            if not validation_errors:
                return sql_code, strategy

        return None, None

    def validate_sql_query(self, prompt, keywords, expected, sql_code, fields_type, fields_desc, strategy):
        """Performs a dry run of the SQL query to check validity, then executes it.

        Returns:
            ``(True, dataframe)`` when a (possibly repaired) query ran, else
            ``(False, None)``.
        """
        _, result_df = self._run_with_repair(
            prompt, keywords, expected, sql_code, fields_type, fields_desc, strategy
        )
        return result_df is not None, result_df

    def _run_with_repair(self, prompt, keywords, expected_result, sql_code, fields_type, fields_desc, strategy):
        """Dry-run then execute ``sql_code``, asking the LLM to repair it on failure.

        Returns:
            ``(sql, dataframe)`` where ``sql`` is the text that was executed, or
            ``(last_sql_tried, None)`` when all attempts failed.
        """
        job_config = bigquery.QueryJobConfig(dry_run=True)

        for attempt in range(self.MAX_ATTEMPTS):
            try:
                query_job = self.client.query(sql_code, job_config=job_config)
                query_job.result()
                print(f"Query is valid. Estimated processing: {query_job.total_bytes_processed / 1e9:.2f} GB.")
                return sql_code, self.client.query(sql_code).to_dataframe()
            except Exception as e:
                # Deliberately broad: whatever went wrong is shown to the LLM so
                # it can attempt a repair.
                print(f"Query validation failed (Attempt {attempt + 1}/{self.MAX_ATTEMPTS}): {e}")
                # No point paying for a repair that will never be tried.
                if attempt + 1 < self.MAX_ATTEMPTS:
                    sql_code = correct_sql_error(
                        prompt, keywords, expected_result, sql_code, e, fields_type, fields_desc, strategy
                    )
        return sql_code, None

    def make_assumptions(self, prompt, fields_type, fields_desc, keywords):
        """Ask the LLM for a short numbered list of assumptions that disambiguate ``prompt``."""
        instructions = f"""
        ### **Prompt:** {prompt}
        ### **Keywords:** {keywords}
        ### **Table Info:**
        - Fields Type: {fields_type}
        - Fields Description: {fields_desc}
        - Table name: {full_responses_table_name}
        
        ### **Constraints:**
        - Use only given fields.
        - No external data or assumptions beyond the provided fields.
        
        ### **Task:**
        - Propose short, reasonable, and user-friendly assumptions to resolve potential ambiguities in the prompt, leveraging the table info implicitly.
        - Propose reasonable definitions, calculations and other clarifications on the prompt.

        ### **Guidelines:**
        - Base assumptions on the most likely intent of the prompt, and keywords, informed by available data patterns from the table.
        - Keep assumptions simple and verifiable by the user, who lacks table schema knowledge.
        - Remember that the user cannot read all assumptions if they are too long. That is why it is important to keep each assumptions short as punctual.
        - Output a numeric list.
        - Do NOT generate SQL code.
        
        ### **Output Format:**
        1. ...
        2. ...
        """
        return self.generate_response(instructions, temperature=0.1, top_k=20, top_p=0.3)

    def interpret_clarification(self, prompt, assumptions, user_clarifications):
        """Ask the LLM to rewrite ``assumptions`` in light of the user's clarification."""
        instructions = f"""
        ### **Initial Prompt:** {prompt}
        ### **Current Assumptions:** {assumptions}
        ### **User Clarifications:** {user_clarifications}
        ### **Task:**
        - Update assumptions based on clarifications.
        - If no change is needed, return the current assumptions.
        """
        return self.generate_response(instructions, temperature=0.2, top_k=40, top_p=0.5)

    def create_strategy(self, prompt, fields_type, fields_desc, keywords, expected_result, assumptions):
        """Ask the LLM for a step-by-step (non-executable) plan for the SQL query."""
        instructions = f"""
        ### **Prompt:** {prompt}
        ### **Keywords:** {keywords}
        ### **Expected Result:** {expected_result}
        ### **Table Details:**
        - Fields Type: {fields_type}
        - Fields Description: {fields_desc}
        ### **Assumptions:**
        - {assumptions}
        ### **Constraints:** 
        - Use only the fields in the specified table—no external data or tables.  
        - Base the strategy solely on the given prompt, keywords, expected result, and table details.  
        ### **Task:** 
        - Suggest specific SQL operations in a step-by-step tailored to the prompt, assumptions and expected result.
        - Make sure the steps guide the query to be as user friendly as possible.

        ### **Guidelines:**
        - Have many steps so that nothing is lost.
        - Do not generate executable SQL.
        - Keep the strategy concise and focused—no fluff or extraneous steps.

         ### **Output:**   
        - **Step-by-Step Strategy:** ("Title/Purpose" - do this)  
        """
        strategy = self.generate_response(instructions, temperature=0.1, top_k=40, top_p=0.5)

        return strategy