In [0]:
import collections
import os
from concurrent.futures import ThreadPoolExecutor

import pandas
import requests
from pyspark.sql.functions import (
    coalesce,
    col,
    concat_ws,
    expr,
    from_json,
    lit,
    monotonically_increasing_id,
    pandas_udf,
    regexp_replace,
    struct,
)
from pyspark.sql.types import ArrayType, StringType

# TODO: update CATALOG and SCHEMA to the Unity Catalog location you want to deploy into.
CATALOG = 'perdomo_demos'
SCHEMA = 'demo_agents'

# Volume (inside CATALOG.SCHEMA) that receives the raw demo files downloaded below.
VOLUME_NAME = 'raw_data'
VOLUME_FOLDER = "/".join(("", "Volumes", CATALOG, SCHEMA, VOLUME_NAME))

In [0]:
# Ensure the landing volume exists, then make CATALOG.SCHEMA the default namespace so the
# unqualified table and function names used in the rest of the notebook resolve there.
setup_statements = [
    f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.{VOLUME_NAME};",
    f"USE CATALOG {CATALOG}",
    f"USE SCHEMA {SCHEMA}",
]
for statement in setup_statements:
    spark.sql(statement)

### Definindo funções

In [0]:
%sql
-- SQL table function (used as an agent tool) that returns the single customer row whose
-- email equals the given address.
-- Columns are listed explicitly instead of SELECT * so the result lines up with the
-- declared RETURNS TABLE by name rather than by the physical column order of `customers`;
-- adding or reordering columns in the source table no longer breaks or misaligns it.
-- NOTE(review): this cell runs before the `customers` table is loaded further down the
-- notebook; on a fresh schema, confirm CREATE FUNCTION succeeds (or load the data first).
CREATE OR REPLACE FUNCTION get_customer_by_email(email_input STRING COMMENT 'customer email used to retrieve customer information')
RETURNS TABLE (
    customer_id BIGINT,
    first_name STRING,
    last_name STRING,
    email STRING,
    phone STRING,
    address STRING,
    city STRING,
    state STRING,
    zip_code STRING,
    customer_segment STRING,
    registration_date DATE,
    customer_status STRING,
    loyalty_tier STRING,
    tenure_years DOUBLE,
    churn_risk_score BIGINT,
    customer_value_score BIGINT
)
COMMENT 'Returns the customer record matching the provided email address. Includes its ID, firstname, lastname and more.'
RETURN (
    SELECT
        customer_id, first_name, last_name, email, phone,
        address, city, state, zip_code, customer_segment,
        registration_date, customer_status, loyalty_tier,
        tenure_years, churn_risk_score, customer_value_score
    FROM customers
    WHERE email = email_input
    LIMIT 1
);

In [0]:
%sql
-- SQL table function (used as an agent tool): one row per subscription of the given
-- customer, enriched with that subscription's billing aggregates. Subscriptions without
-- billing rows get 0 for the numeric aggregates and 'N/A' for the payment status.
CREATE OR REPLACE FUNCTION get_customer_billing_and_subs(customer_id_input BIGINT COMMENT 'customer ID used to retrieve billing and subscriptions')
RETURNS TABLE (
    customer_id BIGINT,
    subscription_id BIGINT,
    service_type STRING,
    plan_name STRING,
    plan_tier STRING,
    monthly_charge BIGINT,
    start_date DATE,
    contract_length_months BIGINT,
    status STRING,
    autopay_enabled BOOLEAN,
    total_billed DOUBLE,
    total_paid DOUBLE,
    total_late_payments BIGINT,
    total_late_fees DOUBLE,
    latest_payment_status STRING
)
COMMENT 'Returns subscription and billing details for a customer.'
RETURN (
    SELECT
        s.customer_id, s.subscription_id, s.service_type, s.plan_name, s.plan_tier,
        s.monthly_charge, s.start_date, s.contract_length_months, s.status, s.autopay_enabled,
        COALESCE(b.total_billed, 0) AS total_billed,
        COALESCE(b.total_paid, 0) AS total_paid,
        COALESCE(b.total_late_payments, 0) AS total_late_payments,
        COALESCE(b.total_late_fees, 0) AS total_late_fees,
        COALESCE(b.latest_payment_status, 'N/A') AS latest_payment_status
    FROM subscriptions s
    LEFT JOIN (
        SELECT
            subscription_id, customer_id,
            SUM(total_amount) AS total_billed,
            SUM(payment_amount) AS total_paid,
            -- A bill counts as late if it was paid after its due date or is flagged 'Late'.
            COUNT_IF(payment_date > due_date OR payment_status = 'Late') AS total_late_payments,
            -- NOTE(review): this is the unpaid remainder (billed - paid) on late bills, not a
            -- dedicated late-fee column; confirm against the billing schema.
            SUM(CASE WHEN payment_date > due_date OR payment_status = 'Late' THEN total_amount - payment_amount ELSE 0 END) AS total_late_fees,
            -- Status of the most recent bill (greatest due_date). The previous
            -- MAX(payment_status) returned the alphabetically greatest status, not the latest.
            MAX_BY(payment_status, due_date) AS latest_payment_status
        FROM billing
        WHERE customer_id = customer_id_input
        GROUP BY subscription_id, customer_id
    ) b ON s.subscription_id = b.subscription_id
    WHERE s.customer_id = customer_id_input
);

### Dados para demo

In [0]:
def download_file_from_git(dest, path):
    """Download every file of a dbdemos-dataset folder into ``dest``.

    Lists ``llm/ai-agent{path}`` in the ``databricks-demos/dbdemos-dataset`` GitHub
    repository and downloads each listed file (skipping NOTICE files) into ``dest``.
    Each file is fetched from the dbdemos S3 mirror first, falling back to
    raw.githubusercontent.com if that fails. Downloads run on up to 10 threads.

    Args:
        dest: Destination directory (created if missing), e.g. a /Volumes/... folder.
        path: Sub-folder of ``llm/ai-agent`` to mirror, with a leading slash
            (e.g. "/customers").

    Raises:
        requests.HTTPError: if the GitHub listing request fails, or if a file cannot be
            downloaded from either the S3 mirror or GitHub.
    """
    github_prefix = "https://raw.githubusercontent.com/databricks-demos/dbdemos-dataset/main/"
    s3_prefix = "https://dbdemos-dataset.s3.amazonaws.com/"
    timeout = 60  # seconds; keeps a stalled connection from hanging the notebook forever

    def download_file(url, destination):
        local_filename = url.split('/')[-1]
        # stream=True writes the body chunk by chunk instead of loading it all in memory.
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            print('saving ' + destination + '/' + local_filename)
            with open(destination + '/' + local_filename, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        return local_filename

    def download_to_dest(url):
        # Try the S3 mirror of the same file first; the previous code downloaded the bucket
        # root instead of `s3url`, so the mirror was never actually used.
        s3url = url.replace(github_prefix, s3_prefix)
        try:
            return download_file(s3url, dest)
        except Exception:
            # Any mirror failure (HTTP error, connection error, ...) falls back to GitHub.
            return download_file(url, dest)

    os.makedirs(dest, exist_ok=True)

    listing = requests.get(
        f'https://api.github.com/repos/databricks-demos/dbdemos-dataset/contents/llm/ai-agent{path}',
        timeout=timeout,
    )
    # Rate-limit / not-found responses are JSON objects, not lists; fail with a clear HTTP
    # error instead of an opaque TypeError while iterating the payload.
    listing.raise_for_status()
    files = [
        f['download_url'] for f in listing.json()
        # Directory entries have download_url = None and cannot be fetched as files.
        if 'NOTICE' not in f['name'] and f.get('download_url')
    ]
    with ThreadPoolExecutor(max_workers=10) as executor:
        # Drain the result iterator so any download exception is re-raised here.
        collections.deque(executor.map(download_to_dest, files), maxlen=0)

In [0]:
# Mirror each dataset folder from dbdemos-dataset into a same-named sub-folder of the volume.
# NOTE(review): the eval_dataset folder is overwritten by the synthetic eval set written at
# the end of this notebook — confirm that downloading it here is still needed.
for dataset_name in ["customers", "subscriptions", "billing", "eval_dataset"]:
    download_file_from_git(f"{VOLUME_FOLDER}/{dataset_name}", f"/{dataset_name}")
# The PDF documentation folder is available upstream but is not downloaded for this demo:
#download_file_from_git(VOLUME_FOLDER+'/pdf_documentation', "/pdf_documentation")

In [0]:
# Save each downloaded parquet folder as a table with the same name in the current
# catalog/schema, replacing any previous contents.
for table_name in ("customers", "subscriptions", "billing"):
    (
        spark.read
        .parquet(f"{VOLUME_FOLDER}/{table_name}")
        .write
        .mode('overwrite')
        .saveAsTable(table_name)
    )

### Dataset de avaliação sintético com ai_query

In [0]:
@pandas_udf(StringType())
def generate_question(email: pandas.Series, row_id: pandas.Series) -> pandas.Series:
    """Build one synthetic evaluation question per customer.

    A template is chosen round-robin by ``row_id % len(templates)`` and the customer's
    email is substituted into its ``{email}`` placeholder.

    Args:
        email: Customer email addresses.
        row_id: Integer row identifiers, used only to pick a template.

    Returns:
        A Series of question strings, positionally aligned with the inputs.
    """
    # NOTE(review): the "orders" / "products" templates ask about data this notebook neither
    # loads nor exposes through a tool function — confirm these questions are intended.
    templates = (
        "What is the phone number of {email}?",
        "List all orders placed by {email}.",
        "What is the current subscription status for {email}?",
        "Show billing details for {email}.",
        "Does {email} have any unpaid invoices?",
        "Which products did {email} purchase?",
        "What is the loyalty tier of {email}?",
        "When did {email} register as a customer?",
        "Summarize all subscriptions held by {email}.",
        "What city does {email} live in?",
        "What is the current account status of {email}?",
        "How many years has {email} been a customer?",
        "What is the churn risk score for {email}?",
        "What is the customer value score for {email}?",
        "Is autopay enabled for {email}'s account?",
        "How many late payments has {email} had?",
        "What is the zip code of {email}?",
        "What type of customer is {email} (e.g., Individual, Business)?",
        "What is the full address of {email}?",
    )
    n_templates = len(templates)

    questions = []
    for email_value, identifier in zip(email, row_id):
        template = templates[int(identifier) % n_templates]
        questions.append(template.format(email=email_value))
    return pandas.Series(questions)

In [0]:
# Sample 50 customers and attach one synthetic evaluation question to each.
df = (
    spark.table("customers")
    .limit(50)
    .withColumn("row_id", monotonically_increasing_id())
    .withColumn("question", generate_question("email", "row_id"))
)

# Collect to pandas and rebuild a Spark DataFrame so the sample is materialized once:
# `limit` without an ordering and `monotonically_increasing_id` are not guaranteed to be
# stable across re-evaluation (presumably the reason for this round-trip).
# NOTE(review): pandas turns integer columns containing nulls into float64, which would
# change how those values render in the prompt later — confirm the source columns are non-null.
df_pd = df.toPandas()
df_clean = spark.createDataFrame(df_pd)

In [0]:
# Prompt for AI_QUERY
df_clean = df_clean.withColumn(
    "prompt",
    concat_ws(
        " ",
        lit("You are evaluating an AI system."),
        lit("Based on the following customer record:"),
        concat_ws(", ",
            df_clean.first_name, df_clean.last_name, df_clean.email, df_clean.phone,
            df_clean.address, df_clean.city, df_clean.state, df_clean.zip_code,
            df_clean.customer_segment, df_clean.registration_date.cast("string"),
            df_clean.customer_status, df_clean.loyalty_tier,
            df_clean.tenure_years.cast("string"), df_clean.churn_risk_score.cast("string"),
            df_clean.customer_value_score.cast("string")
        ),
        lit("Generate a JSON array of factual statements (expected_facts) that should be included in the correct answer to the following question. Each item must be a complete, natural language sentence. Return only a valid JSON array of strings, nothing else."),
        lit("Question:"), df_clean.question
    )
)
df_clean.createOrReplaceTempView("customer_test_questions")

# Call AI_QUERY
final_df_raw = spark.sql("""
  SELECT 
    question,
    AI_QUERY("databricks-claude-3-7-sonnet", prompt) AS expected_facts_json
  FROM customer_test_questions
""")

In [0]:
# Parse JSON string into Array<String>
final_df = final_df_raw.withColumn(
    "expected_facts",
    from_json(col("expected_facts_json"), ArrayType(StringType()))
)

# Build structured evaluation format
eval_df = final_df.withColumn("inputs", struct("question")) \
                  .withColumn("predictions", lit("")) \
                  .withColumn("expectations", struct("expected_facts")) \
                  .select("inputs", "predictions", "expectations")

eval_df.write.format('json').mode("overwrite").save(f"{VOLUME_FOLDER}/eval_dataset")