# Synthesize Questions from AI-Generated Table Metadata

In [0]:
# %pip install mlflow mlflow[databricks] databricks-agents
# dbutils.library.restartPython()

@frpm_ai_comment @satscores_ai_comment @schools_ai_comment 
based on these three tables, use every column except the 'comment' column, since it is incomplete and outdated. Put more weight on the 'updated_comment' column in addition to other columns, I want to synthesize business intelligence questions similar to these 3 questions.
{'What is the highest eligible free rate for K-12 students in the schools in Alameda County?',
'Please list the lowest three eligible free rates for students aged 5-17 in continuation schools.',
'Please list the zip code of all the charter schools in Fresno County Office of Education.'}

Please return 10 such synthetic examples.

## Step 1 – Load & Display Column Metadata From All Three Tables

In this step we:
1. Read the three metadata tables into Spark DataFrames.
2. Drop the obsolete `comment` column.
3. Union the three DataFrames together.
4. Display the full result so that the LLM can later consume every row from all tables.

Because each table only contains one row per column in the underlying datasets, the overall size is small, so scanning the full tables is safe even though we normally limit scans.

In [0]:
from pyspark.sql import functions as F

# Fully qualified table names
frpm_tbl = "yyang.hackathon_2025q3_project_geniepromptautocompletion_genieeng_feindustryhls.frpm_ai_comment"
sat_tbl = "yyang.hackathon_2025q3_project_geniepromptautocompletion_genieeng_feindustryhls.satscores_ai_comment"
school_tbl = "yyang.hackathon_2025q3_project_geniepromptautocompletion_genieeng_feindustryhls.schools_ai_comment"

# Load DataFrames and drop obsolete column
frpm_df = spark.table(frpm_tbl).drop("comment")
sat_df = spark.table(sat_tbl).drop("comment")
school_df = spark.table(school_tbl).drop("comment")

# Add a source column for traceability
frpm_df = frpm_df.withColumn("source_table", F.lit("frpm_ai_comment"))
sat_df = sat_df.withColumn("source_table", F.lit("satscores_ai_comment"))
school_df = school_df.withColumn("source_table", F.lit("schools_ai_comment"))

# Union all
combined_df = frpm_df.unionByName(sat_df).unionByName(school_df)

# Display full metadata
display(combined_df)

## Step 2 – Synthesize `num_syn` BI Questions with an LLM

We will now:
1. Extract the column metadata we displayed in Step 1 into a textual form suitable for prompting.
2. Construct a prompt that:
   * Instructs the model to generate BI questions similar in style to the three examples given by the user.
   * Specifies that the `comment` column should be ignored and that extra weight should be placed on `updated_comment`.
   * Embeds the full column list including their `updated_comment` field.
3. Invoke a Databricks model-serving endpoint via `mlflow.deployments` to obtain exactly `num_syn` questions. The endpoint name is read from the `LLM_ENDPOINT` environment variable and defaults to `databricks-llama-4-maverick`.
4. Display the generated questions.

In [0]:

num_syn = 200  # Number of BI questions to request from the LLM

In [0]:
# Only return the questions you synthesized, row by row; don't skip any row. Don't explain anything. Don't include anything else.

In [0]:
import mlflow
from textwrap import dedent
import os

# Collect the (small) metadata table to pandas; requires combined_df from Step 1
df_pd = combined_df.select("col_name", "data_type", "updated_comment", "source_table").toPandas()

# Build metadata block for prompt
def row_to_line(row):
    return f"{row['col_name']} ({row['data_type']}, source={row['source_table']}) - {row['updated_comment']}"

metadata_lines = "\n".join(df_pd.apply(row_to_line, axis=1).tolist())

example_questions = (
    "1. What is the highest eligible free rate for K-12 students in the schools in Alameda County?\n"
    "2. Please list the lowest three eligible free rates for students aged 5-17 in continuation schools.\n"
    "3. Please list the zip code of all the charter schools in Fresno County Office of Education."
)


system_prompt = dedent(f"""
You are an expert data analyst tasked with crafting business-intelligence (BI) questions that can be answered from the provided school datasets.

Requirements:
* Use every column when relevant, but COMPLETELY IGNORE the field named `comment` because it is obsolete.
* Place extra weight on the `updated_comment` field, which contains improved descriptions.
* Produce exactly {num_syn} DISTINCT questions similar in style and complexity to the examples given.
* Vary the aggregation (highest, lowest, average, count, percentage, etc.), filtering dimensions (county, school type, grade span, demographics, years, etc.), and output formats (list values, top-N, aggregated value).
* Do NOT repeat the sample questions verbatim.
* Number the questions 1-{num_syn}.
* Do NOT generate any programming code.
* Return the final result as a JSON dictionary that maps each question number to its question text. Only return the dictionary as your output.


Available columns with updated descriptions:\n{metadata_lines}

Sample question style:\n{example_questions}

""")

# Initialize the MLflow deployments client for Databricks model serving
import mlflow.deployments as ml_deployments

client = ml_deployments.get_deploy_client("databricks")

chosen_endpoint = os.environ.get("LLM_ENDPOINT", "databricks-llama-4-maverick")
print(f"Using endpoint: {chosen_endpoint}")

response = client.predict(
    endpoint=chosen_endpoint,
    inputs={"messages": [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": f"Please generate the {num_syn} BI questions now."}
    ]}
)

try:
    questions_text = response["choices"][0]["message"]["content"].strip()
except Exception:
    questions_text = str(response)

print("Generated Questions:\n")
print(questions_text)
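
The fallback above stores `str(response)` when the reply does not have the expected shape, which would only fail later during JSON parsing. A minimal sketch of a stricter alternative follows. It assumes the same chat-style layout the cell already indexes into (`choices[0].message.content`); the helper name `extract_content` and the sample response are hypothetical.

```python
def extract_content(response) -> str:
    """Return the assistant text from a chat-style response, or raise a clear error."""
    try:
        content = response["choices"][0]["message"]["content"]
    except (KeyError, IndexError, TypeError) as exc:
        # Fail here rather than passing an unparseable string downstream.
        raise ValueError(f"Unexpected response shape: {response!r}") from exc
    if not isinstance(content, str) or not content.strip():
        raise ValueError("The model returned an empty reply")
    return content.strip()


# Hypothetical response, shaped like the one the cell above expects.
sample_response = {
    "choices": [
        {"message": {"role": "assistant",
                     "content": '  {"1": "What is the average SAT math score per county?"}  '}}
    ]
}

print(extract_content(sample_response))
```

Raising early keeps the failure next to the call that caused it, which may be easier to debug than a `json.loads` error a few cells later.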

In [0]:
# TODO: Requesting more than about 200 questions in a single call is unreliable. With 500, for example,
# the output above skips many questions and tends to abbreviate the answer with '...'.
# Options: generate in batches inside a loop and concatenate the results, or write an agent that calls
# the generation step above as a tool X times and concatenates the answer sets with deduplication.
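
One possible shape for the loop mentioned in the TODO is sketched below. It is not part of the notebook: `generate_in_batches`, `normalize`, and the stand-in generator are hypothetical names, and `generate_batch` is assumed to be a function that wraps the LLM call and returns a list of question strings.

```python
from typing import Callable, List


def normalize(question: str) -> str:
    """Collapse whitespace and lowercase so trivially different strings compare equal."""
    return " ".join(question.split()).lower()


def generate_in_batches(
    generate_batch: Callable[[int], List[str]],  # hypothetical wrapper around the LLM call
    total: int,
    batch_size: int = 100,
    max_calls: int = 20,  # upper bound so the loop ends even if the model keeps repeating itself
) -> List[str]:
    """Request small batches until `total` unique questions have been collected."""
    seen = set()
    questions: List[str] = []
    for _ in range(max_calls):
        if len(questions) >= total:
            break
        for q in generate_batch(batch_size):
            key = normalize(q)
            if key and key not in seen:
                seen.add(key)
                questions.append(q.strip())
                if len(questions) >= total:
                    break
    return questions


def make_fake_generator() -> Callable[[int], List[str]]:
    """Stand-in for the LLM: each batch overlaps the previous one by one question."""
    state = {"next": 0}

    def fake_generate_batch(n: int) -> List[str]:
        start = max(state["next"] - 1, 0)
        state["next"] = start + n
        return [f"Question {i}?" for i in range(start, start + n)]

    return fake_generate_batch


questions = generate_in_batches(make_fake_generator(), total=5, batch_size=3)
print(questions)
```

In the notebook, `generate_batch` would build the prompt for `n` questions, call `client.predict`, and parse the reply. Deduplication here is exact-match after normalization, so paraphrased duplicates would still pass through.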

In [0]:
import json

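# Strip Markdown code fences that the model may wrap around the JSON payload
questions_text = questions_text.replace("```json","").replace("```", "")

# Parse the {number: question} dictionary and load it into a Spark DataFrame
questions_dict = json.loads(questions_text)
questions_df = spark.createDataFrame([(k, v) for k, v in questions_dict.items()], ["number", "query"])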
display(questions_df)
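
Stripping code fences works when the reply contains nothing but the JSON payload. If the model adds a sentence before or after it, `json.loads` fails. The sketch below shows one tolerant alternative that takes the text between the first `{` and the last `}`. The helper name `parse_questions` and the sample reply are hypothetical.

```python
import json


def parse_questions(raw: str) -> dict:
    """Extract a {number: question} mapping from a raw LLM reply."""
    start, end = raw.find("{"), raw.rfind("}")
    if start == -1 or end <= start:
        raise ValueError("No JSON object found in the model output")
    parsed = json.loads(raw[start : end + 1])
    # Keep only non-empty string values; nested or null entries are dropped.
    return {
        str(number): text.strip()
        for number, text in parsed.items()
        if isinstance(text, str) and text.strip()
    }


# Hypothetical reply with extra prose and a fenced JSON payload.
sample_reply = (
    "Here are the questions:\n"
    "```json\n"
    '{"1": "What is the average SAT math score per county?", '
    '"2": "List the five largest districts by enrollment."}\n'
    "```\n"
    "Let me know if you need more."
)

parsed_questions = parse_questions(sample_reply)
print(parsed_questions)
```

The result can be passed to `spark.createDataFrame` in the same way as `questions_dict` above.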

In [0]:
# from pyspark.sql import Row

# # Convert questions_text (numbered list) to list of questions
# questions = [q.strip().split('. ', 1)[1] if '. ' in q else q.strip() for q in questions_text.split('\n') if q.strip() and q[0].isdigit()]

# # Create Spark DataFrame
# questions_df = spark.createDataFrame([Row(query=q) for q in questions]) 
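
If the model returns a numbered list instead of JSON, the commented-out approach above can serve as a fallback. Note that it tests `q[0].isdigit()` on the unstripped line, so indented lines would be dropped. A regex-based sketch that tolerates leading whitespace and both `1.` and `1)` numbering is shown here; `parse_numbered_list` and the sample text are hypothetical.

```python
import re

# Optional indentation, a number, "." or ")", then the question text.
NUMBERED_LINE = re.compile(r"^\s*(\d+)[.)]\s+(.*\S)\s*$")


def parse_numbered_list(text: str) -> list:
    """Return the question text of every numbered line, in order."""
    questions = []
    for line in text.splitlines():
        match = NUMBERED_LINE.match(line)
        if match:
            questions.append(match.group(2))
    return questions


# Hypothetical model output mixing prose, numbering styles, and indentation.
sample_text = (
    "Generated questions:\n"
    "1. What is the highest eligible free rate in Alameda County?\n"
    "  2) List the zip codes of all charter schools.  \n"
    "\n"
    "Hope this helps."
)

numbered_questions = parse_numbered_list(sample_text)
print(numbered_questions)
```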

In [0]:
questions_df.display()

In [0]:
questions_df.write.mode("append") \
            .option("delta.enableChangeDataFeed", "false") \
            .saveAsTable("yyang.hackathon_2025q3_project_geniepromptautocompletion_genieeng_feindustryhls.aisynthesized_questions_table") 

## Step 3 – Update the Existing Delta Table Storing Questions

The managed Vector Search (VS) index syncs from this table.

In [0]:
from delta.tables import DeltaTable
from pyspark.sql import functions as F

target_table = "yyang.hackathon_2025q3_project_geniepromptautocompletion_genieeng_feindustryhls.questions_table"

# Get current max id from the target table
max_id = spark.table(target_table).agg(F.max("id")).collect()[0][0]
if max_id is None:
    max_id = 0

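# Assign ids above the current maximum. monotonically_increasing_id() returns unique,
# increasing values, but they are not guaranteed to be consecutive.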
questions_with_id = questions_df.withColumn("id", F.monotonically_increasing_id() + max_id + 1)

# Select only required columns for upsert
upsert_df = questions_with_id.select("id", "query")

# Insert-only merge into the Delta table: add questions whose text is not already present
delta_table = DeltaTable.forName(spark, target_table)
delta_table.alias("target").merge(
    upsert_df.alias("source"),
    "target.query = source.query"
).whenNotMatchedInsert(
  values={"id": "source.id", "query": "source.query"}
).execute()

# No whenMatchedUpdate clause is used, so rows that already exist in the target stay unchanged.
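
The intended effect of this cell (insert only unseen queries, with ids above the current maximum) can be modeled in plain Python. The sketch below illustrates the logic and is not Delta Lake code; `plan_inserts` and the sample rows are hypothetical. Unlike the Spark version, it assigns consecutive ids and also removes duplicates inside the incoming batch, since the merge condition compares source rows against the target only, not against each other.

```python
def plan_inserts(existing, new_queries):
    """Return the (id, query) rows to insert.

    existing: list of (id, query) tuples already in the target table.
    new_queries: list of candidate query strings.
    """
    known = {query for _, query in existing}
    next_id = max((row_id for row_id, _ in existing), default=0) + 1
    inserts = []
    for query in new_queries:
        if query in known:
            continue  # already in the target, or seen earlier in this batch
        known.add(query)
        inserts.append((next_id, query))
        next_id += 1
    return inserts


# Hypothetical data: one incoming query already exists, one is repeated.
existing_rows = [(1, "Question A?"), (2, "Question B?")]
incoming = ["Question B?", "Question C?", "Question C?", "Question D?"]

to_insert = plan_inserts(existing_rows, incoming)
print(to_insert)
```

In Spark, a comparable effect could be approached by calling `dropDuplicates(["query"])` on `questions_df` before the merge.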

In [0]:
spark.table(target_table).count()

## Step 4 – Sync the VS Index Manually

Whether this step is needed depends on the sync mode of the VS index. If it is set to "Continuous", you can skip the cells below. If it is set to "Triggered", sync the VS index manually by running them.

In [0]:
%pip install databricks-vectorsearch
%restart_python

In [0]:
from databricks.vector_search.client import VectorSearchClient
vsc = VectorSearchClient()

In [0]:
table_name = "yyang.hackathon_2025q3_project_geniepromptautocompletion_genieeng_feindustryhls.questions_table"

vs_index_fullname = table_name.replace("questions_table", "questions_table_vs")

vs_endpoint_name = "vs_endpoint_2025q3_hackathon_project_geniepromptautocompletion_genieeng_feindustryhls".lower()[:49]

In [0]:
index = vsc.get_index(endpoint_name=vs_endpoint_name, index_name=vs_index_fullname)

index.describe()

In [0]:
index.sync()