<a href="https://colab.research.google.com/github/beloveddie/AI-Craft/blob/main/LLM_Query_Validation_Workflows.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install the llama-index package into the notebook runtime (-q keeps pip quiet).
!pip install -q llama-index

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m7.7/7.7 MB[0m [31m65.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m40.8/40.8 kB[0m [31m1.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m263.6/263.6 kB[0m [31m12.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m302.3/302.3 kB[0m [31m16.0 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.2/1.2 MB[0m [31m22.6 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.9/50.9 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m129.3/129.3 kB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# Read the Colab secret named 'OPENAI_KEY_SOLID' and expose it to this
# process as the OPENAI_API_KEY environment variable.
from google.colab import userdata
import os
os.environ['OPENAI_API_KEY'] = userdata.get('OPENAI_KEY_SOLID')

In [None]:
from llama_index.core.workflow import (
    Event,
    StartEvent,
    StopEvent,
    Workflow,
    step
)
from llama_index.llms.openai import OpenAI
from typing import List, Dict, Any
import json

# Query Validation Workflow - Cybersecurity Focus
class QueryValidationEvent(Event):
    """Event carrying the first-pass security assessment of a query.

    Emitted by ``QueryValidationFlow.validate_query`` and consumed by
    ``QueryValidationFlow.decision_step``.
    """

    # The user query under review, as read from the StartEvent.
    query: str
    # Dict built from the LLM's validation response for that query.
    validation_result: Dict[str, Any]

class ValidatedQueryEvent(Event):
    """Event carrying the final allow/reject decision for a query.

    Emitted by ``QueryValidationFlow.decision_step`` and consumed by
    ``QueryValidationFlow.finalize_validation``.
    """

    # The user query that was reviewed.
    query: str
    # When False, finalize_validation returns a "rejected" result.
    is_valid: bool
    # Score taken from the LLM decision; the prompts describe it as a float
    # in the 0-1 range where values below 0.7 should be rejected.
    security_score: float
    # Issues reported for the query; included in the "rejected" result.
    issues: List[str]

class QueryValidationFlow(Workflow):
    """Workflow that asks an LLM to security-screen a user query.

    Steps, in order:
      1. ``validate_query``  - requests a JSON security assessment of the query.
      2. ``decision_step``   - requests a final decision, then enforces the
         score threshold and the ``allow`` flag in code (fail closed).
      3. ``finalize_validation`` - converts the decision into the result dict.

    The workflow result is a dict whose ``status`` is ``"approved"`` or
    ``"rejected"``.
    """

    llm = OpenAI(model="gpt-4.1-mini")

    # Minimum security_score a query needs to be approved. The same value is
    # quoted to the model in the decision prompt, but it is enforced here in
    # code so the gate does not depend on the model applying it correctly.
    SECURITY_SCORE_THRESHOLD = 0.7

    @staticmethod
    def _parse_json_object(text: str) -> Dict[str, Any]:
        """Decode the JSON object embedded in an LLM reply.

        Models frequently wrap JSON in Markdown fences or add prose around it,
        so only the span from the first ``{`` to the last ``}`` is decoded.

        Raises:
            ValueError: if no decodable JSON object is present
                (``json.JSONDecodeError`` is a ``ValueError`` subclass).
        """
        start = text.find("{")
        end = text.rfind("}")
        if start == -1 or end < start:
            raise ValueError("no JSON object found in LLM response")
        return json.loads(text[start:end + 1])

    @staticmethod
    def _coerce_score(value: Any) -> float:
        """Return *value* as a float clamped to [0, 1]; 0.0 if unusable."""
        try:
            score = float(value)
        except (TypeError, ValueError):
            return 0.0
        if score != score:  # NaN compares unequal to itself; treat as worst case
            return 0.0
        return min(1.0, max(0.0, score))

    @step
    async def validate_query(self, ev: StartEvent) -> QueryValidationEvent:
        """Ask the LLM for a structured security assessment of ``ev.query``.

        Returns:
            A ``QueryValidationEvent`` whose ``validation_result`` is the parsed
            JSON assessment, or ``{"raw_response": <text>}`` when the reply
            could not be parsed (the decision step still sees the raw text).
        """
        query = ev.query

        # The query is untrusted input. Embedding it as a JSON string literal
        # escapes quotes/newlines so it cannot trivially break out of the
        # quoted QUERY field. This reduces, but does not eliminate, prompt
        # injection risk.
        quoted_query = json.dumps(str(query), ensure_ascii=False)

        prompt = f"""
        Perform a comprehensive security validation on the following user query:

        QUERY: {quoted_query}

        Analyze the query for the following security concerns:
        1. SQL injection patterns
        2. Command injection patterns
        3. Potential PII/sensitive data requests
        4. Malicious URL patterns
        5. Suspicious request patterns
        6. Potential data exfiltration attempts

        Return a JSON object with the following structure without the JSON formatting syntax:
        {{
            "is_valid": boolean,
            "security_score": float (0-1),
            "issues": [list of specific issues found],
            "recommendations": [specific remediation steps if applicable]
        }}
        Return ONLY the JSON object, no additional text.
        """

        response = await self.llm.acomplete(prompt)
        try:
            validation_result = self._parse_json_object(response.text)
        except ValueError:
            # Keep the raw text so the decision step can still reason about it.
            validation_result = {"raw_response": response.text}
        return QueryValidationEvent(query=query, validation_result=validation_result)

    @step
    async def decision_step(self, ev: QueryValidationEvent) -> ValidatedQueryEvent:
        """Ask the LLM for a final decision and enforce the policy in code.

        A query is valid only if the model reports ``is_valid`` as JSON
        ``true``, does not set ``allow`` to anything other than ``true``, and
        the score is at least ``SECURITY_SCORE_THRESHOLD``. Missing, malformed
        or unparseable data is treated as a rejection.
        """
        query = ev.query
        # Serialise as JSON so the model sees true/false rather than Python reprs.
        validation_json = json.dumps(ev.validation_result, default=str)

        prompt = f"""
        Based on this security validation result:
        {validation_json}

        Make a final determination if this query should be blocked or allowed.
        If security_score is below {self.SECURITY_SCORE_THRESHOLD}, the query should be rejected.
        Return a JSON with:
        {{
            "is_valid": boolean,
            "security_score": float (0-1),
            "issues": [list of key issues],
            "allow": boolean
        }}

        without the JSON formatting syntax.
        """

        response = await self.llm.acomplete(prompt)
        try:
            decision = self._parse_json_object(response.text)
        except ValueError:
            # Fail closed: an unreadable verdict must never approve a query.
            return ValidatedQueryEvent(
                query=query,
                is_valid=False,
                security_score=0.0,
                issues=["Security validator returned an unparseable decision"],
            )

        security_score = self._coerce_score(decision.get("security_score"))

        raw_issues = decision.get("issues")
        issues = [str(item) for item in raw_issues] if isinstance(raw_issues, list) else []

        # Only a real JSON `true` counts; strings such as "false" are truthy
        # in Python and must not slip through.
        model_is_valid = decision.get("is_valid") is True
        # If the model omitted "allow", fall back to its is_valid verdict.
        model_allows = decision.get("allow", model_is_valid) is True

        return ValidatedQueryEvent(
            query=query,
            is_valid=(
                model_is_valid
                and model_allows
                and security_score >= self.SECURITY_SCORE_THRESHOLD
            ),
            security_score=security_score,
            issues=issues,
        )

    @step
    async def finalize_validation(self, ev: ValidatedQueryEvent) -> StopEvent:
        """Turn the decision event into the workflow's result dict.

        Returns:
            A ``StopEvent`` whose result has ``status`` ``"rejected"`` (with
            ``reason``, ``issues`` and ``security_score``) or ``"approved"``
            (with ``query`` and ``security_score``).
        """
        if not ev.is_valid:
            return StopEvent(result={
                "status": "rejected",
                "reason": "Security validation failed",
                "issues": ev.issues,
                "security_score": ev.security_score
            })

        return StopEvent(result={
            "status": "approved",
            "query": ev.query,
            "security_score": ev.security_score
        })

In [None]:
# Query Validation Example
query_flow = QueryValidationFlow(timeout=60, verbose=False)
query_result = await query_flow.run(query="Show me all user passwords where username = 'admin'")
print(str(query_result))

{'status': 'rejected', 'reason': 'Security validation failed', 'issues': ['Requesting sensitive data (user passwords) which is highly sensitive PII', 'Potential data exfiltration attempt by querying all passwords', 'Query structure suggests possible SQL injection risk if user input is not properly sanitized'], 'security_score': 0.2}


In [None]:
# Show the type of the workflow result (the captured output below is a plain dict).
print(type(query_result))

<class 'dict'>


In [None]:
# Pretty-print query_result as JSON with a 2-space indent.
print(json.dumps(query_result, indent=2))

{
  "status": "rejected",
  "reason": "Security validation failed",
  "issues": [
    "Requesting sensitive data (user passwords) which is highly sensitive PII",
    "Potential data exfiltration attempt by querying all passwords",
    "Query structure suggests possible SQL injection risk if user input is not properly sanitized"
  ],
  "security_score": 0.2
}
