##### Support Request Agent Stream

Send a batch of pending support requests (those without a report yet) to the support agent serving endpoint and persist the agent's reports.

In [None]:
# API token and workspace URL of the current notebook context (None when the
# context does not provide them).
_notebook_context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
DATABRICKS_TOKEN = _notebook_context.apiToken().getOrElse(None)
DATABRICKS_HOST = _notebook_context.apiUrl().getOrElse(None)

# Notebook/job parameters supplied through widgets.
CATALOG = dbutils.widgets.get("CATALOG")
SUPPORT_AGENT_ENDPOINT_NAME = dbutils.widgets.get("SUPPORT_AGENT_ENDPOINT_NAME")

In [None]:
%pip install openai

In [None]:
import json
import logging
import time

from openai import OpenAI
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Maximum number of pending support requests handled per run; applied as the
# LIMIT of the pending-requests query.
BATCH_SIZE: int = 20


def process_support_request(
    support_request_id: str,
    user_id: str,
    order_id: str,
    request_text: str,
    max_attempts: int = 3,
    backoff_seconds: float = 1.0,
) -> str:
    """Send one support request to the support agent endpoint and return its report.

    The request fields are flattened into a single ``key=value`` user message and
    sent through the OpenAI-compatible Responses API at
    ``{DATABRICKS_HOST}/serving-endpoints``. Failed attempts are retried with
    exponential backoff.

    Args:
        support_request_id: Identifier of the support request.
        user_id: Identifier of the user who filed the request.
        order_id: Identifier of the order the request refers to.
        request_text: Raw text of the request.
        max_attempts: Total number of endpoint calls to try before falling back.
        backoff_seconds: Delay before the second attempt; doubled before each
            further attempt. The first attempt is never delayed.

    Returns:
        The text of the agent's last output item when it parses as a JSON
        object; otherwise a JSON-encoded fallback report with
        ``decision_confidence`` "low" and ``escalation_flag`` true.
    """
    # Built outside the retry loop: an error while constructing the client is
    # not retried and propagates to the caller instead of yielding a fallback.
    client = OpenAI(
        api_key=DATABRICKS_TOKEN,
        base_url=f"{DATABRICKS_HOST}/serving-endpoints",
    )

    message = (
        f"support_request_id={support_request_id} "
        f"user_id={user_id} "
        f"order_id={order_id} "
        f"text={request_text}"
    )

    last_error = None
    for attempt in range(max_attempts):
        if attempt:
            # Back off so a transiently failing endpoint gets time to recover
            # instead of burning every attempt within milliseconds.
            time.sleep(backoff_seconds * 2 ** (attempt - 1))
        try:
            response_obj = client.responses.create(
                model=SUPPORT_AGENT_ENDPOINT_NAME,
                input=[{"role": "user", "content": message}],
            )
            response = response_obj.output[-1].content[0].text
            parsed = json.loads(response)
        except Exception as err:  # best effort: API, response-shape and JSON errors are all retried
            last_error = err
            continue
        # The report must be a JSON object (like the fallback below), not an
        # arbitrary JSON scalar or list.
        if isinstance(parsed, dict):
            return response
        last_error = ValueError(
            f"expected a JSON object from the agent, got {type(parsed).__name__}"
        )

    logging.getLogger(__name__).warning(
        "Support agent call failed for support_request_id=%s after %d attempt(s); "
        "returning fallback report. Last error: %r",
        support_request_id,
        max_attempts,
        last_error,
    )
    return json.dumps({
        "support_request_id": support_request_id,
        "user_id": user_id,
        "order_id": order_id,
        "credit_recommendation": None,
        "refund_recommendation": None,
        "draft_response": "Thanks for contacting support. We are reviewing your request and will follow up shortly.",
        "past_interactions_summary": "No history available.",
        "order_details_summary": "Order context unavailable.",
        "decision_confidence": "low",
        "escalation_flag": True,
    })


# Spark Python UDF wrapping the per-request agent call; yields the JSON report as a string column.
process_support_request_udf = F.udf(process_support_request, returnType=StringType())

# Ensure the reports table exists, has the request_text column and has Change
# Data Feed enabled.
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.support")
spark.sql(
    f"""
    CREATE TABLE IF NOT EXISTS {CATALOG}.support.support_agent_reports (
      support_request_id STRING,
      user_id STRING,
      order_id STRING,
      request_text STRING,
      ts TIMESTAMP,
      agent_response STRING
    )
    """
)

# Backfill-safe schema evolution for tables created before request_text existed.
# Check the current columns instead of swallowing every ALTER failure, so real
# errors (permissions, wrong catalog, missing table) still surface. Spark column
# names are matched case-insensitively by default, hence the lower().
_report_columns = {
    column.lower()
    for column in spark.table(f"{CATALOG}.support.support_agent_reports").columns
}
if "request_text" not in _report_columns:
    spark.sql(f"ALTER TABLE {CATALOG}.support.support_agent_reports ADD COLUMNS (request_text STRING)")

spark.sql(
    f"""
    ALTER TABLE {CATALOG}.support.support_agent_reports
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
    """
)

# Copy the raw request text into historical report rows where it was not stored
# yet; rows that already have request_text are left untouched.
_backfill_request_text_sql = f"""
    MERGE INTO {CATALOG}.support.support_agent_reports s
    USING (
      SELECT support_request_id, request_text
      FROM {CATALOG}.support.raw_support_requests
    ) r
    ON s.support_request_id = r.support_request_id
    WHEN MATCHED AND s.request_text IS NULL THEN
      UPDATE SET s.request_text = r.request_text
    """
spark.sql(_backfill_request_text_sql)

# Select up to BATCH_SIZE raw support requests that have no report yet.
pending = spark.sql(
    f"""
    SELECT r.support_request_id, r.user_id, r.order_id, r.request_text
    FROM {CATALOG}.support.raw_support_requests r
    LEFT ANTI JOIN {CATALOG}.support.support_agent_reports s
      ON r.support_request_id = s.support_request_id
    LIMIT {BATCH_SIZE}
    """
)

# Materialize the batch once (at most BATCH_SIZE rows). `pending` is lazy:
# evaluating it again after the append below would re-run the anti-join against
# the updated reports table (and the unordered LIMIT), describing a different set
# of requests. Freezing the rows keeps what is sent to the agent, written and
# counted identical.
pending_rows = pending.collect()
pending_batch = spark.createDataFrame(pending_rows, schema=pending.schema)

processed = (
    pending_batch
    .withColumn("ts", F.current_timestamp())
    .withColumn(
        "agent_response",
        process_support_request_udf(
            F.col("support_request_id"),
            F.col("user_id"),
            F.col("order_id"),
            F.col("request_text"),
        ),
    )
    .select("support_request_id", "user_id", "order_id", "request_text", "ts", "agent_response")
)

# NOTE(review): requests whose agent call ended in the fallback report still get
# a row here, so the anti-join above will not select them again; re-processing
# them requires removing those rows first.
if pending_rows:
    processed.write.mode("append").saveAsTable(f"{CATALOG}.support.support_agent_reports")
print(f"Processed {len(pending_rows)} support requests in this run")