# Experiment V2 Custom Workflows - Jupyter Notebook

This notebook demonstrates the complete workflow for creating and managing custom evaluation experiments using the **Keywords AI Experiment V2 API**.

## What are Custom Workflows?

Custom workflows allow you to:
- **Submit your own workflow results** via API
- **Leverage automatic evaluator execution** on your outputs
- **Process dataset entries** with your own models/logic
- **Track and evaluate** custom processing pipelines

## Workflow Overview

1. **Create Experiment** - Initialize a custom workflow experiment (creates placeholder traces)
2. **Get Summary** - View aggregated statistics for the experiment
3. **List Traces** - Retrieve placeholder traces with dataset inputs
4. **Get Trace Details** - Fetch full input content for processing
5. **Process Input** - Run your custom logic on the input data
6. **Submit Results** - Update traces with your workflow outputs
7. **Check Evaluators** - View automatic evaluator scores

## API Documentation

- **Base URL**: `https://api.keywordsai.co`
- **Experiment V2**: `/api/v2/experiments/`
- **Authentication**: Bearer token in Authorization header

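As a minimal sketch of how these pieces combine, the snippet below joins the base URL with the experiments path and builds the Bearer header. `experiments_url` and `auth_headers` are hypothetical helper names used for illustration only; the notebook's own functions build the same values inline.

```python
from typing import Dict

DOCUMENTED_BASE_URL = "https://api.keywordsai.co"
EXPERIMENTS_PATH = "/api/v2/experiments/"


def experiments_url(base_url: str = DOCUMENTED_BASE_URL) -> str:
    """Join the base URL and the Experiment V2 path without doubling slashes."""
    return base_url.rstrip("/") + EXPERIMENTS_PATH


def auth_headers(api_key: str) -> Dict[str, str]:
    """Build the headers for a JSON request authenticated with a Bearer token."""
    return {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }


print(experiments_url())
print(auth_headers("YOUR_API_KEY")["Authorization"])
```
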
## Requirements

```bash
pip install requests
```

## ⚠️ Known Issues & API Differences

### Backend Issues:
1. **Summary endpoint** (`/logs/summary/`) - May return `NaN` for `avg_latency` causing 500 errors
2. **List endpoint** (`/logs/list/`) - May fail with 500 when filtering

**Workarounds implemented:**
- Summary calls are optional and will be skipped if they fail
- List calls use GET without filters to avoid backend serialization bugs
- Detailed error messages show what the backend returned

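The first workaround (treating the summary call as optional) can be sketched as a small wrapper. `call_optional` is a hypothetical helper, not part of the API or of this notebook's functions; it runs a callable and returns `None` when one of the listed exception types is raised.

```python
from typing import Any, Callable, Optional, Tuple, Type


def call_optional(
    fetch: Callable[[], Any],
    label: str,
    skip_on: Tuple[Type[BaseException], ...] = (Exception,),
) -> Optional[Any]:
    """Run fetch(); return None instead of raising if it fails with skip_on."""
    try:
        return fetch()
    except skip_on as exc:
        print(f"Skipping {label}: {exc}")
        return None


def broken_summary() -> dict:
    # Stand-in for a summary call that fails on the backend.
    raise RuntimeError("500 Server Error")


print(call_optional(broken_summary, "summary"))
print(call_optional(lambda: {"total_count": 3}, "summary"))
```

In this notebook the call might look like `call_optional(lambda: get_experiment_summary(experiment_id), "summary", skip_on=(requests.exceptions.HTTPError,))`, which narrows the skipped errors to HTTP failures.
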
### API Behavior vs Documentation:
The actual API behavior differs from the provided documentation:

| Documented Behavior | Actual Behavior |
|---------------------|-----------------|
| Traces have status `'pending_custom_submission'` | Traces have status `'success'` |
| Need to filter for pending traces | Process all traces from custom workflows |
| Placeholder output with specific message | Varies (check trace details) |

**This notebook handles both documented and actual API behavior.**

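One way to handle both behaviors, sketched under the assumption that each trace is a dict with a `status` key: prefer traces marked `pending_custom_submission` when any exist, and otherwise fall back to all traces. `select_traces_to_process` is a hypothetical helper; the main workflow in Section 5 simply processes every trace.

```python
from typing import Any, Dict, List

PENDING_STATUS = "pending_custom_submission"


def select_traces_to_process(traces: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return pending traces if the API reports any, otherwise all traces."""
    pending = [t for t in traces if t.get("status") == PENDING_STATUS]
    return pending if pending else list(traces)


# Made-up traces showing the documented and the observed behavior.
documented = [{"id": "a", "status": PENDING_STATUS}, {"id": "b", "status": "success"}]
actual = [{"id": "a", "status": "success"}, {"id": "b", "status": "success"}]

print([t["id"] for t in select_traces_to_process(documented)])
print([t["id"] for t in select_traces_to_process(actual)])
```
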
## 1. Imports

In [173]:
import requests
import time
import json
from typing import Dict, Any, List, Optional

## 2. Configuration

**⚠️ Important:** Replace the placeholder values below with your actual credentials and IDs.

In [None]:
# API Configuration
BASE_URL = "http://127.0.0.1:8000"  # Local development server; the hosted API base URL is https://api.keywordsai.co
API_KEY = "apikey"  # Replace with your actual API key

# Experiment Configuration
EXPERIMENT_NAME = "My Custom Workflow Experiment"
EXPERIMENT_DESCRIPTION = "Testing custom workflow implementation with V2 API"
DATASET_ID = "2cfbb5b4-53d2-4232-abd3-de4886f0255e"  # Replace with your dataset ID

# Workflow Configuration
WORKFLOW_CONFIG = {
    "type": "custom",
    "config": {
        "name": "Custom Processing Workflow",
        "description": "User-defined workflow for custom processing"
    }
}

# Evaluator Configuration (list of evaluator slugs)
EVALUATOR_SLUGS = [
    "edb79105-0efe-42fd-b6b5-6e10d2b04117",  # Replace with your evaluator slugs
]

# Global state variables (will be populated during execution)
experiment_id = None
trace_ids = []  # List of trace IDs to process

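To keep credentials out of the notebook, the API key can be read from the environment instead of being hard-coded. This is a sketch only: the variable name `KEYWORDSAI_API_KEY` is an assumption chosen for this example, not a name the API requires, and `load_api_key` is a hypothetical helper.

```python
import os


def load_api_key(env_var: str = "KEYWORDSAI_API_KEY", placeholder: str = "apikey") -> str:
    """Read the API key from the environment and reject the placeholder value."""
    key = os.environ.get(env_var, "").strip()
    if not key or key == placeholder:
        raise RuntimeError(f"Set the {env_var} environment variable to your API key.")
    return key


# Example: set the variable for this process only, then load it.
os.environ["KEYWORDSAI_API_KEY"] = "example-key"
print(load_api_key())
```
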
## 3. API Functions

These functions handle individual API operations for the experiment workflow.

### 3.1 Create Custom Workflow Experiment

In [175]:
def create_experiment(name: str, description: str, dataset_id: str, 
                     workflows: List[Dict], evaluator_slugs: List[str]) -> Dict[str, Any]:
    """Create a new custom workflow experiment.
    
    This creates placeholder traces for each dataset entry that you can
    update with your custom workflow results.
    """
    url = f"{BASE_URL}/api/v2/experiments/"
    
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {API_KEY}"
    }
    
    payload = {
        "name": name,
        "description": description,
        "dataset_id": dataset_id,
        "workflows": workflows,
        "evaluator_slugs": evaluator_slugs
    }
    
    print("Creating custom workflow experiment...")
    print(f"  URL: {url}")
    print(f"  Name: {name}")
    print(f"  Dataset: {dataset_id}")
    print(f"  Evaluators: {', '.join(evaluator_slugs)}")
    print(f"  Request Body: {json.dumps(payload, indent=2)}")
    
    response = requests.post(url, headers=headers, json=payload)
    response.raise_for_status()
    
    data = response.json()
    print(f"\n✓ Experiment created with ID: {data.get('id')}")
    print(f"  Status: {data.get('status')}")
    print("  Placeholder traces are being created asynchronously...")
    
    return data


### 3.2 Get Experiment Summary


In [176]:
def get_experiment_summary(exp_id: str, filters: Optional[List[Dict]] = None) -> Dict[str, Any]:
    """Get aggregated summary statistics for experiment traces."""
    url = f"{BASE_URL}/api/v2/experiments/{exp_id}/logs/summary/"
    
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    
    print(f"\nGetting experiment summary for {exp_id}...")
    print(f"  URL: {url}")
    
    if filters:
        request_body = {"filters": filters}
        print(f"  Method: POST")
        print(f"  Request Body: {json.dumps(request_body, indent=2)}")
        response = requests.post(url, headers=headers, json=request_body)
    else:
        print(f"  Method: GET")
        response = requests.get(url, headers=headers)
    
    response.raise_for_status()
    
    data = response.json()
    # Fields may be null in the response; fall back to 0 so the format specs don't raise
    print(f"✓ Summary retrieved:")
    print(f"  Total traces: {data.get('total_count') or 0}")
    print(f"  Total cost: ${(data.get('total_cost') or 0):.4f}")
    print(f"  Total tokens: {data.get('total_tokens') or 0}")
    print(f"  Avg latency: {(data.get('avg_latency') or 0):.2f}s")
    
    return data

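Summary fields may be missing, `null`, or `NaN` (see the known issues above). A defensive sketch is to coerce each value before formatting it. `safe_number` is a hypothetical helper, and the sample `summary` dict is made-up data for illustration.

```python
import math
from typing import Any


def safe_number(value: Any, default: float = 0.0) -> float:
    """Return value as a float, or default if it is missing, non-numeric, or NaN."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return default
    return default if math.isnan(number) else number


# Made-up summary with a null cost and a NaN latency.
summary = {"total_cost": None, "avg_latency": float("nan"), "total_tokens": 150}
print(f"Total cost: ${safe_number(summary.get('total_cost')):.4f}")
print(f"Avg latency: {safe_number(summary.get('avg_latency')):.2f}s")
```
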

### 3.3 List Experiment Logs (Traces)


In [177]:
def list_experiment_logs(exp_id: str, filters: Optional[List[Dict]] = None, 
                        page: int = 1, page_size: int = 100) -> Dict[str, Any]:
    """List traces for an experiment with optional filtering.
    
    For custom workflows, this returns placeholder traces containing the
    dataset inputs you need to process.
    """
    url = f"{BASE_URL}/api/v2/experiments/{exp_id}/logs/list/"
    
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    
    params = {
        "page": page,
        "page_size": page_size
    }
    
    print(f"\nListing experiment logs for {exp_id}...")
    print(f"  URL: {url}")
    print(f"  Params: {params}")
    if filters:
        print(f"  Filters: {json.dumps(filters, indent=2)}")
    
    # Try to get logs
    try:
        if filters:
            request_body = {"filters": filters}
            print(f"  Method: POST")
            print(f"  Request Body: {json.dumps(request_body, indent=2)}")
            response = requests.post(url, headers=headers, json=request_body, params=params)
        else:
            print(f"  Method: GET")
            response = requests.get(url, headers=headers, params=params)
        
        response.raise_for_status()
        
    except requests.exceptions.HTTPError as e:
        print(f"❌ Error listing logs: {e}")
        print(f"   Status code: {e.response.status_code}")
        
        # Show response content for debugging
        try:
            error_json = e.response.json()
            print(f"   Response: {error_json}")
        except ValueError:  # Body was not valid JSON
            print(f"   Response (text): {e.response.text[:500]}")
        
        # If POST with filters failed with 500, try GET without filters
        if filters and e.response.status_code == 500:
            print("\n⚠ Retrying without filters (backend may have serialization bug)...")
            try:
                response = requests.get(url, headers=headers, params=params)
                response.raise_for_status()
                print("✓ GET without filters succeeded")
            except Exception as retry_error:
                print(f"❌ Retry also failed: {retry_error}")
                raise e  # Raise original error
        else:
            raise
    
    data = response.json()
    results = data.get('results', [])
    print(f"✓ Found {len(results)} logs (page {page})")
    print(f"  Total count: {data.get('count', 0)}")
    
    if results:
        status_counts = {}
        for log in results:
            status = log.get('status', 'unknown')
            status_counts[status] = status_counts.get(status, 0) + 1
        print(f"  Status breakdown: {status_counts}")
    
    return data

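`list_experiment_logs` returns one page at a time. To walk every page, a loop can keep requesting pages until it has collected `count` results or a page comes back empty. The sketch below takes the page fetcher as a parameter so it can be tried without a server; `iter_all_logs` and `fake_fetch` are hypothetical names, and it assumes the response carries `results` and `count` as used above.

```python
from typing import Any, Callable, Dict, Iterator, List


def iter_all_logs(
    fetch_page: Callable[[int, int], Dict[str, Any]],
    page_size: int = 100,
) -> Iterator[Dict[str, Any]]:
    """Yield logs from every page until the reported count is reached."""
    page = 1
    seen = 0
    while True:
        data = fetch_page(page, page_size)
        results = data.get("results", [])
        if not results:
            break
        for log in results:
            yield log
        seen += len(results)
        total = data.get("count")
        if total is not None and seen >= total:
            break
        page += 1


# Stand-in fetcher that serves 5 fake logs, page by page.
FAKE_LOGS: List[Dict[str, Any]] = [{"id": f"trace-{i}"} for i in range(5)]


def fake_fetch(page: int, page_size: int) -> Dict[str, Any]:
    start = (page - 1) * page_size
    return {"results": FAKE_LOGS[start:start + page_size], "count": len(FAKE_LOGS)}


print([log["id"] for log in iter_all_logs(fake_fetch, page_size=2)])
```

With the notebook's function, the fetcher could be `lambda page, size: list_experiment_logs(experiment_id, page=page, page_size=size)`.
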

### 3.4 Get Trace Details


In [178]:
def get_trace_details(exp_id: str, trace_id: str, include_full_span_tree: bool = True) -> Dict[str, Any]:
    """Get detailed information about a specific trace.
    
    Use this to retrieve full input content (untruncated) for processing.
    
    Args:
        exp_id: Experiment ID
        trace_id: Trace ID
        include_full_span_tree: If True, includes detail=1 to get full span tree with evaluator results
    """
    url = f"{BASE_URL}/api/v2/experiments/{exp_id}/logs/{trace_id}/"
    
    headers = {
        "Authorization": f"Bearer {API_KEY}"
    }
    
    params = {}
    if include_full_span_tree:
        params["detail"] = 1  # Include full span tree with evaluator results
    
    print(f"  Getting trace details...")
    print(f"    URL: {url}")
    print(f"    Method: GET")
    if params:
        print(f"    Params: {params}")
    
    response = requests.get(url, headers=headers, params=params)
    response.raise_for_status()
    
    return response.json()


### 3.5 Submit Custom Workflow Results


In [179]:
def submit_workflow_results(exp_id: str, trace_id: str, 
                           input_data: Any, output_data: Any,
                           name: Optional[str] = None, 
                           metadata: Optional[Dict] = None) -> Dict[str, Any]:
    """Update a placeholder trace with your custom workflow results.
    
    The system will automatically run configured evaluators on your output.
    """
    url = f"{BASE_URL}/api/v2/experiments/{exp_id}/logs/{trace_id}/"
    
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }
    
    payload = {
        "input": input_data,
        "output": output_data
    }
    
    if name:
        payload["name"] = name
    if metadata:
        payload["metadata"] = metadata
    
    print(f"    Submitting results to: {url}")
    print(f"    Method: PATCH")
    print(f"    Payload keys: {list(payload.keys())}")
    # Don't print full input/output as they might be large
    print(f"    Input type: {type(input_data).__name__}")
    print(f"    Output type: {type(output_data).__name__}")
    
    response = requests.patch(url, headers=headers, json=payload)
    response.raise_for_status()
    
    updated_trace = response.json()
    
    # Validate the submission was successful
    print(f"    Response status: {response.status_code}")
    
    # Check if the response contains expected fields
    if 'id' not in updated_trace:
        print(f"    ⚠ Warning: Response missing 'id' field")
    
    # Check if status changed (optional, may not always apply)
    response_status = updated_trace.get('status')
    if response_status:
        print(f"    Trace status: {response_status}")
    
    return updated_trace


### 3.6 Validate Workflow Submission


In [180]:
def validate_workflow_submission(exp_id: str, trace_id: str, 
                                expected_output: Any, 
                                verify_by_refetch: bool = False) -> bool:
    """Validate that a workflow submission was successful.
    
    Args:
        exp_id: Experiment ID
        trace_id: Trace ID that was updated
        expected_output: The output data that was submitted
        verify_by_refetch: If True, re-fetch the trace to verify the update persisted
        
    Returns:
        True if validation passes, False otherwise
    """
    if not verify_by_refetch:
        # Basic validation - just check if we got a trace ID back
        return trace_id is not None
    
    # Advanced validation - re-fetch and compare
    try:
        print(f"    Validating submission by re-fetching trace...")
        refetched_trace = get_trace_details(exp_id, trace_id, include_full_span_tree=False)
        
        # Check if output was updated
        refetched_output = refetched_trace.get('output')
        
        # Convert both to strings for comparison (in case of type differences)
        expected_str = json.dumps(expected_output, sort_keys=True) if not isinstance(expected_output, str) else expected_output
        refetched_str = json.dumps(refetched_output, sort_keys=True) if not isinstance(refetched_output, str) else str(refetched_output)
        
        if expected_str == refetched_str:
            print(f"    ✓ Validation passed: Output matches")
            return True
        else:
            print(f"    ⚠ Validation warning: Output differs")
            print(f"      Expected length: {len(expected_str)}")
            print(f"      Refetched length: {len(refetched_str)}")
            # Not necessarily an error - API might transform the data
            return True  # Return True but warn
            
    except Exception as e:
        print(f"    ⚠ Validation error: {e}")
        return False


### 3.7 Wait Helper Function


In [181]:
def wait_for_processing(seconds: int = 10):
    """Wait for placeholder traces to be created or evaluators to run."""
    print(f"\nWaiting {seconds} seconds for processing...")
    time.sleep(seconds)
    print("✓ Wait complete")

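A fixed sleep either wastes time or is too short. An alternative, sketched here, is to poll a readiness check until it succeeds or a timeout expires. `wait_until` is a hypothetical helper; the `check` callable is whatever signals readiness in your setup (for example, the list call returning at least one trace).

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool], timeout: float = 60.0, interval: float = 2.0) -> bool:
    """Call check() every interval seconds until it returns True or timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        # Give up if another sleep would run past the deadline.
        if time.monotonic() + interval > deadline:
            return False
        time.sleep(interval)


# Stand-in check that becomes ready on the third call.
calls = {"n": 0}


def ready_on_third_call() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


print(wait_until(ready_on_third_call, timeout=1.0, interval=0.01))
```
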

## 4. Custom Processing Logic

Define your custom workflow logic here. **Replace this placeholder function with your actual processing logic.**


In [182]:
def process_with_custom_logic(input_data: Any) -> Dict[str, Any]:
    """Your custom workflow processing logic goes here.
    
    TODO: Replace this with your actual processing logic.
    Examples:
    - Call your own LLM API
    - Run a fine-tuned model
    - Execute a custom pipeline
    - Combine multiple models
    """
    # Parse input if it's a JSON string
    if isinstance(input_data, str):
        try:
            parsed_input = json.loads(input_data)
        except json.JSONDecodeError:  # Not JSON; keep the raw string
            parsed_input = input_data
    else:
        parsed_input = input_data
    
    # Placeholder response - replace with your actual workflow
    output = {
        "response": "This is a placeholder. Replace with your actual workflow output.",
        "confidence": 0.95,
        "model_used": "custom-model-v1"
    }
    
    metadata = {
        "processing_time": 1.5,
        "tokens_used": 150,
        "custom_metric": 0.85
    }
    
    return {
        "output": output,
        "metadata": metadata
    }

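As an illustration of a replacement that keeps the same return shape (`output` plus `metadata`), here is a toy workflow that counts the words in the input and records the measured processing time. `word_count_workflow` is hypothetical and stands in for a real model call.

```python
import json
import time
from typing import Any, Dict


def word_count_workflow(input_data: Any) -> Dict[str, Any]:
    """Toy workflow: count the words in the input and time the computation."""
    started = time.perf_counter()
    # Non-string inputs are serialized to JSON text before counting.
    text = input_data if isinstance(input_data, str) else json.dumps(input_data)
    word_count = len(text.split())
    elapsed = time.perf_counter() - started
    return {
        "output": {"response": f"Input contains {word_count} words."},
        "metadata": {"processing_time": elapsed},
    }


result = word_count_workflow("hello big world")
print(result["output"]["response"])
```
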

## 5. Main Workflow Execution

This function orchestrates the complete workflow from creation to evaluation.


In [None]:
def run_complete_workflow(max_traces: int = 5):
    """Execute the complete custom workflow from start to finish."""
    global experiment_id, trace_ids
    trace_ids = []  # Reset so re-running this cell does not mix in traces from earlier runs
    
    print("=" * 70)
    print("CUSTOM WORKFLOW EXPERIMENT - COMPLETE EXECUTION")
    print("=" * 70)
    
    try:
        # Step 1: Create experiment
        print("\n[STEP 1] Creating custom workflow experiment...")
        experiment_data = create_experiment(
            name=EXPERIMENT_NAME,
            description=EXPERIMENT_DESCRIPTION,
            dataset_id=DATASET_ID,
            workflows=[WORKFLOW_CONFIG],
            evaluator_slugs=EVALUATOR_SLUGS
        )
        experiment_id = experiment_data.get('id')
        
        # Step 2: Wait for placeholder traces
        print("\n[STEP 2] Waiting for placeholder traces to be created...")
        print("  (Traces are created asynchronously, this may take 15-30 seconds)")
        wait_for_processing(20)
        
        # Step 3: Get experiment summary (optional - may fail if traces not ready)
        print("\n[STEP 3] Getting experiment summary...")
        try:
            summary = get_experiment_summary(experiment_id)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 500:
                print("⚠ Summary endpoint returned 500 (backend bug with NaN values). Skipping summary check.")
                summary = None
            else:
                raise
        
        # Step 4: List placeholder traces (no filter to see all traces)
        print("\n[STEP 4] Listing placeholder traces...")
        logs_data = list_experiment_logs(experiment_id, filters=None, page_size=max_traces)
        
        results = logs_data.get('results', [])
        if not results:
            print("⚠ No traces found yet. Placeholder traces are still being created.")
            print("   Try waiting longer or running the workflow again in a few moments.")
            return None
        
        print(f"✓ Found {len(results)} trace(s)")
        
        # Show first trace structure for debugging
        if results:
            print(f"\n  📋 First trace structure (for debugging):")
            first_trace = results[0]
            print(f"    Keys: {list(first_trace.keys())}")
            print(f"    ID: {first_trace.get('id')}")
            print(f"    Status: {first_trace.get('status')}")
            print(f"    Name: {first_trace.get('name')}")
            if 'input' in first_trace:
                input_preview = str(first_trace.get('input'))[:100]
                print(f"    Input preview: {input_preview}...")
            if 'output' in first_trace:
                output_preview = str(first_trace.get('output'))[:100]
                print(f"    Output preview: {output_preview}...")
        

   
        print(f"\n  ℹ️  Processing all traces (API docs may be outdated)")
        status_counts = {}
        for t in results:
            status = t.get('status', 'unknown')
            status_counts[status] = status_counts.get(status, 0) + 1
        print(f"  Status breakdown: {status_counts}")
        
        results = results[:max_traces]  # Limit to max_traces
        print(f"  → Will process {len(results)} trace(s)")
        
        # Step 5 & 6: Process and submit results
        print(f"\n[STEP 5-6] Processing and submitting {len(results)} traces...")
        processed_traces = []
        
        for i, trace in enumerate(results, 1):
            trace_id = trace.get('id')
            trace_ids.append(trace_id)
            
            print(f"\n  Processing trace {i}/{len(results)} ({trace_id[:16]}...)...")
            
            # Get trace details (without full span tree to keep it fast)
            trace_details = get_trace_details(experiment_id, trace_id, include_full_span_tree=False)
            
            # Extract input from span_tree or top level
            span_tree = trace_details.get('span_tree', [])
            input_data = span_tree[0].get('input') if span_tree else trace_details.get('input')
            
            print(f"    Input preview: {str(input_data)[:100]}...")
            
            # Process with custom logic
            print(f"    Running custom workflow...")
            result = process_with_custom_logic(input_data)
            
            # Submit results
            print(f"    Submitting results...")
            updated_trace = submit_workflow_results(
                exp_id=experiment_id,
                trace_id=trace_id,
                input_data=input_data,
                output_data=result['output'],
                name=f"Processed: {trace.get('name', 'Trace')}",
                metadata=result.get('metadata')
            )
            
            # Validate submission
            is_valid = validate_workflow_submission(
                exp_id=experiment_id,
                trace_id=trace_id,
                expected_output=result['output'],
                verify_by_refetch=False  # Set to True for thorough validation (slower)
            )
            
            if is_valid:
                processed_traces.append(updated_trace)
                print(f"    ✓ Submitted and validated successfully")
            else:
                print(f"    ⚠ Submission validation failed for trace {trace_id}")
        
        print(f"\n✓ Successfully processed and submitted {len(processed_traces)} traces")
        
        # Step 7: Wait for evaluators
        print("\n[STEP 7] Waiting for evaluators to complete...")
        wait_for_processing(15)
        
        # Step 8: Get final summary (optional)
        print("\n[STEP 8] Getting final experiment summary...")
        try:
            final_summary = get_experiment_summary(experiment_id)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 500:
                print("⚠ Summary endpoint returned 500 (backend bug with NaN values). Skipping final summary.")
                final_summary = {}
            else:
                raise
        
        # Display results
        print("\n" + "=" * 70)
        print("WORKFLOW COMPLETED SUCCESSFULLY")
        print("=" * 70)
        print(f"Experiment ID: {experiment_id}")
        print(f"Traces Processed: {len(processed_traces)}")
        print(f"Total Cost: ${final_summary.get('total_cost', 0):.4f}")
        print(f"Total Tokens: {final_summary.get('total_tokens', 0)}")
        
        return {
            "experiment_id": experiment_id,
            "processed_count": len(processed_traces),
            "trace_ids": trace_ids,
            "summary": final_summary
        }
        
    except requests.exceptions.RequestException as e:
        print(f"\n❌ Error: {e}")
        if hasattr(e, 'response') and hasattr(e.response, 'text'):
            print(f"Response: {e.response.text}")
        raise
    except Exception as e:
        print(f"\n❌ Unexpected error: {e}")
        raise

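In the loop above, an exception from any one trace aborts the whole run. If partial progress is preferable, each trace can be handled in its own `try` block and the failures collected for later. This is a sketch with hypothetical names (`process_each`, `demo_handler`), not the notebook's behavior.

```python
from typing import Any, Callable, Dict, List, Tuple


def process_each(
    traces: List[Dict[str, Any]],
    handler: Callable[[Dict[str, Any]], Any],
) -> Tuple[List[Any], List[Tuple[Any, str]]]:
    """Apply handler to every trace; return (successes, [(trace_id, error), ...])."""
    successes: List[Any] = []
    failures: List[Tuple[Any, str]] = []
    for trace in traces:
        try:
            successes.append(handler(trace))
        except Exception as exc:  # Keep going; record what failed
            failures.append((trace.get("id"), str(exc)))
    return successes, failures


def demo_handler(trace: Dict[str, Any]) -> str:
    # Stand-in for "fetch details, process, submit" on one trace.
    if trace["id"] == "bad":
        raise ValueError("submission rejected")
    return trace["id"]


done, failed = process_each([{"id": "a"}, {"id": "bad"}, {"id": "c"}], demo_handler)
print(done, failed)
```
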

## 6. Run the Workflow

Execute the complete workflow. **Make sure you've configured Section 2 first!**


In [184]:
# Run the complete workflow
# Process at most 5 traces (change this number as needed)
workflow_results = run_complete_workflow(max_traces=5)


CUSTOM WORKFLOW EXPERIMENT - COMPLETE EXECUTION

[STEP 1] Creating custom workflow experiment...
Creating custom workflow experiment...
  URL: http://127.0.0.1:8000/api/v2/experiments/
  Name: My Custom Workflow Experiment
  Dataset: 2cfbb5b4-53d2-4232-abd3-de4886f0255e
  Evaluators: edb79105-0efe-42fd-b6b5-6e10d2b04117
  Request Body: {
  "name": "My Custom Workflow Experiment",
  "description": "Testing custom workflow implementation with V2 API",
  "dataset_id": "2cfbb5b4-53d2-4232-abd3-de4886f0255e",
  "workflows": [
    {
      "type": "custom",
      "config": {
        "name": "Custom Processing Workflow",
        "description": "User-defined workflow for custom processing"
      }
    }
  ],
  "evaluator_slugs": [
    "edb79105-0efe-42fd-b6b5-6e10d2b04117"
  ]
}

✓ Experiment created with ID: 1eac635fc8c848c19d93a72c7622cddd
  Status: pending
  Placeholder traces are being created asynchronously...

[STEP 2] Waiting for placeholder traces to be created...
  (Traces are created asynchronously, this may take 15-30 seconds)

## 7. Validate Workflow Submissions

Verify that all workflow submissions were successful and data persisted correctly.


In [185]:
# Validate all submitted traces
if workflow_results and trace_ids:
    print("\n" + "=" * 70)
    print("VALIDATING WORKFLOW SUBMISSIONS")
    print("=" * 70)
    
    print(f"\nRe-fetching {len(trace_ids)} traces to verify submissions...")
    
    validation_results = []
    for i, trace_id in enumerate(trace_ids, 1):
        print(f"\n[{i}/{len(trace_ids)}] Validating trace {trace_id[:16]}...")
        
        try:
            # Re-fetch the trace
            trace = get_trace_details(experiment_id, trace_id, include_full_span_tree=False)
            
            # Check key fields
            has_id = 'id' in trace
            has_output = bool(trace.get('output'))  # True only if output is present and non-empty
            status = trace.get('status', 'unknown')
            
            validation_results.append({
                'trace_id': trace_id,
                'has_id': has_id,
                'has_output': has_output,
                'status': status,
                'valid': has_id and has_output
            })
            
            if has_id and has_output:
                print(f"  ✓ Valid - Status: {status}, Output present: {len(str(trace['output']))} chars")
            else:
                print(f"  ⚠ Issues detected:")
                if not has_id:
                    print(f"    - Missing ID")
                if not has_output:
                    print(f"    - Missing or empty output")
                    
        except Exception as e:
            print(f"  ❌ Error validating: {e}")
            validation_results.append({
                'trace_id': trace_id,
                'valid': False,
                'error': str(e)
            })
    
    # Summary
    valid_count = sum(1 for r in validation_results if r.get('valid', False))
    print(f"\n" + "=" * 70)
    print(f"VALIDATION SUMMARY")
    print(f"=" * 70)
    print(f"Total traces: {len(trace_ids)}")
    print(f"Valid submissions: {valid_count}")
    print(f"Failed validations: {len(trace_ids) - valid_count}")
    
    if valid_count == len(trace_ids):
        print(f"\n✅ All workflow submissions validated successfully!")
    else:
        print(f"\n⚠ Some submissions may have issues - review details above")
    
else:
    print("\n⚠ No workflow results to validate. Run the workflow first.")



VALIDATING WORKFLOW SUBMISSIONS

Re-fetching 3 traces to verify submissions...

[1/3] Validating trace 436c34e878584a44...
  Getting trace details...
    URL: http://127.0.0.1:8000/api/v2/experiments/1eac635fc8c848c19d93a72c7622cddd/logs/436c34e878584a449da861600f5b05d0/
    Method: GET
  ✓ Valid - Status: success, Output present: 50 chars

[2/3] Validating trace 53c74a30618f4536...
  Getting trace details...
    URL: http://127.0.0.1:8000/api/v2/experiments/1eac635fc8c848c19d93a72c7622cddd/logs/53c74a30618f4536a319feb2407b6496/
    Method: GET
  ✓ Valid - Status: success, Output present: 50 chars

[3/3] Validating trace bde70a9189b341f3...
  Getting trace details...
    URL: http://127.0.0.1:8000/api/v2/experiments/1eac635fc8c848c19d93a72c7622cddd/logs/bde70a9189b341f382f6af6e370ef944/
    Method: GET
  ✓ Valid - Status: success, Output present: 50 chars

VALIDATION SUMMARY
Total traces: 3
Valid submissions: 3
Failed validations: 0

✅ All workflow submissions validated successfully!

## 8. View Evaluator Results

Check the evaluator scores for your submitted results.

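The exact shape of evaluator results in the trace response is not documented in this notebook, so the following is only an exploratory sketch. It walks a nested trace payload (such as the one returned by `get_trace_details` with `detail=1`) and lists every field whose key contains `score`. `find_score_fields` and the sample payload are hypothetical; real field names may differ.

```python
from typing import Any, Iterator, Tuple


def find_score_fields(node: Any, path: str = "") -> Iterator[Tuple[str, Any]]:
    """Yield (path, value) for every dict key containing 'score', at any depth."""
    if isinstance(node, dict):
        for key, value in node.items():
            child_path = f"{path}.{key}" if path else str(key)
            if "score" in str(key).lower():
                yield child_path, value
            yield from find_score_fields(value, child_path)
    elif isinstance(node, list):
        for index, item in enumerate(node):
            yield from find_score_fields(item, f"{path}[{index}]")


# Made-up payload for illustration; real field names may differ.
sample_trace = {
    "id": "trace-1",
    "span_tree": [{"name": "root", "evaluation": {"score": 0.9}}],
}

for field_path, value in find_score_fields(sample_trace):
    print(field_path, value)
```
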