In [None]:
# Setup
# Install exact, pinned versions of the two Databricks packages this workshop uses.
%pip install databricks-sdk==0.40.0 databricks-feature-engineering==0.8.0 --quiet
# Restart the Python process so the versions installed above are the ones imported
# by the following cells. Keep this as the last statement of the setup cell.
dbutils.library.restartPython()

In [None]:
# Initialize
%run ../_resources/00-setup $reset_all_data=false

import sys
sys.path.append('../_resources')
from gamification_framework import (
    init_learner,
    display_challenge_intro,
    display_challenge_success,
    ChallengeValidator
)

learner = init_learner()
display_challenge_intro(
    challenge_name="Custom Tool Building Workshop",
    difficulty="Intermediate",
    points=200,
    description="Build production-ready AI tools from scratch. Learn design patterns, error handling, and testing strategies for agent tools."
)

---

## 📚 Lesson 1: Tool Anatomy

Every great AI tool has three components:

### 1️⃣ **Clear Purpose**
- What specific problem does it solve?
- When should an agent use it?

### 2️⃣ **Well-Defined Interface**
- What inputs does it need?
- What outputs does it produce?
- What are the edge cases?

### 3️⃣ **Robust Implementation**
- Error handling
- Input validation
- Clear error messages

Let's see an example:

In [None]:
# Example: A well-designed tool

# First, create sample data table
# (`catalog` and `schema` are expected to be defined by the setup notebook.)
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {catalog}.{schema}.technician_availability (
    technician_id STRING,
    name STRING,
    skill_level STRING,
    available_date DATE,
    location STRING,
    hourly_rate DOUBLE
)
""")

# Insert sample data
# INSERT OVERWRITE replaces the table contents, so re-running this cell always
# leaves exactly these five rows (dates are relative to the day the cell runs).
spark.sql(f"""
INSERT OVERWRITE {catalog}.{schema}.technician_availability VALUES
('TECH-001', 'Maria Garcia', 'Senior', current_date(), 'North Region', 85.00),
('TECH-002', 'John Smith', 'Junior', current_date(), 'North Region', 45.00),
('TECH-003', 'Sarah Chen', 'Senior', current_date() + 1, 'South Region', 90.00),
('TECH-004', 'Ahmed Hassan', 'Mid-Level', current_date(), 'North Region', 65.00),
('TECH-005', 'Lisa Anderson', 'Senior', current_date() + 2, 'Central Region', 88.00)
""")

print("✅ Sample data created")

In [None]:
# Now create the UC Function (AI Tool)

# The function's parameter names are identical to the table's column names.
# To keep the two unambiguous, the table is aliased as `t` (columns are written
# t.<column>) and every parameter is qualified with the function name.
#
# This is a regular (non-raw) f-string, so Python converts each '\n' below into
# a real newline character inside the SQL string literal before Spark parses it.
spark.sql(f"""
CREATE OR REPLACE FUNCTION {catalog}.{schema}.find_available_technicians(
    skill_level STRING COMMENT 'Required skill level: Senior, Mid-Level, or Junior',
    location STRING COMMENT 'Preferred location region',
    date_needed DATE COMMENT 'Date when technician is needed'
)
RETURNS STRING
COMMENT 'Finds available technicians matching the criteria. Returns formatted list with names, rates, and availability.'
RETURN (
    SELECT 
        CASE 
            WHEN COUNT(*) = 0 THEN 
                'No technicians available matching criteria. Consider expanding search to nearby regions or different dates.'
            ELSE
                CONCAT(
                    'Found ', CAST(COUNT(*) AS STRING), ' available technician(s):\n',
                    CONCAT_WS('\n', 
                        COLLECT_LIST(
                            CONCAT(
                                '• ', t.name, ' (', t.skill_level, ') - $', 
                                CAST(t.hourly_rate AS STRING), '/hr - Available: ', 
                                CAST(t.available_date AS STRING), ' - Location: ', t.location
                            )
                        )
                    )
                )
        END
    FROM {catalog}.{schema}.technician_availability AS t
    WHERE t.skill_level = find_available_technicians.skill_level
        AND t.location = find_available_technicians.location
        AND t.available_date <= find_available_technicians.date_needed
)
""")

print("✅ Tool created: find_available_technicians")
print("\n📝 Tool Features:")
print("  ✓ Clear parameter descriptions (COMMENT)")
print("  ✓ Handles zero results gracefully")
print("  ✓ Returns formatted, human-readable output")
print("  ✓ Includes helpful context (rates, dates, locations)")

In [None]:
# Test the tool
# The function returns a single STRING, so the query yields one row with one
# column; pull that value out of the first collected row.
result = spark.sql(f"""
SELECT {catalog}.{schema}.find_available_technicians(
    'Senior',
    'North Region', 
    current_date()
) as result
""").collect()[0]['result']

print("🧪 Test Result:")
print(result)

---

## 🎯 Challenge 1: Build a Cost Estimation Tool

**Your Task:** Create a UC Function that estimates maintenance costs.

**Requirements:**

1. **Function Name**: `estimate_maintenance_cost`
2. **Parameters**:
   - `issue_type` (STRING): Type of issue (bearing, blade, gearbox, electrical, sensor)
   - `severity` (STRING): Low, Medium, High, Critical
   - `parts_needed` (INT): Number of parts to replace
3. **Business Logic**:
   - Base costs: bearing=$5000, blade=$25000, gearbox=$50000, electrical=$8000, sensor=$2000
   - Severity multiplier: Low=1.0x, Medium=1.5x, High=2.0x, Critical=3.0x
   - Additional parts: +20% per extra part beyond the first
   - Labor: 15% of parts cost
4. **Output**: Formatted cost breakdown with total
5. **Error Handling**: Handle invalid inputs gracefully

In [None]:
# 💪 YOUR TURN - Build the cost estimation tool

# Learner scaffold: as written this creates a zero-argument placeholder function.
# Replace each TODO below; the test cell that follows calls the function with
# (issue_type, severity, parts_needed).
spark.sql(f"""
CREATE OR REPLACE FUNCTION {catalog}.{schema}.estimate_maintenance_cost(
    -- TODO: Define your parameters with COMMENT descriptions
)
RETURNS STRING
COMMENT 'TODO: Add a clear description of what this tool does'
RETURN (
    -- TODO: Implement the cost calculation logic
    -- 
    -- Hints:
    -- - Use CASE statements for base costs and multipliers
    -- - Calculate parts cost, labor cost, and total
    -- - Format output as a readable breakdown
    -- - Handle edge cases (invalid issue type, negative parts, etc.)
    -- 
    -- Example output format (gearbox, High, 2 parts):
    --   parts cost = 50,000 x 2.0 x (1 + 0.20 x 1 extra part) = 120,000
    --   labor cost = 15% of parts cost = 18,000
    --
    -- 💰 Maintenance Cost Estimate:
    -- Issue Type: Gearbox (High Severity)
    -- Base Cost: $50,000
    -- Severity Multiplier: 2.0x
    -- Parts Cost: $120,000 (2 parts)
    -- Labor Cost: $18,000 (15%)
    -- TOTAL ESTIMATED COST: $138,000
    
    SELECT 'TODO: Implement cost calculation'
)
""")

print("✅ Your function created - now test it!")

In [None]:
# Test your tool with different scenarios

# Each tuple is (issue_type, severity, parts_needed).
test_cases = [
    ("gearbox", "High", 2),
    ("sensor", "Low", 1),
    ("blade", "Critical", 3),
    ("invalid_type", "Medium", 1),  # Test error handling
]

print("🧪 Testing your tool:\n")
for issue_type, severity, parts in test_cases:
    # Print the scenario first so that an error below can be traced back to
    # the test case that caused it.
    print(f"Test: {issue_type}, {severity}, {parts} parts")
    try:
        result = spark.sql(f"""
        SELECT {catalog}.{schema}.estimate_maintenance_cost(
            '{issue_type}', '{severity}', {parts}
        ) as result
        """).collect()[0]['result']
        print(result)
    except Exception as e:
        # Deliberately broad: one failing scenario (e.g. the function is still
        # the unfinished scaffold) should be reported without stopping the rest.
        print(f"❌ Error: {str(e)}")
    print("-" * 60)

---

## 📚 Lesson 2: Python-Based Tools

Some tools need more complex logic. Let's build one in Python:

In [None]:
# Create a Python UDF for complex scheduling logic

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from datetime import datetime, timedelta
import json

@udf(returnType=StringType())
def schedule_maintenance_window(turbine_id: str, priority: str, estimated_hours: int) -> str:
    """
    Recommend a maintenance window for a turbine based on its priority.

    CRITICAL work is scheduled for today with start time "ASAP". Any other
    priority is pushed out by a fixed number of days (HIGH=1, MEDIUM=3, LOW=7;
    unrecognized priorities also use 7) and starts at 22:00 on that day.

    Args:
        turbine_id: Unique turbine identifier (echoed back in the result)
        priority: CRITICAL, HIGH, MEDIUM, LOW (case-insensitive)
        estimated_hours: Expected maintenance duration in hours (must be >= 0)

    Returns:
        JSON string with the recommended schedule. On invalid input or any
        other failure, a JSON object with a single "error" key is returned
        instead of raising.
    """
    try:
        # SQL NULL arguments arrive as None; reject them (and negative
        # durations) with a clear message instead of an obscure failure or a
        # nonsensical schedule such as "+Nonehrs".
        if priority is None:
            raise ValueError("priority is required")
        if estimated_hours is None or estimated_hours < 0:
            raise ValueError("estimated_hours must be a non-negative number")

        # Priority-based scheduling
        priority_days = {
            'CRITICAL': 0,  # Immediate
            'HIGH': 1,
            'MEDIUM': 3,
            'LOW': 7
        }

        # Normalize once so every decision below treats 'critical' and
        # 'CRITICAL' the same way.
        normalized_priority = priority.upper()
        is_critical = normalized_priority == 'CRITICAL'

        days_to_add = priority_days.get(normalized_priority, 7)
        start_date = datetime.now() + timedelta(days=days_to_add)

        if is_critical:
            start_time = "ASAP"
            # No fixed start, so the end is expressed relative to the start.
            end_time = f"+{estimated_hours}hrs"
            notes = 'EMERGENCY - Immediate action required'
        else:
            # Optimize for minimal grid impact (schedule at night for non-critical)
            start_time = "22:00"  # 10 PM
            # Measure the end time from the 22:00 start of the window, not
            # from the wall-clock time at which this function happens to run.
            window_start = start_date.replace(hour=22, minute=0, second=0, microsecond=0)
            end_datetime = window_start + timedelta(hours=estimated_hours)
            end_time = end_datetime.strftime("%H:%M")
            notes = 'Scheduled during off-peak hours'

        result = {
            "turbine_id": turbine_id,
            "priority": priority,
            "scheduled_date": start_date.strftime("%Y-%m-%d"),
            "start_time": start_time,
            "estimated_duration_hours": estimated_hours,
            "end_time": end_time,
            "notes": notes
        }

        return json.dumps(result, indent=2)

    except Exception as e:
        # A tool should hand the agent a readable error rather than fail the query.
        return json.dumps({"error": f"Scheduling failed: {str(e)}"})

# Register as SQL function
spark.udf.register("schedule_maintenance_window", schedule_maintenance_window)

print("✅ Python-based tool registered")

In [None]:
# Test the Python tool
# Calls the UDF registered above by its SQL name and prints the JSON it returns.
result = spark.sql("""
SELECT schedule_maintenance_window('WT-042', 'CRITICAL', 6) as schedule
""").collect()[0]['schedule']

print("📅 Scheduling Result:")
print(result)

---

## 🎯 Challenge 2: Build a Smart Diagnostic Tool

**Your Task:** Create a Python UDF that analyzes sensor patterns and suggests root causes.

**Requirements:**

1. **Function Name**: `diagnose_turbine_issue`
2. **Parameters**:
   - `vibration_level` (DOUBLE): mm/s
   - `temperature` (DOUBLE): Celsius
   - `power_output` (DOUBLE): MW
   - `rated_capacity` (DOUBLE): MW
3. **Diagnostic Logic**:
   - High vibration + Normal temp = Bearing issue
   - High temp + Low power = Gearbox overheating
   - Normal vibration + High temp = Cooling system failure
   - Low power + Normal other metrics = Blade pitch problem
   - Multiple anomalies = Compound issue requiring expert
4. **Output**: JSON with diagnosis, confidence, and recommended actions

In [None]:
# 💪 YOUR TURN - Build the diagnostic tool

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
import json

@udf(returnType=StringType())
def diagnose_turbine_issue(
    vibration_level: float,
    temperature: float, 
    power_output: float,
    rated_capacity: float
) -> str:
    """
    TODO: Add docstring explaining the function
    """
    try:
        # TODO: Implement diagnostic logic
        # 
        # Define thresholds:
        # - Normal vibration: < 8 mm/s
        # - Normal temperature: 60-75°C  
        # - Normal power: > 80% of rated capacity
        #
        # Edge cases to consider: a None reading, or rated_capacity <= 0 when
        # computing the power percentage.
        #
        # Check combinations and return diagnosis
        #
        # Return JSON format:
        # {
        #   "diagnosis": "Issue description",
        #   "root_cause": "Likely cause",
        #   "confidence": "High/Medium/Low",
        #   "severity": "Critical/High/Medium/Low",
        #   "recommended_actions": ["action1", "action2"],
        #   "estimated_repair_time": "X hours"
        # }
        
        # Placeholder result: replace every "TODO" value with the real diagnosis.
        result = {
            "diagnosis": "TODO",
            "root_cause": "TODO",
            "confidence": "TODO",
            "severity": "TODO",
            "recommended_actions": [],
            "estimated_repair_time": "TODO"
        }
        
        return json.dumps(result, indent=2)
        
    except Exception as e:
        # Return a readable error as JSON instead of failing the calling query.
        return json.dumps({"error": f"Diagnosis failed: {str(e)}"})

# Register the function
spark.udf.register("diagnose_turbine_issue", diagnose_turbine_issue)

print("✅ Your diagnostic tool registered")

In [None]:
# Test your diagnostic tool

# Each tuple is (vibration mm/s, temperature °C, power output MW, rated capacity MW, label).
test_scenarios = [
    (12.5, 70, 2.1, 2.5, "High vibration scenario"),
    (7.0, 95, 1.5, 2.5, "High temperature scenario"),
    (6.5, 68, 1.2, 2.5, "Low power scenario"),
    (15.0, 100, 0.8, 2.5, "Multiple anomalies"),
]

print("🧪 Testing diagnostic tool:\n")
for vib, temp, power, rated, description in test_scenarios:
    result = spark.sql(f"""
    SELECT diagnose_turbine_issue({vib}, {temp}, {power}, {rated}) as diagnosis
    """).collect()[0]['diagnosis']
    
    print(f"📊 {description}")
    print(f"   Vibration: {vib} mm/s | Temp: {temp}°C | Power: {power}/{rated} MW")
    print(result)
    print("-" * 70)

---

## 🎯 Challenge 3: Tool Integration Test

Now let's test if your tools work together as part of an agent system:

In [None]:
# Validate your tools are properly registered
validator = ChallengeValidator()

print("🔍 Validating your tools...\n")

# Check if functions exist
# Only the two Unity Catalog functions are checked here; the Python UDFs from
# Lesson 2 / Challenge 2 are registered on the Spark session instead.
tools_to_check = [
    "find_available_technicians",
    "estimate_maintenance_cost",
]

# Build the full list first (not a lazy generator) so every tool is validated
# even when an earlier one fails; all() on its own would stop at the first failure.
validation_results = [
    validator.validate_uc_function(catalog, schema, tool_name)
    for tool_name in tools_to_check
]
tools_valid = all(validation_results)

if tools_valid:
    print("\n✅ All tools validated successfully!")
    learner.complete_challenge("custom_tool_building", points=200)
    learner.award_badge("tool_builder")
    display_challenge_success("Custom Tool Building Workshop", 200)
else:
    print("\n⚠️ Some tools need fixes. Review the output above.")

---

## 🎓 Key Takeaways

You've learned:

✅ **Tool Design Patterns** - Creating intuitive, AI-friendly interfaces  
✅ **SQL Functions** - Leveraging data directly in tools  
✅ **Python UDFs** - Complex logic and custom algorithms  
✅ **Error Handling** - Making tools robust and reliable  
✅ **Testing Strategies** - Validating tool behavior

### 🏆 Tool Design Best Practices

1. **Clear Names**: Use verb_noun pattern (e.g., `get_status`, `calculate_cost`)
2. **Document Everything**: Use COMMENT extensively
3. **Handle Errors**: Never let tools crash silently
4. **Format Output**: Return human-readable results
5. **Test Edge Cases**: Invalid inputs, null values, extreme scenarios

---

## 🚀 Next Steps

Your tools are ready! Now use them in:

- **05.2-agent-creation-guide**: Build an agent using your tools
- **05.7-multi-agent-orchestration**: Combine multiple tool-using agents
- **05.X-real-world-scenarios**: Test tools in emergency simulations

In [None]:
# Check your overall progress
# Uses the `learner` object created in the Initialize cell at the top of the notebook.
learner.display_progress()