In [None]:
import psycopg2
import datetime
import pandas as pd
from databricks.sdk import WorkspaceClient
import uuid
import mlflow
import dspy

# Turn on MLflow autologging for DSPy so module calls in this notebook are traced.
mlflow.dspy.autolog()
# Default language model for every DSPy module below unless a cell overrides it
# with dspy.context(...). cache=False is passed to the LM constructor
# (presumably so repeated runs re-query the model instead of reusing cached
# completions — confirm against the dspy.LM documentation).
gpt_oss = dspy.LM(model="databricks/databricks-gpt-oss-120b", cache=False)
dspy.configure(lm=gpt_oss)
# Databricks workspace client; used below for the database instance/credential
# lookups and for the Genie conversation in turbine_lookup.
w = WorkspaceClient()

# Access Databricks Asset Bundle variables passed via base_parameters
try:
    # These variables are passed from the DAB job configuration
    instance_name = dbutils.widgets.get("instance_name")
    current_username = dbutils.widgets.get("current_username")
    current_shortName = dbutils.widgets.get("current_shortname")
    batch_interval = dbutils.widgets.get("batch_interval")
except Exception as widget_error:
    # Fallback values for development/testing. `except Exception` (rather than
    # a bare `except:`) still covers a missing `dbutils` or missing widget but
    # no longer swallows KeyboardInterrupt/SystemExit.
    print(f"Job parameters unavailable ({widget_error!r}); using development fallbacks")
    instance_name = "fe_shared_demo"
    current_username = "default_user"
    # Previously missing: without this the later `users.{current_shortName}...`
    # query raised NameError whenever the fallback path was taken.
    # NOTE(review): placeholder schema name — adjust to a schema that exists
    # in your development workspace.
    current_shortName = "default_user"
    batch_interval = "180"

print(f"Using instance_name: {instance_name}")
print(f"Using current_username: {current_username}")
print(f"Using current_shortName: {current_shortName}")
print(f"Using batch_interval: {batch_interval}")

# Look up the database instance named by `instance_name`; its `read_write_dns`
# is used as the Postgres host by the psycopg2 connections below.
instance = w.database.get_database_instance(name=instance_name)
# Generate a database credential for that instance; `cred.token` is used as the
# Postgres password. The uuid4 request_id gives each request a unique id.
# NOTE(review): the credential is generated once and reused by every later
# connection — confirm the token lifetime covers the whole notebook run.
cred = w.database.generate_database_credential(request_id=str(uuid.uuid4()), instance_names=[instance_name])

class turbine_details_to_report(dspy.Signature):
  """Convert the turbine look up results to the example report below. Replace the numbers with what you see in the turbine lookup results: 
  <Example Report>
  'Turbine Health Report - TURBINE_00000
Time: Sep 23, 2025 07:17 UTC | Status: 🔴 CRITICAL - IMMEDIATE ACTION
Alert Summary
THERMAL FAULT - Priority 5/5 | Dispatch Score: 100/100 | SLA: BREACH
Current Metrics

Health Score: 46/100 (min: 45) | Degradation Risk: 46.3/100
Uptime: 0% | Status: STOPPED (5/9 readings)
Power Output: 5.8 kW avg (52.1 kW total) | Efficiency: 6.3%
Wind Speed: 2.0 m/s | Rotor RPM: 0.01

Technical Data

Temp Differential: 14°C | Gearbox Temp Variance: 2.6
Vibration: 0.63 max (variance: 0.11)
Oil Pressure: 41.0 bar (minimum)
Maintenance Alerts: 9 over 2.4 minutes

Action Required
IMMEDIATE_INTERVENTION - Maintenance window: 1 day
Location: 32.0°N, 101.5°W
Issue: Unit non-generating with thermal fault. Investigate gearbox and thermal systems immediately.'
</Example Report>"""

  # The docstring above is the instruction text DSPy sends to the model, so it
  # is prompt content rather than developer documentation. The status emoji
  # and degree signs in the example were mis-encoded (UTF-8 bytes read as
  # cp1252) and are restored here so the model sees the intended template.

  # Input: the turbine lookup result rendered as text (turbine_lookup passes a
  # DataFrame converted with to_string()).
  turbine_lookup_results: str = dspy.InputField()
  # Output: the generated report, following the example layout above.
  report: str = dspy.OutputField()

def agent_status_update(query):
  """This is used to update what the agent decided to do for each turbine. Queries must be in postgres syntax"""
  # The docstring is left verbatim: this function is handed to dspy.ReAct as a
  # tool, and the docstring presumably serves as the tool description the
  # model sees — confirm before rewording.
  #
  # Runs `query` (a complete SQL statement string) against the Lakebase
  # Postgres instance in its own transaction and returns the server's command
  # status string (e.g. "INSERT 0 1") so the caller/agent can tell the
  # statement ran. Previously this returned the result of cursor.execute(),
  # which is always None in psycopg2. Database errors propagate to the caller.
  conn = psycopg2.connect(
      host=instance.read_write_dns,
      dbname="databricks_postgres",
      user=f"{current_username}",
      password=cred.token,
      sslmode="require"
  )
  try:
      # `with conn` commits on success / rolls back on error, but does NOT
      # close the connection in psycopg2 — hence the explicit close below.
      with conn:
          with conn.cursor() as cur:
              cur.execute(query)
              return cur.statusmessage
  finally:
      conn.close()


def turbine_lookup(turbine):
  """This is a query to look up information about a turbine."""
  # The docstring is left verbatim: this function is handed to dspy.ReAct as a
  # tool, and the docstring presumably serves as the tool description the
  # model sees — confirm before rewording.
  #
  # Flow: ask the Genie space for the turbine's latest status, fetch the query
  # result attached to Genie's reply, render it as text, and have a smaller
  # model turn that text into a report string (see turbine_details_to_report).
  #
  # Args:
  #   turbine: turbine identifier interpolated into the Genie prompt.
  # Returns:
  #   The generated report string, or a short explanatory message when Genie
  #   returned no attachments to read results from.

  genie_space_id = "01f09a993be215439f76eaa8c029d6cd"

  # Start a conversation and block until Genie has answered (up to 20 minutes).
  conversation = w.genie.start_conversation_and_wait(
      space_id=genie_space_id,
      content=f"Provide the latest detailed status and sensor readings for turbine {turbine}",
      timeout=datetime.timedelta(minutes=20)
  )

  # Genie may answer without any attachment; indexing [0] unconditionally
  # would then fail with a TypeError/IndexError that hides the real cause.
  attachments = conversation.attachments or []
  if not attachments:
    return f"No lookup results were returned for turbine {turbine}."

  # Prefer an attachment that carries a query (only those have a query
  # result); fall back to the first attachment, matching the old behavior.
  query_attachment = next(
      (attachment for attachment in attachments if getattr(attachment, "query", None)),
      attachments[0],
  )

  query_result = w.genie.get_message_attachment_query_result(
      space_id=genie_space_id,
      conversation_id=conversation.conversation_id,
      message_id=conversation.message_id,
      attachment_id=query_attachment.attachment_id
  )
  statement = query_result.statement_response

  # Column names come from the result manifest; rows from the data array.
  column_names = [col.name for col in statement.manifest.schema.columns]
  data_array = statement.result.data_array

  # Render the tabular result as plain text for the language model.
  lookup_text = pd.DataFrame(data_array, columns=column_names).to_string()

  # Use the smaller 20b model just for the report-formatting step.
  with dspy.context(lm=dspy.LM('databricks/databricks-gpt-oss-20b')):
    predictor = dspy.Predict(turbine_details_to_report)
    prediction = predictor(turbine_lookup_results=lookup_text)
  return prediction.report

In [0]:
# Create the two Postgres tables used by the agents if they do not exist yet.

# One row per agent decision about a turbine, including ticket bookkeeping.
make_table = """
            CREATE TABLE IF NOT EXISTS turbine_monitoring (
                id SERIAL PRIMARY KEY,
                turbine_id VARCHAR(50) NOT NULL,
                turbine_details_and_status TEXT,
                dispatch_priority_score DECIMAL(5,2),
                recommended_action TEXT,
                maintenance_alerts_count INTEGER DEFAULT 0,
                agent_summary TEXT,
                last_agent_action_taken TEXT,
                Ticket_ID VARCHAR(50),
                ticket_filed BOOLEAN DEFAULT FALSE,
                ticket_status VARCHAR(50),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """

# Mock ticketing table that the ticket-filing agent writes to.
make_ticket_table = """
            CREATE TABLE IF NOT EXISTS ticket_mock_table (
                id SERIAL PRIMARY KEY,
                Ticket_ID VARCHAR(50) NOT NULL,
                turbine_id VARCHAR(50) NOT NULL,
                ticket_status_output Text,
                Ticket_Submission Text,
                Ticket_Status Text,
                Last_Update TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """

# Both statements use IF NOT EXISTS, so re-running this cell is harmless.
for ddl_statement in (make_table, make_ticket_table):
    agent_status_update(ddl_statement)

### Decide which turbines need a dispatcher
1. Get latest maintenance request
2. Check status on Lakebase
3. Determine dispatcher need

In [None]:
# Among rows with at least 5 maintenance alerts, keep only the most recent row
# per turbine (ordered by alert_generated_timestamp, newest first).
latest_alerts_sql = f"""
SELECT *
FROM users.{current_shortName}.maintenance_dispatch_queue t
WHERE maintenance_alerts_count >= 5
QUALIFY ROW_NUMBER() OVER (
  PARTITION BY turbine_id
  ORDER BY alert_generated_timestamp DESC
) <= 1
"""
df = spark.sql(latest_alerts_sql)
display(df)

In [None]:
# Find turbines that already have an open ticket so they can be skipped:
# take each turbine's most recently updated turbine_monitoring row and keep it
# only when its ticket_status is 'Open'.
query = """SELECT t1.turbine_id, t1.ticket_status
FROM turbine_monitoring t1
INNER JOIN (
    SELECT turbine_id, MAX(updated_at) AS max_updated_at
    FROM turbine_monitoring
    GROUP BY turbine_id
) t2 ON t1.turbine_id = t2.turbine_id AND t1.updated_at = t2.max_updated_at
where ticket_status = 'Open';"""

conn = psycopg2.connect(
    host=instance.read_write_dns,
    dbname="databricks_postgres",
    user=f"{current_username}",
    password=cred.token,
    sslmode="require"
)
try:
    # `with conn` only scopes the transaction in psycopg2; it does not close
    # the connection, so close it explicitly to avoid leaking one per run.
    with conn:
        with conn.cursor() as cur:
            cur.execute(query)
            # List of (turbine_id, ticket_status) tuples.
            result = cur.fetchall()
finally:
    conn.close()

In [0]:
# First column of each fetched row is the turbine_id with an open ticket.
turbines_with_open_tickets = [row[0] for row in result]

# Drop those turbines from the dispatch queue, show what is left, and render
# the remainder as text for the dispatch-decision agent.
has_open_ticket = df['turbine_id'].isin(turbines_with_open_tickets)
filtered_df = df[~has_open_ticket]
display(filtered_df)
query_results = filtered_df.toPandas().to_string()

turbine_id,turbine_lat,turbine_lon,window_start,dispatch_priority_score,recommended_action,fault_category,estimated_maintenance_window_days,work_complexity_hours,crew_size_required,equipment_required,avg_health_score,degradation_risk_score,maintenance_alerts_count,alert_generated_timestamp
TURBINE_00000,32.0,-101.5,2025-09-26T10:33:03.635Z,100.0,IMMEDIATE_INTERVENTION,THERMAL,1,4,3,"THERMAL_IMAGING,COOLING_SYSTEM",46.11111111111112,61.55555555555556,9,2025-09-26T19:57:37.462Z
TURBINE_00001,32.0,-101.5,2025-09-26T10:33:03.635Z,80.0,SCHEDULED_MAINTENANCE,ELECTRICAL,7,6,1,"MULTIMETER,INSULATION_TESTER",86.66666666666667,30.33333333333333,9,2025-09-26T19:57:37.462Z
TURBINE_00005,32.0,-101.5,2025-09-26T10:33:03.635Z,100.0,IMMEDIATE_INTERVENTION,THERMAL,1,4,3,"THERMAL_IMAGING,COOLING_SYSTEM",45.55555555555556,46.77777777777778,9,2025-09-26T19:57:37.462Z
TURBINE_00007,32.0,-101.5,2025-09-26T10:27:08.334Z,100.0,IMMEDIATE_INTERVENTION,THERMAL,1,4,3,"THERMAL_IMAGING,COOLING_SYSTEM",45.55555555555556,46.77777777777778,9,2025-09-26T19:57:37.462Z


In [0]:

# DSPy signature for the dispatch-decision agent. The class docstring is the
# instruction text given to the model, so it is prompt content and is left
# unchanged here.
# NOTE(review): the prompt says "more than 5 maintenance alerts" while the
# Spark query that builds the input filters on maintenance_alerts_count >= 5 —
# confirm which threshold is intended.
# NOTE(review): the example row pairs 3 alerts and 'Ticket not filed due to
# low maintenance alerts' with ticket_status 'Need to File', which looks
# contradictory — confirm the example says what is intended.
# NOTE(review): the model is asked to paste turbine_lookup output into a quoted
# SQL literal; text containing a single quote would presumably break the
# INSERT — verify how the agent escapes it.
class query_dispatch_decider(dspy.Signature):
  """determine which turbines require filing a ticket for a dispatcher. Turbines need a dispatcher if there are more than 5 maintenance alerts. Update the status with what you decided to do. You cannot submit tickets, only change ticket_status to 'Need to File'
  
  Update the VALUES section of the postgres sql query below with the information you see in the query:
  <Example Query>
    INSERT INTO turbine_monitoring 
    (turbine_id, turbine_details_and_status, dispatch_priority_score, recommended_action, 
     maintenance_alerts_count, agent_summary, last_agent_action_taken, Ticket_ID, ticket_filed, ticket_status)
VALUES 
    ('TURBINE_00000', (turbine_lookup output), 85.50, 'Schedule immediate inspection', 3, 
     'HIGH URGENCY: Multiple sensor anomalies detected', 'Ticket not filed due to low maintenance alerts', '<Ticket ID>, null if does not exist', False, 'Need to File');
</Example Query>
     
    For turbine_details_and_status, grab the latest information about a turbine using turbine_lookup.
     
    For Turbines that do not require any action, do a simple update saying no ticket filed."""

  # Input: the candidate turbines as text (the notebook passes the filtered
  # dispatch-queue table rendered with to_string()).
  query: str = dspy.InputField()
  # Output: one dict per turbine holding the turbine id and the chosen status.
  turbine_ticket_status: list[dict] = dspy.OutputField(desc="""each turbine should have it's own entry like this: {"turbine": "TURBINE_00000", "ticket_status": "Need to File"}""")

In [0]:
# ReAct agent for the dispatch decision: it may call agent_status_update to
# write rows to Postgres and turbine_lookup to fetch a per-turbine report.
predictor = dspy.ReAct(query_dispatch_decider, tools=[agent_status_update, turbine_lookup])
# `query_results` is the text rendering of the turbines without an open ticket.
result = predictor(query=query_results)
# `result.turbine_ticket_status` feeds the ticket-filing agent further down.
print(result.turbine_ticket_status)

[{'turbine': 'TURBINE_00000', 'ticket_status': 'Need to File'}, {'turbine': 'TURBINE_00001', 'ticket_status': 'Need to File'}, {'turbine': 'TURBINE_00005', 'ticket_status': 'Need to File'}, {'turbine': 'TURBINE_00007', 'ticket_status': 'Need to File'}]


Trace(trace_id=tr-c7c0b03bf106850ac75ff88b1a1d92d8)

### File Ticket

In [None]:
def ticket_status_lookup(query):
  """Queries must be in postgres syntax. The column ticket_status will contain the status. The table is turbine_monitoring with this schema: 
  turbine_monitoring (
                id SERIAL PRIMARY KEY,
                turbine_id VARCHAR(50) NOT NULL,
                turbine_details_and_status TEXT,
                dispatch_priority_score DECIMAL(5,2),
                recommended_action TEXT,
                maintenance_alerts_count INTEGER DEFAULT 0,
                agent_summary TEXT,
                last_agent_action_taken TEXT,
                Ticket_ID VARCHAR(50),
                ticket_filed BOOLEAN DEFAULT FALSE,
                ticket_status VARCHAR(50),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP"""
  # The docstring is left verbatim: this function is handed to dspy.ReAct as a
  # tool, and the docstring presumably serves as the tool description the
  # model sees — confirm before rewording.
  #
  # Runs `query` against the Lakebase Postgres instance and returns all rows
  # as a list of tuples. Database errors propagate to the caller.
  # NOTE(review): fetchall() is called unconditionally, so a statement that
  # produces no result set (e.g. an UPDATE) is expected to raise here and be
  # rolled back — confirm the agent only sends SELECTs to this tool.
  conn = psycopg2.connect(
      host=instance.read_write_dns,
      dbname="databricks_postgres",
      user=f"{current_username}",
      password=cred.token,
      sslmode="require"
  )
  try:
      # `with conn` only ends the transaction in psycopg2; the connection
      # stays open unless closed explicitly, so close it to avoid a leak.
      with conn:
          with conn.cursor() as cur:
              cur.execute(query)
              return cur.fetchall()
  finally:
      conn.close()

def file_a_ticket(query):
  """Queries must be in postgres syntax. The column ticket_status will contain the status. The Table is ticket_mock_table with this schema:
  ticket_mock_table (
                id SERIAL PRIMARY KEY,
                Ticket_ID VARCHAR(50) NOT NULL,
                turbine_id VARCHAR(50) NOT NULL,
                ticket_status_output Text,
                Ticket_Submission Text,
                Ticket_Status Text,
                Last_Update TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP"""
  # The docstring is left verbatim: this function is handed to dspy.ReAct as a
  # tool, and the docstring presumably serves as the tool description the
  # model sees — confirm before rewording.
  #
  # Runs `query` (a complete SQL statement string) against the Lakebase
  # Postgres instance in its own transaction and returns the server's command
  # status string (e.g. "INSERT 0 1") so the agent can tell the statement ran.
  # Previously this returned the result of cursor.execute(), which is always
  # None in psycopg2. Database errors propagate to the caller.
  conn = psycopg2.connect(
      host=instance.read_write_dns,
      dbname="databricks_postgres",
      user=f"{current_username}",
      password=cred.token,
      sslmode="require"
  )
  try:
      # `with conn` commits on success / rolls back on error, but does NOT
      # close the connection in psycopg2 — hence the explicit close below.
      with conn:
          with conn.cursor() as cur:
              cur.execute(query)
              return cur.statusmessage
  finally:
      conn.close()

In [0]:
# DSPy signature for the ticket-filing agent. The class docstring is the
# instruction text given to the model, so it is prompt content and is left
# unchanged here.
# NOTE(review): step 1 contains an unbalanced quote ('Need to File.) — confirm
# whether the prompt wording should be tidied.
# NOTE(review): step 3 asks for an update to turbine_monitoring, but the tools
# passed to the agent are ticket_status_lookup (which always calls fetchall)
# and file_a_ticket — verify which tool is expected to perform that update.
class turbine_ticket_management(dspy.Signature):
  """Follow these Steps:
  1. Use ticket_status_lookup to see if a ticket needs to be filed. Only use the latest entry for a Turbine. Pull all columns of turbines that 'Need to File'. Ticket_status must say 'Need to File. 
  2. Use file_a_ticket to file said ticket based on the information from ticket_status_lookup. Ticket_submission must be populated with turbine_details_and_status 
  3. Once a ticket is filed, Update the turbine entry from ticket_status_lookup to say "Open" instead of "Need to File", update last_agent_action_taken with 'filed a ticket' and update the ticket ID with the newly created Ticket ID """
  # Input: the per-turbine decisions (the notebook passes the list of dicts
  # produced by the dispatch-decision agent).
  turbine: list = dspy.InputField() 
  # Output: free-text summary of what the agent did.
  response_of_completed_actions: str = dspy.OutputField()

In [0]:
# ReAct agent for ticket filing: it may call ticket_status_lookup to read
# turbine_monitoring rows and file_a_ticket to execute statements against the
# mock ticket table.
ticket_lookup = dspy.ReAct(turbine_ticket_management, tools=[ticket_status_lookup, file_a_ticket])
# Feed in the per-turbine decisions produced by the dispatch-decision agent.
ticket_status_output = ticket_lookup(turbine=result.turbine_ticket_status)
print(ticket_status_output.response_of_completed_actions)

Tickets successfully filed and turbine records updated:

- TURBINE_00005 â†’ Ticket ID: TICKET_00005 (status Open, action filed a ticket)  
- TURBINE_00007 â†’ Ticket ID: TICKET_00007 (status Open, action filed a ticket)  
- TURBINE_00000 â†’ Ticket ID: TICKET_00000 (status Open, action filed a ticket)  
- TURBINE_00001 â†’ Ticket ID: TICKET_00001 (status Open, action filed a ticket)

All turbine entries now show `ticket_status = 'Open'`, `last_agent_action_taken = 'filed a ticket'`, and the appropriate `Ticket_ID`.


Trace(trace_id=tr-fc0ad11b7a2fb5d84f97bf5d6e223302)