# GFQL Validation for LLMs and Automation

Learn how to integrate GFQL validation with Large Language Models and automation pipelines.

## Target Audience
- AI/ML Engineers building GFQL generation systems
- Developers integrating LLMs with graph queries
- Teams building automated query generation pipelines

## What You'll Learn
- Structured error formats for LLM consumption
- Automated fix suggestions and corrections
- Integration patterns with LLM APIs
- Building robust query generation pipelines

In [None]:
# Core imports
import json
import pandas as pd
import graphistry
from typing import Dict, List, Any, Optional

from graphistry.compute.validate import (
    validate_syntax,
    validate_schema,
    validate_query,
    extract_schema_from_dataframes,
    ValidationIssue
)

print(f"PyGraphistry version: {graphistry.__version__}")

## JSON Serialization Basics

Convert validation results to structured formats for LLMs.

In [None]:
# Helper to serialize ValidationIssue to JSON
def validation_issue_to_dict(issue: ValidationIssue) -> Dict[str, Any]:
    """Convert ValidationIssue to JSON-serializable dict."""
    return {
        "level": issue.level,
        "message": issue.message,
        "operation_index": issue.operation_index,
        "field": issue.field,
        "suggestion": issue.suggestion
    }

# Example with invalid query
invalid_query = [
    {"type": "node"},  # Wrong type
    {"type": "e_forward", "filter": {"weight": "high"}}  # Invalid filter
]

issues = validate_syntax(invalid_query)

# Convert to JSON
json_output = {
    "query": invalid_query,
    "valid": len(issues) == 0,
    "error_count": sum(1 for i in issues if i.level == "error"),
    "warning_count": sum(1 for i in issues if i.level == "warning"),
    "issues": [validation_issue_to_dict(issue) for issue in issues]
}

print("JSON output for LLM:")
print(json.dumps(json_output, indent=2))

## Structured Error Formats

Create error formats optimized for LLM understanding and correction.

In [None]:
def create_llm_error_report(query: List[Dict], issues: List[ValidationIssue]) -> Dict[str, Any]:
    """Create structured error report for LLMs."""
    
    # Categorize errors
    errors_by_type = {
        "syntax_errors": [],
        "semantic_warnings": [],
        "schema_errors": []
    }
    
    for issue in issues:
        issue_dict = validation_issue_to_dict(issue)
        
        # Add context about the problematic operation
        if issue.operation_index is not None and issue.operation_index < len(query):
            issue_dict["operation"] = query[issue.operation_index]
        
        # Categorize
        if "Invalid operation type" in issue.message or "Invalid filter" in issue.message:
            errors_by_type["syntax_errors"].append(issue_dict)
        elif "Column" in issue.message and "not found" in issue.message:
            errors_by_type["schema_errors"].append(issue_dict)
        else:
            errors_by_type["semantic_warnings"].append(issue_dict)
    
    return {
        "validation_status": "failed" if any(errors_by_type.values()) else "passed",
        "total_issues": len(issues),
        "fixable_issues": sum(1 for i in issues if i.suggestion is not None),
        "errors_by_type": errors_by_type,
        "requires_schema": len(errors_by_type["schema_errors"]) > 0
    }

# Test with complex errors
complex_invalid = [
    {"type": "n", "filter": {"unknown_col": {"eq": "value"}}},
    {"type": "edge_forward"},  # Wrong type
    {"type": "n", "filter": {"score": {"equals": 100}}}  # Wrong operator
]

# Create sample schema
sample_df = pd.DataFrame({"id": [1], "name": ["test"], "score": [100]})
schema = extract_schema_from_dataframes(sample_df, pd.DataFrame())

# Validate syntax and schema
syntax_issues = validate_syntax(complex_invalid)
schema_issues = validate_schema(complex_invalid, schema)
all_issues = syntax_issues + schema_issues

report = create_llm_error_report(complex_invalid, all_issues)
print("LLM Error Report:")
print(json.dumps(report, indent=2))

## Automated Fix Suggestions

Generate actionable suggestions for fixing validation errors.

In [None]:
def suggest_fixes(query: List[Dict], issues: List[ValidationIssue], 
                  schema: Optional[Any] = None) -> List[Dict[str, Any]]:
    """Generate fix suggestions for validation issues."""
    fixes = []
    
    for issue in issues:
        fix = {
            "issue": validation_issue_to_dict(issue),
            "fixes": []
        }
        
        # Fix invalid operation types
        if "Invalid operation type" in issue.message:
            if issue.operation_index is not None:
                op = query[issue.operation_index]
                if op.get("type") == "node":
                    fix["fixes"].append({
                        "action": "replace",
                        "path": f"[{issue.operation_index}].type",
                        "old_value": "node",
                        "new_value": "n"
                    })
                elif op.get("type") == "edge_forward":
                    fix["fixes"].append({
                        "action": "replace",
                        "path": f"[{issue.operation_index}].type",
                        "old_value": "edge_forward",
                        "new_value": "e_forward"
                    })
        
        # Fix invalid operators
        if "Invalid operator" in issue.message:
            if "equals" in issue.message:
                fix["fixes"].append({
                    "action": "replace_key",
                    "description": "Change 'equals' to 'eq'",
                    "example": '{"score": {"eq": 100}}'
                })
            elif "greater" in issue.message:
                fix["fixes"].append({
                    "action": "replace_key",
                    "description": "Change 'greater' to 'gt'",
                    "example": '{"score": {"gt": 100}}'
                })
        
        # Fix column not found
        if "Column" in issue.message and "not found" in issue.message and schema:
            # Extract column name from message
            import re
            match = re.search(r"Column '(\w+)'", issue.message)
            if match:
                missing_col = match.group(1)
                # Suggest similar columns
                if hasattr(schema, 'node_columns'):
                    available = list(schema.node_columns.keys())
                    fix["fixes"].append({
                        "action": "use_available_column",
                        "missing": missing_col,
                        "available": available,
                        "suggestion": f"Use one of: {', '.join(available)}"
                    })
        
        if fix["fixes"]:
            fixes.append(fix)
    
    return fixes

# Test fix suggestions
fixes = suggest_fixes(complex_invalid, all_issues, schema)
print("Fix Suggestions:")
print(json.dumps(fixes, indent=2))

## Error Categorization

Categorize errors for prioritized fixing by LLMs.

In [None]:
def categorize_and_prioritize_errors(issues: List[ValidationIssue]) -> Dict[str, Any]:
    """Categorize and prioritize errors for LLM processing."""
    
    categories = {
        "critical": [],    # Must fix - query won't run
        "important": [],   # Should fix - query may fail on data
        "suggested": []    # Nice to fix - performance/style
    }
    
    for issue in issues:
        issue_dict = validation_issue_to_dict(issue)
        
        # Critical: syntax errors
        if issue.level == "error" and any(keyword in issue.message for keyword in 
            ["Invalid operation", "Invalid filter", "Invalid predicate"]):
            categories["critical"].append(issue_dict)
        
        # Important: schema errors
        elif "not found" in issue.message or "type mismatch" in issue.message:
            categories["important"].append(issue_dict)
        
        # Suggested: warnings
        elif issue.level == "warning":
            categories["suggested"].append(issue_dict)
        
        else:
            categories["important"].append(issue_dict)
    
    return {
        "categories": categories,
        "fix_order": [
            "1. Fix all critical errors first (syntax)",
            "2. Then fix important errors (schema/types)",
            "3. Finally address suggested improvements"
        ],
        "estimated_iterations": max(1, len(categories["critical"]) + 
                                   (1 if categories["important"] else 0))
    }

# Test categorization
categorized = categorize_and_prioritize_errors(all_issues)
print("Error Categorization for LLM:")
print(json.dumps(categorized, indent=2))

## LLM Integration Patterns

Common patterns for integrating validation with LLM query generation.

In [None]:
class GFQLValidationPipeline:
    """Pipeline for validating and fixing LLM-generated GFQL queries."""
    
    def __init__(self, schema=None, max_iterations=3):
        self.schema = schema
        self.max_iterations = max_iterations
        self.history = []
    
    def validate_and_report(self, query: List[Dict]) -> Dict[str, Any]:
        """Validate query and create comprehensive report."""
        
        # Syntax validation
        syntax_issues = validate_syntax(query)
        
        # Schema validation if available
        schema_issues = []
        if self.schema:
            schema_issues = validate_schema(query, self.schema)
        
        all_issues = syntax_issues + schema_issues
        
        # Create report
        report = {
            "query": query,
            "iteration": len(self.history),
            "valid": len(all_issues) == 0,
            "issues": [validation_issue_to_dict(i) for i in all_issues],
            "error_report": create_llm_error_report(query, all_issues),
            "fixes": suggest_fixes(query, all_issues, self.schema),
            "categories": categorize_and_prioritize_errors(all_issues)
        }
        
        self.history.append(report)
        return report
    
    def create_llm_prompt(self, report: Dict[str, Any]) -> str:
        """Create prompt for LLM to fix the query."""
        
        if report["valid"]:
            return "Query is valid. No fixes needed."
        
        prompt = f"""Fix the following GFQL query based on validation errors:

Current Query:
{json.dumps(report['query'], indent=2)}

Errors to Fix:
{json.dumps(report['categories']['categories'], indent=2)}

Suggested Fixes:
{json.dumps(report['fixes'], indent=2)}

Please provide the corrected query as a JSON array.
Fix critical errors first, then important ones.
"""
        return prompt

# Example usage
pipeline = GFQLValidationPipeline(schema=schema)

# Validate problematic query
report = pipeline.validate_and_report(complex_invalid)

print("Validation Pipeline Report:")
print(f"Valid: {report['valid']}")
print(f"Total Issues: {len(report['issues'])}")
print(f"\nLLM Prompt Preview:")
print(pipeline.create_llm_prompt(report)[:500] + "...")

## Mock LLM Examples

Simulate LLM query generation and iterative refinement.

In [None]:
class MockLLM:
    """Mock LLM for demonstrating validation integration."""
    
    def generate_query(self, natural_language: str) -> List[Dict]:
        """Simulate LLM generating GFQL from natural language."""
        
        # Simulate common LLM mistakes
        if "high risk users" in natural_language.lower():
            return [
                {"type": "node", "filter": {"user_type": {"equals": "user"}}},  # Wrong!
                {"type": "e_forward"},
                {"type": "n", "filter": {"risk_score": {"greater": 80}}}  # Wrong operator
            ]
        
        return [{"type": "n"}]  # Default
    
    def fix_query(self, query: List[Dict], fixes: List[Dict]) -> List[Dict]:
        """Simulate LLM applying fixes."""
        import copy
        fixed = copy.deepcopy(query)
        
        # Apply some fixes
        for fix in fixes:
            for suggested_fix in fix.get("fixes", []):
                if suggested_fix["action"] == "replace":
                    # Simple fix simulation
                    if "node" in str(fixed):
                        fixed = json.loads(json.dumps(fixed).replace('"node"', '"n"'))
                    if "equals" in str(fixed):
                        fixed = json.loads(json.dumps(fixed).replace('"equals"', '"eq"'))
                    if "greater" in str(fixed):
                        fixed = json.loads(json.dumps(fixed).replace('"greater"', '"gt"'))
        
        return fixed

# Demonstrate iterative refinement
llm = MockLLM()
pipeline = GFQLValidationPipeline(schema=schema, max_iterations=3)

# Initial generation
nl_query = "Find high risk users connected to recent transactions"
print(f"Natural Language: {nl_query}\n")

query = llm.generate_query(nl_query)
print(f"Initial LLM Query:")
print(json.dumps(query, indent=2))

# Iterative refinement
for i in range(pipeline.max_iterations):
    print(f"\n--- Iteration {i+1} ---")
    
    report = pipeline.validate_and_report(query)
    
    if report["valid"]:
        print("✅ Query is valid!")
        break
    
    print(f"Issues found: {len(report['issues'])}")
    
    # LLM fixes the query
    query = llm.fix_query(query, report["fixes"])
    print(f"Fixed query:")
    print(json.dumps(query, indent=2))

print(f"\nFinal validation history: {len(pipeline.history)} iterations")

In [None]:
# Simulate more complex LLM interaction
def simulate_llm_conversation(natural_language: str, schema=None):
    """Simulate a full LLM conversation with validation feedback."""
    
    print(f"User: {natural_language}")
    print("\nLLM: I'll create a GFQL query for that.\n")
    
    # Initial attempt (with intentional errors)
    if "products purchased by VIP customers" in natural_language:
        attempt1 = [
            {"type": "n", "filter": {"customer_type": {"eq": "VIP"}}},
            {"type": "edge", "filter": {"action": {"eq": "purchase"}}},
            {"type": "n", "filter": {"type": {"eq": "product"}}}
        ]
    else:
        attempt1 = [{"type": "nodes"}]  # Generic error
    
    print("First attempt:")
    print(json.dumps(attempt1, indent=2))
    
    # Validate
    issues = validate_syntax(attempt1)
    if schema:
        issues.extend(validate_schema(attempt1, schema))
    
    if issues:
        print("\nValidation found issues:")
        for issue in issues[:3]:  # Show first 3
            print(f"- {issue.level}: {issue.message}")
            if issue.suggestion:
                print(f"  Suggestion: {issue.suggestion}")
        
        print("\nLLM: Let me fix those issues...\n")
        
        # Fixed version
        if "products purchased by VIP customers" in natural_language:
            attempt2 = [
                {"type": "n", "filter": {"type": {"eq": "customer"}}},  # Fixed column
                {"type": "e_forward"},  # Fixed type
                {"type": "n", "filter": {"type": {"eq": "product"}}}
            ]
        else:
            attempt2 = [{"type": "n"}]  # Fixed
        
        print("Fixed query:")
        print(json.dumps(attempt2, indent=2))
        
        # Re-validate
        final_issues = validate_syntax(attempt2)
        if not final_issues:
            print("\n✅ Query is now valid!")
        else:
            print(f"\n⚠️  Still has {len(final_issues)} issues")
    else:
        print("\n✅ Query is valid on first attempt!")

# Test conversations
print("=== Conversation 1 ===")
simulate_llm_conversation("Show me products purchased by VIP customers", schema)

print("\n\n=== Conversation 2 ===")
simulate_llm_conversation("Find all nodes in the graph")

## Prompt Engineering for GFQL

Best practices for prompting LLMs to generate valid GFQL.

In [None]:
def create_gfql_system_prompt(schema=None) -> str:
    """Create system prompt for LLMs generating GFQL."""
    
    prompt = """You are a GFQL (Graph Frame Query Language) expert. 

GFQL Rules:
1. Queries are JSON arrays of operations
2. Valid operation types: "n" (node), "e_forward", "e_reverse", "e" (edge)
3. Filters use operators: eq, ne, gt, gte, lt, lte, contains, regex, in, between
4. Complex filters use _and, _or for combining conditions
5. Always validate column names against the schema

Common Patterns:
- Node filter: {"type": "n", "filter": {"column": {"op": value}}}
- Edge traversal: {"type": "e_forward", "hops": 1}
- Named operations: {"type": "n", "name": "my_nodes"}
"""
    
    if schema and hasattr(schema, 'node_columns'):
        prompt += f"\n\nAvailable columns:\n"
        prompt += f"Nodes: {list(schema.node_columns.keys())}\n"
        if hasattr(schema, 'edge_columns'):
            prompt += f"Edges: {list(schema.edge_columns.keys())}\n"
    
    return prompt

# Generate prompts
print("System Prompt for LLM:")
print(create_gfql_system_prompt(schema))

print("\n" + "="*50 + "\n")

# Example user prompts with expected GFQL
example_prompts = [
    {
        "user": "Find all customers",
        "gfql": [{"type": "n", "filter": {"type": {"eq": "customer"}}}]
    },
    {
        "user": "Show nodes with score above 90",
        "gfql": [{"type": "n", "filter": {"score": {"gt": 90}}}]
    },
    {
        "user": "Find customers and their connections",
        "gfql": [
            {"type": "n", "filter": {"type": {"eq": "customer"}}},
            {"type": "e_forward", "hops": 1},
            {"type": "n"}
        ]
    }
]

print("Example Prompts for Training:")
for ex in example_prompts:
    print(f"\nUser: {ex['user']}")
    print(f"GFQL: {json.dumps(ex['gfql'])}")

## Best Practices

Key recommendations for LLM integration with GFQL validation.

In [None]:
# Best practices demonstration
class LLMIntegrationBestPractices:
    """Demonstrate best practices for LLM-GFQL integration."""
    
    @staticmethod
    def validate_before_execution(query):
        """Always validate before executing."""
        issues = validate_syntax(query)
        if issues:
            return {"execute": False, "reason": "Validation failed", "issues": issues}
        return {"execute": True}
    
    @staticmethod
    def provide_schema_context(schema):
        """Give LLMs schema information."""
        context = {
            "node_columns": {},
            "edge_columns": {}
        }
        
        if hasattr(schema, 'node_columns'):
            for col, dtype in schema.node_columns.items():
                context["node_columns"][col] = {
                    "type": str(dtype),
                    "operators": ["eq", "ne", "in"] if "object" in str(dtype) 
                                else ["eq", "ne", "gt", "gte", "lt", "lte"]
                }
        
        return context
    
    @staticmethod
    def implement_retry_logic(query, max_retries=3):
        """Implement exponential backoff for fixes."""
        import time
        
        for attempt in range(max_retries):
            issues = validate_syntax(query)
            if not issues:
                return {"success": True, "attempts": attempt + 1}
            
            # Simulate fix attempt
            time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
            
        return {"success": False, "attempts": max_retries}

# Demonstrate best practices
bp = LLMIntegrationBestPractices()

print("1. Always Validate Before Execution:")
test_query = [{"type": "n"}, {"type": "invalid"}]
result = bp.validate_before_execution(test_query)
print(f"   Execute: {result['execute']}")
if not result['execute']:
    print(f"   Reason: {result['reason']}")

print("\n2. Provide Schema Context to LLMs:")
context = bp.provide_schema_context(schema)
print(f"   Schema context: {json.dumps(context, indent=2)[:200]}...")

print("\n3. Implement Retry Logic:")
retry_result = bp.implement_retry_logic([{"type": "n"}])
print(f"   Success: {retry_result['success']} in {retry_result['attempts']} attempt(s)")

## Summary & Resources

### Key Takeaways
1. **Structured Formats**: Convert validation to JSON for LLM consumption
2. **Error Categorization**: Prioritize fixes (critical → important → suggested)
3. **Iterative Refinement**: Use validation feedback for query improvement
4. **Schema Context**: Always provide available columns to LLMs
5. **Prompt Engineering**: Use specific GFQL rules and examples

### Integration Checklist
- ✅ Serialize validation issues to JSON
- ✅ Implement fix suggestion generation
- ✅ Create iterative validation pipeline
- ✅ Provide schema context in prompts
- ✅ Handle rate limiting and retries
- ✅ Log validation metrics

### Next Steps
- Integrate with real LLM providers (OpenAI, Anthropic, etc.)
- Build production validation pipelines
- Create domain-specific GFQL templates
- Monitor and improve generation accuracy

### Resources
- [GFQL Language Specification](https://docs.graphistry.com/gfql/spec/language/)
- [Production Validation Patterns](./gfql_validation_production.ipynb)
- [GFQL Documentation](https://docs.graphistry.com/gfql/)